diff --git a/internal/multiagent/eino_single_runner.go b/internal/multiagent/eino_single_runner.go index 2bade093..ab0696e6 100644 --- a/internal/multiagent/eino_single_runner.go +++ b/internal/multiagent/eino_single_runner.go @@ -117,6 +117,7 @@ func RunEinoSingleChatModelAgent( }, } httpClient = openai.NewEinoHTTPClient(&appCfg.OpenAI, httpClient) + openai.AttachSummarizationDiagTransport(httpClient, logger) baseModelCfg := &einoopenai.ChatModelConfig{ APIKey: appCfg.OpenAI.APIKey, diff --git a/internal/multiagent/eino_summarize.go b/internal/multiagent/eino_summarize.go index d1ab90b2..762ffd34 100644 --- a/internal/multiagent/eino_summarize.go +++ b/internal/multiagent/eino_summarize.go @@ -9,15 +9,19 @@ import ( "cyberstrike-ai/internal/agent" "cyberstrike-ai/internal/config" + copenai "cyberstrike-ai/internal/openai" "github.com/bytedance/sonic" "github.com/cloudwego/eino/adk" "github.com/cloudwego/eino/adk/middlewares/summarization" "github.com/cloudwego/eino/components/model" "github.com/cloudwego/eino/schema" + einoopenai "github.com/cloudwego/eino-ext/components/model/openai" "go.uber.org/zap" ) +const defaultSummarizationRetryMax = 3 + // einoSummarizeUserInstruction:压缩历史时保留渗透测试关键信息。 const einoSummarizeUserInstruction = `在保持所有关键安全测试信息完整的前提下压缩对话历史。 @@ -89,8 +93,32 @@ func newEinoSummarizationMiddleware( } } + retryMax := defaultSummarizationRetryMax + if mwCfg != nil && mwCfg.SummarizationRetryMaxAttempts > 0 { + retryMax = mwCfg.SummarizationRetryMaxAttempts + } + + // ModelOptions apply only to summarization Generate (same ChatModel instance as the agent). + // Strip thinking/reasoning on this call path; mark requests for empty-choices diagnostics. + summaryModelOpts := []model.Option{ + einoopenai.WithExtraHeader(map[string]string{ + copenai.SummarizationRequestHeader: "1", + }), + einoopenai.WithRequestPayloadModifier(func(_ context.Context, in []*schema.Message, rawBody []byte) ([]byte, error) { + if logger != nil { + logger.Info("eino summarization generate request", + zap.Int("input_messages", len(in)), + zap.Int("payload_bytes", len(rawBody)), + zap.String("model", modelName), + ) + } + return stripReasoningFromSummarizationPayload(rawBody) + }), + } + mw, err := summarization.New(ctx, &summarization.Config{ - Model: summaryModel, + Model: summaryModel, + ModelOptions: summaryModelOpts, Trigger: &summarization.TriggerCondition{ ContextTokens: trigger, }, @@ -102,6 +130,18 @@ func newEinoSummarizationMiddleware( Enabled: true, MaxTokens: preserveMax, }, + Retry: &summarization.RetryConfig{ + MaxRetries: &retryMax, + ShouldRetry: func(_ context.Context, _ adk.Message, err error) bool { + if err != nil && logger != nil { + logger.Warn("eino summarization generate attempt failed, will retry if attempts remain", + zap.Error(err), + zap.Int("max_retries", retryMax), + ) + } + return err != nil + }, + }, Finalize: func(ctx context.Context, originalMessages []adk.Message, summary adk.Message) ([]adk.Message, error) { return summarizeFinalizeWithRecentAssistantToolTrail(ctx, originalMessages, summary, tokenCounter, recentTrailMax) }, diff --git a/internal/multiagent/eino_summarize_payload.go b/internal/multiagent/eino_summarize_payload.go new file mode 100644 index 00000000..03372dac --- /dev/null +++ b/internal/multiagent/eino_summarize_payload.go @@ -0,0 +1,35 @@ +package multiagent + +import ( + "github.com/bytedance/sonic" +) + +// stripReasoningFromSummarizationPayload removes thinking / reasoning fields from a +// chat-completions JSON body. Applied only to summarization Generate calls via +// model.ModelOptions on the shared ChatModel — main-agent requests are unchanged. +func stripReasoningFromSummarizationPayload(rawBody []byte) ([]byte, error) { + var payload map[string]any + if err := sonic.Unmarshal(rawBody, &payload); err != nil { + return rawBody, nil + } + changed := false + for _, key := range []string{ + "thinking", + "reasoning_effort", + "output_config", + "reasoning", + } { + if _, ok := payload[key]; ok { + delete(payload, key) + changed = true + } + } + if !changed { + return rawBody, nil + } + out, err := sonic.Marshal(payload) + if err != nil { + return rawBody, err + } + return out, nil +} diff --git a/internal/multiagent/eino_summarize_payload_test.go b/internal/multiagent/eino_summarize_payload_test.go new file mode 100644 index 00000000..a84ce33f --- /dev/null +++ b/internal/multiagent/eino_summarize_payload_test.go @@ -0,0 +1,30 @@ +package multiagent + +import ( + "strings" + "testing" +) + +func TestStripReasoningFromSummarizationPayload(t *testing.T) { + in := []byte(`{"model":"deepseek-chat","messages":[],"thinking":{"type":"enabled"},"reasoning_effort":"high"}`) + out, err := stripReasoningFromSummarizationPayload(in) + if err != nil { + t.Fatal(err) + } + s := string(out) + if strings.Contains(s, "thinking") || strings.Contains(s, "reasoning_effort") { + t.Fatalf("expected reasoning fields stripped, got %s", s) + } + if !strings.Contains(s, `"model":"deepseek-chat"`) { + t.Fatalf("expected model preserved, got %s", s) + } + + plain := []byte(`{"model":"gpt-4o","messages":[]}`) + out2, err := stripReasoningFromSummarizationPayload(plain) + if err != nil { + t.Fatal(err) + } + if string(out2) != string(plain) { + t.Fatalf("expected unchanged payload, got %s", out2) + } +} diff --git a/internal/multiagent/runner.go b/internal/multiagent/runner.go index d2f12db6..6d5c2237 100644 --- a/internal/multiagent/runner.go +++ b/internal/multiagent/runner.go @@ -161,6 +161,7 @@ func RunDeepAgent( // 若配置为 Claude provider,注入自动桥接 transport,对 Eino 透明走 Anthropic Messages API httpClient = openai.NewEinoHTTPClient(&appCfg.OpenAI, httpClient) + openai.AttachSummarizationDiagTransport(httpClient, logger) baseModelCfg := &einoopenai.ChatModelConfig{ APIKey: appCfg.OpenAI.APIKey, diff --git a/internal/openai/summarization_diag.go b/internal/openai/summarization_diag.go new file mode 100644 index 00000000..c3be41e5 --- /dev/null +++ b/internal/openai/summarization_diag.go @@ -0,0 +1,88 @@ +package openai + +import ( + "bytes" + "io" + "net/http" + "strings" + + "github.com/bytedance/sonic" + "go.uber.org/zap" +) + +// SummarizationRequestHeader marks chat/completion requests issued by Eino summarization +// middleware (via model.WithExtraHeader). The diagnostic transport logs empty-choices bodies +// only for these requests so main-agent traffic stays quiet. +const SummarizationRequestHeader = "X-CyberStrike-Summarization" + +const summarizationDiagBodyMaxBytes = 8192 + +// AttachSummarizationDiagTransport wraps client.Transport to log raw API bodies when +// summarization receives HTTP 200 with an empty choices array. +func AttachSummarizationDiagTransport(client *http.Client, logger *zap.Logger) { + if client == nil || logger == nil { + return + } + base := client.Transport + if base == nil { + base = http.DefaultTransport + } + client.Transport = &summarizationDiagRoundTripper{base: base, logger: logger} +} + +type summarizationDiagRoundTripper struct { + base http.RoundTripper + logger *zap.Logger +} + +func (rt *summarizationDiagRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + resp, err := rt.base.RoundTrip(req) + if err != nil || resp == nil || resp.Body == nil { + return resp, err + } + if !isSummarizationRequest(req) || !strings.Contains(strings.ToLower(resp.Header.Get("Content-Type")), "json") { + return resp, err + } + + body, readErr := io.ReadAll(resp.Body) + _ = resp.Body.Close() + if readErr != nil { + resp.Body = io.NopCloser(bytes.NewReader(nil)) + return resp, err + } + resp.Body = io.NopCloser(bytes.NewReader(body)) + resp.ContentLength = int64(len(body)) + + if rt.logger != nil && summarizationResponseEmptyChoices(body) { + rt.logger.Warn("eino summarization: API returned empty choices", + zap.Int("status", resp.StatusCode), + zap.Int("response_bytes", len(body)), + zap.String("raw_body", truncateForLog(string(body), summarizationDiagBodyMaxBytes)), + ) + } + return resp, err +} + +func isSummarizationRequest(req *http.Request) bool { + if req == nil { + return false + } + return strings.TrimSpace(req.Header.Get(SummarizationRequestHeader)) == "1" +} + +func summarizationResponseEmptyChoices(body []byte) bool { + var parsed struct { + Choices []any `json:"choices"` + } + if err := sonic.Unmarshal(body, &parsed); err != nil { + return false + } + return len(parsed.Choices) == 0 +} + +func truncateForLog(s string, maxBytes int) string { + if maxBytes <= 0 || len(s) <= maxBytes { + return s + } + return s[:maxBytes] + "…(truncated)" +} diff --git a/internal/openai/summarization_diag_test.go b/internal/openai/summarization_diag_test.go new file mode 100644 index 00000000..753a61ae --- /dev/null +++ b/internal/openai/summarization_diag_test.go @@ -0,0 +1,47 @@ +package openai + +import ( + "io" + "net/http" + "strings" + "testing" + + "go.uber.org/zap" +) + +type staticRoundTripper struct { + status int + body string +} + +func (s *staticRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + return &http.Response{ + StatusCode: s.status, + Header: http.Header{"Content-Type": []string{"application/json"}}, + Body: io.NopCloser(strings.NewReader(s.body)), + }, nil +} + +func TestSummarizationResponseEmptyChoices(t *testing.T) { + if !summarizationResponseEmptyChoices([]byte(`{"choices":[]}`)) { + t.Fatal("expected empty choices") + } + if summarizationResponseEmptyChoices([]byte(`{"choices":[{"index":0}]}`)) { + t.Fatal("expected non-empty choices") + } +} + +func TestSummarizationDiagRoundTripper_SkipsWithoutHeader(t *testing.T) { + client := &http.Client{ + Transport: &summarizationDiagRoundTripper{ + base: &staticRoundTripper{status: 200, body: `{"choices":[]}`}, + logger: zap.NewNop(), + }, + } + req, _ := http.NewRequest(http.MethodPost, "https://example.com/v1/chat/completions", nil) + resp, err := client.Do(req) + if err != nil { + t.Fatal(err) + } + _ = resp.Body.Close() +}