Add files via upload

This commit is contained in:
公明
2026-05-07 17:01:15 +08:00
committed by GitHub
parent e68d3a3d23
commit 6d180c814d
8 changed files with 450 additions and 92 deletions
+24 -4
View File
@@ -1717,6 +1717,8 @@ func (h *AgentHandler) AgentLoopStream(c *gin.Context) {
func (h *AgentHandler) CancelAgentLoop(c *gin.Context) {
var req struct {
ConversationID string `json:"conversationId" binding:"required"`
Reason string `json:"reason,omitempty"`
ContinueAfter bool `json:"continueAfter,omitempty"`
}
if err := c.ShouldBindJSON(&req); err != nil {
@@ -1724,7 +1726,23 @@ func (h *AgentHandler) CancelAgentLoop(c *gin.Context) {
return
}
ok, err := h.tasks.CancelTask(req.ConversationID, ErrTaskCancelled)
if req.ContinueAfter && strings.TrimSpace(req.Reason) == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "continueAfter 为 true 时必须提供非空的 reason(中断说明)"})
return
}
var cause error = ErrTaskCancelled
msg := "已提交取消请求,任务将在当前步骤完成后停止。"
if req.ContinueAfter {
if !h.tasks.SetInterruptContinueReason(req.ConversationID, req.Reason) {
c.JSON(http.StatusNotFound, gin.H{"error": "未找到正在执行的任务,无法提交中断说明"})
return
}
cause = ErrUserInterruptContinue
msg = "已提交中断说明,当前步骤结束后将写入对话并继续迭代。"
}
ok, err := h.tasks.CancelTask(req.ConversationID, cause)
if err != nil {
h.logger.Error("取消任务失败", zap.Error(err))
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
@@ -1737,9 +1755,11 @@ func (h *AgentHandler) CancelAgentLoop(c *gin.Context) {
}
c.JSON(http.StatusOK, gin.H{
"status": "cancelling",
"conversationId": req.ConversationID,
"message": "已提交取消请求,任务将在当前步骤完成后停止。",
"status": "cancelling",
"conversationId": req.ConversationID,
"message": msg,
"continueAfter": req.ContinueAfter,
"interruptWithNote": req.ContinueAfter,
})
}
+108 -42
View File
@@ -43,8 +43,11 @@ func (h *AgentHandler) EinoSingleAgentLoopStream(c *gin.Context) {
var sseWriteMu sync.Mutex
var ssePublishConversationID string
sendEvent := func(eventType, message string, data interface{}) {
if eventType == "error" && baseCtx != nil && errors.Is(context.Cause(baseCtx), ErrTaskCancelled) {
return
if eventType == "error" && baseCtx != nil {
cause := context.Cause(baseCtx)
if errors.Is(cause, ErrTaskCancelled) || errors.Is(cause, ErrUserInterruptContinue) {
return
}
}
ev := StreamEvent{Type: eventType, Message: message, Data: data}
b, errMarshal := json.Marshal(ev)
@@ -114,33 +117,10 @@ func (h *AgentHandler) EinoSingleAgentLoopStream(c *gin.Context) {
}
var cancelWithCause context.CancelCauseFunc
baseCtx, cancelWithCause = context.WithCancelCause(context.Background())
taskCtx, timeoutCancel := context.WithTimeout(baseCtx, 600*time.Minute)
defer timeoutCancel()
defer cancelWithCause(nil)
progressCallback := h.createProgressCallback(taskCtx, cancelWithCause, conversationID, assistantMessageID, sendEvent)
taskCtx = multiagent.WithHITLToolInterceptor(taskCtx, func(ctx context.Context, toolName, arguments string) (string, error) {
return h.interceptHITLForEinoTool(ctx, cancelWithCause, conversationID, assistantMessageID, sendEvent, toolName, arguments)
})
if _, err := h.tasks.StartTask(conversationID, req.Message, cancelWithCause); err != nil {
var errorMsg string
if errors.Is(err, ErrTaskAlreadyRunning) {
errorMsg = "⚠️ 当前会话已有任务正在执行中,请等待当前任务完成或点击「停止任务」后再尝试。"
sendEvent("error", errorMsg, map[string]interface{}{
"conversationId": conversationID,
"errorType": "task_already_running",
})
} else {
errorMsg = "❌ 无法启动任务: " + err.Error()
sendEvent("error", errorMsg, nil)
}
if assistantMessageID != "" {
_, _ = h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", errorMsg, time.Now(), assistantMessageID)
}
sendEvent("done", "", map[string]interface{}{"conversationId": conversationID})
return
}
firstRun := true
curFinalMessage := prep.FinalMessage
curHistory := prep.History
roleTools := prep.RoleTools
taskStatus := "completed"
defer h.tasks.FinishTask(conversationID, taskStatus)
@@ -161,22 +141,108 @@ func (h *AgentHandler) EinoSingleAgentLoopStream(c *gin.Context) {
return
}
result, runErr := multiagent.RunEinoSingleChatModelAgent(
taskCtx,
h.config,
&h.config.MultiAgent,
h.agent,
h.logger,
conversationID,
prep.FinalMessage,
prep.History,
prep.RoleTools,
progressCallback,
)
var result *multiagent.RunResult
var runErr error
for {
baseCtx, cancelWithCause = context.WithCancelCause(context.Background())
taskCtx, timeoutCancel := context.WithTimeout(baseCtx, 600*time.Minute)
if firstRun {
if _, err := h.tasks.StartTask(conversationID, req.Message, cancelWithCause); err != nil {
var errorMsg string
if errors.Is(err, ErrTaskAlreadyRunning) {
errorMsg = "⚠️ 当前会话已有任务正在执行中,请等待当前任务完成或点击「停止任务」后再尝试。"
sendEvent("error", errorMsg, map[string]interface{}{
"conversationId": conversationID,
"errorType": "task_already_running",
})
} else {
errorMsg = "❌ 无法启动任务: " + err.Error()
sendEvent("error", errorMsg, nil)
}
if assistantMessageID != "" {
_, _ = h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", errorMsg, time.Now(), assistantMessageID)
}
sendEvent("done", "", map[string]interface{}{"conversationId": conversationID})
timeoutCancel()
return
}
firstRun = false
} else {
if err := h.tasks.ResetTaskCancelForContinue(conversationID, cancelWithCause); err != nil {
h.logger.Error("续跑任务时重置 cancel 失败", zap.Error(err))
taskStatus = "failed"
sendEvent("error", err.Error(), nil)
sendEvent("done", "", map[string]interface{}{"conversationId": conversationID})
timeoutCancel()
return
}
}
progressCallback := h.createProgressCallback(taskCtx, cancelWithCause, conversationID, assistantMessageID, sendEvent)
taskCtx = multiagent.WithHITLToolInterceptor(taskCtx, func(ctx context.Context, toolName, arguments string) (string, error) {
return h.interceptHITLForEinoTool(ctx, cancelWithCause, conversationID, assistantMessageID, sendEvent, toolName, arguments)
})
result, runErr = multiagent.RunEinoSingleChatModelAgent(
taskCtx,
h.config,
&h.config.MultiAgent,
h.agent,
h.logger,
conversationID,
curFinalMessage,
curHistory,
roleTools,
progressCallback,
)
timeoutCancel()
if runErr == nil {
break
}
if runErr != nil {
h.persistEinoAgentTraceForResume(conversationID, result)
cause := context.Cause(baseCtx)
if errors.Is(cause, ErrUserInterruptContinue) {
reason := h.tasks.TakeInterruptContinueReason(conversationID)
prepNext, perr := h.prepareSessionAfterUserInterrupt(conversationID, assistantMessageID, reason, roleTools)
if perr != nil {
h.logger.Error("准备中断后续跑失败", zap.Error(perr))
taskStatus = "failed"
h.tasks.UpdateTaskStatus(conversationID, taskStatus)
errMsg := "中断后续跑失败: " + perr.Error()
if assistantMessageID != "" {
_, _ = h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", errMsg, time.Now(), assistantMessageID)
}
sendEvent("error", errMsg, map[string]interface{}{
"conversationId": conversationID,
"messageId": assistantMessageID,
})
sendEvent("done", "", map[string]interface{}{"conversationId": conversationID})
return
}
assistantMessageID = prepNext.AssistantMessageID
curFinalMessage = prepNext.FinalMessage
curHistory = prepNext.History
if prepNext.UserMessageID != "" {
sendEvent("message_saved", "", map[string]interface{}{
"conversationId": conversationID,
"userMessageId": prepNext.UserMessageID,
})
}
sendEvent("user_interrupt_continue", reason, map[string]interface{}{
"conversationId": conversationID,
"reason": reason,
"messageId": assistantMessageID,
})
sendEvent("progress", "已接收中断说明,继续迭代...", map[string]interface{}{
"conversationId": conversationID,
})
continue
}
if errors.Is(cause, ErrTaskCancelled) {
taskStatus = "cancelled"
h.tasks.UpdateTaskStatus(conversationID, taskStatus)
+36 -2
View File
@@ -1,6 +1,9 @@
package handler
import (
"encoding/json"
"errors"
"io"
"net/http"
"strconv"
"strings"
@@ -245,6 +248,37 @@ func (h *MonitorHandler) GetExecution(c *gin.Context) {
c.JSON(http.StatusNotFound, gin.H{"error": "执行记录未找到"})
}
// CancelExecution 手动取消进行中的 MCP 工具调用(仅取消该次 tools/call 的上下文,不停止整条 Agent / 迭代任务)
// 请求体可选 JSON{ "note": "用户说明" },将与工具已返回输出合并交给模型(含「用户终止说明」标题块,与命令行原文区分)。
func (h *MonitorHandler) CancelExecution(c *gin.Context) {
id := c.Param("id")
if id == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "执行记录ID不能为空"})
return
}
note := ""
dec := json.NewDecoder(c.Request.Body)
var body struct {
Note string `json:"note"`
}
if err := dec.Decode(&body); err != nil && !errors.Is(err, io.EOF) {
c.JSON(http.StatusBadRequest, gin.H{"error": "请求体须为 JSON,例如 {\"note\":\"说明\"},可为空对象"})
return
}
note = strings.TrimSpace(body.Note)
if h.mcpServer.CancelToolExecutionWithNote(id, note) {
h.logger.Info("已请求取消 MCP 工具执行", zap.String("executionId", id), zap.String("source", "internal"), zap.Bool("hasNote", note != ""))
c.JSON(http.StatusOK, gin.H{"message": "已发送终止信号", "executionId": id})
return
}
if h.externalMCPMgr != nil && h.externalMCPMgr.CancelToolExecutionWithNote(id, note) {
h.logger.Info("已请求取消 MCP 工具执行", zap.String("executionId", id), zap.String("source", "external"), zap.Bool("hasNote", note != ""))
c.JSON(http.StatusOK, gin.H{"message": "已发送终止信号", "executionId": id})
return
}
c.JSON(http.StatusNotFound, gin.H{"error": "未找到进行中的工具执行,或该任务已结束"})
}
// BatchGetToolNames 批量获取工具执行的工具名称(消除前端 N+1 请求)
func (h *MonitorHandler) BatchGetToolNames(c *gin.Context) {
var req struct {
@@ -317,7 +351,7 @@ func (h *MonitorHandler) DeleteExecution(c *gin.Context) {
totalCalls := 1
successCalls := 0
failedCalls := 0
if exec.Status == "failed" {
if exec.Status == "failed" || exec.Status == "cancelled" {
failedCalls = 1
} else if exec.Status == "completed" {
successCalls = 1
@@ -381,7 +415,7 @@ func (h *MonitorHandler) DeleteExecutions(c *gin.Context) {
stats := toolStats[exec.ToolName]
stats.totalCalls++
if exec.Status == "failed" {
if exec.Status == "failed" || exec.Status == "cancelled" {
stats.failedCalls++
} else if exec.Status == "completed" {
stats.successCalls++
+112 -44
View File
@@ -60,8 +60,11 @@ func (h *AgentHandler) MultiAgentLoopStream(c *gin.Context) {
sendEvent := func(eventType, message string, data interface{}) {
// 用户主动停止时,Eino 可能仍会并发上报 eventType=="error"。
// 为避免 UI 看到“取消错误 + cancelled 文案”两条回复,这里直接丢弃取消对应的 error。
if eventType == "error" && baseCtx != nil && errors.Is(context.Cause(baseCtx), ErrTaskCancelled) {
return
if eventType == "error" && baseCtx != nil {
cause := context.Cause(baseCtx)
if errors.Is(cause, ErrTaskCancelled) || errors.Is(cause, ErrUserInterruptContinue) {
return
}
}
ev := StreamEvent{Type: eventType, Message: message, Data: data}
b, errMarshal := json.Marshal(ev)
@@ -130,33 +133,12 @@ func (h *AgentHandler) MultiAgentLoopStream(c *gin.Context) {
})
}
baseCtx, cancelWithCause := context.WithCancelCause(context.Background())
taskCtx, timeoutCancel := context.WithTimeout(baseCtx, 600*time.Minute)
defer timeoutCancel()
defer cancelWithCause(nil)
progressCallback := h.createProgressCallback(taskCtx, cancelWithCause, conversationID, assistantMessageID, sendEvent)
taskCtx = multiagent.WithHITLToolInterceptor(taskCtx, func(ctx context.Context, toolName, arguments string) (string, error) {
return h.interceptHITLForEinoTool(ctx, cancelWithCause, conversationID, assistantMessageID, sendEvent, toolName, arguments)
})
if _, err := h.tasks.StartTask(conversationID, req.Message, cancelWithCause); err != nil {
var errorMsg string
if errors.Is(err, ErrTaskAlreadyRunning) {
errorMsg = "⚠️ 当前会话已有任务正在执行中,请等待当前任务完成或点击「停止任务」后再尝试。"
sendEvent("error", errorMsg, map[string]interface{}{
"conversationId": conversationID,
"errorType": "task_already_running",
})
} else {
errorMsg = "❌ 无法启动任务: " + err.Error()
sendEvent("error", errorMsg, nil)
}
if assistantMessageID != "" {
_, _ = h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", errorMsg, time.Now(), assistantMessageID)
}
sendEvent("done", "", map[string]interface{}{"conversationId": conversationID})
return
}
var cancelWithCause context.CancelCauseFunc
firstRun := true
curFinalMessage := prep.FinalMessage
curHistory := prep.History
roleTools := prep.RoleTools
orch := strings.TrimSpace(req.Orchestration)
taskStatus := "completed"
defer h.tasks.FinishTask(conversationID, taskStatus)
@@ -169,24 +151,110 @@ func (h *AgentHandler) MultiAgentLoopStream(c *gin.Context) {
go sseKeepalive(c, stopKeepalive, &sseWriteMu)
defer close(stopKeepalive)
result, runErr := multiagent.RunDeepAgent(
taskCtx,
h.config,
&h.config.MultiAgent,
h.agent,
h.logger,
conversationID,
prep.FinalMessage,
prep.History,
prep.RoleTools,
progressCallback,
h.agentsMarkdownDir,
strings.TrimSpace(req.Orchestration),
)
var result *multiagent.RunResult
var runErr error
for {
baseCtx, cancelWithCause = context.WithCancelCause(context.Background())
taskCtx, timeoutCancel := context.WithTimeout(baseCtx, 600*time.Minute)
if firstRun {
if _, err := h.tasks.StartTask(conversationID, req.Message, cancelWithCause); err != nil {
var errorMsg string
if errors.Is(err, ErrTaskAlreadyRunning) {
errorMsg = "⚠️ 当前会话已有任务正在执行中,请等待当前任务完成或点击「停止任务」后再尝试。"
sendEvent("error", errorMsg, map[string]interface{}{
"conversationId": conversationID,
"errorType": "task_already_running",
})
} else {
errorMsg = "❌ 无法启动任务: " + err.Error()
sendEvent("error", errorMsg, nil)
}
if assistantMessageID != "" {
_, _ = h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", errorMsg, time.Now(), assistantMessageID)
}
sendEvent("done", "", map[string]interface{}{"conversationId": conversationID})
timeoutCancel()
return
}
firstRun = false
} else {
if err := h.tasks.ResetTaskCancelForContinue(conversationID, cancelWithCause); err != nil {
h.logger.Error("续跑任务时重置 cancel 失败", zap.Error(err))
taskStatus = "failed"
sendEvent("error", err.Error(), nil)
sendEvent("done", "", map[string]interface{}{"conversationId": conversationID})
timeoutCancel()
return
}
}
progressCallback := h.createProgressCallback(taskCtx, cancelWithCause, conversationID, assistantMessageID, sendEvent)
taskCtx = multiagent.WithHITLToolInterceptor(taskCtx, func(ctx context.Context, toolName, arguments string) (string, error) {
return h.interceptHITLForEinoTool(ctx, cancelWithCause, conversationID, assistantMessageID, sendEvent, toolName, arguments)
})
result, runErr = multiagent.RunDeepAgent(
taskCtx,
h.config,
&h.config.MultiAgent,
h.agent,
h.logger,
conversationID,
curFinalMessage,
curHistory,
roleTools,
progressCallback,
h.agentsMarkdownDir,
orch,
)
timeoutCancel()
if runErr == nil {
break
}
if runErr != nil {
h.persistEinoAgentTraceForResume(conversationID, result)
cause := context.Cause(baseCtx)
if errors.Is(cause, ErrUserInterruptContinue) {
reason := h.tasks.TakeInterruptContinueReason(conversationID)
prepNext, perr := h.prepareSessionAfterUserInterrupt(conversationID, assistantMessageID, reason, roleTools)
if perr != nil {
h.logger.Error("准备中断后续跑失败", zap.Error(perr))
taskStatus = "failed"
h.tasks.UpdateTaskStatus(conversationID, taskStatus)
errMsg := "中断后续跑失败: " + perr.Error()
if assistantMessageID != "" {
_, _ = h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", errMsg, time.Now(), assistantMessageID)
}
sendEvent("error", errMsg, map[string]interface{}{
"conversationId": conversationID,
"messageId": assistantMessageID,
})
sendEvent("done", "", map[string]interface{}{"conversationId": conversationID})
return
}
assistantMessageID = prepNext.AssistantMessageID
curFinalMessage = prepNext.FinalMessage
curHistory = prepNext.History
if prepNext.UserMessageID != "" {
sendEvent("message_saved", "", map[string]interface{}{
"conversationId": conversationID,
"userMessageId": prepNext.UserMessageID,
})
}
sendEvent("user_interrupt_continue", reason, map[string]interface{}{
"conversationId": conversationID,
"reason": reason,
"messageId": assistantMessageID,
})
sendEvent("progress", "已接收中断说明,继续迭代...", map[string]interface{}{
"conversationId": conversationID,
})
continue
}
if errors.Is(cause, ErrTaskCancelled) {
taskStatus = "cancelled"
h.tasks.UpdateTaskStatus(conversationID, taskStatus)
+62
View File
@@ -3,6 +3,7 @@ package handler
import (
"fmt"
"strings"
"time"
"cyberstrike-ai/internal/agent"
"cyberstrike-ai/internal/database"
@@ -142,3 +143,64 @@ func (h *AgentHandler) prepareMultiAgentSession(req *ChatRequest) (*multiAgentPr
UserMessageID: userMessageID,
}, nil
}
// prepareSessionAfterUserInterrupt 用户「中断并说明」后:结束当前助手占位、写入用户说明、新建助手占位,并生成下一轮 Run 所需的 History + FinalMessage。
func (h *AgentHandler) prepareSessionAfterUserInterrupt(conversationID, prevAssistantMessageID, reason string, roleTools []string) (*multiAgentPrepared, error) {
if strings.TrimSpace(conversationID) == "" {
return nil, fmt.Errorf("conversationId 为空")
}
if _, err := h.db.GetConversation(conversationID); err != nil {
return nil, fmt.Errorf("对话不存在")
}
note := "(已根据用户说明中断当前步骤,正在继续迭代。)"
if prevAssistantMessageID != "" {
if _, err := h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", note, time.Now(), prevAssistantMessageID); err != nil {
return nil, fmt.Errorf("更新助手消息失败: %w", err)
}
r := strings.TrimSpace(reason)
detail := "用户中断并说明"
if r != "" {
detail += "" + r
}
_ = h.db.AddProcessDetail(prevAssistantMessageID, conversationID, "user_interrupt", detail, map[string]interface{}{
"reason": r,
})
}
userContent := fmt.Sprintf("【用户中断说明】%s\n\n请根据以上说明调整并继续任务。", strings.TrimSpace(reason))
if strings.TrimSpace(reason) == "" {
userContent = "【用户中断说明】(未填写具体原因)\n\n请根据情况调整并继续任务。"
}
userMsgRow, err := h.db.AddMessage(conversationID, "user", userContent, nil)
if err != nil {
return nil, fmt.Errorf("保存用户消息失败: %w", err)
}
assistantMsg, err := h.db.AddMessage(conversationID, "assistant", "处理中...", nil)
if err != nil || assistantMsg == nil {
return nil, fmt.Errorf("创建助手占位失败: %w", err)
}
msgs, err := h.db.GetMessages(conversationID)
if err != nil || len(msgs) < 2 {
return nil, fmt.Errorf("读取消息历史失败或消息不足")
}
histMsgs := msgs[:len(msgs)-2]
agentHistory := make([]agent.ChatMessage, 0, len(histMsgs))
for _, msg := range histMsgs {
agentHistory = append(agentHistory, agent.ChatMessage{
Role: msg.Role,
Content: msg.Content,
})
}
userMessageID := ""
if userMsgRow != nil {
userMessageID = userMsgRow.ID
}
return &multiAgentPrepared{
ConversationID: conversationID,
CreatedNew: false,
History: agentHistory,
FinalMessage: userContent,
RoleTools: roleTools,
AssistantMessageID: assistantMsg.ID,
UserMessageID: userMessageID,
}, nil
}
+57
View File
@@ -461,6 +461,14 @@ func (h *OpenAPIHandler) GetOpenAPISpec(c *gin.Context) {
"type": "string",
"description": "对话ID",
},
"reason": map[string]interface{}{
"type": "string",
"description": "中断说明;与 continueAfter 同时为真时必填,将写入对话并由同一会话流式迭代继续",
},
"continueAfter": map[string]interface{}{
"type": "boolean",
"description": "为 true 时取消当前运行步骤并注入 reason 后继续迭代(非彻底停止)",
},
},
},
"AgentTask": map[string]interface{}{
@@ -3318,6 +3326,55 @@ func (h *OpenAPIHandler) GetOpenAPISpec(c *gin.Context) {
},
},
},
"/api/monitor/execution/{id}/cancel": map[string]interface{}{
"post": map[string]interface{}{
"tags": []string{"监控"},
"summary": "取消进行中的工具执行",
"description": "对当前进程内正在执行的 MCP 工具调用发送 context 取消信号;上层对话/多步任务可继续。若执行已结束或未在本进程内运行则返回 404。",
"operationId": "cancelExecution",
"parameters": []map[string]interface{}{
{
"name": "id",
"in": "path",
"required": true,
"description": "执行ID",
"schema": map[string]interface{}{
"type": "string",
},
},
},
"requestBody": map[string]interface{}{
"required": false,
"content": map[string]interface{}{
"application/json": map[string]interface{}{
"schema": map[string]interface{}{
"type": "object",
"properties": map[string]interface{}{
"note": map[string]interface{}{
"type": "string",
"description": "可选。非空时与工具已返回输出合并交给大模型,并带有「用户终止说明」标题块以便与命令行原文区分",
},
},
},
},
},
},
"responses": map[string]interface{}{
"200": map[string]interface{}{
"description": "已发送终止信号",
},
"400": map[string]interface{}{
"description": "请求体不是合法 JSON",
},
"404": map[string]interface{}{
"description": "未找到进行中的工具执行",
},
"401": map[string]interface{}{
"description": "未授权",
},
},
},
},
"/api/monitor/executions": map[string]interface{}{
"delete": map[string]interface{}{
"tags": []string{"监控"},
+50
View File
@@ -3,6 +3,7 @@ package handler
import (
"context"
"errors"
"strings"
"sync"
"time"
)
@@ -10,6 +11,9 @@ import (
// ErrTaskCancelled 用户取消任务的错误
var ErrTaskCancelled = errors.New("agent task cancelled by user")
// ErrUserInterruptContinue 用户在进度条上「中断并说明」:取消当前运行步骤,将说明写入对话并继续迭代(与 ErrTaskCancelled 区分)
var ErrUserInterruptContinue = errors.New("user interrupt with continue")
// ErrTaskAlreadyRunning 会话已有任务正在执行
var ErrTaskAlreadyRunning = errors.New("agent task already running for conversation")
@@ -21,6 +25,9 @@ type AgentTask struct {
Status string `json:"status"`
CancellingAt time.Time `json:"-"` // 进入 cancelling 状态的时间,用于清理长时间卡住的任务
// InterruptContinueReason 由 /api/agent-loop/cancel 在 continueAfter 时写入,Run 返回后由 handler 取出并清空
InterruptContinueReason string `json:"-"`
cancel func(error)
}
@@ -140,6 +147,49 @@ func (m *AgentTaskManager) StartTask(conversationID, message string, cancel cont
return task, nil
}
// SetInterruptContinueReason 在发起 ErrUserInterruptContinue 取消前写入用户说明(须任务仍存在)。
func (m *AgentTaskManager) SetInterruptContinueReason(conversationID, reason string) bool {
m.mu.Lock()
defer m.mu.Unlock()
task, ok := m.tasks[conversationID]
if !ok {
return false
}
task.InterruptContinueReason = strings.TrimSpace(reason)
return true
}
// TakeInterruptContinueReason 取出并清空用户中断说明。
func (m *AgentTaskManager) TakeInterruptContinueReason(conversationID string) string {
m.mu.Lock()
defer m.mu.Unlock()
task, ok := m.tasks[conversationID]
if !ok {
return ""
}
r := task.InterruptContinueReason
task.InterruptContinueReason = ""
return r
}
// ResetTaskCancelForContinue 在一次「中断并继续」后恢复任务为 running 并绑定新的 cancel(同一会话同一条 HTTP 流内续跑)。
func (m *AgentTaskManager) ResetTaskCancelForContinue(conversationID string, cancel context.CancelCauseFunc) error {
m.mu.Lock()
defer m.mu.Unlock()
task, ok := m.tasks[conversationID]
if !ok {
return errors.New("no active task")
}
task.cancel = func(err error) {
if cancel != nil {
cancel(err)
}
}
task.Status = "running"
task.CancellingAt = time.Time{}
return nil
}
// CancelTask 取消指定会话的任务。若任务已在取消中,仍返回 (true, nil) 以便接口幂等、前端不报错。
func (m *AgentTaskManager) CancelTask(conversationID string, cause error) (bool, error) {
m.mu.Lock()