mirror of
https://github.com/Ed1s0nZ/CyberStrikeAI.git
synced 2026-05-15 04:51:01 +02:00
Add files via upload
This commit is contained in:
@@ -193,6 +193,8 @@ type ChatMessage struct {
|
||||
Content string `json:"content,omitempty"`
|
||||
ToolCalls []ToolCall `json:"tool_calls,omitempty"`
|
||||
ToolCallID string `json:"tool_call_id,omitempty"`
|
||||
// ToolName 仅 tool 角色:从 Eino/轨迹 JSON 的 name 或 tool_name 恢复,供续跑构造 ToolMessage。
|
||||
ToolName string `json:"tool_name,omitempty"`
|
||||
}
|
||||
|
||||
// MarshalJSON 自定义JSON序列化,将tool_calls中的arguments转换为JSON字符串
|
||||
@@ -211,6 +213,9 @@ func (cm ChatMessage) MarshalJSON() ([]byte, error) {
|
||||
if cm.ToolCallID != "" {
|
||||
aux["tool_call_id"] = cm.ToolCallID
|
||||
}
|
||||
if cm.ToolName != "" {
|
||||
aux["tool_name"] = cm.ToolName
|
||||
}
|
||||
|
||||
// 转换tool_calls,将arguments转换为JSON字符串
|
||||
if len(cm.ToolCalls) > 0 {
|
||||
@@ -438,6 +443,7 @@ func (a *Agent) AgentLoopWithProgress(ctx context.Context, userInput string, his
|
||||
Content: msg.Content,
|
||||
ToolCalls: msg.ToolCalls,
|
||||
ToolCallID: msg.ToolCallID,
|
||||
ToolName: msg.ToolName,
|
||||
})
|
||||
addedCount++
|
||||
contentPreview := msg.Content
|
||||
|
||||
@@ -40,6 +40,13 @@ func normalizeStreamingDelta(current, incoming string) (next, delta string) {
|
||||
return current + incoming, incoming
|
||||
}
|
||||
|
||||
func isInterruptContinue(ctx context.Context) bool {
|
||||
if ctx == nil {
|
||||
return false
|
||||
}
|
||||
return errors.Is(context.Cause(ctx), ErrInterruptContinue)
|
||||
}
|
||||
|
||||
func isEinoIterationLimitError(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
@@ -409,10 +416,18 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
|
||||
case <-ctx.Done():
|
||||
flushAllPendingAsFailed(ctx.Err())
|
||||
if progress != nil {
|
||||
progress("error", "Request cancelled / 请求已取消", map[string]interface{}{
|
||||
"conversationId": conversationID,
|
||||
"source": "eino",
|
||||
})
|
||||
if isInterruptContinue(ctx) {
|
||||
progress("progress", "已暂停当前输出,正在合并用户补充并继续…", map[string]interface{}{
|
||||
"conversationId": conversationID,
|
||||
"source": "eino",
|
||||
"kind": "interrupt_continue",
|
||||
})
|
||||
} else {
|
||||
progress("error", "Request cancelled / 请求已取消", map[string]interface{}{
|
||||
"conversationId": conversationID,
|
||||
"source": "eino",
|
||||
})
|
||||
}
|
||||
}
|
||||
return takePartial(ctx.Err())
|
||||
default:
|
||||
@@ -426,10 +441,18 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
|
||||
if ctxErr := ctx.Err(); ctxErr != nil {
|
||||
flushAllPendingAsFailed(ctxErr)
|
||||
if progress != nil {
|
||||
progress("error", ctxErr.Error(), map[string]interface{}{
|
||||
"conversationId": conversationID,
|
||||
"source": "eino",
|
||||
})
|
||||
if isInterruptContinue(ctx) {
|
||||
progress("progress", "已暂停当前输出,正在合并用户补充并继续…", map[string]interface{}{
|
||||
"conversationId": conversationID,
|
||||
"source": "eino",
|
||||
"kind": "interrupt_continue",
|
||||
})
|
||||
} else {
|
||||
progress("error", ctxErr.Error(), map[string]interface{}{
|
||||
"conversationId": conversationID,
|
||||
"source": "eino",
|
||||
})
|
||||
}
|
||||
}
|
||||
return takePartial(ctxErr)
|
||||
}
|
||||
@@ -517,103 +540,128 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
|
||||
var mainAssistDupTarget string // 非空表示本段主助手流需缓冲至 EOF,与 execute 输出比对去重
|
||||
var reasoningBuf string
|
||||
var streamRecvErr error
|
||||
type streamMsg struct {
|
||||
chunk *schema.Message
|
||||
err error
|
||||
}
|
||||
recvCh := make(chan streamMsg, 8)
|
||||
go func() {
|
||||
defer close(recvCh)
|
||||
for {
|
||||
ch, rerr := mv.MessageStream.Recv()
|
||||
recvCh <- streamMsg{chunk: ch, err: rerr}
|
||||
if rerr != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
streamRecvLoop:
|
||||
for {
|
||||
chunk, rerr := mv.MessageStream.Recv()
|
||||
if rerr != nil {
|
||||
if errors.Is(rerr, io.EOF) {
|
||||
break
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
streamRecvErr = ctx.Err()
|
||||
break streamRecvLoop
|
||||
case sm, ok := <-recvCh:
|
||||
if !ok {
|
||||
break streamRecvLoop
|
||||
}
|
||||
if logger != nil {
|
||||
logger.Warn("eino stream recv error, flushing incomplete stream",
|
||||
zap.Error(rerr),
|
||||
zap.String("agent", ev.AgentName),
|
||||
zap.Int("toolFragments", len(toolStreamFragments)))
|
||||
chunk, rerr := sm.chunk, sm.err
|
||||
if rerr != nil {
|
||||
if errors.Is(rerr, io.EOF) {
|
||||
break streamRecvLoop
|
||||
}
|
||||
if logger != nil {
|
||||
logger.Warn("eino stream recv error, flushing incomplete stream",
|
||||
zap.Error(rerr),
|
||||
zap.String("agent", ev.AgentName),
|
||||
zap.Int("toolFragments", len(toolStreamFragments)))
|
||||
}
|
||||
streamRecvErr = rerr
|
||||
break streamRecvLoop
|
||||
}
|
||||
streamRecvErr = rerr
|
||||
break
|
||||
}
|
||||
if chunk == nil {
|
||||
continue
|
||||
}
|
||||
if progress != nil && strings.TrimSpace(chunk.ReasoningContent) != "" {
|
||||
var reasoningDelta string
|
||||
reasoningBuf, reasoningDelta = normalizeStreamingDelta(reasoningBuf, chunk.ReasoningContent)
|
||||
if reasoningDelta != "" {
|
||||
if reasoningStreamID == "" {
|
||||
reasoningStreamID = fmt.Sprintf("eino-reasoning-%s-%d", conversationID, atomic.AddInt64(&reasoningStreamSeq, 1))
|
||||
progress("thinking_stream_start", " ", map[string]interface{}{
|
||||
"streamId": reasoningStreamID,
|
||||
"source": "eino",
|
||||
"einoAgent": ev.AgentName,
|
||||
"einoRole": einoRoleTag(ev.AgentName),
|
||||
"orchestration": orchMode,
|
||||
if chunk == nil {
|
||||
continue
|
||||
}
|
||||
if progress != nil && strings.TrimSpace(chunk.ReasoningContent) != "" {
|
||||
var reasoningDelta string
|
||||
reasoningBuf, reasoningDelta = normalizeStreamingDelta(reasoningBuf, chunk.ReasoningContent)
|
||||
if reasoningDelta != "" {
|
||||
if reasoningStreamID == "" {
|
||||
reasoningStreamID = fmt.Sprintf("eino-reasoning-%s-%d", conversationID, atomic.AddInt64(&reasoningStreamSeq, 1))
|
||||
progress("thinking_stream_start", " ", map[string]interface{}{
|
||||
"streamId": reasoningStreamID,
|
||||
"source": "eino",
|
||||
"einoAgent": ev.AgentName,
|
||||
"einoRole": einoRoleTag(ev.AgentName),
|
||||
"orchestration": orchMode,
|
||||
})
|
||||
}
|
||||
progress("thinking_stream_delta", reasoningDelta, map[string]interface{}{
|
||||
"streamId": reasoningStreamID,
|
||||
})
|
||||
}
|
||||
progress("thinking_stream_delta", reasoningDelta, map[string]interface{}{
|
||||
"streamId": reasoningStreamID,
|
||||
})
|
||||
}
|
||||
}
|
||||
if chunk.Content != "" {
|
||||
if progress != nil && streamsMainAssistant(ev.AgentName) {
|
||||
var contentDelta string
|
||||
mainAssistantBuf, contentDelta = normalizeStreamingDelta(mainAssistantBuf, chunk.Content)
|
||||
if contentDelta != "" {
|
||||
if mainAssistDupTarget == "" {
|
||||
executeStdoutDupMu.Lock()
|
||||
if pendingExecuteStdoutDup != "" {
|
||||
mainAssistDupTarget = pendingExecuteStdoutDup
|
||||
if chunk.Content != "" {
|
||||
if progress != nil && streamsMainAssistant(ev.AgentName) {
|
||||
var contentDelta string
|
||||
mainAssistantBuf, contentDelta = normalizeStreamingDelta(mainAssistantBuf, chunk.Content)
|
||||
if contentDelta != "" {
|
||||
if mainAssistDupTarget == "" {
|
||||
executeStdoutDupMu.Lock()
|
||||
if pendingExecuteStdoutDup != "" {
|
||||
mainAssistDupTarget = pendingExecuteStdoutDup
|
||||
}
|
||||
executeStdoutDupMu.Unlock()
|
||||
}
|
||||
executeStdoutDupMu.Unlock()
|
||||
}
|
||||
if mainAssistDupTarget != "" {
|
||||
// 已展示过 tool_result,缓冲全文;EOF 后与 execute 输出相同则不再发助手流
|
||||
} else {
|
||||
if !streamHeaderSent {
|
||||
progress("response_start", "", map[string]interface{}{
|
||||
"conversationId": conversationID,
|
||||
"mcpExecutionIds": snapshotMCPIDs(),
|
||||
"messageGeneratedBy": "eino:" + ev.AgentName,
|
||||
"einoRole": "orchestrator",
|
||||
"einoAgent": ev.AgentName,
|
||||
"orchestration": orchMode,
|
||||
if mainAssistDupTarget != "" {
|
||||
// 已展示过 tool_result,缓冲全文;EOF 后与 execute 输出相同则不再发助手流
|
||||
} else {
|
||||
if !streamHeaderSent {
|
||||
progress("response_start", "", map[string]interface{}{
|
||||
"conversationId": conversationID,
|
||||
"mcpExecutionIds": snapshotMCPIDs(),
|
||||
"messageGeneratedBy": "eino:" + ev.AgentName,
|
||||
"einoRole": "orchestrator",
|
||||
"einoAgent": ev.AgentName,
|
||||
"orchestration": orchMode,
|
||||
})
|
||||
streamHeaderSent = true
|
||||
}
|
||||
progress("response_delta", contentDelta, map[string]interface{}{
|
||||
"conversationId": conversationID,
|
||||
"mcpExecutionIds": snapshotMCPIDs(),
|
||||
"einoRole": "orchestrator",
|
||||
"einoAgent": ev.AgentName,
|
||||
"orchestration": orchMode,
|
||||
})
|
||||
streamHeaderSent = true
|
||||
}
|
||||
progress("response_delta", contentDelta, map[string]interface{}{
|
||||
"conversationId": conversationID,
|
||||
"mcpExecutionIds": snapshotMCPIDs(),
|
||||
"einoRole": "orchestrator",
|
||||
"einoAgent": ev.AgentName,
|
||||
"orchestration": orchMode,
|
||||
})
|
||||
}
|
||||
}
|
||||
} else if !streamsMainAssistant(ev.AgentName) {
|
||||
var subDelta string
|
||||
subAssistantBuf, subDelta = normalizeStreamingDelta(subAssistantBuf, chunk.Content)
|
||||
if subDelta != "" {
|
||||
if progress != nil {
|
||||
if subReplyStreamID == "" {
|
||||
subReplyStreamID = fmt.Sprintf("eino-sub-reply-%s-%d", conversationID, atomic.AddInt64(&einoSubReplyStreamSeq, 1))
|
||||
progress("eino_agent_reply_stream_start", "", map[string]interface{}{
|
||||
} else if !streamsMainAssistant(ev.AgentName) {
|
||||
var subDelta string
|
||||
subAssistantBuf, subDelta = normalizeStreamingDelta(subAssistantBuf, chunk.Content)
|
||||
if subDelta != "" {
|
||||
if progress != nil {
|
||||
if subReplyStreamID == "" {
|
||||
subReplyStreamID = fmt.Sprintf("eino-sub-reply-%s-%d", conversationID, atomic.AddInt64(&einoSubReplyStreamSeq, 1))
|
||||
progress("eino_agent_reply_stream_start", "", map[string]interface{}{
|
||||
"streamId": subReplyStreamID,
|
||||
"einoAgent": ev.AgentName,
|
||||
"einoRole": "sub",
|
||||
"conversationId": conversationID,
|
||||
"source": "eino",
|
||||
})
|
||||
}
|
||||
progress("eino_agent_reply_stream_delta", subDelta, map[string]interface{}{
|
||||
"streamId": subReplyStreamID,
|
||||
"einoAgent": ev.AgentName,
|
||||
"einoRole": "sub",
|
||||
"conversationId": conversationID,
|
||||
"source": "eino",
|
||||
})
|
||||
}
|
||||
progress("eino_agent_reply_stream_delta", subDelta, map[string]interface{}{
|
||||
"streamId": subReplyStreamID,
|
||||
"conversationId": conversationID,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(chunk.ToolCalls) > 0 {
|
||||
toolStreamFragments = append(toolStreamFragments, chunk.ToolCalls...)
|
||||
if len(chunk.ToolCalls) > 0 {
|
||||
toolStreamFragments = append(toolStreamFragments, chunk.ToolCalls...)
|
||||
}
|
||||
}
|
||||
}
|
||||
if streamsMainAssistant(ev.AgentName) {
|
||||
@@ -683,10 +731,17 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
|
||||
}
|
||||
var lastToolChunk *schema.Message
|
||||
if merged := mergeStreamingToolCallFragments(toolStreamFragments); len(merged) > 0 {
|
||||
lastToolChunk = &schema.Message{ToolCalls: merged}
|
||||
lastToolChunk = mergeMessageToolCalls(&schema.Message{ToolCalls: merged})
|
||||
}
|
||||
tryEmitToolCallsOnce(lastToolChunk, ev.AgentName, orchestratorName, conversationID, progress, toolEmitSeen, subAgentToolStep, markPending)
|
||||
// 流式路径此前只把 tool_calls 推给进度 UI,未写入 runAccumulatedMsgs;落库后 loadHistory→RepairOrphan 会删掉全部 tool 结果,表现为「续跑/下轮失忆」。
|
||||
if lastToolChunk != nil && len(lastToolChunk.ToolCalls) > 0 {
|
||||
runAccumulatedMsgs = append(runAccumulatedMsgs, schema.AssistantMessage("", lastToolChunk.ToolCalls))
|
||||
}
|
||||
if streamRecvErr != nil {
|
||||
if isInterruptContinue(ctx) {
|
||||
return takePartial(streamRecvErr)
|
||||
}
|
||||
if progress != nil {
|
||||
progress("eino_stream_error", streamRecvErr.Error(), map[string]interface{}{
|
||||
"conversationId": conversationID,
|
||||
|
||||
@@ -0,0 +1,7 @@
|
||||
package multiagent
|
||||
|
||||
import "errors"
|
||||
|
||||
// ErrInterruptContinue 作为 context.CancelCause 使用:用户选择「中断并继续」且当前无进行中的 MCP 工具时,
|
||||
// 取消当前推理/流式输出,并在同一会话任务内携带用户补充说明自动续跑下一轮(类似 Hermes 式人机回合)。
|
||||
var ErrInterruptContinue = errors.New("agent interrupt: continue with user-supplied context")
|
||||
@@ -574,78 +574,73 @@ func RunDeepAgent(
|
||||
}, baseMsgs)
|
||||
}
|
||||
|
||||
func chatToolCallsToSchema(tcs []agent.ToolCall) []schema.ToolCall {
|
||||
if len(tcs) == 0 {
|
||||
return nil
|
||||
}
|
||||
out := make([]schema.ToolCall, 0, len(tcs))
|
||||
for _, tc := range tcs {
|
||||
if strings.TrimSpace(tc.ID) == "" {
|
||||
continue
|
||||
}
|
||||
argsStr := ""
|
||||
if tc.Function.Arguments != nil {
|
||||
b, err := json.Marshal(tc.Function.Arguments)
|
||||
if err == nil {
|
||||
argsStr = string(b)
|
||||
}
|
||||
}
|
||||
typ := tc.Type
|
||||
if typ == "" {
|
||||
typ = "function"
|
||||
}
|
||||
out = append(out, schema.ToolCall{
|
||||
ID: tc.ID,
|
||||
Type: typ,
|
||||
Function: schema.FunctionCall{
|
||||
Name: tc.Function.Name,
|
||||
Arguments: argsStr,
|
||||
},
|
||||
})
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// historyToMessages 将轨迹恢复的 ChatMessage 转为 Eino ADK 消息:**不裁剪条数、不按 token 预算截断**,
|
||||
// 并保留 user / assistant(含仅 tool_calls)/ tool,与库中 last_react 轨迹一致。
|
||||
func historyToMessages(history []agent.ChatMessage, appCfg *config.Config, mwCfg *config.MultiAgentEinoMiddlewareConfig) []adk.Message {
|
||||
_ = appCfg
|
||||
_ = mwCfg
|
||||
if len(history) == 0 {
|
||||
return nil
|
||||
}
|
||||
// Keep a bounded tail first; then enforce a token budget.
|
||||
const maxHistoryMessages = 200
|
||||
start := 0
|
||||
if len(history) > maxHistoryMessages {
|
||||
start = len(history) - maxHistoryMessages
|
||||
}
|
||||
raw := make([]adk.Message, 0, len(history[start:]))
|
||||
for _, h := range history[start:] {
|
||||
switch h.Role {
|
||||
raw := make([]adk.Message, 0, len(history))
|
||||
for _, h := range history {
|
||||
role := strings.ToLower(strings.TrimSpace(h.Role))
|
||||
switch role {
|
||||
case "user":
|
||||
if strings.TrimSpace(h.Content) != "" {
|
||||
raw = append(raw, schema.UserMessage(h.Content))
|
||||
}
|
||||
case "assistant":
|
||||
if strings.TrimSpace(h.Content) == "" && len(h.ToolCalls) > 0 {
|
||||
toolSchema := chatToolCallsToSchema(h.ToolCalls)
|
||||
if len(toolSchema) > 0 || strings.TrimSpace(h.Content) != "" {
|
||||
raw = append(raw, schema.AssistantMessage(h.Content, toolSchema))
|
||||
}
|
||||
case "tool":
|
||||
if strings.TrimSpace(h.ToolCallID) == "" && strings.TrimSpace(h.Content) == "" {
|
||||
continue
|
||||
}
|
||||
if strings.TrimSpace(h.Content) != "" {
|
||||
raw = append(raw, schema.AssistantMessage(h.Content, nil))
|
||||
var opts []schema.ToolMessageOption
|
||||
if tn := strings.TrimSpace(h.ToolName); tn != "" {
|
||||
opts = append(opts, schema.WithToolName(tn))
|
||||
}
|
||||
raw = append(raw, schema.ToolMessage(h.Content, h.ToolCallID, opts...))
|
||||
default:
|
||||
continue
|
||||
}
|
||||
}
|
||||
if len(raw) == 0 {
|
||||
return raw
|
||||
}
|
||||
maxTotal := 120000
|
||||
modelName := "gpt-4o"
|
||||
if appCfg != nil {
|
||||
if appCfg.OpenAI.MaxTotalTokens > 0 {
|
||||
maxTotal = appCfg.OpenAI.MaxTotalTokens
|
||||
}
|
||||
if m := strings.TrimSpace(appCfg.OpenAI.Model); m != "" {
|
||||
modelName = m
|
||||
}
|
||||
}
|
||||
ratio := 0.35
|
||||
if mwCfg != nil {
|
||||
ratio = mwCfg.HistoryInputBudgetRatioEffective()
|
||||
}
|
||||
budget := int(float64(maxTotal) * ratio)
|
||||
if budget < 4096 {
|
||||
budget = 4096
|
||||
}
|
||||
tc := agent.NewTikTokenCounter()
|
||||
outRev := make([]adk.Message, 0, len(raw))
|
||||
used := 0
|
||||
for i := len(raw) - 1; i >= 0; i-- {
|
||||
msg := raw[i]
|
||||
n, err := tc.Count(modelName, string(msg.Role)+"\n"+msg.Content)
|
||||
if err != nil {
|
||||
n = (len(msg.Content) + 3) / 4
|
||||
}
|
||||
if n <= 0 {
|
||||
n = 1
|
||||
}
|
||||
if used+n > budget {
|
||||
break
|
||||
}
|
||||
used += n
|
||||
outRev = append(outRev, msg)
|
||||
}
|
||||
out := make([]adk.Message, 0, len(outRev))
|
||||
for i := len(outRev) - 1; i >= 0; i-- {
|
||||
out = append(out, outRev[i])
|
||||
}
|
||||
return out
|
||||
return raw
|
||||
}
|
||||
|
||||
// mergeStreamingToolCallFragments 将流式多帧的 ToolCall 按 index 合并 arguments(与 schema.concatToolCalls 行为一致)。
|
||||
|
||||
Reference in New Issue
Block a user