How to integrate Spring AI LLM streaming in Langgraph4j

Initialize Logger

try( var file = new java.io.FileInputStream("./logging.properties")) {
    java.util.logging.LogManager.getLogManager().readConfiguration( file );
}

var log = org.slf4j.LoggerFactory.getLogger("llm-streaming");

How to use StreamingChatGenerator

import org.bsc.async.AsyncGenerator;
import org.bsc.async.FlowGenerator;
import org.bsc.langgraph4j.NodeOutput;
import org.bsc.langgraph4j.StateGraph;
import org.bsc.langgraph4j.action.AsyncNodeAction;
import org.bsc.langgraph4j.action.EdgeAction;
import org.bsc.langgraph4j.action.NodeAction;
import org.bsc.langgraph4j.prebuilt.MessagesState;
import org.bsc.langgraph4j.serializer.std.ObjectStreamStateSerializer;
import org.bsc.langgraph4j.spring.ai.generators.StreamingChatGenerator;
import org.bsc.langgraph4j.spring.ai.serializer.std.SpringAIStateSerializer;
import org.bsc.langgraph4j.spring.ai.tool.SpringAIToolService;
import org.bsc.langgraph4j.streaming.StreamingOutput;
import org.bsc.langgraph4j.utils.EdgeMappings;
import org.reactivestreams.FlowAdapters;
import org.reactivestreams.Publisher;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.ai.chat.messages.AssistantMessage;
import org.springframework.ai.chat.messages.Message;
import org.springframework.ai.chat.messages.MessageType;
import org.springframework.ai.chat.messages.UserMessage;
import org.springframework.ai.chat.model.ChatModel;
import org.springframework.ai.chat.model.ChatResponse;
import org.springframework.ai.model.tool.ToolCallingChatOptions;
import org.springframework.ai.ollama.OllamaChatModel;
import org.springframework.ai.ollama.api.OllamaApi;
import org.springframework.ai.ollama.api.OllamaOptions;
import org.springframework.ai.openai.OpenAiChatModel;
import org.springframework.ai.openai.OpenAiChatOptions;
import org.springframework.ai.openai.api.OpenAiApi;
import org.springframework.ai.tool.annotation.Tool;
import org.springframework.ai.tool.annotation.ToolParam;
import org.springframework.ai.tool.function.FunctionToolCallback;
import reactor.core.publisher.Flux;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

import static java.util.Optional.ofNullable;
import static org.bsc.langgraph4j.StateGraph.END;
import static org.bsc.langgraph4j.StateGraph.START;
import static org.bsc.langgraph4j.action.AsyncEdgeAction.edge_async;
import static org.bsc.langgraph4j.action.AsyncNodeAction.node_async;

enum AiModel {

        OPENAI_GPT_4O_MINI(
                OpenAiChatModel.builder()
                        .openAiApi(OpenAiApi.builder()
                                .baseUrl("https://api.openai.com")
                                .apiKey(System.getenv("OPENAI_API_KEY"))
                                .build())
                        .defaultOptions(OpenAiChatOptions.builder()
                                .model("gpt-4o-mini")
                                .logprobs(false)
                                .temperature(0.1)
                                .build())
                        .build()),        
        OLLAMA_QWEN2_5_7B(
            OllamaChatModel.builder()
                    .ollamaApi( OllamaApi.builder().baseUrl("http://localhost:11434").build() )
                    .defaultOptions(OllamaOptions.builder()
                            .model("qwen2.5:7b")
                            .temperature(0.1)
                            .build())
                    .build());
    ;

    public final ChatModel model;

    AiModel(  ChatModel model ) {
        this.model = model;
    }
}

var chatClient = ChatClient.builder(AiModel.OLLAMA_QWEN2_5_7B.model)
        .defaultOptions(ToolCallingChatOptions.builder()
                .internalToolExecutionEnabled(false) // Disable automatic tool execution
                .build())
        .defaultSystem("You are a helpful AI Assistant answering questions." )
        .build();

var flux = chatClient.prompt()
        .messages( new UserMessage("tell me a joke"))
        .stream()
        .chatResponse()
        ;

