mirror of
https://github.com/Vyntral/god-eye.git
synced 2026-05-23 16:19:42 +02:00
3a4c230aa7
Complete architectural overhaul. Replaces the v0.1 monolithic scanner with an event-driven pipeline of auto-registered modules. Foundation (internal/): - eventbus: typed pub/sub, 20 event types, race-safe, drop counter - module: registry with phase-based selection - store: thread-safe host store with per-host locks + deep-copy reads - pipeline: coordinator with phase barriers + panic recovery - config: 5 scan profiles + 3 AI tiers + YAML loader + auto-discovery Modules (26 auto-registered across 6 phases): - Discovery: passive (26 sources), bruteforce, recursive, AXFR, GitHub dorks, CT streaming, permutation, reverse DNS, vhost, ASN, supply chain (npm + PyPI) - Enrichment: HTTP probe + tech fingerprint + TLS appliance ID, ports - Analysis: security checks, takeover (110+ sigs), cloud, JavaScript, GraphQL, JWT, headers (OWASP), HTTP smuggling, AI cascade, Nuclei - Reporting: TXT/JSON/CSV writer + AI scan brief AI layer (internal/ai/ + internal/modules/ai/): - Three profiles: lean (16 GB), balanced (32 GB MoE), heavy (64 GB) - Six event-driven handlers: CVE, JS file, HTTP response, secret filter, multi-agent vuln enrichment, anomaly + executive report - Content-hash cache dedups Ollama calls across hosts - Auto-pull of missing models via /api/pull with streaming progress - End-of-scan AI SCAN BRIEF in terminal with top chains + next actions Nuclei compat layer (internal/nucleitpl/): - Executes ~13k community templates (HTTP subset) - Auto-download of nuclei-templates ZIP to ~/.god-eye/nuclei-templates - Scope filter rejects off-host templates (eliminates OSINT FPs) Operations: - Interactive wizard (internal/wizard/) — zero-flag launch - LivePrinter (internal/tui/) — colorized event stream - Diff engine + scheduler (internal/diff, internal/scheduler) for continuous ASM monitoring with webhook alerts - Proxy support (internal/proxyconf/): http / https / socks5 / socks5h + basic auth Fixes #1 — native SOCKS5 / Tor compatibility via --proxy flag. 
185 unit tests across 15 packages, all race-detector clean.
284 lines
6.7 KiB
Go
284 lines
6.7 KiB
Go
package eventbus
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"sync"
|
|
"sync/atomic"
|
|
)
|
|
|
|
var (
	// ErrBusClosed is the sentinel error describing an operation attempted on
	// a bus that has already been closed. Compare against it with errors.Is.
	// NOTE(review): none of the methods visible in this file return it;
	// presumably it is consumed by callers elsewhere — confirm.
	ErrBusClosed = errors.New("eventbus: bus closed")
)
|
|
|
|
// Handler processes a single event. It runs on the subscriber's own goroutine
|
|
// so handlers may block or perform I/O without stalling publishers. A handler
|
|
// must respect ctx cancellation when performing long work.
|
|
type Handler func(ctx context.Context, e Event)
|
|
|
|
// Subscription is returned by Subscribe/SubscribeAll and is used to stop
|
|
// receiving events. Unsubscribe is idempotent.
|
|
type Subscription struct {
|
|
bus *Bus
|
|
eventType EventType // empty string means "all"
|
|
id uint64
|
|
once sync.Once
|
|
}
|
|
|
|
// Unsubscribe stops the subscription. Pending events in the subscriber's
|
|
// buffer are dropped. Safe to call multiple times.
|
|
func (s *Subscription) Unsubscribe() {
|
|
if s == nil || s.bus == nil {
|
|
return
|
|
}
|
|
s.once.Do(func() {
|
|
s.bus.unsubscribe(s.eventType, s.id)
|
|
})
|
|
}
|
|
|
|
// Stats is a point-in-time view of the bus metrics. The counters accumulate
// from the moment the bus is created and are never reset, so callers that
// care about rates must diff two snapshots themselves.
type Stats struct {
	// Published counts Publish calls that were accepted (non-nil event on a
	// bus that was not closed).
	Published uint64
	// Delivered counts events successfully queued to a subscriber, summed
	// over all subscribers.
	Delivered uint64
	// Dropped counts per-subscriber deliveries that were discarded, either
	// because the subscriber's buffer was full or because the publisher's
	// context was already cancelled.
	Dropped uint64
	// Subscribers is the number of subscribers registered at snapshot time.
	Subscribers int
	// Closed reports whether Close has been called on the bus.
	Closed bool
}
|
|
|
|
// Bus is the default eventbus implementation.
|
|
type Bus struct {
|
|
bufferSize int
|
|
|
|
mu sync.RWMutex
|
|
closed bool
|
|
nextID uint64
|
|
subs map[EventType]map[uint64]*subscriber // type → id → subscriber
|
|
allSubs map[uint64]*subscriber // wildcard subscribers
|
|
|
|
published uint64
|
|
delivered uint64
|
|
dropped uint64
|
|
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
type subscriber struct {
|
|
id uint64
|
|
eventT EventType
|
|
ch chan Event
|
|
handler Handler
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
}
|
|
|
|
// New creates a new Bus. bufferSize controls the per-subscriber channel
|
|
// buffer; values ≤0 default to 256. A buffer of 1 is legal but increases
|
|
// drop probability under bursty load.
|
|
func New(bufferSize int) *Bus {
|
|
if bufferSize <= 0 {
|
|
bufferSize = 256
|
|
}
|
|
return &Bus{
|
|
bufferSize: bufferSize,
|
|
subs: make(map[EventType]map[uint64]*subscriber),
|
|
allSubs: make(map[uint64]*subscriber),
|
|
}
|
|
}
|
|
|
|
// Subscribe registers a handler for a specific event type. Returns a
|
|
// Subscription that can be used to unsubscribe.
|
|
func (b *Bus) Subscribe(t EventType, h Handler) *Subscription {
|
|
return b.subscribe(t, h, false)
|
|
}
|
|
|
|
// SubscribeAll registers a handler that receives every event type.
|
|
// Useful for logging, metrics collection, or persistence modules.
|
|
func (b *Bus) SubscribeAll(h Handler) *Subscription {
|
|
return b.subscribe("", h, true)
|
|
}
|
|
|
|
func (b *Bus) subscribe(t EventType, h Handler, all bool) *Subscription {
|
|
if h == nil {
|
|
return &Subscription{bus: b}
|
|
}
|
|
b.mu.Lock()
|
|
if b.closed {
|
|
b.mu.Unlock()
|
|
return &Subscription{bus: b}
|
|
}
|
|
b.nextID++
|
|
id := b.nextID
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
s := &subscriber{
|
|
id: id,
|
|
eventT: t,
|
|
ch: make(chan Event, b.bufferSize),
|
|
handler: h,
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
}
|
|
if all {
|
|
b.allSubs[id] = s
|
|
} else {
|
|
if b.subs[t] == nil {
|
|
b.subs[t] = make(map[uint64]*subscriber)
|
|
}
|
|
b.subs[t][id] = s
|
|
}
|
|
b.mu.Unlock()
|
|
|
|
b.wg.Add(1)
|
|
go b.run(s)
|
|
|
|
return &Subscription{bus: b, eventType: t, id: id}
|
|
}
|
|
|
|
func (b *Bus) unsubscribe(t EventType, id uint64) {
|
|
b.mu.Lock()
|
|
var s *subscriber
|
|
if t == "" {
|
|
s = b.allSubs[id]
|
|
delete(b.allSubs, id)
|
|
} else {
|
|
if m, ok := b.subs[t]; ok {
|
|
s = m[id]
|
|
delete(m, id)
|
|
if len(m) == 0 {
|
|
delete(b.subs, t)
|
|
}
|
|
}
|
|
}
|
|
b.mu.Unlock()
|
|
if s != nil {
|
|
close(s.ch) // run() drains remaining events then returns
|
|
}
|
|
}
|
|
|
|
// run is the per-subscriber goroutine loop.
|
|
func (b *Bus) run(s *subscriber) {
|
|
defer b.wg.Done()
|
|
defer s.cancel()
|
|
for e := range s.ch {
|
|
// Protect bus from handler panics — one bad handler must not
|
|
// take down the pipeline.
|
|
func() {
|
|
defer func() {
|
|
_ = recover()
|
|
}()
|
|
s.handler(s.ctx, e)
|
|
}()
|
|
}
|
|
}
|
|
|
|
// Publish delivers e to every subscriber interested in e.Type() and every
|
|
// SubscribeAll subscriber. If ctx is canceled, Publish returns early and the
|
|
// event is not queued to any subscriber that would block.
|
|
//
|
|
// Publish is non-blocking per subscriber: if a subscriber's buffer is full the
|
|
// event is dropped for that subscriber and Stats.Dropped is incremented.
|
|
func (b *Bus) Publish(ctx context.Context, e Event) {
|
|
if e == nil {
|
|
return
|
|
}
|
|
b.mu.RLock()
|
|
if b.closed {
|
|
b.mu.RUnlock()
|
|
return
|
|
}
|
|
// Snapshot the subscriber slices under lock, then release before send.
|
|
typed := b.subs[e.Type()]
|
|
var typedList []*subscriber
|
|
if len(typed) > 0 {
|
|
typedList = make([]*subscriber, 0, len(typed))
|
|
for _, s := range typed {
|
|
typedList = append(typedList, s)
|
|
}
|
|
}
|
|
var allList []*subscriber
|
|
if len(b.allSubs) > 0 {
|
|
allList = make([]*subscriber, 0, len(b.allSubs))
|
|
for _, s := range b.allSubs {
|
|
allList = append(allList, s)
|
|
}
|
|
}
|
|
b.mu.RUnlock()
|
|
|
|
atomic.AddUint64(&b.published, 1)
|
|
|
|
for _, s := range typedList {
|
|
b.dispatch(ctx, s, e)
|
|
}
|
|
for _, s := range allList {
|
|
b.dispatch(ctx, s, e)
|
|
}
|
|
}
|
|
|
|
func (b *Bus) dispatch(ctx context.Context, s *subscriber, e Event) {
|
|
select {
|
|
case <-ctx.Done():
|
|
// caller abandoned; count as dropped so observability reflects reality
|
|
atomic.AddUint64(&b.dropped, 1)
|
|
case s.ch <- e:
|
|
atomic.AddUint64(&b.delivered, 1)
|
|
default:
|
|
atomic.AddUint64(&b.dropped, 1)
|
|
}
|
|
}
|
|
|
|
// Close stops accepting new publishes and drains in-flight subscriber
|
|
// buffers. It waits until all handlers have returned, or until ctx expires.
|
|
// Returns ctx.Err() if draining did not complete in time.
|
|
func (b *Bus) Close(ctx context.Context) error {
|
|
b.mu.Lock()
|
|
if b.closed {
|
|
b.mu.Unlock()
|
|
return nil
|
|
}
|
|
b.closed = true
|
|
|
|
// Close every subscriber channel; their goroutines will drain and exit.
|
|
for _, m := range b.subs {
|
|
for _, s := range m {
|
|
close(s.ch)
|
|
}
|
|
}
|
|
for _, s := range b.allSubs {
|
|
close(s.ch)
|
|
}
|
|
b.subs = make(map[EventType]map[uint64]*subscriber)
|
|
b.allSubs = make(map[uint64]*subscriber)
|
|
b.mu.Unlock()
|
|
|
|
done := make(chan struct{})
|
|
go func() {
|
|
b.wg.Wait()
|
|
close(done)
|
|
}()
|
|
|
|
select {
|
|
case <-done:
|
|
return nil
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
}
|
|
}
|
|
|
|
// Stats returns a snapshot of current metrics.
|
|
func (b *Bus) Stats() Stats {
|
|
b.mu.RLock()
|
|
closed := b.closed
|
|
subCount := len(b.allSubs)
|
|
for _, m := range b.subs {
|
|
subCount += len(m)
|
|
}
|
|
b.mu.RUnlock()
|
|
|
|
return Stats{
|
|
Published: atomic.LoadUint64(&b.published),
|
|
Delivered: atomic.LoadUint64(&b.delivered),
|
|
Dropped: atomic.LoadUint64(&b.dropped),
|
|
Subscribers: subCount,
|
|
Closed: closed,
|
|
}
|
|
}
|