Google just announced a new interoperability protocol for AI agents called Agent 2 Agent (A2A). In this post I will share some of my learnings from playing with some of the early bits of A2A in the official Google repo. Specifically, I will show how I ported one of my previous agent implementations to use A2A. I also changed the A2A sample to work with local LLMs (qwen and Llama) instead of the default gemini implementation.

A2A

A2A is an ongoing effort to come up with an industry standard for AI Agents to standardize key concerns like communication, security, task execution and agent discovery. Already, we can see that A2A is gaining traction from some major players in the industry. Per this blog post from Google, more than 50 companies have already joined the effort.

It’s important to point out that A2A is not a competing standard to MCP, but rather a complement. While MCP is focused on standardizing tool usage, A2A solves a different problem by focusing on improving how we interact and work with agents. The assumption is that you will use both A2A and MCP together in your agent implementations.

Playing with A2A

As part of announcing A2A, Google also shared a repo with an early look at what A2A integrations might look like. I figured it would be a fun experiment to try to decipher some of the code behind these samples and use this knowledge to port one of my previous agent implementations to A2A.

In a previous post I showed how to implement a multi-stage agent using LangGraph. In this post I will take the original agent and integrate it into an A2A workflow.

When working with A2A there are two concepts: A2AClient and A2AServer. I the sections below I will show how to work with both.

A2A Server

At a high level the A2A server is responsible for advertising available agents and task execution of all incoming agent requests (e.g. generate news summary in my case).

The google repo comes with a template for creating A2A servers that I have included below:

from common.server import A2AServer from common.types import AgentCard, AgentCapabilities, AgentSkill from common.utils.push_notification_auth import PushNotificationSenderAuth from agents.news.task_manager import AgentTaskManager from agents.news.news_agent import NewsAgent import click import os import logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @click.command() @click.option("--host", "host", default="localhost") @click.option("--port", "port", default=10001) def main(host, port): """Starts the News Agent server.""" try: capabilities = AgentCapabilities(streaming=True, pushNotifications=True) skill = AgentSkill( id="new_summary", name="News Summary Tool", description="Helps with news summaries and catcategorization of news", tags=["news summary", "news category"], examples=["RSS Feed URL"], ) agent_card = AgentCard( name="News Agent", description="Helps with news summaries and news categorization", url=f"http://{host}:{port}/", version="1.0.0", defaultInputModes=NewsAgent.SUPPORTED_CONTENT_TYPES, defaultOutputModes=NewsAgent.SUPPORTED_CONTENT_TYPES, capabilities=capabilities, skills=[skill], ) notification_sender_auth = PushNotificationSenderAuth() notification_sender_auth.generate_jwk() server = A2AServer( agent_card=agent_card, task_manager=AgentTaskManager(agent=NewsAgent(), notification_sender_auth=notification_sender_auth), host=host, port=port, ) server.app.add_route( "/.well-known/jwks.json", notification_sender_auth.handle_jwks_endpoint, methods=["GET"] ) logger.info(f"Starting server on {host}:{port}") server.start() except Exception as e: logger.error(f"An error occurred during server startup: {e}") exit(1) if __name__ == "__main__": main()

From the A2A server you can request an agent card where the creators of the agent can advertise the skills and capabilities of the agent. In the current Google repo, the agent card functionality is made available by implementing the 'http://[domain]/.well-known/agent.json' endpoint. In my example I have created the following Agent card for my agent:

{ "name": "News Agent", "description": "Helps with news summaries and news categorization", "url": "http://localhost:10001/", "version": "1.0.0", "capabilities": { "streaming": true, "pushNotifications": true, "stateTransitionHistory": false }, "defaultInputModes": [ "text", "text/plain" ], "defaultOutputModes": [ "text", "text/plain" ], "skills": [ { "id": "new_summary", "name": "News Summary Tool", "description": "Helps with news summaries and catcategorization of news", "tags": [ "news summary", "news category" ], "examples": [ "RSS Feed URL" ] } ] }

The reference implementation in the Google repo comes with an implementation of task management in the form of a task_manager class. I would say their implementation can be leveraged “as is” but it’s helpful to know that it comes in two forms: streaming and non-streaming. Non streaming is simpler since it behaves like a single execution call, but with streaming you get the capability of sending back asynchronous status updates for long running workflows. Opting in or out of streaming can be done through the agent card.

In my case I wanted to take advantage of async status updates with push notifications, so I configured my agent card to enable streaming by setting “capabilities”:{"streaming": true”}.

The second part of the A2A server implementation is the agent itself. My implementation assumes LangGraph, but A2A is implementation agnostic, so any agent implementation should be supported.

