Google just announced a new interoperability protocol for AI agents called Agent2Agent (A2A). In this post I will share some learnings from experimenting with the early bits of A2A in the official Google repo. Specifically, I will show how I ported one of my previous agent implementations to A2A. I also changed the A2A sample to work with local LLMs (Qwen and Llama) instead of the default Gemini implementation.
A2A
A2A is an ongoing effort to establish an industry standard for AI agents, covering key concerns like communication, security, task execution and agent discovery. A2A is already gaining traction with some major players in the industry: per this blog post from Google, more than 50 companies have joined the effort.
It’s important to point out that A2A is not a competing standard to MCP, but rather a complement. While MCP focuses on standardizing tool usage, A2A addresses a different problem: how we interact and work with agents. The assumption is that you will use both A2A and MCP together in your agent implementations.
Playing with A2A
As part of announcing A2A, Google also shared a repo with an early look at what A2A integrations might look like. I figured it would be a fun experiment to try to decipher some of the code behind these samples and use this knowledge to port one of my previous agent implementations to A2A.
In a previous post I showed how to implement a multi-stage agent using LangGraph. In this post I will take the original agent and integrate it into an A2A workflow.
When working with A2A there are two core concepts: the A2AClient and the A2AServer. In the sections below I will show how to work with both.
A2A Server
At a high level the A2A server is responsible for advertising available agents and executing incoming task requests (e.g. generating a news summary in my case).
The Google repo comes with a template for creating A2A servers, which I have included below:
from common.server import A2AServer
from common.types import AgentCard, AgentCapabilities, AgentSkill
from common.utils.push_notification_auth import PushNotificationSenderAuth
from agents.news.task_manager import AgentTaskManager
from agents.news.news_agent import NewsAgent
import click
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@click.command()
@click.option("--host", "host", default="localhost")
@click.option("--port", "port", default=10001)
def main(host, port):
    """Starts the News Agent server."""
    try:
        capabilities = AgentCapabilities(streaming=True, pushNotifications=True)
        skill = AgentSkill(
            id="new_summary",
            name="News Summary Tool",
            description="Helps with news summaries and categorization of news",
            tags=["news summary", "news category"],
            examples=["RSS Feed URL"],
        )
        agent_card = AgentCard(
            name="News Agent",
            description="Helps with news summaries and news categorization",
            url=f"http://{host}:{port}/",
            version="1.0.0",
            defaultInputModes=NewsAgent.SUPPORTED_CONTENT_TYPES,
            defaultOutputModes=NewsAgent.SUPPORTED_CONTENT_TYPES,
            capabilities=capabilities,
            skills=[skill],
        )
        notification_sender_auth = PushNotificationSenderAuth()
        notification_sender_auth.generate_jwk()
        server = A2AServer(
            agent_card=agent_card,
            task_manager=AgentTaskManager(
                agent=NewsAgent(),
                notification_sender_auth=notification_sender_auth,
            ),
            host=host,
            port=port,
        )
        server.app.add_route(
            "/.well-known/jwks.json",
            notification_sender_auth.handle_jwks_endpoint,
            methods=["GET"],
        )
        logger.info(f"Starting server on {host}:{port}")
        server.start()
    except Exception as e:
        logger.error(f"An error occurred during server startup: {e}")
        exit(1)


if __name__ == "__main__":
    main()
From the A2A server you can request an agent card, where the creators of the agent advertise its skills and capabilities. In the current Google repo, the agent card is served from the 'http://[domain]/.well-known/agent.json' endpoint. For my agent I created the following agent card:
{
    "name": "News Agent",
    "description": "Helps with news summaries and news categorization",
    "url": "http://localhost:10001/",
    "version": "1.0.0",
    "capabilities": {
        "streaming": true,
        "pushNotifications": true,
        "stateTransitionHistory": false
    },
    "defaultInputModes": [
        "text",
        "text/plain"
    ],
    "defaultOutputModes": [
        "text",
        "text/plain"
    ],
    "skills": [
        {
            "id": "new_summary",
            "name": "News Summary Tool",
            "description": "Helps with news summaries and categorization of news",
            "tags": [
                "news summary",
                "news category"
            ],
            "examples": [
                "RSS Feed URL"
            ]
        }
    ]
}
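To see the discovery flow end to end, you can also fetch the card by hand. Below is a minimal sketch using httpx (the repo's A2ACardResolver does essentially this for you); the fetch_agent_card helper is my own, and it assumes the server above is running locally on port 10001:

import httpx

# Hypothetical helper: fetch the agent card from the well-known discovery
# endpoint. Assumes the News Agent server above is running on localhost:10001.
def fetch_agent_card(base_url: str) -> dict:
    response = httpx.get(f"{base_url}/.well-known/agent.json")
    response.raise_for_status()
    return response.json()

card = fetch_agent_card("http://localhost:10001")
print(card["name"])                       # News Agent
print(card["capabilities"]["streaming"])  # True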
The reference implementation in the Google repo includes task management in the form of a task_manager class. Their implementation can be leveraged as is, but it's helpful to know that it comes in two forms: streaming and non-streaming. Non-streaming is simpler since it behaves like a single execution call, but streaming gives you the ability to send back asynchronous status updates for long-running workflows. Opting in or out of streaming is done through the agent card.
In my case I wanted to take advantage of async status updates with push notifications, so I configured my agent card to enable streaming by setting "capabilities": {"streaming": true}.
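To make the two forms a bit more concrete, here is a rough, heavily simplified sketch of how a task manager dispatches to the agent. The method names (on_send_task, on_send_task_subscribe) and request shapes are based on my reading of the sample repo's InMemoryTaskManager and may drift as the repo evolves, so treat this as illustrative rather than canonical:

from common.server.task_manager import InMemoryTaskManager

# Illustrative sketch only -- the real AgentTaskManager in the repo also
# persists task state, emits status update events and sends push notifications.
class SketchTaskManager(InMemoryTaskManager):
    def __init__(self, agent):
        super().__init__()
        self.agent = agent

    async def on_send_task(self, request):
        # Non-streaming: a single synchronous agent call, one final result.
        query = request.params.message.parts[0].text
        result = self.agent.invoke(query, request.params.sessionId)
        ...  # wrap result in a response and return it

    async def on_send_task_subscribe(self, request):
        # Streaming: forward incremental updates as they arrive.
        query = request.params.message.parts[0].text
        async for item in self.agent.stream(query, request.params.sessionId):
            ...  # wrap each item in a status update event and enqueue it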
The second part of the A2A server implementation is the agent itself. My implementation uses LangGraph, but A2A is implementation agnostic, so any agent framework should be supported.
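In practice, the task manager only relies on the agent exposing a small, duck-typed surface: an invoke method for single-shot calls and a stream method yielding progress dictionaries. As a hypothetical illustration of that contract, a trivial agent with no LangGraph dependency could look like this:

from typing import Any, AsyncIterable, Dict

# Hypothetical minimal agent satisfying the same contract as the NewsAgent
# below: just the invoke/stream surface the task manager consumes.
class EchoAgent:
    SUPPORTED_CONTENT_TYPES = ["text", "text/plain"]

    def invoke(self, query: str, sessionId: str) -> Dict[str, Any]:
        return {
            "is_task_complete": True,
            "require_user_input": False,
            "content": f"You said: {query}",
        }

    async def stream(self, query: str, sessionId: str) -> AsyncIterable[Dict[str, Any]]:
        yield {
            "is_task_complete": False,
            "require_user_input": False,
            "content": "Working on it...",
        }
        yield self.invoke(query, sessionId)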
I will refer you to my previous article for implementation specifics, but at a high level my agent is responsible for loading an RSS feed and providing news summaries and news categorization. The ported agent implementation can be found below:
from typing import Any, AsyncIterable, Dict, Literal
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage, ToolMessage
from pydantic import BaseModel
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from model import init_llm_with_tool_calling, init_llm
from nodes.rss_tools import get_rss_links, get_rss_feed
from nodes.url_tools import load_links
from state import GraphState
from tools.category_tools import get_article_categories
from dtos.news_context import NewsContext
from dtos.news_result import NewsResult
import json

tools = [get_article_categories]
tools_names = {t.name: t for t in tools}
memory = MemorySaver()


class ResponseFormat(BaseModel):
    """Respond to the user in this format."""
    status: Literal["input_required", "completed", "error"] = "input_required"
    message: str


class NewsAgent:
    SUPPORTED_CONTENT_TYPES = ["text", "text/plain"]

    def __init__(self):
        graph_builder = StateGraph(GraphState)
        graph_builder.add_node("get_rss_feed", get_rss_feed)
        graph_builder.add_node("get_rss_links", get_rss_links)
        graph_builder.add_node("load_links", load_links)
        graph_builder.add_node("generate_summary", generate_summary)
        graph_builder.add_node("execute_tools", execute_tools)
        graph_builder.add_node("get_categories", get_categories)
        graph_builder.add_edge(START, "get_rss_feed")
        graph_builder.add_edge("get_rss_feed", "get_rss_links")
        graph_builder.add_edge("get_rss_links", "load_links")
        graph_builder.add_edge("load_links", "generate_summary")
        graph_builder.add_edge("generate_summary", "get_categories")
        graph_builder.add_edge("get_categories", "execute_tools")
        graph_builder.add_edge("execute_tools", END)
        self.graph = graph_builder.compile(checkpointer=memory)
        print(self.graph.get_graph().draw_ascii())

    def invoke(self, query, sessionId) -> str:
        config = {"configurable": {"thread_id": sessionId}}
        news_ctx = NewsContext(rss_feed=query)
        self.graph.invoke({"data": news_ctx.rss_feed}, config)
        return self.get_agent_response(config=config)

    async def stream(self, query, sessionId) -> AsyncIterable[Dict[str, Any]]:
        inputs = {"messages": [("user", query)]}
        config = {"configurable": {"thread_id": sessionId}}
        for item in self.graph.stream(inputs, config, stream_mode="values"):
            message = item["messages"][-1]
            # Forward intermediate AI/tool messages as progress updates.
            if isinstance(message, (AIMessage, ToolMessage)):
                yield {
                    "is_task_complete": False,
                    "require_user_input": False,
                    "content": message.content,
                }
        yield self.get_agent_response(config)

    def get_agent_response(self, config):
        current_state = self.graph.get_state(config)
        structured_response = current_state.values.get("structured_response")
        if structured_response and isinstance(structured_response, ResponseFormat):
            if structured_response.status == "input_required":
                return {
                    "is_task_complete": False,
                    "require_user_input": True,
                    "content": structured_response.message,
                }
            elif structured_response.status == "error":
                return {
                    "is_task_complete": False,
                    "require_user_input": True,
                    "content": structured_response.message,
                }
            elif structured_response.status == "completed":
                return {
                    "is_task_complete": True,
                    "require_user_input": False,
                    "content": structured_response.message,
                }
        return {
            "is_task_complete": False,
            "require_user_input": True,
            "content": "We are unable to process your request at the moment. Please try again.",
        }


def generate_summary(state: GraphState):
    model = init_llm()
    articles = state["articles"]
    for article in articles:
        content = f"""
        Generate a two paragraph summary of the following text: {article["content"]}
        """
        request = [HumanMessage(content=content)]
        summary = model.invoke(request)
        article["summary"] = summary
    return {"articles": articles, "messages": [AIMessage(content="Completed getting article summaries from LLM")]}


def get_categories(state: GraphState):
    model = init_llm_with_tool_calling()
    articles = state["articles"]
    for article in articles:
        content = f"""You are an assistant who will use the tool called get_article_categories. Determine the categories that best describe this text: {article["summary"]}
        Use only categories from the following list: Real Estate, Politics, Sports, Immigration, Food, Entertainment, Business, Crime, Weather, Technology, Medicine, Science or Other.
        You may select more than one category per text"""
        request = [SystemMessage(content=content)]
        categories = model.invoke(request)
        article["tool_call_raw"] = categories.content
    return {"tool": content, "messages": [ToolMessage(content="Completed getting tool calls from LLM", tool_call_id="567")]}


def parse_tool_call(article):
    # Extract the JSON tool call embedded in the raw LLM response.
    start_index = article["tool_call_raw"].find("{")
    end_index = article["tool_call_raw"].rfind("}") + 1
    json_str = article["tool_call_raw"][start_index:end_index]
    data = json.loads(json_str)
    article["tool_name"] = data.get("name")
    categories = data.get("arguments", {}).get("article_categories")
    article["tool_argument"] = {"article_categories": categories}


def execute_tools(state: GraphState):
    articles = state["articles"]
    for article in articles:
        parse_tool_call(article)
        article["tool_result"] = tools_names[article["tool_name"]].invoke(article["tool_argument"])
    res = [NewsResult(r).to_dict() for r in articles]
    res_json = json.dumps(res)
    structured_response = ResponseFormat(message=res_json, status="completed")
    return {"messages": [ToolMessage(artifact=res, content="Completed calling tools to categorize articles", tool_call_id="123")], "structured_response": structured_response}
One thing to note is that the agent must implement streaming functionality if you want to stream asynchronous updates as the workflow executes. Feel free to skip that if all you need is a single synchronous call. In LangGraph, streaming is supported through the graph.stream API.
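As a minimal, self-contained illustration of that API (independent of the news agent), streaming a two-node graph with stream_mode="values" emits the full graph state after every node, which is exactly what the stream method above iterates over:

from typing import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    text: str

def step_one(state: State) -> State:
    return {"text": state["text"] + " -> step_one"}

def step_two(state: State) -> State:
    return {"text": state["text"] + " -> step_two"}

builder = StateGraph(State)
builder.add_node("step_one", step_one)
builder.add_node("step_two", step_two)
builder.add_edge(START, "step_one")
builder.add_edge("step_one", "step_two")
builder.add_edge("step_two", END)
graph = builder.compile()

# stream_mode="values" yields the full state after each node executes.
for state in graph.stream({"text": "start"}, stream_mode="values"):
    print(state["text"])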
A2A Client
Google’s reference implementation comes with a flexible A2AClient class that can be pointed at your specific A2AServer by updating the --agent parameter. No other changes are required to start working with the A2A server.
The good news is that everything you need for push notifications to support streaming is already wired up in the client. In my example I focused on the reference implementation under hosts/cli, but you could easily reuse this in a different client application.
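Under the hood, the push notification receiver is just an HTTP endpoint that the A2A server POSTs task updates to. As a stripped-down sketch of that idea (assuming Starlette and uvicorn), the receiving side might look like the snippet below; note that the repo's PushNotificationListener additionally verifies the JWT on each notification against the server's /.well-known/jwks.json keys, which this sketch deliberately skips:

import uvicorn
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route

# Simplified, unauthenticated sketch: accept push notifications and print them.
async def notify(request: Request) -> JSONResponse:
    body = await request.json()
    print(f"push notification => {body}")
    return JSONResponse({"status": "received"})

app = Starlette(routes=[Route("/notify", notify, methods=["POST"])])

if __name__ == "__main__":
    uvicorn.run(app, host="localhost", port=5000)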
As a reference I have included the sample client from the Google repo below:
from common.client import A2AClient, A2ACardResolver
from common.types import TaskState, Task
from common.utils.push_notification_auth import PushNotificationReceiverAuth
import asyncclick as click
import asyncio
from uuid import uuid4
import urllib.parse


@click.command()
@click.option("--agent", default="http://localhost:10001")
@click.option("--session", default=0)
@click.option("--history", default=True)
@click.option("--use_push_notifications", default=True)
@click.option("--push_notification_receiver", default="http://localhost:5000")
async def cli(agent, session, history, use_push_notifications: bool, push_notification_receiver: str):
    card_resolver = A2ACardResolver(agent)
    card = card_resolver.get_agent_card()

    print("======= Agent Card ========")
    print(card.model_dump_json(exclude_none=True))

    notif_receiver_parsed = urllib.parse.urlparse(push_notification_receiver)
    notification_receiver_host = notif_receiver_parsed.hostname
    notification_receiver_port = notif_receiver_parsed.port

    print(f"Requesting to use push notification {use_push_notifications}")
    if use_push_notifications:
        from hosts.cli.push_notification_listener import PushNotificationListener

        notification_receiver_auth = PushNotificationReceiverAuth()
        await notification_receiver_auth.load_jwks(f"{agent}/.well-known/jwks.json")
        push_notification_listener = PushNotificationListener(
            host=notification_receiver_host,
            port=notification_receiver_port,
            notification_receiver_auth=notification_receiver_auth,
        )
        push_notification_listener.start()

    client = A2AClient(agent_card=card)
    if session == 0:
        sessionId = uuid4().hex
    else:
        sessionId = session

    continue_loop = True
    streaming = card.capabilities.streaming

    while continue_loop:
        taskId = uuid4().hex
        print("========= starting a new task ======== ")
        continue_loop = await completeTask(
            client,
            streaming,
            use_push_notifications,
            notification_receiver_host,
            notification_receiver_port,
            taskId,
            sessionId,
        )

        if history and continue_loop:
            print("========= history ======== ")
            task_response = await client.get_task({"id": taskId, "historyLength": 10})
            print(task_response.model_dump_json(include={"result": {"history": True}}))


async def completeTask(
    client: A2AClient,
    streaming,
    use_push_notifications: bool,
    notification_receiver_host: str,
    notification_receiver_port: int,
    taskId,
    sessionId,
):
    prompt = click.prompt("\nWhat do you want to send to the agent? (:q or quit to exit)")
    if prompt == ":q" or prompt == "quit":
        return False

    payload = {
        "id": taskId,
        "sessionId": sessionId,
        "acceptedOutputModes": ["text"],
        "message": {
            "role": "user",
            "parts": [
                {
                    "type": "text",
                    "text": prompt,
                }
            ],
        },
    }

    if use_push_notifications:
        payload["pushNotification"] = {
            "url": f"http://{notification_receiver_host}:{notification_receiver_port}/notify",
            "authentication": {
                "schemes": ["bearer"],
            },
        }

    taskResult = None
    if streaming:
        response_stream = client.send_task_streaming(payload)
        async for result in response_stream:
            print(f"stream event => {result.model_dump_json(exclude_none=True)}")
        taskResult = await client.get_task({"id": taskId})
    else:
        taskResult = await client.send_task(payload)

    print(f"\n{taskResult.model_dump_json(exclude_none=True)}")

    # If the result is that more input is required, loop again.
    state = TaskState(taskResult.result.status.state)
    if state.name == TaskState.INPUT_REQUIRED.name:
        return await completeTask(
            client,
            streaming,
            use_push_notifications,
            notification_receiver_host,
            notification_receiver_port,
            taskId,
            sessionId,
        )
    else:
        # Task is complete.
        return True


if __name__ == "__main__":
    asyncio.run(cli())
I have added the sample to my fork of the original repo here in case you are interested in seeing the full implementation.