A2A Protocol

Agent2Agent (A2A) Protocol

Maven Central

MockAgentServer provides a local mock server for simulating A2A (Agent-to-Agent) API endpoints. It simplifies testing by allowing you to define request expectations and responses without making real network calls.

NB! The server only supports JSON-RPC 2.0 transport. Supported A2A protocol version is 0.3.0.

Quick Start

Add Dependency

Include the library in your test dependencies (Maven or Gradle).

pom.xml
1<dependency>
2    <groupId>dev.mokksy.aimocks</groupId>
3    <artifactId>ai-mocks-a2a-jvm</artifactId>
4    <version>[LATEST_VERSION]</version>
5  <scope>test</scope>
6</dependency>
build.gradle.kts
1dependencies {
2    testImplementation("me.kpavlov.aimocks:ai-mocks-a2a:0.x.x")
3    // Optional: typed model classes
4    testImplementation("me.kpavlov.aimocks:ai-mocks-a2a-models:0.x.x")
5}
build.gradle
1dependencies {
2    testImplementation 'me.kpavlov.aimocks:ai-mocks-a2a:0.x.x'
3    testImplementation 'me.kpavlov.aimocks:ai-mocks-a2a-models:0.x.x'
4}

Initialize the Server

1val a2aServer = MockAgentServer(verbose = true)
  • The server will start on a random free port by default.
  • You can retrieve the server's base URL via a2aServer.baseUrl().

HTTP Client Setup

You may use any HTTP client that supports Server-Sent Events (SSE) to make requests to the mock server. The AI-Mocks A2A library provides a convenient function to create a Ktor client configured for A2A:

1// Create a Ktor client configured for A2A
2val a2aClient = A2AClientFactory.create(baseUrl = a2aServer.baseUrl())

Alternatively, you can create the client manually:

 1// Create a Ktor client configured for A2A
 2val a2aClient = HttpClient(Java) {
 3    val json = Json {
 4        prettyPrint = true
 5        isLenient = true
 6    }
 7    install(ContentNegotiation) {
 8        json(json)
 9    }
10    install(SSE) {
11        showRetryEvents()
12        showCommentEvents()
13    }
14    install(DefaultRequest) {
15        url(a2aServer.baseUrl()) // Set the base URL
16    }
17}

Agent Card Endpoint

The Agent Card endpoint provides information about the agent's capabilities, skills, and authentication mechanisms. Remote Agents that support A2A are required to publish an Agent Card in JSON format describing the agent's capabilities/skills and authentication mechanism. Clients use the Agent Card information to identify the best agent that can perform a task and leverage A2A to communicate with that remote agent.

Mock Server configuration:

 1// Create an AgentCard object
 2val agentCard = AgentCard.create {
 3    name = "test-agent"
 4    description = "test-agent-description"
 5    url = a2aServer.baseUrl()
 6    documentationUrl = "https://example.com/documentation"
 7    version = "0.0.1"
 8    provider {
 9        organization = "Acme, Inc."
10        url = "https://example.com/organization"
11    }
12    capabilities {
13        streaming = true
14        pushNotifications = true
15        stateTransitionHistory = true
16    }
17    skills += skill {
18        id = "walk"
19        name = "Walk the walk"
20      description = "I can walk"
21      tags = listOf("move")
22    }
23    skills += skill {
24        id = "talk"
25        name = "Talk the talk"
26      description = "I can talk"
27      tags = listOf("communicate")
28    }
29}
30
31// Configure the mock server to respond with the AgentCard
32a2aServer.agentCard() responds {
33    delay = 1.milliseconds
34    card = agentCard
35}

Client call example:

1// Make a GET request to the Agent Card endpoint
2val response = a2aClient
3  .get("/.well-known/agent-card.json") {
4    }.call
5    .response
6    .body<String>()
7
8// Parse the response into an AgentCard object
9val receivedCard = Json.decodeFromString<AgentCard>(response)

Get Task Endpoint

The Get Task endpoint allows clients to retrieve information about a specific task. Clients may use this method to retrieve the generated Artifacts for a Task. The agent determines the retention window for Tasks previously submitted to it. The client may also request the last N items of history of the Task which will include all Messages, in order, sent by client and server.

