mirror of
https://github.com/Ed1s0nZ/CyberStrikeAI.git
synced 2026-06-25 23:40:09 +02:00
Add files via upload
This commit is contained in:
@@ -288,6 +288,93 @@ func (db *DB) GetToolExecution(id string) (*mcp.ToolExecution, error) {
|
||||
return &exec, nil
|
||||
}
|
||||
|
||||
// CancelOrphanedRunningToolExecutions 将仍为 running 的记录批量标记为 cancelled(如进程重启后无对应执行协程)。
|
||||
func (db *DB) CancelOrphanedRunningToolExecutions(endTime time.Time, errMsg string) (int64, error) {
|
||||
errMsg = strings.TrimSpace(errMsg)
|
||||
if errMsg == "" {
|
||||
errMsg = "执行已中断(服务重启或会话结束)"
|
||||
}
|
||||
query := `
|
||||
UPDATE tool_executions
|
||||
SET status = 'cancelled',
|
||||
error = ?,
|
||||
end_time = ?,
|
||||
duration_ms = MAX(0, CAST((julianday(?) - julianday(start_time)) * 86400000 AS INTEGER))
|
||||
WHERE status = 'running'
|
||||
`
|
||||
res, err := db.Exec(query, errMsg, endTime, endTime)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return res.RowsAffected()
|
||||
}
|
||||
|
||||
// FinalizeStaleRunningToolExecutions 将「非活跃且超过 minAge」的 running 记录标记为 cancelled。
|
||||
// activeIDs 为当前进程内仍登记 cancel 的 executionId;不在集合内且已超时的视为孤儿记录。
|
||||
func (db *DB) FinalizeStaleRunningToolExecutions(endTime time.Time, minAge time.Duration, activeIDs map[string]struct{}, errMsg string) (int64, error) {
|
||||
errMsg = strings.TrimSpace(errMsg)
|
||||
if errMsg == "" {
|
||||
errMsg = "执行已中断(会话已结束)"
|
||||
}
|
||||
if minAge < 0 {
|
||||
minAge = 0
|
||||
}
|
||||
cutoff := endTime.Add(-minAge)
|
||||
rows, err := db.Query(`
|
||||
SELECT id, start_time FROM tool_executions
|
||||
WHERE status = 'running' AND start_time <= ?
|
||||
`, cutoff)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
type staleRow struct {
|
||||
id string
|
||||
startTime time.Time
|
||||
}
|
||||
var stale []staleRow
|
||||
for rows.Next() {
|
||||
var row staleRow
|
||||
if err := rows.Scan(&row.id, &row.startTime); err != nil {
|
||||
db.logger.Warn("读取 stale running 执行记录失败", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
if activeIDs != nil {
|
||||
if _, active := activeIDs[row.id]; active {
|
||||
continue
|
||||
}
|
||||
}
|
||||
stale = append(stale, row)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if len(stale) == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
var affected int64
|
||||
for _, row := range stale {
|
||||
durationMs := endTime.Sub(row.startTime).Milliseconds()
|
||||
if durationMs < 0 {
|
||||
durationMs = 0
|
||||
}
|
||||
res, err := db.Exec(`
|
||||
UPDATE tool_executions
|
||||
SET status = 'cancelled', error = ?, end_time = ?, duration_ms = ?
|
||||
WHERE id = ? AND status = 'running'
|
||||
`, errMsg, endTime, durationMs, row.id)
|
||||
if err != nil {
|
||||
db.logger.Warn("更新 stale running 执行记录失败", zap.Error(err), zap.String("executionId", row.id))
|
||||
continue
|
||||
}
|
||||
n, _ := res.RowsAffected()
|
||||
affected += n
|
||||
}
|
||||
return affected, nil
|
||||
}
|
||||
|
||||
// DeleteToolExecution 删除工具执行记录
|
||||
func (db *DB) DeleteToolExecution(id string) error {
|
||||
query := `DELETE FROM tool_executions WHERE id = ?`
|
||||
|
||||
@@ -0,0 +1,102 @@
|
||||
package database
|
||||
|
||||
import (
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"cyberstrike-ai/internal/mcp"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func TestCancelOrphanedRunningToolExecutions(t *testing.T) {
|
||||
dbPath := filepath.Join(t.TempDir(), "monitor.db")
|
||||
db, err := NewDB(dbPath, zap.NewNop())
|
||||
if err != nil {
|
||||
t.Fatalf("NewDB: %v", err)
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
start := time.Now().Add(-2 * time.Hour)
|
||||
exec := &mcp.ToolExecution{
|
||||
ID: "orphan-hydra",
|
||||
ToolName: "hydra",
|
||||
Arguments: map[string]interface{}{"target": "127.0.0.1"},
|
||||
Status: "running",
|
||||
StartTime: start,
|
||||
}
|
||||
if err := db.SaveToolExecution(exec); err != nil {
|
||||
t.Fatalf("SaveToolExecution: %v", err)
|
||||
}
|
||||
|
||||
end := time.Now()
|
||||
n, err := db.CancelOrphanedRunningToolExecutions(end, "执行已中断(服务重启)")
|
||||
if err != nil {
|
||||
t.Fatalf("CancelOrphanedRunningToolExecutions: %v", err)
|
||||
}
|
||||
if n != 1 {
|
||||
t.Fatalf("expected 1 row updated, got %d", n)
|
||||
}
|
||||
|
||||
got, err := db.GetToolExecution("orphan-hydra")
|
||||
if err != nil {
|
||||
t.Fatalf("GetToolExecution: %v", err)
|
||||
}
|
||||
if got.Status != "cancelled" {
|
||||
t.Fatalf("expected cancelled, got %s", got.Status)
|
||||
}
|
||||
if got.EndTime == nil {
|
||||
t.Fatal("expected end_time to be set")
|
||||
}
|
||||
if got.Duration <= 0 {
|
||||
t.Fatalf("expected positive duration, got %v", got.Duration)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFinalizeStaleRunningToolExecutions_skipsActive(t *testing.T) {
|
||||
dbPath := filepath.Join(t.TempDir(), "monitor.db")
|
||||
db, err := NewDB(dbPath, zap.NewNop())
|
||||
if err != nil {
|
||||
t.Fatalf("NewDB: %v", err)
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
now := time.Now()
|
||||
oldStart := now.Add(-5 * time.Minute)
|
||||
if err := db.SaveToolExecution(&mcp.ToolExecution{
|
||||
ID: "stale", ToolName: "hydra", Status: "running", StartTime: oldStart,
|
||||
}); err != nil {
|
||||
t.Fatalf("SaveToolExecution stale: %v", err)
|
||||
}
|
||||
if err := db.SaveToolExecution(&mcp.ToolExecution{
|
||||
ID: "active", ToolName: "hydra", Status: "running", StartTime: oldStart,
|
||||
}); err != nil {
|
||||
t.Fatalf("SaveToolExecution active: %v", err)
|
||||
}
|
||||
|
||||
active := map[string]struct{}{"active": {}}
|
||||
n, err := db.FinalizeStaleRunningToolExecutions(now, time.Minute, active, "执行已中断(会话已结束)")
|
||||
if err != nil {
|
||||
t.Fatalf("FinalizeStaleRunningToolExecutions: %v", err)
|
||||
}
|
||||
if n != 1 {
|
||||
t.Fatalf("expected 1 stale row updated, got %d", n)
|
||||
}
|
||||
|
||||
stale, err := db.GetToolExecution("stale")
|
||||
if err != nil {
|
||||
t.Fatalf("GetToolExecution stale: %v", err)
|
||||
}
|
||||
if stale.Status != "cancelled" {
|
||||
t.Fatalf("stale expected cancelled, got %s", stale.Status)
|
||||
}
|
||||
|
||||
activeExec, err := db.GetToolExecution("active")
|
||||
if err != nil {
|
||||
t.Fatalf("GetToolExecution active: %v", err)
|
||||
}
|
||||
if activeExec.Status != "running" {
|
||||
t.Fatalf("active expected running, got %s", activeExec.Status)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user