Files
Vyntral 3a4c230aa7 feat: v2.0 full rewrite — event-driven pipeline, AI + Nuclei + proxy
Complete architectural overhaul. Replaces the v0.1 monolithic scanner
with an event-driven pipeline of auto-registered modules.

Foundation (internal/):
- eventbus: typed pub/sub, 20 event types, race-safe, drop counter
- module: registry with phase-based selection
- store: thread-safe host store with per-host locks + deep-copy reads
- pipeline: coordinator with phase barriers + panic recovery
- config: 5 scan profiles + 3 AI tiers + YAML loader + auto-discovery

Modules (26 auto-registered across 6 phases):
- Discovery: passive (26 sources), bruteforce, recursive, AXFR, GitHub
  dorks, CT streaming, permutation, reverse DNS, vhost, ASN, supply
  chain (npm + PyPI)
- Enrichment: HTTP probe + tech fingerprint + TLS appliance ID, ports
- Analysis: security checks, takeover (110+ sigs), cloud, JavaScript,
  GraphQL, JWT, headers (OWASP), HTTP smuggling, AI cascade, Nuclei
- Reporting: TXT/JSON/CSV writer + AI scan brief

AI layer (internal/ai/ + internal/modules/ai/):
- Three profiles: lean (16 GB), balanced (32 GB MoE), heavy (64 GB)
- Six event-driven handlers: CVE, JS file, HTTP response, secret
  filter, multi-agent vuln enrichment, anomaly + executive report
- Content-hash cache dedups Ollama calls across hosts
- Auto-pull of missing models via /api/pull with streaming progress
- End-of-scan AI SCAN BRIEF in terminal with top chains + next actions

Nuclei compat layer (internal/nucleitpl/):
- Executes ~13k community templates (HTTP subset)
- Auto-download of nuclei-templates ZIP to ~/.god-eye/nuclei-templates
- Scope filter rejects off-host templates (eliminates OSINT FPs)

Operations:
- Interactive wizard (internal/wizard/) — zero-flag launch
- LivePrinter (internal/tui/) — colorized event stream
- Diff engine + scheduler (internal/diff, internal/scheduler) for
  continuous ASM monitoring with webhook alerts
- Proxy support (internal/proxyconf/): http / https / socks5 / socks5h
  + basic auth

Fixes #1 — native SOCKS5 / Tor compatibility via --proxy flag.

185 unit tests across 15 packages, all race-detector clean.
2026-04-18 16:48:41 +02:00

308 lines
9.3 KiB
Go

