Langchain LLM Streaming
By Jiamin
Langchain offers the capability to process tokens generated by an LLM in real time through a callback mechanism.
from langchain.chat_models import ChatOpenAI
from langchain.schema import (
    HumanMessage,
)
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
chat = ChatOpenAI(streaming=True, callbacks=[StreamingStdOutCallbackHandler()], temperature=0)
resp = chat([HumanMessage(content="Write me a song about sparkling water.")])
Langchain supports both synchronous and asynchronous IO for token output, corresponding to StreamingStdOutCallbackHandler and AsyncIteratorCallbackHandler, respectively.
StreamingStdOutCallbackHandler
First, let's take a look at the official Langchain implementation of StreamingStdOutCallbackHandler, which prints LLM-generated tokens to the terminal in real time. This is achieved primarily through the on_llm_new_token method.
class StreamingStdOutCallbackHandler(BaseCallbackHandler):
    ...

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        """Run on new LLM token. Only available when streaming is enabled."""
        sys.stdout.write(token)
        sys.stdout.flush()
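Since on_llm_new_token is the only hook involved, you can also write your own synchronous handler by subclassing BaseCallbackHandler. The sketch below is a minimal illustration (CollectingCallbackHandler is a hypothetical name, not part of Langchain) that accumulates the streamed tokens in memory instead of printing them.

from typing import Any, List

from langchain.callbacks.base import BaseCallbackHandler


class CollectingCallbackHandler(BaseCallbackHandler):
    """Hypothetical handler that accumulates streamed tokens in memory."""

    def __init__(self) -> None:
        self.tokens: List[str] = []

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        # Called synchronously for every token when streaming=True.
        self.tokens.append(token)

    @property
    def text(self) -> str:
        # Full response assembled from the tokens received so far.
        return "".join(self.tokens)

Passing an instance via callbacks=[CollectingCallbackHandler()] when constructing ChatOpenAI behaves just like the stdout handler above, except the tokens end up in handler.text instead of on the terminal.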
It’s important to note that this approach is synchronous. Let’s now explore the asynchronous IO method.
AsyncIteratorCallbackHandler
The following code demonstrates the use of AsyncIteratorCallbackHandler to asynchronously print the returned tokens.
import asyncio
from typing import AsyncIterable, Awaitable

from langchain.callbacks import AsyncIteratorCallbackHandler
from langchain.chat_models import ChatOpenAI
from langchain.schema import HumanMessage


# Async chat function for interacting with the ChatOpenAI model
async def async_chat(message: str) -> AsyncIterable[str]:
    # Create an asynchronous callback handler
    callback = AsyncIteratorCallbackHandler()
    # Create a ChatOpenAI instance with streaming mode enabled, using the callback handler and a temperature parameter
    chat = ChatOpenAI(streaming=True, callbacks=[callback], temperature=0)

    # Define an asynchronous function to wrap another awaitable and signal completion or exceptions using an event
    async def wrap_done(fn: Awaitable, event: asyncio.Event):
        try:
            await fn  # Wait for the provided awaitable to complete
        except Exception as e:
            # TODO: Handle exceptions - here, we simply print the exception information
            print(f"Caught exception: {e}")
        finally:
            event.set()  # Set the event to indicate completion

    # Create a task to perform message generation with ChatOpenAI and set the callback handler's done event on completion
    task = asyncio.create_task(
        wrap_done(
            chat.agenerate(messages=[[HumanMessage(content=message)]]),
            callback.done,
        )
    )

    # Iterate asynchronously to obtain tokens from the callback handler
    async for token in callback.aiter():
        yield f"{token}"  # Yield each token as a string

    await task  # Wait for the task to complete


# Async function to print tokens generated by ChatOpenAI
async def async_print():
    message = "Write me a song about sparkling water."
    async for token in async_chat(message):
        print(token, end='')


if __name__ == '__main__':
    asyncio.run(async_print())  # Run the async_print function as the entry point
This code processes the tokens generated by the Langchain LLM asynchronously and prints them as they arrive.
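Because async_chat returns an AsyncIterable[str], the same generator can feed any consumer that accepts one. As a hedged sketch (the FastAPI app and the /chat route are assumptions, not part of the original code), it could back a streaming HTTP endpoint like this:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()


@app.get("/chat")
async def chat_endpoint(message: str) -> StreamingResponse:
    # StreamingResponse drains the async generator and forwards each token
    # to the client as soon as it is yielded.
    return StreamingResponse(async_chat(message), media_type="text/plain")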