Files
Vyntral 3a4c230aa7 feat: v2.0 full rewrite — event-driven pipeline, AI + Nuclei + proxy
Complete architectural overhaul. Replaces the v0.1 monolithic scanner
with an event-driven pipeline of auto-registered modules.

Foundation (internal/):
- eventbus: typed pub/sub, 20 event types, race-safe, drop counter
- module: registry with phase-based selection
- store: thread-safe host store with per-host locks + deep-copy reads
- pipeline: coordinator with phase barriers + panic recovery
- config: 5 scan profiles + 3 AI tiers + YAML loader + auto-discovery

Modules (26 auto-registered across 6 phases):
- Discovery: passive (26 sources), bruteforce, recursive, AXFR, GitHub
  dorks, CT streaming, permutation, reverse DNS, vhost, ASN, supply
  chain (npm + PyPI)
- Enrichment: HTTP probe + tech fingerprint + TLS appliance ID, ports
- Analysis: security checks, takeover (110+ sigs), cloud, JavaScript,
  GraphQL, JWT, headers (OWASP), HTTP smuggling, AI cascade, Nuclei
- Reporting: TXT/JSON/CSV writer + AI scan brief

AI layer (internal/ai/ + internal/modules/ai/):
- Three profiles: lean (16 GB), balanced (32 GB MoE), heavy (64 GB)
- Six event-driven handlers: CVE, JS file, HTTP response, secret
  filter, multi-agent vuln enrichment, anomaly + executive report
- Content-hash cache dedups Ollama calls across hosts
- Auto-pull of missing models via /api/pull with streaming progress
- End-of-scan AI SCAN BRIEF in terminal with top chains + next actions

Nuclei compat layer (internal/nucleitpl/):
- Executes ~13k community templates (HTTP subset)
- Auto-download of nuclei-templates ZIP to ~/.god-eye/nuclei-templates
- Scope filter rejects off-host templates (eliminates OSINT FPs)

Operations:
- Interactive wizard (internal/wizard/) — zero-flag launch
- LivePrinter (internal/tui/) — colorized event stream
- Diff engine + scheduler (internal/diff, internal/scheduler) for
  continuous ASM monitoring with webhook alerts
- Proxy support (internal/proxyconf/): http / https / socks5 / socks5h
  + basic auth

Fixes #1 — native SOCKS5 / Tor compatibility via --proxy flag.

185 unit tests across 15 packages, all race-detector clean.
2026-04-18 16:48:41 +02:00

284 lines
6.7 KiB
Go

