Data Science

Multi-proxy communication with A2A Python SDK

If you are working with AI under the Rock, you might hear about the Agent2agent (A2A) protocol, “an open standard designed to achieve communication and collaboration between AI agents.” It’s still new, but it’s already causing a lot of buzz. Since it is very good with MCP (it looks like it is becoming the industry standard), A2A is shaping into this The preferred standard for multi-agent communication in this industry.

When Google first removed the protocol specification, my first reaction was basically: “Okay, cool…but what should I do?” Thankfully, this week they released the protocol for the official Python SDK, so now it finally speaks a language I understand.

In this article, we will dig into how protocols actually set up communication between agents and clients. Spoiler: This is all a mission-oriented approach. To make things less abstract, let’s build a small toy example together.

Communication between event detection agent and A2A client

In our system, we have a Event Detector AI Agent (Responsible for detecting events) and Remind AI agents (Responsible for reminding users of events). Since I’m following the A2A protocol here, both proxies are mocked as simple Python methods that return strings. But in real life, you can build an agent using any framework you like (Langgraph, Google ADK, Crewai, etc.).

There are three characters in our system. userthis Event Agent and Alert Agent. They all use Messages. one Message Represents a single communication in the A2A protocol. We wrap the agent in A2A Server. The server exposes the HTTP endpoint that implements the protocol. Every A2A server has Event queues This acts as a buffer between the asynchronous execution of the proxy and the response processing of the server.

this A2A Client Initiate communication, if two agents need to communicate A2A Server Can also play A2A Client. The following figure shows how clients and servers communicate in the protocol.

Image of the author

this EventQueue shop Messages,,,,, Tasks,,,,, TaskStatusUpdateEvent,,,,, TaskArtifactUpdateEvent,,,,, A2AError and JSONRPCError Object. this Task Probably the most important object to understand how to build a multi-agent system using A2A. According to the A2A documentation:

  • When a customer sends a message to the agent, the agent may determine that a status task is required to meet the request (e.g., “Generate a report,” “Subscribe to a flight,” “Answer a question”).
  • Each task has a unique ID defined by the proxy and is performed through the defined lifecycle (e.g. submitted,,,,, working,,,,, input-required,,,,, completed,,,,, failed).
  • Tasks are stateful and can involve multiple exchanges (messages) between the client and the server.

Think of one Task As something in your multi-proxy system Clear and Unique Target. There are two tasks in our system:

  1. Detect events
  2. Remind users

Each agent does its own thing (task). Let’s build an A2A server for event proxy to make the situation more realistic.

Build an A2A server for event proxy

first: Agent Card. A proxy card is a JSON document that is used to understand other available proxy. :

  • The identity of the server
  • Function
  • Skill
  • Service endpoint
  • URL
  • How customers should verify and interact with agents

Let’s first define the proxy card for the event detector AI proxy (I have defined the skills based on Google’s example):

agent_card = AgentCard(  
    name='Event Detection Agent',  
    description='Detects relevant events and alerts the user',  
    url='  
    version='1.0.0',  
    defaultInputModes=['text'],  
    defaultOutputModes=['text'],  
    capabilities=AgentCapabilities(streaming=False),  
    authentication={ "schemes": ["basic"] },  
    skills=[  
        AgentSkill(  
            id='detect_events',  
            name='Detect Events',  
            description='Detects events and alert the user',  
            tags=['event'],  
        ),  
    ],  
)

You can learn more about proxy card object structure here:

The proxy itself will actually be a Uvicorn server, so let’s build main() How to get it up and running. All requests will be DefaultRequestHandler A2A-PYTHON SDK. The handler requires a TaskStore Storage tasks and AgentExecutor Implementation of core logic with proxy (we will build EventAgentExecutor One minute).

The last component of main() The method is A2AStarletteApplicationThis is a Starlette application that implements the A2A protocol server endpoint. We need to provide Agent Card and DefaultRequestHandler Initialize it. Now, the last step is to run the application using Uvicorn. This is main() method:

import click  
import uvicorn  
from a2a.types import (  
    AgentCard, AgentCapabilities, AgentSkill
) 
from a2a.server.request_handlers import DefaultRequestHandler  
from a2a.server.tasks import InMemoryTaskStore  
from a2a.server.apps import A2AStarletteApplication 
 