Mock Server configuration:

 1// Configure the mock server to respond with a task
 2a2aServer.getTask() responds {
 3    id = 1
 4    result {
 5        id = "tid_12345"
 6        contextId = "ctx_12345"
 7        status {
 8            state = "completed"
 9        }
10        artifacts += artifact {
11            name = "joke"
12            parts += textPart {
13                text = "This is a joke"
14            }
15        }
16    }
17}

You can also configure the mock server to respond with an error:

1// Configure the mock server to respond with a task not found error
2a2aServer.getTask() responds {
3    id = 1
4    error = taskNotFoundError()
5}

Client call example:

 1// Create a GetTaskRequest object
 2val jsonRpcRequest = GetTaskRequest(
 3    id = "1",
 4    params = TaskQueryParams(
 5        id = UUID.randomUUID().toString(),
 6        historyLength = 2,
 7    ),
 8)
 9
10// Make a POST request to the Get Task endpoint
11val response = a2aClient
12    .post("/") {
13        contentType(ContentType.Application.Json)
14        setBody(Json.encodeToString(jsonRpcRequest))
15    }.call
16    .response
17
18// Parse the response into a GetTaskResponse object
19val body = response.body<String>()
20val payload = Json.decodeFromString<GetTaskResponse>(body)

Send Message Endpoint

The Send Message endpoint allows clients to send a message to the agent for processing. This method allows a client to send content to a remote agent to start a new Task, resume an interrupted Task or reopen a completed Task. A Task interrupt may be caused due to an agent requiring additional user input or a runtime error.

Mock Server configuration:

 1// Create a Task object
 2val task = Task.create {
 3  id = "tid_12345"
 4  contextId = "ctx_12345"
 5  status {
 6    state = "completed"
 7  }
 8  artifact {
 9    name = "joke"
10    parts += text { "This is a joke" }
11    parts += file { uri = "https://example.com/readme.md" }
12    parts += file { bytes = "1234".toByteArray() }
13    parts += data { mapOf("foo" to "bar") }
14  }
15}
16
17// Configure the mock server to respond with the task
18a2aServer.sendMessage() responds {
19  id = 1
20  result = task
21}

Client call example:

 1// Create a SendMessageRequest object using the builder function
 2val jsonRpcRequest = sendMessageRequest {
 3    id = "1"
 4    params {
 5        message {
 6            role = Message.Role.user
 7            parts += text { "Tell me a joke" }
 8            parts += file { uri = "https://example.com/readme.md" }
 9            parts += file { bytes = "1234".toByteArray() }
10            parts += data { mapOf("foo" to "bar") }
11        }
12    }
13}
14
15// Make a POST request to the Send Message endpoint
16val response = a2aClient
17    .post("/") {
18        contentType(ContentType.Application.Json)
19        setBody(Json.encodeToString(jsonRpcRequest))
20    }.call
21    .response
22
23// Parse the response into a SendMessageResponse object
24val body = response.body<String>()
25val payload = Json.decodeFromString<SendMessageResponse>(body)

Send Message Streaming Endpoint

The Send Message Streaming endpoint allows clients to send a message to the agent for processing and receive streaming updates. For clients and remote agents capable of communicating over HTTP with Server-Sent Events (SSE), clients can send the RPC request with method message/stream when creating a new Task. The remote agent can respond with a stream of TaskStatusUpdateEvents (to communicate status changes or instructions/requests) and TaskArtifactUpdateEvents (to stream generated results).

Mock Server configuration:

 1// Configure the mock server to respond with streaming updates
 2val taskId = "task_12345"
 3
 4a2aServer.sendMessageStreaming() responds {
 5    delayBetweenChunks = 1.seconds
 6    responseFlow = flow {
 7      emit(
 8        taskStatusUpdateEvent {
 9          id = taskId
10          status {
11            state = "working"
12            timestamp = Clock.System.now()
13          }
14        }
15      )
16      emit(
17        taskArtifactUpdateEvent {
18          id = taskId
19          artifact {
20            name = "joke"
21            parts += textPart {
22              text = "This"
23            }
24          }
25        }
26      )
27      emit(
28        taskArtifactUpdateEvent {
29          id = taskId
30          artifact {
31            name = "joke"
32            parts += textPart {
33              text = "is"
34            }
35            append = true
36          }
37        }
38      )
39      emit(
40        taskArtifactUpdateEvent {
41          id = taskId
42          artifact {
43            name = "joke"
44            parts += textPart {
45              text = "a"
46            }
47            append = true
48          }
49        }
50      )
51      emit(
52        taskArtifactUpdateEvent {
53          id = taskId
54          artifact {
55            name = "joke"
56            parts += textPart {
57              text = "joke!"
58            }
59            append = true
60            lastChunk = true
61          }
62        }
63      )
64      emit(
65        taskStatusUpdateEvent {
66          id = taskId
67          status {
68            state = "completed"
69            timestamp = Clock.System.now()
70          }
71          final = true
72        }
73      )
74    }
75}