package eventbus
import (
"context"
"errors"
"sync"
"sync/atomic"
)
// ErrBusClosed is the sentinel error for operations attempted on a closed bus.
// NOTE(review): nothing in this file returns it — subscribe, Publish and Close
// all return silently (or nil) on a closed bus — so confirm it is used
// elsewhere in the package before relying on it.
var ErrBusClosed = errors.New("eventbus: bus closed")
// Handler processes a single event. Each subscription invokes its handler
// sequentially on a dedicated goroutine, so a handler may block or perform I/O
// without stalling publishers; a slow handler only fills its own buffer. A
// panic raised by a handler is recovered and discarded by the bus.
//
// ctx is the subscription's own context, derived from context.Background();
// it is not the context passed to Publish. It is cancelled when the
// subscription's goroutine exits. A handler must respect ctx cancellation
// when performing long work.
type Handler func(ctx context.Context, e Event)
// Subscription is the handle returned by Subscribe/SubscribeAll and is used to
// stop receiving events. Unsubscribe is idempotent.
type Subscription struct {
// bus is the owning Bus; Unsubscribe is a no-op when it is nil.
bus *Bus
eventType EventType // empty string means "all"
// id is the bus-assigned subscriber id. Inert subscriptions (nil handler or
// closed bus) keep the zero value, which is never assigned to a live
// subscriber because ids start at 1.
id uint64
// once guarantees the bus is asked to remove the subscriber at most once.
once sync.Once
}
// Unsubscribe detaches the subscription from its bus so that no further
// events are queued for it. Events that were already sitting in the
// subscriber's buffer are still handed to the handler before its goroutine
// exits; they are not discarded.
//
// Calling Unsubscribe on a nil Subscription, on one without a bus, or more
// than once is harmless: the removal is performed at most one time.
func (s *Subscription) Unsubscribe() {
	if s != nil && s.bus != nil {
		s.once.Do(func() { s.bus.unsubscribe(s.eventType, s.id) })
	}
}
// Stats captures runtime metrics for observability. The counters are
// cumulative from bus creation; callers should compute deltas if rate matters.
type Stats struct {
Published uint64 // Publish calls accepted (non-nil event, bus open), even when nobody is subscribed
Delivered uint64 // events placed into a subscriber buffer (sum across subscribers); not handler completions
Dropped uint64 // per-subscriber drops: buffer was full, or the publisher's ctx was already done
Subscribers int // active subscribers right now
Closed bool // true once Close has been called
}
// Bus is the default eventbus implementation: a registry of subscribers, each
// with its own buffered queue and delivery goroutine, plus cumulative
// counters. It embeds a mutex and a WaitGroup, so it must be used through a
// pointer and never copied; create one with New.
type Bus struct {
	// Cumulative counters, updated only through sync/atomic. They are kept at
	// the very start of the struct on purpose: 64-bit atomic operations on
	// 32-bit platforms (386, ARM, 32-bit MIPS) require 64-bit alignment, and
	// Go guarantees that only for the first word of an allocated struct.
	// Do not move other fields above them.
	published uint64
	delivered uint64
	dropped   uint64

	// bufferSize is the channel capacity given to every new subscriber.
	bufferSize int

	// mu guards closed, nextID, subs and allSubs.
	mu     sync.RWMutex
	closed bool
	nextID uint64

	subs    map[EventType]map[uint64]*subscriber // type → id → subscriber
	allSubs map[uint64]*subscriber               // wildcard subscribers

	// wg tracks the per-subscriber goroutines so Close can wait for them.
	wg sync.WaitGroup
}
// subscriber is the bus-internal state for one subscription: its buffered
// queue, its handler, and the context handed to that handler.
type subscriber struct {
id uint64 // bus-assigned id; key in Bus.subs / Bus.allSubs
eventT EventType // type subscribed to; "" for SubscribeAll subscribers
ch chan Event // buffered queue; closed by unsubscribe or Close to stop run
handler Handler // invoked once per queued event by run
ctx context.Context // passed to every handler call
cancel context.CancelFunc // cancels ctx; deferred by run so it fires when run returns
}
// New returns an open Bus with no subscribers. bufferSize is the capacity of
// each subscriber's event queue; zero or negative values fall back to 256.
// A capacity of 1 is accepted, but bursts of events are then far more likely
// to be dropped.
func New(bufferSize int) *Bus {
	size := bufferSize
	if size < 1 {
		size = 256
	}

	b := new(Bus)
	b.bufferSize = size
	b.subs = map[EventType]map[uint64]*subscriber{}
	b.allSubs = map[uint64]*subscriber{}
	return b
}
// Subscribe registers h for events whose Type() equals t and returns a
// Subscription that can be used to unsubscribe. The handler gets its own
// goroutine and a buffered queue of the bus's configured size.
//
// If h is nil or the bus is already closed, nothing is registered and the
// returned Subscription is inert (calling Unsubscribe on it does nothing).
//
// NOTE(review): Subscription uses an empty event type as its wildcard marker,
// so t should be non-empty; use SubscribeAll for wildcard delivery.
func (b *Bus) Subscribe(t EventType, h Handler) *Subscription {
return b.subscribe(t, h, false)
}
// SubscribeAll registers a handler that receives every event type.
// Useful for logging, metrics collection, or persistence modules.
//
// Like Subscribe, it returns an inert Subscription when h is nil or the bus
// is already closed.
func (b *Bus) SubscribeAll(h Handler) *Subscription {
return b.subscribe("", h, true)
}
// subscribe is the shared implementation behind Subscribe and SubscribeAll.
// It allocates a subscriber with its own buffered queue and context, records
// it in the wildcard set (all == true) or under event type t, and starts the
// goroutine that feeds events to h.
//
// A nil handler or a closed bus registers nothing and yields an inert
// Subscription whose Unsubscribe is a no-op.
func (b *Bus) subscribe(t EventType, h Handler, all bool) *Subscription {
	if h == nil {
		return &Subscription{bus: b}
	}

	b.mu.Lock()
	if b.closed {
		b.mu.Unlock()
		return &Subscription{bus: b}
	}

	b.nextID++
	id := b.nextID
	ctx, cancel := context.WithCancel(context.Background())
	s := &subscriber{
		id:      id,
		eventT:  t,
		ch:      make(chan Event, b.bufferSize),
		handler: h,
		ctx:     ctx,
		cancel:  cancel,
	}
	if all {
		b.allSubs[id] = s
	} else {
		if b.subs[t] == nil {
			b.subs[t] = make(map[uint64]*subscriber)
		}
		b.subs[t][id] = s
	}

	// Count the goroutine while the lock is still held. Close flips b.closed
	// under this same lock before it calls wg.Wait, so doing the Add here
	// guarantees Wait can never begin (or return) in the window between the
	// subscriber becoming visible and its goroutine being accounted for.
	b.wg.Add(1)
	b.mu.Unlock()

	go b.run(s)
	return &Subscription{bus: b, eventType: t, id: id}
}
// unsubscribe removes subscriber id from the bus and closes its queue. An
// unknown id (already removed, bus closed, or an inert Subscription) is a
// no-op.
//
// t == "" normally identifies a wildcard subscriber, but Subscribe also
// accepts an empty event type and files that subscriber in the typed map.
// The wildcard set is therefore tried first and the typed map second, so such
// a subscriber can still be removed instead of leaking its goroutine until
// Close. Ids are unique across both maps, so the fallback cannot remove the
// wrong subscriber.
func (b *Bus) unsubscribe(t EventType, id uint64) {
	var s *subscriber

	b.mu.Lock()
	if t == "" {
		if w, ok := b.allSubs[id]; ok {
			s = w
			delete(b.allSubs, id)
		}
	}
	if s == nil {
		if byID, ok := b.subs[t]; ok {
			if typed, ok := byID[id]; ok {
				s = typed
				delete(byID, id)
			}
			// Drop the per-type map once it is empty so the registry
			// does not accumulate dead keys.
			if len(byID) == 0 {
				delete(b.subs, t)
			}
		}
	}
	b.mu.Unlock()

	if s != nil {
		// Closing the queue ends run's loop; run() drains the events that
		// were already buffered, then returns.
		close(s.ch)
	}
}
// run is the delivery loop executed on each subscriber's own goroutine. It
// feeds queued events to the handler one at a time until the queue is closed
// and empty, then cancels the subscriber's context and signals the bus
// WaitGroup.
func (b *Bus) run(s *subscriber) {
	defer b.wg.Done()
	defer s.cancel()

	// invoke shields the bus from handler panics: one bad handler must not
	// take down the pipeline. The recovered value is intentionally discarded.
	invoke := func(e Event) {
		defer func() { _ = recover() }()
		s.handler(s.ctx, e)
	}

	for {
		e, open := <-s.ch
		if !open {
			return
		}
		invoke(e)
	}
}
// Publish offers e to every subscriber registered for e.Type() and to every
// SubscribeAll subscriber. A nil event or a closed bus makes it a no-op.
//
// Publish never blocks on a subscriber: if a subscriber's buffer is full the
// event is dropped for that subscriber and Stats.Dropped is incremented. If
// ctx is already cancelled the event may be counted as dropped instead of
// being queued.
//
// The sends are performed while the read lock is held. Removing a subscriber
// (Unsubscribe, Close) requires the write lock before its queue is closed, so
// a queue can never be closed while Publish is sending to it; releasing the
// lock before sending would allow a "send on closed channel" panic. Because
// every send is non-blocking, the lock is held only briefly.
func (b *Bus) Publish(ctx context.Context, e Event) {
	if e == nil {
		return
	}
	// Resolve the type up front so caller-provided code never runs while the
	// bus lock is held.
	t := e.Type()

	b.mu.RLock()
	defer b.mu.RUnlock()
	if b.closed {
		return
	}

	atomic.AddUint64(&b.published, 1)
	for _, s := range b.subs[t] {
		b.dispatch(ctx, s, e)
	}
	for _, s := range b.allSubs {
		b.dispatch(ctx, s, e)
	}
}
// dispatch makes one non-blocking attempt to queue e for subscriber s and
// records the outcome in the bus counters:
//
//   - ctx already done: the caller abandoned the publish, so the event is
//     counted as dropped and not queued;
//   - room in s's buffer: the event is queued and counted as delivered;
//   - buffer full: the event is counted as dropped.
//
// ctx is checked before the send rather than as a select case. A select with
// both a ready ctx.Done() and a ready send chooses between them at random,
// which would make delivery under a cancelled context nondeterministic.
func (b *Bus) dispatch(ctx context.Context, s *subscriber, e Event) {
	if ctx.Err() != nil {
		// caller abandoned; count as dropped so observability reflects reality
		atomic.AddUint64(&b.dropped, 1)
		return
	}
	select {
	case s.ch <- e:
		atomic.AddUint64(&b.delivered, 1)
	default:
		atomic.AddUint64(&b.dropped, 1)
	}
}
// Close shuts the bus down: it stops accepting new publishes and
// subscriptions, closes every subscriber queue so the delivery goroutines
// drain what is already buffered, and then waits until all handlers have
// returned or ctx expires.
//
// Close may be called more than once and from several goroutines. Only the
// first call performs the shutdown; later calls skip straight to waiting, so
// a nil return always means every handler has finished.
//
// If ctx expires first, Close returns ctx.Err(). The call that performed the
// shutdown also cancels the contexts of the subscribers it closed, telling
// handlers that are still running to stop; their goroutines finish winding
// down in the background.
func (b *Bus) Close(ctx context.Context) error {
	var closing []*subscriber

	b.mu.Lock()
	if !b.closed {
		b.closed = true
		// Close every subscriber channel; their goroutines will drain and exit.
		for _, byID := range b.subs {
			for _, s := range byID {
				close(s.ch)
				closing = append(closing, s)
			}
		}
		for _, s := range b.allSubs {
			close(s.ch)
			closing = append(closing, s)
		}
		b.subs = make(map[EventType]map[uint64]*subscriber)
		b.allSubs = make(map[uint64]*subscriber)
	}
	b.mu.Unlock()

	// Bridge the WaitGroup to a channel so the wait can be raced against ctx.
	done := make(chan struct{})
	go func() {
		b.wg.Wait()
		close(done)
	}()

	select {
	case <-done:
		return nil
	case <-ctx.Done():
		// The drain deadline passed: signal in-flight handlers through their
		// own contexts rather than leaving them running with no way to know
		// the bus has been shut down.
		for _, s := range closing {
			s.cancel()
		}
		return ctx.Err()
	}
}
// Stats reports the bus's current metrics. The subscriber count and closed
// flag are read together under the read lock; the three counters are loaded
// atomically afterwards, so under concurrent traffic the result is a close
// approximation rather than a single consistent instant.
func (b *Bus) Stats() Stats {
	var out Stats

	func() {
		b.mu.RLock()
		defer b.mu.RUnlock()
		out.Closed = b.closed
		out.Subscribers = len(b.allSubs)
		for _, byID := range b.subs {
			out.Subscribers += len(byID)
		}
	}()

	out.Published = atomic.LoadUint64(&b.published)
	out.Delivered = atomic.LoadUint64(&b.delivered)
	out.Dropped = atomic.LoadUint64(&b.dropped)
	return out
}