Compare commits

...

33 Commits

Author SHA1 Message Date
公明 df2506b651 Add files via upload 2026-05-10 02:04:23 +08:00
公明 efe9172f85 Add files via upload 2026-05-10 02:03:07 +08:00
公明 b788bc6dab Add files via upload 2026-05-10 02:01:28 +08:00
公明 9134f2bbcb Update config.yaml 2026-05-10 01:53:51 +08:00
公明 d76cf2a162 Add files via upload 2026-05-10 00:58:35 +08:00
公明 2f96feb98f Add files via upload 2026-05-10 00:57:26 +08:00
公明 a374c3950c Add files via upload 2026-05-10 00:55:20 +08:00
公明 a93e3455fa Add files via upload 2026-05-10 00:53:33 +08:00
公明 6cd864c5ca Update config.yaml 2026-05-08 23:00:15 +08:00
公明 e34faff001 Add files via upload 2026-05-08 22:45:46 +08:00
公明 fa09796ddd Add files via upload 2026-05-08 22:44:32 +08:00
公明 1ab7e98f56 Add files via upload 2026-05-08 22:42:31 +08:00
公明 0743086873 Add files via upload 2026-05-08 22:32:21 +08:00
公明 a1ceb9c108 Add files via upload 2026-05-08 17:22:40 +08:00
公明 9ddea33dab Add files via upload 2026-05-08 17:15:27 +08:00
公明 e948940b18 Delete images/dashboard.png 2026-05-08 17:14:56 +08:00
公明 94bbbf87bf Add files via upload 2026-05-08 16:50:56 +08:00
公明 4f09ffbaaa Add files via upload 2026-05-08 13:57:18 +08:00
公明 6d77081b2b Add files via upload 2026-05-08 13:56:04 +08:00
公明 99ccb07ec9 Add files via upload 2026-05-08 13:54:25 +08:00
公明 1130fdbfa4 Add files via upload 2026-05-08 13:08:45 +08:00
公明 84f4da4d1d Add files via upload 2026-05-08 13:07:33 +08:00
公明 34dae98329 Add files via upload 2026-05-08 13:05:45 +08:00
公明 3ee7d64b09 Add files via upload 2026-05-08 13:04:18 +08:00
公明 22a3aa1531 Add files via upload 2026-05-07 18:03:19 +08:00
公明 8ad61906fa Add files via upload 2026-05-07 18:02:15 +08:00
公明 487522707f Add files via upload 2026-05-07 18:00:22 +08:00
公明 fe625010eb Update config.yaml 2026-05-07 17:04:39 +08:00
公明 40cd0293b5 Add files via upload 2026-05-07 17:04:14 +08:00
公明 b62dc1f326 Add files via upload 2026-05-07 17:02:26 +08:00
公明 6d180c814d Add files via upload 2026-05-07 17:01:15 +08:00
公明 e68d3a3d23 Add files via upload 2026-05-07 16:58:54 +08:00
公明 699b9181e6 Add files via upload 2026-05-07 16:57:17 +08:00
33 changed files with 1980 additions and 292 deletions
+1 -1
View File
@@ -10,7 +10,7 @@
# ============================================
# 前端显示的版本号(可选,不填则显示默认版本)
version: "v1.6.3"
version: "v1.6.6"
# 服务器配置
server:
host: 0.0.0.0 # 监听地址,0.0.0.0 表示监听所有网络接口
Binary file not shown.

Before

Width:  |  Height:  |  Size: 832 KiB

After

Width:  |  Height:  |  Size: 726 KiB