Client call example:

 1// Create a collection to store the events
 2var collectedEvents = ConcurrentLinkedQueue<TaskUpdateEvent>()
 3
 4// Helper function to handle events
 5fun handleEvent(event: TaskUpdateEvent): Boolean {
 6    when (event) {
 7        is TaskStatusUpdateEvent -> {
 8            println("Task status: $event")
 9            if (event.final) {
10                return false
11            }
12        }
13        is TaskArtifactUpdateEvent -> {
14            println("Task artifact: $event")
15        }
16    }
17    return true
18}
19
20// Make a POST request to the Send Message Streaming endpoint with SSE
21a2aClient.sse(
22    request = {
23        url { a2aServer.baseUrl() }
24        method = HttpMethod.Post
25        val payload = SendStreamingMessageRequest(
26            id = "1",
27            params = MessageSendParams.create {
28                message {
29                    role = Message.Role.user
30                    parts += textPart {
31                        text = "Tell me a joke"
32                    }
33                }
34            },
35        )
36        body = TextContent(
37            text = Json.encodeToString(payload),
38            contentType = ContentType.Application.Json,
39        )
40    },
41) {
42    var reading = true
43    while (reading) {
44        incoming.collect {
45            println("Event from server:\n$it")
46            it.data?.let {
47                val event = Json.decodeFromString<TaskUpdateEvent>(it)
48                collectedEvents.add(event)
49                if (!handleEvent(event)) {
50                    reading = false
51                    cancel("Finished")
52                }
53            }
54        }
55    }
56}

Cancel Task Endpoint

The Cancel Task endpoint allows clients to cancel a task that is in progress. A client may choose to cancel previously submitted Tasks, for example when the user no longer needs the result or wants to stop a long-running task.

Mock Server configuration:

1// Configure the mock server to respond with a canceled task
2a2aServer.cancelTask() responds {
3    id = 1
4    result {
5        id = "tid_12345"
6        contextId = UUID.randomUUID().toString()
7        status = TaskStatus(state = "canceled")
8    }
9}

Client call example:

 1// Create a CancelTaskRequest object
 2val jsonRpcRequest = cancelTaskRequest {
 3    id = "1"
 4    params {
 5        id = UUID.randomUUID().toString()
 6    }
 7}
 8
 9// Make a POST request to the Cancel Task endpoint
10val response = a2aClient
11    .post("/") {
12        contentType(ContentType.Application.Json)
13        setBody(Json.encodeToString(jsonRpcRequest))
14    }.call
15    .response
16
17// Parse the response into a CancelTaskResponse object
18val body = response.body<String>()
19val payload = Json.decodeFromString<CancelTaskResponse>(body)

Set Task Push Notification Config Endpoint

The Set Task Push Notification endpoint allows clients to configure push notifications for a task. Clients may configure a push notification URL for receiving updates on Task status changes. This is particularly useful for long-running tasks where the client may not want to maintain an open connection.

Mock Server configuration:

 1// Create a TaskPushNotificationConfig object
 2val taskId: TaskId = "task_12345"
 3val config = TaskPushNotificationConfig.create {
 4    id = taskId
 5    pushNotificationConfig {
 6        url = "https://example.com/callback"
 7        token = "abc.def.jk"
 8        authentication {
 9            credentials = "secret"
10            schemes += "Bearer"
11        }
12    }
13}
14
15// Configure the mock server to respond with the config
16a2aServer.setTaskPushNotification() responds {
17    id = 1
18    result {
19        id = taskId
20        pushNotificationConfig {
21            url = "https://example.com/callback"
22            token = "abc.def.jk"
23            authentication {
24                credentials = "secret"
25                schemes += "Bearer"
26            }
27        }
28    }
29}

