diff --git a/internal/einoobserve/attach.go b/internal/einoobserve/attach.go index dfc9809f..62c5e4bd 100644 --- a/internal/einoobserve/attach.go +++ b/internal/einoobserve/attach.go @@ -96,6 +96,17 @@ type runHandler struct { seq atomic.Uint64 } +func safeRunInfo(info *callbacks.RunInfo) callbacks.RunInfo { + if info == nil { + return callbacks.RunInfo{ + Name: "unknown", + Type: "unknown", + Component: components.Component("unknown"), + } + } + return *info +} + func (h *runHandler) genSpanID() string { return fmt.Sprintf("%s-%d", h.runID, h.seq.Add(1)) } @@ -134,6 +145,7 @@ func (h *runHandler) popMatching(want string) string { } func (h *runHandler) onStart(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context { + ri := safeRunInfo(info) var parentID string h.mu.Lock() if len(h.spanStack) > 0 { @@ -151,9 +163,9 @@ func (h *runHandler) onStart(ctx context.Context, info *callbacks.RunInfo, input ctx, sp = tracer.Start(ctx, spanName, trace.WithSpanKind(trace.SpanKindInternal), trace.WithAttributes( - attribute.String("eino.component", string(info.Component)), - attribute.String("eino.name", info.Name), - attribute.String("eino.type", info.Type), + attribute.String("eino.component", string(ri.Component)), + attribute.String("eino.name", ri.Name), + attribute.String("eino.type", ri.Type), attribute.String("cyberstrike.run_id", h.runID), attribute.String("cyberstrike.conversation_id", strings.TrimSpace(h.params.ConversationID)), attribute.String("cyberstrike.orchestration", strings.TrimSpace(h.params.OrchMode)), @@ -169,9 +181,9 @@ func (h *runHandler) onStart(ctx context.Context, info *callbacks.RunInfo, input zap.String("runId", h.runID), zap.String("spanId", spanID), zap.String("parentSpanId", parentID), - zap.String("component", string(info.Component)), - zap.String("name", info.Name), - zap.String("type", info.Type), + zap.String("component", string(ri.Component)), + zap.String("name", ri.Name), + zap.String("type", ri.Type), zap.String("phase", "start"), } if sp, ok := ctx.Value(ctxOtelSpanKey{}).(trace.Span); ok && sp != nil { @@ -195,9 +207,9 @@ func (h *runHandler) onStart(ctx context.Context, info *callbacks.RunInfo, input "parentSpanId": parentID, "conversationId": strings.TrimSpace(h.params.ConversationID), "orchestration": strings.TrimSpace(h.params.OrchMode), - "component": string(info.Component), - "name": info.Name, - "type": info.Type, + "component": string(ri.Component), + "name": ri.Name, + "type": ri.Type, "ts": time.Now().UTC().Format(time.RFC3339Nano), "inputSummary": inSum, "source": "eino_callbacks", @@ -208,6 +220,7 @@ func (h *runHandler) onStart(ctx context.Context, info *callbacks.RunInfo, input } func (h *runHandler) onEnd(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context { + ri := safeRunInfo(info) spanID, _ := ctx.Value(ctxSpanKey{}).(string) if spanID == "" { spanID = h.popSpan() @@ -226,9 +239,9 @@ func (h *runHandler) onEnd(ctx context.Context, info *callbacks.RunInfo, output fields := []zap.Field{ zap.String("runId", h.runID), zap.String("spanId", spanID), - zap.String("component", string(info.Component)), - zap.String("name", info.Name), - zap.String("type", info.Type), + zap.String("component", string(ri.Component)), + zap.String("name", ri.Name), + zap.String("type", ri.Type), zap.String("phase", "end"), } if h.cfg.ZapVerbose { @@ -243,9 +256,9 @@ func (h *runHandler) onEnd(ctx context.Context, info *callbacks.RunInfo, output "spanId": spanID, "conversationId": strings.TrimSpace(h.params.ConversationID), "orchestration": strings.TrimSpace(h.params.OrchMode), - "component": string(info.Component), - "name": info.Name, - "type": info.Type, + "component": string(ri.Component), + "name": ri.Name, + "type": ri.Type, "ts": time.Now().UTC().Format(time.RFC3339Nano), "outputSummary": outSum, "source": "eino_callbacks", @@ -255,6 +268,7 @@ func (h *runHandler) onEnd(ctx context.Context, info *callbacks.RunInfo, output } func (h *runHandler) onError(ctx context.Context, info *callbacks.RunInfo, err error) context.Context { + ri := safeRunInfo(info) spanID, _ := ctx.Value(ctxSpanKey{}).(string) if spanID == "" { spanID = h.popSpan() @@ -276,9 +290,9 @@ func (h *runHandler) onError(ctx context.Context, info *callbacks.RunInfo, err e h.params.Logger.Warn("eino_callback_error", zap.String("runId", h.runID), zap.String("spanId", spanID), - zap.String("component", string(info.Component)), - zap.String("name", info.Name), - zap.String("type", info.Type), + zap.String("component", string(ri.Component)), + zap.String("name", ri.Name), + zap.String("type", ri.Type), zap.Error(err), ) } @@ -288,9 +302,9 @@ func (h *runHandler) onError(ctx context.Context, info *callbacks.RunInfo, err e "spanId": spanID, "conversationId": strings.TrimSpace(h.params.ConversationID), "orchestration": strings.TrimSpace(h.params.OrchMode), - "component": string(info.Component), - "name": info.Name, - "type": info.Type, + "component": string(ri.Component), + "name": ri.Name, + "type": ri.Type, "ts": time.Now().UTC().Format(time.RFC3339Nano), "error": msg, "source": "eino_callbacks", @@ -300,28 +314,30 @@ func (h *runHandler) onError(ctx context.Context, info *callbacks.RunInfo, err e } func (h *runHandler) onStartStreamIn(ctx context.Context, info *callbacks.RunInfo, input *schema.StreamReader[callbacks.CallbackInput]) context.Context { + ri := safeRunInfo(info) if input != nil { input.Close() } if h.params.Logger != nil { h.params.Logger.Debug("eino_callback_stream_in", zap.String("runId", h.runID), - zap.String("component", string(info.Component)), - zap.String("name", info.Name), + zap.String("component", string(ri.Component)), + zap.String("name", ri.Name), ) } return ctx } func (h *runHandler) onEndStreamOut(ctx context.Context, info *callbacks.RunInfo, output *schema.StreamReader[callbacks.CallbackOutput]) context.Context { + ri := safeRunInfo(info) if output != nil { output.Close() } if h.params.Logger != nil { h.params.Logger.Debug("eino_callback_stream_out", zap.String("runId", h.runID), - zap.String("component", string(info.Component)), - zap.String("name", info.Name), + zap.String("component", string(ri.Component)), + zap.String("name", ri.Name), ) } return ctx