Add files via upload

This commit is contained in:
公明
2026-05-12 17:17:36 +08:00
committed by GitHub
parent 62710f6619
commit 176c17d630
6 changed files with 589 additions and 0 deletions
+435
View File
@@ -0,0 +1,435 @@
// Package einoobserve attaches CloudWeGo Eino [callbacks.Handler] to ADK Runner contexts for
// structured logging and optional SSE trace events (eino_trace_*).
package einoobserve
import (
"context"
"encoding/json"
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
"cyberstrike-ai/internal/config"
"github.com/cloudwego/eino/adk"
"github.com/cloudwego/eino/callbacks"
"github.com/cloudwego/eino/components"
"github.com/cloudwego/eino/components/model"
"github.com/cloudwego/eino/components/tool"
"github.com/cloudwego/eino/schema"
"github.com/google/uuid"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)
type ctxSpanKey struct{}
type ctxOtelSpanKey struct{}
// Params for attaching per-run callback instrumentation.
type Params struct {
Logger *zap.Logger
Progress func(eventType, message string, data interface{})
ConversationID string
OrchMode string
OrchestratorName string
}
// AttachAgentRunCallbacks returns ctx wrapped with callbacks.InitCallbacks when enabled.
// Safe to call with nil cfg or disabled cfg (returns ctx unchanged).
func AttachAgentRunCallbacks(ctx context.Context, cfg *config.MultiAgentEinoCallbacksConfig, p Params) context.Context {
if ctx == nil {
return ctx
}
if cfg == nil || !cfg.Enabled {
return ctx
}
mode := cfg.EinoCallbacksModeEffective()
if mode == "off" {
return ctx
}
runID := uuid.New().String()
if p.Progress != nil && cfg.ShouldEmitEinoTraceSSE(mode) {
p.Progress("eino_trace_run", "Eino callbacks session", map[string]interface{}{
"runId": runID,
"conversationId": strings.TrimSpace(p.ConversationID),
"orchestration": strings.TrimSpace(p.OrchMode),
"orchestratorName": strings.TrimSpace(p.OrchestratorName),
"observeMode": mode,
"source": "eino_callbacks",
})
}
h := &runHandler{
cfg: *cfg,
mode: mode,
params: p,
runID: runID,
}
b := callbacks.NewHandlerBuilder().
OnStartFn(h.onStart).
OnEndFn(h.onEnd).
OnErrorFn(h.onError)
if mode == "full" {
b = b.OnStartWithStreamInputFn(h.onStartStreamIn).OnEndWithStreamOutputFn(h.onEndStreamOut)
}
ri := &callbacks.RunInfo{
Name: "CyberStrikeADKRun",
Type: strings.TrimSpace(p.OrchMode),
Component: components.Component("AgentSession"),
}
return callbacks.InitCallbacks(ctx, ri, b.Build())
}
type runHandler struct {
cfg config.MultiAgentEinoCallbacksConfig
mode string
params Params
runID string
mu sync.Mutex
spanStack []string
seq atomic.Uint64
}
func (h *runHandler) genSpanID() string {
return fmt.Sprintf("%s-%d", h.runID, h.seq.Add(1))
}
func (h *runHandler) popSpan() (id string) {
h.mu.Lock()
defer h.mu.Unlock()
if len(h.spanStack) == 0 {
return ""
}
id = h.spanStack[len(h.spanStack)-1]
h.spanStack = h.spanStack[:len(h.spanStack)-1]
return id
}
// popMatching removes the given id from the stack top if it matches; otherwise pops until empty or match (rare ordering mismatch).
func (h *runHandler) popMatching(want string) string {
h.mu.Lock()
defer h.mu.Unlock()
if want == "" {
if len(h.spanStack) == 0 {
return ""
}
id := h.spanStack[len(h.spanStack)-1]
h.spanStack = h.spanStack[:len(h.spanStack)-1]
return id
}
for len(h.spanStack) > 0 {
top := h.spanStack[len(h.spanStack)-1]
h.spanStack = h.spanStack[:len(h.spanStack)-1]
if top == want {
return top
}
}
return want
}
func (h *runHandler) onStart(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
var parentID string
h.mu.Lock()
if len(h.spanStack) > 0 {
parentID = h.spanStack[len(h.spanStack)-1]
}
spanID := h.genSpanID()
h.spanStack = append(h.spanStack, spanID)
h.mu.Unlock()
inSum := summarizeCallbackInput(input, h.cfg.EinoCallbacksMaxInputSummaryRunes())
if h.cfg.OtelTracingActive() {
tracer := otel.Tracer("cyberstrike/eino")
spanName := callbackSpanName(info)
var sp trace.Span
ctx, sp = tracer.Start(ctx, spanName,
trace.WithSpanKind(trace.SpanKindInternal),
trace.WithAttributes(
attribute.String("eino.component", string(info.Component)),
attribute.String("eino.name", info.Name),
attribute.String("eino.type", info.Type),
attribute.String("cyberstrike.run_id", h.runID),
attribute.String("cyberstrike.conversation_id", strings.TrimSpace(h.params.ConversationID)),
attribute.String("cyberstrike.orchestration", strings.TrimSpace(h.params.OrchMode)),
),
)
if inSum != "" {
sp.SetAttributes(attribute.String("eino.input.summary", truncateForAttr(inSum, 256)))
}
ctx = context.WithValue(ctx, ctxOtelSpanKey{}, sp)
}
if h.params.Logger != nil {
fields := []zap.Field{
zap.String("runId", h.runID),
zap.String("spanId", spanID),
zap.String("parentSpanId", parentID),
zap.String("component", string(info.Component)),
zap.String("name", info.Name),
zap.String("type", info.Type),
zap.String("phase", "start"),
}
if sp, ok := ctx.Value(ctxOtelSpanKey{}).(trace.Span); ok && sp != nil {
if sc := sp.SpanContext(); sc.IsValid() {
fields = append(fields,
zap.String("trace_id", sc.TraceID().String()),
zap.String("otel_span_id", sc.SpanID().String()),
)
}
}
if h.cfg.ZapVerbose {
h.params.Logger.Debug("eino_callback", append(fields, zap.String("inputSummary", inSum))...)
} else {
h.params.Logger.Info("eino_callback", fields...)
}
}
if h.params.Progress != nil && h.cfg.ShouldEmitEinoTraceSSE(h.mode) {
h.params.Progress("eino_trace_start", "", map[string]interface{}{
"runId": h.runID,
"spanId": spanID,
"parentSpanId": parentID,
"conversationId": strings.TrimSpace(h.params.ConversationID),
"orchestration": strings.TrimSpace(h.params.OrchMode),
"component": string(info.Component),
"name": info.Name,
"type": info.Type,
"ts": time.Now().UTC().Format(time.RFC3339Nano),
"inputSummary": inSum,
"source": "eino_callbacks",
})
}
ctx = context.WithValue(ctx, ctxSpanKey{}, spanID)
return ctx
}
func (h *runHandler) onEnd(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
spanID, _ := ctx.Value(ctxSpanKey{}).(string)
if spanID == "" {
spanID = h.popSpan()
} else {
spanID = h.popMatching(spanID)
}
outSum := summarizeCallbackOutput(output, h.cfg.EinoCallbacksMaxOutputSummaryRunes())
if sp, ok := ctx.Value(ctxOtelSpanKey{}).(trace.Span); ok && sp != nil {
if outSum != "" {
sp.SetAttributes(attribute.String("eino.output.summary", truncateForAttr(outSum, 256)))
}
sp.SetStatus(codes.Ok, "")
sp.End()
}
if h.params.Logger != nil {
fields := []zap.Field{
zap.String("runId", h.runID),
zap.String("spanId", spanID),
zap.String("component", string(info.Component)),
zap.String("name", info.Name),
zap.String("type", info.Type),
zap.String("phase", "end"),
}
if h.cfg.ZapVerbose {
h.params.Logger.Debug("eino_callback", append(fields, zap.String("outputSummary", outSum))...)
} else {
h.params.Logger.Info("eino_callback", fields...)
}
}
if h.params.Progress != nil && h.cfg.ShouldEmitEinoTraceSSE(h.mode) {
h.params.Progress("eino_trace_end", "", map[string]interface{}{
"runId": h.runID,
"spanId": spanID,
"conversationId": strings.TrimSpace(h.params.ConversationID),
"orchestration": strings.TrimSpace(h.params.OrchMode),
"component": string(info.Component),
"name": info.Name,
"type": info.Type,
"ts": time.Now().UTC().Format(time.RFC3339Nano),
"outputSummary": outSum,
"source": "eino_callbacks",
})
}
return ctx
}
func (h *runHandler) onError(ctx context.Context, info *callbacks.RunInfo, err error) context.Context {
spanID, _ := ctx.Value(ctxSpanKey{}).(string)
if spanID == "" {
spanID = h.popSpan()
} else {
spanID = h.popMatching(spanID)
}
msg := ""
if err != nil {
msg = truncateRunes(err.Error(), h.cfg.EinoCallbacksMaxOutputSummaryRunes())
}
if sp, ok := ctx.Value(ctxOtelSpanKey{}).(trace.Span); ok && sp != nil {
if err != nil {
sp.RecordError(err)
}
sp.SetStatus(codes.Error, msg)
sp.End()
}
if h.params.Logger != nil {
h.params.Logger.Warn("eino_callback_error",
zap.String("runId", h.runID),
zap.String("spanId", spanID),
zap.String("component", string(info.Component)),
zap.String("name", info.Name),
zap.String("type", info.Type),
zap.Error(err),
)
}
if h.params.Progress != nil && h.cfg.ShouldEmitEinoTraceSSE(h.mode) {
h.params.Progress("eino_trace_error", msg, map[string]interface{}{
"runId": h.runID,
"spanId": spanID,
"conversationId": strings.TrimSpace(h.params.ConversationID),
"orchestration": strings.TrimSpace(h.params.OrchMode),
"component": string(info.Component),
"name": info.Name,
"type": info.Type,
"ts": time.Now().UTC().Format(time.RFC3339Nano),
"error": msg,
"source": "eino_callbacks",
})
}
return ctx
}
func (h *runHandler) onStartStreamIn(ctx context.Context, info *callbacks.RunInfo, input *schema.StreamReader[callbacks.CallbackInput]) context.Context {
if input != nil {
input.Close()
}
if h.params.Logger != nil {
h.params.Logger.Debug("eino_callback_stream_in",
zap.String("runId", h.runID),
zap.String("component", string(info.Component)),
zap.String("name", info.Name),
)
}
return ctx
}
func (h *runHandler) onEndStreamOut(ctx context.Context, info *callbacks.RunInfo, output *schema.StreamReader[callbacks.CallbackOutput]) context.Context {
if output != nil {
output.Close()
}
if h.params.Logger != nil {
h.params.Logger.Debug("eino_callback_stream_out",
zap.String("runId", h.runID),
zap.String("component", string(info.Component)),
zap.String("name", info.Name),
)
}
return ctx
}
func callbackSpanName(info *callbacks.RunInfo) string {
if info == nil {
return "eino.callback"
}
comp := strings.TrimSpace(string(info.Component))
name := strings.TrimSpace(info.Name)
typ := strings.TrimSpace(info.Type)
if name != "" && comp != "" {
return comp + "/" + name
}
if typ != "" && comp != "" {
return comp + "[" + typ + "]"
}
if comp != "" {
return comp
}
return "eino.callback"
}
func truncateForAttr(s string, maxRunes int) string {
return truncateRunes(s, maxRunes)
}
func summarizeCallbackInput(in callbacks.CallbackInput, maxRunes int) string {
if in == nil {
return ""
}
if ai := adk.ConvAgentCallbackInput(in); ai != nil {
parts := []string{"agent"}
if ai.Input != nil {
parts = append(parts, fmt.Sprintf("messages=%d", len(ai.Input.Messages)))
}
if ai.ResumeInfo != nil {
parts = append(parts, "resume=true")
}
return strings.Join(parts, " ")
}
if mi := model.ConvCallbackInput(in); mi != nil {
return fmt.Sprintf("chatModel messages=%d tools=%d", len(mi.Messages), len(mi.Tools))
}
if ti := tool.ConvCallbackInput(in); ti != nil {
raw := ti.ArgumentsInJSON
return "tool args=" + truncateRunes(raw, maxRunes)
}
b, err := json.Marshal(in)
if err != nil {
return fmt.Sprintf("%T", in)
}
return truncateRunes(string(b), maxRunes)
}
func summarizeCallbackOutput(out callbacks.CallbackOutput, maxRunes int) string {
if out == nil {
return ""
}
if ao := adk.ConvAgentCallbackOutput(out); ao != nil {
return "agent_events=stream"
}
if mo := model.ConvCallbackOutput(out); mo != nil && mo.Message != nil {
s := ""
if mo.Message.Content != "" {
s = mo.Message.Content
}
if mo.TokenUsage != nil {
return fmt.Sprintf("tokens total=%d completion=%d prompt=%d text=%s",
mo.TokenUsage.TotalTokens, mo.TokenUsage.CompletionTokens, mo.TokenUsage.PromptTokens,
truncateRunes(s, minInt(120, maxRunes)))
}
return "assistant len=" + itoa(len(s))
}
if to := tool.ConvCallbackOutput(out); to != nil {
if to.Response != "" {
return truncateRunes(to.Response, maxRunes)
}
if to.ToolOutput != nil {
return "tool_result multimodal"
}
}
b, err := json.Marshal(out)
if err != nil {
return fmt.Sprintf("%T", out)
}
return truncateRunes(string(b), maxRunes)
}
func minInt(a, b int) int {
if a < b {
return a
}
return b
}
func itoa(n int) string {
return fmt.Sprintf("%d", n)
}
func truncateRunes(s string, maxRunes int) string {
if maxRunes <= 0 {
return ""
}
r := []rune(s)
if len(r) <= maxRunes {
return s
}
return string(r[:maxRunes]) + "…"
}
+26
View File
@@ -0,0 +1,26 @@
package einoobserve
import (
"context"
"testing"
"cyberstrike-ai/internal/config"
)
func TestAttachAgentRunCallbacks_Disabled(t *testing.T) {
ctx := context.Background()
cfg := &config.MultiAgentEinoCallbacksConfig{Enabled: false}
out := AttachAgentRunCallbacks(ctx, cfg, Params{})
if out != ctx {
t.Fatalf("expected same ctx when disabled")
}
}
func TestTruncateRunes(t *testing.T) {
if got := truncateRunes("abc", 10); got != "abc" {
t.Fatalf("got %q", got)
}
if got := truncateRunes("abcdefghij", 4); got != "abcd…" {
t.Fatalf("got %q", got)
}
}
+111
View File
@@ -0,0 +1,111 @@
package einoobserve
import (
"context"
"fmt"
"strings"
"sync"
"cyberstrike-ai/internal/config"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
"go.uber.org/zap"
)
var (
otelMu sync.Mutex
otelShutdown func(context.Context) error
otelInitialized bool
)
// InitOtelFromConfig installs the global OpenTelemetry TracerProvider when
// eino_callbacks.otel is enabled and exporter is not none. Safe to call multiple times.
func InitOtelFromConfig(cfg *config.MultiAgentEinoCallbacksConfig, log *zap.Logger) (shutdown func(context.Context) error, err error) {
shutdown = func(context.Context) error { return nil }
if cfg == nil || !cfg.OtelTracingActive() {
return shutdown, nil
}
otelMu.Lock()
defer otelMu.Unlock()
if otelInitialized {
if otelShutdown != nil {
return otelShutdown, nil
}
return shutdown, nil
}
oc := cfg.Otel
expKind := oc.OtelExporterEffective()
ctx := context.Background()
var exporter sdktrace.SpanExporter
switch expKind {
case "stdout":
exporter, err = stdouttrace.New()
if err != nil {
return shutdown, fmt.Errorf("eino otel stdout exporter: %w", err)
}
case "otlphttp":
ep := strings.TrimSpace(oc.OTLPEndpoint)
if ep == "" {
ep = "localhost:4318"
}
exporter, err = otlptracehttp.New(ctx,
otlptracehttp.WithEndpoint(ep),
otlptracehttp.WithURLPath("/v1/traces"),
)
if err != nil {
return shutdown, fmt.Errorf("eino otel otlphttp exporter: %w", err)
}
default:
return shutdown, nil
}
res, err := resource.New(ctx,
resource.WithAttributes(
semconv.ServiceName(oc.ServiceNameEffective()),
),
)
if err != nil {
return shutdown, fmt.Errorf("eino otel resource: %w", err)
}
sampler := sdktrace.ParentBased(sdktrace.TraceIDRatioBased(oc.SampleRatioEffective()))
tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exporter),
sdktrace.WithResource(res),
sdktrace.WithSampler(sampler),
)
otel.SetTracerProvider(tp)
otelShutdown = tp.Shutdown
otelInitialized = true
if log != nil {
log.Info("eino otel: tracer provider initialized",
zap.String("exporter", expKind),
zap.String("service", oc.ServiceNameEffective()),
zap.Float64("sample_ratio", oc.SampleRatioEffective()),
)
}
return otelShutdown, nil
}
// ShutdownOtel flushes and shuts down the global TracerProvider if it was installed.
func ShutdownOtel(ctx context.Context) error {
otelMu.Lock()
fn := otelShutdown
otelShutdown = nil
inited := otelInitialized
otelInitialized = false
otelMu.Unlock()
if !inited || fn == nil {
return nil
}
return fn(ctx)
}
+15
View File
@@ -14,6 +14,8 @@ import (
"unicode/utf8"
"cyberstrike-ai/internal/agent"
"cyberstrike-ai/internal/config"
"cyberstrike-ai/internal/einoobserve"
"cyberstrike-ai/internal/einomcp"
"cyberstrike-ai/internal/openai"
@@ -95,6 +97,9 @@ type einoADKRunLoopArgs struct {
// ModelFacingTrace 可选:由各 ChatModelAgent Handlers 链末尾中间件写入「即将送入模型」的消息快照;
// 非空时优先用于 LastAgentTraceInput 序列化,使续跑与 summarization/reduction 后的上下文一致。
ModelFacingTrace *modelFacingTraceHolder
// EinoCallbacks 可选:为 ADK Runner 注入 eino [callbacks] 全链路观测(见 internal/einoobserve)。
EinoCallbacks *config.MultiAgentEinoCallbacksConfig
}
func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs []adk.Message) (*RunResult, error) {
@@ -289,6 +294,16 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
})
}
if args.EinoCallbacks != nil {
ctx = einoobserve.AttachAgentRunCallbacks(ctx, args.EinoCallbacks, einoobserve.Params{
Logger: logger,
Progress: progress,
ConversationID: conversationID,
OrchMode: orchMode,
OrchestratorName: orchestratorName,
})
}
runnerCfg := adk.RunnerConfig{
Agent: da,
EnableStreaming: true,
@@ -247,6 +247,7 @@ func RunEinoSingleChatModelAgent(
ToolInvokeNotify: toolInvokeNotify,
DA: chatAgent,
ModelFacingTrace: modelFacingTrace,
EinoCallbacks: &ma.EinoCallbacks,
EmptyResponseMessage: "(Eino ADK single-agent session completed but no assistant text was captured. Check process details or logs.) " +
"Eino ADK 单代理会话已完成,但未捕获到助手文本输出。请查看过程详情或日志。)",
}, baseMsgs)
+1
View File
@@ -585,6 +585,7 @@ func RunDeepAgent(
ToolInvokeNotify: toolInvokeNotify,
DA: da,
ModelFacingTrace: modelFacingTrace,
EinoCallbacks: &ma.EinoCallbacks,
EmptyResponseMessage: "(Eino multi-agent orchestration completed but no assistant text was captured. Check process details or logs.) " +
"(Eino 多代理编排已完成,但未捕获到助手文本输出。请查看过程详情或日志。)",
}, baseMsgs)