mirror of
https://github.com/Ed1s0nZ/CyberStrikeAI.git
synced 2026-05-15 21:08:01 +02:00
145 lines
3.7 KiB
Go
145 lines
3.7 KiB
Go
package c2
|
||
|
||
import (
|
||
"sync"
|
||
"sync/atomic"
|
||
"time"
|
||
)
|
||
|
||
// Event 是 EventBus 内部传输的事件单元,是 database.C2Event 的"实时投影"。
|
||
// 区别在于:
|
||
// - 数据库表保存全部历史,用于审计与列表分页;
|
||
// - EventBus 只缓存最近 N 条,用于 SSE/WS 实时推送给在线订阅者。
|
||
type Event struct {
|
||
ID string `json:"id"`
|
||
Level string `json:"level"`
|
||
Category string `json:"category"`
|
||
SessionID string `json:"sessionId,omitempty"`
|
||
TaskID string `json:"taskId,omitempty"`
|
||
Message string `json:"message"`
|
||
Data map[string]interface{} `json:"data,omitempty"`
|
||
CreatedAt time.Time `json:"createdAt"`
|
||
}
|
||
|
||
// EventBus 简单的内存广播总线。
|
||
// 设计要点:
|
||
// - 多订阅者:每个订阅者有独立 buffered channel,慢消费者不会阻塞 publisher;
|
||
// - 容量满即丢弃:发布端绝不阻塞,避免 listener accept loop / beacon handler 卡住;
|
||
// - 全局过滤:订阅时可限定 SessionID/Category,前端按需订阅,省 CPU;
|
||
// - 关闭安全:Close() 后所有订阅者 chan 关闭,防止 goroutine 泄漏。
|
||
type EventBus struct {
|
||
mu sync.RWMutex
|
||
subscribers map[string]*Subscription
|
||
closed bool
|
||
}
|
||
|
||
// Subscription 订阅句柄
|
||
type Subscription struct {
|
||
ID string
|
||
Ch chan *Event
|
||
SessionID string // 空表示不限制
|
||
Category string // 空表示不限制
|
||
Levels map[string]struct{}
|
||
dropCount atomic.Int64
|
||
}
|
||
|
||
// NewEventBus 创建总线
|
||
func NewEventBus() *EventBus {
|
||
return &EventBus{subscribers: make(map[string]*Subscription)}
|
||
}
|
||
|
||
// Subscribe 注册订阅者;返回 Subscription,调用方负责后续 Unsubscribe。
|
||
// - bufferSize:单订阅者 channel 容量,建议 64~256;
|
||
// - sessionFilter / categoryFilter:空字符串=不限;
|
||
// - levelFilter:[]string{"warn","critical"} 这类,nil/空表示全收。
|
||
func (b *EventBus) Subscribe(id string, bufferSize int, sessionFilter, categoryFilter string, levelFilter []string) *Subscription {
|
||
if bufferSize <= 0 {
|
||
bufferSize = 128
|
||
}
|
||
sub := &Subscription{
|
||
ID: id,
|
||
Ch: make(chan *Event, bufferSize),
|
||
SessionID: sessionFilter,
|
||
Category: categoryFilter,
|
||
}
|
||
if len(levelFilter) > 0 {
|
||
sub.Levels = make(map[string]struct{}, len(levelFilter))
|
||
for _, l := range levelFilter {
|
||
sub.Levels[l] = struct{}{}
|
||
}
|
||
}
|
||
b.mu.Lock()
|
||
defer b.mu.Unlock()
|
||
if b.closed {
|
||
close(sub.Ch)
|
||
return sub
|
||
}
|
||
b.subscribers[id] = sub
|
||
return sub
|
||
}
|
||
|
||
// Unsubscribe 注销订阅者并关闭 channel
|
||
func (b *EventBus) Unsubscribe(id string) {
|
||
b.mu.Lock()
|
||
defer b.mu.Unlock()
|
||
if sub, ok := b.subscribers[id]; ok {
|
||
delete(b.subscribers, id)
|
||
close(sub.Ch)
|
||
}
|
||
}
|
||
|
||
// Publish 广播事件给所有订阅者;非阻塞,channel 满时静默丢弃
|
||
func (b *EventBus) Publish(e *Event) {
|
||
if e == nil {
|
||
return
|
||
}
|
||
b.mu.RLock()
|
||
subs := make([]*Subscription, 0, len(b.subscribers))
|
||
for _, s := range b.subscribers {
|
||
if s.matches(e) {
|
||
subs = append(subs, s)
|
||
}
|
||
}
|
||
closed := b.closed
|
||
b.mu.RUnlock()
|
||
if closed {
|
||
return
|
||
}
|
||
for _, s := range subs {
|
||
select {
|
||
case s.Ch <- e:
|
||
default:
|
||
s.dropCount.Add(1)
|
||
}
|
||
}
|
||
}
|
||
|
||
// Close 关闭总线,停止所有订阅
|
||
func (b *EventBus) Close() {
|
||
b.mu.Lock()
|
||
defer b.mu.Unlock()
|
||
if b.closed {
|
||
return
|
||
}
|
||
b.closed = true
|
||
for id, s := range b.subscribers {
|
||
close(s.Ch)
|
||
delete(b.subscribers, id)
|
||
}
|
||
}
|
||
|
||
func (s *Subscription) matches(e *Event) bool {
|
||
if s.SessionID != "" && e.SessionID != s.SessionID {
|
||
return false
|
||
}
|
||
if s.Category != "" && e.Category != s.Category {
|
||
return false
|
||
}
|
||
if len(s.Levels) > 0 {
|
||
if _, ok := s.Levels[e.Level]; !ok {
|
||
return false
|
||
}
|
||
}
|
||
return true
|
||
}
|