在springboot项目中调用通义千问api多轮对话并实现流式输出
作者:mmseoamin日期:2024-04-27

官网文档

阿里灵积提供了详细的官方文档

如何实现多轮对话

官方文档中提到只需要把每轮对话中返回结果添加到消息管理器中,就可以实现多轮对话。本质上就是将历史对话再次发送给接口。

在springboot项目中调用通义千问api多轮对话并实现流式输出,第1张

如何实现流式输出

官方文档中提出使用streamCall()方法就可以实现流式输出,在ResultCallback参数中可以指点每个事件的处理动作。

在springboot项目中调用通义千问api多轮对话并实现流式输出,第2张

流式调用方法没有返回GenerationResult结果类,如何实现多轮对话

方法一

我们每次调用完成后把得到的结果手动构建消息对象并加入消息管理类。

不知道是不是我使用的sdk版本问题(因为老的版本有出现调用okhttp报错的情况,我的在阿里云提交工单后,工作人员给我的最新版本是2.10.1,我当前就在使用这个版本)。经过实际测试,msgManager.get()方法可能会出现第一条对话的发送对象是assistant的情况。

如果第一条对话的发送对象不是user或者system,并且user和assistant没有在历史对话中轮流出现接口会报错的!!!!(我没有报错的截图,哈哈哈哈)

Message assistantMsg = Message.builder().role(Role.ASSISTANT.getValue()).content("如何做西红柿炖牛腩?").build();
msgManager.add(assistantMsg);

方法二

我们自己来控制历史对话

@Component
public class QwenModelService{
    private Generation gen;
    @Resource
    private AiWebsocketService aiWebsocketService;
    public void createGen(){
        gen = new Generation();
    };
    private static final Logger logger = LoggerFactory.getLogger(QwenModelService.class);
    /**
     * prompt 用户对话
     * request 用户请求对象
     * identity 用户身份标识
     */
    public String answer(String prompt, HttpServletRequest request, String identity) {
        // 通过身份标识在缓存中获取对话对象、历史消息对象、参数对象
        List dialogues = CachePool.AI_DIALOGUE_LIST_MAP.get(identity)
                .computeIfAbsent(ConstValuePool.QWEN_DIALOGUES, k -> new LinkedList<>());
        dialogues.add(AiDialogue.createUserDialogue(prompt));
        List msgManager = CachePool.QWEN_MESSAGE_DIALOGUES_MAP.get(identity);
        QwenParam param = CachePool.QWEN_PARAM_MAP.get(identity);
        // 如果第一次发送消息需要初始化历史消息对象
        if (msgManager == null) {
            msgManager = new ArrayList<>();
            CachePool.QWEN_MESSAGE_DIALOGUES_MAP.put(identity, msgManager);
            Message systemMsg = Message.builder()
                    .role(Role.SYSTEM.getValue())
                    .content("You are a helpful assistant.")
                    .build();
            msgManager.add(systemMsg);
            Message userMsg = Message.builder()
                    .role(Role.USER.getValue())
                    .content(prompt)
                    .build();
            msgManager.add(userMsg);
        }else {
            msgManager.add(Message.builder().role("user").content(prompt).build());
            param.setMessages(msgManager);
        }
        // 如果第一次发送消息需要初始化参数对象
        if (param == null) {
            param = QwenParam.builder()
                    .model(Generation.Models.QWEN_MAX)
                    .messages(msgManager)
                    .resultFormat(QwenParam.ResultFormat.MESSAGE)
                    .topP(0.8)
                    .enableSearch(true)
                    .incrementalOutput(true)
                    .build();
            CachePool.QWEN_PARAM_MAP.put(identity, param);
        }
        try {
            logger.debug("发送的请求为{}",param);
            // 同步信号量
            Semaphore semaphore = new Semaphore(0);
            // 结果拼接对象
            StringBuilder resultBuilder = new StringBuilder();
            // 流式调用
            gen.streamCall(param, new ResultCallback(){
                @Override
                public void onEvent(GenerationResult generationResult) {
                    String newMessage = generationResult.getOutput().getChoices().get(0).getMessage().getContent();
                    StringBuilder finalResBuilder = resultBuilder.append(newMessage);
                    // 这里是对markdown代码块进行判断,如果当前代码块未结束,需要手动结束
                    // 否则前端的代码块显示会出问题
                    // 代码块判断的功能就是对"```"字符串计数,偶数个就是结束了,奇数个就是没结束
                    if (1 == (1 & StringUtil.countSubStr(finalResBuilder,ConstValuePool.MARKDOWN_CODE_BLOCK_START))) {
                        finalResBuilder = new StringBuilder(finalResBuilder)
                                .append(ConstValuePool.MARKDOWN_CODE_BLOCK_END);
                    }
                    // 通过websocket返回给前端
                    aiWebsocketService.sendMessage(finalResBuilder.toString(), identity);
                }
                // 结束或者报错需要释放同步信号量
                @Override
                public void onComplete() {
                    semaphore.release();
                }
                @Override
                public void onError(Exception e) {
                    semaphore.release();
                    logger.error("通义千问运行出错, 报错栈如下");
                    Throwable t = e;
                    while (t != null) {
                        logger.error( t.toString());
                        t = e.getCause();
                    }
                }
            });
            semaphore.acquire();
            String resString = resultBuilder.toString();
            
            // 把返回消息加入历史消息中
msgManager.add(Message.builder().role("assistant").content(resString).build());
            // 如果历史消息量过大或者第一条消息发送对象不是user,删除历史消息
            // 下标0是system消息
            while (msgManager.size() > ConstValuePool.QWEN_MAX_MESSAGE
                    || !"user".equals(msgManager.get(1).getRole())) {
                msgManager.remove(1);
            }
            // 添加到对话记录中,方便前端查询对话记录
            dialogues.add(AiDialogue.createAssistantDialogue(resString));
            return "";
        } catch (NoApiKeyException e) {
            logger.error("调用通义千问缺少ApiKey");
            throw new AiException("没有ApiKey", e);
        } catch (Exception e) {
            logger.error("调用通义千问出现问题:{}",e.getMessage());
            throw new AiException("出现了一些问题", e);
        }
    }
}