mirror of
https://github.com/Ed1s0nZ/CyberStrikeAI.git
synced 2026-05-15 21:08:01 +02:00
436 lines
12 KiB
Go
436 lines
12 KiB
Go
// Package einoobserve attaches CloudWeGo Eino [callbacks.Handler] to ADK Runner contexts for
|
|
// structured logging and optional SSE trace events (eino_trace_*).
|
|
package einoobserve
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"cyberstrike-ai/internal/config"
|
|
|
|
"github.com/cloudwego/eino/adk"
|
|
"github.com/cloudwego/eino/callbacks"
|
|
"github.com/cloudwego/eino/components"
|
|
"github.com/cloudwego/eino/components/model"
|
|
"github.com/cloudwego/eino/components/tool"
|
|
"github.com/cloudwego/eino/schema"
|
|
"github.com/google/uuid"
|
|
"go.opentelemetry.io/otel"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/codes"
|
|
"go.opentelemetry.io/otel/trace"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
type ctxSpanKey struct{}
|
|
|
|
type ctxOtelSpanKey struct{}
|
|
|
|
// Params for attaching per-run callback instrumentation.
|
|
type Params struct {
|
|
Logger *zap.Logger
|
|
Progress func(eventType, message string, data interface{})
|
|
ConversationID string
|
|
OrchMode string
|
|
OrchestratorName string
|
|
}
|
|
|
|
// AttachAgentRunCallbacks returns ctx wrapped with callbacks.InitCallbacks when enabled.
|
|
// Safe to call with nil cfg or disabled cfg (returns ctx unchanged).
|
|
func AttachAgentRunCallbacks(ctx context.Context, cfg *config.MultiAgentEinoCallbacksConfig, p Params) context.Context {
|
|
if ctx == nil {
|
|
return ctx
|
|
}
|
|
if cfg == nil || !cfg.Enabled {
|
|
return ctx
|
|
}
|
|
mode := cfg.EinoCallbacksModeEffective()
|
|
if mode == "off" {
|
|
return ctx
|
|
}
|
|
runID := uuid.New().String()
|
|
if p.Progress != nil && cfg.ShouldEmitEinoTraceSSE(mode) {
|
|
p.Progress("eino_trace_run", "Eino callbacks session", map[string]interface{}{
|
|
"runId": runID,
|
|
"conversationId": strings.TrimSpace(p.ConversationID),
|
|
"orchestration": strings.TrimSpace(p.OrchMode),
|
|
"orchestratorName": strings.TrimSpace(p.OrchestratorName),
|
|
"observeMode": mode,
|
|
"source": "eino_callbacks",
|
|
})
|
|
}
|
|
h := &runHandler{
|
|
cfg: *cfg,
|
|
mode: mode,
|
|
params: p,
|
|
runID: runID,
|
|
}
|
|
b := callbacks.NewHandlerBuilder().
|
|
OnStartFn(h.onStart).
|
|
OnEndFn(h.onEnd).
|
|
OnErrorFn(h.onError)
|
|
if mode == "full" {
|
|
b = b.OnStartWithStreamInputFn(h.onStartStreamIn).OnEndWithStreamOutputFn(h.onEndStreamOut)
|
|
}
|
|
ri := &callbacks.RunInfo{
|
|
Name: "CyberStrikeADKRun",
|
|
Type: strings.TrimSpace(p.OrchMode),
|
|
Component: components.Component("AgentSession"),
|
|
}
|
|
return callbacks.InitCallbacks(ctx, ri, b.Build())
|
|
}
|
|
|
|
type runHandler struct {
|
|
cfg config.MultiAgentEinoCallbacksConfig
|
|
mode string
|
|
params Params
|
|
runID string
|
|
|
|
mu sync.Mutex
|
|
spanStack []string
|
|
seq atomic.Uint64
|
|
}
|
|
|
|
func (h *runHandler) genSpanID() string {
|
|
return fmt.Sprintf("%s-%d", h.runID, h.seq.Add(1))
|
|
}
|
|
|
|
func (h *runHandler) popSpan() (id string) {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
if len(h.spanStack) == 0 {
|
|
return ""
|
|
}
|
|
id = h.spanStack[len(h.spanStack)-1]
|
|
h.spanStack = h.spanStack[:len(h.spanStack)-1]
|
|
return id
|
|
}
|
|
|
|
// popMatching removes the given id from the stack top if it matches; otherwise pops until empty or match (rare ordering mismatch).
|
|
func (h *runHandler) popMatching(want string) string {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
if want == "" {
|
|
if len(h.spanStack) == 0 {
|
|
return ""
|
|
}
|
|
id := h.spanStack[len(h.spanStack)-1]
|
|
h.spanStack = h.spanStack[:len(h.spanStack)-1]
|
|
return id
|
|
}
|
|
for len(h.spanStack) > 0 {
|
|
top := h.spanStack[len(h.spanStack)-1]
|
|
h.spanStack = h.spanStack[:len(h.spanStack)-1]
|
|
if top == want {
|
|
return top
|
|
}
|
|
}
|
|
return want
|
|
}
|
|
|
|
func (h *runHandler) onStart(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
|
|
var parentID string
|
|
h.mu.Lock()
|
|
if len(h.spanStack) > 0 {
|
|
parentID = h.spanStack[len(h.spanStack)-1]
|
|
}
|
|
spanID := h.genSpanID()
|
|
h.spanStack = append(h.spanStack, spanID)
|
|
h.mu.Unlock()
|
|
|
|
inSum := summarizeCallbackInput(input, h.cfg.EinoCallbacksMaxInputSummaryRunes())
|
|
if h.cfg.OtelTracingActive() {
|
|
tracer := otel.Tracer("cyberstrike/eino")
|
|
spanName := callbackSpanName(info)
|
|
var sp trace.Span
|
|
ctx, sp = tracer.Start(ctx, spanName,
|
|
trace.WithSpanKind(trace.SpanKindInternal),
|
|
trace.WithAttributes(
|
|
attribute.String("eino.component", string(info.Component)),
|
|
attribute.String("eino.name", info.Name),
|
|
attribute.String("eino.type", info.Type),
|
|
attribute.String("cyberstrike.run_id", h.runID),
|
|
attribute.String("cyberstrike.conversation_id", strings.TrimSpace(h.params.ConversationID)),
|
|
attribute.String("cyberstrike.orchestration", strings.TrimSpace(h.params.OrchMode)),
|
|
),
|
|
)
|
|
if inSum != "" {
|
|
sp.SetAttributes(attribute.String("eino.input.summary", truncateForAttr(inSum, 256)))
|
|
}
|
|
ctx = context.WithValue(ctx, ctxOtelSpanKey{}, sp)
|
|
}
|
|
if h.params.Logger != nil {
|
|
fields := []zap.Field{
|
|
zap.String("runId", h.runID),
|
|
zap.String("spanId", spanID),
|
|
zap.String("parentSpanId", parentID),
|
|
zap.String("component", string(info.Component)),
|
|
zap.String("name", info.Name),
|
|
zap.String("type", info.Type),
|
|
zap.String("phase", "start"),
|
|
}
|
|
if sp, ok := ctx.Value(ctxOtelSpanKey{}).(trace.Span); ok && sp != nil {
|
|
if sc := sp.SpanContext(); sc.IsValid() {
|
|
fields = append(fields,
|
|
zap.String("trace_id", sc.TraceID().String()),
|
|
zap.String("otel_span_id", sc.SpanID().String()),
|
|
)
|
|
}
|
|
}
|
|
if h.cfg.ZapVerbose {
|
|
h.params.Logger.Debug("eino_callback", append(fields, zap.String("inputSummary", inSum))...)
|
|
} else {
|
|
h.params.Logger.Info("eino_callback", fields...)
|
|
}
|
|
}
|
|
if h.params.Progress != nil && h.cfg.ShouldEmitEinoTraceSSE(h.mode) {
|
|
h.params.Progress("eino_trace_start", "", map[string]interface{}{
|
|
"runId": h.runID,
|
|
"spanId": spanID,
|
|
"parentSpanId": parentID,
|
|
"conversationId": strings.TrimSpace(h.params.ConversationID),
|
|
"orchestration": strings.TrimSpace(h.params.OrchMode),
|
|
"component": string(info.Component),
|
|
"name": info.Name,
|
|
"type": info.Type,
|
|
"ts": time.Now().UTC().Format(time.RFC3339Nano),
|
|
"inputSummary": inSum,
|
|
"source": "eino_callbacks",
|
|
})
|
|
}
|
|
ctx = context.WithValue(ctx, ctxSpanKey{}, spanID)
|
|
return ctx
|
|
}
|
|
|
|
func (h *runHandler) onEnd(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
|
|
spanID, _ := ctx.Value(ctxSpanKey{}).(string)
|
|
if spanID == "" {
|
|
spanID = h.popSpan()
|
|
} else {
|
|
spanID = h.popMatching(spanID)
|
|
}
|
|
outSum := summarizeCallbackOutput(output, h.cfg.EinoCallbacksMaxOutputSummaryRunes())
|
|
if sp, ok := ctx.Value(ctxOtelSpanKey{}).(trace.Span); ok && sp != nil {
|
|
if outSum != "" {
|
|
sp.SetAttributes(attribute.String("eino.output.summary", truncateForAttr(outSum, 256)))
|
|
}
|
|
sp.SetStatus(codes.Ok, "")
|
|
sp.End()
|
|
}
|
|
if h.params.Logger != nil {
|
|
fields := []zap.Field{
|
|
zap.String("runId", h.runID),
|
|
zap.String("spanId", spanID),
|
|
zap.String("component", string(info.Component)),
|
|
zap.String("name", info.Name),
|
|
zap.String("type", info.Type),
|
|
zap.String("phase", "end"),
|
|
}
|
|
if h.cfg.ZapVerbose {
|
|
h.params.Logger.Debug("eino_callback", append(fields, zap.String("outputSummary", outSum))...)
|
|
} else {
|
|
h.params.Logger.Info("eino_callback", fields...)
|
|
}
|
|
}
|
|
if h.params.Progress != nil && h.cfg.ShouldEmitEinoTraceSSE(h.mode) {
|
|
h.params.Progress("eino_trace_end", "", map[string]interface{}{
|
|
"runId": h.runID,
|
|
"spanId": spanID,
|
|
"conversationId": strings.TrimSpace(h.params.ConversationID),
|
|
"orchestration": strings.TrimSpace(h.params.OrchMode),
|
|
"component": string(info.Component),
|
|
"name": info.Name,
|
|
"type": info.Type,
|
|
"ts": time.Now().UTC().Format(time.RFC3339Nano),
|
|
"outputSummary": outSum,
|
|
"source": "eino_callbacks",
|
|
})
|
|
}
|
|
return ctx
|
|
}
|
|
|
|
func (h *runHandler) onError(ctx context.Context, info *callbacks.RunInfo, err error) context.Context {
|
|
spanID, _ := ctx.Value(ctxSpanKey{}).(string)
|
|
if spanID == "" {
|
|
spanID = h.popSpan()
|
|
} else {
|
|
spanID = h.popMatching(spanID)
|
|
}
|
|
msg := ""
|
|
if err != nil {
|
|
msg = truncateRunes(err.Error(), h.cfg.EinoCallbacksMaxOutputSummaryRunes())
|
|
}
|
|
if sp, ok := ctx.Value(ctxOtelSpanKey{}).(trace.Span); ok && sp != nil {
|
|
if err != nil {
|
|
sp.RecordError(err)
|
|
}
|
|
sp.SetStatus(codes.Error, msg)
|
|
sp.End()
|
|
}
|
|
if h.params.Logger != nil {
|
|
h.params.Logger.Warn("eino_callback_error",
|
|
zap.String("runId", h.runID),
|
|
zap.String("spanId", spanID),
|
|
zap.String("component", string(info.Component)),
|
|
zap.String("name", info.Name),
|
|
zap.String("type", info.Type),
|
|
zap.Error(err),
|
|
)
|
|
}
|
|
if h.params.Progress != nil && h.cfg.ShouldEmitEinoTraceSSE(h.mode) {
|
|
h.params.Progress("eino_trace_error", msg, map[string]interface{}{
|
|
"runId": h.runID,
|
|
"spanId": spanID,
|
|
"conversationId": strings.TrimSpace(h.params.ConversationID),
|
|
"orchestration": strings.TrimSpace(h.params.OrchMode),
|
|
"component": string(info.Component),
|
|
"name": info.Name,
|
|
"type": info.Type,
|
|
"ts": time.Now().UTC().Format(time.RFC3339Nano),
|
|
"error": msg,
|
|
"source": "eino_callbacks",
|
|
})
|
|
}
|
|
return ctx
|
|
}
|
|
|
|
func (h *runHandler) onStartStreamIn(ctx context.Context, info *callbacks.RunInfo, input *schema.StreamReader[callbacks.CallbackInput]) context.Context {
|
|
if input != nil {
|
|
input.Close()
|
|
}
|
|
if h.params.Logger != nil {
|
|
h.params.Logger.Debug("eino_callback_stream_in",
|
|
zap.String("runId", h.runID),
|
|
zap.String("component", string(info.Component)),
|
|
zap.String("name", info.Name),
|
|
)
|
|
}
|
|
return ctx
|
|
}
|
|
|
|
func (h *runHandler) onEndStreamOut(ctx context.Context, info *callbacks.RunInfo, output *schema.StreamReader[callbacks.CallbackOutput]) context.Context {
|
|
if output != nil {
|
|
output.Close()
|
|
}
|
|
if h.params.Logger != nil {
|
|
h.params.Logger.Debug("eino_callback_stream_out",
|
|
zap.String("runId", h.runID),
|
|
zap.String("component", string(info.Component)),
|
|
zap.String("name", info.Name),
|
|
)
|
|
}
|
|
return ctx
|
|
}
|
|
|
|
func callbackSpanName(info *callbacks.RunInfo) string {
|
|
if info == nil {
|
|
return "eino.callback"
|
|
}
|
|
comp := strings.TrimSpace(string(info.Component))
|
|
name := strings.TrimSpace(info.Name)
|
|
typ := strings.TrimSpace(info.Type)
|
|
if name != "" && comp != "" {
|
|
return comp + "/" + name
|
|
}
|
|
if typ != "" && comp != "" {
|
|
return comp + "[" + typ + "]"
|
|
}
|
|
if comp != "" {
|
|
return comp
|
|
}
|
|
return "eino.callback"
|
|
}
|
|
|
|
func truncateForAttr(s string, maxRunes int) string {
|
|
return truncateRunes(s, maxRunes)
|
|
}
|
|
|
|
func summarizeCallbackInput(in callbacks.CallbackInput, maxRunes int) string {
|
|
if in == nil {
|
|
return ""
|
|
}
|
|
if ai := adk.ConvAgentCallbackInput(in); ai != nil {
|
|
parts := []string{"agent"}
|
|
if ai.Input != nil {
|
|
parts = append(parts, fmt.Sprintf("messages=%d", len(ai.Input.Messages)))
|
|
}
|
|
if ai.ResumeInfo != nil {
|
|
parts = append(parts, "resume=true")
|
|
}
|
|
return strings.Join(parts, " ")
|
|
}
|
|
if mi := model.ConvCallbackInput(in); mi != nil {
|
|
return fmt.Sprintf("chatModel messages=%d tools=%d", len(mi.Messages), len(mi.Tools))
|
|
}
|
|
if ti := tool.ConvCallbackInput(in); ti != nil {
|
|
raw := ti.ArgumentsInJSON
|
|
return "tool args=" + truncateRunes(raw, maxRunes)
|
|
}
|
|
b, err := json.Marshal(in)
|
|
if err != nil {
|
|
return fmt.Sprintf("%T", in)
|
|
}
|
|
return truncateRunes(string(b), maxRunes)
|
|
}
|
|
|
|
func summarizeCallbackOutput(out callbacks.CallbackOutput, maxRunes int) string {
|
|
if out == nil {
|
|
return ""
|
|
}
|
|
if ao := adk.ConvAgentCallbackOutput(out); ao != nil {
|
|
return "agent_events=stream"
|
|
}
|
|
if mo := model.ConvCallbackOutput(out); mo != nil && mo.Message != nil {
|
|
s := ""
|
|
if mo.Message.Content != "" {
|
|
s = mo.Message.Content
|
|
}
|
|
if mo.TokenUsage != nil {
|
|
return fmt.Sprintf("tokens total=%d completion=%d prompt=%d text=%s",
|
|
mo.TokenUsage.TotalTokens, mo.TokenUsage.CompletionTokens, mo.TokenUsage.PromptTokens,
|
|
truncateRunes(s, minInt(120, maxRunes)))
|
|
}
|
|
return "assistant len=" + itoa(len(s))
|
|
}
|
|
if to := tool.ConvCallbackOutput(out); to != nil {
|
|
if to.Response != "" {
|
|
return truncateRunes(to.Response, maxRunes)
|
|
}
|
|
if to.ToolOutput != nil {
|
|
return "tool_result multimodal"
|
|
}
|
|
}
|
|
b, err := json.Marshal(out)
|
|
if err != nil {
|
|
return fmt.Sprintf("%T", out)
|
|
}
|
|
return truncateRunes(string(b), maxRunes)
|
|
}
|
|
|
|
func minInt(a, b int) int {
|
|
if a < b {
|
|
return a
|
|
}
|
|
return b
|
|
}
|
|
|
|
func itoa(n int) string {
|
|
return fmt.Sprintf("%d", n)
|
|
}
|
|
|
|
func truncateRunes(s string, maxRunes int) string {
|
|
if maxRunes <= 0 {
|
|
return ""
|
|
}
|
|
r := []rune(s)
|
|
if len(r) <= maxRunes {
|
|
return s
|
|
}
|
|
return string(r[:maxRunes]) + "…"
|
|
}
|