@click.command()  
@click.option('--host', default='localhost')  
@click.option('--port', default=10008)  
def main(host: str, port: int):  
    agent_executor = EventAgentExecutor()

    agent_card = AgentCard(  
        name='Event Detection Agent',  
        description='Detects relevant events and alerts the user',  
        url='  
        version='1.0.0',  
        defaultInputModes=['text'],  
        defaultOutputModes=['text'],  
        capabilities=AgentCapabilities(streaming=False),  
        authentication={ "schemes": ["basic"] },  
        skills=[              AgentSkill(                  id='detect_events',                  name='Detect Events',                  description='Detects events and alert the user',                  tags=['event'],  
            ),  
        ],  
    )
      
    request_handler = DefaultRequestHandler(  
        agent_executor=agent_executor,  
        task_store=InMemoryTaskStore()  
    ) 
 
    a2a_app = A2AStarletteApplication(  
        agent_card=agent_card,  
        http_handler=request_handler  
    )  

    uvicorn.run(a2a_app.build(), host=host, port=port)

Create EventAgentExecutor

Now it’s time to build the core of the agent and finally see how to use tasks to make the agents interact with each other. this EventAgentExecutor Class inheritance AgentExecutor interface, so we need to implement execute() and cancel() method. Take both RequestContext And one EventQueue Object is a parameter. this RequestContext Holds information about the current request that the server is processing and EventQueue Acts as a buffer between the asynchronous execution of the agent and the response processing of the server.

Our agents just need to check the stringevent“In the message sent by the user (kiss ✨). If it is “eventThere, we should call the alert agent. We will send Message Give other alert agents. This is a direct configuration policy, which means we will use the URL to configure the proxy to get the proxy card of the alert proxy. To do this, our active agents will be like A2A customers.

Let’s build the executor step by step. First, let’s create the main task (task that detects events). We need instantiation TaskUpdater Object (Assistant class that the agent publishes updates to the task event queue), then submits the task and announces that we are using the task. start_work() method:

from a2a.server.agent_execution import AgentExecutor

class EventAgentExecutor(AgentExecutor):  
    async def execute(self, context: RequestContext, event_queue: EventQueue):  
        task_updater = TaskUpdater(event_queue, context.task_id, context.context_id)  
        task_updater.submit()  
        task_updater.start_work()

The message the user will send to the proxy looks like this:

send_message_payload = {  
        'message': {  
            'role': 'user',  
            'parts': [{'type': 'text', 'text': f'it has an event!'}],  
            'messageId': uuid4().hex,  
        }  
    }

one Part Represents a different content Messagerepresent the exportable content as TextPart,,,,, FilePartor DataPart. We will use one TextPart Therefore, we need to untie it into the executor:

from a2a.server.agent_execution import AgentExecutor

class EventAgentExecutor(AgentExecutor):  
    async def execute(self, context: RequestContext, event_queue: EventQueue):  
        task_updater = TaskUpdater(event_queue, context.task_id, context.context_id)  
        task_updater.submit()  
        task_updater.start_work()

        await asyncio.sleep(1) #let's pretend we're actually doing something

        user_message = context.message.parts[0].root.text # unwraping the TextPart

It’s time to create super advanced logic for an agent. If the message has no string”event“We don’t need to call the alert agent, the task has been completed:

from a2a.server.agent_execution import AgentExecutor

class EventAgentExecutor(AgentExecutor):  
    async def execute(self, context: RequestContext, event_queue: EventQueue):  
        task_updater = TaskUpdater(event_queue, context.task_id, context.context_id)  
        task_updater.submit()  
        task_updater.start_work()

        await asyncio.sleep(1) #let's pretend we're actually doing something

        user_message = context.message.parts[0].root.text # unwraping the TextPart

        if "event" not in user_message:  
            task_updater.update_status(  
                TaskState.completed,  
                message=task_updater.new_agent_message(parts=[TextPart(text=f"No event detected")]),
            )

Create an A2A client for users

Let’s create an A2A client so that we can test the proxy as is. Customer use get_client_from_agent_card_url() method A2AClient Get the agent card after class (guzzle what). Then we wrap the message in SendMessageRequest Object and use send_message() Customer’s approach. Here is the complete code:

import httpx  
import asyncio  
from a2a.client import A2AClient  
from a2a.types import SendMessageRequest, MessageSendParams  
from uuid import uuid4  
from pprint import pprint
  