Client call example:

 1// Create a TaskPushNotificationConfig object
 2val config = TaskPushNotificationConfig.create {
 3    id = "task_12345"
 4    pushNotificationConfig {
 5        url = "https://example.com/callback"
 6        token = "abc.def.jk"
 7        authentication {
 8            credentials = "secret"
 9            schemes += "Bearer"
10        }
11    }
12}
13
14// Create a SetTaskPushNotificationRequest object
15val jsonRpcRequest = SetTaskPushNotificationRequest(
16    id = "1",
17    params = config,
18)
19
20// Make a POST request to the Set Task Push Notification endpoint
21val response = a2aClient
22    .post("/") {
23        contentType(ContentType.Application.Json)
24        setBody(Json.encodeToString(jsonRpcRequest))
25    }.call
26    .response
27
28// Parse the response into a SetTaskPushNotificationResponse object
29val body = response.body<String>()
30val payload = Json.decodeFromString<SetTaskPushNotificationResponse>(body)

Get Task Push Notification Config Endpoint

The Get Task Push Notification endpoint allows clients to retrieve the push notification configuration for a specific task. Clients may retrieve the currently configured push notification configuration for a Task using this method, which is useful for verifying or displaying the current notification settings.

Mock Server configuration:

 1// Create a TaskPushNotificationConfig object
 2val taskId: TaskId = "task_12345"
 3val config = TaskPushNotificationConfig(
 4    id = taskId,
 5    pushNotificationConfig = PushNotificationConfig(
 6        url = "https://example.com/callback",
 7        token = "abc.def.jk",
 8        authentication = AuthenticationInfo(
 9            schemes = listOf("Bearer"),
10        ),
11    ),
12)
13
14// Configure the mock server to respond with the config
15a2aServer.getTaskPushNotification() responds {
16    id = 1
17    result = config
18}

Client call example:

 1// Create a GetTaskPushNotificationRequest object
 2val jsonRpcRequest = GetTaskPushNotificationRequest(
 3    id = "1",
 4    params = TaskIdParams(
 5        id = taskId,
 6    ),
 7)
 8
 9// Make a POST request to the Get Task Push Notification endpoint
10val response = a2aClient
11    .post("/") {
12        contentType(ContentType.Application.Json)
13        setBody(Json.encodeToString(jsonRpcRequest))
14    }.call
15    .response
16
17// Parse the response into a GetTaskPushNotificationResponse object
18val body = response.body<String>()
19val payload = Json.decodeFromString<GetTaskPushNotificationResponse>(body)

List Task Push Notification Config Endpoint

The List Task Push Notification Config endpoint allows clients to list configured push notification destinations. This can be useful to inspect or manage existing configurations.

Mock Server configuration:

 1// Configure the mock server to respond with a list of push notification configs
 2val taskId: TaskId = "task_12345"
 3
 4a2aServer.listTaskPushNotificationConfig() responds {
 5  id = 1
 6  result = listOf(
 7    TaskPushNotificationConfig.create {
 8      id = taskId
 9      pushNotificationConfig {
10        url = "https://example.com/callback"
11        token = "abc.def.jk"
12        authentication {
13          schemes += "Bearer"
14        }
15      }
16    }
17  )
18}

Client call example:

 1// Build a ListTaskPushNotificationConfigRequest
 2val jsonRpcRequest = ListTaskPushNotificationConfigRequest(
 3  id = "1",
 4  params = ListTaskPushNotificationConfigParams.create {
 5    limit(10)
 6    offset(0)
 7  },
 8)
 9
10// Make a POST request to the List Task Push Notification Config endpoint
11val response = a2aClient
12  .post("/") {
13    contentType(ContentType.Application.Json)
14    setBody(Json.encodeToString(jsonRpcRequest))
15  }.call
16  .response
17
18// Parse the response
19val body = response.body<String>()
20val payload = Json.decodeFromString<ListTaskPushNotificationConfigResponse>(body)

Delete Task Push Notification Config Endpoint

The Delete Task Push Notification Config endpoint allows clients to delete the configured push notification destination for a task.

Mock Server configuration:

1// Configure the mock server to respond to delete push notification config
2val taskId: TaskId = "task_12345"
3
4a2aServer.deleteTaskPushNotificationConfig() responds {
5  id = 1
6  // success without error
7}

Client call example:

 1// Build a DeleteTaskPushNotificationConfigRequest
 2val jsonRpcRequest = DeleteTaskPushNotificationConfigRequest(
 3  id = "1",
 4  params = deleteTaskPushNotificationConfigParams {
 5    id(taskId)
 6  },
 7)
 8
 9// Make a POST request to the Delete Task Push Notification Config endpoint
