
Yet another article on HTTP streaming

Since the previous article, How to stream data over HTTP using Node and Fetch API, received positive feedback, let’s continue exploring HTTP streaming, this time using a Java Servlet Container and the Fetch API. In this article we will see how to apply HTTP streaming within Jetty, a well-known Java Servlet Container.

Requirements

Here are the specific data streaming requirements for this scenario.

  • Divide the overall computation into smaller tasks that can each return a partial (and consistent) result.
  • Send data chunks over HTTP using the features available from HttpServletRequest and HttpServletResponse.
  • Use a ReadableStream from the Fetch API to receive the data chunks over HTTP on the client.
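To make the first requirement concrete, here is a minimal sketch that assumes a hypothetical computation (summing the integers 1..n) split into fixed-size tasks. Each task produces a partial result that is consistent on its own: the running total so far. `PartialResults` and `Partial` are illustrative names, not part of the project.

```java
import java.util.ArrayList;
import java.util.List;

// A partial result: how far the computation got and the consistent total up to that point.
record Partial(int processedUpTo, long runningTotal) {}

class PartialResults {

    // Splits the sum 1..n into tasks of `chunkSize` numbers. Each task yields the
    // running total after it finishes, so every emitted value is meaningful on its own.
    static List<Partial> sumInChunks(int n, int chunkSize) {
        List<Partial> results = new ArrayList<>();
        long total = 0;
        for (int start = 1; start <= n; start += chunkSize) {
            int end = Math.min(start + chunkSize - 1, n);
            for (int i = start; i <= end; i++) {
                total += i;
            }
            // in a streaming server, this is where the chunk would be written and flushed
            results.add(new Partial(end, total));
        }
        return results;
    }
}
```

The last element always equals the complete result, while the earlier ones let the client show progress before the whole computation is done.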

Let’s look at the implementation 👀

Server - Jetty Servlet

👉 Take note that the implementation is based on Jetty 12.x and (as it requires) Java 17.

In the Jetty Servlet Container, the main implementation steps are:

  • Initiate asynchronous processing for a given request using the ServletRequest.startAsync() method.

    ServletRequest.startAsync() is part of the Servlet 3.0 API, which introduced asynchronous processing to servlets. It allows the servlet to handle tasks that may take a long time to complete without blocking the container’s request-handling thread. Note that the servlet must be registered with async support enabled, otherwise startAsync() throws an IllegalStateException.

  • Acquire a PrintWriter from the response.
  • Perform an asynchronous task to generate chunks of data, writing them to the response through the PrintWriter.
  • Complete asynchronous processing once finished.
  • Serve an HTML page (index.html) containing JavaScript code that fetches the data stream (i.e. consumes the data chunks over HTTP).
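The server-side steps above can be simulated without a servlet container. In this sketch, which is an illustration only, the handler returns a CompletableFuture immediately (standing in for startAsync()), a background task writes and flushes one JSON line per chunk, and whenComplete() closes the writer (standing in for close() and AsyncContext.complete()). `FlushCountingWriter` and `AsyncHandlerSimulation` are hypothetical helpers; the first records what each flush would have pushed to the client.

```java
import java.io.PrintWriter;
import java.io.Writer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Records the text pending at each flush, i.e. what would be pushed to the client.
class FlushCountingWriter extends Writer {
    final List<String> flushed = new ArrayList<>();
    private final StringBuilder pending = new StringBuilder();
    volatile boolean closed;

    @Override
    public void write(char[] cbuf, int off, int len) {
        pending.append(cbuf, off, len);
    }

    @Override
    public void flush() {
        if (pending.length() > 0) {
            flushed.add(pending.toString());
            pending.setLength(0);
        }
    }

    @Override
    public void close() {
        flush();
        closed = true;
    }
}

class AsyncHandlerSimulation {

    // Mirrors the doGet steps: "start async", stream chunks, then complete.
    static CompletableFuture<Void> handle(FlushCountingWriter sink, int chunks) {
        PrintWriter writer = new PrintWriter(sink);
        return CompletableFuture.runAsync(() -> {
            for (int chunk = 0; chunk < chunks; ++chunk) {
                writer.println("{\"index\":" + chunk + "}");
                writer.flush(); // each flush hands one chunk to the underlying sink
            }
        }).whenComplete((result, ex) -> writer.close()); // always release the writer
    }
}
```

The caller gets the future back right away, much like doGet returns while the response is still being written.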

The server code concerns the HttpServlet.doGet implementation. I’ve reported the meaningful pieces below; the complete implementation is here

// objectMapper, log and ChunkOfData are defined elsewhere in the full implementation
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
    response.setContentType("application/json");
    response.setCharacterEncoding("UTF-8");

    // Start asynchronous processing: the container thread is released when doGet returns
    var asyncContext = request.startAsync();

    // Acquire a writer from response
    final PrintWriter writer = response.getWriter();

    CompletableFuture.runAsync(() -> {
        try {

            for (int chunk = 0; chunk < 10; ++chunk) {

                TimeUnit.SECONDS.sleep(1); // simulate time to accomplish task
                var data = new ChunkOfData(chunk);

                try {
                    var serializedData = objectMapper.writeValueAsString(data);
                    writer.println(serializedData);
                } catch (IOException e) {
                    log.warn("error serializing data!. skip it.", e);
                }
                writer.flush(); // send this chunk to the client now
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            log.error("got an interrupt on processing!", e);
            throw new RuntimeException(e);
        }
    }).whenComplete((result, ex) -> {
        // runs on success and on failure: always release the response
        writer.close();
        asyncContext.complete();
    });
}

As mentioned, request.startAsync() starts the asynchronous processing, while the data streaming itself is performed by CompletableFuture.runAsync(), which returns a CompletableFuture that is completed asynchronously by a task running in ForkJoinPool.commonPool() after it runs the given action.
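Two CompletableFuture details are worth seeing in isolation. First, runAsync(Runnable) runs on ForkJoinPool.commonPool(), which is shared by the whole JVM; for tasks that block (like the sleep() in doGet), the runAsync(Runnable, Executor) overload lets you supply a dedicated pool. Second, whenComplete() invokes its callback on both normal and exceptional completion, which is why the cleanup belongs there. The sketch below checks both; `WhenCompleteDemo` and `Outcome` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

class WhenCompleteDemo {

    // Whether the cleanup callback ran, and whether the work itself failed.
    record Outcome(boolean cleanedUp, boolean failed) {}

    // Runs `work` on a caller-supplied executor instead of the common pool.
    static Outcome run(Runnable work, ExecutorService executor) {
        AtomicBoolean cleanedUp = new AtomicBoolean(false);
        CompletableFuture<Void> future = CompletableFuture
                .runAsync(work, executor)
                // invoked for both normal and exceptional completion
                .whenComplete((result, ex) -> cleanedUp.set(true));
        boolean failed = false;
        try {
            future.join();
        } catch (CompletionException e) {
            failed = true; // the original failure still propagates to whoever joins
        }
        return new Outcome(cleanedUp.get(), failed);
    }
}
```

In a servlet, a dedicated executor would typically be created once (for example in init()) and shut down in destroy(); whether that is worth it depends on how many concurrent streams you expect.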

As you can see, data streaming is straightforward: write each chunk of data to the response’s writer and then flush it, so the chunk is sent to the client right away instead of waiting in the response buffer. Once the entire process has completed, you must close the writer (close()) and complete the asynchronous context (complete()).
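Because the response never sets a Content-Length, an HTTP/1.1 container such as Jetty sends the flushed data using chunked transfer encoding. The container does this framing for you; the sketch below only illustrates what a flushed chunk typically looks like on the wire: the chunk size in hexadecimal, CRLF, the bytes, CRLF, and a zero-length chunk at the end. `ChunkedFraming` is a hypothetical helper.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

class ChunkedFraming {

    // Encodes each payload as one HTTP/1.1 chunk, then appends the terminating zero-length chunk.
    static String encode(List<String> payloads) {
        StringBuilder wire = new StringBuilder();
        for (String payload : payloads) {
            // the chunk size counts bytes, not chars, hence the explicit UTF-8 encoding
            int size = payload.getBytes(StandardCharsets.UTF_8).length;
            wire.append(Integer.toHexString(size)).append("\r\n")
                .append(payload).append("\r\n");
        }
        wire.append("0\r\n\r\n"); // last chunk followed by an empty trailer section
        return wire.toString();
    }
}
```

For example, the 12-byte line `{"index":0}` plus its newline is framed with the size prefix `c`.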

Client - Web Component

The last step is to create a Web Component to consume and show streamed chunks of data. 

As in the previous article, we use the Fetch API with the streamingResponse() function, an async generator that handles the streaming response from the server using a body reader. Streaming starts from the connectedCallback() method, a Web Components lifecycle hook invoked each time a custom element is appended to the DOM.


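async function* streamingResponse(response) {
  // Attach Reader
  const reader = response.body.getReader();
  // A single decoder for the whole stream, so multi-byte UTF-8 characters
  // split across two chunks are decoded correctly
  const decoder = new TextDecoder();
  try {
    while (true) {
      // wait for next encoded chunk
      const { done, value } = await reader.read();
      // check if stream is done
      if (done) break;
      // Decodes data chunk and yields it
      yield decoder.decode(value, { stream: true });
    }
    // flush any bytes still buffered in the decoder
    const rest = decoder.decode();
    if (rest) yield rest;
  } finally {
    reader.releaseLock();
  }
}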

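/**
 * StreamingElement is a custom web component.
 * It is used to display streaming data.
 */
export class StreamingElement extends HTMLElement {

    get url() { return this.getAttribute('url') }

    async #fetchStreamingData() {

        console.debug( 'start fetching data')
        const execResponse = await fetch(`${this.url}/stream`);

        for await (const chunk of streamingResponse( execResponse )  ) {
            console.debug( 'fetched chunk', chunk )
            this.render( chunk );
        }
    }

    // Minimal render() assumed for this excerpt: append each chunk as a text line.
    // The complete project may render chunks differently.
    render( chunk ) {
        const line = document.createElement('div');
        line.textContent = chunk;
        this.appendChild(line);
    }

    connectedCallback() {
        // report failures instead of leaving an unhandled promise rejection
        this.#fetchStreamingData().catch( err => console.error( 'streaming failed', err ) )
    }

}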

window.customElements.define('streaming-poc', StreamingElement);
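One caveat on the client: reader.read() returns whatever bytes have arrived, so a single chunk may contain a partial JSON line, or several lines at once, regardless of how the server flushed. Since the server writes one JSON document per line, a consumer that parses the data should buffer text and split on newlines first. The logic is language-agnostic; here it is sketched in Java with a hypothetical `LineAssembler` class, and a JavaScript version would keep the same buffer inside streamingResponse().

```java
import java.util.ArrayList;
import java.util.List;

// Accumulates decoded text and emits only complete, newline-terminated lines.
class LineAssembler {
    private final StringBuilder buffer = new StringBuilder();

    // Feeds one network chunk; returns the lines completed by it (possibly none).
    List<String> push(String chunk) {
        buffer.append(chunk);
        List<String> lines = new ArrayList<>();
        int newline;
        while ((newline = buffer.indexOf("\n")) >= 0) {
            String line = buffer.substring(0, newline).strip(); // also drops a trailing '\r'
            if (!line.isEmpty()) {
                lines.add(line);
            }
            buffer.delete(0, newline + 1);
        }
        return lines;
    }

    // Returns any trailing text left when the stream ends without a final newline.
    String remainder() {
        return buffer.toString().strip();
    }
}
```

Each returned line is a complete JSON document that can be parsed safely.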

Bundle it all and let’s try it

After completing the implementation, I bundled the client code (JS, HTML) within the Java project as embedded resources and configured the Jetty server to map a static route to them. So, to try it, we can simply run the jar or use the Maven exec plugin as shown below:

mvn exec:java@test  
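Besides the browser page, a stream like this can be checked from plain Java. The sketch below is self-contained and does not use Jetty: it starts a throwaway JDK `com.sun.net.httpserver.HttpServer` that mimics the servlet (one JSON line per chunk, flushed as it goes), then reads it with `java.net.http.HttpClient` and `BodyHandlers.ofLines()`, which exposes the body as a lazily populated `Stream<String>`. The `/stream` path matches the one the Web Component fetches; the `StreamCheck` class and the random local port are assumptions for illustration.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

// Local stand-in for the Jetty servlet plus a Java client that reads the stream line by line.
class StreamCheck {

    // Starts a server on a random free port; /stream emits `chunks` JSON lines, one flush each.
    static HttpServer startDemoServer(int chunks) {
        try {
            HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
            server.createContext("/stream", exchange -> {
                exchange.getResponseHeaders().set("Content-Type", "application/json");
                exchange.sendResponseHeaders(200, 0); // 0 = unknown length, sent chunked
                try (OutputStream body = exchange.getResponseBody()) {
                    for (int chunk = 0; chunk < chunks; ++chunk) {
                        String line = "{\"index\":" + chunk + "}\n";
                        body.write(line.getBytes(StandardCharsets.UTF_8));
                        body.flush(); // hand this line to the client now
                    }
                }
            });
            server.start();
            return server;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the response body lazily, one line per JSON document.
    static List<String> readLines(URI uri) {
        HttpClient client = HttpClient.newBuilder()
                .version(HttpClient.Version.HTTP_1_1)
                .build();
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        try {
            HttpResponse<Stream<String>> response =
                    client.send(request, HttpResponse.BodyHandlers.ofLines());
            List<String> received = new ArrayList<>();
            try (Stream<String> lines = response.body()) {
                lines.forEach(line -> {
                    System.out.println("fetched chunk " + line);
                    received.add(line);
                });
            }
            return received;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

Against the real application, readLines() would be pointed at the `/stream` route on whatever host and port the Jetty server is configured to listen on.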

Et voilà ✅, we have all the main elements to successfully stream data over HTTP using a Java Servlet.

Bonus 💯: Using the java-async-generator library

To stream data over HTTP with chunked transfer encoding, we must break the main computation into smaller tasks that yield partial (yet consistent) results. JavaScript offers a powerful built-in tool for this purpose: async generators. Java lacks an async generator equivalent, so we’ve created a library, java-async-generator, to try to bridge this gap. The library is built on CompletableFuture, the Java concept closest to a JavaScript Promise.

Below is how the HttpServlet.doGet implementation looks when using an async generator:


protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
    response.setContentType("application/json");
    response.setCharacterEncoding("UTF-8");

    // Start asynchronous processing
    var asyncContext = request.startAsync();

    // Acquire a writer from response
    final PrintWriter writer = response.getWriter();

    // create async generator
    var startAsyncTasks = AsyncGeneratorQueue.of(new LinkedBlockingQueue<>(), 
                                                    this::taskEmitter );
    // start async generator
    startAsyncTasks.forEachAsync( chunk -> {
        try {
            var serializedData = objectMapper.writeValueAsString(chunk);
            writer.println(serializedData);
        } catch (IOException e) {
            StreamingServer.log.warn("error serializing data!. skip it.", e);
        }
        writer.flush();

    }).whenComplete((result, ex) -> {
        writer.close();
        asyncContext.complete();
    });
}


private void taskEmitter( BlockingQueue<AsyncGenerator.Data<ChunkOfData>> emitter ) {
    try {
        for (int chunk = 0; chunk < 10; ++chunk) {
            TimeUnit.SECONDS.sleep(1); // simulate time to accomplish task
            // add task to emitter
            emitter.add( AsyncGenerator.Data.of(new ChunkOfData(chunk)));
        }
    }
    catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the interrupt flag
        StreamingServer.log.error("got an interrupt on processing!", e);
        throw new RuntimeException(e);
    }
}

The main difference from the previous server implementation is the creation of an AsyncGeneratorQueue, a specialization of AsyncGenerator designed to enqueue asynchronous tasks in a blocking queue and retrieve each task’s outcome within a for-each iteration. As you can see, the async tasks are emitted by the taskEmitter() method, which provides a well-defined and coherent data stream from the emitting source to the receiver and simplifies the implementation of more complex streaming scenarios.
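To see the mechanism behind a queue-backed generator without the library, here is a deliberately simplified, hypothetical re-implementation of the idea using only the JDK: a producer fills a BlockingQueue on a background thread and appends an end-of-stream marker when it returns, while the consumer pulls items one at a time through CompletableFuture-based next() calls, much like awaiting an async generator in JavaScript. The names `QueueGenerator`, `next()` and `forEachAsync()` here are illustrative; the java-async-generator sources remain the reference for the library’s actual behavior (for example, how it reports producer errors).

```java
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Simplified queue-backed async generator (illustrative, not the library's code).
// Items must be non-null: Optional.empty() is reserved as the end-of-stream marker.
class QueueGenerator<E> {
    private final BlockingQueue<Optional<E>> queue = new LinkedBlockingQueue<>();
    private final Executor executor;

    QueueGenerator(Consumer<Consumer<E>> producer, Executor executor) {
        this.executor = executor;
        executor.execute(() -> {
            try {
                producer.accept(item -> queue.add(Optional.of(item)));
            } finally {
                // always signal the end; a real implementation would also forward the error
                queue.add(Optional.empty());
            }
        });
    }

    // Asynchronously waits for the next item; an empty Optional means the stream is over.
    CompletableFuture<Optional<E>> next() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }, executor);
    }

    // Calls `action` for each item in order; completes once the producer is done.
    CompletableFuture<Void> forEachAsync(Consumer<E> action) {
        return next().thenCompose(item -> {
            if (item.isEmpty()) {
                return CompletableFuture.<Void>completedFuture(null);
            }
            action.accept(item.get());
            return forEachAsync(action); // chain the next step instead of blocking
        });
    }
}
```

The producer plays the role of taskEmitter(), and forEachAsync() the role of the loop that writes and flushes each chunk.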

Conclusion 

This article has presented a practical guide to streaming data over HTTP using a Java Servlet, and to consuming and displaying it from JavaScript using the Fetch API. For live or incremental data, streaming is often more efficient, responsive, and scalable than polling or long-polling, because the server pushes each partial result over a single request as soon as it is ready. This technique can enhance the functionality and interactivity of web applications that need to show live or near-live data. Feel free to try out this method and share your feedback with us.

I hope this will be helpful. In the meantime, enjoy coding! 👋

💻 The complete code is available on GitHub 💻

References