mirror of
https://github.com/Ed1s0nZ/CyberStrikeAI.git
synced 2026-06-20 04:50:10 +02:00
Add files via upload
This commit is contained in:
@@ -77,11 +77,12 @@ type BatchTaskQueue struct {
|
||||
|
||||
// BatchTaskManager 批量任务管理器
|
||||
type BatchTaskManager struct {
|
||||
db *database.DB
|
||||
logger *zap.Logger
|
||||
queues map[string]*BatchTaskQueue
|
||||
taskCancels map[string]context.CancelFunc // 存储每个队列当前任务的取消函数
|
||||
mu sync.RWMutex
|
||||
db *database.DB
|
||||
logger *zap.Logger
|
||||
queues map[string]*BatchTaskQueue
|
||||
taskCancels map[string]context.CancelFunc // 存储每个队列当前任务的取消函数
|
||||
singleRunTasks map[string]string // queueID -> taskID,单条执行完成后暂停队列
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
// NewBatchTaskManager 创建批量任务管理器
|
||||
@@ -90,9 +91,10 @@ func NewBatchTaskManager(logger *zap.Logger) *BatchTaskManager {
|
||||
logger = zap.NewNop()
|
||||
}
|
||||
return &BatchTaskManager{
|
||||
logger: logger,
|
||||
queues: make(map[string]*BatchTaskQueue),
|
||||
taskCancels: make(map[string]context.CancelFunc),
|
||||
logger: logger,
|
||||
queues: make(map[string]*BatchTaskQueue),
|
||||
taskCancels: make(map[string]context.CancelFunc),
|
||||
singleRunTasks: make(map[string]string),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -864,6 +866,138 @@ func (m *BatchTaskManager) AddTaskToQueue(queueID, message string) (*BatchTask,
|
||||
return task, nil
|
||||
}
|
||||
|
||||
// PrepareSingleTaskRun 准备单条执行:重置目标任务(若已有结果)并定位队列索引
|
||||
func (m *BatchTaskManager) PrepareSingleTaskRun(queueID, taskID string) error {
|
||||
var cancelFunc context.CancelFunc
|
||||
var siblingRunningIDs []string
|
||||
|
||||
m.mu.Lock()
|
||||
queue, exists := m.queues[queueID]
|
||||
if !exists {
|
||||
m.mu.Unlock()
|
||||
return fmt.Errorf("队列不存在")
|
||||
}
|
||||
|
||||
var task *BatchTask
|
||||
taskIndex := -1
|
||||
for i, t := range queue.Tasks {
|
||||
if t.ID == taskID {
|
||||
taskIndex = i
|
||||
task = t
|
||||
break
|
||||
}
|
||||
}
|
||||
if task == nil {
|
||||
m.mu.Unlock()
|
||||
return fmt.Errorf("任务不存在")
|
||||
}
|
||||
|
||||
if !queueAllowsSingleTaskRunLocked(queue, task) {
|
||||
m.mu.Unlock()
|
||||
return fmt.Errorf("队列正在执行或未就绪,无法单条执行")
|
||||
}
|
||||
|
||||
// 暂停态:中止在途子任务并收口仍标记 running 的其它子任务,以便单条执行非冲突项
|
||||
if queue.Status == BatchQueueStatusPaused {
|
||||
if c, ok := m.taskCancels[queueID]; ok {
|
||||
cancelFunc = c
|
||||
delete(m.taskCancels, queueID)
|
||||
}
|
||||
for _, t := range queue.Tasks {
|
||||
if t != nil && t.ID != taskID && t.Status == BatchTaskStatusRunning {
|
||||
siblingRunningIDs = append(siblingRunningIDs, t.ID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
needsReset := task.Status != BatchTaskStatusPending
|
||||
resumeQueue := queue.Status == BatchQueueStatusCompleted || queue.Status == BatchQueueStatusCancelled
|
||||
m.mu.Unlock()
|
||||
|
||||
if cancelFunc != nil {
|
||||
cancelFunc()
|
||||
}
|
||||
const staleRunMsg = "为单条执行其它任务,已中止"
|
||||
for _, sid := range siblingRunningIDs {
|
||||
m.UpdateTaskStatus(queueID, sid, BatchTaskStatusCancelled, "", staleRunMsg)
|
||||
}
|
||||
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
queue, exists = m.queues[queueID]
|
||||
if !exists {
|
||||
return fmt.Errorf("队列不存在")
|
||||
}
|
||||
|
||||
task = nil
|
||||
taskIndex = -1
|
||||
for i, t := range queue.Tasks {
|
||||
if t.ID == taskID {
|
||||
taskIndex = i
|
||||
task = t
|
||||
break
|
||||
}
|
||||
}
|
||||
if task == nil {
|
||||
return fmt.Errorf("任务不存在")
|
||||
}
|
||||
|
||||
if m.db != nil {
|
||||
if err := m.db.PrepareBatchSingleTaskRun(queueID, taskID, taskIndex, needsReset, resumeQueue); err != nil {
|
||||
return fmt.Errorf("准备单条执行失败: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if needsReset {
|
||||
task.Status = BatchTaskStatusPending
|
||||
task.ConversationID = ""
|
||||
task.StartedAt = nil
|
||||
task.CompletedAt = nil
|
||||
task.Error = ""
|
||||
task.Result = ""
|
||||
}
|
||||
queue.CurrentIndex = taskIndex
|
||||
queue.LastRunError = ""
|
||||
if resumeQueue {
|
||||
queue.Status = BatchQueueStatusPaused
|
||||
queue.CompletedAt = nil
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetSingleRunTask 标记队列仅执行指定子任务,完成后自动暂停
|
||||
func (m *BatchTaskManager) SetSingleRunTask(queueID, taskID string) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
if m.singleRunTasks == nil {
|
||||
m.singleRunTasks = make(map[string]string)
|
||||
}
|
||||
m.singleRunTasks[queueID] = taskID
|
||||
}
|
||||
|
||||
// ClearSingleRunTask 清除单条执行标记
|
||||
func (m *BatchTaskManager) ClearSingleRunTask(queueID string) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
delete(m.singleRunTasks, queueID)
|
||||
}
|
||||
|
||||
// TakeSingleRunTaskIfMatch 若刚完成的子任务为单条执行目标,则清除标记并返回 true
|
||||
func (m *BatchTaskManager) TakeSingleRunTaskIfMatch(queueID, taskID string) bool {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
if m.singleRunTasks == nil {
|
||||
return false
|
||||
}
|
||||
if m.singleRunTasks[queueID] != taskID {
|
||||
return false
|
||||
}
|
||||
delete(m.singleRunTasks, queueID)
|
||||
return true
|
||||
}
|
||||
|
||||
// DeleteTask 删除任务(队列空闲时可删;执行中任务不可删)
|
||||
func (m *BatchTaskManager) DeleteTask(queueID, taskID string) error {
|
||||
m.mu.Lock()
|
||||
@@ -936,6 +1070,25 @@ func queueAllowsTaskListMutationLocked(queue *BatchTaskQueue) bool {
|
||||
}
|
||||
}
|
||||
|
||||
// queueAllowsSingleTaskRunLocked 是否允许对指定子任务发起单条执行(必须在持有 BatchTaskManager.mu 下调用)
|
||||
func queueAllowsSingleTaskRunLocked(queue *BatchTaskQueue, task *BatchTask) bool {
|
||||
if queue == nil || task == nil {
|
||||
return false
|
||||
}
|
||||
if task.Status == BatchTaskStatusRunning {
|
||||
return false
|
||||
}
|
||||
if queue.Status == BatchQueueStatusRunning {
|
||||
return false
|
||||
}
|
||||
switch queue.Status {
|
||||
case BatchQueueStatusPending, BatchQueueStatusPaused, BatchQueueStatusCompleted, BatchQueueStatusCancelled:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// GetNextTask 获取下一个待执行的任务
|
||||
func (m *BatchTaskManager) GetNextTask(queueID string) (*BatchTask, bool) {
|
||||
m.mu.Lock()
|
||||
|
||||
Reference in New Issue
Block a user