mirror of
https://github.com/Ed1s0nZ/CyberStrikeAI.git
synced 2026-06-03 12:58:08 +02:00
Add files via upload
This commit is contained in:
@@ -6,6 +6,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
@@ -109,11 +110,11 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
|
||||
}
|
||||
}()
|
||||
|
||||
var lastRunMsgs []adk.Message
|
||||
var lastAssistant string
|
||||
var lastPlanExecuteExecutor string
|
||||
msgs := append([]adk.Message(nil), baseMsgs...)
|
||||
runAccumulatedMsgs := append([]adk.Message(nil), msgs...)
|
||||
baseAccumulatedCount := len(runAccumulatedMsgs)
|
||||
|
||||
emptyHint := strings.TrimSpace(args.EmptyResponseMessage)
|
||||
if emptyHint == "" {
|
||||
@@ -132,125 +133,150 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
|
||||
pendingByID := make(map[string]toolCallPendingInfo)
|
||||
pendingQueueByAgent := make(map[string][]string)
|
||||
markPending := func(tc toolCallPendingInfo) {
|
||||
if tc.ToolCallID == "" {
|
||||
return
|
||||
}
|
||||
pendingByID[tc.ToolCallID] = tc
|
||||
pendingQueueByAgent[tc.EinoAgent] = append(pendingQueueByAgent[tc.EinoAgent], tc.ToolCallID)
|
||||
if tc.ToolCallID == "" {
|
||||
return
|
||||
}
|
||||
popNextPendingForAgent := func(agentName string) (toolCallPendingInfo, bool) {
|
||||
q := pendingQueueByAgent[agentName]
|
||||
for len(q) > 0 {
|
||||
id := q[0]
|
||||
q = q[1:]
|
||||
pendingQueueByAgent[agentName] = q
|
||||
if tc, ok := pendingByID[id]; ok {
|
||||
delete(pendingByID, id)
|
||||
return tc, true
|
||||
}
|
||||
pendingByID[tc.ToolCallID] = tc
|
||||
pendingQueueByAgent[tc.EinoAgent] = append(pendingQueueByAgent[tc.EinoAgent], tc.ToolCallID)
|
||||
}
|
||||
popNextPendingForAgent := func(agentName string) (toolCallPendingInfo, bool) {
|
||||
q := pendingQueueByAgent[agentName]
|
||||
for len(q) > 0 {
|
||||
id := q[0]
|
||||
q = q[1:]
|
||||
pendingQueueByAgent[agentName] = q
|
||||
if tc, ok := pendingByID[id]; ok {
|
||||
delete(pendingByID, id)
|
||||
return tc, true
|
||||
}
|
||||
return toolCallPendingInfo{}, false
|
||||
}
|
||||
removePendingByID := func(toolCallID string) {
|
||||
if toolCallID == "" {
|
||||
return
|
||||
}
|
||||
delete(pendingByID, toolCallID)
|
||||
return toolCallPendingInfo{}, false
|
||||
}
|
||||
removePendingByID := func(toolCallID string) {
|
||||
if toolCallID == "" {
|
||||
return
|
||||
}
|
||||
flushAllPendingAsFailed := func(err error) {
|
||||
if progress == nil {
|
||||
pendingByID = make(map[string]toolCallPendingInfo)
|
||||
pendingQueueByAgent = make(map[string][]string)
|
||||
return
|
||||
}
|
||||
msg := ""
|
||||
if err != nil {
|
||||
msg = err.Error()
|
||||
}
|
||||
for _, tc := range pendingByID {
|
||||
toolName := tc.ToolName
|
||||
if strings.TrimSpace(toolName) == "" {
|
||||
toolName = "unknown"
|
||||
}
|
||||
progress("tool_result", fmt.Sprintf("工具结果 (%s)", toolName), map[string]interface{}{
|
||||
"toolName": toolName,
|
||||
"success": false,
|
||||
"isError": true,
|
||||
"result": msg,
|
||||
"resultPreview": msg,
|
||||
"toolCallId": tc.ToolCallID,
|
||||
"conversationId": conversationID,
|
||||
"einoAgent": tc.EinoAgent,
|
||||
"einoRole": tc.EinoRole,
|
||||
"source": "eino",
|
||||
})
|
||||
}
|
||||
delete(pendingByID, toolCallID)
|
||||
}
|
||||
flushAllPendingAsFailed := func(err error) {
|
||||
if progress == nil {
|
||||
pendingByID = make(map[string]toolCallPendingInfo)
|
||||
pendingQueueByAgent = make(map[string][]string)
|
||||
return
|
||||
}
|
||||
msg := ""
|
||||
if err != nil {
|
||||
msg = err.Error()
|
||||
}
|
||||
for _, tc := range pendingByID {
|
||||
toolName := tc.ToolName
|
||||
if strings.TrimSpace(toolName) == "" {
|
||||
toolName = "unknown"
|
||||
}
|
||||
progress("tool_result", fmt.Sprintf("工具结果 (%s)", toolName), map[string]interface{}{
|
||||
"toolName": toolName,
|
||||
"success": false,
|
||||
"isError": true,
|
||||
"result": msg,
|
||||
"resultPreview": msg,
|
||||
"toolCallId": tc.ToolCallID,
|
||||
"conversationId": conversationID,
|
||||
"einoAgent": tc.EinoAgent,
|
||||
"einoRole": tc.EinoRole,
|
||||
"source": "eino",
|
||||
})
|
||||
}
|
||||
pendingByID = make(map[string]toolCallPendingInfo)
|
||||
pendingQueueByAgent = make(map[string][]string)
|
||||
}
|
||||
|
||||
runnerCfg := adk.RunnerConfig{
|
||||
Agent: da,
|
||||
EnableStreaming: true,
|
||||
}
|
||||
if cp := strings.TrimSpace(args.CheckpointDir); cp != "" {
|
||||
cpDir := filepath.Join(cp, sanitizeEinoPathSegment(conversationID))
|
||||
st, stErr := newFileCheckPointStore(cpDir)
|
||||
if stErr != nil {
|
||||
if logger != nil {
|
||||
logger.Warn("eino checkpoint store disabled", zap.String("dir", cpDir), zap.Error(stErr))
|
||||
}
|
||||
} else {
|
||||
runnerCfg.CheckPointStore = st
|
||||
if logger != nil {
|
||||
logger.Info("eino runner: checkpoint store enabled", zap.String("dir", cpDir))
|
||||
}
|
||||
runnerCfg := adk.RunnerConfig{
|
||||
Agent: da,
|
||||
EnableStreaming: true,
|
||||
}
|
||||
var cpStore *fileCheckPointStore
|
||||
var checkPointID string
|
||||
if cp := strings.TrimSpace(args.CheckpointDir); cp != "" {
|
||||
cpDir := filepath.Join(cp, sanitizeEinoPathSegment(conversationID))
|
||||
st, stErr := newFileCheckPointStore(cpDir)
|
||||
if stErr != nil {
|
||||
if logger != nil {
|
||||
logger.Warn("eino checkpoint store disabled", zap.String("dir", cpDir), zap.Error(stErr))
|
||||
}
|
||||
} else {
|
||||
cpStore = st
|
||||
checkPointID = buildEinoCheckpointID(orchMode)
|
||||
runnerCfg.CheckPointStore = st
|
||||
if logger != nil {
|
||||
logger.Info("eino runner: checkpoint store enabled",
|
||||
zap.String("dir", cpDir),
|
||||
zap.String("checkPointID", checkPointID))
|
||||
}
|
||||
}
|
||||
}
|
||||
runner := adk.NewRunner(ctx, runnerCfg)
|
||||
iter := runner.Run(ctx, msgs)
|
||||
handleRunErr := func(runErr error) error {
|
||||
if runErr == nil {
|
||||
return nil
|
||||
var iter *adk.AsyncIterator[*adk.AgentEvent]
|
||||
if cpStore != nil && checkPointID != "" {
|
||||
if _, existed, getErr := cpStore.Get(ctx, checkPointID); getErr != nil {
|
||||
if logger != nil {
|
||||
logger.Warn("eino checkpoint preflight get failed", zap.String("checkPointID", checkPointID), zap.Error(getErr))
|
||||
}
|
||||
if errors.Is(runErr, context.DeadlineExceeded) {
|
||||
flushAllPendingAsFailed(runErr)
|
||||
if progress != nil {
|
||||
progress("error", runErr.Error(), map[string]interface{}{
|
||||
"conversationId": conversationID,
|
||||
"source": "eino",
|
||||
"errorKind": "timeout",
|
||||
})
|
||||
} else if existed {
|
||||
if progress != nil {
|
||||
progress("progress", "检测到断点,正在从中断节点恢复执行...", map[string]interface{}{
|
||||
"conversationId": conversationID,
|
||||
"source": "eino",
|
||||
"orchestration": orchMode,
|
||||
"checkPointID": checkPointID,
|
||||
})
|
||||
}
|
||||
if logger != nil {
|
||||
logger.Info("eino runner: resume from checkpoint", zap.String("checkPointID", checkPointID))
|
||||
}
|
||||
resumeIter, resumeErr := runner.Resume(ctx, checkPointID)
|
||||
if resumeErr == nil {
|
||||
iter = resumeIter
|
||||
} else {
|
||||
if logger != nil {
|
||||
logger.Warn("eino runner: resume failed, fallback to fresh run",
|
||||
zap.String("checkPointID", checkPointID),
|
||||
zap.Error(resumeErr))
|
||||
}
|
||||
return runErr
|
||||
}
|
||||
// context.Canceled 是唯一应当直接终止编排的错误(用户关闭页面、主动停止等)。
|
||||
if errors.Is(runErr, context.Canceled) {
|
||||
flushAllPendingAsFailed(runErr)
|
||||
if progress != nil {
|
||||
progress("error", runErr.Error(), map[string]interface{}{
|
||||
"conversationId": conversationID,
|
||||
"source": "eino",
|
||||
})
|
||||
}
|
||||
return runErr
|
||||
}
|
||||
if isEinoIterationLimitError(runErr) {
|
||||
flushAllPendingAsFailed(runErr)
|
||||
if progress != nil {
|
||||
progress("iteration_limit_reached", runErr.Error(), map[string]interface{}{
|
||||
progress("progress", "断点恢复失败,已回退为全新执行。", map[string]interface{}{
|
||||
"conversationId": conversationID,
|
||||
"source": "eino",
|
||||
"orchestration": orchMode,
|
||||
})
|
||||
progress("error", runErr.Error(), map[string]interface{}{
|
||||
"conversationId": conversationID,
|
||||
"source": "eino",
|
||||
"errorKind": "iteration_limit",
|
||||
"checkPointID": checkPointID,
|
||||
})
|
||||
}
|
||||
return runErr
|
||||
}
|
||||
}
|
||||
}
|
||||
if iter == nil {
|
||||
if checkPointID != "" {
|
||||
iter = runner.Run(ctx, msgs, adk.WithCheckPointID(checkPointID))
|
||||
} else {
|
||||
iter = runner.Run(ctx, msgs)
|
||||
}
|
||||
}
|
||||
handleRunErr := func(runErr error) error {
|
||||
if runErr == nil {
|
||||
return nil
|
||||
}
|
||||
if errors.Is(runErr, context.DeadlineExceeded) {
|
||||
flushAllPendingAsFailed(runErr)
|
||||
if progress != nil {
|
||||
progress("error", runErr.Error(), map[string]interface{}{
|
||||
"conversationId": conversationID,
|
||||
"source": "eino",
|
||||
"errorKind": "timeout",
|
||||
})
|
||||
}
|
||||
return runErr
|
||||
}
|
||||
// context.Canceled 是唯一应当直接终止编排的错误(用户关闭页面、主动停止等)。
|
||||
if errors.Is(runErr, context.Canceled) {
|
||||
flushAllPendingAsFailed(runErr)
|
||||
if progress != nil {
|
||||
progress("error", runErr.Error(), map[string]interface{}{
|
||||
@@ -260,249 +286,190 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
|
||||
}
|
||||
return runErr
|
||||
}
|
||||
|
||||
for {
|
||||
// 检测 context 取消(用户关闭浏览器、请求超时等),flush pending 工具状态避免 UI 卡在 "执行中"。
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
flushAllPendingAsFailed(ctx.Err())
|
||||
if progress != nil {
|
||||
progress("error", "Request cancelled / 请求已取消", map[string]interface{}{
|
||||
"conversationId": conversationID,
|
||||
"source": "eino",
|
||||
})
|
||||
}
|
||||
return nil, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
ev, ok := iter.Next()
|
||||
if !ok {
|
||||
if len(pendingByID) > 0 {
|
||||
orphanCount := len(pendingByID)
|
||||
flushAllPendingAsFailed(errors.New("pending tool call missing result before run completion"))
|
||||
if progress != nil {
|
||||
progress("eino_pending_orphaned", "pending tool calls were force-closed at run end", map[string]interface{}{
|
||||
"conversationId": conversationID,
|
||||
"source": "eino",
|
||||
"orchestration": orchMode,
|
||||
"pendingCount": orphanCount,
|
||||
})
|
||||
}
|
||||
}
|
||||
lastRunMsgs = runAccumulatedMsgs
|
||||
break
|
||||
}
|
||||
if ev == nil {
|
||||
continue
|
||||
}
|
||||
if ev.Err != nil {
|
||||
if retErr := handleRunErr(ev.Err); retErr != nil {
|
||||
return nil, retErr
|
||||
}
|
||||
}
|
||||
if ev.AgentName != "" && progress != nil {
|
||||
iterEinoAgent := orchestratorName
|
||||
if orchMode == "plan_execute" {
|
||||
if a := strings.TrimSpace(ev.AgentName); a != "" {
|
||||
iterEinoAgent = a
|
||||
}
|
||||
}
|
||||
if streamsMainAssistant(ev.AgentName) {
|
||||
if einoMainRound == 0 {
|
||||
einoMainRound = 1
|
||||
progress("iteration", "", map[string]interface{}{
|
||||
"iteration": 1,
|
||||
"einoScope": "main",
|
||||
"einoRole": "orchestrator",
|
||||
"einoAgent": iterEinoAgent,
|
||||
"orchestration": orchMode,
|
||||
"conversationId": conversationID,
|
||||
"source": "eino",
|
||||
})
|
||||
} else if einoLastAgent != "" && !streamsMainAssistant(einoLastAgent) {
|
||||
einoMainRound++
|
||||
progress("iteration", "", map[string]interface{}{
|
||||
"iteration": einoMainRound,
|
||||
"einoScope": "main",
|
||||
"einoRole": "orchestrator",
|
||||
"einoAgent": iterEinoAgent,
|
||||
"orchestration": orchMode,
|
||||
"conversationId": conversationID,
|
||||
"source": "eino",
|
||||
})
|
||||
}
|
||||
}
|
||||
einoLastAgent = ev.AgentName
|
||||
progress("progress", fmt.Sprintf("[Eino] %s", ev.AgentName), map[string]interface{}{
|
||||
if isEinoIterationLimitError(runErr) {
|
||||
flushAllPendingAsFailed(runErr)
|
||||
if progress != nil {
|
||||
progress("iteration_limit_reached", runErr.Error(), map[string]interface{}{
|
||||
"conversationId": conversationID,
|
||||
"einoAgent": ev.AgentName,
|
||||
"einoRole": einoRoleTag(ev.AgentName),
|
||||
"source": "eino",
|
||||
"orchestration": orchMode,
|
||||
})
|
||||
progress("error", runErr.Error(), map[string]interface{}{
|
||||
"conversationId": conversationID,
|
||||
"source": "eino",
|
||||
"errorKind": "iteration_limit",
|
||||
})
|
||||
}
|
||||
if ev.Output == nil || ev.Output.MessageOutput == nil {
|
||||
continue
|
||||
}
|
||||
mv := ev.Output.MessageOutput
|
||||
return runErr
|
||||
}
|
||||
flushAllPendingAsFailed(runErr)
|
||||
if progress != nil {
|
||||
progress("error", runErr.Error(), map[string]interface{}{
|
||||
"conversationId": conversationID,
|
||||
"source": "eino",
|
||||
})
|
||||
}
|
||||
return runErr
|
||||
}
|
||||
|
||||
if mv.IsStreaming && mv.MessageStream != nil {
|
||||
streamHeaderSent := false
|
||||
var reasoningStreamID string
|
||||
var toolStreamFragments []schema.ToolCall
|
||||
var subAssistantBuf strings.Builder
|
||||
var subReplyStreamID string
|
||||
var mainAssistantBuf strings.Builder
|
||||
var streamRecvErr error
|
||||
for {
|
||||
chunk, rerr := mv.MessageStream.Recv()
|
||||
if rerr != nil {
|
||||
if errors.Is(rerr, io.EOF) {
|
||||
break
|
||||
}
|
||||
if logger != nil {
|
||||
logger.Warn("eino stream recv error, flushing incomplete stream",
|
||||
zap.Error(rerr),
|
||||
zap.String("agent", ev.AgentName),
|
||||
zap.Int("toolFragments", len(toolStreamFragments)))
|
||||
}
|
||||
streamRecvErr = rerr
|
||||
break
|
||||
}
|
||||
if chunk == nil {
|
||||
continue
|
||||
}
|
||||
if progress != nil && strings.TrimSpace(chunk.ReasoningContent) != "" {
|
||||
if reasoningStreamID == "" {
|
||||
reasoningStreamID = fmt.Sprintf("eino-reasoning-%s-%d", conversationID, atomic.AddInt64(&reasoningStreamSeq, 1))
|
||||
progress("thinking_stream_start", " ", map[string]interface{}{
|
||||
"streamId": reasoningStreamID,
|
||||
"source": "eino",
|
||||
"einoAgent": ev.AgentName,
|
||||
"einoRole": einoRoleTag(ev.AgentName),
|
||||
"orchestration": orchMode,
|
||||
})
|
||||
}
|
||||
progress("thinking_stream_delta", chunk.ReasoningContent, map[string]interface{}{
|
||||
"streamId": reasoningStreamID,
|
||||
})
|
||||
}
|
||||
if chunk.Content != "" {
|
||||
if progress != nil && streamsMainAssistant(ev.AgentName) {
|
||||
if !streamHeaderSent {
|
||||
progress("response_start", "", map[string]interface{}{
|
||||
"conversationId": conversationID,
|
||||
"mcpExecutionIds": snapshotMCPIDs(),
|
||||
"messageGeneratedBy": "eino:" + ev.AgentName,
|
||||
"einoRole": "orchestrator",
|
||||
"einoAgent": ev.AgentName,
|
||||
"orchestration": orchMode,
|
||||
})
|
||||
streamHeaderSent = true
|
||||
}
|
||||
progress("response_delta", chunk.Content, map[string]interface{}{
|
||||
"conversationId": conversationID,
|
||||
"mcpExecutionIds": snapshotMCPIDs(),
|
||||
"einoRole": "orchestrator",
|
||||
"einoAgent": ev.AgentName,
|
||||
"orchestration": orchMode,
|
||||
})
|
||||
mainAssistantBuf.WriteString(chunk.Content)
|
||||
} else if !streamsMainAssistant(ev.AgentName) {
|
||||
if progress != nil {
|
||||
if subReplyStreamID == "" {
|
||||
subReplyStreamID = fmt.Sprintf("eino-sub-reply-%s-%d", conversationID, atomic.AddInt64(&einoSubReplyStreamSeq, 1))
|
||||
progress("eino_agent_reply_stream_start", "", map[string]interface{}{
|
||||
"streamId": subReplyStreamID,
|
||||
"einoAgent": ev.AgentName,
|
||||
"einoRole": "sub",
|
||||
"conversationId": conversationID,
|
||||
"source": "eino",
|
||||
})
|
||||
}
|
||||
progress("eino_agent_reply_stream_delta", chunk.Content, map[string]interface{}{
|
||||
"streamId": subReplyStreamID,
|
||||
"conversationId": conversationID,
|
||||
})
|
||||
}
|
||||
subAssistantBuf.WriteString(chunk.Content)
|
||||
}
|
||||
}
|
||||
if len(chunk.ToolCalls) > 0 {
|
||||
toolStreamFragments = append(toolStreamFragments, chunk.ToolCalls...)
|
||||
}
|
||||
}
|
||||
if streamsMainAssistant(ev.AgentName) {
|
||||
if s := strings.TrimSpace(mainAssistantBuf.String()); s != "" {
|
||||
lastAssistant = s
|
||||
runAccumulatedMsgs = append(runAccumulatedMsgs, schema.AssistantMessage(s, nil))
|
||||
if orchMode == "plan_execute" && strings.EqualFold(strings.TrimSpace(ev.AgentName), "executor") {
|
||||
lastPlanExecuteExecutor = UnwrapPlanExecuteUserText(s)
|
||||
}
|
||||
}
|
||||
}
|
||||
if subAssistantBuf.Len() > 0 && progress != nil {
|
||||
if s := strings.TrimSpace(subAssistantBuf.String()); s != "" {
|
||||
if subReplyStreamID != "" {
|
||||
progress("eino_agent_reply_stream_end", s, map[string]interface{}{
|
||||
"streamId": subReplyStreamID,
|
||||
"einoAgent": ev.AgentName,
|
||||
"einoRole": "sub",
|
||||
"conversationId": conversationID,
|
||||
"source": "eino",
|
||||
})
|
||||
} else {
|
||||
progress("eino_agent_reply", s, map[string]interface{}{
|
||||
"conversationId": conversationID,
|
||||
"einoAgent": ev.AgentName,
|
||||
"einoRole": "sub",
|
||||
"source": "eino",
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
var lastToolChunk *schema.Message
|
||||
if merged := mergeStreamingToolCallFragments(toolStreamFragments); len(merged) > 0 {
|
||||
lastToolChunk = &schema.Message{ToolCalls: merged}
|
||||
}
|
||||
tryEmitToolCallsOnce(lastToolChunk, ev.AgentName, orchestratorName, conversationID, progress, toolEmitSeen, subAgentToolStep, markPending)
|
||||
if streamRecvErr != nil {
|
||||
if progress != nil {
|
||||
progress("eino_stream_error", streamRecvErr.Error(), map[string]interface{}{
|
||||
"conversationId": conversationID,
|
||||
"source": "eino",
|
||||
"einoAgent": ev.AgentName,
|
||||
"einoRole": einoRoleTag(ev.AgentName),
|
||||
})
|
||||
}
|
||||
if retErr := handleRunErr(streamRecvErr); retErr != nil {
|
||||
return nil, retErr
|
||||
}
|
||||
}
|
||||
continue
|
||||
}
|
||||
takePartial := func(runErr error) (*RunResult, error) {
|
||||
if len(runAccumulatedMsgs) <= baseAccumulatedCount {
|
||||
return nil, runErr
|
||||
}
|
||||
ids := snapshotMCPIDs()
|
||||
return buildEinoRunResultFromAccumulated(
|
||||
orchMode, runAccumulatedMsgs, lastAssistant, lastPlanExecuteExecutor, emptyHint, ids, true,
|
||||
), runErr
|
||||
}
|
||||
|
||||
msg, gerr := mv.GetMessage()
|
||||
if gerr != nil || msg == nil {
|
||||
continue
|
||||
for {
|
||||
// 检测 context 取消(用户关闭浏览器、请求超时等),flush pending 工具状态避免 UI 卡在 "执行中"。
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
flushAllPendingAsFailed(ctx.Err())
|
||||
if progress != nil {
|
||||
progress("error", "Request cancelled / 请求已取消", map[string]interface{}{
|
||||
"conversationId": conversationID,
|
||||
"source": "eino",
|
||||
})
|
||||
}
|
||||
runAccumulatedMsgs = append(runAccumulatedMsgs, msg)
|
||||
tryEmitToolCallsOnce(mergeMessageToolCalls(msg), ev.AgentName, orchestratorName, conversationID, progress, toolEmitSeen, subAgentToolStep, markPending)
|
||||
return takePartial(ctx.Err())
|
||||
default:
|
||||
}
|
||||
|
||||
if mv.Role == schema.Assistant {
|
||||
if progress != nil && strings.TrimSpace(msg.ReasoningContent) != "" {
|
||||
progress("thinking", strings.TrimSpace(msg.ReasoningContent), map[string]interface{}{
|
||||
ev, ok := iter.Next()
|
||||
if !ok {
|
||||
// iter 结束并不总是“正常完成”:
|
||||
// 当取消/超时发生在 iter.Next() 阻塞期间时,可能直接返回 !ok。
|
||||
// 此时必须保留 checkpoint,避免后续恢复时被误判为“无断点”而全量重跑。
|
||||
if ctxErr := ctx.Err(); ctxErr != nil {
|
||||
flushAllPendingAsFailed(ctxErr)
|
||||
if progress != nil {
|
||||
progress("error", ctxErr.Error(), map[string]interface{}{
|
||||
"conversationId": conversationID,
|
||||
"source": "eino",
|
||||
"einoAgent": ev.AgentName,
|
||||
"einoRole": einoRoleTag(ev.AgentName),
|
||||
"orchestration": orchMode,
|
||||
})
|
||||
}
|
||||
body := strings.TrimSpace(msg.Content)
|
||||
if body != "" {
|
||||
if streamsMainAssistant(ev.AgentName) {
|
||||
if progress != nil {
|
||||
return takePartial(ctxErr)
|
||||
}
|
||||
if len(pendingByID) > 0 {
|
||||
orphanCount := len(pendingByID)
|
||||
flushAllPendingAsFailed(errors.New("pending tool call missing result before run completion"))
|
||||
if progress != nil {
|
||||
progress("eino_pending_orphaned", "pending tool calls were force-closed at run end", map[string]interface{}{
|
||||
"conversationId": conversationID,
|
||||
"source": "eino",
|
||||
"orchestration": orchMode,
|
||||
"pendingCount": orphanCount,
|
||||
})
|
||||
}
|
||||
}
|
||||
if cpStore != nil && checkPointID != "" {
|
||||
if p, pErr := cpStore.path(checkPointID); pErr == nil {
|
||||
if rmErr := os.Remove(p); rmErr != nil && !os.IsNotExist(rmErr) && logger != nil {
|
||||
logger.Warn("eino checkpoint cleanup failed", zap.String("path", p), zap.Error(rmErr))
|
||||
}
|
||||
}
|
||||
}
|
||||
break
|
||||
}
|
||||
if ev == nil {
|
||||
continue
|
||||
}
|
||||
if ev.Err != nil {
|
||||
if retErr := handleRunErr(ev.Err); retErr != nil {
|
||||
return takePartial(retErr)
|
||||
}
|
||||
}
|
||||
if ev.AgentName != "" && progress != nil {
|
||||
iterEinoAgent := orchestratorName
|
||||
if orchMode == "plan_execute" {
|
||||
if a := strings.TrimSpace(ev.AgentName); a != "" {
|
||||
iterEinoAgent = a
|
||||
}
|
||||
}
|
||||
if streamsMainAssistant(ev.AgentName) {
|
||||
if einoMainRound == 0 {
|
||||
einoMainRound = 1
|
||||
progress("iteration", "", map[string]interface{}{
|
||||
"iteration": 1,
|
||||
"einoScope": "main",
|
||||
"einoRole": "orchestrator",
|
||||
"einoAgent": iterEinoAgent,
|
||||
"orchestration": orchMode,
|
||||
"conversationId": conversationID,
|
||||
"source": "eino",
|
||||
})
|
||||
} else if einoLastAgent != "" && !streamsMainAssistant(einoLastAgent) {
|
||||
einoMainRound++
|
||||
progress("iteration", "", map[string]interface{}{
|
||||
"iteration": einoMainRound,
|
||||
"einoScope": "main",
|
||||
"einoRole": "orchestrator",
|
||||
"einoAgent": iterEinoAgent,
|
||||
"orchestration": orchMode,
|
||||
"conversationId": conversationID,
|
||||
"source": "eino",
|
||||
})
|
||||
}
|
||||
}
|
||||
einoLastAgent = ev.AgentName
|
||||
progress("progress", fmt.Sprintf("[Eino] %s", ev.AgentName), map[string]interface{}{
|
||||
"conversationId": conversationID,
|
||||
"einoAgent": ev.AgentName,
|
||||
"einoRole": einoRoleTag(ev.AgentName),
|
||||
"orchestration": orchMode,
|
||||
})
|
||||
}
|
||||
if ev.Output == nil || ev.Output.MessageOutput == nil {
|
||||
continue
|
||||
}
|
||||
mv := ev.Output.MessageOutput
|
||||
|
||||
if mv.IsStreaming && mv.MessageStream != nil {
|
||||
streamHeaderSent := false
|
||||
var reasoningStreamID string
|
||||
var toolStreamFragments []schema.ToolCall
|
||||
var subAssistantBuf strings.Builder
|
||||
var subReplyStreamID string
|
||||
var mainAssistantBuf strings.Builder
|
||||
var streamRecvErr error
|
||||
for {
|
||||
chunk, rerr := mv.MessageStream.Recv()
|
||||
if rerr != nil {
|
||||
if errors.Is(rerr, io.EOF) {
|
||||
break
|
||||
}
|
||||
if logger != nil {
|
||||
logger.Warn("eino stream recv error, flushing incomplete stream",
|
||||
zap.Error(rerr),
|
||||
zap.String("agent", ev.AgentName),
|
||||
zap.Int("toolFragments", len(toolStreamFragments)))
|
||||
}
|
||||
streamRecvErr = rerr
|
||||
break
|
||||
}
|
||||
if chunk == nil {
|
||||
continue
|
||||
}
|
||||
if progress != nil && strings.TrimSpace(chunk.ReasoningContent) != "" {
|
||||
if reasoningStreamID == "" {
|
||||
reasoningStreamID = fmt.Sprintf("eino-reasoning-%s-%d", conversationID, atomic.AddInt64(&reasoningStreamSeq, 1))
|
||||
progress("thinking_stream_start", " ", map[string]interface{}{
|
||||
"streamId": reasoningStreamID,
|
||||
"source": "eino",
|
||||
"einoAgent": ev.AgentName,
|
||||
"einoRole": einoRoleTag(ev.AgentName),
|
||||
"orchestration": orchMode,
|
||||
})
|
||||
}
|
||||
progress("thinking_stream_delta", chunk.ReasoningContent, map[string]interface{}{
|
||||
"streamId": reasoningStreamID,
|
||||
})
|
||||
}
|
||||
if chunk.Content != "" {
|
||||
if progress != nil && streamsMainAssistant(ev.AgentName) {
|
||||
if !streamHeaderSent {
|
||||
progress("response_start", "", map[string]interface{}{
|
||||
"conversationId": conversationID,
|
||||
"mcpExecutionIds": snapshotMCPIDs(),
|
||||
@@ -511,20 +478,61 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
|
||||
"einoAgent": ev.AgentName,
|
||||
"orchestration": orchMode,
|
||||
})
|
||||
progress("response_delta", body, map[string]interface{}{
|
||||
"conversationId": conversationID,
|
||||
"mcpExecutionIds": snapshotMCPIDs(),
|
||||
"einoRole": "orchestrator",
|
||||
"einoAgent": ev.AgentName,
|
||||
"orchestration": orchMode,
|
||||
streamHeaderSent = true
|
||||
}
|
||||
progress("response_delta", chunk.Content, map[string]interface{}{
|
||||
"conversationId": conversationID,
|
||||
"mcpExecutionIds": snapshotMCPIDs(),
|
||||
"einoRole": "orchestrator",
|
||||
"einoAgent": ev.AgentName,
|
||||
"orchestration": orchMode,
|
||||
})
|
||||
mainAssistantBuf.WriteString(chunk.Content)
|
||||
} else if !streamsMainAssistant(ev.AgentName) {
|
||||
if progress != nil {
|
||||
if subReplyStreamID == "" {
|
||||
subReplyStreamID = fmt.Sprintf("eino-sub-reply-%s-%d", conversationID, atomic.AddInt64(&einoSubReplyStreamSeq, 1))
|
||||
progress("eino_agent_reply_stream_start", "", map[string]interface{}{
|
||||
"streamId": subReplyStreamID,
|
||||
"einoAgent": ev.AgentName,
|
||||
"einoRole": "sub",
|
||||
"conversationId": conversationID,
|
||||
"source": "eino",
|
||||
})
|
||||
}
|
||||
progress("eino_agent_reply_stream_delta", chunk.Content, map[string]interface{}{
|
||||
"streamId": subReplyStreamID,
|
||||
"conversationId": conversationID,
|
||||
})
|
||||
}
|
||||
lastAssistant = body
|
||||
if orchMode == "plan_execute" && strings.EqualFold(strings.TrimSpace(ev.AgentName), "executor") {
|
||||
lastPlanExecuteExecutor = UnwrapPlanExecuteUserText(body)
|
||||
}
|
||||
} else if progress != nil {
|
||||
progress("eino_agent_reply", body, map[string]interface{}{
|
||||
subAssistantBuf.WriteString(chunk.Content)
|
||||
}
|
||||
}
|
||||
if len(chunk.ToolCalls) > 0 {
|
||||
toolStreamFragments = append(toolStreamFragments, chunk.ToolCalls...)
|
||||
}
|
||||
}
|
||||
if streamsMainAssistant(ev.AgentName) {
|
||||
if s := strings.TrimSpace(mainAssistantBuf.String()); s != "" {
|
||||
lastAssistant = s
|
||||
runAccumulatedMsgs = append(runAccumulatedMsgs, schema.AssistantMessage(s, nil))
|
||||
if orchMode == "plan_execute" && strings.EqualFold(strings.TrimSpace(ev.AgentName), "executor") {
|
||||
lastPlanExecuteExecutor = UnwrapPlanExecuteUserText(s)
|
||||
}
|
||||
}
|
||||
}
|
||||
if subAssistantBuf.Len() > 0 && progress != nil {
|
||||
if s := strings.TrimSpace(subAssistantBuf.String()); s != "" {
|
||||
if subReplyStreamID != "" {
|
||||
progress("eino_agent_reply_stream_end", s, map[string]interface{}{
|
||||
"streamId": subReplyStreamID,
|
||||
"einoAgent": ev.AgentName,
|
||||
"einoRole": "sub",
|
||||
"conversationId": conversationID,
|
||||
"source": "eino",
|
||||
})
|
||||
} else {
|
||||
progress("eino_agent_reply", s, map[string]interface{}{
|
||||
"conversationId": conversationID,
|
||||
"einoAgent": ev.AgentName,
|
||||
"einoRole": "sub",
|
||||
@@ -533,65 +541,157 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if mv.Role == schema.Tool && progress != nil {
|
||||
toolName := msg.ToolName
|
||||
if toolName == "" {
|
||||
toolName = mv.ToolName
|
||||
var lastToolChunk *schema.Message
|
||||
if merged := mergeStreamingToolCallFragments(toolStreamFragments); len(merged) > 0 {
|
||||
lastToolChunk = &schema.Message{ToolCalls: merged}
|
||||
}
|
||||
tryEmitToolCallsOnce(lastToolChunk, ev.AgentName, orchestratorName, conversationID, progress, toolEmitSeen, subAgentToolStep, markPending)
|
||||
if streamRecvErr != nil {
|
||||
if progress != nil {
|
||||
progress("eino_stream_error", streamRecvErr.Error(), map[string]interface{}{
|
||||
"conversationId": conversationID,
|
||||
"source": "eino",
|
||||
"einoAgent": ev.AgentName,
|
||||
"einoRole": einoRoleTag(ev.AgentName),
|
||||
})
|
||||
}
|
||||
|
||||
content := msg.Content
|
||||
isErr := false
|
||||
if strings.HasPrefix(content, einomcp.ToolErrorPrefix) {
|
||||
isErr = true
|
||||
content = strings.TrimPrefix(content, einomcp.ToolErrorPrefix)
|
||||
if retErr := handleRunErr(streamRecvErr); retErr != nil {
|
||||
return takePartial(retErr)
|
||||
}
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
preview := content
|
||||
if len(preview) > 200 {
|
||||
preview = preview[:200] + "..."
|
||||
}
|
||||
data := map[string]interface{}{
|
||||
"toolName": toolName,
|
||||
"success": !isErr,
|
||||
"isError": isErr,
|
||||
"result": content,
|
||||
"resultPreview": preview,
|
||||
msg, gerr := mv.GetMessage()
|
||||
if gerr != nil || msg == nil {
|
||||
continue
|
||||
}
|
||||
runAccumulatedMsgs = append(runAccumulatedMsgs, msg)
|
||||
tryEmitToolCallsOnce(mergeMessageToolCalls(msg), ev.AgentName, orchestratorName, conversationID, progress, toolEmitSeen, subAgentToolStep, markPending)
|
||||
|
||||
if mv.Role == schema.Assistant {
|
||||
if progress != nil && strings.TrimSpace(msg.ReasoningContent) != "" {
|
||||
progress("thinking", strings.TrimSpace(msg.ReasoningContent), map[string]interface{}{
|
||||
"conversationId": conversationID,
|
||||
"source": "eino",
|
||||
"einoAgent": ev.AgentName,
|
||||
"einoRole": einoRoleTag(ev.AgentName),
|
||||
"source": "eino",
|
||||
}
|
||||
toolCallID := strings.TrimSpace(msg.ToolCallID)
|
||||
if toolCallID == "" {
|
||||
if inferred, ok := popNextPendingForAgent(ev.AgentName); ok {
|
||||
toolCallID = inferred.ToolCallID
|
||||
} else if inferred, ok := popNextPendingForAgent(orchestratorName); ok {
|
||||
toolCallID = inferred.ToolCallID
|
||||
} else if inferred, ok := popNextPendingForAgent(""); ok {
|
||||
toolCallID = inferred.ToolCallID
|
||||
} else {
|
||||
for id := range pendingByID {
|
||||
toolCallID = id
|
||||
delete(pendingByID, id)
|
||||
break
|
||||
}
|
||||
"orchestration": orchMode,
|
||||
})
|
||||
}
|
||||
body := strings.TrimSpace(msg.Content)
|
||||
if body != "" {
|
||||
if streamsMainAssistant(ev.AgentName) {
|
||||
if progress != nil {
|
||||
progress("response_start", "", map[string]interface{}{
|
||||
"conversationId": conversationID,
|
||||
"mcpExecutionIds": snapshotMCPIDs(),
|
||||
"messageGeneratedBy": "eino:" + ev.AgentName,
|
||||
"einoRole": "orchestrator",
|
||||
"einoAgent": ev.AgentName,
|
||||
"orchestration": orchMode,
|
||||
})
|
||||
progress("response_delta", body, map[string]interface{}{
|
||||
"conversationId": conversationID,
|
||||
"mcpExecutionIds": snapshotMCPIDs(),
|
||||
"einoRole": "orchestrator",
|
||||
"einoAgent": ev.AgentName,
|
||||
"orchestration": orchMode,
|
||||
})
|
||||
}
|
||||
} else {
|
||||
removePendingByID(toolCallID)
|
||||
lastAssistant = body
|
||||
if orchMode == "plan_execute" && strings.EqualFold(strings.TrimSpace(ev.AgentName), "executor") {
|
||||
lastPlanExecuteExecutor = UnwrapPlanExecuteUserText(body)
|
||||
}
|
||||
} else if progress != nil {
|
||||
progress("eino_agent_reply", body, map[string]interface{}{
|
||||
"conversationId": conversationID,
|
||||
"einoAgent": ev.AgentName,
|
||||
"einoRole": "sub",
|
||||
"source": "eino",
|
||||
})
|
||||
}
|
||||
if toolCallID != "" {
|
||||
data["toolCallId"] = toolCallID
|
||||
}
|
||||
progress("tool_result", fmt.Sprintf("工具结果 (%s)", toolName), data)
|
||||
}
|
||||
}
|
||||
|
||||
if mv.Role == schema.Tool && progress != nil {
|
||||
toolName := msg.ToolName
|
||||
if toolName == "" {
|
||||
toolName = mv.ToolName
|
||||
}
|
||||
|
||||
content := msg.Content
|
||||
isErr := false
|
||||
if strings.HasPrefix(content, einomcp.ToolErrorPrefix) {
|
||||
isErr = true
|
||||
content = strings.TrimPrefix(content, einomcp.ToolErrorPrefix)
|
||||
}
|
||||
|
||||
preview := content
|
||||
if len(preview) > 200 {
|
||||
preview = preview[:200] + "..."
|
||||
}
|
||||
data := map[string]interface{}{
|
||||
"toolName": toolName,
|
||||
"success": !isErr,
|
||||
"isError": isErr,
|
||||
"result": content,
|
||||
"resultPreview": preview,
|
||||
"conversationId": conversationID,
|
||||
"einoAgent": ev.AgentName,
|
||||
"einoRole": einoRoleTag(ev.AgentName),
|
||||
"source": "eino",
|
||||
}
|
||||
toolCallID := strings.TrimSpace(msg.ToolCallID)
|
||||
if toolCallID == "" {
|
||||
if inferred, ok := popNextPendingForAgent(ev.AgentName); ok {
|
||||
toolCallID = inferred.ToolCallID
|
||||
} else if inferred, ok := popNextPendingForAgent(orchestratorName); ok {
|
||||
toolCallID = inferred.ToolCallID
|
||||
} else if inferred, ok := popNextPendingForAgent(""); ok {
|
||||
toolCallID = inferred.ToolCallID
|
||||
} else {
|
||||
for id := range pendingByID {
|
||||
toolCallID = id
|
||||
delete(pendingByID, id)
|
||||
break
|
||||
}
|
||||
}
|
||||
} else {
|
||||
removePendingByID(toolCallID)
|
||||
}
|
||||
if toolCallID != "" {
|
||||
data["toolCallId"] = toolCallID
|
||||
}
|
||||
progress("tool_result", fmt.Sprintf("工具结果 (%s)", toolName), data)
|
||||
}
|
||||
}
|
||||
|
||||
mcpIDsMu.Lock()
|
||||
ids := append([]string(nil), *mcpIDs...)
|
||||
mcpIDsMu.Unlock()
|
||||
|
||||
histJSON, _ := json.Marshal(lastRunMsgs)
|
||||
out := buildEinoRunResultFromAccumulated(
|
||||
orchMode, runAccumulatedMsgs, lastAssistant, lastPlanExecuteExecutor, emptyHint, ids, false,
|
||||
)
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func einoPartialRunLastOutputHint() string {
|
||||
return "[执行未正常结束(用户停止、超时或异常)。续跑时请基于上文已产生的工具与结果继续,勿重复已完成步骤。]\n" +
|
||||
"[Run ended abnormally; continue from the trace above without repeating completed steps.]"
|
||||
}
|
||||
|
||||
func buildEinoRunResultFromAccumulated(
|
||||
orchMode string,
|
||||
runAccumulatedMsgs []adk.Message,
|
||||
lastAssistant string,
|
||||
lastPlanExecuteExecutor string,
|
||||
emptyHint string,
|
||||
mcpIDs []string,
|
||||
partial bool,
|
||||
) *RunResult {
|
||||
histJSON, _ := json.Marshal(runAccumulatedMsgs)
|
||||
cleaned := strings.TrimSpace(lastAssistant)
|
||||
if orchMode == "plan_execute" {
|
||||
if e := strings.TrimSpace(lastPlanExecuteExecutor); e != "" {
|
||||
@@ -607,15 +707,29 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
|
||||
if rs := []rune(cleaned); len(rs) > maxResponseRunes {
|
||||
cleaned = string(rs[:maxResponseRunes]) + "\n\n... (response truncated / 响应已截断)"
|
||||
}
|
||||
lastOut := cleaned
|
||||
resp := cleaned
|
||||
if partial && cleaned == "" {
|
||||
lastOut = einoPartialRunLastOutputHint()
|
||||
resp = emptyHint
|
||||
}
|
||||
out := &RunResult{
|
||||
Response: cleaned,
|
||||
MCPExecutionIDs: ids,
|
||||
LastReActInput: string(histJSON),
|
||||
LastReActOutput: cleaned,
|
||||
Response: resp,
|
||||
MCPExecutionIDs: mcpIDs,
|
||||
LastAgentTraceInput: string(histJSON),
|
||||
LastAgentTraceOutput: lastOut,
|
||||
}
|
||||
if out.Response == "" {
|
||||
if !partial && out.Response == "" {
|
||||
out.Response = emptyHint
|
||||
out.LastReActOutput = out.Response
|
||||
out.LastAgentTraceOutput = out.Response
|
||||
}
|
||||
return out, nil
|
||||
return out
|
||||
}
|
||||
|
||||
func buildEinoCheckpointID(orchMode string) string {
|
||||
mode := sanitizeEinoPathSegment(strings.TrimSpace(orchMode))
|
||||
if mode == "" {
|
||||
mode = "default"
|
||||
}
|
||||
return "runner-" + mode
|
||||
}
|
||||
|
||||
@@ -59,4 +59,3 @@ func (m *noNestedTaskMiddleware) WrapInvokableToolCall(
|
||||
return endpoint(ctx2, argumentsInJSON, opts...)
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -13,7 +13,7 @@ import (
|
||||
|
||||
const (
|
||||
planExecuteMaxStepResultRunes = 12000
|
||||
planExecuteKeepLastSteps = 16
|
||||
planExecuteKeepLastSteps = 16
|
||||
)
|
||||
|
||||
func truncateRunesWithSuffix(s string, maxRunes int, suffix string) string {
|
||||
|
||||
@@ -30,10 +30,10 @@ import (
|
||||
|
||||
// RunResult 与单 Agent 循环结果字段对齐,便于复用存储与 SSE 收尾逻辑。
|
||||
type RunResult struct {
|
||||
Response string
|
||||
MCPExecutionIDs []string
|
||||
LastReActInput string
|
||||
LastReActOutput string
|
||||
Response string
|
||||
MCPExecutionIDs []string
|
||||
LastAgentTraceInput string // 已序列化的消息带(JSON):原生循环或 Eino 均写入,供续跑/攻击链等恢复上下文
|
||||
LastAgentTraceOutput string // 本轮助手侧对外展示文本(摘要或最终回复)
|
||||
}
|
||||
|
||||
// toolCallPendingInfo tracks a tool_call emitted to the UI so we can later
|
||||
|
||||
Reference in New Issue
Block a user