var generator  = StreamingChatGenerator.builder()
        .startingNode("agent")
        .mapResult( response -> Map.of( "messages", response.getResult().getOutput()))
        .build(flux);        

for( var item : generator ) {
    System.out.println("Received: " + item );
}

Received: StreamingOutput{node=agent, state=null, chunk=Sure}
Received: StreamingOutput{node=agent, state=null, chunk=,}
Received: StreamingOutput{node=agent, state=null, chunk= here}
Received: StreamingOutput{node=agent, state=null, chunk='s}
Received: StreamingOutput{node=agent, state=null, chunk= a}
Received: StreamingOutput{node=agent, state=null, chunk= light}
Received: StreamingOutput{node=agent, state=null, chunk= joke}
Received: StreamingOutput{node=agent, state=null, chunk= for}
Received: StreamingOutput{node=agent, state=null, chunk= you}
Received: StreamingOutput{node=agent, state=null, chunk=:

}
Received: StreamingOutput{node=agent, state=null, chunk=Why}
Received: StreamingOutput{node=agent, state=null, chunk= don}
Received: StreamingOutput{node=agent, state=null, chunk='t}
Received: StreamingOutput{node=agent, state=null, chunk= scientists}
Received: StreamingOutput{node=agent, state=null, chunk= trust}
Received: StreamingOutput{node=agent, state=null, chunk= atoms}
Received: StreamingOutput{node=agent, state=null, chunk=?

}
Received: StreamingOutput{node=agent, state=null, chunk=Because}
Received: StreamingOutput{node=agent, state=null, chunk= they}
Received: StreamingOutput{node=agent, state=null, chunk= make}
Received: StreamingOutput{node=agent, state=null, chunk= up}
Received: StreamingOutput{node=agent, state=null, chunk= everything}
Received: StreamingOutput{node=agent, state=null, chunk=!}
Received: StreamingOutput{node=agent, state=null, chunk=}

Use StreamingChatGenerator in Agent Executor

Set up the agent's tools

public class WeatherTool {

    @Tool( description = "Get the weather in location")
    public String execQuery(@ToolParam( description = "The query to use in your search.") String query) {
        // This is a placeholder for the actual implementation
        return "Cold, with a low of 13 degrees";
    }
}

Create Agent executor

import org.bsc.langgraph4j.spring.ai.agentexecutor.AgentExecutor;
import org.bsc.langgraph4j.NodeOutput;
import org.bsc.langgraph4j.streaming.StreamingOutput;
import org.springframework.ai.chat.messages.AssistantMessage;
import org.springframework.ai.chat.messages.UserMessage;
import org.springframework.ai.chat.model.ChatModel;
import org.springframework.ai.tool.ToolCallback;


var agent = AgentExecutor.builder()
                .streamingChatModel(AiModel.OPENAI_GPT_4O_MINI.model)
                .toolsFromObject( new WeatherTool() )
                .build()
                .compile();

var result = agent.stream( Map.of( "messages", new UserMessage("Weather in Napoli ?") ));

var state = result.stream()
        .peek( s -> {
                if( s instanceof StreamingOutput<?> sout ) {
                        System.out.printf( "%s: (%s)\n", sout.node(), sout.chunk());
                }
                else {
                        System.out.println(s.node());
                }
        })
        .reduce((a, b) -> b)
        .map( NodeOutput::state)
        .orElseThrow();

log.info( "result: {}", state.lastMessage()
                                .map(AssistantMessage.class::cast)
                                .map(AssistantMessage::getText)
                                .orElseThrow() );
START 
callAgent 


__START__
agent: ()


executeTools 


agent


callAgent 


action
agent: ()
agent: (The)
agent: ( weather)
agent: ( in)
agent: ( Napoli)
agent: ( is)
agent: ( currently)
agent: ( cold)
agent: (,)
agent: ( with)
agent: ( a)
agent: ( low)
agent: ( of)
agent: ( )
agent: (13)
agent: ( degrees)
agent: ( Celsius)
agent: (.)
agent: (null)
agent
__END__


result: The weather in Napoli is currently cold, with a low of 13 degrees Celsius.