Streaming
LangGraph4j is built with first-class support for streaming, relying on the java-async-generator library. Below are the different ways to stream outputs back from a graph run.
Streaming graph outputs (.stream())
.stream()
is a method for streaming back outputs from a graph run. It returns an AsyncGenerator over which you must iterate to fetch the sequence of performed steps as instances of the [NodeOutput] class, which basically reports the executed node name and the resulting state.
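For example, here is a minimal sketch of iterating the returned generator (it assumes app is an already compiled graph, i.e. a CompiledGraph<State>):

// iterate the steps of a graph run as they are produced
for( var output : app.stream( Map.of( "input", "perform task X" ) ) ) {
    log.info( "node: {}, state: {}", output.node(), output.state() );
}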
Streaming of streaming (embedding and composition)
AsyncGenerator supports embedding (i.e. it is composable): the main iteration can be paused to run a nested AsyncGenerator to completion, after which the main iteration resumes.
Relying on this feature, we can return from a node action an AsyncGenerator that will be embedded into the graph's main one, so that its results are fetched from the same iterator returned by .stream(), making sub-streaming a seamless experience, as sketched below.
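As an illustration, here is a minimal sketch of a nested generator built by hand. It assumes the Data.of / Data.done factories of java-async-generator, and the "messages" key is purely illustrative; in practice such a generator is produced by LLMStreamingGenerator, shown below.

import org.bsc.async.AsyncGenerator;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// a nested generator emitting one partial state per token
var tokens = List.of( "hello", "world" ).iterator();
AsyncGenerator<Map<String,Object>> nested = () -> tokens.hasNext()
        ? AsyncGenerator.Data.of( CompletableFuture.completedFuture( Map.of( "content", tokens.next() ) ) )
        : AsyncGenerator.Data.done();

// returning it as a state value from a node action embeds it into the graph's
// main generator, so .stream() yields its elements inline ("messages" is an illustrative key)
Map<String,Object> nodeResult = Map.of( "messages", nested );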
Streaming LLM tokens (using Langchain4j)
To stream LLM tokens from an AI call using Langchain4j, we use a StreamingChatLanguageModel. Below is an example:
StreamingChatLanguageModel model = OpenAiStreamingChatModel.builder()
        .apiKey(System.getenv("OPENAI_API_KEY"))
        .modelName(GPT_4_O_MINI)
        .build();

model.generate("Tell me a joke", new StreamingResponseHandler<AiMessage>() {

    @Override
    public void onNext(String token) { ... }

    @Override
    public void onComplete(Response<AiMessage> response) { ... }

    @Override
    public void onError(Throwable error) { ... }

});
LLMStreamingGenerator
Langgraph4j provides the utility class LLMStreamingGenerator, which converts a StreamingResponseHandler into an AsyncGenerator. Below is a code snippet; a complete working example is in the llm-streaming notebook.
var generator = LLMStreamingGenerator.builder()
        .mapResult( r -> Map.of( "content", r.content() ) ) // convert the completion result into a partial state
        .build();

StreamingChatLanguageModel model = OpenAiStreamingChatModel.builder()
        .apiKey(System.getenv("OPENAI_API_KEY"))
        .modelName(GPT_4_O_MINI)
        .build();

// start the streaming call, passing the generator's handler to the model
model.generate("Tell me a joke", generator.handler() );

// iterate over the streamed elements
for( var r : generator ) {
    log.info( "{}", r);
}

// once the iteration is over, the final result is available
log.info( "RESULT: {}", generator.resultValue().orElse(null) );
When building an LLMStreamingGenerator we must provide a mapping function (Function<Response<T>, Map<String,Object>>) that is invoked on stream completion to convert the completion result into a Map representing the partial state, which is what Langgraph4j expects as a node result.
Putting it all together in a node action
Now we are ready to implement a Langgraph4j node action. Below is a representative code snippet; for a complete implementation, take a look at the AgentExecutor sample.
Map<String,Object> callAgent( State state ) {

    // mapping function: convert the LLM completion into a partial state
    final Function<Response<AiMessage>, Map<String,Object>> mapResult = response -> {

        if (response.finishReason() == FinishReason.TOOL_EXECUTION) {
            var toolExecutionRequests = response.content().toolExecutionRequests();
            var action = new AgentAction(toolExecutionRequests.get(0), "");
            return Map.of("agent_outcome", new AgentOutcome(action, null));
        }
        else {
            var result = response.content().text();
            var finish = new AgentFinish(Map.of("returnValues", result), result);
            return Map.of("agent_outcome", new AgentOutcome(null, finish));
        }
    };

    var generator = LLMStreamingGenerator.<AiMessage, State>builder()
            .mapResult(mapResult)
            .startingNode("agent")  // optional: the node that requires streaming
            .startingState(state)   // optional: the state of the node before streaming
            .build();

    // call the LLM in streaming mode
    streamingChatLanguageModel.generate( messages, tools, generator.handler() );

    // return the generator so it gets embedded in the graph's main one
    return Map.of( "agent_outcome", generator );
}
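To see the embedded streaming in action, below is a hedged sketch of how such a node action could be wired into a graph and consumed (the StateGraph construction details depend on your state schema and library version; refer to the AgentExecutor sample for the actual wiring):

import static org.bsc.langgraph4j.StateGraph.END;
import static org.bsc.langgraph4j.StateGraph.START;
import static org.bsc.langgraph4j.action.AsyncNodeAction.node_async;

var app = new StateGraph<>( State::new )
        .addNode( "agent", node_async( this::callAgent ) )
        .addEdge( START, "agent" )
        .addEdge( "agent", END )
        .compile();

// LLM tokens produced by the embedded generator are interleaved with the
// regular node outputs on the same iterator returned by .stream()
for( var output : app.stream( Map.of( "input", "Tell me a joke" ) ) ) {
    log.info( "{}", output );
}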