I will refer you to my previous article for implementation specifics, but at a high level my agent is responsible for loading a rss feed and providing news summaries and news categorization. The ported agent implementation can be found below:

from typing import Any, AsyncIterable, Dict, Literal from langchain_core.messages import SystemMessage, HumanMessage from pydantic import BaseModel from langgraph.graph import StateGraph, START, END from langgraph.checkpoint.memory import MemorySaver from langchain_core.messages import AIMessage, ToolMessage from model import init_llm_with_tool_calling, init_llm from nodes.rss_tools import get_rss_links, get_rss_feed from nodes.url_tools import load_links from state import GraphState from tools.category_tools import get_article_categories from dtos.news_context import NewsContext from dtos.news_result import NewsResult import json tools = [get_article_categories] tools_names = {t.name: t for t in tools} memory = MemorySaver() class ResponseFormat(BaseModel): """Respond to the user in this format.""" status: Literal["input_required", "completed", "error"] = "input_required" message: str class NewsAgent: SUPPORTED_CONTENT_TYPES = ["text", "text/plain"] def __init__(self): graph_builder=StateGraph(GraphState) graph_builder.add_node("get_rss_feed", get_rss_feed) graph_builder.add_node("get_rss_links", get_rss_links) graph_builder.add_node("load_links", load_links) graph_builder.add_node("generate_summary", generate_summary) graph_builder.add_node("execute_tools", execute_tools) graph_builder.add_node("get_categories", get_categories) graph_builder.add_edge(START, "get_rss_feed") graph_builder.add_edge("get_rss_feed", "get_rss_links" ) graph_builder.add_edge("get_rss_links", "load_links") graph_builder.add_edge("load_links", "generate_summary") graph_builder.add_edge("generate_summary", "get_categories") graph_builder.add_edge("get_categories", "execute_tools") graph_builder.add_edge("execute_tools", END) self.graph=graph_builder.compile(checkpointer=memory) print(self.graph.get_graph().draw_ascii()) def invoke(self, query, sessionId) -> str: config = {"configurable": {"thread_id": sessionId}} news_ctx = NewsContext(rss_feed=query) self.graph.invoke({"data": news_ctx.rss_feed}, config) return self.get_agent_response(config=config) async def stream(self, query, sessionId) -> AsyncIterable[Dict[str, Any]]: inputs = {"messages": [("user", query)]} config = {"configurable": {"thread_id": sessionId}} for item in self.graph.stream(inputs, config, stream_mode="values"): message = item["messages"][-1] if isinstance(message, AIMessage): yield { "is_task_complete": False, "require_user_input": False, "content": message.content, } elif isinstance(message, ToolMessage): yield { "is_task_complete": False, "require_user_input": False, "content": message.content, } yield self.get_agent_response(config) def get_agent_response(self, config): current_state = self.graph.get_state(config) structured_response = current_state.values.get('structured_response') if structured_response and isinstance(structured_response, ResponseFormat): if structured_response.status == "input_required": return { "is_task_complete": False, "require_user_input": True, "content": structured_response.message } elif structured_response.status == "error": return { "is_task_complete": False, "require_user_input": True, "content": structured_response.message } elif structured_response.status == "completed": return { "is_task_complete": True, "require_user_input": False, "content": structured_response.message } return { "is_task_complete": False, "require_user_input": True, "content": "We are unable to process your request at the moment. Please try again.", } def generate_summary(state: GraphState): model = init_llm() articles = state["articles"] for article in articles: content = f""" Generate a two paragraph summary of the following text: {article["content"]} """ request = [HumanMessage(content=content)] summary = model.invoke(request) article["summary"] = summary return {"articles": articles, "messages": [AIMessage(content="Completed getting article summaries from LLM")]} def get_categories(state: GraphState): model = init_llm_with_tool_calling() articles = state["articles"] for article in articles: content= f"""You are an assistant who will use the tool called get_article_categories. Determine the categories that best describe this text: {article["summary"]} Use only categories from the following list: Real Estate, Politics, Sports, Immigration, Food, Entertainment, Business, Crime, Weather, Technology, Medicine, Science or Other. You may select more than one category per text""" request = [SystemMessage(content=content)] categories = model.invoke(request) article["tool_call_raw"] = categories.content return {"tool": content, "messages": [ToolMessage(content="Completed getting tool calls from LLM", tool_call_id="567")]} def parse_tool_call(article): start_index = article["tool_call_raw"].find("{") end_index = article["tool_call_raw"].rfind("}") + 1 json_str = article["tool_call_raw"][start_index:end_index] data = json.loads(json_str) article["tool_name"] = data.get("name") categories = data.get("arguments", {}).get("article_categories") article["tool_argument"] = {"article_categories": categories} def execute_tools(state: GraphState): articles = state["articles"] for article in articles: parse_tool_call(article) article["tool_result"] = tools_names[article["tool_name"]].invoke(article["tool_argument"]) res = [NewsResult(r).to_dict() for r in articles] res_json = json.dumps(res) structured_response = ResponseFormat(message=res_json, status="completed") return {"messages": [ToolMessage(artifact=res, content="Completed calling tools to categorize articles", tool_call_id="123")], "structured_response": structured_response}