package eventbus
import (
"context"
"sync"
"sync/atomic"
"testing"
"time"
)
// waitUntil polls pred roughly every 2ms until it returns true or timeout has
// elapsed, failing the test with msg in the latter case. Used to avoid flaky
// sleeps in async tests without adding dependencies.
//
// pred is always evaluated at least once (even for a zero or negative
// timeout) and once more after the final sleep, so a condition that becomes
// true during the last polling interval is not reported as a timeout.
func waitUntil(t *testing.T, timeout time.Duration, pred func() bool, msg string) {
	t.Helper()
	deadline := time.Now().Add(timeout)
	for {
		// Check the predicate before looking at the clock so the last word
		// always belongs to the condition, not to the deadline.
		if pred() {
			return
		}
		if !time.Now().Before(deadline) {
			break
		}
		time.Sleep(2 * time.Millisecond)
	}
	t.Fatalf("timeout waiting: %s", msg)
}
// TestPublishSubscribe_SingleType publishes five SubdomainDiscovered events
// and expects the typed subscriber to observe every one of them, each with a
// non-empty Subdomain.
func TestPublishSubscribe_SingleType(t *testing.T) {
	const want = 5
	ctx := context.Background()

	bus := New(16)
	defer bus.Close(ctx)

	var delivered atomic.Int32
	handler := func(_ context.Context, e Event) {
		sd, isSubdomain := e.(SubdomainDiscovered)
		switch {
		case !isSubdomain:
			// A foreign event type is reported and not counted.
			t.Errorf("wrong event type: %T", e)
			return
		case sd.Subdomain == "":
			// Reported, but the delivery itself still counts.
			t.Error("empty subdomain")
		}
		delivered.Add(1)
	}
	bus.Subscribe(EventSubdomainDiscovered, handler)

	for n := 0; n < want; n++ {
		bus.Publish(ctx, NewSubdomainDiscovered("test", "api.example.com", "passive"))
	}

	allDelivered := func() bool { return delivered.Load() == want }
	waitUntil(t, time.Second, allDelivered, "5 events delivered")
}
// TestSubscribeAll_ReceivesEveryType registers a wildcard subscriber and
// publishes one event of three different types, expecting three deliveries.
func TestSubscribeAll_ReceivesEveryType(t *testing.T) {
	ctx := context.Background()

	bus := New(16)
	defer bus.Close(ctx)

	var seen atomic.Int32
	bus.SubscribeAll(func(_ context.Context, _ Event) {
		seen.Add(1)
	})

	bus.Publish(ctx, NewSubdomainDiscovered("t", "a.example.com", "p"))
	bus.Publish(ctx, DNSResolved{
		EventMeta: newMeta("dns", "a.example.com"),
		Subdomain: "a.example.com",
		IPs:       []string{"1.2.3.4"},
	})
	bus.Publish(ctx, HTTPProbed{
		EventMeta:  newMeta("http", "a.example.com"),
		URL:        "https://a.example.com",
		StatusCode: 200,
	})

	sawAll := func() bool { return seen.Load() == 3 }
	waitUntil(t, time.Second, sawAll, "3 events on wildcard")
}
// TestSubscribe_FilteringByType subscribes one handler per event type and
// checks that each handler counts only the events published for its type.
func TestSubscribe_FilteringByType(t *testing.T) {
	const (
		wantSubdomains = 3
		wantDNS        = 2
	)
	ctx := context.Background()

	bus := New(16)
	defer bus.Close(ctx)

	var (
		subdomainHits atomic.Int32
		dnsHits       atomic.Int32
	)
	bus.Subscribe(EventSubdomainDiscovered, func(_ context.Context, _ Event) {
		subdomainHits.Add(1)
	})
	bus.Subscribe(EventDNSResolved, func(_ context.Context, _ Event) {
		dnsHits.Add(1)
	})

	for n := 0; n < wantSubdomains; n++ {
		bus.Publish(ctx, NewSubdomainDiscovered("t", "a.example.com", "p"))
	}
	for n := 0; n < wantDNS; n++ {
		bus.Publish(ctx, DNSResolved{EventMeta: newMeta("dns", "x"), Subdomain: "x"})
	}

	countsMatch := func() bool {
		return subdomainHits.Load() == wantSubdomains && dnsHits.Load() == wantDNS
	}
	waitUntil(t, time.Second, countsMatch, "typed counts match")
}
// TestUnsubscribe_StopsDelivery confirms that a handler stops receiving
// events once its subscription is cancelled, and that calling Unsubscribe a
// second time is harmless.
func TestUnsubscribe_StopsDelivery(t *testing.T) {
	ctx := context.Background()

	bus := New(16)
	defer bus.Close(ctx)

	var hits atomic.Int32
	subscription := bus.Subscribe(EventSubdomainDiscovered, func(_ context.Context, _ Event) {
		hits.Add(1)
	})

	// One delivery while the subscription is still live.
	bus.Publish(ctx, NewSubdomainDiscovered("t", "a.example.com", "p"))
	waitUntil(t, time.Second, func() bool { return hits.Load() == 1 }, "first event")

	subscription.Unsubscribe()
	subscription.Unsubscribe() // idempotent

	// None of these should reach the cancelled handler.
	for n := 0; n < 5; n++ {
		bus.Publish(ctx, NewSubdomainDiscovered("t", "b.example.com", "p"))
	}

	// Grace period so a stray delivery has time to show up in the counter.
	time.Sleep(30 * time.Millisecond)

	final := hits.Load()
	if final != 1 {
		t.Errorf("expected 1 delivery after unsubscribe, got %d", final)
	}
}
// TestPublish_MultipleSubscribersEachGetEvent registers two handlers for the
// same event type and expects a single publish to reach both of them.
func TestPublish_MultipleSubscribersEachGetEvent(t *testing.T) {
	ctx := context.Background()

	bus := New(16)
	defer bus.Close(ctx)

	var first, second atomic.Int32
	bus.Subscribe(EventVulnerability, func(_ context.Context, _ Event) {
		first.Add(1)
	})
	bus.Subscribe(EventVulnerability, func(_ context.Context, _ Event) {
		second.Add(1)
	})

	bus.Publish(ctx, VulnerabilityFound{
		EventMeta: newMeta("sec", "x"),
		ID:        "test",
		Severity:  SeverityHigh,
	})

	bothHit := func() bool {
		return first.Load() == 1 && second.Load() == 1
	}
	waitUntil(t, time.Second, bothHit, "both subscribers received")
}
// TestPublish_NonBlocking_DropsWhenBufferFull floods a bus created with
// New(2) whose only subscriber is parked on a channel, and expects
// Stats().Dropped to become positive.
func TestPublish_NonBlocking_DropsWhenBufferFull(t *testing.T) {
	b := New(2)
	defer b.Close(context.Background())

	blocker := make(chan struct{})
	// Registered after the Close defer so it runs first (defers are LIFO).
	// This releases the handler on every exit path, including the t.Fatalf
	// inside waitUntil; as a trailing statement it would be skipped on
	// failure, leaving the handler parked while Close runs with a context
	// that never expires (presumably Close waits for in-flight handlers —
	// see TestClose_DrainsAndStops).
	defer close(blocker)

	b.Subscribe(EventSubdomainDiscovered, func(_ context.Context, _ Event) {
		<-blocker
	})

	// First event enters the handler (blocks). The next 2 fill the buffer of
	// size 2. Subsequent publishes should be counted as dropped.
	for i := 0; i < 100; i++ {
		b.Publish(context.Background(), NewSubdomainDiscovered("t", "x.example.com", "p"))
	}

	// Give the bus a moment to register drops.
	waitUntil(t, time.Second, func() bool {
		return b.Stats().Dropped > 0
	}, "some events dropped when buffer full")
}
// TestClose_DrainsAndStops publishes ten events, closes the bus with a
// two-second deadline, and expects all ten to have been delivered by the time
// Close returns. It then checks that a publish on the closed bus is not
// delivered.
func TestClose_DrainsAndStops(t *testing.T) {
	b := New(16)

	var got atomic.Int32
	b.Subscribe(EventSubdomainDiscovered, func(_ context.Context, _ Event) { got.Add(1) })

	for i := 0; i < 10; i++ {
		b.Publish(context.Background(), NewSubdomainDiscovered("t", "a.example.com", "p"))
	}

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	if err := b.Close(ctx); err != nil {
		t.Fatalf("Close error: %v", err)
	}
	if n := got.Load(); n != 10 {
		t.Errorf("expected 10 delivered before close drains, got %d", n)
	}

	// Publish after close is expected to be a silent no-op.
	b.Publish(context.Background(), NewSubdomainDiscovered("t", "z.example.com", "p"))

	// The other negative assertions in this file wait before checking because
	// delivery happens asynchronously; do the same here so a stray post-close
	// delivery has time to land instead of slipping past an immediate check.
	time.Sleep(20 * time.Millisecond)
	if n := got.Load(); n != 10 {
		t.Errorf("delivery continued after close: %d", n)
	}
}
// TestClose_IdempotentAndMulticall closes the same bus twice and requires
// both calls to return a nil error.
func TestClose_IdempotentAndMulticall(t *testing.T) {
	bus := New(4)
	bg := context.Background()

	err := bus.Close(bg)
	if err != nil {
		t.Fatalf("first close: %v", err)
	}

	err = bus.Close(bg)
	if err != nil {
		t.Fatalf("second close: %v", err)
	}
}
// TestPanicInHandler_DoesNotAffectOthers pairs a handler that always panics
// with a well-behaved one on the same event type and expects the latter to
// still receive all five published events.
func TestPanicInHandler_DoesNotAffectOthers(t *testing.T) {
	const published = 5
	ctx := context.Background()

	bus := New(8)
	defer bus.Close(ctx)

	var healthy atomic.Int32
	bus.Subscribe(EventSubdomainDiscovered, func(_ context.Context, _ Event) {
		panic("bad handler")
	})
	bus.Subscribe(EventSubdomainDiscovered, func(_ context.Context, _ Event) {
		healthy.Add(1)
	})

	for n := 0; n < published; n++ {
		bus.Publish(ctx, NewSubdomainDiscovered("t", "a.example.com", "p"))
	}

	gotAll := func() bool { return healthy.Load() == published }
	waitUntil(t, time.Second, gotAll, "good handler received all events")
}
// TestConcurrentPublishers_PreservesInvariant hammers the bus from 20
// goroutines publishing 100 events each.
//
// Even with a fast consumer and a large buffer, some events may be dropped
// under a heavy burst, so the test does not require full delivery. What it
// does require is the bookkeeping invariant
//
//	Published == Delivered + Dropped
//
// which guards against races in the metric counters.
func TestConcurrentPublishers_PreservesInvariant(t *testing.T) {
	const (
		publishers   = 20
		perPublisher = 100
	)

	bus := New(4096)
	defer bus.Close(context.Background())
	bus.Subscribe(EventSubdomainDiscovered, func(_ context.Context, _ Event) {})

	// publishBurst is the body of one publisher goroutine.
	var wg sync.WaitGroup
	publishBurst := func() {
		defer wg.Done()
		for sent := 0; sent < perPublisher; sent++ {
			bus.Publish(context.Background(), NewSubdomainDiscovered("t", "a.example.com", "p"))
		}
	}

	wg.Add(publishers)
	for p := 0; p < publishers; p++ {
		go publishBurst()
	}
	wg.Wait()

	want := uint64(publishers * perPublisher)
	invariantHolds := func() bool {
		snapshot := bus.Stats()
		if snapshot.Published != want {
			return false
		}
		return snapshot.Delivered+snapshot.Dropped == want
	}
	waitUntil(t, 5*time.Second, invariantHolds, "published count matches and delivered+dropped == published")
}
// TestStats_Increment publishes three events to a single subscriber and then
// checks the Published, Subscribers and Closed fields of the bus statistics.
func TestStats_Increment(t *testing.T) {
	ctx := context.Background()

	bus := New(16)
	defer bus.Close(ctx)

	bus.Subscribe(EventSubdomainDiscovered, func(_ context.Context, _ Event) {})

	for n := 0; n < 3; n++ {
		bus.Publish(ctx, NewSubdomainDiscovered("t", "a.example.com", "p"))
	}

	// Wait for the delivery counter before taking the snapshot under test.
	threeDelivered := func() bool { return bus.Stats().Delivered == 3 }
	waitUntil(t, time.Second, threeDelivered, "3 deliveries recorded")

	stats := bus.Stats()
	if published := stats.Published; published != 3 {
		t.Errorf("Published = %d, want 3", published)
	}
	if subscribers := stats.Subscribers; subscribers != 1 {
		t.Errorf("Subscribers = %d, want 1", subscribers)
	}
	if stats.Closed {
		t.Error("Closed = true on open bus")
	}
}
// TestPublish_NilEvent_NoOp publishes a nil event and expects the wildcard
// subscriber never to be invoked.
func TestPublish_NilEvent_NoOp(t *testing.T) {
	ctx := context.Background()

	bus := New(8)
	defer bus.Close(ctx)

	var calls atomic.Int32
	bus.SubscribeAll(func(_ context.Context, _ Event) {
		calls.Add(1)
	})

	bus.Publish(ctx, nil)

	// Grace period so an erroneous delivery has time to reach the counter.
	time.Sleep(20 * time.Millisecond)

	if calls.Load() == 0 {
		return
	}
	t.Errorf("nil event was delivered")
}
// TestPublish_CancelledContext_DropsNotDelivers keeps the only subscriber
// busy, then publishes twice with an already-cancelled context and expects
// the Dropped counter to grow.
func TestPublish_CancelledContext_DropsNotDelivers(t *testing.T) {
	bus := New(1)
	defer bus.Close(context.Background())

	release := make(chan struct{})
	bus.Subscribe(EventSubdomainDiscovered, func(_ context.Context, _ Event) {
		<-release
	})

	// Occupy the subscriber: this event takes the single buffer slot and the
	// handler goroutine starts consuming it.
	bus.Publish(context.Background(), NewSubdomainDiscovered("t", "a", "p"))

	deadCtx, cancel := context.WithCancel(context.Background())
	cancel()

	// With the context already cancelled and the subscriber busy, dispatch
	// should record a drop.
	droppedBefore := bus.Stats().Dropped
	bus.Publish(deadCtx, NewSubdomainDiscovered("t", "b", "p"))
	bus.Publish(deadCtx, NewSubdomainDiscovered("t", "c", "p"))
	droppedAfter := bus.Stats().Dropped

	if !(droppedAfter > droppedBefore) {
		t.Errorf("expected Dropped to increase with canceled ctx, before=%d after=%d", droppedBefore, droppedAfter)
	}

	close(release)
}
// TestHandlerReceivesEventMetadata publishes one event and inspects the
// metadata seen by the handler: Source and Target must match the values
// passed to the constructor, and At must not be older than one second before
// the publish.
func TestHandlerReceivesEventMetadata(t *testing.T) {
	bus := New(8)
	defer bus.Close(context.Background())

	received := make(chan Event, 1)
	bus.Subscribe(EventSubdomainDiscovered, func(_ context.Context, e Event) {
		received <- e
	})

	// Lower bound for the event timestamp, with one second of slack.
	earliest := time.Now().Add(-time.Second)
	bus.Publish(context.Background(), NewSubdomainDiscovered("sources.crtsh", "api.example.com", "passive:crt.sh"))

	var ev Event
	select {
	case ev = <-received:
	case <-time.After(time.Second):
		t.Fatal("no event received")
	}

	meta := ev.Meta()
	if meta.Source != "sources.crtsh" {
		t.Errorf("Source = %q", meta.Source)
	}
	if meta.Target != "api.example.com" {
		t.Errorf("Target = %q", meta.Target)
	}
	if meta.At.Before(earliest) {
		t.Errorf("At = %v is before %v", meta.At, earliest)
	}
}