From e7609c5fc43e0ebc847db25d3e9534c5c0a30b30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=85=AC=E6=98=8E?= <83812544+Ed1s0nZ@users.noreply.github.com> Date: Fri, 3 Apr 2026 22:09:23 +0800 Subject: [PATCH] Add files via upload --- internal/attackchain/builder.go | 128 +++++++++++++++++++++++++++++- internal/database/conversation.go | 13 +++ 2 files changed, 140 insertions(+), 1 deletion(-) diff --git a/internal/attackchain/builder.go b/internal/attackchain/builder.go index 5a5d6649..de1a7d52 100644 --- a/internal/attackchain/builder.go +++ b/internal/attackchain/builder.go @@ -97,7 +97,8 @@ func (b *Builder) BuildChainFromConversation(ctx context.Context, conversationID return &Chain{Nodes: []Node{}, Edges: []Edge{}}, nil } - // 检查是否有实际的工具执行(通过检查assistant消息的mcp_execution_ids) + // 检查是否有实际的工具执行:assistant 的 mcp_execution_ids,或过程详情中的 tool_call/tool_result + //(多代理下若 MCP 未返回 execution_id,IDs 可能为空,但工具已通过 Eino 执行并写入 process_details) hasToolExecutions := false for i := len(messages) - 1; i >= 0; i-- { if strings.EqualFold(messages[i].Role, "assistant") { @@ -107,6 +108,13 @@ func (b *Builder) BuildChainFromConversation(ctx context.Context, conversationID } } } + if !hasToolExecutions { + if pdOK, err := b.db.ConversationHasToolProcessDetails(conversationID); err != nil { + b.logger.Warn("查询过程详情判定工具执行失败", zap.Error(err)) + } else if pdOK { + hasToolExecutions = true + } + } // 检查任务是否被取消(通过检查最后一条assistant消息内容或process_details) taskCancelled := false @@ -204,6 +212,37 @@ func (b *Builder) BuildChainFromConversation(ctx context.Context, conversationID } } + // 多代理:保存的 last_react_input 可能仅为首轮用户消息,不含工具轨迹;补充最后一轮助手的过程详情(与单代理「最后一轮 ReAct」对齐) + hasMCPOnAssistant := false + var lastAssistantID string + for i := len(messages) - 1; i >= 0; i-- { + if strings.EqualFold(messages[i].Role, "assistant") { + lastAssistantID = messages[i].ID + if len(messages[i].MCPExecutionIDs) > 0 { + hasMCPOnAssistant = true + } + break + } + } + if lastAssistantID != "" { + pdHasTools, _ := b.db.ConversationHasToolProcessDetails(conversationID) + if pdHasTools && !(hasMCPOnAssistant && reactInputContainsToolTrace(reactInputJSON)) { + detailsMap, err := b.db.GetProcessDetailsByConversation(conversationID) + if err != nil { + b.logger.Warn("加载过程详情用于攻击链失败", zap.Error(err)) + } else if dets := detailsMap[lastAssistantID]; len(dets) > 0 { + extra := b.formatProcessDetailsForAttackChain(dets) + if strings.TrimSpace(extra) != "" { + reactInputFinal = reactInputFinal + "\n\n## 执行过程与工具记录(含多代理编排与子任务)\n\n" + extra + b.logger.Info("攻击链输入已补充过程详情", + zap.String("conversationId", conversationID), + zap.String("messageId", lastAssistantID), + zap.Int("detailEvents", len(dets))) + } + } + } + } + // 3. 构建简化的prompt,一次性传递给大模型 prompt := b.buildSimplePrompt(reactInputFinal, modelOutput) // fmt.Println(prompt) @@ -240,6 +279,93 @@ func (b *Builder) BuildChainFromConversation(ctx context.Context, conversationID return chainData, nil } +// reactInputContainsToolTrace 判断保存的 ReAct JSON 是否包含可解析的工具调用轨迹(单代理完整保存时为 true)。 +func reactInputContainsToolTrace(reactInputJSON string) bool { + s := strings.TrimSpace(reactInputJSON) + if s == "" { + return false + } + return strings.Contains(s, "tool_calls") || + strings.Contains(s, "tool_call_id") || + strings.Contains(s, `"role":"tool"`) || + strings.Contains(s, `"role": "tool"`) +} + +// formatProcessDetailsForAttackChain 将最后一轮助手的过程详情格式化为攻击链分析的输入(覆盖多代理下 last_react_input 不完整的情况)。 +func (b *Builder) formatProcessDetailsForAttackChain(details []database.ProcessDetail) string { + if len(details) == 0 { + return "" + } + var sb strings.Builder + for _, d := range details { + // 目标:以主 agent(编排器)视角输出整轮迭代 + // - 保留:编排器工具调用/结果、对子代理的 task 调度、子代理最终回复(不含推理) + // - 丢弃:thinking/planning/progress 等噪声、子代理的工具细节与推理过程 + if d.EventType == "progress" || d.EventType == "thinking" || d.EventType == "planning" { + continue + } + + // 解析 data(JSON string),用于识别 einoRole / toolName 等 + var dataMap map[string]interface{} + if strings.TrimSpace(d.Data) != "" { + _ = json.Unmarshal([]byte(d.Data), &dataMap) + } + einoRole := "" + if v, ok := dataMap["einoRole"]; ok { + einoRole = strings.ToLower(strings.TrimSpace(fmt.Sprint(v))) + } + toolName := "" + if v, ok := dataMap["toolName"]; ok { + toolName = strings.TrimSpace(fmt.Sprint(v)) + } + + // 1) 编排器的工具调用/结果:保留(这是“主 agent 调了什么工具”) + if (d.EventType == "tool_call" || d.EventType == "tool_result" || d.EventType == "tool_calls_detected" || d.EventType == "iteration" || d.EventType == "eino_recovery") && einoRole == "orchestrator" { + sb.WriteString("[") + sb.WriteString(d.EventType) + sb.WriteString("] ") + sb.WriteString(strings.TrimSpace(d.Message)) + sb.WriteString("\n") + if strings.TrimSpace(d.Data) != "" { + sb.WriteString(d.Data) + sb.WriteString("\n") + } + sb.WriteString("\n") + continue + } + + // 2) 子代理调度:tool_call(toolName=="task") 代表编排器把子任务派发出去;保留(只需任务,不要子代理推理) + if d.EventType == "tool_call" && strings.EqualFold(toolName, "task") { + sb.WriteString("[dispatch_subagent_task] ") + sb.WriteString(strings.TrimSpace(d.Message)) + sb.WriteString("\n") + if strings.TrimSpace(d.Data) != "" { + sb.WriteString(d.Data) + sb.WriteString("\n") + } + sb.WriteString("\n") + continue + } + + // 3) 子代理最终回复:保留(只保留最终输出,不保留分析过程) + if d.EventType == "eino_agent_reply" && einoRole == "sub" { + sb.WriteString("[subagent_final_reply] ") + sb.WriteString(strings.TrimSpace(d.Message)) + sb.WriteString("\n") + // data 里含 einoAgent 等元信息,保留有助于追踪“哪个子代理说的” + if strings.TrimSpace(d.Data) != "" { + sb.WriteString(d.Data) + sb.WriteString("\n") + } + sb.WriteString("\n") + continue + } + + // 其他事件默认丢弃,避免把子代理工具细节/推理塞进 prompt,偏离“主 agent 一轮迭代”的视角。 + } + return strings.TrimSpace(sb.String()) +} + // buildReActInput 构建最后一轮ReAct的输入(历史消息+当前用户输入) func (b *Builder) buildReActInput(messages []database.Message) string { var builder strings.Builder diff --git a/internal/database/conversation.go b/internal/database/conversation.go index cf96f445..bd7bd97f 100644 --- a/internal/database/conversation.go +++ b/internal/database/conversation.go @@ -457,6 +457,19 @@ func (db *DB) GetReActData(conversationID string) (reactInput, reactOutput strin return reactInput, reactOutput, nil } +// ConversationHasToolProcessDetails 对话是否存在已落库的工具调用/结果(用于多代理等场景下 MCP execution id 未汇总时的攻击链判定)。 +func (db *DB) ConversationHasToolProcessDetails(conversationID string) (bool, error) { + var n int + err := db.QueryRow( + `SELECT COUNT(*) FROM process_details WHERE conversation_id = ? AND event_type IN ('tool_call', 'tool_result')`, + conversationID, + ).Scan(&n) + if err != nil { + return false, fmt.Errorf("查询过程详情失败: %w", err) + } + return n > 0, nil +} + // AddMessage 添加消息 func (db *DB) AddMessage(conversationID, role, content string, mcpExecutionIDs []string) (*Message, error) { id := uuid.New().String()