mirror of
https://github.com/Vyntral/god-eye.git
synced 2026-05-30 10:59:35 +02:00
3a4c230aa7
Complete architectural overhaul. Replaces the v0.1 monolithic scanner with an event-driven pipeline of auto-registered modules.

Foundation (internal/):
- eventbus: typed pub/sub, 20 event types, race-safe, drop counter
- module: registry with phase-based selection
- store: thread-safe host store with per-host locks + deep-copy reads
- pipeline: coordinator with phase barriers + panic recovery
- config: 5 scan profiles + 3 AI tiers + YAML loader + auto-discovery

Modules (26 auto-registered across 6 phases):
- Discovery: passive (26 sources), bruteforce, recursive, AXFR, GitHub dorks, CT streaming, permutation, reverse DNS, vhost, ASN, supply chain (npm + PyPI)
- Enrichment: HTTP probe + tech fingerprint + TLS appliance ID, ports
- Analysis: security checks, takeover (110+ sigs), cloud, JavaScript, GraphQL, JWT, headers (OWASP), HTTP smuggling, AI cascade, Nuclei
- Reporting: TXT/JSON/CSV writer + AI scan brief

AI layer (internal/ai/ + internal/modules/ai/):
- Three profiles: lean (16 GB), balanced (32 GB MoE), heavy (64 GB)
- Six event-driven handlers: CVE, JS file, HTTP response, secret filter, multi-agent vuln enrichment, anomaly + executive report
- Content-hash cache dedups Ollama calls across hosts
- Auto-pull of missing models via /api/pull with streaming progress
- End-of-scan AI SCAN BRIEF in terminal with top chains + next actions

Nuclei compat layer (internal/nucleitpl/):
- Executes ~13k community templates (HTTP subset)
- Auto-download of nuclei-templates ZIP to ~/.god-eye/nuclei-templates
- Scope filter rejects off-host templates (eliminates OSINT FPs)

Operations:
- Interactive wizard (internal/wizard/) — zero-flag launch
- LivePrinter (internal/tui/) — colorized event stream
- Diff engine + scheduler (internal/diff, internal/scheduler) for continuous ASM monitoring with webhook alerts
- Proxy support (internal/proxyconf/): http / https / socks5 / socks5h + basic auth

Fixes #1 — native SOCKS5 / Tor compatibility via --proxy flag.

