diff --git a/internal/einoobserve/attach.go b/internal/einoobserve/attach.go new file mode 100644 index 00000000..dfc9809f --- /dev/null +++ b/internal/einoobserve/attach.go @@ -0,0 +1,435 @@ +// Package einoobserve attaches CloudWeGo Eino [callbacks.Handler] to ADK Runner contexts for +// structured logging and optional SSE trace events (eino_trace_*). +package einoobserve + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "sync" + "sync/atomic" + "time" + + "cyberstrike-ai/internal/config" + + "github.com/cloudwego/eino/adk" + "github.com/cloudwego/eino/callbacks" + "github.com/cloudwego/eino/components" + "github.com/cloudwego/eino/components/model" + "github.com/cloudwego/eino/components/tool" + "github.com/cloudwego/eino/schema" + "github.com/google/uuid" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" + "go.uber.org/zap" +) + +type ctxSpanKey struct{} + +type ctxOtelSpanKey struct{} + +// Params for attaching per-run callback instrumentation. +type Params struct { + Logger *zap.Logger + Progress func(eventType, message string, data interface{}) + ConversationID string + OrchMode string + OrchestratorName string +} + +// AttachAgentRunCallbacks returns ctx wrapped with callbacks.InitCallbacks when enabled. +// Safe to call with nil cfg or disabled cfg (returns ctx unchanged). +func AttachAgentRunCallbacks(ctx context.Context, cfg *config.MultiAgentEinoCallbacksConfig, p Params) context.Context { + if ctx == nil { + return ctx + } + if cfg == nil || !cfg.Enabled { + return ctx + } + mode := cfg.EinoCallbacksModeEffective() + if mode == "off" { + return ctx + } + runID := uuid.New().String() + if p.Progress != nil && cfg.ShouldEmitEinoTraceSSE(mode) { + p.Progress("eino_trace_run", "Eino callbacks session", map[string]interface{}{ + "runId": runID, + "conversationId": strings.TrimSpace(p.ConversationID), + "orchestration": strings.TrimSpace(p.OrchMode), + "orchestratorName": strings.TrimSpace(p.OrchestratorName), + "observeMode": mode, + "source": "eino_callbacks", + }) + } + h := &runHandler{ + cfg: *cfg, + mode: mode, + params: p, + runID: runID, + } + b := callbacks.NewHandlerBuilder(). + OnStartFn(h.onStart). + OnEndFn(h.onEnd). + OnErrorFn(h.onError) + if mode == "full" { + b = b.OnStartWithStreamInputFn(h.onStartStreamIn).OnEndWithStreamOutputFn(h.onEndStreamOut) + } + ri := &callbacks.RunInfo{ + Name: "CyberStrikeADKRun", + Type: strings.TrimSpace(p.OrchMode), + Component: components.Component("AgentSession"), + } + return callbacks.InitCallbacks(ctx, ri, b.Build()) +} + +type runHandler struct { + cfg config.MultiAgentEinoCallbacksConfig + mode string + params Params + runID string + + mu sync.Mutex + spanStack []string + seq atomic.Uint64 +} + +func (h *runHandler) genSpanID() string { + return fmt.Sprintf("%s-%d", h.runID, h.seq.Add(1)) +} + +func (h *runHandler) popSpan() (id string) { + h.mu.Lock() + defer h.mu.Unlock() + if len(h.spanStack) == 0 { + return "" + } + id = h.spanStack[len(h.spanStack)-1] + h.spanStack = h.spanStack[:len(h.spanStack)-1] + return id +} + +// popMatching removes the given id from the stack top if it matches; otherwise pops until empty or match (rare ordering mismatch). +func (h *runHandler) popMatching(want string) string { + h.mu.Lock() + defer h.mu.Unlock() + if want == "" { + if len(h.spanStack) == 0 { + return "" + } + id := h.spanStack[len(h.spanStack)-1] + h.spanStack = h.spanStack[:len(h.spanStack)-1] + return id + } + for len(h.spanStack) > 0 { + top := h.spanStack[len(h.spanStack)-1] + h.spanStack = h.spanStack[:len(h.spanStack)-1] + if top == want { + return top + } + } + return want +} + +func (h *runHandler) onStart(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context { + var parentID string + h.mu.Lock() + if len(h.spanStack) > 0 { + parentID = h.spanStack[len(h.spanStack)-1] + } + spanID := h.genSpanID() + h.spanStack = append(h.spanStack, spanID) + h.mu.Unlock() + + inSum := summarizeCallbackInput(input, h.cfg.EinoCallbacksMaxInputSummaryRunes()) + if h.cfg.OtelTracingActive() { + tracer := otel.Tracer("cyberstrike/eino") + spanName := callbackSpanName(info) + var sp trace.Span + ctx, sp = tracer.Start(ctx, spanName, + trace.WithSpanKind(trace.SpanKindInternal), + trace.WithAttributes( + attribute.String("eino.component", string(info.Component)), + attribute.String("eino.name", info.Name), + attribute.String("eino.type", info.Type), + attribute.String("cyberstrike.run_id", h.runID), + attribute.String("cyberstrike.conversation_id", strings.TrimSpace(h.params.ConversationID)), + attribute.String("cyberstrike.orchestration", strings.TrimSpace(h.params.OrchMode)), + ), + ) + if inSum != "" { + sp.SetAttributes(attribute.String("eino.input.summary", truncateForAttr(inSum, 256))) + } + ctx = context.WithValue(ctx, ctxOtelSpanKey{}, sp) + } + if h.params.Logger != nil { + fields := []zap.Field{ + zap.String("runId", h.runID), + zap.String("spanId", spanID), + zap.String("parentSpanId", parentID), + zap.String("component", string(info.Component)), + zap.String("name", info.Name), + zap.String("type", info.Type), + zap.String("phase", "start"), + } + if sp, ok := ctx.Value(ctxOtelSpanKey{}).(trace.Span); ok && sp != nil { + if sc := sp.SpanContext(); sc.IsValid() { + fields = append(fields, + zap.String("trace_id", sc.TraceID().String()), + zap.String("otel_span_id", sc.SpanID().String()), + ) + } + } + if h.cfg.ZapVerbose { + h.params.Logger.Debug("eino_callback", append(fields, zap.String("inputSummary", inSum))...) + } else { + h.params.Logger.Info("eino_callback", fields...) + } + } + if h.params.Progress != nil && h.cfg.ShouldEmitEinoTraceSSE(h.mode) { + h.params.Progress("eino_trace_start", "", map[string]interface{}{ + "runId": h.runID, + "spanId": spanID, + "parentSpanId": parentID, + "conversationId": strings.TrimSpace(h.params.ConversationID), + "orchestration": strings.TrimSpace(h.params.OrchMode), + "component": string(info.Component), + "name": info.Name, + "type": info.Type, + "ts": time.Now().UTC().Format(time.RFC3339Nano), + "inputSummary": inSum, + "source": "eino_callbacks", + }) + } + ctx = context.WithValue(ctx, ctxSpanKey{}, spanID) + return ctx +} + +func (h *runHandler) onEnd(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context { + spanID, _ := ctx.Value(ctxSpanKey{}).(string) + if spanID == "" { + spanID = h.popSpan() + } else { + spanID = h.popMatching(spanID) + } + outSum := summarizeCallbackOutput(output, h.cfg.EinoCallbacksMaxOutputSummaryRunes()) + if sp, ok := ctx.Value(ctxOtelSpanKey{}).(trace.Span); ok && sp != nil { + if outSum != "" { + sp.SetAttributes(attribute.String("eino.output.summary", truncateForAttr(outSum, 256))) + } + sp.SetStatus(codes.Ok, "") + sp.End() + } + if h.params.Logger != nil { + fields := []zap.Field{ + zap.String("runId", h.runID), + zap.String("spanId", spanID), + zap.String("component", string(info.Component)), + zap.String("name", info.Name), + zap.String("type", info.Type), + zap.String("phase", "end"), + } + if h.cfg.ZapVerbose { + h.params.Logger.Debug("eino_callback", append(fields, zap.String("outputSummary", outSum))...) + } else { + h.params.Logger.Info("eino_callback", fields...) + } + } + if h.params.Progress != nil && h.cfg.ShouldEmitEinoTraceSSE(h.mode) { + h.params.Progress("eino_trace_end", "", map[string]interface{}{ + "runId": h.runID, + "spanId": spanID, + "conversationId": strings.TrimSpace(h.params.ConversationID), + "orchestration": strings.TrimSpace(h.params.OrchMode), + "component": string(info.Component), + "name": info.Name, + "type": info.Type, + "ts": time.Now().UTC().Format(time.RFC3339Nano), + "outputSummary": outSum, + "source": "eino_callbacks", + }) + } + return ctx +} + +func (h *runHandler) onError(ctx context.Context, info *callbacks.RunInfo, err error) context.Context { + spanID, _ := ctx.Value(ctxSpanKey{}).(string) + if spanID == "" { + spanID = h.popSpan() + } else { + spanID = h.popMatching(spanID) + } + msg := "" + if err != nil { + msg = truncateRunes(err.Error(), h.cfg.EinoCallbacksMaxOutputSummaryRunes()) + } + if sp, ok := ctx.Value(ctxOtelSpanKey{}).(trace.Span); ok && sp != nil { + if err != nil { + sp.RecordError(err) + } + sp.SetStatus(codes.Error, msg) + sp.End() + } + if h.params.Logger != nil { + h.params.Logger.Warn("eino_callback_error", + zap.String("runId", h.runID), + zap.String("spanId", spanID), + zap.String("component", string(info.Component)), + zap.String("name", info.Name), + zap.String("type", info.Type), + zap.Error(err), + ) + } + if h.params.Progress != nil && h.cfg.ShouldEmitEinoTraceSSE(h.mode) { + h.params.Progress("eino_trace_error", msg, map[string]interface{}{ + "runId": h.runID, + "spanId": spanID, + "conversationId": strings.TrimSpace(h.params.ConversationID), + "orchestration": strings.TrimSpace(h.params.OrchMode), + "component": string(info.Component), + "name": info.Name, + "type": info.Type, + "ts": time.Now().UTC().Format(time.RFC3339Nano), + "error": msg, + "source": "eino_callbacks", + }) + } + return ctx +} + +func (h *runHandler) onStartStreamIn(ctx context.Context, info *callbacks.RunInfo, input *schema.StreamReader[callbacks.CallbackInput]) context.Context { + if input != nil { + input.Close() + } + if h.params.Logger != nil { + h.params.Logger.Debug("eino_callback_stream_in", + zap.String("runId", h.runID), + zap.String("component", string(info.Component)), + zap.String("name", info.Name), + ) + } + return ctx +} + +func (h *runHandler) onEndStreamOut(ctx context.Context, info *callbacks.RunInfo, output *schema.StreamReader[callbacks.CallbackOutput]) context.Context { + if output != nil { + output.Close() + } + if h.params.Logger != nil { + h.params.Logger.Debug("eino_callback_stream_out", + zap.String("runId", h.runID), + zap.String("component", string(info.Component)), + zap.String("name", info.Name), + ) + } + return ctx +} + +func callbackSpanName(info *callbacks.RunInfo) string { + if info == nil { + return "eino.callback" + } + comp := strings.TrimSpace(string(info.Component)) + name := strings.TrimSpace(info.Name) + typ := strings.TrimSpace(info.Type) + if name != "" && comp != "" { + return comp + "/" + name + } + if typ != "" && comp != "" { + return comp + "[" + typ + "]" + } + if comp != "" { + return comp + } + return "eino.callback" +} + +func truncateForAttr(s string, maxRunes int) string { + return truncateRunes(s, maxRunes) +} + +func summarizeCallbackInput(in callbacks.CallbackInput, maxRunes int) string { + if in == nil { + return "" + } + if ai := adk.ConvAgentCallbackInput(in); ai != nil { + parts := []string{"agent"} + if ai.Input != nil { + parts = append(parts, fmt.Sprintf("messages=%d", len(ai.Input.Messages))) + } + if ai.ResumeInfo != nil { + parts = append(parts, "resume=true") + } + return strings.Join(parts, " ") + } + if mi := model.ConvCallbackInput(in); mi != nil { + return fmt.Sprintf("chatModel messages=%d tools=%d", len(mi.Messages), len(mi.Tools)) + } + if ti := tool.ConvCallbackInput(in); ti != nil { + raw := ti.ArgumentsInJSON + return "tool args=" + truncateRunes(raw, maxRunes) + } + b, err := json.Marshal(in) + if err != nil { + return fmt.Sprintf("%T", in) + } + return truncateRunes(string(b), maxRunes) +} + +func summarizeCallbackOutput(out callbacks.CallbackOutput, maxRunes int) string { + if out == nil { + return "" + } + if ao := adk.ConvAgentCallbackOutput(out); ao != nil { + return "agent_events=stream" + } + if mo := model.ConvCallbackOutput(out); mo != nil && mo.Message != nil { + s := "" + if mo.Message.Content != "" { + s = mo.Message.Content + } + if mo.TokenUsage != nil { + return fmt.Sprintf("tokens total=%d completion=%d prompt=%d text=%s", + mo.TokenUsage.TotalTokens, mo.TokenUsage.CompletionTokens, mo.TokenUsage.PromptTokens, + truncateRunes(s, minInt(120, maxRunes))) + } + return "assistant len=" + itoa(len(s)) + } + if to := tool.ConvCallbackOutput(out); to != nil { + if to.Response != "" { + return truncateRunes(to.Response, maxRunes) + } + if to.ToolOutput != nil { + return "tool_result multimodal" + } + } + b, err := json.Marshal(out) + if err != nil { + return fmt.Sprintf("%T", out) + } + return truncateRunes(string(b), maxRunes) +} + +func minInt(a, b int) int { + if a < b { + return a + } + return b +} + +func itoa(n int) string { + return fmt.Sprintf("%d", n) +} + +func truncateRunes(s string, maxRunes int) string { + if maxRunes <= 0 { + return "" + } + r := []rune(s) + if len(r) <= maxRunes { + return s + } + return string(r[:maxRunes]) + "…" +} diff --git a/internal/einoobserve/attach_test.go b/internal/einoobserve/attach_test.go new file mode 100644 index 00000000..f4e2d80b --- /dev/null +++ b/internal/einoobserve/attach_test.go @@ -0,0 +1,26 @@ +package einoobserve + +import ( + "context" + "testing" + + "cyberstrike-ai/internal/config" +) + +func TestAttachAgentRunCallbacks_Disabled(t *testing.T) { + ctx := context.Background() + cfg := &config.MultiAgentEinoCallbacksConfig{Enabled: false} + out := AttachAgentRunCallbacks(ctx, cfg, Params{}) + if out != ctx { + t.Fatalf("expected same ctx when disabled") + } +} + +func TestTruncateRunes(t *testing.T) { + if got := truncateRunes("abc", 10); got != "abc" { + t.Fatalf("got %q", got) + } + if got := truncateRunes("abcdefghij", 4); got != "abcd…" { + t.Fatalf("got %q", got) + } +} diff --git a/internal/einoobserve/otel.go b/internal/einoobserve/otel.go new file mode 100644 index 00000000..05800abd --- /dev/null +++ b/internal/einoobserve/otel.go @@ -0,0 +1,111 @@ +package einoobserve + +import ( + "context" + "fmt" + "strings" + "sync" + + "cyberstrike-ai/internal/config" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" + "go.opentelemetry.io/otel/exporters/stdout/stdouttrace" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.26.0" + "go.uber.org/zap" +) + +var ( + otelMu sync.Mutex + otelShutdown func(context.Context) error + otelInitialized bool +) + +// InitOtelFromConfig installs the global OpenTelemetry TracerProvider when +// eino_callbacks.otel is enabled and exporter is not none. Safe to call multiple times. +func InitOtelFromConfig(cfg *config.MultiAgentEinoCallbacksConfig, log *zap.Logger) (shutdown func(context.Context) error, err error) { + shutdown = func(context.Context) error { return nil } + if cfg == nil || !cfg.OtelTracingActive() { + return shutdown, nil + } + + otelMu.Lock() + defer otelMu.Unlock() + if otelInitialized { + if otelShutdown != nil { + return otelShutdown, nil + } + return shutdown, nil + } + + oc := cfg.Otel + expKind := oc.OtelExporterEffective() + ctx := context.Background() + + var exporter sdktrace.SpanExporter + switch expKind { + case "stdout": + exporter, err = stdouttrace.New() + if err != nil { + return shutdown, fmt.Errorf("eino otel stdout exporter: %w", err) + } + case "otlphttp": + ep := strings.TrimSpace(oc.OTLPEndpoint) + if ep == "" { + ep = "localhost:4318" + } + exporter, err = otlptracehttp.New(ctx, + otlptracehttp.WithEndpoint(ep), + otlptracehttp.WithURLPath("/v1/traces"), + ) + if err != nil { + return shutdown, fmt.Errorf("eino otel otlphttp exporter: %w", err) + } + default: + return shutdown, nil + } + + res, err := resource.New(ctx, + resource.WithAttributes( + semconv.ServiceName(oc.ServiceNameEffective()), + ), + ) + if err != nil { + return shutdown, fmt.Errorf("eino otel resource: %w", err) + } + + sampler := sdktrace.ParentBased(sdktrace.TraceIDRatioBased(oc.SampleRatioEffective())) + tp := sdktrace.NewTracerProvider( + sdktrace.WithBatcher(exporter), + sdktrace.WithResource(res), + sdktrace.WithSampler(sampler), + ) + otel.SetTracerProvider(tp) + + otelShutdown = tp.Shutdown + otelInitialized = true + if log != nil { + log.Info("eino otel: tracer provider initialized", + zap.String("exporter", expKind), + zap.String("service", oc.ServiceNameEffective()), + zap.Float64("sample_ratio", oc.SampleRatioEffective()), + ) + } + return otelShutdown, nil +} + +// ShutdownOtel flushes and shuts down the global TracerProvider if it was installed. +func ShutdownOtel(ctx context.Context) error { + otelMu.Lock() + fn := otelShutdown + otelShutdown = nil + inited := otelInitialized + otelInitialized = false + otelMu.Unlock() + if !inited || fn == nil { + return nil + } + return fn(ctx) +} diff --git a/internal/multiagent/eino_adk_run_loop.go b/internal/multiagent/eino_adk_run_loop.go index 07db48e7..4ec0e9c8 100644 --- a/internal/multiagent/eino_adk_run_loop.go +++ b/internal/multiagent/eino_adk_run_loop.go @@ -14,6 +14,8 @@ import ( "unicode/utf8" "cyberstrike-ai/internal/agent" + "cyberstrike-ai/internal/config" + "cyberstrike-ai/internal/einoobserve" "cyberstrike-ai/internal/einomcp" "cyberstrike-ai/internal/openai" @@ -95,6 +97,9 @@ type einoADKRunLoopArgs struct { // ModelFacingTrace 可选:由各 ChatModelAgent Handlers 链末尾中间件写入「即将送入模型」的消息快照; // 非空时优先用于 LastAgentTraceInput 序列化,使续跑与 summarization/reduction 后的上下文一致。 ModelFacingTrace *modelFacingTraceHolder + + // EinoCallbacks 可选:为 ADK Runner 注入 eino [callbacks] 全链路观测(见 internal/einoobserve)。 + EinoCallbacks *config.MultiAgentEinoCallbacksConfig } func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs []adk.Message) (*RunResult, error) { @@ -289,6 +294,16 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs }) } + if args.EinoCallbacks != nil { + ctx = einoobserve.AttachAgentRunCallbacks(ctx, args.EinoCallbacks, einoobserve.Params{ + Logger: logger, + Progress: progress, + ConversationID: conversationID, + OrchMode: orchMode, + OrchestratorName: orchestratorName, + }) + } + runnerCfg := adk.RunnerConfig{ Agent: da, EnableStreaming: true, diff --git a/internal/multiagent/eino_single_runner.go b/internal/multiagent/eino_single_runner.go index 5f7cf4bd..2c32a1a8 100644 --- a/internal/multiagent/eino_single_runner.go +++ b/internal/multiagent/eino_single_runner.go @@ -247,6 +247,7 @@ func RunEinoSingleChatModelAgent( ToolInvokeNotify: toolInvokeNotify, DA: chatAgent, ModelFacingTrace: modelFacingTrace, + EinoCallbacks: &ma.EinoCallbacks, EmptyResponseMessage: "(Eino ADK single-agent session completed but no assistant text was captured. Check process details or logs.) " + "(Eino ADK 单代理会话已完成,但未捕获到助手文本输出。请查看过程详情或日志。)", }, baseMsgs) diff --git a/internal/multiagent/runner.go b/internal/multiagent/runner.go index df37c7d5..313d58bb 100644 --- a/internal/multiagent/runner.go +++ b/internal/multiagent/runner.go @@ -585,6 +585,7 @@ func RunDeepAgent( ToolInvokeNotify: toolInvokeNotify, DA: da, ModelFacingTrace: modelFacingTrace, + EinoCallbacks: &ma.EinoCallbacks, EmptyResponseMessage: "(Eino multi-agent orchestration completed but no assistant text was captured. Check process details or logs.) " + "(Eino 多代理编排已完成,但未捕获到助手文本输出。请查看过程详情或日志。)", }, baseMsgs)