Add files via upload

This commit is contained in:
公明
2026-05-28 13:00:01 +08:00
committed by GitHub
parent 1553e896c5
commit 9941f51b3e
+41 -25
View File
@@ -96,6 +96,17 @@ type runHandler struct {
seq atomic.Uint64
}
func safeRunInfo(info *callbacks.RunInfo) callbacks.RunInfo {
if info == nil {
return callbacks.RunInfo{
Name: "unknown",
Type: "unknown",
Component: components.Component("unknown"),
}
}
return *info
}
func (h *runHandler) genSpanID() string {
return fmt.Sprintf("%s-%d", h.runID, h.seq.Add(1))
}
@@ -134,6 +145,7 @@ func (h *runHandler) popMatching(want string) string {
}
func (h *runHandler) onStart(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
ri := safeRunInfo(info)
var parentID string
h.mu.Lock()
if len(h.spanStack) > 0 {
@@ -151,9 +163,9 @@ func (h *runHandler) onStart(ctx context.Context, info *callbacks.RunInfo, input
ctx, sp = tracer.Start(ctx, spanName,
trace.WithSpanKind(trace.SpanKindInternal),
trace.WithAttributes(
attribute.String("eino.component", string(info.Component)),
attribute.String("eino.name", info.Name),
attribute.String("eino.type", info.Type),
attribute.String("eino.component", string(ri.Component)),
attribute.String("eino.name", ri.Name),
attribute.String("eino.type", ri.Type),
attribute.String("cyberstrike.run_id", h.runID),
attribute.String("cyberstrike.conversation_id", strings.TrimSpace(h.params.ConversationID)),
attribute.String("cyberstrike.orchestration", strings.TrimSpace(h.params.OrchMode)),
@@ -169,9 +181,9 @@ func (h *runHandler) onStart(ctx context.Context, info *callbacks.RunInfo, input
zap.String("runId", h.runID),
zap.String("spanId", spanID),
zap.String("parentSpanId", parentID),
zap.String("component", string(info.Component)),
zap.String("name", info.Name),
zap.String("type", info.Type),
zap.String("component", string(ri.Component)),
zap.String("name", ri.Name),
zap.String("type", ri.Type),
zap.String("phase", "start"),
}
if sp, ok := ctx.Value(ctxOtelSpanKey{}).(trace.Span); ok && sp != nil {
@@ -195,9 +207,9 @@ func (h *runHandler) onStart(ctx context.Context, info *callbacks.RunInfo, input
"parentSpanId": parentID,
"conversationId": strings.TrimSpace(h.params.ConversationID),
"orchestration": strings.TrimSpace(h.params.OrchMode),
"component": string(info.Component),
"name": info.Name,
"type": info.Type,
"component": string(ri.Component),
"name": ri.Name,
"type": ri.Type,
"ts": time.Now().UTC().Format(time.RFC3339Nano),
"inputSummary": inSum,
"source": "eino_callbacks",
@@ -208,6 +220,7 @@ func (h *runHandler) onStart(ctx context.Context, info *callbacks.RunInfo, input
}
func (h *runHandler) onEnd(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
ri := safeRunInfo(info)
spanID, _ := ctx.Value(ctxSpanKey{}).(string)
if spanID == "" {
spanID = h.popSpan()
@@ -226,9 +239,9 @@ func (h *runHandler) onEnd(ctx context.Context, info *callbacks.RunInfo, output
fields := []zap.Field{
zap.String("runId", h.runID),
zap.String("spanId", spanID),
zap.String("component", string(info.Component)),
zap.String("name", info.Name),
zap.String("type", info.Type),
zap.String("component", string(ri.Component)),
zap.String("name", ri.Name),
zap.String("type", ri.Type),
zap.String("phase", "end"),
}
if h.cfg.ZapVerbose {
@@ -243,9 +256,9 @@ func (h *runHandler) onEnd(ctx context.Context, info *callbacks.RunInfo, output
"spanId": spanID,
"conversationId": strings.TrimSpace(h.params.ConversationID),
"orchestration": strings.TrimSpace(h.params.OrchMode),
"component": string(info.Component),
"name": info.Name,
"type": info.Type,
"component": string(ri.Component),
"name": ri.Name,
"type": ri.Type,
"ts": time.Now().UTC().Format(time.RFC3339Nano),
"outputSummary": outSum,
"source": "eino_callbacks",
@@ -255,6 +268,7 @@ func (h *runHandler) onEnd(ctx context.Context, info *callbacks.RunInfo, output
}
func (h *runHandler) onError(ctx context.Context, info *callbacks.RunInfo, err error) context.Context {
ri := safeRunInfo(info)
spanID, _ := ctx.Value(ctxSpanKey{}).(string)
if spanID == "" {
spanID = h.popSpan()
@@ -276,9 +290,9 @@ func (h *runHandler) onError(ctx context.Context, info *callbacks.RunInfo, err e
h.params.Logger.Warn("eino_callback_error",
zap.String("runId", h.runID),
zap.String("spanId", spanID),
zap.String("component", string(info.Component)),
zap.String("name", info.Name),
zap.String("type", info.Type),
zap.String("component", string(ri.Component)),
zap.String("name", ri.Name),
zap.String("type", ri.Type),
zap.Error(err),
)
}
@@ -288,9 +302,9 @@ func (h *runHandler) onError(ctx context.Context, info *callbacks.RunInfo, err e
"spanId": spanID,
"conversationId": strings.TrimSpace(h.params.ConversationID),
"orchestration": strings.TrimSpace(h.params.OrchMode),
"component": string(info.Component),
"name": info.Name,
"type": info.Type,
"component": string(ri.Component),
"name": ri.Name,
"type": ri.Type,
"ts": time.Now().UTC().Format(time.RFC3339Nano),
"error": msg,
"source": "eino_callbacks",
@@ -300,28 +314,30 @@ func (h *runHandler) onError(ctx context.Context, info *callbacks.RunInfo, err e
}
func (h *runHandler) onStartStreamIn(ctx context.Context, info *callbacks.RunInfo, input *schema.StreamReader[callbacks.CallbackInput]) context.Context {
ri := safeRunInfo(info)
if input != nil {
input.Close()
}
if h.params.Logger != nil {
h.params.Logger.Debug("eino_callback_stream_in",
zap.String("runId", h.runID),
zap.String("component", string(info.Component)),
zap.String("name", info.Name),
zap.String("component", string(ri.Component)),
zap.String("name", ri.Name),
)
}
return ctx
}
func (h *runHandler) onEndStreamOut(ctx context.Context, info *callbacks.RunInfo, output *schema.StreamReader[callbacks.CallbackOutput]) context.Context {
ri := safeRunInfo(info)
if output != nil {
output.Close()
}
if h.params.Logger != nil {
h.params.Logger.Debug("eino_callback_stream_out",
zap.String("runId", h.runID),
zap.String("component", string(info.Component)),
zap.String("name", info.Name),
zap.String("component", string(ri.Component)),
zap.String("name", ri.Name),
)
}
return ctx