185 unit tests across 15 packages, all race-detector clean.
308 lines
9.3 KiB
Go
308 lines
9.3 KiB
Go
package eventbus
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
)
|
|
|
|
// waitUntil polls pred every 2ms until it reports true or timeout elapses,
// failing the test with msg if the deadline passes first. Used to avoid flaky
// sleeps in async tests without adding dependencies.
//
// pred is evaluated before the deadline is consulted on every iteration, so
// it always runs at least once (even for a zero or negative timeout) and once
// more after the final sleep. A condition that becomes true during the last
// poll interval therefore does not produce a spurious failure.
func waitUntil(t *testing.T, timeout time.Duration, pred func() bool, msg string) {
	t.Helper()

	const pollInterval = 2 * time.Millisecond

	deadline := time.Now().Add(timeout)
	for {
		if pred() {
			return
		}
		// Only give up after pred has had a chance to observe the state at
		// (or past) the deadline.
		if !time.Now().Before(deadline) {
			break
		}
		time.Sleep(pollInterval)
	}
	t.Fatalf("timeout waiting: %s", msg)
}
|
|
|
|
func TestPublishSubscribe_SingleType(t *testing.T) {
|
|
b := New(16)
|
|
defer b.Close(context.Background())
|
|
|
|
var got atomic.Int32
|
|
b.Subscribe(EventSubdomainDiscovered, func(_ context.Context, e Event) {
|
|
ev, ok := e.(SubdomainDiscovered)
|
|
if !ok {
|
|
t.Errorf("wrong event type: %T", e)
|
|
return
|
|
}
|
|
if ev.Subdomain == "" {
|
|
t.Error("empty subdomain")
|
|
}
|
|
got.Add(1)
|
|
})
|
|
|
|
for i := 0; i < 5; i++ {
|
|
b.Publish(context.Background(), NewSubdomainDiscovered("test", "api.example.com", "passive"))
|
|
}
|
|
waitUntil(t, time.Second, func() bool { return got.Load() == 5 }, "5 events delivered")
|
|
}
|
|
|
|
func TestSubscribeAll_ReceivesEveryType(t *testing.T) {
|
|
b := New(16)
|
|
defer b.Close(context.Background())
|
|
|
|
var got atomic.Int32
|
|
b.SubscribeAll(func(_ context.Context, _ Event) { got.Add(1) })
|
|
|
|
b.Publish(context.Background(), NewSubdomainDiscovered("t", "a.example.com", "p"))
|
|
b.Publish(context.Background(), DNSResolved{EventMeta: newMeta("dns", "a.example.com"), Subdomain: "a.example.com", IPs: []string{"1.2.3.4"}})
|
|
b.Publish(context.Background(), HTTPProbed{EventMeta: newMeta("http", "a.example.com"), URL: "https://a.example.com", StatusCode: 200})
|
|
|
|
waitUntil(t, time.Second, func() bool { return got.Load() == 3 }, "3 events on wildcard")
|
|
}
|
|
|
|
func TestSubscribe_FilteringByType(t *testing.T) {
|
|
b := New(16)
|
|
defer b.Close(context.Background())
|
|
|
|
var subs, dns atomic.Int32
|
|
b.Subscribe(EventSubdomainDiscovered, func(_ context.Context, _ Event) { subs.Add(1) })
|
|
b.Subscribe(EventDNSResolved, func(_ context.Context, _ Event) { dns.Add(1) })
|
|
|
|
for i := 0; i < 3; i++ {
|
|
b.Publish(context.Background(), NewSubdomainDiscovered("t", "a.example.com", "p"))
|
|
}
|
|
for i := 0; i < 2; i++ {
|
|
b.Publish(context.Background(), DNSResolved{EventMeta: newMeta("dns", "x"), Subdomain: "x"})
|
|
}
|
|
waitUntil(t, time.Second, func() bool { return subs.Load() == 3 && dns.Load() == 2 }, "typed counts match")
|
|
}
|
|
|
|
func TestUnsubscribe_StopsDelivery(t *testing.T) {
|
|
b := New(16)
|
|
defer b.Close(context.Background())
|
|
|
|
var count atomic.Int32
|
|
sub := b.Subscribe(EventSubdomainDiscovered, func(_ context.Context, _ Event) { count.Add(1) })
|
|
|
|
b.Publish(context.Background(), NewSubdomainDiscovered("t", "a.example.com", "p"))
|
|
waitUntil(t, time.Second, func() bool { return count.Load() == 1 }, "first event")
|
|
|
|
sub.Unsubscribe()
|
|
sub.Unsubscribe() // idempotent
|
|
|
|
// Publish after unsubscribe — should not be delivered to this handler.
|
|
for i := 0; i < 5; i++ {
|
|
b.Publish(context.Background(), NewSubdomainDiscovered("t", "b.example.com", "p"))
|
|
}
|
|
time.Sleep(30 * time.Millisecond)
|
|
if got := count.Load(); got != 1 {
|
|
t.Errorf("expected 1 delivery after unsubscribe, got %d", got)
|
|
}
|
|
}
|
|
|
|
func TestPublish_MultipleSubscribersEachGetEvent(t *testing.T) {
|
|
b := New(16)
|
|
defer b.Close(context.Background())
|
|
|
|
var a, c atomic.Int32
|
|
b.Subscribe(EventVulnerability, func(_ context.Context, _ Event) { a.Add(1) })
|
|
b.Subscribe(EventVulnerability, func(_ context.Context, _ Event) { c.Add(1) })
|
|
|
|
b.Publish(context.Background(), VulnerabilityFound{EventMeta: newMeta("sec", "x"), ID: "test", Severity: SeverityHigh})
|
|
|
|
waitUntil(t, time.Second, func() bool { return a.Load() == 1 && c.Load() == 1 }, "both subscribers received")
|
|
}
|
|
|
|
func TestPublish_NonBlocking_DropsWhenBufferFull(t *testing.T) {
|
|
b := New(2)
|
|
defer b.Close(context.Background())
|
|
|
|
blocker := make(chan struct{})
|
|
var started atomic.Int32
|
|
b.Subscribe(EventSubdomainDiscovered, func(ctx context.Context, _ Event) {
|
|
started.Add(1)
|
|
<-blocker
|
|
})
|
|
|
|
// First event enters handler (blocks). Next 2 fill the buffer of size 2.
|
|
// Subsequent publishes should be counted as dropped.
|
|
for i := 0; i < 100; i++ {
|
|
b.Publish(context.Background(), NewSubdomainDiscovered("t", "x.example.com", "p"))
|
|
}
|
|
|
|
// Give the bus a moment to register drops.
|
|
waitUntil(t, time.Second, func() bool {
|
|
return b.Stats().Dropped > 0
|
|
}, "some events dropped when buffer full")
|
|
|
|
// Unblock and close cleanly.
|
|
close(blocker)
|
|
}
|
|
|
|
func TestClose_DrainsAndStops(t *testing.T) {
|
|
b := New(16)
|
|
|
|
var got atomic.Int32
|
|
b.Subscribe(EventSubdomainDiscovered, func(_ context.Context, _ Event) { got.Add(1) })
|
|
|
|
for i := 0; i < 10; i++ {
|
|
b.Publish(context.Background(), NewSubdomainDiscovered("t", "a.example.com", "p"))
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
|
defer cancel()
|
|
if err := b.Close(ctx); err != nil {
|
|
t.Fatalf("Close error: %v", err)
|
|
}
|
|
if got.Load() != 10 {
|
|
t.Errorf("expected 10 delivered before close drains, got %d", got.Load())
|
|
}
|
|
|
|
// Publish after close is a silent no-op.
|
|
b.Publish(context.Background(), NewSubdomainDiscovered("t", "z.example.com", "p"))
|
|
if got.Load() != 10 {
|
|
t.Errorf("delivery continued after close: %d", got.Load())
|
|
}
|
|
}
|
|
|
|
func TestClose_IdempotentAndMulticall(t *testing.T) {
|
|
b := New(4)
|
|
ctx := context.Background()
|
|
if err := b.Close(ctx); err != nil {
|
|
t.Fatalf("first close: %v", err)
|
|
}
|
|
if err := b.Close(ctx); err != nil {
|
|
t.Fatalf("second close: %v", err)
|
|
}
|
|
}
|
|
|
|
func TestPanicInHandler_DoesNotAffectOthers(t *testing.T) {
|
|
b := New(8)
|
|
defer b.Close(context.Background())
|
|
|
|
var good atomic.Int32
|
|
b.Subscribe(EventSubdomainDiscovered, func(_ context.Context, _ Event) { panic("bad handler") })
|
|
b.Subscribe(EventSubdomainDiscovered, func(_ context.Context, _ Event) { good.Add(1) })
|
|
|
|
for i := 0; i < 5; i++ {
|
|
b.Publish(context.Background(), NewSubdomainDiscovered("t", "a.example.com", "p"))
|
|
}
|
|
waitUntil(t, time.Second, func() bool { return good.Load() == 5 }, "good handler received all events")
|
|
}
|
|
|
|
func TestConcurrentPublishers_PreservesInvariant(t *testing.T) {
|
|
// With a fast-enough consumer and large buffer, some events may still be
|
|
// dropped under heavy burst. The invariant that must ALWAYS hold is:
|
|
// Published == Delivered + Dropped
|
|
// This protects against race conditions in metric bookkeeping.
|
|
b := New(4096)
|
|
defer b.Close(context.Background())
|
|
|
|
b.Subscribe(EventSubdomainDiscovered, func(_ context.Context, _ Event) {})
|
|
|
|
const publishers = 20
|
|
const perPublisher = 100
|
|
var wg sync.WaitGroup
|
|
for i := 0; i < publishers; i++ {
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
for j := 0; j < perPublisher; j++ {
|
|
b.Publish(context.Background(), NewSubdomainDiscovered("t", "a.example.com", "p"))
|
|
}
|
|
}()
|
|
}
|
|
wg.Wait()
|
|
|
|
total := uint64(publishers * perPublisher)
|
|
waitUntil(t, 5*time.Second, func() bool {
|
|
s := b.Stats()
|
|
return s.Published == total && s.Delivered+s.Dropped == total
|
|
}, "published count matches and delivered+dropped == published")
|
|
}
|
|
|
|
func TestStats_Increment(t *testing.T) {
|
|
b := New(16)
|
|
defer b.Close(context.Background())
|
|
|
|
b.Subscribe(EventSubdomainDiscovered, func(_ context.Context, _ Event) {})
|
|
|
|
for i := 0; i < 3; i++ {
|
|
b.Publish(context.Background(), NewSubdomainDiscovered("t", "a.example.com", "p"))
|
|
}
|
|
waitUntil(t, time.Second, func() bool { return b.Stats().Delivered == 3 }, "3 deliveries recorded")
|
|
s := b.Stats()
|
|
if s.Published != 3 {
|
|
t.Errorf("Published = %d, want 3", s.Published)
|
|
}
|
|
if s.Subscribers != 1 {
|
|
t.Errorf("Subscribers = %d, want 1", s.Subscribers)
|
|
}
|
|
if s.Closed {
|
|
t.Error("Closed = true on open bus")
|
|
}
|
|
}
|
|
|
|
func TestPublish_NilEvent_NoOp(t *testing.T) {
|
|
b := New(8)
|
|
defer b.Close(context.Background())
|
|
var got atomic.Int32
|
|
b.SubscribeAll(func(_ context.Context, _ Event) { got.Add(1) })
|
|
b.Publish(context.Background(), nil)
|
|
time.Sleep(20 * time.Millisecond)
|
|
if got.Load() != 0 {
|
|
t.Errorf("nil event was delivered")
|
|
}
|
|
}
|
|
|
|
func TestPublish_CancelledContext_DropsNotDelivers(t *testing.T) {
|
|
b := New(1)
|
|
defer b.Close(context.Background())
|
|
|
|
hold := make(chan struct{})
|
|
b.Subscribe(EventSubdomainDiscovered, func(_ context.Context, _ Event) { <-hold })
|
|
|
|
// First publish occupies buffer slot 1 and handler goroutine starts consuming.
|
|
b.Publish(context.Background(), NewSubdomainDiscovered("t", "a", "p"))
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
cancel()
|
|
|
|
// With ctx already canceled and the subscriber busy, dispatch should record a drop.
|
|
before := b.Stats().Dropped
|
|
b.Publish(ctx, NewSubdomainDiscovered("t", "b", "p"))
|
|
b.Publish(ctx, NewSubdomainDiscovered("t", "c", "p"))
|
|
after := b.Stats().Dropped
|
|
if after <= before {
|
|
t.Errorf("expected Dropped to increase with canceled ctx, before=%d after=%d", before, after)
|
|
}
|
|
|
|
close(hold)
|
|
}
|
|
|
|
func TestHandlerReceivesEventMetadata(t *testing.T) {
|
|
b := New(8)
|
|
defer b.Close(context.Background())
|
|
|
|
done := make(chan Event, 1)
|
|
b.Subscribe(EventSubdomainDiscovered, func(_ context.Context, e Event) { done <- e })
|
|
|
|
before := time.Now().Add(-time.Second)
|
|
b.Publish(context.Background(), NewSubdomainDiscovered("sources.crtsh", "api.example.com", "passive:crt.sh"))
|
|
|
|
select {
|
|
case e := <-done:
|
|
m := e.Meta()
|
|
if m.Source != "sources.crtsh" {
|
|
t.Errorf("Source = %q", m.Source)
|
|
}
|
|
if m.Target != "api.example.com" {
|
|
t.Errorf("Target = %q", m.Target)
|
|
}
|
|
if m.At.Before(before) {
|
|
t.Errorf("At = %v is before %v", m.At, before)
|
|
}
|
|
case <-time.After(time.Second):
|
|
t.Fatal("no event received")
|
|
}
|
|
}
|