One thing to note is that the agent must implement streaming functionality in case you want to stream asynchronous updates as the workflow executes. However, feel free to skip that if all you need is a single synchronous workflow. In LangGraph streaming is supported through the graph.stream api.

A2A Client

Google’s reference implementation comes with a flexible A2AClient class that can be configured to hit your specific A2AServer by updating the --agent parameter. No other changes are required to start working with the A2A Server.

The good news is that everything you need for push notifications to support streaming is already wired up in the client. In my example I focused on the reference implementation under hosts/cli, but you can easily stick this in a different client application.

As a reference I have included the sample client from the Google repo below:

from common.client import A2AClient, A2ACardResolver from common.types import TaskState, Task from common.utils.push_notification_auth import PushNotificationReceiverAuth import asyncclick as click import asyncio from uuid import uuid4 import urllib @click.command() @click.option("--agent", default="http://localhost:10001") @click.option("--session", default=0) @click.option("--history", default=True) @click.option("--use_push_notifications", default=True) @click.option("--push_notification_receiver", default="http://localhost:5000") async def cli(agent, session, history, use_push_notifications: bool, push_notification_receiver: str): card_resolver = A2ACardResolver(agent) card = card_resolver.get_agent_card() print("======= Agent Card ========") print(card.model_dump_json(exclude_none=True)) notif_receiver_parsed = urllib.parse.urlparse(push_notification_receiver) notification_receiver_host = notif_receiver_parsed.hostname notification_receiver_port = notif_receiver_parsed.port print(f"Requesting to use push notification {use_push_notifications}") if use_push_notifications: from hosts.cli.push_notification_listener import PushNotificationListener notification_receiver_auth = PushNotificationReceiverAuth() await notification_receiver_auth.load_jwks(f"{agent}/.well-known/jwks.json") push_notification_listener = PushNotificationListener( host = notification_receiver_host, port = notification_receiver_port, notification_receiver_auth=notification_receiver_auth, ) push_notification_listener.start() client = A2AClient(agent_card=card) if session == 0: sessionId = uuid4().hex else: sessionId = session continue_loop = True streaming = card.capabilities.streaming while continue_loop: taskId = uuid4().hex print("========= starting a new task ======== ") continue_loop = await completeTask(client, streaming, use_push_notifications, notification_receiver_host, notification_receiver_port, taskId, sessionId) if history and continue_loop: print("========= history ======== ") task_response = await client.get_task({"id": taskId, "historyLength": 10}) print(task_response.model_dump_json(include={"result": {"history": True}})) async def completeTask(client: A2AClient, streaming, use_push_notifications: bool, notification_receiver_host: str, notification_receiver_port: int, taskId, sessionId): prompt = click.prompt( "\nWhat do you want to send to the agent? (:q or quit to exit)" ) if prompt == ":q" or prompt == "quit": return False payload = { "id": taskId, "sessionId": sessionId, "acceptedOutputModes": ["text"], "message": { "role": "user", "parts": [ { "type": "text", "text": prompt, } ], }, } if use_push_notifications: payload["pushNotification"] = { "url": f"http://{notification_receiver_host}:{notification_receiver_port}/notify", "authentication": { "schemes": ["bearer"], }, } taskResult = None if streaming: response_stream = client.send_task_streaming(payload) async for result in response_stream: print(f"stream event => {result.model_dump_json(exclude_none=True)}") taskResult = await client.get_task({"id": taskId}) else: taskResult = await client.send_task(payload) print(f"\n{taskResult.model_dump_json(exclude_none=True)}") ## if the result is that more input is required, loop again. state = TaskState(taskResult.result.status.state) if state.name == TaskState.INPUT_REQUIRED.name: return await completeTask( client, streaming, use_push_notifications, notification_receiver_host, notification_receiver_port, taskId, sessionId ) else: ## task is complete return True if __name__ == "__main__": asyncio.run(cli())

I have added the sample to my fork of the original repo here in case you are interested in seeing the full implementation.