package handler import ( "errors" "testing" "go.uber.org/zap" ) func TestNormalizeBatchQueueConcurrency(t *testing.T) { if got := normalizeBatchQueueConcurrency(0); got != DefaultBatchQueueConcurrency { t.Fatalf("expected default %d, got %d", DefaultBatchQueueConcurrency, got) } if got := normalizeBatchQueueConcurrency(99); got != MaxBatchQueueConcurrency { t.Fatalf("expected max %d, got %d", MaxBatchQueueConcurrency, got) } } func TestClaimNextPendingTaskParallel(t *testing.T) { m := NewBatchTaskManager(zap.NewNop()) queue, err := m.CreateBatchQueue("test", "", "eino_single", "manual", "", "", nil, 3, []string{"a", "b", "c"}) if err != nil { t.Fatalf("CreateBatchQueue: %v", err) } m.UpdateQueueStatus(queue.ID, BatchQueueStatusRunning) t1, ok1 := m.ClaimNextPendingTask(queue.ID) t2, ok2 := m.ClaimNextPendingTask(queue.ID) if !ok1 || !ok2 || t1.ID == t2.ID { t.Fatalf("expected two distinct claims, got ok1=%v ok2=%v t1=%v t2=%v", ok1, ok2, t1, t2) } if t1.Status != BatchTaskStatusRunning || t2.Status != BatchTaskStatusRunning { t.Fatalf("claimed tasks should be running") } t3, ok3 := m.ClaimNextPendingTask(queue.ID) if !ok3 { t.Fatal("expected third claim") } _, ok4 := m.ClaimNextPendingTask(queue.ID) if ok4 { t.Fatal("expected no fourth pending task") } _ = t3 } func TestBatchQueueExecutionShouldStop(t *testing.T) { t.Parallel() if !batchQueueExecutionShouldStop(nil, false) { t.Fatal("expected stop when queue missing") } if !batchQueueExecutionShouldStop(nil, true) { t.Fatal("expected stop when queue is nil but exists=true") } q := &BatchTaskQueue{Status: BatchQueueStatusRunning} if batchQueueExecutionShouldStop(q, true) { t.Fatal("expected continue when running") } q.Status = BatchQueueStatusCancelled if !batchQueueExecutionShouldStop(q, true) { t.Fatal("expected stop when cancelled") } } func TestDeleteQueueBlockedWhileExecutorActive(t *testing.T) { t.Parallel() m := NewBatchTaskManager(zap.NewNop()) queue, err := m.CreateBatchQueue("test", "", "eino_single", "manual", "", "", nil, 1, []string{"hello"}) if err != nil { t.Fatalf("CreateBatchQueue: %v", err) } if !m.TryMarkQueueExecutor(queue.ID) { t.Fatal("expected to mark executor") } m.UpdateQueueStatus(queue.ID, BatchQueueStatusCancelled) err = m.DeleteQueue(queue.ID) if !errors.Is(err, ErrBatchQueueExecutorActive) { t.Fatalf("expected ErrBatchQueueExecutorActive, got %v", err) } if _, ok := m.GetBatchQueue(queue.ID); !ok { t.Fatal("queue should still exist while executor active") } m.UnmarkQueueExecutor(queue.ID) if err := m.DeleteQueue(queue.ID); err != nil { t.Fatalf("expected delete after executor unmarked, got %v", err) } if _, ok := m.GetBatchQueue(queue.ID); ok { t.Fatal("queue should be deleted") } } func TestDeleteQueueBlockedWhileRunning(t *testing.T) { t.Parallel() m := NewBatchTaskManager(zap.NewNop()) queue, err := m.CreateBatchQueue("test", "", "eino_single", "manual", "", "", nil, 1, []string{"hello"}) if err != nil { t.Fatalf("CreateBatchQueue: %v", err) } m.UpdateQueueStatus(queue.ID, BatchQueueStatusRunning) err = m.DeleteQueue(queue.ID) if !errors.Is(err, ErrBatchQueueStillRunning) { t.Fatalf("expected ErrBatchQueueStillRunning, got %v", err) } } func TestTryMarkQueueExecutorDedupes(t *testing.T) { t.Parallel() m := NewBatchTaskManager(zap.NewNop()) if !m.TryMarkQueueExecutor("q-1") { t.Fatal("first mark should succeed") } if m.TryMarkQueueExecutor("q-1") { t.Fatal("second mark should fail") } m.UnmarkQueueExecutor("q-1") if !m.TryMarkQueueExecutor("q-1") { t.Fatal("mark after unmark should succeed") } }