当我们深入使用Langchain时,我们都会考虑如何进行流式输出。尽管官方网站提供了一些流式输出的示例,但这些示例只能在控制台中输出,并不能获取我们所需的生成器。而网上的许多教程也只是伪流式输出,即先完全生成结束,再进行流式输出。
以下是我为大家提供的真正的流式输出示例代码:
本方法是开辟新的线程的方法(当然也可以是新的进程的方式) ,然后结合 langchain的callback方法
为什么推荐使用 线程的方法,而不是 方法二中的异步操作,因为在实际使用过程中,很多外部的方法是不支持异步操作的,要想让程序run起来,必须把一些方法(比如 langchain中的 某些检索器,官方代码只帮你写了 同步的方法,而没有实现异步的方法) 重写为异步方法,而在 重写的过程中会遇到很多 的 问题,令人头疼,而所有的代码都是支持同步的,所以开辟新线程的方式是好的方法。
注: 此方法就是我在 使用方法二的异步方式的过程,遇到了难以解决的问题(本人小白)才找到了此种方式。
缺点: 开辟线程要比 协程 更加耗费计算机资源,因此未来的实现 尽可能还是使用 方法二,但是对于不会异步编程的人来说方法一更加简单。
代码实现不解释了,手疼,需要的人自然可以看得懂。
至于 ChatOpenAI 所需要的 key,需要自己想办法。
import os from dotenv import load_dotenv from langchain.chat_models import ChatOpenAI from langchain.callbacks import StreamingStdOutCallbackHandler from fastapi import FastAPI from sse_starlette.sse import EventSourceResponse from typing import Generator import threading import uvicorn os.system('clear') app = FastAPI() load_dotenv() class My_StreamingStdOutCallbackHandler(StreamingStdOutCallbackHandler): # def __init__(self): tokens = [] # 记得结束后这里置true finish = False def on_llm_new_token(self, token: str, **kwargs) -> None: self.tokens.append(token) def on_llm_end(self, response, **kwargs) -> None: self.finish = True def on_llm_error(self, error: Exception, **kwargs) -> None: self.tokens.append(str(error)) def generate_tokens(self) -> Generator: while not self.finish: # or self.tokens: if self.tokens: token = self.tokens.pop(0) yield {'data': token} else: pass # time.sleep(0.02) # wait for a new token # 用于在另一个 线程中运行的方法 def f(llm, query): llm.predict(query) @app.post('/qa') def test(query='你好'): callback = My_StreamingStdOutCallbackHandler() llm = ChatOpenAI(model='chatglm3', streaming=True, callbacks=[callback], max_tokens=1024) thread = threading.Thread(target=f, args=(llm, query)) thread.start() return EventSourceResponse(callback.generate_tokens(), media_type="text/event-stream") if __name__ == '__main__': uvicorn.run(app=app, host='0.0.0.0')
为了方便展示,我直接使用gradio写一个小webUI,因为流式输出的场景就是用于Web的展示。直接进行python输出也是可以的。
值得注意的是 方法 一定要加上 async ,这里对于小白可能看不懂, 因为这里涉及到,异步、协程等概念。
import gradio as gr import asyncio from langchain.chat_models import ChatOpenAI #使用 异步的 Callback AsyncIteratorCallbackHandler from langchain.callbacks import AsyncIteratorCallbackHandler async def f(): callback = AsyncIteratorCallbackHandler() llm = ChatOpenAI(engine='GPT-35',streaming=True,callbacks=[callback]) coro = llm.apredict("写一个1000字的修仙小说") # 这里如果是 LLMChain的话 可以 换成 chain.acall() asyncio.create_task(coro) text = "" async for token in callback.aiter(): text = text+token yield gr.TextArea.update(value=text) with gr.Blocks() as demo: with gr.Column(): 摘要汇总 = gr.TextArea(value="",label="摘要总结",) bn = gr.Button("触发", variant="primary") bn.click(f,[],[摘要汇总]) demo.queue().launch(share=False, inbrowser=False, server_name="0.0.0.0", server_port=8001)
方法来之不易,如果您有所收获,请点赞,收藏,关注。
上一篇:前端可视化数据大屏(1)