mirror of
https://github.com/Ed1s0nZ/CyberStrikeAI.git
synced 2026-06-24 14:59:59 +02:00
Add files via upload
This commit is contained in:
+64
-29
@@ -187,6 +187,14 @@ func (h *AgentHandler) SetAudit(s *audit.Service) {
|
||||
h.audit = s
|
||||
}
|
||||
|
||||
// TaskManager 返回 Agent 任务管理器(供 MCP 监控页终止 Eino execute 等)。
|
||||
func (h *AgentHandler) TaskManager() *AgentTaskManager {
|
||||
if h == nil {
|
||||
return nil
|
||||
}
|
||||
return h.tasks
|
||||
}
|
||||
|
||||
// CancelRunningTaskForConversation stops any in-flight agent work for the conversation (idempotent).
|
||||
func (h *AgentHandler) CancelRunningTaskForConversation(conversationID string) {
|
||||
if h == nil || conversationID == "" || h.tasks == nil {
|
||||
@@ -1291,6 +1299,55 @@ func (h *AgentHandler) createProgressCallback(runCtx context.Context, cancelRun
|
||||
}
|
||||
}
|
||||
|
||||
// cancelToolContinueAfter 仅终止当前工具调用,不停止整条 Agent 任务(对话「中断并继续」与 MCP 监控终止共用)。
|
||||
func (h *AgentHandler) cancelToolContinueAfter(conversationID, preferredExecID, note string) (bool, gin.H) {
|
||||
conversationID = strings.TrimSpace(conversationID)
|
||||
if conversationID == "" || h.tasks.GetTask(conversationID) == nil {
|
||||
return false, nil
|
||||
}
|
||||
note = strings.TrimSpace(note)
|
||||
execID := strings.TrimSpace(preferredExecID)
|
||||
if execID == "" {
|
||||
execID = h.tasks.ActiveMCPExecutionID(conversationID)
|
||||
}
|
||||
if execID != "" {
|
||||
if h.agent.CancelMCPToolExecutionWithNote(execID, note) {
|
||||
return true, gin.H{
|
||||
"status": "tool_abort_requested",
|
||||
"conversationId": conversationID,
|
||||
"executionId": execID,
|
||||
"message": "已请求终止当前工具调用;工具返回后本轮推理将继续(与 MCP 监控页终止一致)。",
|
||||
"continueAfter": true,
|
||||
"interruptWithNote": note != "",
|
||||
"continueWithoutTool": false,
|
||||
}
|
||||
}
|
||||
if h.tasks.AbortActiveEinoExecute(conversationID, note) {
|
||||
return true, gin.H{
|
||||
"status": "tool_abort_requested",
|
||||
"conversationId": conversationID,
|
||||
"executionId": execID,
|
||||
"message": "已请求终止当前 execute 命令;命令返回后本轮推理将继续。",
|
||||
"continueAfter": true,
|
||||
"interruptWithNote": note != "",
|
||||
"continueWithoutTool": false,
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
if h.tasks.AbortActiveEinoExecute(conversationID, note) {
|
||||
return true, gin.H{
|
||||
"status": "tool_abort_requested",
|
||||
"conversationId": conversationID,
|
||||
"message": "已请求终止当前 execute 命令;命令返回后本轮推理将继续。",
|
||||
"continueAfter": true,
|
||||
"interruptWithNote": note != "",
|
||||
"continueWithoutTool": false,
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// CancelAgentLoop 取消正在执行的任务
|
||||
func (h *AgentHandler) CancelAgentLoop(c *gin.Context) {
|
||||
var req struct {
|
||||
@@ -1309,42 +1366,20 @@ func (h *AgentHandler) CancelAgentLoop(c *gin.Context) {
|
||||
c.JSON(http.StatusNotFound, gin.H{"error": "未找到正在执行的任务"})
|
||||
return
|
||||
}
|
||||
execID := h.tasks.ActiveMCPExecutionID(req.ConversationID)
|
||||
note := strings.TrimSpace(req.Reason)
|
||||
if execID != "" {
|
||||
if !h.agent.CancelMCPToolExecutionWithNote(execID, note) {
|
||||
c.JSON(http.StatusNotFound, gin.H{"error": "未找到进行中的工具执行或该调用已结束"})
|
||||
return
|
||||
}
|
||||
h.logger.Info("对话页仅终止当前 MCP 工具",
|
||||
activeExec := strings.TrimSpace(h.tasks.ActiveMCPExecutionID(req.ConversationID))
|
||||
if ok, payload := h.cancelToolContinueAfter(req.ConversationID, "", note); ok {
|
||||
execID, _ := payload["executionId"].(string)
|
||||
h.logger.Info("对话页仅终止当前工具",
|
||||
zap.String("conversationId", req.ConversationID),
|
||||
zap.String("executionId", execID),
|
||||
zap.Bool("hasNote", note != ""),
|
||||
)
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"status": "tool_abort_requested",
|
||||
"conversationId": req.ConversationID,
|
||||
"executionId": execID,
|
||||
"message": "已请求终止当前工具调用;工具返回后本轮推理将继续(与 MCP 监控页终止一致)。",
|
||||
"continueAfter": true,
|
||||
"interruptWithNote": note != "",
|
||||
"continueWithoutTool": false,
|
||||
})
|
||||
c.JSON(http.StatusOK, payload)
|
||||
return
|
||||
}
|
||||
if h.tasks.AbortActiveEinoExecute(req.ConversationID, note) {
|
||||
h.logger.Info("对话页仅终止当前 Eino execute",
|
||||
zap.String("conversationId", req.ConversationID),
|
||||
zap.Bool("hasNote", note != ""),
|
||||
)
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"status": "tool_abort_requested",
|
||||
"conversationId": req.ConversationID,
|
||||
"message": "已请求终止当前 execute 命令;命令返回后本轮推理将继续。",
|
||||
"continueAfter": true,
|
||||
"interruptWithNote": note != "",
|
||||
"continueWithoutTool": false,
|
||||
})
|
||||
if activeExec != "" {
|
||||
c.JSON(http.StatusNotFound, gin.H{"error": "未找到进行中的工具执行或该调用已结束"})
|
||||
return
|
||||
}
|
||||
// 无进行中的 MCP 工具(模型纯推理/流式输出阶段):取消当前上下文并由 Eino 流式处理器合并用户补充后自动续跑。
|
||||
|
||||
@@ -23,6 +23,8 @@ import (
|
||||
type MonitorHandler struct {
|
||||
mcpServer *mcp.Server
|
||||
externalMCPMgr *mcp.ExternalMCPManager
|
||||
taskManager *AgentTaskManager
|
||||
agentHandler *AgentHandler
|
||||
executor *security.Executor
|
||||
db *database.DB
|
||||
logger *zap.Logger
|
||||
@@ -56,6 +58,16 @@ func (h *MonitorHandler) SetExternalMCPManager(mgr *mcp.ExternalMCPManager) {
|
||||
h.externalMCPMgr = mgr
|
||||
}
|
||||
|
||||
// SetTaskManager 设置 Agent 任务管理器(用于 Eino execute 等按 executionId 终止)。
|
||||
func (h *MonitorHandler) SetTaskManager(mgr *AgentTaskManager) {
|
||||
h.taskManager = mgr
|
||||
}
|
||||
|
||||
// SetAgentHandler 设置 Agent 处理器(MCP 监控终止与对话页「中断并继续」共用逻辑)。
|
||||
func (h *MonitorHandler) SetAgentHandler(ah *AgentHandler) {
|
||||
h.agentHandler = ah
|
||||
}
|
||||
|
||||
// MonitorResponse 监控响应
|
||||
type MonitorResponse struct {
|
||||
Executions []*mcp.ToolExecution `json:"executions"`
|
||||
@@ -90,6 +102,7 @@ func (h *MonitorHandler) Monitor(c *gin.Context) {
|
||||
toolName := normalizeToolNameFilter(c.Query("tool"))
|
||||
|
||||
executions, total := h.loadExecutionsWithPagination(page, pageSize, status, toolName)
|
||||
h.enrichExecutionsConversationID(executions)
|
||||
stats := h.loadStats()
|
||||
|
||||
totalPages := (total + pageSize - 1) / pageSize
|
||||
@@ -247,6 +260,7 @@ func (h *MonitorHandler) GetExecution(c *gin.Context) {
|
||||
// 先从内部MCP服务器查找
|
||||
exec, exists := h.mcpServer.GetExecution(id)
|
||||
if exists {
|
||||
h.enrichExecutionsConversationID([]*mcp.ToolExecution{exec})
|
||||
c.JSON(http.StatusOK, exec)
|
||||
return
|
||||
}
|
||||
@@ -255,6 +269,7 @@ func (h *MonitorHandler) GetExecution(c *gin.Context) {
|
||||
if h.externalMCPMgr != nil {
|
||||
exec, exists = h.externalMCPMgr.GetExecution(id)
|
||||
if exists {
|
||||
h.enrichExecutionsConversationID([]*mcp.ToolExecution{exec})
|
||||
c.JSON(http.StatusOK, exec)
|
||||
return
|
||||
}
|
||||
@@ -264,6 +279,7 @@ func (h *MonitorHandler) GetExecution(c *gin.Context) {
|
||||
if h.db != nil {
|
||||
exec, err := h.db.GetToolExecution(id)
|
||||
if err == nil && exec != nil {
|
||||
h.enrichExecutionsConversationID([]*mcp.ToolExecution{exec})
|
||||
c.JSON(http.StatusOK, exec)
|
||||
return
|
||||
}
|
||||
@@ -290,6 +306,19 @@ func (h *MonitorHandler) CancelExecution(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
note = strings.TrimSpace(body.Note)
|
||||
|
||||
convID := h.conversationIDForRunningExecution(id)
|
||||
if convID != "" && h.agentHandler != nil {
|
||||
if ok, payload := h.agentHandler.cancelToolContinueAfter(convID, id, note); ok {
|
||||
h.logger.Info("MCP 监控页终止工具(与对话中断并继续一致)",
|
||||
zap.String("executionId", id),
|
||||
zap.String("conversationId", convID),
|
||||
zap.Bool("hasNote", note != ""),
|
||||
)
|
||||
c.JSON(http.StatusOK, payload)
|
||||
return
|
||||
}
|
||||
}
|
||||
if h.mcpServer.CancelToolExecutionWithNote(id, note) {
|
||||
h.logger.Info("已请求取消 MCP 工具执行", zap.String("executionId", id), zap.String("source", "internal"), zap.Bool("hasNote", note != ""))
|
||||
c.JSON(http.StatusOK, gin.H{"message": "已发送终止信号", "executionId": id})
|
||||
@@ -303,6 +332,52 @@ func (h *MonitorHandler) CancelExecution(c *gin.Context) {
|
||||
c.JSON(http.StatusNotFound, gin.H{"error": "未找到进行中的工具执行,或该任务已结束"})
|
||||
}
|
||||
|
||||
func (h *MonitorHandler) enrichExecutionsConversationID(executions []*mcp.ToolExecution) {
|
||||
for _, exec := range executions {
|
||||
if exec == nil {
|
||||
continue
|
||||
}
|
||||
exec.ConversationID = h.conversationIDForRunningExecution(exec.ID)
|
||||
}
|
||||
}
|
||||
|
||||
func (h *MonitorHandler) conversationIDForRunningExecution(executionID string) string {
|
||||
executionID = strings.TrimSpace(executionID)
|
||||
if executionID == "" || h.taskManager == nil {
|
||||
return ""
|
||||
}
|
||||
if conv := h.taskManager.ConversationIDForActiveMCPExecution(executionID); conv != "" {
|
||||
return conv
|
||||
}
|
||||
exec := h.lookupExecution(executionID)
|
||||
if exec == nil || exec.Status != "running" {
|
||||
return ""
|
||||
}
|
||||
if strings.TrimSpace(exec.ToolName) == "execute" {
|
||||
if onlyConv, ok := h.taskManager.ConversationIDForActiveEinoExecute(); ok {
|
||||
return onlyConv
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (h *MonitorHandler) lookupExecution(id string) *mcp.ToolExecution {
|
||||
if exec, ok := h.mcpServer.GetExecution(id); ok {
|
||||
return exec
|
||||
}
|
||||
if h.externalMCPMgr != nil {
|
||||
if exec, ok := h.externalMCPMgr.GetExecution(id); ok {
|
||||
return exec
|
||||
}
|
||||
}
|
||||
if h.db != nil {
|
||||
if exec, err := h.db.GetToolExecution(id); err == nil && exec != nil {
|
||||
return exec
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// BatchGetToolNames 批量获取工具执行的工具名称(消除前端 N+1 请求)
|
||||
func (h *MonitorHandler) BatchGetToolNames(c *gin.Context) {
|
||||
var req struct {
|
||||
|
||||
@@ -103,6 +103,40 @@ func (m *AgentTaskManager) UnregisterActiveEinoExecute(conversationID string) {
|
||||
}
|
||||
}
|
||||
|
||||
// ConversationIDForActiveMCPExecution 根据当前登记的工具 executionId 反查会话 ID(供 MCP 监控页按 executionId 终止)。
|
||||
func (m *AgentTaskManager) ConversationIDForActiveMCPExecution(executionID string) string {
|
||||
executionID = strings.TrimSpace(executionID)
|
||||
if executionID == "" {
|
||||
return ""
|
||||
}
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
for convID, t := range m.tasks {
|
||||
if t != nil && t.ActiveMCPExecutionID == executionID {
|
||||
return convID
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// ConversationIDForActiveEinoExecute 返回当前唯一进行 Eino execute 的会话 ID;多会话并行时返回空。
|
||||
func (m *AgentTaskManager) ConversationIDForActiveEinoExecute() (string, bool) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
var found string
|
||||
count := 0
|
||||
for convID, t := range m.tasks {
|
||||
if t != nil && t.activeEinoExecuteCancel != nil {
|
||||
found = convID
|
||||
count++
|
||||
}
|
||||
}
|
||||
if count == 1 {
|
||||
return found, true
|
||||
}
|
||||
return "", false
|
||||
}
|
||||
|
||||
// AbortActiveEinoExecute 终止当前 Eino execute 并暂存用户说明(与 MCP 工具终止一致)。
|
||||
func (m *AgentTaskManager) AbortActiveEinoExecute(conversationID, note string) bool {
|
||||
conversationID = strings.TrimSpace(conversationID)
|
||||
|
||||
@@ -38,3 +38,19 @@ func TestAbortActiveEinoExecute(t *testing.T) {
|
||||
t.Fatal("second abort should fail when no active execute")
|
||||
}
|
||||
}
|
||||
|
||||
func TestConversationIDForActiveMCPExecution(t *testing.T) {
|
||||
m := NewAgentTaskManager()
|
||||
conv := "conv-mcp-exec"
|
||||
_, err := m.StartTask(conv, "test", func(error) {})
|
||||
if err != nil {
|
||||
t.Fatalf("StartTask: %v", err)
|
||||
}
|
||||
m.RegisterRunningTool(conv, "exec-123")
|
||||
if got := m.ConversationIDForActiveMCPExecution("exec-123"); got != conv {
|
||||
t.Fatalf("got %q, want %q", got, conv)
|
||||
}
|
||||
if got := m.ConversationIDForActiveMCPExecution("missing"); got != "" {
|
||||
t.Fatalf("missing should be empty, got %q", got)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user