diff --git a/internal/app/app.go b/internal/app/app.go index 528e2e85..40f2dd50 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -801,10 +801,6 @@ func setupRoutes( protected.POST("/robot/wechat/qrcode/verify", wechatRobotHandler.HandleWechatVerifyCode) protected.GET("/robot/wechat/status", wechatRobotHandler.HandleWechatStatus) - // Agent Loop - protected.POST("/agent-loop", agentHandler.AgentLoop) - // Agent Loop 流式输出 - protected.POST("/agent-loop/stream", agentHandler.AgentLoopStream) // Eino ADK 单代理(ChatModelAgent + Runner;不依赖 multi_agent.enabled) protected.POST("/eino-agent", agentHandler.EinoSingleAgentLoop) protected.POST("/eino-agent/stream", agentHandler.EinoSingleAgentLoopStream) diff --git a/internal/handler/agent.go b/internal/handler/agent.go index 43369cd9..1a600706 100644 --- a/internal/handler/agent.go +++ b/internal/handler/agent.go @@ -227,7 +227,7 @@ type ChatAttachment struct { ServerPath string `json:"serverPath,omitempty"` // 已保存在 chat_uploads 下的绝对路径(由 POST /api/chat-uploads 返回) } -// ChatReasoningRequest 对话页「模型推理」意图(仅 Eino 路径消费;原生 agent-loop 忽略)。 +// ChatReasoningRequest 对话页「模型推理」意图(Eino 单/多代理路径消费)。 type ChatReasoningRequest struct { // Mode: default(跟随系统)| off | on | auto Mode string `json:"mode,omitempty"` @@ -561,191 +561,6 @@ type ChatResponse struct { Time time.Time `json:"time"` } -// AgentLoop 处理Agent Loop请求 -func (h *AgentHandler) AgentLoop(c *gin.Context) { - var req ChatRequest - if err := c.ShouldBindJSON(&req); err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) - return - } - - h.logger.Info("收到Agent Loop请求", - zap.String("message", req.Message), - zap.String("conversationId", req.ConversationID), - ) - - // 如果没有对话ID,创建新对话 - conversationID := req.ConversationID - if conversationID == "" { - title := safeTruncateString(req.Message, 50) - meta := audit.ConversationCreateMetaFromGin(c, "agent_loop") - meta.ProjectID = effectiveProjectID(h.config, req.ProjectID) - conv, err := h.db.CreateConversation(title, meta) - if err != nil { - h.logger.Error("创建对话失败", zap.Error(err)) - c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) - return - } - conversationID = conv.ID - } else { - // 验证对话是否存在 - _, err := h.db.GetConversation(conversationID) - if err != nil { - h.logger.Error("对话不存在", zap.String("conversationId", conversationID), zap.Error(err)) - c.JSON(http.StatusNotFound, gin.H{"error": "对话不存在"}) - return - } - } - - h.activateHITLForConversation(conversationID, req.Hitl) - if h.hitlManager != nil { - defer h.hitlManager.DeactivateConversation(conversationID) - } - - // 优先尝试从保存的代理轨迹恢复历史上下文 - agentHistoryMessages, err := h.loadHistoryFromAgentTrace(conversationID) - if err != nil { - h.logger.Warn("从代理轨迹加载历史消息失败,使用消息表", zap.Error(err)) - // 回退到使用数据库消息表 - historyMessages, err := h.db.GetMessages(conversationID) - if err != nil { - h.logger.Warn("获取历史消息失败", zap.Error(err)) - agentHistoryMessages = []agent.ChatMessage{} - } else { - agentHistoryMessages = dbMessagesToAgentChatMessages(historyMessages) - h.logger.Info("从消息表加载历史消息", zap.Int("count", len(agentHistoryMessages))) - } - } else { - h.logger.Info("从代理轨迹恢复历史上下文", zap.Int("count", len(agentHistoryMessages))) - } - - // 校验附件数量(非流式) - if len(req.Attachments) > maxAttachments { - c.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("附件最多 %d 个", maxAttachments)}) - return - } - - // 应用角色用户提示词和工具配置 - finalMessage := req.Message - var roleTools []string // 角色配置的工具列表 - - // WebShell AI 助手模式:绑定当前连接,仅开放 webshell_* 工具并注入 connection_id - if req.WebShellConnectionID != "" { - conn, err := h.db.GetWebshellConnection(strings.TrimSpace(req.WebShellConnectionID)) - if err != nil || conn == nil { - h.logger.Warn("WebShell AI 助手:未找到连接", zap.String("id", req.WebShellConnectionID), zap.Error(err)) - c.JSON(http.StatusBadRequest, gin.H{"error": "未找到该 WebShell 连接"}) - return - } - webshellContext := BuildWebshellAssistantContext(conn, WebshellSkillHintDefault, req.Message) - // WebShell 模式下如果同时指定了角色,追加角色 user_prompt(工具集仍仅限 webshell 专用工具) - if req.Role != "" && req.Role != "默认" && h.config.Roles != nil { - if role, exists := h.config.Roles[req.Role]; exists && role.Enabled && role.UserPrompt != "" { - finalMessage = role.UserPrompt + "\n\n" + webshellContext - h.logger.Info("WebShell + 角色: 应用角色提示词", zap.String("role", req.Role)) - } else { - finalMessage = webshellContext - } - } else { - finalMessage = webshellContext - } - roleTools = []string{ - builtin.ToolWebshellExec, - builtin.ToolWebshellFileList, - builtin.ToolWebshellFileRead, - builtin.ToolWebshellFileWrite, - builtin.ToolRecordVulnerability, - builtin.ToolListVulnerabilities, - builtin.ToolGetVulnerability, - builtin.ToolListKnowledgeRiskTypes, - builtin.ToolSearchKnowledgeBase, - } - } else if req.Role != "" && req.Role != "默认" { - if h.config.Roles != nil { - if role, exists := h.config.Roles[req.Role]; exists && role.Enabled { - // 应用用户提示词 - if role.UserPrompt != "" { - finalMessage = role.UserPrompt + "\n\n" + req.Message - h.logger.Info("应用角色用户提示词", zap.String("role", req.Role)) - } - // 获取角色配置的工具列表(优先使用tools字段,向后兼容mcps字段) - if len(role.Tools) > 0 { - roleTools = role.Tools - h.logger.Info("使用角色配置的工具列表", zap.String("role", req.Role), zap.Int("toolCount", len(roleTools))) - } - } - } - } - var savedPaths []string - if len(req.Attachments) > 0 { - savedPaths, err = saveAttachmentsToDateAndConversationDir(req.Attachments, conversationID, h.logger) - if err != nil { - h.logger.Error("保存对话附件失败", zap.Error(err)) - c.JSON(http.StatusInternalServerError, gin.H{"error": "保存上传文件失败: " + err.Error()}) - return - } - } - finalMessage = appendAttachmentsToMessage(finalMessage, req.Attachments, savedPaths) - - // 保存用户消息:有附件时一并保存附件名与路径,刷新后显示、继续对话时大模型也能从历史中拿到路径 - userContent := userMessageContentForStorage(req.Message, req.Attachments, savedPaths) - _, err = h.db.AddMessage(conversationID, "user", userContent, nil) - if err != nil { - h.logger.Error("保存用户消息失败", zap.Error(err)) - c.JSON(http.StatusInternalServerError, gin.H{"error": "保存用户消息失败: " + err.Error()}) - return - } - - baseCtx, cancelWithCause := context.WithCancelCause(c.Request.Context()) - defer cancelWithCause(nil) - taskCtx, timeoutCancel := context.WithTimeout(baseCtx, 600*time.Minute) - defer timeoutCancel() - progressCallback := h.createProgressCallback(taskCtx, cancelWithCause, conversationID, "", nil) - taskCtx = h.injectReactHITLInterceptor(taskCtx, cancelWithCause, conversationID, "", nil) - - // 执行Agent Loop,传入历史消息和对话ID(使用包含角色提示词的finalMessage和角色工具列表) - result, err := h.agent.AgentLoopWithProgress(taskCtx, finalMessage, agentHistoryMessages, conversationID, progressCallback, roleTools, h.projectBlackboardBlock(conversationID)) - if err != nil { - h.logger.Error("Agent Loop执行失败", zap.Error(err)) - - // 即使执行失败,也尝试保存代理轨迹(如果 result 中有) - if result != nil && (result.LastAgentTraceInput != "" || result.LastAgentTraceOutput != "") { - if saveErr := h.db.SaveAgentTrace(conversationID, result.LastAgentTraceInput, result.LastAgentTraceOutput); saveErr != nil { - h.logger.Warn("保存失败任务的代理轨迹失败", zap.Error(saveErr)) - } else { - h.logger.Info("已保存失败任务的代理轨迹", zap.String("conversationId", conversationID)) - } - } - - c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) - return - } - - // 保存助手回复 - _, err = h.db.AddMessage(conversationID, "assistant", result.Response, result.MCPExecutionIDs) - if err != nil { - h.logger.Error("保存助手消息失败", zap.Error(err)) - // 即使保存失败,也返回响应,但记录错误 - // 因为AI已经生成了回复,用户应该能看到 - } - - // 保存最后一轮代理轨迹与助手输出 - if result.LastAgentTraceInput != "" || result.LastAgentTraceOutput != "" { - if err := h.db.SaveAgentTrace(conversationID, result.LastAgentTraceInput, result.LastAgentTraceOutput); err != nil { - h.logger.Warn("保存代理轨迹失败", zap.Error(err)) - } else { - h.logger.Info("已保存代理轨迹", zap.String("conversationId", conversationID)) - } - } - - c.JSON(http.StatusOK, ChatResponse{ - Response: result.Response, - MCPExecutionIDs: result.MCPExecutionIDs, - ConversationID: conversationID, - Time: time.Now(), - }) -} - func (h *AgentHandler) finalizeRobotAgentError(ctx context.Context, assistantMessageID, conversationID string, resultMA *multiagent.RunResult, errMA error) (string, string, error) { if shouldPersistEinoAgentTraceAfterRunError(ctx) { h.persistEinoAgentTraceForResume(conversationID, resultMA) @@ -774,7 +589,80 @@ func (h *AgentHandler) finalizeRobotAgentSuccess(assistantMessageID, conversatio return resultMA.Response, conversationID, nil } -// ProcessMessageForRobot 供机器人(企业微信/钉钉/飞书)调用:与 /api/agent-loop/stream 相同执行路径(含 progressCallback、过程详情),仅不发送 SSE,最后返回完整回复 +func (h *AgentHandler) runRobotEinoSingleWithRetry( + taskCtx context.Context, + conversationID, finalMessage string, + history []agent.ChatMessage, + roleTools []string, + progressCallback agent.ProgressCallback, + assistantMessageID string, + taskStatus *string, +) (string, string, error) { + curHist := history + curMsg := finalMessage + segmentUserMessage := finalMessage + var resultMA *multiagent.RunResult + var errMA error + var transientRunAttempts int + for { + resultMA, errMA = multiagent.RunEinoSingleChatModelAgent( + taskCtx, h.config, &h.config.MultiAgent, h.agent, h.logger, + conversationID, curMsg, curHist, roleTools, progressCallback, nil, h.projectBlackboardBlock(conversationID), + ) + if errMA == nil { + transientRunAttempts = 0 + break + } + if handled, _ := h.handleEinoTransientRetryContinue( + taskCtx, conversationID, resultMA, errMA, &transientRunAttempts, + &curHist, &curMsg, segmentUserMessage, progressCallback, nil, + ); handled { + continue + } + *taskStatus = "failed" + return h.finalizeRobotAgentError(taskCtx, assistantMessageID, conversationID, resultMA, errMA) + } + return h.finalizeRobotAgentSuccess(assistantMessageID, conversationID, resultMA) +} + +func (h *AgentHandler) runRobotMultiAgentWithRetry( + taskCtx context.Context, + conversationID, finalMessage, orchestration string, + history []agent.ChatMessage, + roleTools []string, + progressCallback agent.ProgressCallback, + assistantMessageID string, + taskStatus *string, +) (string, string, error) { + curHist := history + curMsg := finalMessage + segmentUserMessage := finalMessage + var resultMA *multiagent.RunResult + var errMA error + var transientRunAttempts int + for { + resultMA, errMA = multiagent.RunDeepAgent( + taskCtx, h.config, &h.config.MultiAgent, h.agent, h.logger, + conversationID, curMsg, curHist, roleTools, progressCallback, + h.agentsMarkdownDir, orchestration, nil, h.projectBlackboardBlock(conversationID), + ) + if errMA == nil { + transientRunAttempts = 0 + break + } + if handled, _ := h.handleEinoTransientRetryContinue( + taskCtx, conversationID, resultMA, errMA, &transientRunAttempts, + &curHist, &curMsg, segmentUserMessage, progressCallback, nil, + ); handled { + continue + } + *taskStatus = "failed" + return h.finalizeRobotAgentError(taskCtx, assistantMessageID, conversationID, resultMA, errMA) + } + return h.finalizeRobotAgentSuccess(assistantMessageID, conversationID, resultMA) +} + +// ProcessMessageForRobot 供机器人(企业微信/钉钉/飞书)调用:Eino 单/多代理执行路径(含 progressCallback、过程详情),仅不发送 SSE,最后返回完整回复 func (h *AgentHandler) ProcessMessageForRobot(ctx context.Context, platform, conversationID, message, role string) (response string, convID string, err error) { if conversationID == "" { title := safeTruncateString(message, 50) @@ -823,7 +711,7 @@ func (h *AgentHandler) ProcessMessageForRobot(ctx context.Context, platform, con return "", "", fmt.Errorf("保存用户消息失败: %w", err) } - // 与 agent-loop/stream 一致:先创建助手消息占位,用 progressCallback 写过程详情(不发送 SSE) + // 与 Eino 流式对话一致:先创建助手消息占位,用 progressCallback 写过程详情(不发送 SSE) assistantMsg, err := h.db.AddMessage(conversationID, "assistant", "处理中...", nil) if err != nil { h.logger.Warn("机器人:创建助手消息占位失败", zap.Error(err)) @@ -833,7 +721,7 @@ func (h *AgentHandler) ProcessMessageForRobot(ctx context.Context, platform, con assistantMessageID = assistantMsg.ID } - // 注册运行中任务并向 taskEventBus 镜像进度事件,供 Web 端 task-events 补流(与 agent-loop/stream 一致)。 + // 注册运行中任务并向 taskEventBus 镜像进度事件,供 Web 端 task-events 补流。 taskCtx, cancelWithCause := context.WithCancelCause(ctx) defer cancelWithCause(nil) taskStatus := "completed" @@ -848,98 +736,24 @@ func (h *AgentHandler) ProcessMessageForRobot(ctx context.Context, platform, con } progressCallback := h.createProgressCallback(taskCtx, cancelWithCause, conversationID, assistantMessageID, nil) - robotMode := "react" + robotMode := "eino_single" if h.config != nil { robotMode = config.NormalizeRobotAgentMode(h.config.MultiAgent) } switch robotMode { case "eino_single": - curHist := agentHistoryMessages - curMsg := finalMessage - segmentUserMessage := finalMessage - var resultMA *multiagent.RunResult - var errMA error - var transientRunAttempts int - for { - resultMA, errMA = multiagent.RunEinoSingleChatModelAgent( - taskCtx, h.config, &h.config.MultiAgent, h.agent, h.logger, - conversationID, curMsg, curHist, roleTools, progressCallback, nil, h.projectBlackboardBlock(conversationID), - ) - if errMA == nil { - // 成功后重置 transient 重试窗口,下一次分段从第 1 次重试开始。 - transientRunAttempts = 0 - break - } - if handled, _ := h.handleEinoTransientRetryContinue( - taskCtx, conversationID, resultMA, errMA, &transientRunAttempts, - &curHist, &curMsg, segmentUserMessage, progressCallback, nil, - ); handled { - continue - } - taskStatus = "failed" - return h.finalizeRobotAgentError(taskCtx, assistantMessageID, conversationID, resultMA, errMA) - } - return h.finalizeRobotAgentSuccess(assistantMessageID, conversationID, resultMA) + return h.runRobotEinoSingleWithRetry(taskCtx, conversationID, finalMessage, agentHistoryMessages, roleTools, progressCallback, assistantMessageID, &taskStatus) case "deep", "plan_execute", "supervisor": if h.config == nil || !h.config.MultiAgent.Enabled { - h.logger.Warn("机器人配置为多代理模式但未启用 multi_agent,回退原生 ReAct", + h.logger.Warn("机器人配置为多代理模式但未启用 multi_agent,回退 Eino 单代理", zap.String("robot_mode", robotMode)) - break + return h.runRobotEinoSingleWithRetry(taskCtx, conversationID, finalMessage, agentHistoryMessages, roleTools, progressCallback, assistantMessageID, &taskStatus) } - curHist := agentHistoryMessages - curMsg := finalMessage - segmentUserMessage := finalMessage - var resultMA *multiagent.RunResult - var errMA error - var transientRunAttempts int - for { - resultMA, errMA = multiagent.RunDeepAgent( - taskCtx, h.config, &h.config.MultiAgent, h.agent, h.logger, - conversationID, curMsg, curHist, roleTools, progressCallback, - h.agentsMarkdownDir, robotMode, nil, h.projectBlackboardBlock(conversationID), - ) - if errMA == nil { - // 成功后重置 transient 重试窗口,下一次分段从第 1 次重试开始。 - transientRunAttempts = 0 - break - } - if handled, _ := h.handleEinoTransientRetryContinue( - taskCtx, conversationID, resultMA, errMA, &transientRunAttempts, - &curHist, &curMsg, segmentUserMessage, progressCallback, nil, - ); handled { - continue - } - taskStatus = "failed" - return h.finalizeRobotAgentError(taskCtx, assistantMessageID, conversationID, resultMA, errMA) - } - return h.finalizeRobotAgentSuccess(assistantMessageID, conversationID, resultMA) + return h.runRobotMultiAgentWithRetry(taskCtx, conversationID, finalMessage, robotMode, agentHistoryMessages, roleTools, progressCallback, assistantMessageID, &taskStatus) } - result, err := h.agent.AgentLoopWithProgress(taskCtx, finalMessage, agentHistoryMessages, conversationID, progressCallback, roleTools, h.projectBlackboardBlock(conversationID)) - if err != nil { - taskStatus = "failed" - errMsg := "执行失败: " + err.Error() - if assistantMessageID != "" { - _, _ = h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", errMsg, time.Now(), assistantMessageID) - _ = h.db.AddProcessDetail(assistantMessageID, conversationID, "error", errMsg, nil) - } - return "", conversationID, err - } - - // 更新助手消息内容与 MCP 执行 ID(与 stream 一致) - if assistantMessageID != "" { - if errU := h.db.UpdateAssistantMessageFinalize(assistantMessageID, result.Response, result.MCPExecutionIDs, multiagent.AggregatedReasoningFromTraceJSON(result.LastAgentTraceInput)); errU != nil { - h.logger.Warn("机器人:更新助手消息失败", zap.Error(errU)) - } - } else { - if _, err = h.db.AddMessage(conversationID, "assistant", result.Response, result.MCPExecutionIDs); err != nil { - h.logger.Warn("机器人:保存助手消息失败", zap.Error(err)) - } - } - if result.LastAgentTraceInput != "" || result.LastAgentTraceOutput != "" { - _ = h.db.SaveAgentTrace(conversationID, result.LastAgentTraceInput, result.LastAgentTraceOutput) - } - return result.Response, conversationID, nil + taskStatus = "failed" + return "", conversationID, fmt.Errorf("不支持的机器人代理模式: %s", robotMode) } // StreamEvent 流式事件 @@ -1440,507 +1254,6 @@ func (h *AgentHandler) createProgressCallback(runCtx context.Context, cancelRun } } -// AgentLoopStream 处理Agent Loop流式请求 -func (h *AgentHandler) AgentLoopStream(c *gin.Context) { - var req ChatRequest - if err := c.ShouldBindJSON(&req); err != nil { - // 对于流式请求,也发送SSE格式的错误 - c.Header("Content-Type", "text/event-stream; charset=utf-8") - c.Header("Cache-Control", "no-cache") - c.Header("Connection", "keep-alive") - event := StreamEvent{ - Type: "error", - Message: "请求参数错误: " + err.Error(), - } - eventJSON, _ := json.Marshal(event) - fmt.Fprintf(c.Writer, "data: %s\n\n", eventJSON) - done := StreamEvent{Type: "done", Message: ""} - doneJSON, _ := json.Marshal(done) - fmt.Fprintf(c.Writer, "data: %s\n\n", doneJSON) - c.Writer.Flush() - return - } - - h.logger.Info("收到Agent Loop流式请求", - zap.String("message", req.Message), - zap.String("conversationId", req.ConversationID), - ) - - // 设置SSE响应头 - c.Header("Content-Type", "text/event-stream; charset=utf-8") - c.Header("Cache-Control", "no-cache") - c.Header("Connection", "keep-alive") - c.Header("X-Accel-Buffering", "no") // 禁用nginx缓冲 - - // 发送初始事件 - // 用于跟踪客户端是否已断开连接 - clientDisconnected := false - // 与 sseKeepalive 共用:禁止并发写 ResponseWriter,否则会破坏 chunked 编码(ERR_INVALID_CHUNKED_ENCODING)。 - var sseWriteMu sync.Mutex - var ssePublishConversationID string - // 用于快速确认模型是否真的产生了流式 delta - var responseDeltaCount int - var responseStartLogged bool - - sendEvent := func(eventType, message string, data interface{}) { - if eventType == "response_start" { - responseDeltaCount = 0 - responseStartLogged = true - h.logger.Info("SSE: response_start", - zap.Int("conversationIdPresent", func() int { - if m, ok := data.(map[string]interface{}); ok { - if v, ok2 := m["conversationId"]; ok2 && v != nil && fmt.Sprint(v) != "" { - return 1 - } - } - return 0 - }()), - zap.String("messageGeneratedBy", func() string { - if m, ok := data.(map[string]interface{}); ok { - if v, ok2 := m["messageGeneratedBy"]; ok2 { - if s, ok3 := v.(string); ok3 { - return s - } - return fmt.Sprint(v) - } - } - return "" - }()), - ) - } else if eventType == "response_delta" { - responseDeltaCount++ - // 只打前几条,避免刷屏 - if responseStartLogged && responseDeltaCount <= 3 { - h.logger.Info("SSE: response_delta", - zap.Int("index", responseDeltaCount), - zap.Int("deltaLen", len(message)), - zap.String("deltaPreview", func() string { - p := strings.ReplaceAll(message, "\n", "\\n") - if len(p) > 80 { - return p[:80] + "..." - } - return p - }()), - ) - } - } - - event := StreamEvent{ - Type: eventType, - Message: message, - Data: data, - } - eventJSON, errJSON := json.Marshal(event) - if errJSON != nil { - eventJSON = []byte(`{"type":"error","message":"marshal failed"}`) - } - sseLine := make([]byte, 0, len(eventJSON)+8) - sseLine = append(sseLine, []byte("data: ")...) - sseLine = append(sseLine, eventJSON...) - sseLine = append(sseLine, '\n', '\n') - if ssePublishConversationID != "" && h.taskEventBus != nil { - h.taskEventBus.Publish(ssePublishConversationID, sseLine) - } - - // 如果客户端已断开,不再写入 HTTP(镜像订阅仍可收到事件) - if clientDisconnected { - return - } - - // 检查请求上下文是否被取消(客户端断开) - select { - case <-c.Request.Context().Done(): - clientDisconnected = true - return - default: - } - - sseWriteMu.Lock() - _, err := c.Writer.Write(sseLine) - if err != nil { - sseWriteMu.Unlock() - clientDisconnected = true - h.logger.Debug("客户端断开连接,停止发送SSE事件", zap.Error(err)) - return - } - if flusher, ok := c.Writer.(http.Flusher); ok { - flusher.Flush() - } else { - c.Writer.Flush() - } - sseWriteMu.Unlock() - } - - // 如果没有对话ID,创建新对话(WebShell 助手模式下关联连接 ID 以便持久化展示) - conversationID := req.ConversationID - if conversationID == "" { - title := safeTruncateString(req.Message, 50) - var conv *database.Conversation - var err error - meta := audit.ConversationCreateMetaFromGin(c, "agent_loop_stream") - meta.ProjectID = effectiveProjectID(h.config, req.ProjectID) - if req.WebShellConnectionID != "" { - meta.Source = "webshell_chat" - conv, err = h.db.CreateConversationWithWebshell(strings.TrimSpace(req.WebShellConnectionID), title, meta) - } else { - conv, err = h.db.CreateConversation(title, meta) - } - if err != nil { - h.logger.Error("创建对话失败", zap.Error(err)) - sendEvent("error", "创建对话失败: "+err.Error(), nil) - return - } - conversationID = conv.ID - sendEvent("conversation", "会话已创建", map[string]interface{}{ - "conversationId": conversationID, - }) - } else { - // 验证对话是否存在 - _, err := h.db.GetConversation(conversationID) - if err != nil { - h.logger.Error("对话不存在", zap.String("conversationId", conversationID), zap.Error(err)) - sendEvent("error", "对话不存在", nil) - return - } - } - ssePublishConversationID = conversationID - - // 优先尝试从保存的代理轨迹恢复历史上下文 - agentHistoryMessages, err := h.loadHistoryFromAgentTrace(conversationID) - if err != nil { - h.logger.Warn("从代理轨迹加载历史消息失败,使用消息表", zap.Error(err)) - // 回退到使用数据库消息表 - historyMessages, err := h.db.GetMessages(conversationID) - if err != nil { - h.logger.Warn("获取历史消息失败", zap.Error(err)) - agentHistoryMessages = []agent.ChatMessage{} - } else { - agentHistoryMessages = dbMessagesToAgentChatMessages(historyMessages) - h.logger.Info("从消息表加载历史消息", zap.Int("count", len(agentHistoryMessages))) - } - } else { - h.logger.Info("从代理轨迹恢复历史上下文", zap.Int("count", len(agentHistoryMessages))) - } - - // 校验附件数量 - if len(req.Attachments) > maxAttachments { - sendEvent("error", fmt.Sprintf("附件最多 %d 个", maxAttachments), nil) - return - } - - // 应用角色用户提示词和工具配置 - finalMessage := req.Message - var roleTools []string // 角色配置的工具列表 - if req.WebShellConnectionID != "" { - conn, errConn := h.db.GetWebshellConnection(strings.TrimSpace(req.WebShellConnectionID)) - if errConn != nil || conn == nil { - h.logger.Warn("WebShell AI 助手:未找到连接", zap.String("id", req.WebShellConnectionID), zap.Error(errConn)) - sendEvent("error", "未找到该 WebShell 连接", nil) - return - } - webshellContext := BuildWebshellAssistantContext(conn, WebshellSkillHintDefault, req.Message) - // WebShell 模式下如果同时指定了角色,追加角色 user_prompt(工具集仍仅限 webshell 专用工具) - if req.Role != "" && req.Role != "默认" && h.config.Roles != nil { - if role, exists := h.config.Roles[req.Role]; exists && role.Enabled && role.UserPrompt != "" { - finalMessage = role.UserPrompt + "\n\n" + webshellContext - h.logger.Info("WebShell + 角色: 应用角色提示词(流式)", zap.String("role", req.Role)) - } else { - finalMessage = webshellContext - } - } else { - finalMessage = webshellContext - } - roleTools = []string{ - builtin.ToolWebshellExec, - builtin.ToolWebshellFileList, - builtin.ToolWebshellFileRead, - builtin.ToolWebshellFileWrite, - builtin.ToolRecordVulnerability, - builtin.ToolListVulnerabilities, - builtin.ToolGetVulnerability, - builtin.ToolListKnowledgeRiskTypes, - builtin.ToolSearchKnowledgeBase, - } - } else if req.Role != "" && req.Role != "默认" { - if h.config.Roles != nil { - if role, exists := h.config.Roles[req.Role]; exists && role.Enabled { - // 应用用户提示词 - if role.UserPrompt != "" { - finalMessage = role.UserPrompt + "\n\n" + req.Message - h.logger.Info("应用角色用户提示词", zap.String("role", req.Role)) - } - // 获取角色配置的工具列表(优先使用tools字段,向后兼容mcps字段) - if len(role.Tools) > 0 { - roleTools = role.Tools - h.logger.Info("使用角色配置的工具列表", zap.String("role", req.Role), zap.Int("toolCount", len(roleTools))) - } else if len(role.MCPs) > 0 { - // 向后兼容:如果只有mcps字段,暂时使用空列表(表示使用所有工具) - // 因为mcps是MCP服务器名称,不是工具列表 - h.logger.Info("角色配置使用旧的mcps字段,将使用所有工具", zap.String("role", req.Role)) - } - } - } - } - var savedPaths []string - if len(req.Attachments) > 0 { - savedPaths, err = saveAttachmentsToDateAndConversationDir(req.Attachments, conversationID, h.logger) - if err != nil { - h.logger.Error("保存对话附件失败", zap.Error(err)) - sendEvent("error", "保存上传文件失败: "+err.Error(), nil) - return - } - } - // 仅将附件保存路径追加到 finalMessage,避免将文件内容内联到大模型上下文中 - finalMessage = appendAttachmentsToMessage(finalMessage, req.Attachments, savedPaths) - // 如果roleTools为空,表示使用所有工具(默认角色或未配置工具的角色) - - // 保存用户消息:有附件时一并保存附件名与路径,刷新后显示、继续对话时大模型也能从历史中拿到路径 - userContent := userMessageContentForStorage(req.Message, req.Attachments, savedPaths) - userMsgRow, err := h.db.AddMessage(conversationID, "user", userContent, nil) - if err != nil { - h.logger.Error("保存用户消息失败", zap.Error(err)) - } - - // 预先创建助手消息,以便关联过程详情 - assistantMsg, err := h.db.AddMessage(conversationID, "assistant", "处理中...", nil) - if err != nil { - h.logger.Error("创建助手消息失败", zap.Error(err)) - // 如果创建失败,继续执行但不保存过程详情 - assistantMsg = nil - } - - // 创建进度回调函数,同时保存到数据库 - var assistantMessageID string - if assistantMsg != nil { - assistantMessageID = assistantMsg.ID - } - - // 尽早下发消息 ID,便于前端在流式结束前挂上「删除本轮」等(无需等整段结束再刷新) - if userMsgRow != nil { - sendEvent("message_saved", "", map[string]interface{}{ - "conversationId": conversationID, - "userMessageId": userMsgRow.ID, - }) - } - - // 创建进度回调函数,复用统一逻辑 - // 创建一个独立的上下文用于任务执行,不随HTTP请求取消 - // 这样即使客户端断开连接(如刷新页面),任务也能继续执行 - baseCtx, cancelWithCause := context.WithCancelCause(context.Background()) - taskCtx, timeoutCancel := context.WithTimeout(baseCtx, 600*time.Minute) - defer timeoutCancel() - defer cancelWithCause(nil) - taskCtx = mcp.WithMCPConversationID(taskCtx, conversationID) - taskCtx = mcp.WithToolRunRegistry(taskCtx, h.tasks) - progressCallback := h.createProgressCallback(taskCtx, cancelWithCause, conversationID, assistantMessageID, sendEvent) - taskCtx = h.injectReactHITLInterceptor(taskCtx, cancelWithCause, conversationID, assistantMessageID, sendEvent) - - if _, err := h.tasks.StartTask(conversationID, req.Message, cancelWithCause); err != nil { - var errorMsg string - if errors.Is(err, ErrTaskAlreadyRunning) { - errorMsg = "⚠️ 当前会话已有任务正在执行中,请等待当前任务完成或点击「停止任务」按钮后再尝试。" - sendEvent("error", errorMsg, map[string]interface{}{ - "conversationId": conversationID, - "errorType": "task_already_running", - }) - } else { - errorMsg = "❌ 无法启动任务: " + err.Error() - sendEvent("error", errorMsg, map[string]interface{}{ - "conversationId": conversationID, - "errorType": "task_start_failed", - }) - } - - // 更新助手消息内容并保存错误详情到数据库 - if assistantMessageID != "" { - if _, updateErr := h.db.Exec( - "UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", - errorMsg, - time.Now(), assistantMessageID, - ); updateErr != nil { - h.logger.Warn("更新错误后的助手消息失败", zap.Error(updateErr)) - } - // 保存错误详情到数据库 - if err := h.db.AddProcessDetail(assistantMessageID, conversationID, "error", errorMsg, map[string]interface{}{ - "errorType": func() string { - if errors.Is(err, ErrTaskAlreadyRunning) { - return "task_already_running" - } - return "task_start_failed" - }(), - }); err != nil { - h.logger.Warn("保存错误详情失败", zap.Error(err)) - } - } - - sendEvent("done", "", map[string]interface{}{ - "conversationId": conversationID, - }) - return - } - - taskStatus := "completed" - defer h.tasks.FinishTask(conversationID, taskStatus) - - // 执行Agent Loop,传入独立的上下文,确保任务不会因客户端断开而中断(使用包含角色提示词的finalMessage和角色工具列表) - sendEvent("progress", "正在分析您的请求...", nil) - stopKeepalive := make(chan struct{}) - go sseKeepalive(c, stopKeepalive, &sseWriteMu) - defer close(stopKeepalive) - - result, err := h.agent.AgentLoopWithProgress(taskCtx, finalMessage, agentHistoryMessages, conversationID, progressCallback, roleTools, h.projectBlackboardBlock(conversationID)) - if err != nil { - h.logger.Error("Agent Loop执行失败", zap.Error(err)) - cause := context.Cause(baseCtx) - - // 检查是否是用户取消:context的cause是ErrTaskCancelled - // 如果cause是ErrTaskCancelled,无论错误是什么类型(包括context.Canceled),都视为用户取消 - // 这样可以正确处理在API调用过程中被取消的情况 - isCancelled := errors.Is(cause, ErrTaskCancelled) - - switch { - case isCancelled: - taskStatus = "cancelled" - cancelMsg := "任务已被用户取消,后续操作已停止。" - - // 在发送事件前更新任务状态,确保前端能及时看到状态变化 - h.tasks.UpdateTaskStatus(conversationID, taskStatus) - - if assistantMessageID != "" { - if result != nil { - if updateErr := h.mergeAssistantMessagePartialOnCancel(assistantMessageID, result.Response); updateErr != nil { - h.logger.Warn("合并取消前的部分回复失败", zap.Error(updateErr)) - } - } - if updateErr := h.appendAssistantMessageNotice(assistantMessageID, cancelMsg); updateErr != nil { - h.logger.Warn("更新取消后的助手消息失败", zap.Error(updateErr)) - } - h.db.AddProcessDetail(assistantMessageID, conversationID, "cancelled", cancelMsg, nil) - } - - // 即使任务被取消,也尝试保存代理轨迹(如果 result 中有) - if result != nil && (result.LastAgentTraceInput != "" || result.LastAgentTraceOutput != "") { - if err := h.db.SaveAgentTrace(conversationID, result.LastAgentTraceInput, result.LastAgentTraceOutput); err != nil { - h.logger.Warn("保存取消任务的代理轨迹失败", zap.Error(err)) - } else { - h.logger.Info("已保存取消任务的代理轨迹", zap.String("conversationId", conversationID)) - } - } - - sendEvent("cancelled", cancelMsg, map[string]interface{}{ - "conversationId": conversationID, - "messageId": assistantMessageID, - }) - sendEvent("done", "", map[string]interface{}{ - "conversationId": conversationID, - }) - return - case errors.Is(err, context.DeadlineExceeded) || errors.Is(cause, context.DeadlineExceeded): - taskStatus = "timeout" - timeoutMsg := "任务执行超时,已自动终止。" - - // 在发送事件前更新任务状态,确保前端能及时看到状态变化 - h.tasks.UpdateTaskStatus(conversationID, taskStatus) - - if assistantMessageID != "" { - if _, updateErr := h.db.Exec( - "UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", - timeoutMsg, - time.Now(), assistantMessageID, - ); updateErr != nil { - h.logger.Warn("更新超时后的助手消息失败", zap.Error(updateErr)) - } - h.db.AddProcessDetail(assistantMessageID, conversationID, "timeout", timeoutMsg, nil) - } - - // 即使任务超时,也尝试保存代理轨迹(如果 result 中有) - if result != nil && (result.LastAgentTraceInput != "" || result.LastAgentTraceOutput != "") { - if err := h.db.SaveAgentTrace(conversationID, result.LastAgentTraceInput, result.LastAgentTraceOutput); err != nil { - h.logger.Warn("保存超时任务的代理轨迹失败", zap.Error(err)) - } else { - h.logger.Info("已保存超时任务的代理轨迹", zap.String("conversationId", conversationID)) - } - } - - sendEvent("error", timeoutMsg, map[string]interface{}{ - "conversationId": conversationID, - "messageId": assistantMessageID, - }) - sendEvent("done", "", map[string]interface{}{ - "conversationId": conversationID, - }) - return - default: - taskStatus = "failed" - errorMsg := "执行失败: " + err.Error() - - // 在发送事件前更新任务状态,确保前端能及时看到状态变化 - h.tasks.UpdateTaskStatus(conversationID, taskStatus) - - if assistantMessageID != "" { - if _, updateErr := h.db.Exec( - "UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", - errorMsg, - time.Now(), assistantMessageID, - ); updateErr != nil { - h.logger.Warn("更新失败后的助手消息失败", zap.Error(updateErr)) - } - h.db.AddProcessDetail(assistantMessageID, conversationID, "error", errorMsg, nil) - } - - // 即使任务失败,也尝试保存代理轨迹(如果 result 中有) - if result != nil && (result.LastAgentTraceInput != "" || result.LastAgentTraceOutput != "") { - if err := h.db.SaveAgentTrace(conversationID, result.LastAgentTraceInput, result.LastAgentTraceOutput); err != nil { - h.logger.Warn("保存失败任务的代理轨迹失败", zap.Error(err)) - } else { - h.logger.Info("已保存失败任务的代理轨迹", zap.String("conversationId", conversationID)) - } - } - - sendEvent("error", errorMsg, map[string]interface{}{ - "conversationId": conversationID, - "messageId": assistantMessageID, - }) - sendEvent("done", "", map[string]interface{}{ - "conversationId": conversationID, - }) - } - return - } - - // 更新助手消息内容 - if assistantMsg != nil { - if errU := h.db.UpdateAssistantMessageFinalize(assistantMessageID, result.Response, result.MCPExecutionIDs, multiagent.AggregatedReasoningFromTraceJSON(result.LastAgentTraceInput)); errU != nil { - h.logger.Error("更新助手消息失败", zap.Error(errU)) - } - } else { - // 如果之前创建失败,现在创建 - _, err = h.db.AddMessage(conversationID, "assistant", result.Response, result.MCPExecutionIDs) - if err != nil { - h.logger.Error("保存助手消息失败", zap.Error(err)) - } - } - - // 保存最后一轮代理轨迹与助手输出 - if result.LastAgentTraceInput != "" || result.LastAgentTraceOutput != "" { - if err := h.db.SaveAgentTrace(conversationID, result.LastAgentTraceInput, result.LastAgentTraceOutput); err != nil { - h.logger.Warn("保存代理轨迹失败", zap.Error(err)) - } else { - h.logger.Info("已保存代理轨迹", zap.String("conversationId", conversationID)) - } - } - - // 发送最终响应 - sendEvent("response", result.Response, map[string]interface{}{ - "mcpExecutionIds": result.MCPExecutionIDs, - "conversationId": conversationID, - "messageId": assistantMessageID, // 包含消息ID,以便前端关联过程详情 - }) - sendEvent("done", "", map[string]interface{}{ - "conversationId": conversationID, - }) -} - // CancelAgentLoop 取消正在执行的任务 func (h *AgentHandler) CancelAgentLoop(c *gin.Context) { var req struct { @@ -2096,38 +1409,17 @@ type BatchTaskRequest struct { Title string `json:"title"` // 任务标题(可选) Tasks []string `json:"tasks" binding:"required"` // 任务列表,每行一个任务 Role string `json:"role,omitempty"` // 角色名称(可选,空字符串表示默认角色) - AgentMode string `json:"agentMode,omitempty"` // single | eino_single | deep | plan_execute | supervisor(react 同 single;旧版 multi 视为 deep) + AgentMode string `json:"agentMode,omitempty"` // eino_single | deep | plan_execute | supervisor ScheduleMode string `json:"scheduleMode,omitempty"` // manual | cron CronExpr string `json:"cronExpr,omitempty"` // scheduleMode=cron 时必填 ExecuteNow bool `json:"executeNow,omitempty"` // 创建后是否立即执行(默认 false) ProjectID string `json:"projectId,omitempty"` // 队列内子对话绑定的项目(可选) } -func normalizeBatchQueueAgentMode(mode string) string { - m := strings.TrimSpace(strings.ToLower(mode)) - if m == "multi" { - return "deep" - } - if m == "" || m == "single" || m == "react" { - return "single" - } - if m == "eino_single" { - return "eino_single" - } - switch config.NormalizeMultiAgentOrchestration(m) { - case "plan_execute": - return "plan_execute" - case "supervisor": - return "supervisor" - default: - return "deep" - } -} - -// batchQueueWantsEino 队列是否配置为走 Eino 多代理(不含「空 agentMode + 仅 BatchUseMultiAgent」这种运行期推断)。 +// batchQueueWantsEino 队列是否配置为走 Eino 多代理。 func batchQueueWantsEino(agentMode string) bool { m := strings.TrimSpace(strings.ToLower(agentMode)) - return m == "multi" || m == "deep" || m == "plan_execute" || m == "supervisor" + return m == "deep" || m == "plan_execute" || m == "supervisor" } func normalizeBatchQueueScheduleMode(mode string) string { @@ -2163,7 +1455,7 @@ func (h *AgentHandler) CreateBatchQueue(c *gin.Context) { return } - agentMode := normalizeBatchQueueAgentMode(req.AgentMode) + agentMode := config.NormalizeAgentMode(req.AgentMode) scheduleMode := normalizeBatchQueueScheduleMode(req.ScheduleMode) cronExpr := strings.TrimSpace(req.CronExpr) var nextRunAt *time.Time @@ -2843,55 +2135,40 @@ func (h *AgentHandler) executeBatchQueue(queueID string) { // 使用队列配置的角色工具列表(如果为空,表示使用所有工具) useBatchMulti := false - useEinoSingle := false batchOrch := "deep" am := strings.TrimSpace(strings.ToLower(queue.AgentMode)) if am == "multi" { am = "deep" } - if am == "eino_single" { - useEinoSingle = true - } else if batchQueueWantsEino(queue.AgentMode) && h.config != nil && h.config.MultiAgent.Enabled { + if batchQueueWantsEino(queue.AgentMode) && h.config != nil && h.config.MultiAgent.Enabled { useBatchMulti = true batchOrch = config.NormalizeMultiAgentOrchestration(am) - } else if queue.AgentMode == "" { + } else if queue.AgentMode == "" && h.config != nil && h.config.MultiAgent.Enabled && h.config.MultiAgent.BatchUseMultiAgent { // 兼容历史数据:未配置队列代理模式时,沿用旧的系统级开关 - if h.config != nil && h.config.MultiAgent.Enabled && h.config.MultiAgent.BatchUseMultiAgent { - useBatchMulti = true - batchOrch = "deep" - } + useBatchMulti = true + batchOrch = "deep" } - useRunResult := useBatchMulti || useEinoSingle - var result *agent.AgentLoopResult var resultMA *multiagent.RunResult var runErr error switch { case useBatchMulti: resultMA, runErr = multiagent.RunDeepAgent(taskCtx, h.config, &h.config.MultiAgent, h.agent, h.logger, conversationID, finalMessage, []agent.ChatMessage{}, roleTools, progressCallback, h.agentsMarkdownDir, batchOrch, nil, h.projectBlackboardBlock(conversationID)) - case useEinoSingle: + default: if h.config == nil { runErr = fmt.Errorf("服务器配置未加载") } else { resultMA, runErr = multiagent.RunEinoSingleChatModelAgent(taskCtx, h.config, &h.config.MultiAgent, h.agent, h.logger, conversationID, finalMessage, []agent.ChatMessage{}, roleTools, progressCallback, nil, h.projectBlackboardBlock(conversationID)) } - default: - result, runErr = h.agent.AgentLoopWithProgress(taskCtx, finalMessage, []agent.ChatMessage{}, conversationID, progressCallback, roleTools, h.projectBlackboardBlock(conversationID)) } if runErr != nil { - if useRunResult && shouldPersistEinoAgentTraceAfterRunError(baseCtx) { + if shouldPersistEinoAgentTraceAfterRunError(baseCtx) { h.persistEinoAgentTraceForResume(conversationID, resultMA) } - // 检查是否是取消错误 - // 1. 直接检查是否是 context.Canceled(包括包装后的错误) - // 2. 检查错误消息中是否包含"context canceled"或"cancelled"关键字 - // 3. 检查 result.Response 中是否包含取消相关的消息 errStr := runErr.Error() partialResp := "" - if useRunResult && resultMA != nil { + if resultMA != nil { partialResp = resultMA.Response - } else if result != nil { - partialResp = result.Response } isCancelled := errors.Is(context.Cause(baseCtx), ErrTaskCancelled) || errors.Is(runErr, context.Canceled) || @@ -2954,20 +2231,10 @@ func (h *AgentHandler) executeBatchQueue(queueID string) { } else { h.logger.Info("批量任务执行成功", zap.String("queueId", queueID), zap.String("taskId", task.ID), zap.String("conversationId", conversationID)) - var resText string - var mcpIDs []string - var lastIn, lastOut string - if useRunResult { - resText = resultMA.Response - mcpIDs = resultMA.MCPExecutionIDs - lastIn = resultMA.LastAgentTraceInput - lastOut = resultMA.LastAgentTraceOutput - } else { - resText = result.Response - mcpIDs = result.MCPExecutionIDs - lastIn = result.LastAgentTraceInput - lastOut = result.LastAgentTraceOutput - } + resText := resultMA.Response + mcpIDs := resultMA.MCPExecutionIDs + lastIn := resultMA.LastAgentTraceInput + lastOut := resultMA.LastAgentTraceOutput // 更新助手消息内容 if assistantMessageID != "" { @@ -3054,7 +2321,7 @@ func (h *AgentHandler) loadHistoryFromAgentTrace(conversationID string) ([]agent continue // 跳过无效消息 } - // 跳过system消息(AgentLoop会重新添加) + // 跳过 system 消息(由 Eino Instruction 提供) if msg.Role == "system" { continue } diff --git a/internal/handler/batch_task_manager.go b/internal/handler/batch_task_manager.go index b33f9dd9..5bdd2018 100644 --- a/internal/handler/batch_task_manager.go +++ b/internal/handler/batch_task_manager.go @@ -11,6 +11,7 @@ import ( "time" "unicode/utf8" + "cyberstrike-ai/internal/config" "cyberstrike-ai/internal/database" "go.uber.org/zap" @@ -128,7 +129,7 @@ func (m *BatchTaskManager) CreateBatchQueue( Title: title, Role: role, ProjectID: strings.TrimSpace(projectID), - AgentMode: normalizeBatchQueueAgentMode(agentMode), + AgentMode: config.NormalizeAgentMode(agentMode), ScheduleMode: normalizeBatchQueueScheduleMode(scheduleMode), CronExpr: strings.TrimSpace(cronExpr), NextRunAt: nextRunAt, @@ -225,7 +226,7 @@ func (m *BatchTaskManager) loadQueueFromDB(queueID string) *BatchTaskQueue { queue := &BatchTaskQueue{ ID: queueRow.ID, - AgentMode: "single", + AgentMode: "eino_single", ScheduleMode: "manual", Status: queueRow.Status, CreatedAt: queueRow.CreatedAt, @@ -240,7 +241,7 @@ func (m *BatchTaskManager) loadQueueFromDB(queueID string) *BatchTaskQueue { queue.Role = queueRow.Role.String } if queueRow.AgentMode.Valid { - queue.AgentMode = normalizeBatchQueueAgentMode(queueRow.AgentMode.String) + queue.AgentMode = config.NormalizeAgentMode(queueRow.AgentMode.String) } if queueRow.ScheduleMode.Valid { queue.ScheduleMode = normalizeBatchQueueScheduleMode(queueRow.ScheduleMode.String) @@ -464,7 +465,7 @@ func (m *BatchTaskManager) LoadFromDB() error { queue := &BatchTaskQueue{ ID: queueRow.ID, - AgentMode: "single", + AgentMode: "eino_single", ScheduleMode: "manual", Status: queueRow.Status, CreatedAt: queueRow.CreatedAt, @@ -479,7 +480,7 @@ func (m *BatchTaskManager) LoadFromDB() error { queue.Role = queueRow.Role.String } if queueRow.AgentMode.Valid { - queue.AgentMode = normalizeBatchQueueAgentMode(queueRow.AgentMode.String) + queue.AgentMode = config.NormalizeAgentMode(queueRow.AgentMode.String) } if queueRow.ScheduleMode.Valid { queue.ScheduleMode = normalizeBatchQueueScheduleMode(queueRow.ScheduleMode.String) @@ -669,7 +670,7 @@ func (m *BatchTaskManager) UpdateQueueMetadata(queueID, title, role, agentMode s // 如果未传 agentMode,保留原值 if strings.TrimSpace(agentMode) != "" { - agentMode = normalizeBatchQueueAgentMode(agentMode) + agentMode = config.NormalizeAgentMode(agentMode) } else { agentMode = queue.AgentMode } diff --git a/internal/handler/batch_task_mcp.go b/internal/handler/batch_task_mcp.go index 27886b6c..bba9ece1 100644 --- a/internal/handler/batch_task_mcp.go +++ b/internal/handler/batch_task_mcp.go @@ -8,6 +8,7 @@ import ( "strings" "time" + "cyberstrike-ai/internal/config" "cyberstrike-ai/internal/mcp" "cyberstrike-ai/internal/mcp/builtin" @@ -134,7 +135,7 @@ func RegisterBatchTaskMCPTools(mcpServer *mcp.Server, h *AgentHandler, logger *z 【何时用】用户明确要批量排队执行、Cron 周期跑同一批指令、或需要与任务管理页面对齐时调用。需要即时追问、强依赖当前对话上下文的分析/编码,应在本对话内直接完成,不要为了”委派”而创建队列。 -【参数】tasks(字符串数组)或 tasks_text(多行,每行一条)二选一;每项是一条将来由系统按队列顺序执行的指令文案。agent_mode:single(原生 ReAct,默认)、eino_single(Eino ADK 单代理)、deep / plan_execute / supervisor(需系统启用多代理);兼容旧值 multi(视为 deep)。非”把主对话拆给子代理”。schedule_mode:manual(默认)或 cron;cron 须填 cron_expr(5 段,如 “0 */6 * * *”)。 +【参数】tasks(字符串数组)或 tasks_text(多行,每行一条)二选一;每项是一条将来由系统按队列顺序执行的指令文案。agent_mode:eino_single(Eino ADK 单代理,默认)、deep / plan_execute / supervisor(需系统启用多代理)。非”把主对话拆给子代理”。schedule_mode:manual(默认)或 cron;cron 须填 cron_expr(5 段,如 “0 */6 * * *”)。 【执行】默认创建后为 pending,不自动跑。execute_now=true 可创建后立即跑;否则之后调用 batch_task_start。Cron 自动下一轮需 schedule_enabled 为 true(可用 batch_task_schedule_enabled)。`, ShortDescription: "任务管理:创建批量任务队列(登记多条指令,可选立即或 Cron)", @@ -160,8 +161,8 @@ func RegisterBatchTaskMCPTools(mcpServer *mcp.Server, h *AgentHandler, logger *z }, "agent_mode": map[string]interface{}{ "type": "string", - "description": "执行模式:single(原生 ReAct)、eino_single(Eino ADK)、deep/plan_execute/supervisor(Eino 编排,需启用多代理);multi 兼容为 deep", - "enum": []string{"single", "eino_single", "deep", "plan_execute", "supervisor", "multi"}, + "description": "执行模式:eino_single(Eino ADK,默认)、deep/plan_execute/supervisor(Eino 编排,需启用多代理)", + "enum": []string{"eino_single", "deep", "plan_execute", "supervisor"}, }, "schedule_mode": map[string]interface{}{ "type": "string", @@ -189,7 +190,7 @@ func RegisterBatchTaskMCPTools(mcpServer *mcp.Server, h *AgentHandler, logger *z } title := mcpArgString(args, "title") role := mcpArgString(args, "role") - agentMode := normalizeBatchQueueAgentMode(mcpArgString(args, "agent_mode")) + agentMode := config.NormalizeAgentMode(mcpArgString(args, "agent_mode")) scheduleMode := normalizeBatchQueueScheduleMode(mcpArgString(args, "schedule_mode")) cronExpr := strings.TrimSpace(mcpArgString(args, "cron_expr")) var nextRunAt *time.Time @@ -393,8 +394,8 @@ func RegisterBatchTaskMCPTools(mcpServer *mcp.Server, h *AgentHandler, logger *z }, "agent_mode": map[string]interface{}{ "type": "string", - "description": "代理模式:single、eino_single、deep、plan_execute、supervisor;multi 视为 deep", - "enum": []string{"single", "eino_single", "deep", "plan_execute", "supervisor", "multi"}, + "description": "代理模式:eino_single、deep、plan_execute、supervisor", + "enum": []string{"eino_single", "deep", "plan_execute", "supervisor"}, }, }, "required": []string{"queue_id"}, diff --git a/internal/handler/config.go b/internal/handler/config.go index d1cb35f4..212296ac 100644 --- a/internal/handler/config.go +++ b/internal/handler/config.go @@ -783,7 +783,7 @@ func (h *ConfigHandler) UpdateConfig(c *gin.Context) { if mode := strings.TrimSpace(req.MultiAgent.RobotDefaultAgentMode); mode != "" { h.config.MultiAgent.RobotDefaultAgentMode = mode } else { - h.config.MultiAgent.RobotDefaultAgentMode = "react" + h.config.MultiAgent.RobotDefaultAgentMode = "eino_single" } if req.MultiAgent.PlanExecuteLoopMaxIterations != nil { h.config.MultiAgent.PlanExecuteLoopMaxIterations = *req.MultiAgent.PlanExecuteLoopMaxIterations diff --git a/internal/handler/hitl.go b/internal/handler/hitl.go index 96fc9fa9..a6759639 100644 --- a/internal/handler/hitl.go +++ b/internal/handler/hitl.go @@ -12,7 +12,6 @@ import ( "sync" "time" - "cyberstrike-ai/internal/agent" "cyberstrike-ai/internal/database" "cyberstrike-ai/internal/multiagent" @@ -691,35 +690,6 @@ func (h *AgentHandler) interceptHITLForEinoTool(runCtx context.Context, cancelRu return arguments, nil } -func (h *AgentHandler) interceptHITLForReactTool(runCtx context.Context, cancelRun context.CancelCauseFunc, conversationID, assistantMessageID string, sendEventFunc func(eventType, message string, data interface{}), toolName string, arguments map[string]interface{}, toolCallID string) (map[string]interface{}, error) { - payload := map[string]interface{}{ - "toolName": toolName, - "argumentsObj": arguments, - "toolCallId": toolCallID, - "source": "react_pre_exec", - } - d, err := h.waitHITLApproval(runCtx, cancelRun, conversationID, assistantMessageID, toolName, toolCallID, payload, sendEventFunc) - if err != nil || d == nil { - return arguments, err - } - if d.Decision == "reject" { - comment := strings.TrimSpace(d.Comment) - if comment == "" { - comment = "no extra feedback" - } - return arguments, errors.New("human rejected this tool call; feedback: " + comment) - } - if len(d.EditedArguments) > 0 { - return d.EditedArguments, nil - } - return arguments, nil -} - -func (h *AgentHandler) injectReactHITLInterceptor(ctx context.Context, cancelRun context.CancelCauseFunc, conversationID, assistantMessageID string, sendEventFunc func(eventType, message string, data interface{})) context.Context { - return agent.WithToolCallInterceptor(ctx, func(c context.Context, toolName string, args map[string]interface{}, toolCallID string) (map[string]interface{}, error) { - return h.interceptHITLForReactTool(c, cancelRun, conversationID, assistantMessageID, sendEventFunc, toolName, args, toolCallID) - }) -} type hitlConfigReq struct { ConversationID string `json:"conversationId" binding:"required"` diff --git a/internal/handler/multi_agent.go b/internal/handler/multi_agent.go index b6cc94b5..2e76116a 100644 --- a/internal/handler/multi_agent.go +++ b/internal/handler/multi_agent.go @@ -395,7 +395,7 @@ func (h *AgentHandler) MultiAgentLoopStream(c *gin.Context) { sendEvent("done", "", map[string]interface{}{"conversationId": conversationID}) } -// MultiAgentLoop Eino DeepAgent 非流式对话(与 POST /api/agent-loop 对齐,需 multi_agent.enabled)。 +// MultiAgentLoop Eino DeepAgent 非流式对话(需 multi_agent.enabled)。 func (h *AgentHandler) MultiAgentLoop(c *gin.Context) { if h.config == nil || !h.config.MultiAgent.Enabled { c.JSON(http.StatusNotFound, gin.H{"error": "多代理未启用,请在 config.yaml 中设置 multi_agent.enabled: true"}) diff --git a/internal/handler/openapi.go b/internal/handler/openapi.go index 6b7855ae..e3271a81 100644 --- a/internal/handler/openapi.go +++ b/internal/handler/openapi.go @@ -423,8 +423,8 @@ func (h *OpenAPIHandler) GetOpenAPISpec(c *gin.Context) { }, "agentMode": map[string]interface{}{ "type": "string", - "description": "代理模式:single(原生 ReAct)| eino_single(Eino ADK 单代理)| deep | plan_execute | supervisor;react 同 single;旧值 multi 按 deep", - "enum": []string{"single", "eino_single", "deep", "plan_execute", "supervisor", "multi", "react"}, + "description": "代理模式:eino_single(Eino ADK 单代理,默认)| deep | plan_execute | supervisor", + "enum": []string{"eino_single", "deep", "plan_execute", "supervisor"}, }, "scheduleMode": map[string]interface{}{ "type": "string", @@ -1121,7 +1121,7 @@ func (h *OpenAPIHandler) GetOpenAPISpec(c *gin.Context) { "post": map[string]interface{}{ "tags": []string{"对话管理"}, "summary": "创建对话", - "description": "创建一个新的安全测试对话。\n**重要说明**:\n- ✅ 创建的对话会**立即保存到数据库**\n- ✅ 前端页面会**自动刷新**显示新对话\n- ✅ 与前端创建的对话**完全一致**\n**创建对话的两种方式**:\n**方式1(推荐):** 直接使用 `/api/agent-loop` 发送消息,**不提供** `conversationId` 参数,系统会自动创建新对话并发送消息。这是最简单的方式,一步完成创建和发送。\n**方式2:** 先调用此端点创建空对话,然后使用返回的 `conversationId` 调用 `/api/agent-loop` 发送消息。适用于需要先创建对话,稍后再发送消息的场景。\n**示例**:\n```json\n{\n \"title\": \"Web应用安全测试\"\n}\n```", + "description": "创建一个新的安全测试对话。\n**重要说明**:\n- ✅ 创建的对话会**立即保存到数据库**\n- ✅ 前端页面会**自动刷新**显示新对话\n- ✅ 与前端创建的对话**完全一致**\n**创建对话的两种方式**:\n**方式1(推荐):** 直接使用 `/api/eino-agent` 发送消息,**不提供** `conversationId` 参数,系统会自动创建新对话并发送消息。这是最简单的方式,一步完成创建和发送。\n**方式2:** 先调用此端点创建空对话,然后使用返回的 `conversationId` 调用 `/api/eino-agent` 发送消息。适用于需要先创建对话,稍后再发送消息的场景。\n**示例**:\n```json\n{\n \"title\": \"Web应用安全测试\"\n}\n```", "operationId": "createConversation", "requestBody": map[string]interface{}{ "required": true, @@ -1412,148 +1412,11 @@ func (h *OpenAPIHandler) GetOpenAPISpec(c *gin.Context) { }, }, }, - "/api/agent-loop": map[string]interface{}{ - "post": map[string]interface{}{ - "tags": []string{"对话交互"}, - "summary": "发送消息并获取AI回复(非流式)", - "description": "向AI发送消息并获取回复(非流式响应)。**这是与AI交互的核心端点**,与前端聊天功能完全一致。\n**重要说明**:\n- ✅ 通过此API创建/发送的消息会**立即保存到数据库**\n- ✅ 前端页面会**自动刷新**显示新创建的对话和消息\n- ✅ 所有操作都有**完整的交互痕迹**,就像在前端操作一样\n- ✅ 支持角色配置,可以指定使用哪个测试角色\n**推荐使用流程**:\n1. **先创建对话**:调用 `POST /api/conversations` 创建新对话,获取 `conversationId`\n2. **再发送消息**:使用返回的 `conversationId` 调用此端点发送消息\n**使用示例**:\n**步骤1 - 创建对话:**\n```json\nPOST /api/conversations\n{\n \"title\": \"Web应用安全测试\"\n}\n```\n**步骤2 - 发送消息:**\n```json\nPOST /api/agent-loop\n{\n \"conversationId\": \"返回的对话ID\",\n \"message\": \"扫描 http://example.com 的SQL注入漏洞\",\n \"role\": \"渗透测试\"\n}\n```\n**其他方式**:\n如果不提供 `conversationId`,系统会自动创建新对话并发送消息。但**推荐先创建对话**,这样可以更好地管理对话列表。\n**响应**:返回AI的回复、对话ID和MCP执行ID列表。前端会自动刷新显示新消息。", - "operationId": "sendMessage", - "requestBody": map[string]interface{}{ - "required": true, - "content": map[string]interface{}{ - "application/json": map[string]interface{}{ - "schema": map[string]interface{}{ - "type": "object", - "properties": map[string]interface{}{ - "message": map[string]interface{}{ - "type": "string", - "description": "要发送的消息(必需)", - "example": "扫描 http://example.com 的SQL注入漏洞", - }, - "conversationId": map[string]interface{}{ - "type": "string", - "description": "对话ID(可选)。\n- **不提供**:自动创建新对话并发送消息(推荐)\n- **提供**:消息会添加到指定对话中(对话必须存在)", - "example": "550e8400-e29b-41d4-a716-446655440000", - }, - "role": map[string]interface{}{ - "type": "string", - "description": "角色名称(可选),如:默认、渗透测试、Web应用扫描等", - "example": "默认", - }, - }, - "required": []string{"message"}, - }, - }, - }, - }, - "responses": map[string]interface{}{ - "200": map[string]interface{}{ - "description": "消息发送成功,返回AI回复", - "content": map[string]interface{}{ - "application/json": map[string]interface{}{ - "schema": map[string]interface{}{ - "type": "object", - "properties": map[string]interface{}{ - "response": map[string]interface{}{ - "type": "string", - "description": "AI的回复内容", - }, - "conversationId": map[string]interface{}{ - "type": "string", - "description": "对话ID", - }, - "mcpExecutionIds": map[string]interface{}{ - "type": "array", - "description": "MCP执行ID列表", - "items": map[string]interface{}{ - "type": "string", - }, - }, - "time": map[string]interface{}{ - "type": "string", - "format": "date-time", - "description": "响应时间", - }, - }, - }, - }, - }, - }, - "400": map[string]interface{}{ - "description": "请求参数错误", - }, - "401": map[string]interface{}{ - "description": "未授权,需要有效的Token", - }, - "500": map[string]interface{}{ - "description": "服务器内部错误", - }, - }, - }, - }, - "/api/agent-loop/stream": map[string]interface{}{ - "post": map[string]interface{}{ - "tags": []string{"对话交互"}, - "summary": "发送消息并获取AI回复(流式)", - "description": "向AI发送消息并获取流式回复(Server-Sent Events)。**这是与AI交互的核心端点**,与前端聊天功能完全一致。\n**重要说明**:\n- ✅ 通过此API创建/发送的消息会**立即保存到数据库**\n- ✅ 前端页面会**自动刷新**显示新创建的对话和消息\n- ✅ 所有操作都有**完整的交互痕迹**,就像在前端操作一样\n- ✅ 支持角色配置,可以指定使用哪个测试角色\n- ✅ 返回流式响应,适合实时显示AI回复\n**推荐使用流程**:\n1. **先创建对话**:调用 `POST /api/conversations` 创建新对话,获取 `conversationId`\n2. **再发送消息**:使用返回的 `conversationId` 调用此端点发送消息\n**使用示例**:\n**步骤1 - 创建对话:**\n```json\nPOST /api/conversations\n{\n \"title\": \"Web应用安全测试\"\n}\n```\n**步骤2 - 发送消息(流式):**\n```json\nPOST /api/agent-loop/stream\n{\n \"conversationId\": \"返回的对话ID\",\n \"message\": \"扫描 http://example.com 的SQL注入漏洞\",\n \"role\": \"渗透测试\"\n}\n```\n**响应格式**:Server-Sent Events (SSE),事件类型包括:\n- `message`: 用户消息确认\n- `response`: AI回复片段\n- `progress`: 进度更新\n- `done`: 完成\n- `error`: 错误\n- `cancelled`: 已取消", - "operationId": "sendMessageStream", - "requestBody": map[string]interface{}{ - "required": true, - "content": map[string]interface{}{ - "application/json": map[string]interface{}{ - "schema": map[string]interface{}{ - "type": "object", - "properties": map[string]interface{}{ - "message": map[string]interface{}{ - "type": "string", - "description": "要发送的消息(必需)", - "example": "扫描 http://example.com 的SQL注入漏洞", - }, - "conversationId": map[string]interface{}{ - "type": "string", - "description": "对话ID(可选)。\n- **不提供**:自动创建新对话并发送消息(推荐)\n- **提供**:消息会添加到指定对话中(对话必须存在)", - "example": "550e8400-e29b-41d4-a716-446655440000", - }, - "role": map[string]interface{}{ - "type": "string", - "description": "角色名称(可选),如:默认、渗透测试、Web应用扫描等", - "example": "默认", - }, - }, - "required": []string{"message"}, - }, - }, - }, - }, - "responses": map[string]interface{}{ - "200": map[string]interface{}{ - "description": "流式响应(Server-Sent Events)", - "content": map[string]interface{}{ - "text/event-stream": map[string]interface{}{ - "schema": map[string]interface{}{ - "type": "string", - "description": "SSE流式数据", - }, - }, - }, - }, - "400": map[string]interface{}{ - "description": "请求参数错误", - }, - "401": map[string]interface{}{ - "description": "未授权,需要有效的Token", - }, - "500": map[string]interface{}{ - "description": "服务器内部错误", - }, - }, - }, - }, "/api/eino-agent": map[string]interface{}{ "post": map[string]interface{}{ "tags": []string{"对话交互"}, "summary": "发送消息并获取 AI 回复(Eino ADK 单代理,非流式)", - "description": "与 `POST /api/agent-loop` 请求体相同,由 **CloudWeGo Eino** `adk.NewChatModelAgent` + `adk.NewRunner.Run` 执行(单代理 MCP 工具链)。**不依赖** `multi_agent.enabled`;`multi_agent.eino_skills` / `eino_middleware` 等与多代理主代理一致时可生效。支持 `webshellConnectionId`。", + "description": "向 AI 发送消息并获取回复(非流式)。由 **CloudWeGo Eino** `adk.NewChatModelAgent` + `adk.NewRunner.Run` 执行单代理 MCP 工具链。**不依赖** `multi_agent.enabled`;`multi_agent.eino_skills` / `eino_middleware` 等与多代理主代理一致时可生效。支持 `webshellConnectionId`、角色与附件。", "operationId": "sendMessageEinoSingleAgent", "requestBody": map[string]interface{}{ "required": true, @@ -1573,7 +1436,7 @@ func (h *OpenAPIHandler) GetOpenAPISpec(c *gin.Context) { }, }, "responses": map[string]interface{}{ - "200": map[string]interface{}{"description": "成功,响应格式同 /api/agent-loop"}, + "200": map[string]interface{}{"description": "成功,响应格式同 /api/eino-agent"}, "400": map[string]interface{}{"description": "参数错误"}, "401": map[string]interface{}{"description": "未授权"}, "500": map[string]interface{}{"description": "执行失败"}, @@ -1584,7 +1447,7 @@ func (h *OpenAPIHandler) GetOpenAPISpec(c *gin.Context) { "post": map[string]interface{}{ "tags": []string{"对话交互"}, "summary": "发送消息并获取 AI 回复(Eino ADK 单代理,SSE)", - "description": "与 `POST /api/agent-loop/stream` 类似;由 Eino **单代理** ADK 执行。事件类型与多代理流式一致(含 `tool_call` / `response_delta` 等)。**不依赖** `multi_agent.enabled`。", + "description": "向 AI 发送消息并获取流式回复(SSE)。由 Eino **单代理** ADK 执行;事件类型与多代理流式一致(含 `tool_call` / `response_delta` / `thinking` 等)。**不依赖** `multi_agent.enabled`。", "operationId": "sendMessageEinoSingleAgentStream", "requestBody": map[string]interface{}{ "required": true, @@ -1623,7 +1486,7 @@ func (h *OpenAPIHandler) GetOpenAPISpec(c *gin.Context) { "post": map[string]interface{}{ "tags": []string{"对话交互"}, "summary": "发送消息并获取 AI 回复(Eino 多代理,非流式)", - "description": "与 `POST /api/agent-loop` 请求体相同,但由 **CloudWeGo Eino** 多代理执行。编排由请求体 `orchestration`(`deep` | `plan_execute` | `supervisor`)指定,缺省为 `deep`。**前提**:`multi_agent.enabled: true`;未启用时返回 404 JSON。支持 `webshellConnectionId`。", + "description": "与 `POST /api/eino-agent` 请求体相同,但由 **CloudWeGo Eino** 多代理执行。编排由请求体 `orchestration`(`deep` | `plan_execute` | `supervisor`)指定,缺省为 `deep`。**前提**:`multi_agent.enabled: true`;未启用时返回 404 JSON。支持 `webshellConnectionId`。", "operationId": "sendMessageMultiAgent", "requestBody": map[string]interface{}{ "required": true, @@ -1646,7 +1509,7 @@ func (h *OpenAPIHandler) GetOpenAPISpec(c *gin.Context) { }, "webshellConnectionId": map[string]interface{}{ "type": "string", - "description": "WebShell 连接 ID(可选,与 agent-loop 行为一致)", + "description": "WebShell 连接 ID(可选,与 Eino 单/多代理流式行为一致)", }, "orchestration": map[string]interface{}{ "type": "string", @@ -1661,7 +1524,7 @@ func (h *OpenAPIHandler) GetOpenAPISpec(c *gin.Context) { }, "responses": map[string]interface{}{ "200": map[string]interface{}{ - "description": "成功,响应格式同 /api/agent-loop", + "description": "成功,响应格式同 /api/eino-agent", }, "400": map[string]interface{}{"description": "参数错误"}, "401": map[string]interface{}{"description": "未授权"}, @@ -1674,7 +1537,7 @@ func (h *OpenAPIHandler) GetOpenAPISpec(c *gin.Context) { "post": map[string]interface{}{ "tags": []string{"对话交互"}, "summary": "发送消息并获取 AI 回复(Eino 多代理,SSE)", - "description": "与 `POST /api/agent-loop/stream` 类似;由 Eino 多代理执行。`orchestration` 指定 deep / plan_execute / supervisor,缺省 deep。**前提**:`multi_agent.enabled: true`;未启用时 SSE 内首条为 `type: error` 后接 `done`。支持 `webshellConnectionId`。", + "description": "与 `POST /api/eino-agent/stream` 类似;由 Eino 多代理执行。`orchestration` 指定 deep / plan_execute / supervisor,缺省 deep。**前提**:`multi_agent.enabled: true`;未启用时 SSE 内首条为 `type: error` 后接 `done`。支持 `webshellConnectionId`。", "operationId": "sendMessageMultiAgentStream", "requestBody": map[string]interface{}{ "required": true, @@ -4790,7 +4653,7 @@ func (h *OpenAPIHandler) GetOpenAPISpec(c *gin.Context) { "properties": map[string]interface{}{ "title": map[string]interface{}{"type": "string", "description": "队列标题"}, "role": map[string]interface{}{"type": "string", "description": "使用的角色名称"}, - "agentMode": map[string]interface{}{"type": "string", "description": "代理模式", "enum": []string{"single", "eino_single", "deep", "plan_execute", "supervisor"}}, + "agentMode": map[string]interface{}{"type": "string", "description": "代理模式", "enum": []string{"eino_single", "deep", "plan_execute", "supervisor"}}, }, }, },