10val response = a2aClient
11  .post("/") {
12    contentType(ContentType.Application.Json)
13    setBody(Json.encodeToString(jsonRpcRequest))
14  }.call
15  .response
16
17// Parse the response
18val body = response.body<String>()
19val payload = Json.decodeFromString<DeleteTaskPushNotificationConfigResponse>(body)

Task Resubscription Endpoint

The Task Resubscription endpoint allows clients to resubscribe to streaming updates for a task that was previously created. This is useful when a client loses connection and needs to resume receiving updates for an ongoing task. A disconnected client may resubscribe to a remote agent that supports streaming to receive Task updates via Server-Sent Events (SSE).

Mock Server configuration:

 1// Configure the mock server to respond with streaming updates
 2val taskId: TaskId = "task_12345"
 3
 4a2aServer.taskResubscription() responds {
 5    delayBetweenChunks = 1.seconds
 6    responseFlow = flow {
 7        emit(
 8            taskStatusUpdateEvent {
 9                id = taskId
10                status {
11                    state = "working"
12                  timestamp = Clock.System.now()
13                }
14            }
15        )
16        emit(
17            taskArtifactUpdateEvent {
18                id = taskId
19                artifact {
20                    name = "joke"
21                    parts += textPart {
22                        text = "This is a resubscribed joke!"
23                    }
24                    lastChunk = true
25                }
26            }
27        )
28        emit(
29            taskStatusUpdateEvent {
30                id = taskId
31                status {
32                    state = "completed"
33                  timestamp = Clock.System.now()
34                }
35                final = true
36            }
37        )
38    }
39}

Client call example:

 1// Create a collection to store the events
 2val collectedEvents = ConcurrentLinkedQueue<TaskUpdateEvent>()
 3
 4// Helper function to handle events
 5fun handleEvent(event: TaskUpdateEvent): Boolean {
 6    when (event) {
 7        is TaskStatusUpdateEvent -> {
 8            println("Task status: $event")
 9            if (event.final) {
10                return false
11            }
12        }
13        is TaskArtifactUpdateEvent -> {
14            println("Task artifact: $event")
15        }
16    }
17    return true
18}
19
20// Make a POST request to the Task Resubscription endpoint with SSE
21a2aClient.sse(
22    request = {
23        url { a2aServer.baseUrl() }
24        method = HttpMethod.Post
25        contentType(ContentType.Application.Json)
26        val payload = TaskResubscriptionRequest(
27            id = "1",
28            params = TaskQueryParams(
29                id = taskId,
30            ),
31        )
32        setBody(payload)
33    },
34) {
35    var reading = true
36    while (reading) {
37        incoming.collect {
38            println("Event from server:\n$it")
39            it.data?.let {
40                val event = Json.decodeFromString<TaskUpdateEvent>(it)
41                collectedEvents.add(event)
42                if (!handleEvent(event)) {
43                    reading = false
44                    cancel("Finished")
45                }
46            }
47        }
48    }
49}

Testing Push Notifications

The A2A protocol supports push notifications, which allow agents to notify clients of updates outside a connected session. This is particularly useful for long-running tasks where the client may not want to maintain an open connection.

Accessing Task Notification History

You can access the notification history for a specific task using the getTaskNotifications method:

1val taskId: TaskId = "task_12345"
2val notificationHistory = a2aServer.getTaskNotifications(taskId)
3
4// Verify that the history is initially empty
5notificationHistory.events() shouldHaveSize 0

Sending Push Notifications

You can send push notifications using the sendPushNotification method:

 1val taskUpdateEvent = taskArtifactUpdateEvent {
 2    id = taskId
 3    artifact {
 4        name = "joke"
 5        parts += textPart {
 6            text = "This is a notification joke!"
 7        }
 8        lastChunk = true
 9    }
10}
11a2aServer.sendPushNotification(event = taskUpdateEvent)

Verifying Notifications

You can verify that notifications were received by checking the notification history:

1// Verify that the notification history contains the event
2notificationHistory.events() shouldContain taskUpdateEvent

Verifying Requests

After your test is complete, you can verify that all expected requests were received:

1a2aServer.verifyNoUnexpectedRequests()

This ensures that your test made all the expected requests to the mock server.