Langchain4j LLM streaming

Initialize Logger

try( var file = new java.io.FileInputStream("./logging.properties")) {
    var lm = java.util.logging.LogManager.getLogManager();
    lm.checkAccess(); 
    lm.readConfiguration( file );
}

var log = org.slf4j.LoggerFactory.getLogger("llm-streaming");

How to use LLMStreamingGenerator

import dev.langchain4j.model.StreamingResponseHandler;
import dev.langchain4j.model.chat.StreamingChatLanguageModel;
import dev.langchain4j.model.openai.OpenAiStreamingChatModel;
import dev.langchain4j.data.message.AiMessage;
import dev.langchain4j.model.output.Response;
import static dev.langchain4j.model.openai.OpenAiChatModelName.GPT_4_O_MINI;
import org.bsc.langgraph4j.langchain4j.generators.LLMStreamingGenerator;
import org.bsc.langgraph4j.state.AgentState;
import org.bsc.langgraph4j.streaming.StreamingOutput;

var generator = LLMStreamingGenerator.<AiMessage,AgentState>builder()
                        .mapResult( r -> Map.of( "content", r.content() ) )
                        .build();

StreamingChatLanguageModel model = OpenAiStreamingChatModel.builder()
    .apiKey(System.getenv("OPENAI_API_KEY"))
    .modelName(GPT_4_O_MINI)
    .build();

String userMessage = "Tell me a joke";

model.generate(userMessage, generator.handler() );

for( var r : generator ) {
    log.info( "{}", r);
}
  
log.info( "RESULT: {}", generator.resultValue().orElse(null) );
  