+29 -1
View File
@@ -1514,7 +1514,9 @@ func (a *Agent) executeToolViaMCP(ctx context.Context, toolName string, args map
// 如果调用失败(如工具不存在、超时),返回友好的错误信息而不是抛出异常
if err != nil {
detail := err.Error()
if errors.Is(err, context.DeadlineExceeded) {
if errors.Is(err, context.Canceled) {
detail = "工具调用已被手动终止(MCP 监控页)。智能体将携带此结果继续后续步骤,整条任务不会因此被停止。"
} else if errors.Is(err, context.DeadlineExceeded) {
min := 10
if a.agentConfig != nil && a.agentConfig.ToolTimeoutMinutes > 0 {
min = a.agentConfig.ToolTimeoutMinutes
@@ -1903,9 +1905,35 @@ func (a *Agent) ExecuteMCPToolForConversation(ctx context.Context, conversationI
a.currentConversationID = prev
a.mu.Unlock()
}()
ctx = withAgentConversationID(ctx, conversationID)
return a.executeToolViaMCP(ctx, toolName, args)
}
// RecordLocalToolExecution 将非 CallTool 路径完成的工具调用写入 MCP 监控库(与 CallTool 落库一致),返回 executionId。
// 用于 Eino filesystem execute 等场景,使助手气泡「渗透测试详情」与常规 MCP 一致可点进监控。
func (a *Agent) RecordLocalToolExecution(toolName string, args map[string]interface{}, resultText string, invokeErr error) string {
if a == nil || a.mcpServer == nil {
return ""
}
return a.mcpServer.RecordCompletedToolInvocation(toolName, args, resultText, invokeErr)
}
// CancelMCPToolExecutionWithNote 取消一次进行中的 MCP 工具(先内部后外部),与监控页「终止工具」一致;note 非空时合并进返回给模型的文本。
func (a *Agent) CancelMCPToolExecutionWithNote(executionID, note string) bool {
executionID = strings.TrimSpace(executionID)
note = strings.TrimSpace(note)
if executionID == "" {
return false
}
if a.mcpServer != nil && a.mcpServer.CancelToolExecutionWithNote(executionID, note) {
return true
}
if a.externalMCPMgr != nil && a.externalMCPMgr.CancelToolExecutionWithNote(executionID, note) {
return true
}
return false
}
// extractQuotedToolName 尝试从错误信息中提取被引用的工具名称
func extractQuotedToolName(errMsg string) string {
start := strings.Index(errMsg, "\"")
+1
View File
@@ -757,6 +757,7 @@ func setupRoutes(
// 监控
protected.GET("/monitor", monitorHandler.Monitor)
protected.GET("/monitor/execution/:id", monitorHandler.GetExecution)
protected.POST("/monitor/execution/:id/cancel", monitorHandler.CancelExecution)
protected.POST("/monitor/executions/names", monitorHandler.BatchGetToolNames)
protected.DELETE("/monitor/execution/:id", monitorHandler.DeleteExecution)
protected.DELETE("/monitor/executions", monitorHandler.DeleteExecutions)
+40 -13
View File
@@ -23,12 +23,16 @@ type ExecutionRecorder func(executionID string)
const ToolErrorPrefix = "__CYBERSTRIKE_AI_TOOL_ERROR__\n"
// ToolsFromDefinitions 将单 Agent 使用的 OpenAI 风格工具定义转为 Eino InvokableTool,执行时走 Agent 的 MCP 路径。
// invokeNotify 可选:与 runEinoADKAgentLoop 共享,在 InvokableRun 返回时触发 UI 与 pending 清理(与 ADK Tool 事件去重)。
// einoAgentName 为该套工具所属 ChatModelAgent 的 Name(主代理或子代理 id),用于 SSE 上的 einoAgent 字段。
func ToolsFromDefinitions(
ag *agent.Agent,
holder *ConversationHolder,
defs []agent.Tool,
rec ExecutionRecorder,
toolOutputChunk func(toolName, toolCallID, chunk string),
invokeNotify *ToolInvokeNotifyHolder,
einoAgentName string,
) ([]tool.BaseTool, error) {
out := make([]tool.BaseTool, 0, len(defs))
for _, d := range defs {
@@ -40,12 +44,14 @@ func ToolsFromDefinitions(
return nil, fmt.Errorf("tool %q: %w", d.Function.Name, err)
}
out = append(out, &mcpBridgeTool{
info: info,
name: d.Function.Name,
agent: ag,
holder: holder,
record: rec,
chunk: toolOutputChunk,
info: info,
name: d.Function.Name,
agent: ag,
holder: holder,
record: rec,
chunk: toolOutputChunk,
invokeNotify: invokeNotify,
einoAgentName: strings.TrimSpace(einoAgentName),
})
}
return out, nil
@@ -77,12 +83,14 @@ func toolInfoFromDefinition(d agent.Tool) (*schema.ToolInfo, error) {
}
type mcpBridgeTool struct {
info *schema.ToolInfo
name string
agent *agent.Agent
holder *ConversationHolder
record ExecutionRecorder
chunk func(toolName, toolCallID, chunk string)
info *schema.ToolInfo
name string
agent *agent.Agent
holder *ConversationHolder
record ExecutionRecorder
chunk func(toolName, toolCallID, chunk string)
invokeNotify *ToolInvokeNotifyHolder
einoAgentName string
}
func (m *mcpBridgeTool) Info(ctx context.Context) (*schema.ToolInfo, error) {
@@ -90,8 +98,27 @@ func (m *mcpBridgeTool) Info(ctx context.Context) (*schema.ToolInfo, error) {
return m.info, nil
}
func (m *mcpBridgeTool) InvokableRun(ctx context.Context, argumentsInJSON string, opts ...tool.Option) (string, error) {
func (m *mcpBridgeTool) InvokableRun(ctx context.Context, argumentsInJSON string, opts ...tool.Option) (out string, err error) {
_ = opts
toolCallID := compose.GetToolCallID(ctx)
defer func() {
if m.invokeNotify == nil {
return
}
tid := strings.TrimSpace(toolCallID)
if tid == "" {
return
}
success := err == nil && !strings.HasPrefix(out, ToolErrorPrefix)
body := out
if err != nil {
success = false
} else if strings.HasPrefix(out, ToolErrorPrefix) {
success = false
body = strings.TrimPrefix(out, ToolErrorPrefix)
}
m.invokeNotify.Fire(tid, m.name, m.einoAgentName, success, body, err)
}()
return runMCPToolInvocation(ctx, m.agent, m.holder, m.name, argumentsInJSON, m.record, m.chunk)
}
+39
View File
@@ -0,0 +1,39 @@
package einomcp
import "sync"
// ToolInvokeNotifyHolder 由 Eino run loop 在迭代开始前 Set 回调;MCP 桥在每次 InvokableRun 结束时 Fire
// 用于在 ADK 未透出 schema.Tool 事件时仍推送 tool_result、清 pending,避免 UI 卡在「执行中」或迭代末 force-close。
type ToolInvokeNotifyHolder struct {
mu sync.RWMutex
fn func(toolCallID, toolName, einoAgent string, success bool, content string, invokeErr error)
}
// NewToolInvokeNotifyHolder 创建可在 ToolsFromDefinitions 与 run loop 之间共享的 holder。
func NewToolInvokeNotifyHolder() *ToolInvokeNotifyHolder {
return &ToolInvokeNotifyHolder{}
}
// Set 由 runEinoADKAgentLoop 在开始消费 iter 之前调用;可多次覆盖(通常仅一次)。
func (h *ToolInvokeNotifyHolder) Set(fn func(toolCallID, toolName, einoAgent string, success bool, content string, invokeErr error)) {
if h == nil {
return
}
h.mu.Lock()
defer h.mu.Unlock()
h.fn = fn
}
// Fire 由 mcpBridgeTool 在工具调用返回时调用;若尚未 Set 或 toolCallID 为空则忽略。
func (h *ToolInvokeNotifyHolder) Fire(toolCallID, toolName, einoAgent string, success bool, content string, invokeErr error) {
if h == nil {
return
}
h.mu.RLock()
fn := h.fn
h.mu.RUnlock()
if fn == nil {
return
}
fn(toolCallID, toolName, einoAgent, success, content, invokeErr)
}
+107 -25
View File
@@ -19,6 +19,7 @@ import (
"cyberstrike-ai/internal/agent"
"cyberstrike-ai/internal/config"
"cyberstrike-ai/internal/database"
"cyberstrike-ai/internal/mcp"
"cyberstrike-ai/internal/mcp/builtin"
"cyberstrike-ai/internal/multiagent"
@@ -458,6 +459,57 @@ func appendAttachmentsToMessage(msg string, attachments []ChatAttachment, savedP
return b.String()
}
// appendAssistantMessageNotice 在助手消息末尾追加提示,避免覆盖已生成内容。
// 若消息为空则直接写入提示;若已包含相同提示则保持不变。
func (h *AgentHandler) appendAssistantMessageNotice(messageID, notice string) error {
trimmedNotice := strings.TrimSpace(notice)
if strings.TrimSpace(messageID) == "" || trimmedNotice == "" {
return nil
}
_, err := h.db.Exec(
`UPDATE messages
SET content = CASE
WHEN content IS NULL OR TRIM(content) = '' THEN ?
WHEN INSTR(content, ?) > 0 THEN content
ELSE content || '\n\n' || ?
END,
updated_at = ?
WHERE id = ?`,
trimmedNotice,
trimmedNotice,
trimmedNotice,
time.Now(),
messageID,
)
return err
}
// mergeAssistantMessagePartialOnCancel 将取消前已生成的部分回复尽量合并进消息:
// - content 为空或仅占位(处理中...)时,直接替换为 partial;
// - 已有正文时,仅在尚未包含 partial 时追加,避免丢失与重复。
func (h *AgentHandler) mergeAssistantMessagePartialOnCancel(messageID, partial string) error {
trimmedPartial := strings.TrimSpace(partial)
if strings.TrimSpace(messageID) == "" || trimmedPartial == "" {
return nil
}
_, err := h.db.Exec(
`UPDATE messages
SET content = CASE
WHEN content IS NULL OR TRIM(content) = '' OR TRIM(content) = '处理中...' THEN ?
WHEN INSTR(content, ?) > 0 THEN content
ELSE content || '\n\n' || ?
END,
updated_at = ?
WHERE id = ?`,
trimmedPartial,
trimmedPartial,
trimmedPartial,
time.Now(),
messageID,
)
return err
}
// ChatResponse 聊天响应
type ChatResponse struct {
Response string `json:"response"`
@@ -725,7 +777,9 @@ func (h *AgentHandler) ProcessMessageForRobot(ctx context.Context, conversationI
"deep",
)
if errMA != nil {
h.persistEinoAgentTraceForResume(conversationID, resultMA)
if shouldPersistEinoAgentTraceAfterRunError(ctx) {
h.persistEinoAgentTraceForResume(conversationID, resultMA)
}
errMsg := "执行失败: " + errMA.Error()
if assistantMessageID != "" {
_, _ = h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", errMsg, time.Now(), assistantMessageID)
@@ -1493,6 +1547,8 @@ func (h *AgentHandler) AgentLoopStream(c *gin.Context) {
taskCtx, timeoutCancel := context.WithTimeout(baseCtx, 600*time.Minute)
defer timeoutCancel()
defer cancelWithCause(nil)
taskCtx = mcp.WithMCPConversationID(taskCtx, conversationID)
taskCtx = mcp.WithToolRunRegistry(taskCtx, h.tasks)
progressCallback := h.createProgressCallback(taskCtx, cancelWithCause, conversationID, assistantMessageID, sendEvent)
taskCtx = h.injectReactHITLInterceptor(taskCtx, cancelWithCause, conversationID, assistantMessageID, sendEvent)
@@ -1568,11 +1624,12 @@ func (h *AgentHandler) AgentLoopStream(c *gin.Context) {
h.tasks.UpdateTaskStatus(conversationID, taskStatus)
if assistantMessageID != "" {
if _, updateErr := h.db.Exec(
"UPDATE messages SET content = ?, updated_at = ? WHERE id = ?",
cancelMsg,
time.Now(), assistantMessageID,
); updateErr != nil {
if result != nil {
if updateErr := h.mergeAssistantMessagePartialOnCancel(assistantMessageID, result.Response); updateErr != nil {
h.logger.Warn("合并取消前的部分回复失败", zap.Error(updateErr))
}
}
if updateErr := h.appendAssistantMessageNotice(assistantMessageID, cancelMsg); updateErr != nil {
h.logger.Warn("更新取消后的助手消息失败", zap.Error(updateErr))
}
h.db.AddProcessDetail(assistantMessageID, conversationID, "cancelled", cancelMsg, nil)
@@ -1717,6 +1774,8 @@ func (h *AgentHandler) AgentLoopStream(c *gin.Context) {
func (h *AgentHandler) CancelAgentLoop(c *gin.Context) {
var req struct {
ConversationID string `json:"conversationId" binding:"required"`
Reason string `json:"reason,omitempty"`
ContinueAfter bool `json:"continueAfter,omitempty"`
}
if err := c.ShouldBindJSON(&req); err != nil {
@@ -1724,7 +1783,40 @@ func (h *AgentHandler) CancelAgentLoop(c *gin.Context) {
return
}
ok, err := h.tasks.CancelTask(req.ConversationID, ErrTaskCancelled)
if req.ContinueAfter {
if h.tasks.GetTask(req.ConversationID) == nil {
c.JSON(http.StatusNotFound, gin.H{"error": "未找到正在执行的任务"})
return
}
execID := h.tasks.ActiveMCPExecutionID(req.ConversationID)
if execID == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "当前没有正在执行的 MCP 工具(例如模型尚在推理、尚未发起工具调用)。请等待工具开始执行后再试,或使用「彻底停止」结束整轮任务。"})
return
}
note := strings.TrimSpace(req.Reason)
if !h.agent.CancelMCPToolExecutionWithNote(execID, note) {
c.JSON(http.StatusNotFound, gin.H{"error": "未找到进行中的工具执行或该调用已结束"})
return
}
h.logger.Info("对话页仅终止当前 MCP 工具",
zap.String("conversationId", req.ConversationID),
zap.String("executionId", execID),
zap.Bool("hasNote", note != ""),
)
c.JSON(http.StatusOK, gin.H{
"status": "tool_abort_requested",
"conversationId": req.ConversationID,
"executionId": execID,
"message": "已请求终止当前工具调用;工具返回后本轮推理将继续(与 MCP 监控页终止一致)。",
"continueAfter": true,
"interruptWithNote": note != "",
})
return
}
var cause error = ErrTaskCancelled
msg := "已提交取消请求,任务将在当前步骤完成后停止。"
ok, err := h.tasks.CancelTask(req.ConversationID, cause)
if err != nil {
h.logger.Error("取消任务失败", zap.Error(err))
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
@@ -1737,9 +1829,11 @@ func (h *AgentHandler) CancelAgentLoop(c *gin.Context) {
}
c.JSON(http.StatusOK, gin.H{
"status": "cancelling",
"status": "cancelling",
"conversationId": req.ConversationID,
"message": "已提交取消请求,任务将在当前步骤完成后停止。",
"message": msg,
"continueAfter": false,
"interruptWithNote": false,
})
}
@@ -2517,6 +2611,8 @@ func (h *AgentHandler) executeBatchQueue(queueID string) {
// 创建进度回调函数:写 DB + 镜像到 task-events,支持刷新后继续流式展示。
progressCallback = h.createProgressCallback(taskCtx, cancelWithCause, conversationID, assistantMessageID, sendEvent)
taskCtx = mcp.WithMCPConversationID(taskCtx, conversationID)
taskCtx = mcp.WithToolRunRegistry(taskCtx, h.tasks)
// 使用队列配置的角色工具列表(如果为空,表示使用所有工具)
useBatchMulti := false
@@ -2556,7 +2652,7 @@ func (h *AgentHandler) executeBatchQueue(queueID string) {
}
if runErr != nil {
if useRunResult {
if useRunResult && shouldPersistEinoAgentTraceAfterRunError(baseCtx) {
h.persistEinoAgentTraceForResume(conversationID, resultMA)
}
// 检查是否是取消错误
@@ -2594,11 +2690,7 @@ func (h *AgentHandler) executeBatchQueue(queueID string) {
}
// 更新助手消息内容
if assistantMessageID != "" {
if _, updateErr := h.db.Exec(
"UPDATE messages SET content = ?, updated_at = ? WHERE id = ?",
cancelMsg,
time.Now(), assistantMessageID,
); updateErr != nil {
if updateErr := h.appendAssistantMessageNotice(assistantMessageID, cancelMsg); updateErr != nil {
h.logger.Warn("更新取消后的助手消息失败", zap.String("queueId", queueID), zap.String("taskId", task.ID), zap.Error(updateErr))
}
// 保存取消详情到数据库
@@ -2612,16 +2704,6 @@ func (h *AgentHandler) executeBatchQueue(queueID string) {
h.logger.Warn("保存取消消息失败", zap.String("queueId", queueID), zap.String("taskId", task.ID), zap.Error(errMsg))
}
}
// 保存代理轨迹(如果存在)
if result != nil && (result.LastAgentTraceInput != "" || result.LastAgentTraceOutput != "") {
if err := h.db.SaveAgentTrace(conversationID, result.LastAgentTraceInput, result.LastAgentTraceOutput); err != nil {
h.logger.Warn("保存取消任务的代理轨迹失败", zap.String("queueId", queueID), zap.String("taskId", task.ID), zap.Error(err))
}
} else if useRunResult && resultMA != nil && (resultMA.LastAgentTraceInput != "" || resultMA.LastAgentTraceOutput != "") {
if err := h.db.SaveAgentTrace(conversationID, resultMA.LastAgentTraceInput, resultMA.LastAgentTraceOutput); err != nil {
h.logger.Warn("保存取消任务的代理轨迹失败", zap.String("queueId", queueID), zap.String("taskId", task.ID), zap.Error(err))
}
}
h.batchTaskManager.UpdateTaskStatusWithConversationID(queueID, task.ID, "cancelled", cancelMsg, "", conversationID)
} else {
h.logger.Error("批量任务执行失败", zap.String("queueId", queueID), zap.String("taskId", task.ID), zap.String("conversationId", conversationID), zap.Error(runErr))
+70 -37
View File
@@ -10,6 +10,7 @@ import (
"sync"
"time"
"cyberstrike-ai/internal/mcp"
"cyberstrike-ai/internal/multiagent"
"github.com/gin-gonic/gin"
@@ -43,8 +44,11 @@ func (h *AgentHandler) EinoSingleAgentLoopStream(c *gin.Context) {
var sseWriteMu sync.Mutex
var ssePublishConversationID string
sendEvent := func(eventType, message string, data interface{}) {
if eventType == "error" && baseCtx != nil && errors.Is(context.Cause(baseCtx), ErrTaskCancelled) {
return
if eventType == "error" && baseCtx != nil {
cause := context.Cause(baseCtx)
if errors.Is(cause, ErrTaskCancelled) {
return
}
}
ev := StreamEvent{Type: eventType, Message: message, Data: data}
b, errMarshal := json.Marshal(ev)
@@ -114,36 +118,19 @@ func (h *AgentHandler) EinoSingleAgentLoopStream(c *gin.Context) {
}
var cancelWithCause context.CancelCauseFunc
baseCtx, cancelWithCause = context.WithCancelCause(context.Background())
taskCtx, timeoutCancel := context.WithTimeout(baseCtx, 600*time.Minute)
defer timeoutCancel()
defer cancelWithCause(nil)
progressCallback := h.createProgressCallback(taskCtx, cancelWithCause, conversationID, assistantMessageID, sendEvent)
taskCtx = multiagent.WithHITLToolInterceptor(taskCtx, func(ctx context.Context, toolName, arguments string) (string, error) {
return h.interceptHITLForEinoTool(ctx, cancelWithCause, conversationID, assistantMessageID, sendEvent, toolName, arguments)
})
if _, err := h.tasks.StartTask(conversationID, req.Message, cancelWithCause); err != nil {
var errorMsg string
if errors.Is(err, ErrTaskAlreadyRunning) {
errorMsg = "⚠️ 当前会话已有任务正在执行中,请等待当前任务完成或点击「停止任务」后再尝试。"
sendEvent("error", errorMsg, map[string]interface{}{
"conversationId": conversationID,
"errorType": "task_already_running",
})
} else {
errorMsg = "❌ 无法启动任务: " + err.Error()
sendEvent("error", errorMsg, nil)
}
if assistantMessageID != "" {
_, _ = h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", errorMsg, time.Now(), assistantMessageID)
}
sendEvent("done", "", map[string]interface{}{"conversationId": conversationID})
return
}
curFinalMessage := prep.FinalMessage
curHistory := prep.History
roleTools := prep.RoleTools
taskStatus := "completed"
defer h.tasks.FinishTask(conversationID, taskStatus)
// 仅在成功 StartTask 后再 FinishTask。若 StartTask 因 ErrTaskAlreadyRunning 失败仍 defer FinishTask
// 会误删其他连接上正在运行的同会话任务,导致「第一次拦截、第二次却放行」。
taskOwned := false
defer func() {
if taskOwned {
h.tasks.FinishTask(conversationID, taskStatus)
}
}()
sendEvent("progress", "正在启动 Eino ADK 单代理(ChatModelAgent...", map[string]interface{}{
"conversationId": conversationID,
@@ -161,28 +148,72 @@ func (h *AgentHandler) EinoSingleAgentLoopStream(c *gin.Context) {
return
}
result, runErr := multiagent.RunEinoSingleChatModelAgent(
var result *multiagent.RunResult
var runErr error
baseCtx, cancelWithCause = context.WithCancelCause(context.Background())
taskCtx, timeoutCancel := context.WithTimeout(baseCtx, 600*time.Minute)
if _, err := h.tasks.StartTask(conversationID, req.Message, cancelWithCause); err != nil {
var errorMsg string
if errors.Is(err, ErrTaskAlreadyRunning) {
errorMsg = "⚠️ 当前会话已有任务正在执行中,请等待当前任务完成或点击「停止任务」后再尝试。"
sendEvent("error", errorMsg, map[string]interface{}{
"conversationId": conversationID,
"errorType": "task_already_running",
})
} else {
errorMsg = "❌ 无法启动任务: " + err.Error()
sendEvent("error", errorMsg, nil)
}
if assistantMessageID != "" {
_, _ = h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", errorMsg, time.Now(), assistantMessageID)
}
sendEvent("done", "", map[string]interface{}{"conversationId": conversationID})
timeoutCancel()
return
}
taskOwned = true
progressCallback := h.createProgressCallback(taskCtx, cancelWithCause, conversationID, assistantMessageID, sendEvent)
taskCtx = mcp.WithMCPConversationID(taskCtx, conversationID)
taskCtx = mcp.WithToolRunRegistry(taskCtx, h.tasks)
taskCtx = multiagent.WithHITLToolInterceptor(taskCtx, func(ctx context.Context, toolName, arguments string) (string, error) {
return h.interceptHITLForEinoTool(ctx, cancelWithCause, conversationID, assistantMessageID, sendEvent, toolName, arguments)
})
result, runErr = multiagent.RunEinoSingleChatModelAgent(
taskCtx,
h.config,
&h.config.MultiAgent,
h.agent,
h.logger,
conversationID,
prep.FinalMessage,
prep.History,
prep.RoleTools,
curFinalMessage,
curHistory,
roleTools,
progressCallback,
)
timeoutCancel()
if runErr != nil {
h.persistEinoAgentTraceForResume(conversationID, result)
cause := context.Cause(baseCtx)
if shouldPersistEinoAgentTraceAfterRunError(baseCtx) {
h.persistEinoAgentTraceForResume(conversationID, result)
}
if errors.Is(cause, ErrTaskCancelled) {
taskStatus = "cancelled"
h.tasks.UpdateTaskStatus(conversationID, taskStatus)
cancelMsg := "任务已被用户取消,后续操作已停止。"
if assistantMessageID != "" {
_, _ = h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", cancelMsg, time.Now(), assistantMessageID)
if result != nil {
if err := h.mergeAssistantMessagePartialOnCancel(assistantMessageID, result.Response); err != nil {
h.logger.Warn("合并取消前的部分回复失败", zap.Error(err))
}
}
if err := h.appendAssistantMessageNotice(assistantMessageID, cancelMsg); err != nil {
h.logger.Warn("更新取消后的助手消息失败", zap.Error(err))
}
_ = h.db.AddProcessDetail(assistantMessageID, conversationID, "cancelled", cancelMsg, nil)
}
sendEvent("cancelled", cancelMsg, map[string]interface{}{
@@ -308,7 +339,9 @@ func (h *AgentHandler) EinoSingleAgentLoop(c *gin.Context) {
progressCallback,
)
if runErr != nil {
h.persistEinoAgentTraceForResume(prep.ConversationID, result)
if shouldPersistEinoAgentTraceAfterRunError(baseCtx) {
h.persistEinoAgentTraceForResume(prep.ConversationID, result)
}
c.JSON(http.StatusInternalServerError, gin.H{"error": runErr.Error()})
return
}
+36 -2
View File
@@ -1,6 +1,9 @@
package handler
import (
"encoding/json"
"errors"
"io"
"net/http"
"strconv"
"strings"
@@ -245,6 +248,37 @@ func (h *MonitorHandler) GetExecution(c *gin.Context) {
c.JSON(http.StatusNotFound, gin.H{"error": "执行记录未找到"})
}
// CancelExecution 手动取消进行中的 MCP 工具调用(仅取消该次 tools/call 的上下文,不停止整条 Agent / 迭代任务)
// 请求体可选 JSON{ "note": "用户说明" },将与工具已返回输出合并交给模型(含「用户终止说明」标题块,与命令行原文区分)。
func (h *MonitorHandler) CancelExecution(c *gin.Context) {
id := c.Param("id")
if id == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "执行记录ID不能为空"})
return
}
note := ""
dec := json.NewDecoder(c.Request.Body)
var body struct {
Note string `json:"note"`
}
if err := dec.Decode(&body); err != nil && !errors.Is(err, io.EOF) {
c.JSON(http.StatusBadRequest, gin.H{"error": "请求体须为 JSON,例如 {\"note\":\"说明\"},可为空对象"})
return
}
note = strings.TrimSpace(body.Note)
if h.mcpServer.CancelToolExecutionWithNote(id, note) {
h.logger.Info("已请求取消 MCP 工具执行", zap.String("executionId", id), zap.String("source", "internal"), zap.Bool("hasNote", note != ""))
c.JSON(http.StatusOK, gin.H{"message": "已发送终止信号", "executionId": id})
return
}
if h.externalMCPMgr != nil && h.externalMCPMgr.CancelToolExecutionWithNote(id, note) {
h.logger.Info("已请求取消 MCP 工具执行", zap.String("executionId", id), zap.String("source", "external"), zap.Bool("hasNote", note != ""))
c.JSON(http.StatusOK, gin.H{"message": "已发送终止信号", "executionId": id})
return
}
c.JSON(http.StatusNotFound, gin.H{"error": "未找到进行中的工具执行,或该任务已结束"})
}
// BatchGetToolNames 批量获取工具执行的工具名称(消除前端 N+1 请求)
func (h *MonitorHandler) BatchGetToolNames(c *gin.Context) {
var req struct {
@@ -317,7 +351,7 @@ func (h *MonitorHandler) DeleteExecution(c *gin.Context) {
totalCalls := 1
successCalls := 0
failedCalls := 0
if exec.Status == "failed" {
if exec.Status == "failed" || exec.Status == "cancelled" {
failedCalls = 1
} else if exec.Status == "completed" {
successCalls = 1
@@ -381,7 +415,7 @@ func (h *MonitorHandler) DeleteExecutions(c *gin.Context) {
stats := toolStats[exec.ToolName]
stats.totalCalls++
if exec.Status == "failed" {
if exec.Status == "failed" || exec.Status == "cancelled" {
stats.failedCalls++
} else if exec.Status == "completed" {
stats.successCalls++
+60 -26
View File
@@ -11,6 +11,7 @@ import (
"time"
"cyberstrike-ai/internal/config"
"cyberstrike-ai/internal/mcp"
"cyberstrike-ai/internal/multiagent"
"github.com/gin-gonic/gin"
@@ -60,8 +61,11 @@ func (h *AgentHandler) MultiAgentLoopStream(c *gin.Context) {
sendEvent := func(eventType, message string, data interface{}) {
// 用户主动停止时,Eino 可能仍会并发上报 eventType=="error"。
// 为避免 UI 看到“取消错误 + cancelled 文案”两条回复,这里直接丢弃取消对应的 error。
if eventType == "error" && baseCtx != nil && errors.Is(context.Cause(baseCtx), ErrTaskCancelled) {
return
if eventType == "error" && baseCtx != nil {
cause := context.Cause(baseCtx)
if errors.Is(cause, ErrTaskCancelled) {
return
}
}
ev := StreamEvent{Type: eventType, Message: message, Data: data}
b, errMarshal := json.Marshal(ev)
@@ -130,15 +134,35 @@ func (h *AgentHandler) MultiAgentLoopStream(c *gin.Context) {
})
}
baseCtx, cancelWithCause := context.WithCancelCause(context.Background())
taskCtx, timeoutCancel := context.WithTimeout(baseCtx, 600*time.Minute)
defer timeoutCancel()
defer cancelWithCause(nil)
progressCallback := h.createProgressCallback(taskCtx, cancelWithCause, conversationID, assistantMessageID, sendEvent)
taskCtx = multiagent.WithHITLToolInterceptor(taskCtx, func(ctx context.Context, toolName, arguments string) (string, error) {
return h.interceptHITLForEinoTool(ctx, cancelWithCause, conversationID, assistantMessageID, sendEvent, toolName, arguments)
var cancelWithCause context.CancelCauseFunc
curFinalMessage := prep.FinalMessage
curHistory := prep.History
roleTools := prep.RoleTools
orch := strings.TrimSpace(req.Orchestration)
taskStatus := "completed"
// 仅在成功 StartTask 后再 FinishTask;避免「任务已存在」分支 return 时误删正在运行的同会话任务。
taskOwned := false
defer func() {
if taskOwned {
h.tasks.FinishTask(conversationID, taskStatus)
}
}()
sendEvent("progress", "正在启动 Eino 多代理...", map[string]interface{}{
"conversationId": conversationID,
})
stopKeepalive := make(chan struct{})
go sseKeepalive(c, stopKeepalive, &sseWriteMu)
defer close(stopKeepalive)
var result *multiagent.RunResult
var runErr error
baseCtx, cancelWithCause = context.WithCancelCause(context.Background())
taskCtx, timeoutCancel := context.WithTimeout(baseCtx, 600*time.Minute)
if _, err := h.tasks.StartTask(conversationID, req.Message, cancelWithCause); err != nil {
var errorMsg string
if errors.Is(err, ErrTaskAlreadyRunning) {
@@ -155,44 +179,52 @@ func (h *AgentHandler) MultiAgentLoopStream(c *gin.Context) {
_, _ = h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", errorMsg, time.Now(), assistantMessageID)
}
sendEvent("done", "", map[string]interface{}{"conversationId": conversationID})
timeoutCancel()
return
}
taskOwned = true
taskStatus := "completed"
defer h.tasks.FinishTask(conversationID, taskStatus)
sendEvent("progress", "正在启动 Eino 多代理...", map[string]interface{}{
"conversationId": conversationID,
progressCallback := h.createProgressCallback(taskCtx, cancelWithCause, conversationID, assistantMessageID, sendEvent)
taskCtx = mcp.WithMCPConversationID(taskCtx, conversationID)
taskCtx = mcp.WithToolRunRegistry(taskCtx, h.tasks)
taskCtx = multiagent.WithHITLToolInterceptor(taskCtx, func(ctx context.Context, toolName, arguments string) (string, error) {
return h.interceptHITLForEinoTool(ctx, cancelWithCause, conversationID, assistantMessageID, sendEvent, toolName, arguments)
})
stopKeepalive := make(chan struct{})
go sseKeepalive(c, stopKeepalive, &sseWriteMu)
defer close(stopKeepalive)
result, runErr := multiagent.RunDeepAgent(
result, runErr = multiagent.RunDeepAgent(
taskCtx,
h.config,
&h.config.MultiAgent,
h.agent,
h.logger,
conversationID,
prep.FinalMessage,
prep.History,
prep.RoleTools,
curFinalMessage,
curHistory,
roleTools,
progressCallback,
h.agentsMarkdownDir,
strings.TrimSpace(req.Orchestration),
orch,
)
timeoutCancel()
if runErr != nil {
h.persistEinoAgentTraceForResume(conversationID, result)
cause := context.Cause(baseCtx)
if shouldPersistEinoAgentTraceAfterRunError(baseCtx) {
h.persistEinoAgentTraceForResume(conversationID, result)
}
if errors.Is(cause, ErrTaskCancelled) {
taskStatus = "cancelled"
h.tasks.UpdateTaskStatus(conversationID, taskStatus)
cancelMsg := "任务已被用户取消,后续操作已停止。"
if assistantMessageID != "" {
_, _ = h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", cancelMsg, time.Now(), assistantMessageID)
if result != nil {
if err := h.mergeAssistantMessagePartialOnCancel(assistantMessageID, result.Response); err != nil {
h.logger.Warn("合并取消前的部分回复失败", zap.Error(err))
}
}
if err := h.appendAssistantMessageNotice(assistantMessageID, cancelMsg); err != nil {
h.logger.Warn("更新取消后的助手消息失败", zap.Error(err))
}
_ = h.db.AddProcessDetail(assistantMessageID, conversationID, "cancelled", cancelMsg, nil)
}
sendEvent("cancelled", cancelMsg, map[string]interface{}{
@@ -320,7 +352,9 @@ func (h *AgentHandler) MultiAgentLoop(c *gin.Context) {
strings.TrimSpace(req.Orchestration),
)
if runErr != nil {
h.persistEinoAgentTraceForResume(prep.ConversationID, result)
if shouldPersistEinoAgentTraceAfterRunError(baseCtx) {
h.persistEinoAgentTraceForResume(prep.ConversationID, result)
}
h.logger.Error("Eino DeepAgent 执行失败", zap.Error(runErr))
errMsg := "执行失败: " + runErr.Error()
if prep.AssistantMessageID != "" {
+57
View File
@@ -461,6 +461,14 @@ func (h *OpenAPIHandler) GetOpenAPISpec(c *gin.Context) {
"type": "string",
"description": "对话ID",
},
"reason": map[string]interface{}{
"type": "string",
"description": "可选。与 MCP 监控页「终止并说明」一致:非空时合并进当前工具返回给模型的文本(含 USER INTERRUPT NOTE 块)",
},
"continueAfter": map[string]interface{}{
"type": "boolean",
"description": "为 true 时仅终止当前进行中的 MCP 工具调用(不取消整轮任务);须已有工具在执行,否则 400",
},
},
},
"AgentTask": map[string]interface{}{
@@ -3318,6 +3326,55 @@ func (h *OpenAPIHandler) GetOpenAPISpec(c *gin.Context) {
},
},
},
"/api/monitor/execution/{id}/cancel": map[string]interface{}{
"post": map[string]interface{}{
"tags": []string{"监控"},
"summary": "取消进行中的工具执行",
"description": "对当前进程内正在执行的 MCP 工具调用发送 context 取消信号;上层对话/多步任务可继续。若执行已结束或未在本进程内运行则返回 404。",
"operationId": "cancelExecution",
"parameters": []map[string]interface{}{
{
"name": "id",
"in": "path",
"required": true,
"description": "执行ID",
"schema": map[string]interface{}{
"type": "string",
},
},
},
"requestBody": map[string]interface{}{
"required": false,
"content": map[string]interface{}{
"application/json": map[string]interface{}{
"schema": map[string]interface{}{
"type": "object",
"properties": map[string]interface{}{
"note": map[string]interface{}{
"type": "string",
"description": "可选。非空时与工具已返回输出合并交给大模型,并带有「用户终止说明」标题块以便与命令行原文区分",
},
},
},
},
},
},
"responses": map[string]interface{}{
"200": map[string]interface{}{
"description": "已发送终止信号",
},
"400": map[string]interface{}{
"description": "请求体不是合法 JSON",
},
"404": map[string]interface{}{
"description": "未找到进行中的工具执行",
},
"401": map[string]interface{}{
"description": "未授权",
},
},
},
},
"/api/monitor/executions": map[string]interface{}{
"delete": map[string]interface{}{
"tags": []string{"监控"},
+55
View File
@@ -3,6 +3,7 @@ package handler
import (
"context"
"errors"
"strings"
"sync"
"time"
)
@@ -13,6 +14,13 @@ var ErrTaskCancelled = errors.New("agent task cancelled by user")
// ErrTaskAlreadyRunning 会话已有任务正在执行
var ErrTaskAlreadyRunning = errors.New("agent task already running for conversation")
// shouldPersistEinoAgentTraceAfterRunErrorEino 相关 Run 非成功返回时,是否仍写入 last_react_* 供下轮 loadHistoryFromAgentTrace。
// 当前策略:无论正常结束、异常结束或用户主动停止,都尽量保留最后可用轨迹,
// 以便在同一会话继续时可基于原始上下文续跑,而不是回退到仅消息文本历史。
func shouldPersistEinoAgentTraceAfterRunError(baseCtx context.Context) bool {
return true
}
// AgentTask 描述正在运行的Agent任务
type AgentTask struct {
ConversationID string `json:"conversationId"`
@@ -21,9 +29,56 @@ type AgentTask struct {
Status string `json:"status"`
CancellingAt time.Time `json:"-"` // 进入 cancelling 状态的时间,用于清理长时间卡住的任务
// ActiveMCPExecutionID 当前正在执行的 MCP 工具 executionId(仅内存,供「中断并继续」= 仅掐当前工具)
ActiveMCPExecutionID string `json:"-"`
cancel func(error)
}
// RegisterRunningTool 实现 mcp.ToolRunRegistry:工具开始时登记本会话当前 executionId。
func (m *AgentTaskManager) RegisterRunningTool(conversationID, executionID string) {
conversationID = strings.TrimSpace(conversationID)
executionID = strings.TrimSpace(executionID)
if conversationID == "" || executionID == "" {
return
}
m.mu.Lock()
defer m.mu.Unlock()
if t, ok := m.tasks[conversationID]; ok && t != nil {
t.ActiveMCPExecutionID = executionID
}
}
// UnregisterRunningTool 工具结束时清除登记(仅当 id 仍匹配时清除,避免并发串单)。
func (m *AgentTaskManager) UnregisterRunningTool(conversationID, executionID string) {
conversationID = strings.TrimSpace(conversationID)
executionID = strings.TrimSpace(executionID)
if conversationID == "" || executionID == "" {
return
}
m.mu.Lock()
defer m.mu.Unlock()
if t, ok := m.tasks[conversationID]; ok && t != nil {
if t.ActiveMCPExecutionID == executionID {
t.ActiveMCPExecutionID = ""
}
}
}
// ActiveMCPExecutionID 返回当前会话进行中的工具 executionId,无则空串。
func (m *AgentTaskManager) ActiveMCPExecutionID(conversationID string) string {
conversationID = strings.TrimSpace(conversationID)
if conversationID == "" {
return ""
}
m.mu.RLock()
defer m.mu.RUnlock()
if t, ok := m.tasks[conversationID]; ok && t != nil {
return strings.TrimSpace(t.ActiveMCPExecutionID)
}
return ""
}
// CompletedTask 已完成的任务(用于历史记录)
type CompletedTask struct {
ConversationID string `json:"conversationId"`
+119 -18
View File
@@ -32,6 +32,8 @@ type ExternalMCPManager struct {
refreshWg sync.WaitGroup // 等待后台刷新goroutine完成
refreshing atomic.Bool // 防止 refreshToolCounts 并发堆积
mu sync.RWMutex
runningCancels map[string]context.CancelFunc
abortUserNotes map[string]string
}
// NewExternalMCPManager 创建外部MCP管理器
@@ -42,16 +44,18 @@ func NewExternalMCPManager(logger *zap.Logger) *ExternalMCPManager {
// NewExternalMCPManagerWithStorage 创建外部MCP管理器(带持久化存储)
func NewExternalMCPManagerWithStorage(logger *zap.Logger, storage MonitorStorage) *ExternalMCPManager {
manager := &ExternalMCPManager{
clients: make(map[string]ExternalMCPClient),
configs: make(map[string]config.ExternalMCPServerConfig),
logger: logger,
storage: storage,
executions: make(map[string]*ToolExecution),
stats: make(map[string]*ToolStats),
errors: make(map[string]string),
toolCounts: make(map[string]int),
toolCache: make(map[string][]Tool),
stopRefresh: make(chan struct{}),
clients: make(map[string]ExternalMCPClient),
configs: make(map[string]config.ExternalMCPServerConfig),
logger: logger,
storage: storage,
executions: make(map[string]*ToolExecution),
stats: make(map[string]*ToolStats),
errors: make(map[string]string),
toolCounts: make(map[string]int),
toolCache: make(map[string][]Tool),
stopRefresh: make(chan struct{}),
runningCancels: make(map[string]context.CancelFunc),
abortUserNotes: make(map[string]string),
}
// 启动后台刷新工具数量的goroutine
manager.startToolCountRefresh()
@@ -452,8 +456,18 @@ func (m *ExternalMCPManager) CallTool(ctx context.Context, toolName string, args
}
}
execCtx, runCancel := context.WithCancel(ctx)
m.registerRunningCancel(executionID, runCancel)
notifyToolRunBegin(ctx, executionID)
defer func() {
notifyToolRunEnd(ctx, executionID)
runCancel()
m.unregisterRunningCancel(executionID)
}()
// 调用工具
result, err := client.CallTool(ctx, actualToolName, args)
result, err := client.CallTool(execCtx, actualToolName, args)
cancelledWithUserNote := m.applyAbortUserNoteToCancelledToolResult(executionID, &result, &err)
// 更新执行记录
m.mu.Lock()
@@ -462,16 +476,23 @@ func (m *ExternalMCPManager) CallTool(ctx context.Context, toolName string, args
execution.Duration = now.Sub(execution.StartTime)
if err != nil {
execution.Status = "failed"
execution.Error = err.Error()
st, msg := executionStatusAndMessage(err)
execution.Status = st
execution.Error = msg
} else if result != nil && result.IsError {
execution.Status = "failed"
if len(result.Content) > 0 {
execution.Error = result.Content[0].Text
if cancelledWithUserNote {
execution.Status = "cancelled"
execution.Error = ""
execution.Result = result
} else {
execution.Error = "工具执行返回错误结果"
execution.Status = "failed"
if len(result.Content) > 0 {
execution.Error = result.Content[0].Text
} else {
execution.Error = "工具执行返回错误结果"
}
execution.Result = result
}
execution.Result = result
} else {
execution.Status = "completed"
if result == nil {
@@ -509,6 +530,50 @@ func (m *ExternalMCPManager) CallTool(ctx context.Context, toolName string, args
return result, executionID, nil
}
func (m *ExternalMCPManager) applyAbortUserNoteToCancelledToolResult(executionID string, result **ToolResult, err *error) (cancelledWithUserNote bool) {
note := strings.TrimSpace(m.readAbortUserNote(executionID))
if note == "" {
return false
}
hasErr := err != nil && *err != nil
hasRes := result != nil && *result != nil
if !hasErr && !hasRes {
return false
}
_ = m.takeAbortUserNote(executionID)
partial := ""
if hasRes {
partial = ToolResultPlainText(*result)
}
if partial == "" && hasErr {
partial = (*err).Error()
}
merged := MergePartialToolOutputAndAbortNote(partial, note)
*err = nil
*result = &ToolResult{Content: []Content{{Type: "text", Text: merged}}, IsError: true}
return true
}
func (m *ExternalMCPManager) readAbortUserNote(id string) string {
m.mu.Lock()
defer m.mu.Unlock()
if m.abortUserNotes == nil {
return ""
}
return m.abortUserNotes[id]
}
func (m *ExternalMCPManager) takeAbortUserNote(id string) string {
m.mu.Lock()
defer m.mu.Unlock()
if m.abortUserNotes == nil {
return ""
}
n := m.abortUserNotes[id]
delete(m.abortUserNotes, id)
return n
}
// cleanupOldExecutions 清理旧的执行记录(保持内存中的记录数量在限制内)
func (m *ExternalMCPManager) cleanupOldExecutions() {
const maxExecutionsInMemory = 1000
@@ -562,6 +627,42 @@ func (m *ExternalMCPManager) GetExecution(id string) (*ToolExecution, bool) {
return nil, false
}
func (m *ExternalMCPManager) registerRunningCancel(id string, cancel context.CancelFunc) {
m.mu.Lock()
m.runningCancels[id] = cancel
m.mu.Unlock()
}
func (m *ExternalMCPManager) unregisterRunningCancel(id string) {
m.mu.Lock()
delete(m.runningCancels, id)
m.mu.Unlock()
}
// CancelToolExecutionWithNote 取消外部 MCP 工具;note 非空时与已返回输出合并后交给模型。
func (m *ExternalMCPManager) CancelToolExecutionWithNote(id string, note string) bool {
m.mu.Lock()
cancel, ok := m.runningCancels[id]
if !ok || cancel == nil {
m.mu.Unlock()
return false
}
if strings.TrimSpace(note) != "" {
if m.abortUserNotes == nil {
m.abortUserNotes = make(map[string]string)
}
m.abortUserNotes[id] = strings.TrimSpace(note)
}
m.mu.Unlock()
cancel()
return true
}
// CancelToolExecution 取消正在执行的外部 MCP 工具(无用户说明)。
func (m *ExternalMCPManager) CancelToolExecution(id string) bool {
return m.CancelToolExecutionWithNote(id, "")
}
// updateStats 更新统计信息
func (m *ExternalMCPManager) updateStats(toolName string, failed bool) {
now := time.Now()
+77
View File
@@ -0,0 +1,77 @@
package mcp
import (
"context"
"strings"
)
// ToolRunRegistry 在工具开始/结束时登记当前 executionId,供对话页「仅终止当前工具」与监控页共用取消逻辑。
type ToolRunRegistry interface {
RegisterRunningTool(conversationID, executionID string)
UnregisterRunningTool(conversationID, executionID string)
}
type toolRunRegistryCtxKey struct{}
type mcpConversationIDCtxKey struct{}
// WithToolRunRegistry 将登记器注入 ctxEino / 原生 Agent 任务 ctx)。
func WithToolRunRegistry(ctx context.Context, reg ToolRunRegistry) context.Context {
if ctx == nil || reg == nil {
return ctx
}
return context.WithValue(ctx, toolRunRegistryCtxKey{}, reg)
}
// ToolRunRegistryFromContext 取出登记器(无则 nil)。
func ToolRunRegistryFromContext(ctx context.Context) ToolRunRegistry {
if ctx == nil {
return nil
}
v, _ := ctx.Value(toolRunRegistryCtxKey{}).(ToolRunRegistry)
return v
}
// WithMCPConversationID 将对话 ID 注入 ctx,供 CallTool 内与 executionId 关联。
func WithMCPConversationID(ctx context.Context, conversationID string) context.Context {
if ctx == nil {
return nil
}
id := strings.TrimSpace(conversationID)
if id == "" {
return ctx
}
return context.WithValue(ctx, mcpConversationIDCtxKey{}, id)
}
// MCPConversationIDFromContext 读取对话 ID。
func MCPConversationIDFromContext(ctx context.Context) string {
if ctx == nil {
return ""
}
v, _ := ctx.Value(mcpConversationIDCtxKey{}).(string)
return v
}
func notifyToolRunBegin(ctx context.Context, executionID string) {
reg := ToolRunRegistryFromContext(ctx)
if reg == nil {
return
}
conv := MCPConversationIDFromContext(ctx)
if conv == "" || strings.TrimSpace(executionID) == "" {
return
}
reg.RegisterRunningTool(conv, executionID)
}
func notifyToolRunEnd(ctx context.Context, executionID string) {
reg := ToolRunRegistryFromContext(ctx)
if reg == nil {
return
}
conv := MCPConversationIDFromContext(ctx)
if conv == "" || strings.TrimSpace(executionID) == "" {
return
}
reg.UnregisterRunningTool(conv, executionID)
}
+198 -22
View File
@@ -4,6 +4,7 @@ import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
@@ -40,6 +41,9 @@ type Server struct {
logger *zap.Logger
maxExecutionsInMemory int // 内存中最大执行记录数
sseClients map[string]*sseClient
runningCancels map[string]context.CancelFunc
runningCancelsMu sync.Mutex
abortUserNotes map[string]string // 监控页终止时附带的用户说明,与 executionID 对应
}
type sseClient struct {
@@ -50,6 +54,13 @@ type sseClient struct {
// ToolHandler 工具处理函数
type ToolHandler func(ctx context.Context, args map[string]interface{}) (*ToolResult, error)
func executionStatusAndMessage(err error) (status string, errMsg string) {
if errors.Is(err, context.Canceled) {
return "cancelled", "已手动终止(MCP 监控)"
}
return "failed", err.Error()
}
// NewServer 创建新的MCP服务器
func NewServer(logger *zap.Logger) *Server {
return NewServerWithStorage(logger, nil)
@@ -68,6 +79,8 @@ func NewServerWithStorage(logger *zap.Logger, storage MonitorStorage) *Server {
logger: logger,
maxExecutionsInMemory: 1000, // 默认最多在内存中保留1000条执行记录
sseClients: make(map[string]*sseClient),
runningCancels: make(map[string]context.CancelFunc),
abortUserNotes: make(map[string]string),
}
// 初始化默认提示词和资源
@@ -444,15 +457,22 @@ func (s *Server) handleCallTool(msg *Message) *Message {
}
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
defer cancel()
baseCtx, timeoutCancel := context.WithTimeout(context.Background(), 30*time.Minute)
defer timeoutCancel()
execCtx, runCancel := context.WithCancel(baseCtx)
s.registerRunningCancel(executionID, runCancel)
defer func() {
runCancel()
s.unregisterRunningCancel(executionID)
}()
s.logger.Info("开始执行工具",
zap.String("toolName", req.Name),
zap.Any("arguments", req.Arguments),
)
result, err := handler(ctx, req.Arguments)
result, err := handler(execCtx, req.Arguments)
cancelledWithUserNote := s.applyAbortUserNoteToCancelledToolResult(executionID, &result, &err)
now := time.Now()
var failed bool
var finalResult *ToolResult
@@ -462,18 +482,26 @@ func (s *Server) handleCallTool(msg *Message) *Message {
execution.Duration = now.Sub(execution.StartTime)
if err != nil {
execution.Status = "failed"
execution.Error = err.Error()
st, msg := executionStatusAndMessage(err)
execution.Status = st
execution.Error = msg
failed = true
} else if result != nil && result.IsError {
execution.Status = "failed"
if len(result.Content) > 0 {
execution.Error = result.Content[0].Text
if cancelledWithUserNote {
execution.Status = "cancelled"
execution.Error = ""
execution.Result = result
failed = true
} else {
execution.Error = "工具执行返回错误结果"
execution.Status = "failed"
if len(result.Content) > 0 {
execution.Error = result.Content[0].Text
} else {
execution.Error = "工具执行返回错误结果"
}
execution.Result = result
failed = true
}
execution.Result = result
failed = true
} else {
execution.Status = "completed"
if result == nil {
@@ -510,9 +538,13 @@ func (s *Server) handleCallTool(msg *Message) *Message {
zap.Error(err),
)
errText := fmt.Sprintf("工具执行失败: %v", err)
if errors.Is(err, context.Canceled) {
errText = "工具执行已手动终止(MCP 监控)。后续编排步骤可继续。"
}
errorResult, _ := json.Marshal(CallToolResponse{
Content: []Content{
{Type: "text", Text: fmt.Sprintf("工具执行失败: %v", err)},
{Type: "text", Text: errText},
},
IsError: true,
})
@@ -769,7 +801,17 @@ func (s *Server) CallTool(ctx context.Context, toolName string, args map[string]
}
}
result, err := handler(ctx, args)
execCtx, runCancel := context.WithCancel(ctx)
s.registerRunningCancel(executionID, runCancel)
notifyToolRunBegin(ctx, executionID)
defer func() {
notifyToolRunEnd(ctx, executionID)
runCancel()
s.unregisterRunningCancel(executionID)
}()
result, err := handler(execCtx, args)
cancelledWithUserNote := s.applyAbortUserNoteToCancelledToolResult(executionID, &result, &err)
s.mu.Lock()
now := time.Now()
@@ -779,19 +821,28 @@ func (s *Server) CallTool(ctx context.Context, toolName string, args map[string]
var finalResult *ToolResult
if err != nil {
execution.Status = "failed"
execution.Error = err.Error()
st, msg := executionStatusAndMessage(err)
execution.Status = st
execution.Error = msg
failed = true
} else if result != nil && result.IsError {
execution.Status = "failed"
if len(result.Content) > 0 {
execution.Error = result.Content[0].Text
if cancelledWithUserNote {
execution.Status = "cancelled"
execution.Error = ""
execution.Result = result
failed = true
finalResult = result
} else {
execution.Error = "工具执行返回错误结果"
execution.Status = "failed"
if len(result.Content) > 0 {
execution.Error = result.Content[0].Text
} else {
execution.Error = "工具执行返回错误结果"
}
execution.Result = result
failed = true
finalResult = result
}
execution.Result = result
failed = true
finalResult = result
} else {
execution.Status = "completed"
if result == nil {
@@ -832,6 +883,49 @@ func (s *Server) CallTool(ctx context.Context, toolName string, args map[string]
return finalResult, executionID, nil
}
// RecordCompletedToolInvocation 将已在其它路径完成的工具调用写入监控存储(格式与 CallTool 结束后一致),
// 用于 Eino ADK filesystem execute 等未经过 CallTool 的场景;返回 executionId 供助手消息 mcpExecutionIds 关联。
func (s *Server) RecordCompletedToolInvocation(toolName string, args map[string]interface{}, resultText string, invokeErr error) string {
if s == nil {
return ""
}
if args == nil {
args = map[string]interface{}{}
}
executionID := uuid.New().String()
now := time.Now()
failed := invokeErr != nil
exec := &ToolExecution{
ID: executionID,
ToolName: toolName,
Arguments: args,
StartTime: now,
EndTime: &now,
Duration: 0,
}
if failed {
exec.Status = "failed"
exec.Error = invokeErr.Error()
if strings.TrimSpace(resultText) != "" {
exec.Result = &ToolResult{Content: []Content{{Type: "text", Text: resultText}}}
}
} else {
exec.Status = "completed"
text := resultText
if strings.TrimSpace(text) == "" {
text = "(无输出)"
}
exec.Result = &ToolResult{Content: []Content{{Type: "text", Text: text}}}
}
if s.storage != nil {
if err := s.storage.SaveToolExecution(exec); err != nil {
s.logger.Warn("RecordCompletedToolInvocation 保存失败", zap.Error(err))
}
}
s.updateStats(toolName, failed)
return executionID
}
// cleanupOldExecutions 清理旧的执行记录,防止内存无限增长
func (s *Server) cleanupOldExecutions() {
if len(s.executions) <= s.maxExecutionsInMemory {
@@ -869,6 +963,88 @@ func (s *Server) cleanupOldExecutions() {
)
}
func (s *Server) registerRunningCancel(id string, cancel context.CancelFunc) {
s.runningCancelsMu.Lock()
s.runningCancels[id] = cancel
s.runningCancelsMu.Unlock()
}
func (s *Server) unregisterRunningCancel(id string) {
s.runningCancelsMu.Lock()
delete(s.runningCancels, id)
s.runningCancelsMu.Unlock()
}
func (s *Server) readAbortUserNote(id string) string {
s.runningCancelsMu.Lock()
defer s.runningCancelsMu.Unlock()
if s.abortUserNotes == nil {
return ""
}
return s.abortUserNotes[id]
}
func (s *Server) takeAbortUserNote(id string) string {
s.runningCancelsMu.Lock()
defer s.runningCancelsMu.Unlock()
if s.abortUserNotes == nil {
return ""
}
n := s.abortUserNotes[id]
delete(s.abortUserNotes, id)
return n
}
// applyAbortUserNoteToCancelledToolResult 监控页「终止并填写说明」时合并「工具已输出 + 用户说明」交给模型。
// exec 等工具会把失败写在 *ToolResult 里并返回 err==nil,若仅在 err!=nil 时合并会漏掉说明,甚至误 clear 掉 note。
func (s *Server) applyAbortUserNoteToCancelledToolResult(executionID string, result **ToolResult, err *error) (cancelledWithUserNote bool) {
note := strings.TrimSpace(s.readAbortUserNote(executionID))
if note == "" {
return false
}
hasErr := err != nil && *err != nil
hasRes := result != nil && *result != nil
if !hasErr && !hasRes {
return false
}
_ = s.takeAbortUserNote(executionID)
partial := ""
if hasRes {
partial = ToolResultPlainText(*result)
}
if partial == "" && hasErr {
partial = (*err).Error()
}
merged := MergePartialToolOutputAndAbortNote(partial, note)
*err = nil
*result = &ToolResult{Content: []Content{{Type: "text", Text: merged}}, IsError: true}
return true
}
// CancelToolExecutionWithNote 取消内部工具;note 非空时与工具已返回文本合并后交给上层模型。
func (s *Server) CancelToolExecutionWithNote(id string, note string) bool {
s.runningCancelsMu.Lock()
cancel, ok := s.runningCancels[id]
if !ok || cancel == nil {
s.runningCancelsMu.Unlock()
return false
}
if strings.TrimSpace(note) != "" {
if s.abortUserNotes == nil {
s.abortUserNotes = make(map[string]string)
}
s.abortUserNotes[id] = strings.TrimSpace(note)
}
s.runningCancelsMu.Unlock()
cancel()
return true
}
// CancelToolExecution 取消正在执行的内部工具调用(无用户说明)。
func (s *Server) CancelToolExecution(id string) bool {
return s.CancelToolExecutionWithNote(id, "")
}
// initDefaultPrompts 初始化默认提示词模板
func (s *Server) initDefaultPrompts() {
s.mu.Lock()
+35 -1
View File
@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
)
@@ -192,7 +193,7 @@ type ToolExecution struct {
ID string `json:"id"`
ToolName string `json:"toolName"`
Arguments map[string]interface{} `json:"arguments"`
Status string `json:"status"` // pending, running, completed, failed
Status string `json:"status"` // pending, running, completed, failed, cancelled
Result *ToolResult `json:"result,omitempty"`
Error string `json:"error,omitempty"`
StartTime time.Time `json:"startTime"`
@@ -293,3 +294,36 @@ type SamplingContent struct {
Type string `json:"type"`
Text string `json:"text,omitempty"`
}
// ToolResultPlainText 拼接工具结果中的文本(手动终止时作为「工具原始输出」)。
func ToolResultPlainText(r *ToolResult) string {
if r == nil || len(r.Content) == 0 {
return ""
}
var b strings.Builder
for _, c := range r.Content {
b.WriteString(c.Text)
}
return strings.TrimSpace(b.String())
}
// AbortNoteBannerForModel 标出后续文本来自「用户手动终止工具时在弹窗中填写」,避免与 stdout/stderr 混淆。
const AbortNoteBannerForModel = "---\n" +
"【用户终止说明|USER INTERRUPT NOTE】\n" +
"(以下由操作者填写,用于指示模型如何继续;不是工具原始输出。)\n" +
"Written by the operator when stopping this tool; not raw tool output.\n" +
"---"
// MergePartialToolOutputAndAbortNote 格式:工具原始输出 + 醒目标题 + 用户终止说明(无说明则原样返回 partial)。
func MergePartialToolOutputAndAbortNote(partial, userNote string) string {
partial = strings.TrimSpace(partial)
userNote = strings.TrimSpace(userNote)
if userNote == "" {
return partial
}
section := AbortNoteBannerForModel + "\n" + userNote
if partial == "" {
return section
}
return partial + "\n\n" + section
}
+308 -71
View File
@@ -11,6 +11,7 @@ import (
"strings"
"sync"
"sync/atomic"
"unicode/utf8"
"cyberstrike-ai/internal/einomcp"
@@ -19,6 +20,26 @@ import (
"go.uber.org/zap"
)
// normalizeStreamingDelta 将可能是“累计片段”的 chunk 归一化为“纯增量”。
// 一些模型/桥接层在流式过程中会重复发送已输出前缀,前端若直接 buffer+=chunk 会出现重复文本。
//
// 注意:与 internal/openai.normalizeStreamingDelta 保持一致。
func normalizeStreamingDelta(current, incoming string) (next, delta string) {
if incoming == "" {
return current, ""
}
if current == "" {
return incoming, incoming
}
if strings.HasPrefix(incoming, current) && len(incoming) > len(current) {
return incoming, incoming[len(current):]
}
if incoming == current && utf8.RuneCountInString(current) > 1 {
return current, ""
}
return current + incoming, incoming
}
func isEinoIterationLimitError(err error) bool {
if err == nil {
return false
@@ -49,6 +70,9 @@ type einoADKRunLoopArgs struct {
McpIDsMu *sync.Mutex
McpIDs *[]string
// ToolInvokeNotify 与 einomcp.ToolsFromDefinitions 共享:run loop 在迭代前 SetMCP 桥 Fire 以补全 tool_result。
ToolInvokeNotify *einomcp.ToolInvokeNotifyHolder
DA adk.Agent
// EmptyResponseMessage 当未捕获到助手正文时的占位(多代理与单代理文案不同)。
@@ -190,6 +214,63 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
pendingQueueByAgent = make(map[string][]string)
}
// 最近一次成功的 Eino filesystem execute 的标准输出(trim):用于抑制模型紧接着复述同一字符串时的重复「助手输出」时间线。
var executeStdoutDupMu sync.Mutex
var pendingExecuteStdoutDup string
recordPendingExecuteStdoutDup := func(toolName, stdout string, isErr bool) {
if isErr || !strings.EqualFold(strings.TrimSpace(toolName), "execute") {
return
}
t := strings.TrimSpace(stdout)
if t == "" {
return
}
executeStdoutDupMu.Lock()
pendingExecuteStdoutDup = t
executeStdoutDupMu.Unlock()
}
var toolResultSent sync.Map // toolCallID -> struct{};与 ADK Tool 消息去重,避免 bridge 与事件流各推一次
if args.ToolInvokeNotify != nil {
args.ToolInvokeNotify.Set(func(toolCallID, toolName, einoAgent string, success bool, content string, invokeErr error) {
tid := strings.TrimSpace(toolCallID)
removePendingByID(tid)
if tid == "" || progress == nil {
return
}
if _, loaded := toolResultSent.LoadOrStore(tid, struct{}{}); loaded {
return
}
isErr := !success || invokeErr != nil
body := content
if invokeErr != nil {
body = invokeErr.Error()
isErr = true
}
recordPendingExecuteStdoutDup(toolName, body, isErr)
preview := body
if len(preview) > 200 {
preview = preview[:200] + "..."
}
agentTag := strings.TrimSpace(einoAgent)
if agentTag == "" {
agentTag = orchestratorName
}
progress("tool_result", fmt.Sprintf("工具结果 (%s)", toolName), map[string]interface{}{
"toolName": toolName,
"success": !isErr,
"isError": isErr,
"result": body,
"resultPreview": preview,
"toolCallId": tid,
"conversationId": conversationID,
"einoAgent": agentTag,
"einoRole": einoRoleTag(agentTag),
"source": "eino",
})
})
}
runnerCfg := adk.RunnerConfig{
Agent: da,
EnableStreaming: true,
@@ -430,9 +511,11 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
streamHeaderSent := false
var reasoningStreamID string
var toolStreamFragments []schema.ToolCall
var subAssistantBuf strings.Builder
var subAssistantBuf string
var subReplyStreamID string
var mainAssistantBuf strings.Builder
var mainAssistantBuf string
var mainAssistDupTarget string // 非空表示本段主助手流需缓冲至 EOF,与 execute 输出比对去重
var reasoningBuf string
var streamRecvErr error
for {
chunk, rerr := mv.MessageStream.Recv()
@@ -453,23 +536,101 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
continue
}
if progress != nil && strings.TrimSpace(chunk.ReasoningContent) != "" {
if reasoningStreamID == "" {
reasoningStreamID = fmt.Sprintf("eino-reasoning-%s-%d", conversationID, atomic.AddInt64(&reasoningStreamSeq, 1))
progress("thinking_stream_start", " ", map[string]interface{}{
"streamId": reasoningStreamID,
"source": "eino",
"einoAgent": ev.AgentName,
"einoRole": einoRoleTag(ev.AgentName),
"orchestration": orchMode,
var reasoningDelta string
reasoningBuf, reasoningDelta = normalizeStreamingDelta(reasoningBuf, chunk.ReasoningContent)
if reasoningDelta != "" {
if reasoningStreamID == "" {
reasoningStreamID = fmt.Sprintf("eino-reasoning-%s-%d", conversationID, atomic.AddInt64(&reasoningStreamSeq, 1))
progress("thinking_stream_start", " ", map[string]interface{}{
"streamId": reasoningStreamID,
"source": "eino",
"einoAgent": ev.AgentName,
"einoRole": einoRoleTag(ev.AgentName),
"orchestration": orchMode,
})
}
progress("thinking_stream_delta", reasoningDelta, map[string]interface{}{
"streamId": reasoningStreamID,
})
}
progress("thinking_stream_delta", chunk.ReasoningContent, map[string]interface{}{
"streamId": reasoningStreamID,
})
}
if chunk.Content != "" {
if progress != nil && streamsMainAssistant(ev.AgentName) {
if !streamHeaderSent {
var contentDelta string
mainAssistantBuf, contentDelta = normalizeStreamingDelta(mainAssistantBuf, chunk.Content)
if contentDelta != "" {
if mainAssistDupTarget == "" {
executeStdoutDupMu.Lock()
if pendingExecuteStdoutDup != "" {
mainAssistDupTarget = pendingExecuteStdoutDup
}
executeStdoutDupMu.Unlock()
}
if mainAssistDupTarget != "" {
// 已展示过 tool_result,缓冲全文;EOF 后与 execute 输出相同则不再发助手流
} else {
if !streamHeaderSent {
progress("response_start", "", map[string]interface{}{
"conversationId": conversationID,
"mcpExecutionIds": snapshotMCPIDs(),
"messageGeneratedBy": "eino:" + ev.AgentName,
"einoRole": "orchestrator",
"einoAgent": ev.AgentName,
"orchestration": orchMode,
})
streamHeaderSent = true
}
progress("response_delta", contentDelta, map[string]interface{}{
"conversationId": conversationID,
"mcpExecutionIds": snapshotMCPIDs(),
"einoRole": "orchestrator",
"einoAgent": ev.AgentName,
"orchestration": orchMode,
})
}
}
} else if !streamsMainAssistant(ev.AgentName) {
var subDelta string
subAssistantBuf, subDelta = normalizeStreamingDelta(subAssistantBuf, chunk.Content)
if subDelta != "" {
if progress != nil {
if subReplyStreamID == "" {
subReplyStreamID = fmt.Sprintf("eino-sub-reply-%s-%d", conversationID, atomic.AddInt64(&einoSubReplyStreamSeq, 1))
progress("eino_agent_reply_stream_start", "", map[string]interface{}{
"streamId": subReplyStreamID,
"einoAgent": ev.AgentName,
"einoRole": "sub",
"conversationId": conversationID,
"source": "eino",
})
}
progress("eino_agent_reply_stream_delta", subDelta, map[string]interface{}{
"streamId": subReplyStreamID,
"conversationId": conversationID,
})
}
}
}
}
if len(chunk.ToolCalls) > 0 {
toolStreamFragments = append(toolStreamFragments, chunk.ToolCalls...)
}
}
if streamsMainAssistant(ev.AgentName) {
s := strings.TrimSpace(mainAssistantBuf)
if mainAssistDupTarget != "" {
executeStdoutDupMu.Lock()
pendingExecuteStdoutDup = ""
executeStdoutDupMu.Unlock()
if s != "" && s == mainAssistDupTarget {
// 与刚展示的 execute 结果完全一致:不再发助手流式事件,仍写入轨迹与最终回复字段
lastAssistant = s
runAccumulatedMsgs = append(runAccumulatedMsgs, schema.AssistantMessage(s, nil))
if orchMode == "plan_execute" && strings.EqualFold(strings.TrimSpace(ev.AgentName), "executor") {
lastPlanExecuteExecutor = UnwrapPlanExecuteUserText(s)
}
} else if s != "" {
if progress != nil {
progress("response_start", "", map[string]interface{}{
"conversationId": conversationID,
"mcpExecutionIds": snapshotMCPIDs(),
@@ -478,42 +639,21 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
"einoAgent": ev.AgentName,
"orchestration": orchMode,
})
streamHeaderSent = true
}
progress("response_delta", chunk.Content, map[string]interface{}{
"conversationId": conversationID,
"mcpExecutionIds": snapshotMCPIDs(),
"einoRole": "orchestrator",
"einoAgent": ev.AgentName,
"orchestration": orchMode,
})
mainAssistantBuf.WriteString(chunk.Content)
} else if !streamsMainAssistant(ev.AgentName) {
if progress != nil {
if subReplyStreamID == "" {
subReplyStreamID = fmt.Sprintf("eino-sub-reply-%s-%d", conversationID, atomic.AddInt64(&einoSubReplyStreamSeq, 1))
progress("eino_agent_reply_stream_start", "", map[string]interface{}{
"streamId": subReplyStreamID,
"einoAgent": ev.AgentName,
"einoRole": "sub",
"conversationId": conversationID,
"source": "eino",
})
}
progress("eino_agent_reply_stream_delta", chunk.Content, map[string]interface{}{
"streamId": subReplyStreamID,
"conversationId": conversationID,
progress("response_delta", s, map[string]interface{}{
"conversationId": conversationID,
"mcpExecutionIds": snapshotMCPIDs(),
"einoRole": "orchestrator",
"einoAgent": ev.AgentName,
"orchestration": orchMode,
})
}
subAssistantBuf.WriteString(chunk.Content)
lastAssistant = s
runAccumulatedMsgs = append(runAccumulatedMsgs, schema.AssistantMessage(s, nil))
if orchMode == "plan_execute" && strings.EqualFold(strings.TrimSpace(ev.AgentName), "executor") {
lastPlanExecuteExecutor = UnwrapPlanExecuteUserText(s)
}
}
}
if len(chunk.ToolCalls) > 0 {
toolStreamFragments = append(toolStreamFragments, chunk.ToolCalls...)
}
}
if streamsMainAssistant(ev.AgentName) {
if s := strings.TrimSpace(mainAssistantBuf.String()); s != "" {
} else if s != "" {
lastAssistant = s
runAccumulatedMsgs = append(runAccumulatedMsgs, schema.AssistantMessage(s, nil))
if orchMode == "plan_execute" && strings.EqualFold(strings.TrimSpace(ev.AgentName), "executor") {
@@ -521,8 +661,8 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
}
}
}
if subAssistantBuf.Len() > 0 && progress != nil {
if s := strings.TrimSpace(subAssistantBuf.String()); s != "" {
if strings.TrimSpace(subAssistantBuf) != "" && progress != nil {
if s := strings.TrimSpace(subAssistantBuf); s != "" {
if subReplyStreamID != "" {
progress("eino_agent_reply_stream_end", s, map[string]interface{}{
"streamId": subReplyStreamID,
@@ -582,26 +722,42 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
body := strings.TrimSpace(msg.Content)
if body != "" {
if streamsMainAssistant(ev.AgentName) {
if progress != nil {
progress("response_start", "", map[string]interface{}{
"conversationId": conversationID,
"mcpExecutionIds": snapshotMCPIDs(),
"messageGeneratedBy": "eino:" + ev.AgentName,
"einoRole": "orchestrator",
"einoAgent": ev.AgentName,
"orchestration": orchMode,
})
progress("response_delta", body, map[string]interface{}{
"conversationId": conversationID,
"mcpExecutionIds": snapshotMCPIDs(),
"einoRole": "orchestrator",
"einoAgent": ev.AgentName,
"orchestration": orchMode,
})
}
lastAssistant = body
if orchMode == "plan_execute" && strings.EqualFold(strings.TrimSpace(ev.AgentName), "executor") {
lastPlanExecuteExecutor = UnwrapPlanExecuteUserText(body)
executeStdoutDupMu.Lock()
dup := pendingExecuteStdoutDup
if dup != "" && body == dup {
pendingExecuteStdoutDup = ""
executeStdoutDupMu.Unlock()
lastAssistant = body
if orchMode == "plan_execute" && strings.EqualFold(strings.TrimSpace(ev.AgentName), "executor") {
lastPlanExecuteExecutor = UnwrapPlanExecuteUserText(body)
}
// 非流式:与 execute 输出相同则跳过助手通道展示(msg 已在上方写入 runAccumulatedMsgs
} else {
if dup != "" {
pendingExecuteStdoutDup = ""
}
executeStdoutDupMu.Unlock()
if progress != nil {
progress("response_start", "", map[string]interface{}{
"conversationId": conversationID,
"mcpExecutionIds": snapshotMCPIDs(),
"messageGeneratedBy": "eino:" + ev.AgentName,
"einoRole": "orchestrator",
"einoAgent": ev.AgentName,
"orchestration": orchMode,
})
progress("response_delta", body, map[string]interface{}{
"conversationId": conversationID,
"mcpExecutionIds": snapshotMCPIDs(),
"einoRole": "orchestrator",
"einoAgent": ev.AgentName,
"orchestration": orchMode,
})
}
lastAssistant = body
if orchMode == "plan_execute" && strings.EqualFold(strings.TrimSpace(ev.AgentName), "executor") {
lastPlanExecuteExecutor = UnwrapPlanExecuteUserText(body)
}
}
} else if progress != nil {
progress("eino_agent_reply", body, map[string]interface{}{
@@ -657,12 +813,15 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
break
}
}
} else {
removePendingByID(toolCallID)
}
if toolCallID != "" {
removePendingByID(toolCallID)
if _, loaded := toolResultSent.LoadOrStore(toolCallID, struct{}{}); loaded {
continue
}
data["toolCallId"] = toolCallID
}
recordPendingExecuteStdoutDup(toolName, content, isErr)
progress("tool_result", fmt.Sprintf("工具结果 (%s)", toolName), data)
}
}
@@ -700,6 +859,11 @@ func buildEinoRunResultFromAccumulated(
cleaned = UnwrapPlanExecuteUserText(cleaned)
}
}
if cleaned == "" {
if fb := strings.TrimSpace(einoExtractFallbackAssistantFromMsgs(runAccumulatedMsgs)); fb != "" {
cleaned = fb
}
}
cleaned = dedupeRepeatedParagraphs(cleaned, 80)
cleaned = dedupeParagraphsByLineFingerprint(cleaned, 100)
// 防止超长响应导致 JSON 序列化慢或 OOM(多代理拼接大量工具输出时可能触发)。
@@ -726,6 +890,79 @@ func buildEinoRunResultFromAccumulated(
return out
}
// einoExtractFallbackAssistantFromMsgs 在「主通道未产出助手正文」时,从 Eino ADK 轨迹中回填用户可见回复。
// 典型场景:监督者仅调用 exitfinal_result 落在 Tool 消息中),或工具结果已写入历史但 lastAssistant 未更新。
//
// 优先级:最后一次 exit 工具输出 → 最后一条含 exit 的助手 tool_calls 参数中的 final_result。
func einoExtractFallbackAssistantFromMsgs(msgs []adk.Message) string {
for i := len(msgs) - 1; i >= 0; i-- {
m := msgs[i]
if m == nil || m.Role != schema.Tool {
continue
}
if !strings.EqualFold(strings.TrimSpace(m.ToolName), adk.ToolInfoExit.Name) {
continue
}
content := strings.TrimSpace(m.Content)
if content == "" || strings.HasPrefix(content, einomcp.ToolErrorPrefix) {
continue
}
return content
}
for i := len(msgs) - 1; i >= 0; i-- {
m := msgs[i]
if m == nil || m.Role != schema.Assistant {
continue
}
if s := einoExtractExitFinalFromAssistantToolCalls(m); s != "" {
return s
}
}
return ""
}
func einoExtractExitFinalFromAssistantToolCalls(msg *schema.Message) string {
if msg == nil || len(msg.ToolCalls) == 0 {
return ""
}
for i := len(msg.ToolCalls) - 1; i >= 0; i-- {
tc := msg.ToolCalls[i]
if !strings.EqualFold(strings.TrimSpace(tc.Function.Name), adk.ToolInfoExit.Name) {
continue
}
if s := einoParseExitFinalResultArguments(tc.Function.Arguments); s != "" {
return s
}
}
return ""
}
func einoParseExitFinalResultArguments(arguments string) string {
arguments = strings.TrimSpace(arguments)
if arguments == "" {
return ""
}
var wrap struct {
FinalResult json.RawMessage `json:"final_result"`
}
if err := json.Unmarshal([]byte(arguments), &wrap); err != nil || len(wrap.FinalResult) == 0 {
return ""
}
var s string
if err := json.Unmarshal(wrap.FinalResult, &s); err == nil {
return strings.TrimSpace(s)
}
var anyVal interface{}
if err := json.Unmarshal(wrap.FinalResult, &anyVal); err != nil {
return ""
}
b, err := json.Marshal(anyVal)
if err != nil {
return ""
}
return strings.TrimSpace(string(b))
}
func buildEinoCheckpointID(orchMode string) string {
mode := sanitizeEinoPathSegment(strings.TrimSpace(orchMode))
if mode == "" {
@@ -0,0 +1,31 @@
package multiagent
import (
"fmt"
"cyberstrike-ai/internal/agent"
"cyberstrike-ai/internal/einomcp"
)
// newEinoExecuteMonitorCallback 在 Eino filesystem execute 结束时写入 MCP 监控库并 recorder(executionId)
// 与 CallTool 路径一致,供助手消息展示「渗透测试详情」芯片。
func newEinoExecuteMonitorCallback(ag *agent.Agent, recorder einomcp.ExecutionRecorder) func(command, stdout string, success bool, invokeErr error) {
return func(command, stdout string, success bool, invokeErr error) {
if ag == nil || recorder == nil {
return
}
var err error
if !success {
if invokeErr != nil {
err = invokeErr
} else {
err = fmt.Errorf("execute failed")
}
}
args := map[string]interface{}{"command": command}
id := ag.RecordLocalToolExecution("execute", args, stdout, err)
if id != "" {
recorder(id)
}
}
}
@@ -2,11 +2,16 @@ package multiagent
import (
"context"
"errors"
"fmt"
"io"
"strings"
"cyberstrike-ai/internal/einomcp"
"cyberstrike-ai/internal/security"
"github.com/cloudwego/eino/adk/filesystem"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/schema"
)
@@ -14,8 +19,15 @@ import (
// 官方 execute 工具默认走 ExecuteStreaming 且不设 RunInBackendGround;末尾带 & 时子进程仍与管道相连,
// streamStdout 按行读取会在无换行输出时长时间阻塞(与 MCP 工具 exec 的独立实现不同)。
// 对「完全后台」命令自动开启 RunInBackendGround,与 local.runCmdInBackground 行为对齐。
//
// 使用 Pipe 将内层流转发给调用方:在 inner EOF 后、关闭 Pipe 前同步调用 ToolInvokeNotify.Fire
// 保证 run loop 在模型开始下一轮输出前已记录 execute 结果(用于 UI 与「重复助手复述」去重)。
type einoStreamingShellWrap struct {
inner filesystem.StreamingShell
inner filesystem.StreamingShell
invokeNotify *einomcp.ToolInvokeNotifyHolder
einoAgentName string
// recordMonitor 在 execute 流结束后写入 tool_executions 并 recorder(executionId),使「渗透测试详情」与常规 MCP 一致。
recordMonitor func(command, stdout string, success bool, invokeErr error)
}
func (w *einoStreamingShellWrap) ExecuteStreaming(ctx context.Context, input *filesystem.ExecuteRequest) (*schema.StreamReader[*filesystem.ExecuteResponse], error) {
@@ -26,8 +38,73 @@ func (w *einoStreamingShellWrap) ExecuteStreaming(ctx context.Context, input *fi
return w.inner.ExecuteStreaming(ctx, nil)
}
req := *input
cmd := strings.TrimSpace(req.Command)
if security.IsBackgroundShellCommand(req.Command) && !req.RunInBackendGround {
req.RunInBackendGround = true
}
return w.inner.ExecuteStreaming(ctx, &req)
sr, err := w.inner.ExecuteStreaming(ctx, &req)
if err != nil {
return nil, err
}
tid := strings.TrimSpace(compose.GetToolCallID(ctx))
if sr == nil || w.invokeNotify == nil || tid == "" {
return sr, nil
}
outR, outW := schema.Pipe[*filesystem.ExecuteResponse](32)
agentTag := strings.TrimSpace(w.einoAgentName)
go func(inner *schema.StreamReader[*filesystem.ExecuteResponse], command string) {
defer inner.Close()
var sb strings.Builder
const maxCapture = 16 * 1024
success := true
var invokeErr error
exitCode := 0
hasExitCode := false
for {
resp, rerr := inner.Recv()
if errors.Is(rerr, io.EOF) {
break
}
if rerr != nil {
success = false
invokeErr = rerr
_ = outW.Send(nil, rerr)
break
}
if resp != nil {
if resp.ExitCode != nil {
hasExitCode = true
exitCode = *resp.ExitCode
}
if remain := maxCapture - sb.Len(); remain > 0 {
out := resp.Output
if len(out) > remain {
out = out[:remain]
}
sb.WriteString(out)
}
if outW.Send(resp, nil) {
success = false
invokeErr = fmt.Errorf("execute stream closed by consumer")
break
}
}
}
if success && hasExitCode && exitCode != 0 {
success = false
invokeErr = fmt.Errorf("execute exited with code %d", exitCode)
}
if w.recordMonitor != nil {
w.recordMonitor(command, sb.String(), success, invokeErr)
}
w.invokeNotify.Fire(tid, "execute", agentTag, success, sb.String(), invokeErr)
outW.Close()
}(sr, cmd)
return outR, nil
}
@@ -0,0 +1,62 @@
package multiagent
import (
"testing"
"github.com/cloudwego/eino/schema"
)
func TestEinoExtractFallbackAssistantFromMsgs_exitToolMessage(t *testing.T) {
u := schema.UserMessage("hi")
tm := schema.ToolMessage("answer for user", "call-exit-1")
tm.ToolName = "exit"
if got := einoExtractFallbackAssistantFromMsgs([]*schema.Message{u, tm}); got != "answer for user" {
t.Fatalf("got %q", got)
}
}
func TestEinoExtractFallbackAssistantFromMsgs_lastExitWins(t *testing.T) {
msgs := []*schema.Message{
schema.UserMessage("hi"),
toolExitMsg("first", "c1"),
toolExitMsg("second", "c2"),
}
if got := einoExtractFallbackAssistantFromMsgs(msgs); got != "second" {
t.Fatalf("got %q", got)
}
}
func TestEinoExtractFallbackAssistantFromMsgs_fromAssistantToolCalls(t *testing.T) {
m := schema.AssistantMessage("", []schema.ToolCall{{
ID: "x",
Type: "function",
Function: schema.FunctionCall{
Name: "exit",
Arguments: `{"final_result":"from args"}`,
},
}})
if got := einoExtractFallbackAssistantFromMsgs([]*schema.Message{m}); got != "from args" {
t.Fatalf("got %q", got)
}
}
func TestEinoExtractFallbackAssistantFromMsgs_prefersToolOverEarlierAssistant(t *testing.T) {
asst := schema.AssistantMessage("", []schema.ToolCall{{
ID: "x",
Type: "function",
Function: schema.FunctionCall{
Name: "exit",
Arguments: `{"final_result":"from args"}`,
},
}})
tool := toolExitMsg("from tool", "c1")
if got := einoExtractFallbackAssistantFromMsgs([]*schema.Message{asst, tool}); got != "from tool" {
t.Fatalf("got %q", got)
}
}
func toolExitMsg(content, callID string) *schema.Message {
m := schema.ToolMessage(content, callID)
m.ToolName = "exit"
return m
}
+5 -2
View File
@@ -86,8 +86,10 @@ func RunEinoSingleChatModelAgent(
})
}
toolInvokeNotify := einomcp.NewToolInvokeNotifyHolder()
einoExecMonitor := newEinoExecuteMonitorCallback(ag, recorder)
mainDefs := ag.ToolsForRole(roleTools)
mainTools, err := einomcp.ToolsFromDefinitions(ag, holder, mainDefs, recorder, toolOutputChunk)
mainTools, err := einomcp.ToolsFromDefinitions(ag, holder, mainDefs, recorder, toolOutputChunk, toolInvokeNotify, einoSingleAgentName)
if err != nil {
return nil, err
}
@@ -136,7 +138,7 @@ func RunEinoSingleChatModelAgent(
}
if einoSkillMW != nil {
if einoFSTools && einoLoc != nil {
fsMw, fsErr := subAgentFilesystemMiddleware(ctx, einoLoc)
fsMw, fsErr := subAgentFilesystemMiddleware(ctx, einoLoc, toolInvokeNotify, einoSingleAgentName, einoExecMonitor)
if fsErr != nil {
return nil, fmt.Errorf("eino single filesystem 中间件: %w", fsErr)
}
@@ -232,6 +234,7 @@ func RunEinoSingleChatModelAgent(
CheckpointDir: ma.EinoMiddleware.CheckpointDir,
McpIDsMu: &mcpIDsMu,
McpIDs: &mcpIDs,
ToolInvokeNotify: toolInvokeNotify,
DA: chatAgent,
EmptyResponseMessage: "(Eino ADK single-agent session completed but no assistant text was captured. Check process details or logs.) " +
"Eino ADK 单代理会话已完成,但未捕获到助手文本输出。请查看过程详情或日志。)",
+15 -3
View File
@@ -8,6 +8,7 @@ import (
"strings"
"cyberstrike-ai/internal/config"
"cyberstrike-ai/internal/einomcp"
localbk "github.com/cloudwego/eino-ext/adk/backend/local"
"github.com/cloudwego/eino/adk"
@@ -75,12 +76,23 @@ func prepareEinoSkills(
// subAgentFilesystemMiddleware returns filesystem middleware for a sub-agent when Deep itself
// does not set Backend (fsTools false on orchestrator) but we still want tools on subs — not used;
// when orchestrator has Backend, builtin FS is only on outer agent; subs need explicit FS for parity.
func subAgentFilesystemMiddleware(ctx context.Context, loc *localbk.Local) (adk.ChatModelAgentMiddleware, error) {
func subAgentFilesystemMiddleware(
ctx context.Context,
loc *localbk.Local,
invokeNotify *einomcp.ToolInvokeNotifyHolder,
einoAgentName string,
recordMonitor func(command, stdout string, success bool, invokeErr error),
) (adk.ChatModelAgentMiddleware, error) {
if loc == nil {
return nil, nil
}
return filesystem.New(ctx, &filesystem.MiddlewareConfig{
Backend: loc,
StreamingShell: &einoStreamingShellWrap{inner: loc},
Backend: loc,
StreamingShell: &einoStreamingShellWrap{
inner: loc,
invokeNotify: invokeNotify,
einoAgentName: strings.TrimSpace(einoAgentName),
recordMonitor: recordMonitor,
},
})
}
+22 -14
View File
@@ -110,6 +110,7 @@ func RunDeepAgent(
mcpIDs = append(mcpIDs, id)
mcpIDsMu.Unlock()
}
einoExecMonitor := newEinoExecuteMonitorCallback(ag, recorder)
// 与单代理流式一致:在 response_start / response_delta 的 data 中带当前 mcpExecutionIds,供主聊天绑定复制与展示。
snapshotMCPIDs := func() []string {
@@ -120,6 +121,7 @@ func RunDeepAgent(
return out
}
toolInvokeNotify := einomcp.NewToolInvokeNotifyHolder()
mainDefs := ag.ToolsForRole(roleTools)
toolOutputChunk := func(toolName, toolCallID, chunk string) {
// When toolCallId is missing, frontend ignores tool_result_delta.
@@ -137,16 +139,6 @@ func RunDeepAgent(
})
}
mainTools, err := einomcp.ToolsFromDefinitions(ag, holder, mainDefs, recorder, toolOutputChunk)
if err != nil {
return nil, err
}
mainToolsForCfg, mainOrchestratorPre, err := prependEinoMiddlewares(ctx, &ma.EinoMiddleware, einoMWMain, mainTools, einoLoc, skillsRoot, conversationID, logger)
if err != nil {
return nil, err
}
httpClient := &http.Client{
Timeout: 30 * time.Minute,
Transport: &http.Transport{
@@ -222,7 +214,7 @@ func RunDeepAgent(
}
subDefs := ag.ToolsForRole(roleTools)
subTools, err := einomcp.ToolsFromDefinitions(ag, holder, subDefs, recorder, toolOutputChunk)
subTools, err := einomcp.ToolsFromDefinitions(ag, holder, subDefs, recorder, toolOutputChunk, toolInvokeNotify, id)
if err != nil {
return nil, fmt.Errorf("子代理 %q 工具: %w", id, err)
}
@@ -248,7 +240,7 @@ func RunDeepAgent(
}
if einoSkillMW != nil {
if einoFSTools && einoLoc != nil {
subFs, fsErr := subAgentFilesystemMiddleware(ctx, einoLoc)
subFs, fsErr := subAgentFilesystemMiddleware(ctx, einoLoc, toolInvokeNotify, id, einoExecMonitor)
if fsErr != nil {
return nil, fmt.Errorf("子代理 %q filesystem 中间件: %w", id, fsErr)
}
@@ -338,6 +330,16 @@ func RunDeepAgent(
orchDescription = d
}
}
mainTools, err := einomcp.ToolsFromDefinitions(ag, holder, mainDefs, recorder, toolOutputChunk, toolInvokeNotify, orchestratorName)
if err != nil {
return nil, err
}
mainToolsForCfg, mainOrchestratorPre, err := prependEinoMiddlewares(ctx, &ma.EinoMiddleware, einoMWMain, mainTools, einoLoc, skillsRoot, conversationID, logger)
if err != nil {
return nil, err
}
orchInstruction = injectToolNamesOnlyInstruction(ctx, orchInstruction, mainTools)
if logger != nil {
mainNames := collectToolNames(ctx, mainTools)
@@ -381,7 +383,12 @@ func RunDeepAgent(
var deepShell filesystem.StreamingShell
if einoLoc != nil && einoFSTools {
deepBackend = einoLoc
deepShell = einoLoc
deepShell = &einoStreamingShellWrap{
inner: einoLoc,
invokeNotify: toolInvokeNotify,
einoAgentName: orchestratorName,
recordMonitor: einoExecMonitor,
}
}
// noNestedTaskMiddleware 必须在最外层(最先拦截),防止 skill 或其他中间件内部触发 task 调用绕过检测。
@@ -438,7 +445,7 @@ func RunDeepAgent(
// 构建 filesystem 中间件(与 Deep sub-agent 一致)
var peFsMw adk.ChatModelAgentMiddleware
if einoSkillMW != nil && einoFSTools && einoLoc != nil {
peFsMw, err = subAgentFilesystemMiddleware(ctx, einoLoc)
peFsMw, err = subAgentFilesystemMiddleware(ctx, einoLoc, toolInvokeNotify, "executor", einoExecMonitor)
if err != nil {
return nil, fmt.Errorf("plan_execute filesystem 中间件: %w", err)
}
@@ -560,6 +567,7 @@ func RunDeepAgent(
CheckpointDir: ma.EinoMiddleware.CheckpointDir,
McpIDsMu: &mcpIDsMu,
McpIDs: &mcpIDs,
ToolInvokeNotify: toolInvokeNotify,
DA: da,
EmptyResponseMessage: "(Eino multi-agent orchestration completed but no assistant text was captured. Check process details or logs.) " +
"(Eino 多代理编排已完成,但未捕获到助手文本输出。请查看过程详情或日志。)",
+16 -4
View File
@@ -499,6 +499,7 @@ func (c *Client) claudeChatCompletionStream(ctx context.Context, payload interfa
reader := bufio.NewReader(resp.Body)
var full strings.Builder
fullText := ""
for {
line, readErr := reader.ReadString('\n')
@@ -531,9 +532,14 @@ func (c *Client) claudeChatCompletionStream(ctx context.Context, payload interfa
if deltaType == "text_delta" {
text, _ := delta["text"].(string)
if text != "" {
full.WriteString(text)
var textOut string
fullText, textOut = normalizeStreamingDelta(fullText, text)
if textOut == "" {
continue
}
full.WriteString(textOut)
if onDelta != nil {
if err := onDelta(text); err != nil {
if err := onDelta(textOut); err != nil {
return full.String(), err
}
}
@@ -603,6 +609,7 @@ func (c *Client) claudeChatCompletionStreamWithToolCalls(
reader := bufio.NewReader(resp.Body)
var full strings.Builder
fullText := ""
finishReason := ""
// 追踪当前正在构建的 content blocks
@@ -665,9 +672,14 @@ func (c *Client) claudeChatCompletionStreamWithToolCalls(
if deltaType == "text_delta" {
text, _ := delta["text"].(string)
if text != "" {
full.WriteString(text)
var textOut string
fullText, textOut = normalizeStreamingDelta(fullText, text)
if textOut == "" {
continue
}
full.WriteString(textOut)
if onContentDelta != nil {
if err := onContentDelta(text); err != nil {
if err := onContentDelta(textOut); err != nil {
return full.String(), nil, finishReason, err
}
}
@@ -0,0 +1,56 @@
package openai
import "testing"
func TestNormalizeStreamingDelta_RepeatedCharBoundary(t *testing.T) {
// 流式在重复数字边界分片:不得把 "43" 的首字符与 "194" 尾字符误合并。
cur, d := normalizeStreamingDelta("https://x:194", "43")
if want := "https://x:19443"; cur != want {
t.Fatalf("next: want %q got %q", want, cur)
}
if d != "43" {
t.Fatalf("delta: want %q got %q", "43", d)
}
}
func TestNormalizeStreamingDelta_CumulativePrefix(t *testing.T) {
cur, d := normalizeStreamingDelta("今天", "今天天气")
if cur != "今天天气" || d != "天气" {
t.Fatalf("got cur=%q d=%q", cur, d)
}
}
func TestNormalizeStreamingDelta_FullRetransmit(t *testing.T) {
cur, d := normalizeStreamingDelta("今天", "今天")
if d != "" || cur != "今天" {
t.Fatalf("got cur=%q d=%q", cur, d)
}
}
func TestNormalizeStreamingDelta_SingleRuneRepeated(t *testing.T) {
cur, d := normalizeStreamingDelta("呀", "呀")
if want := "呀呀"; cur != want {
t.Fatalf("next: want %q got %q", want, cur)
}
if d != "呀" {
t.Fatalf("delta: want %q got %q", "呀", d)
}
cur, d = normalizeStreamingDelta("4", "4")
if want := "44"; cur != want {
t.Fatalf("next: want %q got %q", want, cur)
}
if d != "4" {
t.Fatalf("delta: want %q got %q", "4", d)
}
}
func TestNormalizeStreamingDelta_CumulativeExtendsNumber(t *testing.T) {
// 已缓冲 "194" 后收到累计串 "19443"(注意 "1943" 并非 "19443" 的前缀,不能靠误写的中间态测 HasPrefix)。
cur, d := normalizeStreamingDelta("194", "19443")
if want := "19443"; cur != want {
t.Fatalf("next: want %q got %q", want, cur)
}
if d != "43" {
t.Fatalf("delta: want %q got %q", "43", d)
}
}
+44 -6
View File
@@ -10,6 +10,7 @@ import (
"net/http"
"strings"
"time"
"unicode/utf8"
"cyberstrike-ai/internal/config"
@@ -33,6 +34,32 @@ func (e *APIError) Error() string {
return fmt.Sprintf("openai api error: status=%d body=%s", e.StatusCode, e.Body)
}
// normalizeStreamingDelta 将可能是“累计片段/重发片段”的内容归一化为“纯增量”。
// 部分兼容网关会返回累计 content;若直接 append 会出现重复文本。
//
// 注意:
// - 不做「任意后缀与前缀重叠」合并;流式可能在重复字符边界分片("194"+"43"→"19443")。
// - HasPrefix 仅在 incoming 严格长于 current 时视为累计全文,否则会把分片产生的第二个相同
// 单字/单码点(叠字、44、22 等)误判为「整段重复」而吞字。
// - incoming==current 仅当 current 长度 >1 个码点时才视为整包重发;单码点重复必须走拼接。
// - 不再使用「current 以 incoming 结尾则丢弃」:否则 "1943"+"43" 会误吞增量(19443 显示成 1943)。
// 若网关重复发送尾部片段,应重复送完整累计串,由 HasPrefix 分支去重。
func normalizeStreamingDelta(current, incoming string) (next, delta string) {
if incoming == "" {
return current, ""
}
if current == "" {
return incoming, incoming
}
if strings.HasPrefix(incoming, current) && len(incoming) > len(current) {
return incoming, incoming[len(current):]
}
if incoming == current && utf8.RuneCountInString(current) > 1 {
return current, ""
}
return current + incoming, incoming
}
// NewClient 创建一个新的OpenAI客户端。
func NewClient(cfg *config.OpenAIConfig, httpClient *http.Client, logger *zap.Logger) *Client {
if httpClient == nil {
@@ -219,6 +246,7 @@ func (c *Client) ChatCompletionStream(ctx context.Context, payload interface{},
reader := bufio.NewReader(resp.Body)
var full strings.Builder
fullText := ""
// 典型 SSE 结构:
// data: {...}\n\n
@@ -263,9 +291,14 @@ func (c *Client) ChatCompletionStream(ctx context.Context, payload interface{},
continue
}
full.WriteString(delta)
var deltaOut string
fullText, deltaOut = normalizeStreamingDelta(fullText, delta)
if deltaOut == "" {
continue
}
full.WriteString(deltaOut)
if onDelta != nil {
if err := onDelta(delta); err != nil {
if err := onDelta(deltaOut); err != nil {
return full.String(), err
}
}
@@ -380,6 +413,7 @@ func (c *Client) ChatCompletionStreamWithToolCalls(
reader := bufio.NewReader(resp.Body)
var full strings.Builder
fullText := ""
finishReason := ""
for {
@@ -426,10 +460,14 @@ func (c *Client) ChatCompletionStreamWithToolCalls(
content = delta.Text
}
if content != "" {
full.WriteString(content)
if onContentDelta != nil {
if err := onContentDelta(content); err != nil {
return full.String(), nil, finishReason, err
var contentOut string
fullText, contentOut = normalizeStreamingDelta(fullText, content)
if contentOut != "" {
full.WriteString(contentOut)
if onContentDelta != nil {
if err := onContentDelta(contentOut); err != nil {
return full.String(), nil, finishReason, err
}
}
}
}
+28
View File
@@ -3196,6 +3196,12 @@ header {
border-color: rgba(220, 53, 69, 0.3);
}
.status-chip.status-cancelled {
background: rgba(108, 117, 125, 0.12);
color: var(--text-secondary, #6c757d);
border-color: rgba(108, 117, 125, 0.35);
}
.status-chip.status-pending,
.status-chip.status-unknown {
background: rgba(255, 193, 7, 0.12);
@@ -3203,6 +3209,18 @@ header {
border-color: rgba(255, 193, 7, 0.3);
}
.detail-abort-hint {
font-size: 0.875rem;
opacity: 0.88;
margin: 0 0 10px;
line-height: 1.45;
}
.detail-abort-section .btn-monitor-abort {
border-color: rgba(253, 126, 20, 0.55);
color: #fd7e14;
}
.detail-code-card {
background: var(--bg-secondary);
border: 1px dashed rgba(0, 0, 0, 0.06);
@@ -5517,6 +5535,16 @@ header {
color: var(--error-color);
}
.monitor-status-chip.cancelled {
background: rgba(108, 117, 125, 0.15);
color: var(--text-muted, #6c757d);
}
.monitor-execution-actions .btn-monitor-abort {
border-color: rgba(253, 126, 20, 0.55);
color: #fd7e14;
}
.monitor-execution-actions {
display: flex;
align-items: center;
+26
View File
@@ -394,6 +394,16 @@
"tasks": {
"title": "Task Management",
"stopTask": "Stop task",
"interruptModalTitle": "Interrupt current step",
"interruptReasonLabel": "Interrupt note",
"interruptModalHint": "Same as MCP monitor \"Stop tool\": ends only the in-flight tool call; the conversation and this run continue. Optional note is merged into the tool result (bilingual USER INTERRUPT NOTE, not raw CLI). Leave empty for a plain stop. If no tool is running yet (model still thinking), wait for a tool call or use \"Stop completely\".",
"interruptReasonPlaceholder": "e.g. Tool is too slow—skip and summarize…",
"interruptReasonRequired": "Please enter a short note so the model can continue accordingly.",
"interruptSubmitting": "Submitting...",
"interruptConfirmContinue": "Interrupt & continue",
"interruptHardStop": "Stop completely",
"interruptModalClose": "Close",
"userInterruptTimelineTitle": "User interrupt note (continuing)",
"collapseDetail": "Collapse details",
"newTask": "New task",
"autoRefresh": "Auto refresh",
@@ -1260,6 +1270,8 @@
"statusCompleted": "Completed",
"statusRunning": "Running",
"statusFailed": "Failed",
"statusCancelled": "Cancelled",
"terminateExecution": "Stop",
"loading": "Loading...",
"noStatsData": "No statistical data",
"noExecutions": "No execution records",
@@ -1727,8 +1739,22 @@
"statusRunning": "Running",
"statusCompleted": "Completed",
"statusFailed": "Failed",
"statusCancelled": "Cancelled",
"unknown": "Unknown",
"getDetailFailed": "Failed to get details",
"runningNoResponseYet": "No output yet; the tool may still be running. If it hangs, use \"Stop tool\" below to end this call only.",
"abortTitle": "Execution control",
"abortHint": "Stops only this tool call. The conversation / multi-step task continues (unlike stopping the whole task).",
"abortBtn": "Stop tool",
"abortConfirm": "Stop this tool call? The overall conversation or iterative task will not be cancelled.",
"abortSuccess": "Cancellation requested; status will update when the tool returns.",
"abortFailed": "Failed to stop tool",
"abortNoteModalTitle": "Stop tool with a note",
"abortNoteModalHint": "Optional: why you stopped or how the model should continue. The model sees any tool output first, then a labeled block (USER INTERRUPT NOTE — not raw tool output), then your text. Leave empty for a plain stop.",
"abortNoteLabel": "Note (optional)",
"abortNotePlaceholder": "e.g. Output is enough—skip waiting and continue…",
"abortNoteSubmit": "Stop tool",
"abortNoteClose": "Cancel",
"execSuccessNoContent": "Execution succeeded with no displayable content.",
"time": "Time",
"executionId": "Execution ID",
+26
View File
@@ -383,6 +383,16 @@
"tasks": {
"title": "任务管理",
"stopTask": "停止任务",
"interruptModalTitle": "中断当前步骤",
"interruptReasonLabel": "中断说明",
"interruptModalHint": "与 MCP 监控页「终止工具」一致:仅结束当前这一次工具调用,整条对话与本轮推理会继续;工具返回中可附带说明(中英 USER INTERRUPT NOTE 块,与命令行原文区分)。留空则等同仅终止工具。若当前没有工具在执行(模型尚在思考),请等待工具开始或改用「彻底停止」。",
"interruptReasonPlaceholder": "例如:工具耗时过长,请先跳过并总结当前结果…",
"interruptReasonRequired": "请填写中断说明,以便模型根据你的意图继续。",
"interruptSubmitting": "提交中...",
"interruptConfirmContinue": "中断并继续",
"interruptHardStop": "彻底停止",
"interruptModalClose": "关闭",
"userInterruptTimelineTitle": "用户中断说明(继续迭代)",
"collapseDetail": "收起详情",
"newTask": "新建任务",
"autoRefresh": "自动刷新",
@@ -1249,6 +1259,8 @@
"statusCompleted": "已完成",
"statusRunning": "执行中",
"statusFailed": "失败",
"statusCancelled": "已终止",
"terminateExecution": "终止",
"loading": "加载中...",
"noStatsData": "暂无统计数据",
"noExecutions": "暂无执行记录",
@@ -1716,8 +1728,22 @@
"statusRunning": "执行中",
"statusCompleted": "已完成",
"statusFailed": "失败",
"statusCancelled": "已终止",
"unknown": "未知",
"getDetailFailed": "获取详情失败",
"runningNoResponseYet": "尚无返回,工具可能仍在执行。若长时间无响应,可使用下方「终止工具」结束本次调用。",
"abortTitle": "运行控制",
"abortHint": "仅中断当前这一次工具调用;对话与多步迭代任务会继续,不会等同于「停止任务」。",
"abortBtn": "终止工具",
"abortConfirm": "确定终止此次工具调用?整条对话或迭代任务不会因此停止。",
"abortSuccess": "已发送终止请求,工具返回后状态将更新。",
"abortFailed": "终止失败",
"abortNoteModalTitle": "终止工具并补充说明",
"abortNoteModalHint": "可选:说明为何终止或希望模型如何继续。提交后模型会先看到工具已输出内容(若有),再看到带「用户终止说明」标题的独立区块(中英标注,与命令行原文区分),最后是您的文字。留空则与原先仅终止一致。",
"abortNoteLabel": "终止说明(可选)",
"abortNotePlaceholder": "例如:输出已够判断,请停止等待并继续下一步…",
"abortNoteSubmit": "提交终止",
"abortNoteClose": "取消",
"execSuccessNoContent": "执行成功,未返回可展示的文本内容。",
"time": "时间",
"executionId": "执行 ID",
+5 -4
View File
@@ -306,12 +306,13 @@ async function bootstrapApp() {
// 通用工具函数
function getStatusText(status) {
const s = (status && String(status).toLowerCase()) || '';
if (typeof window.t !== 'function') {
const fallback = { pending: '等待中', running: '执行中', completed: '已完成', failed: '失败' };
return fallback[status] || status;
const fallback = { pending: '等待中', running: '执行中', completed: '已完成', failed: '失败', cancelled: '已终止' };
return fallback[s] || status;
}
const keyMap = { pending: 'mcpDetailModal.statusPending', running: 'mcpDetailModal.statusRunning', completed: 'mcpDetailModal.statusCompleted', failed: 'mcpDetailModal.statusFailed' };
const key = keyMap[status];
const keyMap = { pending: 'mcpDetailModal.statusPending', running: 'mcpDetailModal.statusRunning', completed: 'mcpDetailModal.statusCompleted', failed: 'mcpDetailModal.statusFailed', cancelled: 'mcpDetailModal.statusCancelled' };
const key = keyMap[s];
return key ? window.t(key) : status;
}
+113 -1
View File
@@ -2446,7 +2446,24 @@ async function showMCPDetail(executionId) {
}
}
} else {
responseElement.textContent = typeof window.t === 'function' ? window.t('chat.noResponseData') : '暂无响应数据';
if (normalizedStatus === 'running') {
responseElement.textContent = typeof window.t === 'function' ? window.t('mcpDetailModal.runningNoResponseYet') : '尚无返回,工具可能仍在执行。若长时间无响应,可在下方终止本次调用。';
} else {
responseElement.textContent = typeof window.t === 'function' ? window.t('chat.noResponseData') : '暂无响应数据';
}
}
const abortSection = document.getElementById('detail-abort-section');
const abortBtn = document.getElementById('detail-abort-btn');
if (abortSection && abortBtn) {
if (normalizedStatus === 'running') {
abortSection.style.display = 'block';
abortBtn.dataset.execId = exec.id || '';
abortBtn.textContent = typeof window.t === 'function' ? window.t('mcpDetailModal.abortBtn') : '终止工具';
} else {
abortSection.style.display = 'none';
delete abortBtn.dataset.execId;
}
}
// 显示模态框
@@ -2464,6 +2481,101 @@ function closeMCPDetail() {
document.getElementById('mcp-detail-modal').style.display = 'none';
}
/** 从详情模态框触发:取消当前进行中的 MCP 工具调用 */
async function abortMCPToolExecutionFromDetail() {
const btn = document.getElementById('detail-abort-btn');
const id = btn && btn.dataset.execId;
if (!id) {
return;
}
await cancelMCPToolExecution(id, { refreshDetail: true });
}
/**
* 打开 MCP 工具终止弹窗说明会经服务端加上用户终止说明标题块后与工具输出合并给模型
* @param {string} executionId
* @param {{ refreshDetail?: boolean }} [options]
*/
function openMcpToolAbortModal(executionId, options = {}) {
window.__mcpToolAbortContext = { executionId: executionId, options: options || {} };
const ta = document.getElementById('mcp-tool-abort-note');
if (ta) {
ta.value = '';
}
const m = document.getElementById('mcp-tool-abort-modal');
if (m) {
m.style.display = 'block';
}
}
function closeMcpToolAbortModal() {
window.__mcpToolAbortContext = null;
const m = document.getElementById('mcp-tool-abort-modal');
if (m) {
m.style.display = 'none';
}
}
async function submitMcpToolAbortModal() {
const ctx = window.__mcpToolAbortContext;
if (!ctx || !ctx.executionId) {
closeMcpToolAbortModal();
return;
}
const note = (document.getElementById('mcp-tool-abort-note') && document.getElementById('mcp-tool-abort-note').value || '').trim();
const executionId = ctx.executionId;
const options = ctx.options || {};
closeMcpToolAbortModal();
await cancelMCPToolExecutionSubmit(executionId, note, options);
}
/**
* 提交终止请求body: { note }
* @param {string} executionId
* @param {string} userNote
* @param {{ refreshDetail?: boolean }} [options]
*/
async function cancelMCPToolExecutionSubmit(executionId, userNote, options = {}) {
if (!executionId) {
return;
}
try {
const res = await apiFetch(`/api/monitor/execution/${encodeURIComponent(executionId)}/cancel`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ note: userNote || '' }),
});
const body = await res.json().catch(() => ({}));
if (!res.ok) {
throw new Error(body.error || body.message || res.statusText);
}
const okMsg = typeof window.t === 'function' ? window.t('mcpDetailModal.abortSuccess') : '已发送终止请求';
alert(okMsg);
if (options.refreshDetail && typeof showMCPDetail === 'function') {
await showMCPDetail(executionId);
}
if (typeof refreshMonitorPanel === 'function') {
const page = (typeof monitorState !== 'undefined' && monitorState.pagination && monitorState.pagination.page) ? monitorState.pagination.page : 1;
await refreshMonitorPanel(page);
}
} catch (e) {
const failMsg = typeof window.t === 'function' ? window.t('mcpDetailModal.abortFailed') : '终止失败';
alert(failMsg + ': ' + (e && e.message ? e.message : String(e)));
}
}
/**
* 取消单次 MCP 工具执行监控页终止弹出说明框后提交仅取消该次 tools/call不停止整条对话/迭代任务
* @param {string} executionId
* @param {{ refreshDetail?: boolean }} [options]
*/
async function cancelMCPToolExecution(executionId, options = {}) {
if (!executionId) {
return;
}
openMcpToolAbortModal(executionId, options);
}
// 复制详情面板中的内容
function copyDetailBlock(elementId, triggerBtn = null) {
const target = document.getElementById(elementId);
+169 -39
View File
@@ -1,4 +1,6 @@
const progressTaskState = new Map();
/** @type {{ progressId: string, conversationId: string } | null} */
let userInterruptModalPending = null;
let activeTaskInterval = null;
const ACTIVE_TASK_REFRESH_INTERVAL = 10000; // 10秒检查一次
const TASK_FINAL_STATUSES = new Set(['failed', 'timeout', 'cancelled', 'completed']);
@@ -354,6 +356,23 @@ function isChatMessagesPinnedToBottom() {
return scrollHeight - clientHeight - scrollTop <= CHAT_SCROLL_PIN_THRESHOLD_PX;
}
/** 顶栏「停止任务」与进度条按钮对齐时,用会话 ID 反查当前页的 progress 块 ID(无则弹窗内仍可按会话取消) */
function findProgressIdByConversationId(conversationId) {
if (!conversationId) {
return null;
}
let fallback = null;
for (const [pid, st] of progressTaskState) {
if (st && st.conversationId === conversationId) {
fallback = pid;
if (document.getElementById(pid)) {
return pid;
}
}
}
return fallback;
}
function registerProgressTask(progressId, conversationId = null) {
const state = progressTaskState.get(progressId) || {};
state.conversationId = conversationId !== undefined && conversationId !== null
@@ -410,6 +429,140 @@ async function requestCancel(conversationId) {
return result;
}
/** 与 MCP 监控一致:仅终止当前进行中的工具调用,工具返回后本轮推理继续(可选 reason 合并进工具结果) */
async function requestCancelWithContinue(conversationId, reason) {
const response = await apiFetch('/api/agent-loop/cancel', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
conversationId,
reason: reason || '',
continueAfter: true,
}),
});
const result = await response.json().catch(() => ({}));
if (!response.ok) {
throw new Error(result.error || (typeof window.t === 'function' ? window.t('tasks.cancelFailed') : '取消失败'));
}
return result;
}
function openUserInterruptModal(progressId, conversationId) {
userInterruptModalPending = {
progressId: progressId != null && progressId !== '' ? progressId : null,
conversationId,
};
const ta = document.getElementById('user-interrupt-reason');
if (ta) {
ta.value = '';
}
const m = document.getElementById('user-interrupt-modal');
if (m) {
m.style.display = 'block';
}
}
function closeUserInterruptModal() {
userInterruptModalPending = null;
const m = document.getElementById('user-interrupt-modal');
if (m) {
m.style.display = 'none';
}
}
async function submitUserInterruptContinue() {
if (!userInterruptModalPending) {
return;
}
const reason = (document.getElementById('user-interrupt-reason') && document.getElementById('user-interrupt-reason').value || '').trim();
const { progressId, conversationId } = userInterruptModalPending;
closeUserInterruptModal();
const stopBtn = progressId ? document.getElementById(`${progressId}-stop-btn`) : null;
try {
if (stopBtn) {
stopBtn.disabled = true;
stopBtn.textContent = typeof window.t === 'function' ? window.t('tasks.interruptSubmitting') : '提交中...';
}
await requestCancelWithContinue(conversationId, reason);
loadActiveTasks();
} catch (error) {
console.error('中断并继续失败:', error);
alert((typeof window.t === 'function' ? window.t('tasks.cancelTaskFailed') : '操作失败') + ': ' + error.message);
} finally {
if (stopBtn) {
stopBtn.disabled = false;
stopBtn.textContent = typeof window.t === 'function' ? window.t('tasks.stopTask') : '停止任务';
}
}
}
async function submitUserInterruptHardCancel() {
if (!userInterruptModalPending) {
return;
}
const { progressId, conversationId } = userInterruptModalPending;
closeUserInterruptModal();
if (progressId) {
await performHardCancelProgressTask(progressId);
return;
}
if (!conversationId) {
return;
}
try {
await requestCancel(conversationId);
loadActiveTasks();
} catch (error) {
console.error('取消任务失败:', error);
alert((typeof window.t === 'function' ? window.t('tasks.cancelTaskFailed') : '取消任务失败') + ': ' + error.message);
}
}
/** 彻底停止任务(原「停止任务」行为) */
async function performHardCancelProgressTask(progressId) {
const state = progressTaskState.get(progressId);
const stopBtn = document.getElementById(`${progressId}-stop-btn`);
if (!state || !state.conversationId) {
if (stopBtn) {
stopBtn.disabled = true;
setTimeout(() => {
stopBtn.disabled = false;
}, 1500);
}
alert(typeof window.t === 'function' ? window.t('tasks.taskInfoNotSynced') : '任务信息尚未同步,请稍后再试。');
return;
}
if (state.cancelling) {
return;
}
markProgressCancelling(progressId);
if (stopBtn) {
stopBtn.disabled = true;
stopBtn.textContent = typeof window.t === 'function' ? window.t('tasks.cancelling') : '取消中...';
}
try {
await requestCancel(state.conversationId);
loadActiveTasks();
} catch (error) {
console.error('取消任务失败:', error);
alert((typeof window.t === 'function' ? window.t('tasks.cancelTaskFailed') : '取消任务失败') + ': ' + error.message);
if (stopBtn) {
stopBtn.disabled = false;
stopBtn.textContent = typeof window.t === 'function' ? window.t('tasks.stopTask') : '停止任务';
}
const currentState = progressTaskState.get(progressId);
if (currentState) {
currentState.cancelling = false;
}
}
}
function addProgressMessage() {
const messagesDiv = document.getElementById('chat-messages');
const messageDiv = document.createElement('div');
@@ -737,7 +890,7 @@ function toggleProcessDetails(progressId, assistantMessageId) {
}
}
// 停止当前进度对应的任务
// 停止当前进度:弹出「中断并说明 / 彻底停止」
async function cancelProgressTask(progressId) {
const state = progressTaskState.get(progressId);
const stopBtn = document.getElementById(`${progressId}-stop-btn`);
@@ -757,27 +910,7 @@ async function cancelProgressTask(progressId) {
return;
}
markProgressCancelling(progressId);
if (stopBtn) {
stopBtn.disabled = true;
stopBtn.textContent = typeof window.t === 'function' ? window.t('tasks.cancelling') : '取消中...';
}
try {
await requestCancel(state.conversationId);
loadActiveTasks();
} catch (error) {
console.error('取消任务失败:', error);
alert((typeof window.t === 'function' ? window.t('tasks.cancelTaskFailed') : '取消任务失败') + ': ' + error.message);
if (stopBtn) {
stopBtn.disabled = false;
stopBtn.textContent = typeof window.t === 'function' ? window.t('tasks.stopTask') : '停止任务';
}
const currentState = progressTaskState.get(progressId);
if (currentState) {
currentState.cancelling = false;
}
}
openUserInterruptModal(progressId, state.conversationId);
}
// 将进度消息转换为可折叠的详情组件
@@ -2417,7 +2550,7 @@ function renderActiveTasks(tasks) {
if (cancelBtn) {
cancelBtn.onclick = (evt) => {
evt.stopPropagation();
cancelActiveTask(task.conversationId, cancelBtn);
cancelActiveTask(task.conversationId);
};
if (task.status === 'cancelling') {
cancelBtn.disabled = true;
@@ -2430,21 +2563,12 @@ function renderActiveTasks(tasks) {
});
}
async function cancelActiveTask(conversationId, button) {
if (!conversationId) return;
const originalText = button.textContent;
button.disabled = true;
button.textContent = typeof window.t === 'function' ? window.t('tasks.cancelling') : '取消中...';
try {
await requestCancel(conversationId);
loadActiveTasks();
} catch (error) {
console.error('取消任务失败:', error);
alert((typeof window.t === 'function' ? window.t('tasks.cancelTaskFailed') : '取消任务失败') + ': ' + error.message);
button.disabled = false;
button.textContent = originalText;
function cancelActiveTask(conversationId) {
if (!conversationId) {
return;
}
const progressId = findProgressIdByConversationId(conversationId);
openUserInterruptModal(progressId, conversationId);
}
let monitorPanelFetchSeq = 0;
@@ -2777,7 +2901,8 @@ function renderMonitorExecutions(executions = [], statusFilter = 'all') {
const viewDetailLabel = typeof window.t === 'function' ? window.t('mcpMonitor.viewDetail') : '查看详情';
const deleteLabel = typeof window.t === 'function' ? window.t('mcpMonitor.delete') : '删除';
const deleteExecTitle = typeof window.t === 'function' ? window.t('mcpMonitor.deleteExecTitle') : '删除此执行记录';
const statusKeyMap = { pending: 'statusPending', running: 'statusRunning', completed: 'statusCompleted', failed: 'statusFailed' };
const terminateLabel = typeof window.t === 'function' ? window.t('mcpMonitor.terminateExecution') : '终止';
const statusKeyMap = { pending: 'statusPending', running: 'statusRunning', completed: 'statusCompleted', failed: 'statusFailed', cancelled: 'statusCancelled' };
const locale = (typeof window.__locale === 'string' && window.__locale.startsWith('zh')) ? 'zh-CN' : undefined;
const rows = executions
.map(exec => {
@@ -2788,7 +2913,11 @@ function renderMonitorExecutions(executions = [], statusFilter = 'all') {
const startTime = exec.startTime ? (new Date(exec.startTime).toLocaleString ? new Date(exec.startTime).toLocaleString(locale || 'en-US') : String(exec.startTime)) : unknownLabel;
const duration = formatExecutionDuration(exec.startTime, exec.endTime);
const toolName = escapeHtml(exec.toolName || unknownToolLabel);
const executionId = escapeHtml(exec.id || '');
const rawExecId = exec.id || '';
const executionId = escapeHtml(rawExecId);
const terminateBtn = status === 'running'
? `<button type="button" class="btn-secondary btn-monitor-abort" onclick="cancelMCPToolExecution('${rawExecId.replace(/\\/g, '\\\\').replace(/'/g, "\\'")}')">${escapeHtml(terminateLabel)}</button>`
: '';
return `
<tr>
<td>
@@ -2801,6 +2930,7 @@ function renderMonitorExecutions(executions = [], statusFilter = 'all') {
<td>
<div class="monitor-execution-actions">
<button class="btn-secondary" onclick="showMCPDetail('${executionId}')">${escapeHtml(viewDetailLabel)}</button>
${terminateBtn}
<button class="btn-secondary btn-delete" onclick="deleteExecution('${executionId}')" title="${escapeHtml(deleteExecTitle)}">${escapeHtml(deleteLabel)}</button>
</div>
</td>
+51
View File
@@ -1053,6 +1053,7 @@
<option value="completed" data-i18n="mcpMonitor.statusCompleted">已完成</option>
<option value="running" data-i18n="mcpMonitor.statusRunning">执行中</option>
<option value="failed" data-i18n="mcpMonitor.statusFailed">失败</option>
<option value="cancelled" data-i18n="mcpMonitor.statusCancelled">已终止</option>
</select>
</label>
</div>
@@ -2449,6 +2450,13 @@
</div>
</div>
</div>
<div class="detail-section detail-abort-section" id="detail-abort-section" style="display: none;">
<div class="detail-section-header">
<h3 data-i18n="mcpDetailModal.abortTitle">运行控制</h3>
</div>
<p class="detail-abort-hint" data-i18n="mcpDetailModal.abortHint">仅中断当前工具调用;对话与多步任务会继续。</p>
<button type="button" class="btn-secondary btn-monitor-abort" id="detail-abort-btn" onclick="abortMCPToolExecutionFromDetail()">终止工具</button>
</div>
<div class="detail-section">
<div class="detail-section-header">
<h3 data-i18n="mcpDetailModal.requestParams">请求参数</h3>
@@ -2489,6 +2497,49 @@
</div>
</div>
<!-- 用户中断并说明(继续迭代) -->
<div id="user-interrupt-modal" class="modal">
<div class="modal-content" style="max-width: 520px;">
<div class="modal-header">
<h2 data-i18n="tasks.interruptModalTitle">中断当前步骤</h2>
<span class="modal-close" onclick="closeUserInterruptModal()">&times;</span>
</div>
<div class="modal-body">
<p class="detail-abort-hint" data-i18n="tasks.interruptModalHint">填写说明后将写入对话并由智能体继续迭代。</p>
<div class="form-group">
<label for="user-interrupt-reason"><span data-i18n="tasks.interruptReasonLabel">中断说明</span></label>
<textarea id="user-interrupt-reason" class="form-control" rows="4" data-i18n="tasks.interruptReasonPlaceholder" data-i18n-attr="placeholder" placeholder=""></textarea>
</div>
<div class="form-actions" style="display: flex; flex-wrap: wrap; gap: 8px; justify-content: flex-end;">
<button type="button" class="btn-secondary" onclick="closeUserInterruptModal()" data-i18n="tasks.interruptModalClose">关闭</button>
<button type="button" class="btn-secondary btn-delete" onclick="submitUserInterruptHardCancel()" data-i18n="tasks.interruptHardStop">彻底停止</button>
<button type="button" class="btn-primary" onclick="submitUserInterruptContinue()" data-i18n="tasks.interruptConfirmContinue">中断并继续</button>
</div>
</div>
</div>
</div>
<!-- MCP 工具终止:可填写给模型的说明 -->
<div id="mcp-tool-abort-modal" class="modal">
<div class="modal-content" style="max-width: 520px;">
<div class="modal-header">
<h2 data-i18n="mcpDetailModal.abortNoteModalTitle">终止工具并补充说明</h2>
<span class="modal-close" onclick="closeMcpToolAbortModal()">&times;</span>
</div>
<div class="modal-body">
<p class="detail-abort-hint" data-i18n="mcpDetailModal.abortNoteModalHint">可选说明。</p>
<div class="form-group">
<label for="mcp-tool-abort-note"><span data-i18n="mcpDetailModal.abortNoteLabel">终止说明(可选)</span></label>
<textarea id="mcp-tool-abort-note" class="form-control" rows="4" data-i18n="mcpDetailModal.abortNotePlaceholder" data-i18n-attr="placeholder" placeholder=""></textarea>
</div>
<div class="form-actions" style="display: flex; flex-wrap: wrap; gap: 8px; justify-content: flex-end;">
<button type="button" class="btn-secondary" onclick="closeMcpToolAbortModal()" data-i18n="mcpDetailModal.abortNoteClose">取消</button>
<button type="button" class="btn-primary" onclick="submitMcpToolAbortModal()" data-i18n="mcpDetailModal.abortNoteSubmit">提交终止</button>
</div>
</div>
</div>
</div>
<!-- 外部MCP配置模态框 -->
<div id="external-mcp-modal" class="modal">
<div class="modal-content" style="max-width: 900px;">