Streaming and SSE

The examples on this page show how to use Mokksy to simulate SSE and plain text streams and verify the responses in both Kotlin and Java.

Server-Sent Events (SSE)

Server-Sent Events (SSE) allow a server to push updates to the client over a single, long-lived HTTP connection.

Streaming clients fail in ways static JSON tests cannot catch: missed chunks, early completion, timeout handling, buffering, and reconnect logic.

Client opens SSE connection -> Mokksy sends event chunks -> Client handles stream completion, delay, or timeout

This example defines an SSE endpoint, emits two events with controlled timing, and verifies that the client receives a real text/event-stream response.

Kotlin:
mokksy.post {
  path = beEqual("/sse")
} respondsWithSseStream {
  flow =
    flow {
      delay(200.milliseconds)
      emit(
        ServerSentEvent(
          data = "One",
        ),
      )
      delay(50.milliseconds)
      emit(
        ServerSentEvent(
          data = "Two",
        ),
      )
    }
}

// when
val result = client.post("/sse")

// then
result shouldNotBeNull {
  status shouldBe HttpStatusCode.OK
  contentType() shouldBe ContentType.Text.EventStream.withCharsetIfNeeded(Charsets.UTF_8)
  bodyAsText() shouldBe "data: One\r\n\r\ndata: Two\r\n\r\n"
}

Java:
mokksy.post(spec -> spec.path("/sse"))
    .respondsWithSseStream(builder -> builder
        .chunk(SseEvent.data("One"))
        .chunk(SseEvent.data("Two")));

var response = httpClient.send(
    HttpRequest.newBuilder()
        .uri(URI.create(mokksy.baseUrl() + "/sse"))
        .POST(HttpRequest.BodyPublishers.noBody())
        .build(),
    HttpResponse.BodyHandlers.ofString()
);

assertThat(response.statusCode()).isEqualTo(200);
assertThat(response.body()).isEqualTo("data: One\r\n\r\ndata: Two\r\n\r\n");
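
The expected body in both assertions follows SSE framing: each event is a `data:` line followed by a blank line, here with `\r\n` line endings. As a minimal sketch, a test helper could build that string from the payloads instead of hard-coding it. `SseBodies` and `expectedBody` are hypothetical names, not part of Mokksy, and the helper only covers the `data` field.

```java
import java.util.List;

/** Hypothetical test helper, not part of Mokksy: builds the raw SSE body for data-only events. */
public final class SseBodies {
    // Line ending assumed from the expected bodies in the assertions above.
    private static final String EOL = "\r\n";

    private SseBodies() {}

    /** Frames each payload as one event: a "data: " line per payload line, then a blank line. */
    public static String expectedBody(List<String> payloads) {
        StringBuilder body = new StringBuilder();
        for (String payload : payloads) {
            // In SSE, a multi-line payload is sent as several consecutive data: lines.
            for (String line : payload.split("\n", -1)) {
                body.append("data: ").append(line).append(EOL);
            }
            body.append(EOL); // the blank line terminates the event
        }
        return body.toString();
    }
}
```

With this helper, the Java assertion could read `assertThat(response.body()).isEqualTo(SseBodies.expectedBody(List.of("One", "Two")))`.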

Long-lived SSE streams

By default, the SSE stream closes when the flow completes.

To keep it open (e.g. for clients that reconnect on close), end the flow with awaitCancellation():

Kotlin:
mokksy.post {
  path = beEqual("/sse-ll")
} respondsWithSseStream {
  flow = flow {
    emit(ServerSentEvent(data = "hello"))
    // Suspend until the coroutine is cancelled, so the stream stays open after the first event.
    awaitCancellation()
  }
}

Java:
// Endless stream: a "heartbeat" event every second until the connection is closed.
mokksy.post(spec -> spec.path("/sse-ll"))
    .respondsWithSseStream(stream -> stream
        .chunks(Stream.generate(() -> SseEvent.data("heartbeat")))
        .delayBetweenChunksMillis(1_000));
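
Because a long-lived stream never completes, a test that waits for the whole body will not return. One option, sketched here under assumptions, is to read the response line by line and stop after a fixed number of events. `SseTake` and `firstData` are hypothetical names, not part of Mokksy. The helper accepts any `Stream<String>` of lines, such as the body produced by `HttpResponse.BodyHandlers.ofLines()` from `java.net.http`.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper: takes the first few data payloads from a possibly endless line stream. */
public final class SseTake {
    private SseTake() {}

    /**
     * Returns the payloads of the first {@code count} data: lines, then closes the stream.
     * Simplification: every data: line is treated as its own event.
     */
    public static List<String> firstData(Stream<String> lines, int count) {
        try (Stream<String> source = lines) {
            return source
                .filter(line -> line.startsWith("data:"))
                .map(line -> {
                    String value = line.substring("data:".length());
                    // SSE allows one optional space after the colon.
                    return value.startsWith(" ") ? value.substring(1) : value;
                })
                .limit(count) // short-circuits, so an endless source is fine
                .collect(Collectors.toList());
        }
    }
}
```

The sketch bounds the number of events, not the elapsed time, so a real test would likely pair it with a timeout.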

SSE response with chunk delays

Use delayBetweenChunks when you want to verify that a client handles events as they arrive, instead of waiting for the full response body.

Kotlin:
mokksy.get {
  path("/events")
} respondsWithSseStream {
  delayBetweenChunks = 100.milliseconds
  chunks += ServerSentEvent(data = """{"status":"accepted"}""")
  chunks += ServerSentEvent(data = """{"status":"processed"}""")
}

Java:
mokksy.get(spec -> spec.path("/events"))
    .respondsWithSseStream(stream -> stream
        .delayBetweenChunksMillis(100)
        .chunk(SseEvent.data("{\"status\":\"accepted\"}"))
        .chunk(SseEvent.data("{\"status\":\"processed\"}")));
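
A client that handles events as they arrive dispatches each one when its terminating blank line is read, without waiting for the end of the stream. The following is a minimal sketch of such a loop, assuming data-only events. `SseDispatcher` and `dispatch` are hypothetical names and not part of Mokksy.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.function.Consumer;

/** Hypothetical client-side sketch: hands each SSE event to a handler as soon as it is complete. */
public final class SseDispatcher {
    private SseDispatcher() {}

    public static void dispatch(Reader source, Consumer<String> handler) {
        BufferedReader reader = new BufferedReader(source);
        StringBuilder data = new StringBuilder();
        boolean hasData = false;
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.isEmpty()) {
                    // Blank line: the event is complete, so dispatch it immediately.
                    if (hasData) {
                        handler.accept(data.toString());
                    }
                    data.setLength(0);
                    hasData = false;
                } else if (line.startsWith("data:")) {
                    String value = line.substring("data:".length());
                    if (value.startsWith(" ")) {
                        value = value.substring(1); // one optional space after the colon
                    }
                    if (hasData) {
                        data.append('\n'); // consecutive data: lines form one multi-line payload
                    }
                    data.append(value);
                    hasData = true;
                }
                // Other fields (event, id, retry) and comment lines are ignored in this sketch.
            }
            // An event without its terminating blank line at end of stream is dropped.
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In a test against the `/events` stub, the handler could record a timestamp per event to check that the two events were not delivered together at the end of the response.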

Plain text stream

Use respondsWithStream for non-SSE streaming responses, such as line-delimited downloads or APIs that return partial data before the transfer completes.

Kotlin:
mokksy.get {
  path("/download")
} respondsWithStream {
  delayBetweenChunks = 50.milliseconds
  chunks += "part-1\n"
  chunks += "part-2\n"
}

Java:
mokksy.get(spec -> spec.path("/download"))
    .respondsWithStream(stream -> stream
        .delayBetweenChunksMillis(50)
        .chunk("part-1\n")
        .chunk("part-2\n"));
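
Chunk boundaries on the wire do not have to match line boundaries, so code that processes a line-delimited stream incrementally usually buffers the trailing partial line until the next chunk arrives. A minimal sketch of that buffering, with the hypothetical names `LineAssembler`, `accept`, and `remainder` (none of them part of Mokksy):

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: reassembles complete lines from chunks that may split a line. */
public final class LineAssembler {
    private final StringBuilder pending = new StringBuilder();

    /** Adds a chunk and returns every line it completes, without the trailing '\n'. */
    public List<String> accept(String chunk) {
        pending.append(chunk);
        List<String> lines = new ArrayList<>();
        int newline;
        while ((newline = pending.indexOf("\n")) >= 0) {
            lines.add(pending.substring(0, newline));
            pending.delete(0, newline + 1);
        }
        return lines;
    }

    /** Returns the unterminated remainder, for example when the stream ends mid-line. */
    public String remainder() {
        return pending.toString();
    }
}
```

Feeding it `"part-"` and then `"1\npart-2\n"` yields no lines for the first chunk and both lines for the second, which is the behavior a client needs when the transfer splits a line.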

Use these examples to test code that processes data before the full response is available. For timeout, retry, and malformed-stream cases, continue with Failure simulation.