package dispatch

import (
	"context"
	"fmt"
	"time"

	"gitea.maximumdirect.net/ejr/feedkit/event"
	"gitea.maximumdirect.net/ejr/feedkit/pipeline"
	"gitea.maximumdirect.net/ejr/feedkit/sinks"
)

// Dispatcher reads events from In, passes each one through Pipeline, and
// publishes the surviving events to the sinks selected by Routes.
type Dispatcher struct {
	// In is the event stream consumed by Run. Run returns an error if it is nil
	// and returns nil (clean shutdown) once it is closed.
	In <-chan event.Event

	// Pipeline processes every event before routing. If it is nil when Run
	// starts, Run replaces it with an empty pipeline.Pipeline.
	Pipeline *pipeline.Pipeline

	// Sinks by name. Run returns an error if the map is nil.
	Sinks map[string]sinks.Sink

	// Routing rules (compiled from config).
	Routes []Route

	// Fanout knobs (global defaults for now; we can wire these from config later).
	//
	// These are intentionally simple: one queue size, one enqueue timeout,
	// one consume timeout for all sinks. Run forwards them unchanged to
	// NewFanout via FanoutOptions.
	SinkQueueSize      int
	SinkEnqueueTimeout time.Duration
	SinkConsumeTimeout time.Duration
}

// Route binds a sink name to the event kinds it should receive.
type Route struct {
	// SinkName is the name passed to Fanout.Publish for matching events.
	SinkName string

	// Kinds selects which event kinds this route accepts. An empty (or nil)
	// map matches every kind; otherwise only kinds mapped to true match.
	Kinds map[event.Kind]bool
}

// Logger is a printf-style logging callback. Run and routeToSinks skip their
// own log output when it is nil.
type Logger func(format string, args ...any)

// Run starts the sink fanout and then dispatches events from d.In until ctx is
// cancelled or d.In is closed.
//
// It returns an error immediately if d.In or d.Sinks is nil, or if the fanout
// cannot be created (the underlying error is wrapped). It returns ctx.Err()
// when ctx is done and nil when d.In is closed. Pipeline errors are logged
// (when logf is non-nil) and the offending event is skipped; they do not stop
// the loop.
func (d *Dispatcher) Run(ctx context.Context, logf Logger) error {
	if d.In == nil {
		return fmt.Errorf("dispatcher.Run: In channel is nil")
	}
	if d.Sinks == nil {
		return fmt.Errorf("dispatcher.Run: Sinks map is nil")
	}
	if d.Pipeline == nil {
		// Default to an empty pipeline so the loop below never dereferences nil.
		d.Pipeline = &pipeline.Pipeline{}
	}

	// Build and start sink workers.
	fanout, err := NewFanout(ctx, d.Sinks, FanoutOptions{
		QueueSize:      d.SinkQueueSize,
		EnqueueTimeout: d.SinkEnqueueTimeout,
		ConsumeTimeout: d.SinkConsumeTimeout,
		Logf:           logf,
	})
	if err != nil {
		// Wrap so the caller sees which stage failed while errors.Is/As
		// still reach the underlying cause.
		return fmt.Errorf("dispatcher.Run: starting fanout: %w", err)
	}
	defer fanout.Close()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()

		case e, ok := <-d.In:
			if !ok {
				// If someone closes the event bus, treat as clean shutdown.
				return nil
			}

			out, err := d.Pipeline.Process(ctx, e)
			if err != nil {
				if logf != nil {
					logf("dispatcher: pipeline error: %v", err)
				}
				continue
			}
			if out == nil {
				// Dropped by policy.
				continue
			}

			d.routeToSinks(ctx, fanout, *out, logf)
		}
	}
}

// routeToSinks publishes e to the sink of every route in d.Routes whose Kinds
// filter accepts e.Kind. A publish failure is logged (when logf is non-nil)
// and does not prevent the remaining routes from being tried.
func (d *Dispatcher) routeToSinks(ctx context.Context, fanout *Fanout, e event.Event, logf Logger) {
	for _, r := range d.Routes {
		// A non-empty Kinds map acts as an allow-list; an empty one matches all.
		if len(r.Kinds) > 0 && !r.Kinds[e.Kind] {
			continue
		}

		// Publish is the only thing done here: the event is handed to the
		// fanout rather than to a sink adapter directly.
		if err := fanout.Publish(ctx, r.SinkName, e); err != nil && logf != nil {
			logf("dispatcher: failed to enqueue event for sink %q (id=%s kind=%s source=%s): %v",
				r.SinkName, e.ID, e.Kind, e.Source, err)
		}
	}
}