A2A Protocol
Agent2Agent (A2A) Protocol
MockAgentServer provides a local mock server for simulating A2A (Agent-to-Agent) API endpoints. It simplifies testing by allowing you to define request expectations and responses without making real network calls.
NB! The server only supports JSON-RPC 2.0 transport. Supported A2A protocol version is 0.3.0.
Quick Start
Add Dependency
Include the library in your test dependencies (Maven or Gradle).
1<dependency>
2 <groupId>dev.mokksy.aimocks</groupId>
3 <artifactId>ai-mocks-a2a-jvm</artifactId>
4 <version>[LATEST_VERSION]</version>
5 <scope>test</scope>
6</dependency>1dependencies {
2 testImplementation("me.kpavlov.aimocks:ai-mocks-a2a:0.x.x")
3 // Optional: typed model classes
4 testImplementation("me.kpavlov.aimocks:ai-mocks-a2a-models:0.x.x")
5}1dependencies {
2 testImplementation 'me.kpavlov.aimocks:ai-mocks-a2a:0.x.x'
3 testImplementation 'me.kpavlov.aimocks:ai-mocks-a2a-models:0.x.x'
4}Initialize the Server
1val a2aServer = MockAgentServer(verbose = true)- The server will start on a random free port by default.
- You can retrieve the server's base URL via
a2aServer.baseUrl().
HTTP Client Setup
You may use any HTTP client that supports Server-Sent Events (SSE) to make requests to the mock server. The AI-Mocks A2A library provides a convenient function to create a Ktor client configured for A2A:
1// Create a Ktor client configured for A2A
2val a2aClient = A2AClientFactory.create(baseUrl = a2aServer.baseUrl())Alternatively, you can create the client manually:
1// Create a Ktor client configured for A2A
2val a2aClient = HttpClient(Java) {
3 val json = Json {
4 prettyPrint = true
5 isLenient = true
6 }
7 install(ContentNegotiation) {
8 json(json)
9 }
10 install(SSE) {
11 showRetryEvents()
12 showCommentEvents()
13 }
14 install(DefaultRequest) {
15 url(a2aServer.baseUrl()) // Set the base URL
16 }
17}Agent Card Endpoint
The Agent Card endpoint provides information about the agent's capabilities, skills, and authentication mechanisms. Remote Agents that support A2A are required to publish an Agent Card in JSON format describing the agent's capabilities/skills and authentication mechanism. Clients use the Agent Card information to identify the best agent that can perform a task and leverage A2A to communicate with that remote agent.
Mock Server configuration:
1// Create an AgentCard object
2val agentCard = AgentCard.create {
3 name = "test-agent"
4 description = "test-agent-description"
5 url = a2aServer.baseUrl()
6 documentationUrl = "https://example.com/documentation"
7 version = "0.0.1"
8 provider {
9 organization = "Acme, Inc."
10 url = "https://example.com/organization"
11 }
12 capabilities {
13 streaming = true
14 pushNotifications = true
15 stateTransitionHistory = true
16 }
17 skills += skill {
18 id = "walk"
19 name = "Walk the walk"
20 description = "I can walk"
21 tags = listOf("move")
22 }
23 skills += skill {
24 id = "talk"
25 name = "Talk the talk"
26 description = "I can talk"
27 tags = listOf("communicate")
28 }
29}
30
31// Configure the mock server to respond with the AgentCard
32a2aServer.agentCard() responds {
33 delay = 1.milliseconds
34 card = agentCard
35}Client call example:
1// Make a GET request to the Agent Card endpoint
2val response = a2aClient
3 .get("/.well-known/agent-card.json") {
4 }.call
5 .response
6 .body<String>()
7
8// Parse the response into an AgentCard object
9val receivedCard = Json.decodeFromString<AgentCard>(response)Get Task Endpoint
The Get Task endpoint allows clients to retrieve information about a specific task. Clients may use this method to retrieve the generated Artifacts for a Task. The agent determines the retention window for Tasks previously submitted to it. The client may also request the last N items of history of the Task which will include all Messages, in order, sent by client and server.
Mock Server configuration:
1// Configure the mock server to respond with a task
2a2aServer.getTask() responds {
3 id = 1
4 result {
5 id = "tid_12345"
6 contextId = "ctx_12345"
7 status {
8 state = "completed"
9 }
10 artifacts += artifact {
11 name = "joke"
12 parts += textPart {
13 text = "This is a joke"
14 }
15 }
16 }
17}You can also configure the mock server to respond with an error:
1// Configure the mock server to respond with a task not found error
2a2aServer.getTask() responds {
3 id = 1
4 error = taskNotFoundError()
5}Client call example:
1// Create a GetTaskRequest object
2val jsonRpcRequest = GetTaskRequest(
3 id = "1",
4 params = TaskQueryParams(
5 id = UUID.randomUUID().toString(),
6 historyLength = 2,
7 ),
8)
9
10// Make a POST request to the Get Task endpoint
11val response = a2aClient
12 .post("/") {
13 contentType(ContentType.Application.Json)
14 setBody(Json.encodeToString(jsonRpcRequest))
15 }.call
16 .response
17
18// Parse the response into a GetTaskResponse object
19val body = response.body<String>()
20val payload = Json.decodeFromString<GetTaskResponse>(body)Send Message Endpoint
The Send Message endpoint allows clients to send a message to the agent for processing. This method allows a client to send content to a remote agent to start a new Task, resume an interrupted Task or reopen a completed Task. A Task interrupt may be caused due to an agent requiring additional user input or a runtime error.
Mock Server configuration:
1// Create a Task object
2val task = Task.create {
3 id = "tid_12345"
4 contextId = "ctx_12345"
5 status {
6 state = "completed"
7 }
8 artifact {
9 name = "joke"
10 parts += text { "This is a joke" }
11 parts += file { uri = "https://example.com/readme.md" }
12 parts += file { bytes = "1234".toByteArray() }
13 parts += data { mapOf("foo" to "bar") }
14 }
15}
16
17// Configure the mock server to respond with the task
18a2aServer.sendMessage() responds {
19 id = 1
20 result = task
21}Client call example:
1// Create a SendMessageRequest object using the builder function
2val jsonRpcRequest = sendMessageRequest {
3 id = "1"
4 params {
5 message {
6 role = Message.Role.user
7 parts += text { "Tell me a joke" }
8 parts += file { uri = "https://example.com/readme.md" }
9 parts += file { bytes = "1234".toByteArray() }
10 parts += data { mapOf("foo" to "bar") }
11 }
12 }
13}
14
15// Make a POST request to the Send Message endpoint
16val response = a2aClient
17 .post("/") {
18 contentType(ContentType.Application.Json)
19 setBody(Json.encodeToString(jsonRpcRequest))
20 }.call
21 .response
22
23// Parse the response into a SendMessageResponse object
24val body = response.body<String>()
25val payload = Json.decodeFromString<SendMessageResponse>(body)Send Message Streaming Endpoint
The Send Message Streaming endpoint allows clients to send a message to the agent for processing and receive streaming updates. For clients and remote agents capable of communicating over HTTP with Server-Sent Events (SSE), clients can send the RPC request with method message/stream when creating a new Task. The remote agent can respond with a stream of TaskStatusUpdateEvents (to communicate status changes or instructions/requests) and TaskArtifactUpdateEvents (to stream generated results).
Mock Server configuration:
1// Configure the mock server to respond with streaming updates
2val taskId = "task_12345"
3
4a2aServer.sendMessageStreaming() responds {
5 delayBetweenChunks = 1.seconds
6 responseFlow = flow {
7 emit(
8 taskStatusUpdateEvent {
9 id = taskId
10 status {
11 state = "working"
12 timestamp = Clock.System.now()
13 }
14 }
15 )
16 emit(
17 taskArtifactUpdateEvent {
18 id = taskId
19 artifact {
20 name = "joke"
21 parts += textPart {
22 text = "This"
23 }
24 }
25 }
26 )
27 emit(
28 taskArtifactUpdateEvent {
29 id = taskId
30 artifact {
31 name = "joke"
32 parts += textPart {
33 text = "is"
34 }
35 append = true
36 }
37 }
38 )
39 emit(
40 taskArtifactUpdateEvent {
41 id = taskId
42 artifact {
43 name = "joke"
44 parts += textPart {
45 text = "a"
46 }
47 append = true
48 }
49 }
50 )
51 emit(
52 taskArtifactUpdateEvent {
53 id = taskId
54 artifact {
55 name = "joke"
56 parts += textPart {
57 text = "joke!"
58 }
59 append = true
60 lastChunk = true
61 }
62 }
63 )
64 emit(
65 taskStatusUpdateEvent {
66 id = taskId
67 status {
68 state = "completed"
69 timestamp = Clock.System.now()
70 }
71 final = true
72 }
73 )
74 }
75}Client call example:
1// Create a collection to store the events
2var collectedEvents = ConcurrentLinkedQueue<TaskUpdateEvent>()
3
4// Helper function to handle events
5fun handleEvent(event: TaskUpdateEvent): Boolean {
6 when (event) {
7 is TaskStatusUpdateEvent -> {
8 println("Task status: $event")
9 if (event.final) {
10 return false
11 }
12 }
13 is TaskArtifactUpdateEvent -> {
14 println("Task artifact: $event")
15 }
16 }
17 return true
18}
19
20// Make a POST request to the Send Message Streaming endpoint with SSE
21a2aClient.sse(
22 request = {
23 url { a2aServer.baseUrl() }
24 method = HttpMethod.Post
25 val payload = SendStreamingMessageRequest(
26 id = "1",
27 params = MessageSendParams.create {
28 message {
29 role = Message.Role.user
30 parts += textPart {
31 text = "Tell me a joke"
32 }
33 }
34 },
35 )
36 body = TextContent(
37 text = Json.encodeToString(payload),
38 contentType = ContentType.Application.Json,
39 )
40 },
41) {
42 var reading = true
43 while (reading) {
44 incoming.collect {
45 println("Event from server:\n$it")
46 it.data?.let {
47 val event = Json.decodeFromString<TaskUpdateEvent>(it)
48 collectedEvents.add(event)
49 if (!handleEvent(event)) {
50 reading = false
51 cancel("Finished")
52 }
53 }
54 }
55 }
56}Cancel Task Endpoint
The Cancel Task endpoint allows clients to cancel a task that is in progress. A client may choose to cancel previously submitted Tasks, for example when the user no longer needs the result or wants to stop a long-running task.
Mock Server configuration:
1// Configure the mock server to respond with a canceled task
2a2aServer.cancelTask() responds {
3 id = 1
4 result {
5 id = "tid_12345"
6 contextId = UUID.randomUUID().toString()
7 status = TaskStatus(state = "canceled")
8 }
9}Client call example:
1// Create a CancelTaskRequest object
2val jsonRpcRequest = cancelTaskRequest {
3 id = "1"
4 params {
5 id = UUID.randomUUID().toString()
6 }
7}
8
9// Make a POST request to the Cancel Task endpoint
10val response = a2aClient
11 .post("/") {
12 contentType(ContentType.Application.Json)
13 setBody(Json.encodeToString(jsonRpcRequest))
14 }.call
15 .response
16
17// Parse the response into a CancelTaskResponse object
18val body = response.body<String>()
19val payload = Json.decodeFromString<CancelTaskResponse>(body)Set Task Push Notification Config Endpoint
The Set Task Push Notification endpoint allows clients to configure push notifications for a task. Clients may configure a push notification URL for receiving updates on Task status changes. This is particularly useful for long-running tasks where the client may not want to maintain an open connection.
Mock Server configuration:
1// Create a TaskPushNotificationConfig object
2val taskId: TaskId = "task_12345"
3val config = TaskPushNotificationConfig.create {
4 id = taskId
5 pushNotificationConfig {
6 url = "https://example.com/callback"
7 token = "abc.def.jk"
8 authentication {
9 credentials = "secret"
10 schemes += "Bearer"
11 }
12 }
13}
14
15// Configure the mock server to respond with the config
16a2aServer.setTaskPushNotification() responds {
17 id = 1
18 result {
19 id = taskId
20 pushNotificationConfig {
21 url = "https://example.com/callback"
22 token = "abc.def.jk"
23 authentication {
24 credentials = "secret"
25 schemes += "Bearer"
26 }
27 }
28 }
29}Client call example:
1// Create a TaskPushNotificationConfig object
2val config = TaskPushNotificationConfig.create {
3 id = "task_12345"
4 pushNotificationConfig {
5 url = "https://example.com/callback"
6 token = "abc.def.jk"
7 authentication {
8 credentials = "secret"
9 schemes += "Bearer"
10 }
11 }
12}
13
14// Create a SetTaskPushNotificationRequest object
15val jsonRpcRequest = SetTaskPushNotificationRequest(
16 id = "1",
17 params = config,
18)
19
20// Make a POST request to the Set Task Push Notification endpoint
21val response = a2aClient
22 .post("/") {
23 contentType(ContentType.Application.Json)
24 setBody(Json.encodeToString(jsonRpcRequest))
25 }.call
26 .response
27
28// Parse the response into a SetTaskPushNotificationResponse object
29val body = response.body<String>()
30val payload = Json.decodeFromString<SetTaskPushNotificationResponse>(body)Get Task Push Notification Config Endpoint
The Get Task Push Notification endpoint allows clients to retrieve the push notification configuration for a specific task. Clients may retrieve the currently configured push notification configuration for a Task using this method, which is useful for verifying or displaying the current notification settings.
Mock Server configuration:
1// Create a TaskPushNotificationConfig object
2val taskId: TaskId = "task_12345"
3val config = TaskPushNotificationConfig(
4 id = taskId,
5 pushNotificationConfig = PushNotificationConfig(
6 url = "https://example.com/callback",
7 token = "abc.def.jk",
8 authentication = AuthenticationInfo(
9 schemes = listOf("Bearer"),
10 ),
11 ),
12)
13
14// Configure the mock server to respond with the config
15a2aServer.getTaskPushNotification() responds {
16 id = 1
17 result = config
18}Client call example:
1// Create a GetTaskPushNotificationRequest object
2val jsonRpcRequest = GetTaskPushNotificationRequest(
3 id = "1",
4 params = TaskIdParams(
5 id = taskId,
6 ),
7)
8
9// Make a POST request to the Get Task Push Notification endpoint
10val response = a2aClient
11 .post("/") {
12 contentType(ContentType.Application.Json)
13 setBody(Json.encodeToString(jsonRpcRequest))
14 }.call
15 .response
16
17// Parse the response into a GetTaskPushNotificationResponse object
18val body = response.body<String>()
19val payload = Json.decodeFromString<GetTaskPushNotificationResponse>(body)List Task Push Notification Config Endpoint
The List Task Push Notification Config endpoint allows clients to list configured push notification destinations. This can be useful to inspect or manage existing configurations.
Mock Server configuration:
1// Configure the mock server to respond with a list of push notification configs
2val taskId: TaskId = "task_12345"
3
4a2aServer.listTaskPushNotificationConfig() responds {
5 id = 1
6 result = listOf(
7 TaskPushNotificationConfig.create {
8 id = taskId
9 pushNotificationConfig {
10 url = "https://example.com/callback"
11 token = "abc.def.jk"
12 authentication {
13 schemes += "Bearer"
14 }
15 }
16 }
17 )
18}Client call example:
1// Build a ListTaskPushNotificationConfigRequest
2val jsonRpcRequest = ListTaskPushNotificationConfigRequest(
3 id = "1",
4 params = ListTaskPushNotificationConfigParams.create {
5 limit(10)
6 offset(0)
7 },
8)
9
10// Make a POST request to the List Task Push Notification Config endpoint
11val response = a2aClient
12 .post("/") {
13 contentType(ContentType.Application.Json)
14 setBody(Json.encodeToString(jsonRpcRequest))
15 }.call
16 .response
17
18// Parse the response
19val body = response.body<String>()
20val payload = Json.decodeFromString<ListTaskPushNotificationConfigResponse>(body)Delete Task Push Notification Config Endpoint
The Delete Task Push Notification Config endpoint allows clients to delete the configured push notification destination for a task.
Mock Server configuration:
1// Configure the mock server to respond to delete push notification config
2val taskId: TaskId = "task_12345"
3
4a2aServer.deleteTaskPushNotificationConfig() responds {
5 id = 1
6 // success without error
7}Client call example:
1// Build a DeleteTaskPushNotificationConfigRequest
2val jsonRpcRequest = DeleteTaskPushNotificationConfigRequest(
3 id = "1",
4 params = deleteTaskPushNotificationConfigParams {
5 id(taskId)
6 },
7)
8
9// Make a POST request to the Delete Task Push Notification Config endpoint
10val response = a2aClient
11 .post("/") {
12 contentType(ContentType.Application.Json)
13 setBody(Json.encodeToString(jsonRpcRequest))
14 }.call
15 .response
16
17// Parse the response
18val body = response.body<String>()
19val payload = Json.decodeFromString<DeleteTaskPushNotificationConfigResponse>(body)Task Resubscription Endpoint
The Task Resubscription endpoint allows clients to resubscribe to streaming updates for a task that was previously created. This is useful when a client loses connection and needs to resume receiving updates for an ongoing task. A disconnected client may resubscribe to a remote agent that supports streaming to receive Task updates via Server-Sent Events (SSE).
Mock Server configuration:
1// Configure the mock server to respond with streaming updates
2val taskId: TaskId = "task_12345"
3
4a2aServer.taskResubscription() responds {
5 delayBetweenChunks = 1.seconds
6 responseFlow = flow {
7 emit(
8 taskStatusUpdateEvent {
9 id = taskId
10 status {
11 state = "working"
12 timestamp = Clock.System.now()
13 }
14 }
15 )
16 emit(
17 taskArtifactUpdateEvent {
18 id = taskId
19 artifact {
20 name = "joke"
21 parts += textPart {
22 text = "This is a resubscribed joke!"
23 }
24 lastChunk = true
25 }
26 }
27 )
28 emit(
29 taskStatusUpdateEvent {
30 id = taskId
31 status {
32 state = "completed"
33 timestamp = Clock.System.now()
34 }
35 final = true
36 }
37 )
38 }
39}Client call example:
1// Create a collection to store the events
2val collectedEvents = ConcurrentLinkedQueue<TaskUpdateEvent>()
3
4// Helper function to handle events
5fun handleEvent(event: TaskUpdateEvent): Boolean {
6 when (event) {
7 is TaskStatusUpdateEvent -> {
8 println("Task status: $event")
9 if (event.final) {
10 return false
11 }
12 }
13 is TaskArtifactUpdateEvent -> {
14 println("Task artifact: $event")
15 }
16 }
17 return true
18}
19
20// Make a POST request to the Task Resubscription endpoint with SSE
21a2aClient.sse(
22 request = {
23 url { a2aServer.baseUrl() }
24 method = HttpMethod.Post
25 contentType(ContentType.Application.Json)
26 val payload = TaskResubscriptionRequest(
27 id = "1",
28 params = TaskQueryParams(
29 id = taskId,
30 ),
31 )
32 setBody(payload)
33 },
34) {
35 var reading = true
36 while (reading) {
37 incoming.collect {
38 println("Event from server:\n$it")
39 it.data?.let {
40 val event = Json.decodeFromString<TaskUpdateEvent>(it)
41 collectedEvents.add(event)
42 if (!handleEvent(event)) {
43 reading = false
44 cancel("Finished")
45 }
46 }
47 }
48 }
49}Testing Push Notifications
The A2A protocol supports push notifications, which allow agents to notify clients of updates outside a connected session. This is particularly useful for long-running tasks where the client may not want to maintain an open connection.
Accessing Task Notification History
You can access the notification history for a specific task using the getTaskNotifications method:
1val taskId: TaskId = "task_12345"
2val notificationHistory = a2aServer.getTaskNotifications(taskId)
3
4// Verify that the history is initially empty
5notificationHistory.events() shouldHaveSize 0Sending Push Notifications
You can send push notifications using the sendPushNotification method:
1val taskUpdateEvent = taskArtifactUpdateEvent {
2 id = taskId
3 artifact {
4 name = "joke"
5 parts += textPart {
6 text = "This is a notification joke!"
7 }
8 lastChunk = true
9 }
10}
11a2aServer.sendPushNotification(event = taskUpdateEvent)Verifying Notifications
You can verify that notifications were received by checking the notification history:
1// Verify that the notification history contains the event
2notificationHistory.events() shouldContain taskUpdateEventVerifying Requests
After your test is complete, you can verify that all expected requests were received:
1a2aServer.verifyNoUnexpectedRequests()This ensures that your test made all the expected requests to the mock server.