Langchain LLM Streaming
By mggg
LangChain's callback mechanism makes it possible to process the tokens an LLM emits in real time, as they are generated:
from langchain.chat_models import ChatOpenAI
from langchain.schema import HumanMessage
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler

chat = ChatOpenAI(streaming=True, callbacks=[StreamingStdOutCallbackHandler()], temperature=0)
resp = chat([HumanMessage(content="Write me a song about sparkling water.")])
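With streaming=True, each token is handed to the handler's on_llm_new_token as it arrives, so the song prints to stdout incrementally; resp, returned once the call finishes, still holds the complete message.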
LangChain supports both synchronous and asynchronous IO for emitting tokens, via StreamingStdOutCallbackHandler and AsyncIteratorCallbackHandler respectively.
StreamingStdOutCallbackHandler
First, look at LangChain's official StreamingStdOutCallbackHandler, which prints each token to the terminal as it arrives. The core of the implementation is on_llm_new_token:
class StreamingStdOutCallbackHandler(BaseCallbackHandler):
    ...

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        """Run on new LLM token. Only available when streaming is enabled."""
        sys.stdout.write(token)
        sys.stdout.flush()
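Since a handler is just a subclass of BaseCallbackHandler, tokens can be routed anywhere, not only to stdout. As a minimal sketch (the QueueCallbackHandler name and the queue-based design are my own illustration, not part of LangChain), a handler that hands tokens to another thread might look like this:

from typing import Any
from queue import Queue

from langchain.callbacks.base import BaseCallbackHandler


class QueueCallbackHandler(BaseCallbackHandler):
    """Push each streamed token onto a queue for another consumer thread."""

    def __init__(self, queue: Queue):
        self.queue = queue

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        # Called once per token when streaming=True
        self.queue.put(token)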
This approach is synchronous, however. Next, let's look at the asynchronous alternative.
AsyncIteratorCallbackHandler
The following example uses AsyncIteratorCallbackHandler to print the returned tokens asynchronously:
import asyncio
from typing import AsyncIterable, Awaitable

from langchain.callbacks import AsyncIteratorCallbackHandler
from langchain.chat_models import ChatOpenAI
from langchain.schema import HumanMessage


# Async chat function for interacting with the ChatOpenAI model
async def async_chat(message: str) -> AsyncIterable[str]:
    # Create an asynchronous callback handler
    callback = AsyncIteratorCallbackHandler()
    # Create a ChatOpenAI instance with streaming enabled, using the callback handler
    chat = ChatOpenAI(streaming=True, callbacks=[callback], temperature=0)

    # Wrap an awaitable and signal completion (or an exception) via an event
    async def wrap_done(fn: Awaitable, event: asyncio.Event):
        try:
            await fn  # Wait for the wrapped coroutine to complete
        except Exception as e:
            # TODO: handle exceptions properly; for now, just print them
            print(f"Caught exception: {e}")
        finally:
            event.set()  # Signal that generation has finished

    # Run generation as a background task so tokens can be consumed while it runs
    task = asyncio.create_task(
        wrap_done(
            chat.agenerate(messages=[[HumanMessage(content=message)]]),
            callback.done,
        )
    )

    # Iterate asynchronously over tokens produced by the callback handler
    async for token in callback.aiter():
        yield token  # Yield each token as it arrives

    await task  # Wait for the generation task to complete


# Async function to print tokens generated by ChatOpenAI
async def async_print():
    message = "Write me a song about sparkling water."
    async for token in async_chat(message):
        print(token, end="")


if __name__ == "__main__":
    asyncio.run(async_print())  # Run async_print as the entry point
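Because async_chat is an async generator, it composes naturally with the rest of asyncio. As a sketch (reusing async_chat from the code above; the prompts and the interleaved stdout output are just for demonstration), two generations can be streamed concurrently:

async def print_stream(message: str) -> None:
    async for token in async_chat(message):
        print(token, end="")


async def main():
    # Both requests run concurrently; their tokens interleave on stdout as they arrive
    await asyncio.gather(
        print_stream("Write me a haiku about the sea."),
        print_stream("Write me a haiku about mountains."),
    )


asyncio.run(main())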