Agent Handlers
Introduction
Within the uAgents Framework, functions can be decorated with handlers, to only be triggered on a condition caught by the uAgents library.
Below, we show how to use the following different event handlers:
- Interval tasks:
.on_interval()
- Handle messages:
.on_message()
- Answer queries:
.on_query()
- Triggered by event:
on_event()
Creating an interval task with on_interval()
handler
Sometimes an agent will need to perform a task periodically. To do this we can use the on_interval()
decorator which periodically repeats a given function for the agent. For instance, an agent could send a message every 2 seconds to another agent.
Let's get started and create our first interval task!
Prerequisites
Make sure you have read the following resources before going on with this guide:
- Quick Start Guide for uAgents Framework
- Creating your first agent
- Agents address
- Communicating with other agents
Imports needed
Walk-through
-
Let's create a Python script for this task, and name it:
interval-task.py
-
Then import the necessary classes from
uagents
library,Agent
andContext
, and create our agent:from uagents import Agent, Context agent = Agent(name="alice", seed="alice recovery phrase")
-
Create a function to handle the startup event, which will introduce the agent:
@agent.on_event("startup") async def introduce_agent(ctx: Context): ctx.logger.info(f"Hello, I'm agent {agent.name} and my address is {agent.address}.")
-
We can now define our agent's interval behavior. We want our agent to log a message every 2 seconds using the
on_interval
decorator:@agent.on_interval(period=2.0) async def say_hello(ctx: Context): ctx.logger.info("Hello!") if __name__ == "__main__": agent.run()
The output will be printed out using the
ctx.logger.info()
method. -
Save the script.
The overall script should look as follows:
from uagents import Agent, Context
agent = Agent(name="alice", seed="alice recovery phrase")
@agent.on_event("startup")
async def introduce_agent(ctx: Context):
ctx.logger.info(f"Hello, I'm agent {agent.name} and my address is {agent.address}.")
@agent.on_interval(period=2.0)
async def say_hello(ctx: Context):
ctx.logger.info("Hello!")
if __name__ == "__main__":
agent.run()
Run the script
Run the script: python interval-task.py
The output should be as follows:
INFO: [alice]: Starting server on http://0.0.0.0:8000 (Press CTRL+C to quit)
INFO: [alice]: Hello, I'm agent alice and my address is agent1qww3ju3h6kfcuqf54gkghvt2pqe8qp97a7nzm2vp8plfxflc0epzcjsv79t.
INFO: [alice]: Hello!
INFO: [alice]: Hello!
INFO: [alice]: Hello!
Handle messages using the on_message()
handler
We now showcase a scenario where three agents, named alice
, bob
, and charles
, use a custom protocol to communicate. In the example, Alice and Bob support the protocol, whereas Charles attempts to send broadcast messages to all agents using the protocol. Agents use the on_message()
handler which allows them to handle messages matching specific data models.
Let's get started!
Walk-through
-
First of all, let's create a Python script for this task, and name it:
broadcast.py
-
We then need to import the
Agent
,Bureau
,Context
,Model
, andProtocol
classes from theuagents
library, and thefund_agent_if_low
fromuagents.setup
. Then, let's create the 3 different agents using the classAgent
. Each agent is initialized with a unique name and a seed phrase for wallet recovery. Additionally, if an agent's wallet balance is low, thefund_agent_if_low()
function is called to add funds to their wallet:from uagents import Agent, Bureau, Context, Model, Protocol from uagents.setup import fund_agent_if_low alice = Agent(name="alice", seed="alice recovery phrase") bob = Agent(name="bob", seed="bob recovery phrase") charles = Agent(name="charles", seed="charles recovery phrase") fund_agent_if_low(alice.wallet.address()) fund_agent_if_low(bob.wallet.address()) fund_agent_if_low(charles.wallet.address())
It is optional but useful to include a
seed
parameter when creating an agent to set fixed addresses . Otherwise, random addresses will be generated every time you run the agent. -
Let's then define the message data models to specify the type of messages being handled and exchanged by the agents. We define a
BroadcastExampleRequest
and aBroadcastExampleResponse
data models. Finally, create aprotocol
namedproto
with version1.0
:class BroadcastExampleRequest(Model): pass class BroadcastExampleResponse(Model): text: str proto = Protocol(name="proto", version="1.0")
-
Let's now define a message handler function for incoming messages of type
BroadcastExampleRequest
in the protocol:@proto.on_message(model=BroadcastExampleRequest, replies=BroadcastExampleResponse) async def handle_request(ctx: Context, sender: str, _msg: BroadcastExampleRequest): await ctx.send( sender, BroadcastExampleResponse(text=f"Hello from {ctx.agent.name}") )
Here we defined a
handle_request()
function which is used whenever a request is received. This sends a response back to the sender. This function is decorated with the.on_message()
decorator indicating that this function is triggered whenever a message of typeBroadcastExampleRequest
is received. The function sends a response containing a greeting message with the name of the agent that sent the request in the first place. -
Now, we need to include the
protocol
into the agents. Specifically, the protocol is included in bothalice
andbob
agents. This means they will follow the rules defined in the protocol when communicating:alice.include(proto) bob.include(proto)
ℹ️After the first registration in the Almanac smart contract, it will take about 5 minutes before the agents can be found through the protocol.
-
It is now time to define the behavior and function of
charles
agent:@charles.on_interval(period=5) async def say_hello(ctx: Context): status_list = await ctx.broadcast(proto.digest, message=BroadcastExampleRequest()) ctx.logger.info(f"Trying to contact {len(status_list)} agents.") @charles.on_message(model=BroadcastExampleResponse) async def handle_response(ctx: Context, sender: str, msg: BroadcastExampleResponse): ctx.logger.info(f"Received response from {sender}: {msg.text}")
In the first part, we use the
.on_interval()
decorator to define an interval behavior for this agent when the script is being run. In this case, the agent will execute thesay_hello()
function every 5 seconds. TheContext
object is a collection of data and functions related to the agent. Inside thesay_hello()
function, the agent uses thectx.broadcast()
method to send a broadcast message. The message is of typeBroadcastExampleRequest()
and it is being sent using the protocol's digest (proto.digest
).Then, we defined a
.on_message()
decorator which decorateshandle_response()
function. This function handles all incoming messages of typeBroadcastExampleResponse
from other agents. When a response is received, it logs the information. Inside thehandle_response()
function, the agent logs an informational message usingctx.logger.info()
method to print the sender and the content of the message. The message includes the sender's name and the text content of the response message. -
We are now ready to set up a
Bureau
object for agents to be run together at the same time, and we addalice
,bob
, andcharles
to it using thebureau.add()
method:bureau = Bureau(port=8000, endpoint="http://localhost:8000/submit") bureau.add(alice) bureau.add(bob) bureau.add(charles) if __name__ == "__main__": bureau.run()
The bureau is assigned to listen on
port=8000
and specifies anendpoint
at"http://localhost:8000/submit"
for submitting data. -
Save the script.
The overall script should look as follows:
from uagents import Agent, Bureau, Context, Model, Protocol
# create agents
# alice and bob will support the protocol
# charles will try to reach all agents supporting the protocol
alice = Agent(name="alice", seed="alice recovery phrase")
bob = Agent(name="bob", seed="bob recovery phrase")
charles = Agent(name="charles", seed="charles recovery phrase")
class BroadcastExampleRequest(Model):
pass
class BroadcastExampleResponse(Model):
text: str
# define protocol
proto = Protocol(name="proto", version="1.0")
@proto.on_message(model=BroadcastExampleRequest, replies=BroadcastExampleResponse)
async def handle_request(ctx: Context, sender: str, _msg: BroadcastExampleRequest):
await ctx.send(
sender, BroadcastExampleResponse(text=f"Hello from {ctx.agent.name}")
)
# include protocol
# Note: after the first registration on the almanac smart contract, it will
# take about 5 minutes before the agents can be found through the protocol
alice.include(proto)
bob.include(proto)
# let charles send the message to all agents supporting the protocol
@charles.on_interval(period=5)
async def say_hello(ctx: Context):
status_list = await ctx.broadcast(proto.digest, message=BroadcastExampleRequest())
ctx.logger.info(f"Trying to contact {len(status_list)} agents.")
@charles.on_message(model=BroadcastExampleResponse)
async def handle_response(ctx: Context, sender: str, msg: BroadcastExampleResponse):
ctx.logger.info(f"Received response from {sender}: {msg.text}")
bureau = Bureau(port=8000, endpoint="http://localhost:8000/submit")
bureau.add(alice)
bureau.add(bob)
bureau.add(charles)
if __name__ == "__main__":
bureau.run()
Run the script
Make sure to have activated your virtual environment correctly.
Run the script: python broadcast.py
The output would be:
Trying to contact 2 agents.
Received response from alice: Hello from alice
Received response from bob: Hello from bob
Answer queries with on_query()
handler
The on_query()
handler is used to register a Function as a handler for incoming queries that match a specified Model
. This decorator enables the agent to respond to queries in an event-driven manner.
Walk-through
Agent's script
For the agent, the script sets up an agent to handle incoming queries. It defines two models: TestRequest
and Response
. Upon startup, it logs the agent's details. The core functionality lies in the query_handler
, decorated with @agent.on_query()
, which processes received queries and sends back a predefined response. This demonstrates creating responsive agents within the uagents
Framework, showcasing how they can interact with other agents or functions in an asynchronous, event-driven architecture.
from uagents import Agent, Context, Model
class TestRequest(Model):
message: str
class Response(Model):
text: str
# Initialize the agent with its configuration.
agent = Agent(
name="your_agent_name_here",
seed="your_agent_seed_here",
port=8001,
endpoint="http://localhost:8001/submit",
)
@agent.on_event("startup")
async def startup(ctx: Context):
ctx.logger.info(f"Starting up {agent.name}")
ctx.logger.info(f"With address: {agent.address}")
ctx.logger.info(f"And wallet address: {agent.wallet.address()}")
# Decorator to handle incoming queries.
@agent.on_query(model=TestRequest, replies={Response})
async def query_handler(ctx: Context, sender: str, _query: TestRequest):
ctx.logger.info("Query received")
try:
# do something here
await ctx.send(sender, Response(text="success"))
except Exception:
await ctx.send(sender, Response(text="fail"))
# Main execution block to run the agent.
if __name__ == "__main__":
agent.run()
The agent is created using the Agent
class from uagents
library. You can initialise it by providing it with a name
, seed
, port
, and endpoint
. It defines an on_event()
handler for the startup
event, where it logs information about the agent's initialisation. It defines an on_query()
handler for handling queries of type TestRequest
. Upon receiving a query, it processes it and sends back a Response
. The agent is then set to run.
Proxy
The proxy is implemented using FastAPI
. It sets up two routes: "/"
for a simple root message and "/endpoint"
for receiving requests. When a POST
request is made to "/endpoint"
with a JSON payload containing a TestRequest
, it triggers the make_agent_call
function. Inside make_agent_call
, it calls agent_query
to communicate with the agent. The agent receives the query, processes it, and sends back a response. The proxy receives the response from the agent and sends back a success message along with the response text.
Let's explore the Proxy code script step-by-step:
-
First of all navigate to directory where you want to create your project.
-
Create a Python script name
on_query.py
by runningtouch on_query.py
. -
We need to import
json
,fastapi
,uagent
'sModel
andquery
. Then we would need to define the query format using theTestRequest
class as a subclass ofModel
:import json from fastapi import FastAPI, Request from uagents import Model from uagents.query import query from uagents.envelope import Envelope AGENT_ADDRESS = "agent1qt6ehs6kqdgtrsduuzslqnrzwkrcn3z0cfvwsdj22s27kvatrxu8sy3vag0" class TestRequest(Model): message: str
-
Create
agent_query()
function to send query to agent and decode the response received.async def agent_query(req): response = await query(destination=AGENT_ADDRESS, message=req, timeout=15) if isinstance(response, Envelope): data = json.loads(response.decode_payload()) return data["text"] return response
-
Initialize a
FastAPI
app:app = FastAPI()
-
Define a root endpoint to test the server:
@app.get("/") def read_root(): return "Hello from the Agent controller"
-
Define an endpoint to make agent calls:
@app.post("/endpoint") async def make_agent_call(req: Request): model = TestRequest.parse_obj(await req.json()) try: res = await agent_query(model) return f"successful call - agent response: {res}" except Exception: return "unsuccessful agent call"
-
Save the script. Remember that you need to provide the
AGENT_ADDRESS
parameter to correctly run this code.
The overall script should look as follows:
import json
from fastapi import FastAPI, Request
from uagents import Model
from uagents.query import query
from uagents.envelope import Envelope
AGENT_ADDRESS = "agent1qt6ehs6kqdgtrsduuzslqnrzwkrcn3z0cfvwsdj22s27kvatrxu8sy3vag0"
class TestRequest(Model):
message: str
async def agent_query(req):
response = await query(destination=AGENT_ADDRESS, message=req, timeout=15)
if isinstance(response, Envelope):
data = json.loads(response.decode_payload())
return data["text"]
return response
app = FastAPI()
@app.get("/")
def read_root():
return "Hello from the Agent controller"
@app.post("/endpoint")
async def make_agent_call(req: Request):
model = TestRequest.parse_obj(await req.json())
try:
res = await agent_query(model)
return f"successful call - agent response: {res}"
except Exception:
return "unsuccessful agent call"
Run the example
In separate terminals:
-
Run the FastAPI proxy:
uvicorn proxy:app
-
Run the agent:
python agent.py
-
Query the agent via the proxy:
curl -d '{"message": "test"}' -H "Content-Type: application/json" -X POST http://localhost:8000/endpoint
Catching events with on_event()
handler
During startup, and shutdown there are two events that are caught by the uAgents library, startup
and shutdown
.
Here's an example:
on_event("startup")
@agent.on_event("startup")
async def introduce_agent(ctx: Context):
ctx.logger.info(f"Hello, I'm agent {agent.name} and my address is {agent.address}.")
...
on_event("shutdown")
@agent.on_event("shutdown")
async def introduce_agent(ctx: Context):
ctx.logger.info(f"Hello, I'm agent {agent.name} and I am shutting down")
...