//Thread.sleep( 1000 );
StreamingOutput{chunk=} 
StreamingOutput{chunk=Why} 
StreamingOutput{chunk= did} 
StreamingOutput{chunk= the} 
StreamingOutput{chunk= scare} 
StreamingOutput{chunk=crow} 
StreamingOutput{chunk= win} 
StreamingOutput{chunk= an} 
StreamingOutput{chunk= award} 
StreamingOutput{chunk=?

} 
StreamingOutput{chunk=Because} 
StreamingOutput{chunk= he} 
StreamingOutput{chunk= was} 
StreamingOutput{chunk= outstanding} 
StreamingOutput{chunk= in} 
StreamingOutput{chunk= his} 
StreamingOutput{chunk= field} 
StreamingOutput{chunk=!} 
RESULT: {content=AiMessage { text = "Why did the scarecrow win an award?

Because he was outstanding in his field!" toolExecutionRequests = null }} 

Use LLMStreamGenerator in Agent

Define Serializers

import dev.langchain4j.data.message.ChatMessage;
import dev.langchain4j.data.message.AiMessage;
import dev.langchain4j.data.message.SystemMessage;
import dev.langchain4j.data.message.UserMessage;
import dev.langchain4j.data.message.ToolExecutionResultMessage;
import dev.langchain4j.agent.tool.ToolExecutionRequest;
import org.bsc.langgraph4j.serializer.std.ObjectStreamStateSerializer;
import org.bsc.langgraph4j.langchain4j.serializer.std.ChatMesssageSerializer;
import org.bsc.langgraph4j.langchain4j.serializer.std.ToolExecutionRequestSerializer;
import org.bsc.langgraph4j.state.AgentStateFactory;
import org.bsc.langgraph4j.prebuilt.MessagesState;

var stateSerializer = new ObjectStreamStateSerializer<MessagesState<ChatMessage>>( MessagesState::new );
stateSerializer.mapper()
    // Setup custom serializer for Langchain4j ToolExecutionRequest
    .register(ToolExecutionRequest.class, new ToolExecutionRequestSerializer() )
    // Setup custom serializer for Langchain4j AiMessage
    .register(ChatMessage.class, new ChatMesssageSerializer() );

SerializerMapper: 
java.util.Map
java.util.Collection
dev.langchain4j.agent.tool.ToolExecutionRequest
dev.langchain4j.data.message.ChatMessage

Set up the tools

Using langchain4j, We will first define the tools we want to use. For this simple example, we will use create a placeholder search engine. However, it is really easy to create your own tools - see documentation here on how to do that.

import dev.langchain4j.agent.tool.P;
import dev.langchain4j.agent.tool.Tool;

import java.util.Optional;

import static java.lang.String.format;

public class SearchTool {

    @Tool("Use to surf the web, fetch current information, check the weather, and retrieve other information.")
    String execQuery(@P("The query to use in your search.") String query) {

        // This is a placeholder for the actual implementation
        return "Cold, with a low of 13 degrees";
    }
}
import static org.bsc.langgraph4j.StateGraph.START;
import static org.bsc.langgraph4j.StateGraph.END;
import org.bsc.langgraph4j.prebuilt.MessagesStateGraph;
import org.bsc.langgraph4j.action.EdgeAction;
import static org.bsc.langgraph4j.action.AsyncEdgeAction.edge_async;
import org.bsc.langgraph4j.action.NodeAction;
import static org.bsc.langgraph4j.action.AsyncNodeAction.node_async;
import dev.langchain4j.service.tool.DefaultToolExecutor;
import org.bsc.langgraph4j.langchain4j.tool.ToolNode;
import dev.langchain4j.agent.tool.ToolSpecification;
import dev.langchain4j.model.openai.OpenAiStreamingChatModel;


// setup streaming model
var model = OpenAiStreamingChatModel.builder()
  .apiKey( System.getenv("OPENAI_API_KEY") )
  .modelName( "gpt-4o-mini" )
  .logResponses(true)
  .temperature(0.0)
  .maxTokens(2000)
  .build();

// setup tools 
var tools = ToolNode.builder()
              .specification( new SearchTool() ) 
              .build(); 

NodeAction<MessagesState<ChatMessage>> callModel = state -> {
  log.info("CallModel:\n{}", state.messages());

  var generator = LLMStreamingGenerator.<AiMessage, MessagesState<ChatMessage>>builder()
          .mapResult(response -> {
              log.info("MapResult: {}", response);
              return Map.of("messages", response.content());
          })
          .startingNode("agent")
          .startingState(state)
          .build();

    model.generate(
            state.messages(),
            tools.toolSpecifications(),
            generator.handler());

    return Map.of("messages", generator);
};
            
// Route Message
EdgeAction<MessagesState<ChatMessage>> routeMessage = state -> {
    log.info("routeMessage:\n{}", state.messages());

    var lastMessage = state.lastMessage()
            .orElseThrow(() -> (new IllegalStateException("last message not found!")));

    if (lastMessage instanceof AiMessage message) {
        // If tools should be called
        if (message.hasToolExecutionRequests()) return "next";
    }

    // If no tools are called, we can finish (respond to the user)
    return "exit";
};
            
// Invoke Tool
NodeAction<MessagesState<ChatMessage>> invokeTool = state -> {
    log.info("invokeTool:\n{}", state.messages());

    var lastMessage = state.lastMessage()
            .orElseThrow(() -> (new IllegalStateException("last message not found!")));


    if (lastMessage instanceof AiMessage lastAiMessage) {

        var result = tools.execute(lastAiMessage.toolExecutionRequests(), null)
                .orElseThrow(() -> (new IllegalStateException("no tool found!")));

        return Map.of("messages", result);

    }

    throw new IllegalStateException("invalid last message");
};
            
// Define Graph
var workflow = new MessagesStateGraph<ChatMessage>(stateSerializer)
        .addNode("agent", node_async(callModel))
        .addNode("tools", node_async(invokeTool))
        .addEdge(START, "agent")
        .addConditionalEdges("agent",
                edge_async(routeMessage),
                Map.of("next", "tools", "exit", END))
        .addEdge("tools", "agent");
            

import org.bsc.langgraph4j.streaming.StreamingOutput;

var app = workflow.compile();

for( var out : app.stream( Map.of( "messages", UserMessage.from( "what is the whether today?")) ) ) {
  if( out instanceof StreamingOutput streaming ) {
    log.info( "StreamingOutput{node={}, chunk={} }", streaming.node(), streaming.chunk() );
  }
  else {
    log.info( "{}", out );
  }

}

START 
CallModel:
[UserMessage { name = null contents = [TextContent { text = "what is the whether today?" }] }] 
MapResult: Response { content = AiMessage { text = null toolExecutionRequests = [ToolExecutionRequest { id = "call_VSMGzPIUc51ZkdtXHsNK3Ekp", name = "execQuery", arguments = "{"query":"current weather"}" }] }, tokenUsage = TokenUsage { inputTokenCount = 71, outputTokenCount = 16, totalTokenCount = 87 }, finishReason = TOOL_EXECUTION, metadata = {} } 
routeMessage:
[AiMessage { text = null toolExecutionRequests = [ToolExecutionRequest { id = "call_VSMGzPIUc51ZkdtXHsNK3Ekp", name = "execQuery", arguments = "{"query":"current weather"}" }] }] 
NodeOutput{node=__START__, state={messages=[UserMessage { name = null contents = [TextContent { text = "what is the whether today?" }] }]}} 
invokeTool:
[AiMessage { text = null toolExecutionRequests = [ToolExecutionRequest { id = "call_VSMGzPIUc51ZkdtXHsNK3Ekp", name = "execQuery", arguments = "{"query":"current weather"}" }] }] 
execute: execQuery 
NodeOutput{node=agent, state={messages=[AiMessage { text = null toolExecutionRequests = [ToolExecutionRequest { id = "call_VSMGzPIUc51ZkdtXHsNK3Ekp", name = "execQuery", arguments = "{"query":"current weather"}" }] }]}} 
CallModel:
[AiMessage { text = null toolExecutionRequests = [ToolExecutionRequest { id = "call_VSMGzPIUc51ZkdtXHsNK3Ekp", name = "execQuery", arguments = "{"query":"current weather"}" }] }, ToolExecutionResultMessage { id = "call_VSMGzPIUc51ZkdtXHsNK3Ekp" toolName = "execQuery" text = "Cold, with a low of 13 degrees" }] 
NodeOutput{node=tools, state={messages=[AiMessage { text = null toolExecutionRequests = [ToolExecutionRequest { id = "call_VSMGzPIUc51ZkdtXHsNK3Ekp", name = "execQuery", arguments = "{"query":"current weather"}" }] }, ToolExecutionResultMessage { id = "call_VSMGzPIUc51ZkdtXHsNK3Ekp" toolName = "execQuery" text = "Cold, with a low of 13 degrees" }]}} 
StreamingOutput{node=agent, chunk= } 
StreamingOutput{node=agent, chunk=The } 
StreamingOutput{node=agent, chunk= current } 
StreamingOutput{node=agent, chunk= weather } 
StreamingOutput{node=agent, chunk= is } 
StreamingOutput{node=agent, chunk= cold } 
StreamingOutput{node=agent, chunk=, } 
StreamingOutput{node=agent, chunk= with } 
StreamingOutput{node=agent, chunk= a } 
StreamingOutput{node=agent, chunk= low } 
StreamingOutput{node=agent, chunk= of } 
StreamingOutput{node=agent, chunk=  } 
StreamingOutput{node=agent, chunk=13 } 
StreamingOutput{node=agent, chunk= degrees } 
StreamingOutput{node=agent, chunk=. } 
StreamingOutput{node=agent, chunk= If } 
StreamingOutput{node=agent, chunk= you } 
StreamingOutput{node=agent, chunk= need } 
StreamingOutput{node=agent, chunk= more } 
StreamingOutput{node=agent, chunk= specific } 
StreamingOutput{node=agent, chunk= information } 
StreamingOutput{node=agent, chunk= or } 
StreamingOutput{node=agent, chunk= details } 
StreamingOutput{node=agent, chunk= about } 
StreamingOutput{node=agent, chunk= a } 
StreamingOutput{node=agent, chunk= particular } 
StreamingOutput{node=agent, chunk= location } 
StreamingOutput{node=agent, chunk=, } 
StreamingOutput{node=agent, chunk= feel } 
StreamingOutput{node=agent, chunk= free } 
StreamingOutput{node=agent, chunk= to } 
StreamingOutput{node=agent, chunk= ask } 
MapResult: Response { content = AiMessage { text = "The current weather is cold, with a low of 13 degrees. If you need more specific information or details about a particular location, feel free to ask!" toolExecutionRequests = null }, tokenUsage = TokenUsage { inputTokenCount = 93, outputTokenCount = 34, totalTokenCount = 127 }, finishReason = STOP, metadata = {} } 
routeMessage:
[AiMessage { text = "The current weather is cold, with a low of 13 degrees. If you need more specific information or details about a particular location, feel free to ask!" toolExecutionRequests = null }] 
StreamingOutput{node=agent, chunk=! } 
NodeOutput{node=agent, state={messages=[AiMessage { text = "The current weather is cold, with a low of 13 degrees. If you need more specific information or details about a particular location, feel free to ask!" toolExecutionRequests = null }]}} 
NodeOutput{node=__END__, state={messages=[AiMessage { text = "The current weather is cold, with a low of 13 degrees. If you need more specific information or details about a particular location, feel free to ask!" toolExecutionRequests = null }]}}