diff --git a/internal/app/app.go b/internal/app/app.go index 7b0bd78f..0f4f154a 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -657,6 +657,7 @@ func setupRoutes( protected.POST("/eino-agent/stream", agentHandler.EinoSingleAgentLoopStream) protected.GET("/hitl/pending", agentHandler.ListHITLPending) protected.POST("/hitl/decision", agentHandler.DecideHITLInterrupt) + protected.POST("/hitl/dismiss", agentHandler.DismissHITLInterrupt) protected.GET("/hitl/config/:conversationId", agentHandler.GetHITLConversationConfig) protected.PUT("/hitl/config", agentHandler.UpsertHITLConversationConfig) protected.POST("/hitl/tool-whitelist", agentHandler.MergeHITLGlobalToolWhitelist) diff --git a/internal/handler/hitl.go b/internal/handler/hitl.go index 4231f319..f14681e6 100644 --- a/internal/handler/hitl.go +++ b/internal/handler/hitl.go @@ -88,7 +88,20 @@ CREATE TABLE IF NOT EXISTS hitl_conversation_configs ( timeout_seconds INTEGER NOT NULL DEFAULT 300, updated_at DATETIME NOT NULL );`) - return err + if err != nil { + return err + } + + // On startup, cancel all orphaned pending interrupts from previous process. + // Their in-memory channels are gone, so they can never be resolved. + res, err := m.db.Exec(`UPDATE hitl_interrupts SET status='cancelled', decision='reject', + decision_comment='process restarted', decided_at=CURRENT_TIMESTAMP WHERE status='pending'`) + if err != nil { + m.logger.Warn("failed to cancel orphaned HITL interrupts", zap.Error(err)) + } else if n, _ := res.RowsAffected(); n > 0 { + m.logger.Info("cancelled orphaned HITL interrupts from previous process", zap.Int64("count", n)) + } + return nil } func normalizeHitlMode(mode string) string { @@ -587,6 +600,43 @@ func (h *AgentHandler) DecideHITLInterrupt(c *gin.Context) { c.JSON(http.StatusOK, gin.H{"ok": true}) } +func (h *AgentHandler) DismissHITLInterrupt(c *gin.Context) { + var req struct { + InterruptID string `json:"interruptId" binding:"required"` + } + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(400, gin.H{"error": err.Error()}) + return + } + if h.hitlManager == nil { + c.JSON(500, gin.H{"error": "hitl manager unavailable"}) + return + } + res, err := h.db.Exec(`UPDATE hitl_interrupts SET status='cancelled', decision='reject', + decision_comment='dismissed by user', decided_at=CURRENT_TIMESTAMP + WHERE id=? AND status='pending'`, req.InterruptID) + if err != nil { + c.JSON(500, gin.H{"error": err.Error()}) + return + } + n, _ := res.RowsAffected() + if n == 0 { + c.JSON(404, gin.H{"error": "interrupt not found or already resolved"}) + return + } + // Also drain from in-memory map if present + h.hitlManager.mu.Lock() + if p, ok := h.hitlManager.pending[req.InterruptID]; ok { + delete(h.hitlManager.pending, req.InterruptID) + select { + case p.decideCh <- hitlDecision{Decision: "reject", Comment: "dismissed by user"}: + default: + } + } + h.hitlManager.mu.Unlock() + c.JSON(http.StatusOK, gin.H{"ok": true}) +} + func (h *AgentHandler) interceptHITLForEinoTool(runCtx context.Context, cancelRun context.CancelCauseFunc, conversationID, assistantMessageID string, sendEventFunc func(eventType, message string, data interface{}), toolName, arguments string) (string, error) { payload := map[string]interface{}{ "toolName": toolName,