From 5810fd7afa945e33b4c278b0dc76a3a8dcea198e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=85=AC=E6=98=8E?= <83812544+Ed1s0nZ@users.noreply.github.com> Date: Sun, 19 Apr 2026 04:43:45 +0800 Subject: [PATCH] Add files via upload --- internal/agent/agent.go | 40 +++ .../agent/default_single_system_prompt.go | 4 +- internal/handler/agent.go | 2 +- internal/handler/batch_task_mcp.go | 28 +- internal/handler/eino_single_agent.go | 290 ++++++++++++++++++ internal/handler/multi_agent_prepare.go | 4 + internal/handler/openapi.go | 70 +++++ 7 files changed, 423 insertions(+), 15 deletions(-) create mode 100644 internal/handler/eino_single_agent.go diff --git a/internal/agent/agent.go b/internal/agent/agent.go index 2e872a82..bfe1938f 100644 --- a/internal/agent/agent.go +++ b/internal/agent/agent.go @@ -324,6 +324,46 @@ func (a *Agent) AgentLoopWithConversationID(ctx context.Context, userInput strin return a.AgentLoopWithProgress(ctx, userInput, historyMessages, conversationID, nil, nil, nil) } +// EinoSingleAgentSystemInstruction 供 Eino adk.ChatModelAgent.Instruction 使用,与 AgentLoopWithProgress 首条 system 对齐(含 system_prompt_path 与 Skills 提示)。 +func (a *Agent) EinoSingleAgentSystemInstruction(roleSkills []string) string { + systemPrompt := DefaultSingleAgentSystemPrompt() + if a.agentConfig != nil { + if p := strings.TrimSpace(a.agentConfig.SystemPromptPath); p != "" { + path := p + a.mu.RLock() + base := a.promptBaseDir + a.mu.RUnlock() + if !filepath.IsAbs(path) && base != "" { + path = filepath.Join(base, path) + } + if b, err := os.ReadFile(path); err != nil { + a.logger.Warn("读取单代理 system_prompt_path 失败,使用内置提示", zap.String("path", path), zap.Error(err)) + } else if s := strings.TrimSpace(string(b)); s != "" { + systemPrompt = s + } + } + } + if len(roleSkills) > 0 { + var skillsHint strings.Builder + skillsHint.WriteString("\n\n本角色推荐使用的Skills:\n") + for i, skillName := range roleSkills { + if i > 0 { + skillsHint.WriteString("、") + } + skillsHint.WriteString("`") + skillsHint.WriteString(skillName) + skillsHint.WriteString("`") + } + skillsHint.WriteString("\n- 这些名称与 skills/ 下 SKILL.md 的 `name` 一致。") + skillsHint.WriteString("\n- 若当前会话已启用 Eino 内置 `skill` 工具,请按需加载;否则以 MCP 与文本工作流完成。") + skillsHint.WriteString("\n- 例如传入 skill 参数为 `") + skillsHint.WriteString(roleSkills[0]) + skillsHint.WriteString("`") + systemPrompt += skillsHint.String() + } + return systemPrompt +} + // AgentLoopWithProgress 执行Agent循环(带进度回调和对话ID) // roleSkills: 角色配置的skills列表(用于在系统提示词中提示AI,但不硬编码内容) func (a *Agent) AgentLoopWithProgress(ctx context.Context, userInput string, historyMessages []ChatMessage, conversationID string, callback ProgressCallback, roleTools []string, roleSkills []string) (*AgentLoopResult, error) { diff --git a/internal/agent/default_single_system_prompt.go b/internal/agent/default_single_system_prompt.go index 3929313d..bf9ff21a 100644 --- a/internal/agent/default_single_system_prompt.go +++ b/internal/agent/default_single_system_prompt.go @@ -100,6 +100,6 @@ func DefaultSingleAgentSystemPrompt() string { ## 技能库(Skills)与知识库 - 技能包位于服务器 skills/ 目录(各子目录 SKILL.md,遵循 agentskills.io);知识库用于向量检索片段,Skills 为可执行工作流指令。 -- 单代理本会话通过 MCP 使用知识库与漏洞记录等;Skills 的渐进式加载在「多代理 / Eino DeepAgent」中由内置 skill 工具完成。 -- 若当前无 skill 工具,需要完整 Skill 工作流时请使用多代理模式或切换为 Eino 编排会话。` +- 单代理本会话通过 MCP 使用知识库与漏洞记录等;Skills 的渐进式加载在「Eino ADK 单代理(/api/eino-agent)」或「多代理 / Eino DeepAgent」中由内置 skill 工具完成(需在配置中启用 multi_agent.eino_skills)。 +- 若当前无 skill 工具,需要完整 Skill 工作流时请使用 **Eino 单代理** 或 **多代理** 对话模式。` } diff --git a/internal/handler/agent.go b/internal/handler/agent.go index dd8330ac..5a2f52dc 100644 --- a/internal/handler/agent.go +++ b/internal/handler/agent.go @@ -177,7 +177,7 @@ type ChatRequest struct { Role string `json:"role,omitempty"` // 角色名称 Attachments []ChatAttachment `json:"attachments,omitempty"` WebShellConnectionID string `json:"webshellConnectionId,omitempty"` // WebShell 管理 - AI 助手:当前选中的连接 ID,仅使用 webshell_* 工具 - // Orchestration 仅对 /api/multi-agent、/api/multi-agent/stream:deep | plan_execute | supervisor;空则等同 deep。机器人/批量等无请求体时由服务端默认 deep。 + // Orchestration 仅对 /api/multi-agent、/api/multi-agent/stream:deep | plan_execute | supervisor;空则等同 deep。机器人/批量等无请求体时由服务端默认 deep。/api/eino-agent* 不使用此字段。 Orchestration string `json:"orchestration,omitempty"` } diff --git a/internal/handler/batch_task_mcp.go b/internal/handler/batch_task_mcp.go index 72ae8457..0f3f5089 100644 --- a/internal/handler/batch_task_mcp.go +++ b/internal/handler/batch_task_mcp.go @@ -128,47 +128,51 @@ func RegisterBatchTaskMCPTools(mcpServer *mcp.Server, h *AgentHandler, logger *z // --- create --- reg(mcp.Tool{ Name: builtin.ToolBatchTaskCreate, - Description: `创建新的批量任务队列。任务列表使用 tasks(字符串数组)或 tasks_text(多行,每行一条)。 -agent_mode: single(默认)或 multi(需系统启用多代理)。schedule_mode: manual(默认)或 cron;为 cron 时必须提供 cron_expr(如 "0 */6 * * *")。 -默认创建后不会立即执行。可通过 execute_now=true 在创建后立即启动;也可后续调用 batch_task_start 手工启动。Cron 队列若需按表达式自动触发下一轮,还需保持调度开关开启(可用 batch_task_schedule_enabled)。`, - ShortDescription: "创建批量任务队列(可选立即执行)", + Description: `【用途】应用内「任务管理 / 批量任务队列」:把多条彼此独立的用户指令登记成一条队列,便于在界面里查看进度、暂停/继续、定时重跑等。这是队列数据与调度入口,不是再开一个“子代理会话”替你探索当前问题。 + +【何时用】用户明确要批量排队执行、Cron 周期跑同一批指令、或需要与任务管理页面对齐时调用。需要即时追问、强依赖当前对话上下文的分析/编码,应在本对话内直接完成,不要为了“委派”而创建队列。 + +【参数】tasks(字符串数组)或 tasks_text(多行,每行一条)二选一;每项是一条将来由系统按队列顺序执行的指令文案。agent_mode:single(默认)或 multi(仅表示队列内每条子任务使用的执行模式,需系统已启用多代理);非“把主对话拆给子代理”。schedule_mode:manual(默认)或 cron;cron 须填 cron_expr(5 段,如 "0 */6 * * *")。 + +【执行】默认创建后为 pending,不自动跑。execute_now=true 可创建后立即跑;否则之后调用 batch_task_start。Cron 自动下一轮需 schedule_enabled 为 true(可用 batch_task_schedule_enabled)。`, + ShortDescription: "任务管理:创建批量任务队列(登记多条指令,可选立即或 Cron)", InputSchema: map[string]interface{}{ "type": "object", "properties": map[string]interface{}{ "title": map[string]interface{}{ "type": "string", - "description": "可选标题", + "description": "可选队列标题,便于在任务管理中识别", }, "role": map[string]interface{}{ "type": "string", - "description": "角色名称,空表示默认", + "description": "队列使用的角色名,空表示默认", }, "tasks": map[string]interface{}{ "type": "array", - "description": "任务指令列表,每项一条", + "description": "队列中的子任务指令,每项一条独立待执行文案(与 tasks_text 二选一)", "items": map[string]interface{}{"type": "string"}, }, "tasks_text": map[string]interface{}{ "type": "string", - "description": "多行文本,每行一条任务(与 tasks 二选一)", + "description": "多行文本,每行一条子任务指令(与 tasks 二选一)", }, "agent_mode": map[string]interface{}{ "type": "string", - "description": "single 或 multi", + "description": "队列内子任务的执行模式:single 或 multi(multi 需系统启用多代理;非子代理委派语义)", "enum": []string{"single", "multi"}, }, "schedule_mode": map[string]interface{}{ "type": "string", - "description": "manual 或 cron", + "description": "manual(仅手工/启动后跑)或 cron(按表达式触发)", "enum": []string{"manual", "cron"}, }, "cron_expr": map[string]interface{}{ "type": "string", - "description": "schedule_mode 为 cron 时必填。标准 5 段格式:分钟 小时 日 月 星期,例如 \"0 */6 * * *\"(每6小时)、\"30 2 * * 1-5\"(工作日凌晨2:30)", + "description": "schedule_mode 为 cron 时必填。标准 5 段:分钟 小时 日 月 星期,例如 \"0 */6 * * *\"、\"30 2 * * 1-5\"", }, "execute_now": map[string]interface{}{ "type": "boolean", - "description": "是否创建后立即执行,默认 false", + "description": "创建后是否立即开始执行队列,默认 false(pending,需 batch_task_start)", }, }, }, diff --git a/internal/handler/eino_single_agent.go b/internal/handler/eino_single_agent.go new file mode 100644 index 00000000..76d3c908 --- /dev/null +++ b/internal/handler/eino_single_agent.go @@ -0,0 +1,290 @@ +package handler + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "net/http" + "strings" + "sync" + "time" + + "cyberstrike-ai/internal/multiagent" + + "github.com/gin-gonic/gin" + "go.uber.org/zap" +) + +// EinoSingleAgentLoopStream Eino ADK 单代理(ChatModelAgent + Runner)流式对话;不依赖 multi_agent.enabled。 +func (h *AgentHandler) EinoSingleAgentLoopStream(c *gin.Context) { + c.Header("Content-Type", "text/event-stream") + c.Header("Cache-Control", "no-cache") + c.Header("Connection", "keep-alive") + + var req ChatRequest + if err := c.ShouldBindJSON(&req); err != nil { + ev := StreamEvent{Type: "error", Message: "请求参数错误: " + err.Error()} + b, _ := json.Marshal(ev) + fmt.Fprintf(c.Writer, "data: %s\n\n", b) + done := StreamEvent{Type: "done", Message: ""} + db, _ := json.Marshal(done) + fmt.Fprintf(c.Writer, "data: %s\n\n", db) + if flusher, ok := c.Writer.(http.Flusher); ok { + flusher.Flush() + } + return + } + + c.Header("X-Accel-Buffering", "no") + + var baseCtx context.Context + clientDisconnected := false + var sseWriteMu sync.Mutex + sendEvent := func(eventType, message string, data interface{}) { + if clientDisconnected { + return + } + if eventType == "error" && baseCtx != nil && errors.Is(context.Cause(baseCtx), ErrTaskCancelled) { + return + } + select { + case <-c.Request.Context().Done(): + clientDisconnected = true + return + default: + } + ev := StreamEvent{Type: eventType, Message: message, Data: data} + b, _ := json.Marshal(ev) + sseWriteMu.Lock() + _, err := fmt.Fprintf(c.Writer, "data: %s\n\n", b) + if err != nil { + sseWriteMu.Unlock() + clientDisconnected = true + return + } + if flusher, ok := c.Writer.(http.Flusher); ok { + flusher.Flush() + } else { + c.Writer.Flush() + } + sseWriteMu.Unlock() + } + + h.logger.Info("收到 Eino ADK 单代理流式请求", + zap.String("conversationId", req.ConversationID), + ) + + prep, err := h.prepareMultiAgentSession(&req) + if err != nil { + sendEvent("error", err.Error(), nil) + sendEvent("done", "", nil) + return + } + if prep.CreatedNew { + sendEvent("conversation", "会话已创建", map[string]interface{}{ + "conversationId": prep.ConversationID, + }) + } + + conversationID := prep.ConversationID + assistantMessageID := prep.AssistantMessageID + + if prep.UserMessageID != "" { + sendEvent("message_saved", "", map[string]interface{}{ + "conversationId": conversationID, + "userMessageId": prep.UserMessageID, + }) + } + + progressCallback := h.createProgressCallback(conversationID, assistantMessageID, sendEvent) + + var cancelWithCause context.CancelCauseFunc + baseCtx, cancelWithCause = context.WithCancelCause(context.Background()) + taskCtx, timeoutCancel := context.WithTimeout(baseCtx, 600*time.Minute) + defer timeoutCancel() + defer cancelWithCause(nil) + + if _, err := h.tasks.StartTask(conversationID, req.Message, cancelWithCause); err != nil { + var errorMsg string + if errors.Is(err, ErrTaskAlreadyRunning) { + errorMsg = "⚠️ 当前会话已有任务正在执行中,请等待当前任务完成或点击「停止任务」后再尝试。" + sendEvent("error", errorMsg, map[string]interface{}{ + "conversationId": conversationID, + "errorType": "task_already_running", + }) + } else { + errorMsg = "❌ 无法启动任务: " + err.Error() + sendEvent("error", errorMsg, nil) + } + if assistantMessageID != "" { + _, _ = h.db.Exec("UPDATE messages SET content = ? WHERE id = ?", errorMsg, assistantMessageID) + } + sendEvent("done", "", map[string]interface{}{"conversationId": conversationID}) + return + } + + taskStatus := "completed" + defer h.tasks.FinishTask(conversationID, taskStatus) + + sendEvent("progress", "正在启动 Eino ADK 单代理(ChatModelAgent)...", map[string]interface{}{ + "conversationId": conversationID, + }) + + stopKeepalive := make(chan struct{}) + go sseKeepalive(c, stopKeepalive, &sseWriteMu) + defer close(stopKeepalive) + + if h.config == nil { + sendEvent("error", "服务器配置未加载", nil) + sendEvent("done", "", map[string]interface{}{"conversationId": conversationID}) + return + } + + result, runErr := multiagent.RunEinoSingleChatModelAgent( + taskCtx, + h.config, + &h.config.MultiAgent, + h.agent, + h.logger, + conversationID, + prep.FinalMessage, + prep.History, + prep.RoleTools, + prep.RoleSkills, + progressCallback, + ) + + if runErr != nil { + cause := context.Cause(baseCtx) + if errors.Is(cause, ErrTaskCancelled) { + taskStatus = "cancelled" + h.tasks.UpdateTaskStatus(conversationID, taskStatus) + cancelMsg := "任务已被用户取消,后续操作已停止。" + if assistantMessageID != "" { + _, _ = h.db.Exec("UPDATE messages SET content = ? WHERE id = ?", cancelMsg, assistantMessageID) + _ = h.db.AddProcessDetail(assistantMessageID, conversationID, "cancelled", cancelMsg, nil) + } + sendEvent("cancelled", cancelMsg, map[string]interface{}{ + "conversationId": conversationID, + "messageId": assistantMessageID, + }) + sendEvent("done", "", map[string]interface{}{"conversationId": conversationID}) + return + } + + h.logger.Error("Eino ADK 单代理执行失败", zap.Error(runErr)) + taskStatus = "failed" + h.tasks.UpdateTaskStatus(conversationID, taskStatus) + errMsg := "执行失败: " + runErr.Error() + if assistantMessageID != "" { + _, _ = h.db.Exec("UPDATE messages SET content = ? WHERE id = ?", errMsg, assistantMessageID) + _ = h.db.AddProcessDetail(assistantMessageID, conversationID, "error", errMsg, nil) + } + sendEvent("error", errMsg, map[string]interface{}{ + "conversationId": conversationID, + "messageId": assistantMessageID, + }) + sendEvent("done", "", map[string]interface{}{"conversationId": conversationID}) + return + } + + if assistantMessageID != "" { + mcpIDsJSON := "" + if len(result.MCPExecutionIDs) > 0 { + jsonData, _ := json.Marshal(result.MCPExecutionIDs) + mcpIDsJSON = string(jsonData) + } + _, _ = h.db.Exec( + "UPDATE messages SET content = ?, mcp_execution_ids = ? WHERE id = ?", + result.Response, + mcpIDsJSON, + assistantMessageID, + ) + } + + if result.LastReActInput != "" || result.LastReActOutput != "" { + if err := h.db.SaveReActData(conversationID, result.LastReActInput, result.LastReActOutput); err != nil { + h.logger.Warn("保存 ReAct 数据失败", zap.Error(err)) + } + } + + sendEvent("response", result.Response, map[string]interface{}{ + "mcpExecutionIds": result.MCPExecutionIDs, + "conversationId": conversationID, + "messageId": assistantMessageID, + "agentMode": "eino_single", + }) + sendEvent("done", "", map[string]interface{}{"conversationId": conversationID}) +} + +// EinoSingleAgentLoop Eino ADK 单代理非流式对话。 +func (h *AgentHandler) EinoSingleAgentLoop(c *gin.Context) { + var req ChatRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + h.logger.Info("收到 Eino ADK 单代理非流式请求", zap.String("conversationId", req.ConversationID)) + + prep, err := h.prepareMultiAgentSession(&req) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + var progressBuf strings.Builder + progressCallback := func(eventType, message string, data interface{}) { + progressBuf.WriteString(eventType) + progressBuf.WriteByte('\n') + } + + if h.config == nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "服务器配置未加载"}) + return + } + + result, runErr := multiagent.RunEinoSingleChatModelAgent( + c.Request.Context(), + h.config, + &h.config.MultiAgent, + h.agent, + h.logger, + prep.ConversationID, + prep.FinalMessage, + prep.History, + prep.RoleTools, + prep.RoleSkills, + progressCallback, + ) + if runErr != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": runErr.Error()}) + return + } + + if prep.AssistantMessageID != "" { + mcpIDsJSON := "" + if len(result.MCPExecutionIDs) > 0 { + jsonData, _ := json.Marshal(result.MCPExecutionIDs) + mcpIDsJSON = string(jsonData) + } + _, _ = h.db.Exec( + "UPDATE messages SET content = ?, mcp_execution_ids = ? WHERE id = ?", + result.Response, + mcpIDsJSON, + prep.AssistantMessageID, + ) + } + if result.LastReActInput != "" || result.LastReActOutput != "" { + _ = h.db.SaveReActData(prep.ConversationID, result.LastReActInput, result.LastReActOutput) + } + + c.JSON(http.StatusOK, gin.H{ + "response": result.Response, + "conversationId": prep.ConversationID, + "mcpExecutionIds": result.MCPExecutionIDs, + "assistantMessageId": prep.AssistantMessageID, + "agentMode": "eino_single", + }) +} diff --git a/internal/handler/multi_agent_prepare.go b/internal/handler/multi_agent_prepare.go index 27190013..9fce494e 100644 --- a/internal/handler/multi_agent_prepare.go +++ b/internal/handler/multi_agent_prepare.go @@ -18,6 +18,7 @@ type multiAgentPrepared struct { History []agent.ChatMessage FinalMessage string RoleTools []string + RoleSkills []string AssistantMessageID string UserMessageID string } @@ -67,6 +68,7 @@ func (h *AgentHandler) prepareMultiAgentSession(req *ChatRequest) (*multiAgentPr finalMessage := req.Message var roleTools []string + var roleSkills []string if req.WebShellConnectionID != "" { conn, errConn := h.db.GetWebshellConnection(strings.TrimSpace(req.WebShellConnectionID)) if errConn != nil || conn == nil { @@ -94,6 +96,7 @@ func (h *AgentHandler) prepareMultiAgentSession(req *ChatRequest) (*multiAgentPr finalMessage = role.UserPrompt + "\n\n" + req.Message } roleTools = role.Tools + roleSkills = role.Skills } } @@ -132,6 +135,7 @@ func (h *AgentHandler) prepareMultiAgentSession(req *ChatRequest) (*multiAgentPr History: agentHistoryMessages, FinalMessage: finalMessage, RoleTools: roleTools, + RoleSkills: roleSkills, AssistantMessageID: assistantMessageID, UserMessageID: userMessageID, }, nil diff --git a/internal/handler/openapi.go b/internal/handler/openapi.go index 09bb5d0d..ea484bd4 100644 --- a/internal/handler/openapi.go +++ b/internal/handler/openapi.go @@ -1499,6 +1499,76 @@ func (h *OpenAPIHandler) GetOpenAPISpec(c *gin.Context) { }, }, }, + "/api/eino-agent": map[string]interface{}{ + "post": map[string]interface{}{ + "tags": []string{"对话交互"}, + "summary": "发送消息并获取 AI 回复(Eino ADK 单代理,非流式)", + "description": "与 `POST /api/agent-loop` 请求体相同,由 **CloudWeGo Eino** `adk.NewChatModelAgent` + `adk.NewRunner.Run` 执行(单代理 MCP 工具链)。**不依赖** `multi_agent.enabled`;`multi_agent.eino_skills` / `eino_middleware` 等与多代理主代理一致时可生效。支持 `webshellConnectionId`。", + "operationId": "sendMessageEinoSingleAgent", + "requestBody": map[string]interface{}{ + "required": true, + "content": map[string]interface{}{ + "application/json": map[string]interface{}{ + "schema": map[string]interface{}{ + "type": "object", + "properties": map[string]interface{}{ + "message": map[string]interface{}{"type": "string"}, + "conversationId": map[string]interface{}{"type": "string"}, + "role": map[string]interface{}{"type": "string"}, + "webshellConnectionId": map[string]interface{}{"type": "string"}, + }, + "required": []string{"message"}, + }, + }, + }, + }, + "responses": map[string]interface{}{ + "200": map[string]interface{}{"description": "成功,响应格式同 /api/agent-loop"}, + "400": map[string]interface{}{"description": "参数错误"}, + "401": map[string]interface{}{"description": "未授权"}, + "500": map[string]interface{}{"description": "执行失败"}, + }, + }, + }, + "/api/eino-agent/stream": map[string]interface{}{ + "post": map[string]interface{}{ + "tags": []string{"对话交互"}, + "summary": "发送消息并获取 AI 回复(Eino ADK 单代理,SSE)", + "description": "与 `POST /api/agent-loop/stream` 类似;由 Eino **单代理** ADK 执行。事件类型与多代理流式一致(含 `tool_call` / `response_delta` 等)。**不依赖** `multi_agent.enabled`。", + "operationId": "sendMessageEinoSingleAgentStream", + "requestBody": map[string]interface{}{ + "required": true, + "content": map[string]interface{}{ + "application/json": map[string]interface{}{ + "schema": map[string]interface{}{ + "type": "object", + "properties": map[string]interface{}{ + "message": map[string]interface{}{"type": "string"}, + "conversationId": map[string]interface{}{"type": "string"}, + "role": map[string]interface{}{"type": "string"}, + "webshellConnectionId": map[string]interface{}{"type": "string"}, + }, + "required": []string{"message"}, + }, + }, + }, + }, + "responses": map[string]interface{}{ + "200": map[string]interface{}{ + "description": "text/event-stream(SSE)", + "content": map[string]interface{}{ + "text/event-stream": map[string]interface{}{ + "schema": map[string]interface{}{ + "type": "string", + "description": "SSE 流", + }, + }, + }, + }, + "401": map[string]interface{}{"description": "未授权"}, + }, + }, + }, "/api/multi-agent": map[string]interface{}{ "post": map[string]interface{}{ "tags": []string{"对话交互"},