Files
CyberStrikeAI/internal/handler/task_manager.go
2026-03-10 00:23:19 +08:00

277 lines
7.5 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package handler
import (
"context"
"errors"
"sync"
"time"
)
// ErrTaskCancelled 用户取消任务的错误
var ErrTaskCancelled = errors.New("agent task cancelled by user")
// ErrTaskAlreadyRunning 会话已有任务正在执行
var ErrTaskAlreadyRunning = errors.New("agent task already running for conversation")
// AgentTask 描述正在运行的Agent任务
type AgentTask struct {
ConversationID string `json:"conversationId"`
Message string `json:"message,omitempty"`
StartedAt time.Time `json:"startedAt"`
Status string `json:"status"`
CancellingAt time.Time `json:"-"` // 进入 cancelling 状态的时间,用于清理长时间卡住的任务
cancel func(error)
}
// CompletedTask 已完成的任务(用于历史记录)
type CompletedTask struct {
ConversationID string `json:"conversationId"`
Message string `json:"message,omitempty"`
StartedAt time.Time `json:"startedAt"`
CompletedAt time.Time `json:"completedAt"`
Status string `json:"status"`
}
// AgentTaskManager 管理正在运行的Agent任务
type AgentTaskManager struct {
mu sync.RWMutex
tasks map[string]*AgentTask
completedTasks []*CompletedTask // 最近完成的任务历史
maxHistorySize int // 最大历史记录数
historyRetention time.Duration // 历史记录保留时间
}
const (
// cancellingStuckThreshold 处于「取消中」超过此时长则强制从运行列表移除。正常取消会在当前步骤内返回,
// 超过则视为卡住,尽快释放会话。常见做法多为 3060s 内释放。
cancellingStuckThreshold = 45 * time.Second
// cancellingStuckThresholdLegacy 未记录 CancellingAt 时用 StartedAt 判断的兜底时长
cancellingStuckThresholdLegacy = 2 * time.Minute
cleanupInterval = 15 * time.Second // 与上面阈值配合,最长约 60s 内移除
)
// NewAgentTaskManager 创建任务管理器
func NewAgentTaskManager() *AgentTaskManager {
m := &AgentTaskManager{
tasks: make(map[string]*AgentTask),
completedTasks: make([]*CompletedTask, 0),
maxHistorySize: 50, // 最多保留50条历史记录
historyRetention: 24 * time.Hour, // 保留24小时
}
go m.runStuckCancellingCleanup()
return m
}
// runStuckCancellingCleanup 定期将长时间处于「取消中」的任务强制结束,避免卡住无法发新消息
func (m *AgentTaskManager) runStuckCancellingCleanup() {
ticker := time.NewTicker(cleanupInterval)
defer ticker.Stop()
for range ticker.C {
m.cleanupStuckCancelling()
}
}
func (m *AgentTaskManager) cleanupStuckCancelling() {
m.mu.Lock()
var toFinish []string
now := time.Now()
for id, task := range m.tasks {
if task.Status != "cancelling" {
continue
}
var elapsed time.Duration
if !task.CancellingAt.IsZero() {
elapsed = now.Sub(task.CancellingAt)
if elapsed < cancellingStuckThreshold {
continue
}
} else {
elapsed = now.Sub(task.StartedAt)
if elapsed < cancellingStuckThresholdLegacy {
continue
}
}
toFinish = append(toFinish, id)
}
m.mu.Unlock()
for _, id := range toFinish {
m.FinishTask(id, "cancelled")
}
}
// StartTask 注册并开始一个新的任务
func (m *AgentTaskManager) StartTask(conversationID, message string, cancel context.CancelCauseFunc) (*AgentTask, error) {
m.mu.Lock()
defer m.mu.Unlock()
if _, exists := m.tasks[conversationID]; exists {
return nil, ErrTaskAlreadyRunning
}
task := &AgentTask{
ConversationID: conversationID,
Message: message,
StartedAt: time.Now(),
Status: "running",
cancel: func(err error) {
if cancel != nil {
cancel(err)
}
},
}
m.tasks[conversationID] = task
return task, nil
}
// CancelTask 取消指定会话的任务。若任务已在取消中,仍返回 (true, nil) 以便接口幂等、前端不报错。
func (m *AgentTaskManager) CancelTask(conversationID string, cause error) (bool, error) {
m.mu.Lock()
task, exists := m.tasks[conversationID]
if !exists {
m.mu.Unlock()
return false, nil
}
// 如果已经处于取消流程,视为成功(幂等),避免前端重复点击报「未找到任务」
if task.Status == "cancelling" {
m.mu.Unlock()
return true, nil
}
task.Status = "cancelling"
task.CancellingAt = time.Now()
cancel := task.cancel
m.mu.Unlock()
if cause == nil {
cause = ErrTaskCancelled
}
if cancel != nil {
cancel(cause)
}
return true, nil
}
// UpdateTaskStatus 更新任务状态但不删除任务(用于在发送事件前更新状态)
func (m *AgentTaskManager) UpdateTaskStatus(conversationID string, status string) {
m.mu.Lock()
defer m.mu.Unlock()
task, exists := m.tasks[conversationID]
if !exists {
return
}
if status != "" {
task.Status = status
}
}
// FinishTask 完成任务并从管理器中移除
func (m *AgentTaskManager) FinishTask(conversationID string, finalStatus string) {
m.mu.Lock()
defer m.mu.Unlock()
task, exists := m.tasks[conversationID]
if !exists {
return
}
if finalStatus != "" {
task.Status = finalStatus
}
// 保存到历史记录
completedTask := &CompletedTask{
ConversationID: task.ConversationID,
Message: task.Message,
StartedAt: task.StartedAt,
CompletedAt: time.Now(),
Status: finalStatus,
}
// 添加到历史记录
m.completedTasks = append(m.completedTasks, completedTask)
// 清理过期和过多的历史记录
m.cleanupHistory()
// 从运行任务中移除
delete(m.tasks, conversationID)
}
// cleanupHistory 清理过期的历史记录
func (m *AgentTaskManager) cleanupHistory() {
now := time.Now()
cutoffTime := now.Add(-m.historyRetention)
// 过滤掉过期的记录
validTasks := make([]*CompletedTask, 0, len(m.completedTasks))
for _, task := range m.completedTasks {
if task.CompletedAt.After(cutoffTime) {
validTasks = append(validTasks, task)
}
}
// 如果仍然超过最大数量,只保留最新的
if len(validTasks) > m.maxHistorySize {
// 按完成时间排序,保留最新的
// 由于是追加的最新的在最后所以直接取最后N个
start := len(validTasks) - m.maxHistorySize
validTasks = validTasks[start:]
}
m.completedTasks = validTasks
}
// GetActiveTasks 返回所有正在运行的任务
func (m *AgentTaskManager) GetActiveTasks() []*AgentTask {
m.mu.RLock()
defer m.mu.RUnlock()
result := make([]*AgentTask, 0, len(m.tasks))
for _, task := range m.tasks {
result = append(result, &AgentTask{
ConversationID: task.ConversationID,
Message: task.Message,
StartedAt: task.StartedAt,
Status: task.Status,
})
}
return result
}
// GetCompletedTasks 返回最近完成的任务历史
func (m *AgentTaskManager) GetCompletedTasks() []*CompletedTask {
m.mu.RLock()
defer m.mu.RUnlock()
// 清理过期记录(只读锁,不影响其他操作)
// 注意这里不能直接调用cleanupHistory因为需要写锁
// 所以返回时过滤过期记录
now := time.Now()
cutoffTime := now.Add(-m.historyRetention)
result := make([]*CompletedTask, 0, len(m.completedTasks))
for _, task := range m.completedTasks {
if task.CompletedAt.After(cutoffTime) {
result = append(result, task)
}
}
// 按完成时间倒序排序(最新的在前)
// 由于是追加的,最新的在最后,需要反转
for i, j := 0, len(result)-1; i < j; i, j = i+1, j-1 {
result[i], result[j] = result[j], result[i]
}
// 限制返回数量
if len(result) > m.maxHistorySize {
result = result[:m.maxHistorySize]
}
return result
}