async def main():    
    send_message_payload = {  
        'message': {  
            'role': 'user',  
            'parts': [{'type': 'text', 'text': f'nothing happening here'}],  
            'messageId': uuid4().hex,  
        }  
    }  

    async with httpx.AsyncClient() as httpx_client:  
        client = await A2AClient.get_client_from_agent_card_url(  
            httpx_client, '  
        )  
        request = SendMessageRequest(  
            params=MessageSendParams(**send_message_payload)  
        )  
        response = await client.send_message(request)  
        pprint(response.model_dump(mode='json', exclude_none=True))  
  
if __name__ == "__main__":  
    asyncio.run(main())

This is what happens in the terminal running EventAgent Server:

Image of the author

This is the information the customer sees:

Image of the author

A task that detects events was created and no events were detected, good! But the focus of A2A is to get agents to communicate with each other, so have the active agent talk to the alert agent.

Have the activity agent talk to the alert agent

To have the active agent talk to the alert agent, the active agent will also act as a client:

from a2a.server.agent_execution import AgentExecutor

ALERT_AGENT_URL = " 

class EventAgentExecutor(AgentExecutor):  
    async def execute(self, context: RequestContext, event_queue: EventQueue):  
        task_updater = TaskUpdater(event_queue, context.task_id, context.context_id)  
        task_updater.submit()  
        task_updater.start_work()

        await asyncio.sleep(1) #let's pretend we're actually doing something

        user_message = context.message.parts[0].root.text # unwraping the TextPart

        if "event" not in user_message:  
            task_updater.update_status(  
                TaskState.completed,  
                message=task_updater.new_agent_message(parts=[TextPart(text=f"No event detected")]),
            )
        else:
            alert_message = task_updater.new_agent_message(parts=[TextPart(text="Event detected!")])

            send_alert_payload = SendMessageRequest(  
                params=MessageSendParams(  
                    message=alert_message  
                )  
            )  

            async with httpx.AsyncClient() as client:  
                alert_agent = A2AClient(httpx_client=client, url=ALERT_AGENT_URL)  
                response = await alert_agent.send_message(send_alert_payload)  

                if hasattr(response.root, "result"):  
                    alert_task = response.root.result  
                    # Polling until the task is done
                    while alert_task.status.state not in (  
                        TaskState.completed, TaskState.failed, TaskState.canceled, TaskState.rejected  
                    ):  
                        await asyncio.sleep(0.5)  
                        get_resp = await alert_agent.get_task(  
                            GetTaskRequest(params=TaskQueryParams(id=alert_task.id))  
                        )  
                        if isinstance(get_resp.root, GetTaskSuccessResponse):  
                            alert_task = get_resp.root.result  
                        else:  
                            break  
  
                    # Complete the original task  
                    if alert_task.status.state == TaskState.completed:  
                        task_updater.update_status(  
                            TaskState.completed,  
                            message=task_updater.new_agent_message(parts=[TextPart(text="Event detected and alert sent!")]),  
                        )  
                    else:  
                        task_updater.update_status(  
                            TaskState.failed,  
                            message=task_updater.new_agent_message(parts=[TextPart(text=f"Failed to send alert: {alert_task.status.state}")]),  
                        )  
                else:  
                    task_updater.update_status(  
                        TaskState.failed,  
                        message=task_updater.new_agent_message(parts=[TextPart(text=f"Failed to create alert task")]),  
                    )

We refer to “Alarm Agent” as the user and when we complete the alert Agent task, we complete the original event agent task. Let’s call the event agent again, but this time it’s the event:

Image of the author

The beauty here is that we just call the alert proxy and we don’t need to understand how it reminds the user. We just send it a message and wait for it to finish.

Alert proxy is very similar to event proxy. You can check the entire code here:

The final thought

Understanding how to build a multi-agent system using A2A can be daunting, but ultimately you just Send a message to let the agent do their thing. All you need to do to integrate a proxy with A2A is to create a class with the proxy’s logic AgentExecutor And run the proxy as a server.

I hope this article has helped you in your A2A journey, thanks for reading!

refer to

[1] Padgham, Lin and Michael Winikoff. Developing an intelligent proxy system: a practical guide. John Wiley & Sons, 2005.

[2]

[3] /tree/main/example/google_adk

[4]

Related Articles

Leave a Reply

Your email address will not be published. Required fields are marked *

Back to top button