// Package dispatch fans events out to per-sink queues so that one slow
// sink cannot stall the publisher.
package dispatch

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"gitea.maximumdirect.net/ejr/feedkit/event"
	"gitea.maximumdirect.net/ejr/feedkit/sinks"
)

// Fanout owns one buffered queue + one worker goroutine per sink.
//
// The goal is to decouple the dispatcher from slow sinks.
// The dispatcher enqueues quickly; each sink consumes at its own pace.
//
// Design notes:
//   - We use a global enqueue timeout to prevent unbounded blocking if a sink queue fills.
//   - We use a per-consume timeout (context.WithTimeout) to bound sink work.
//   - This timeout only works if the sink honors ctx.Done(). Sinks MUST be written to respect context.
//     (If a sink ignores context and blocks forever, no safe mechanism exists to "kill" that goroutine.)
type Fanout struct {
	workers        map[string]*sinkWorker // keyed by sink name; populated once in NewFanout, read-only afterwards
	enqueueTimeout time.Duration          // bound on Publish blocking; <= 0 means block until ctx cancels
	logger         Logger                 // may be nil; all logging goes through logf, which tolerates nil
	closeOnce      sync.Once              // makes Close idempotent
	wg             sync.WaitGroup         // tracks worker goroutines so Close can wait for them to exit
}

// FanoutOptions configures NewFanout. The zero value is usable: default queue
// size, unbounded enqueue waits, unbounded consumes, and no logging.
type FanoutOptions struct {
	// QueueSize is the per-sink channel buffer size.
	// Larger buffers absorb bursts but can also hide a stuck sink longer.
	QueueSize int

	// EnqueueTimeout bounds how long Publish() will wait if a sink queue is full.
	// If this fires, the event is dropped for that sink and an error is returned.
	EnqueueTimeout time.Duration

	// ConsumeTimeout bounds how long we allow a single sink Consume() call to run.
	// If your sink respects context, it should return early with context deadline exceeded.
	ConsumeTimeout time.Duration

	// Logf receives diagnostic messages (sink errors, timeouts, panics). May be nil.
	Logf Logger
}

// sinkWorker pairs one sink with its private queue and consume deadline.
type sinkWorker struct {
	name           string
	sink           sinks.Sink
	ch             chan event.Event // closed by Fanout.Close to signal "no more events"
	consumeTimeout time.Duration    // per-Consume deadline; <= 0 disables the deadline
}

var (
	// ErrSinkQueueFull indicates that a sink queue could not accept the event within EnqueueTimeout.
	ErrSinkQueueFull = errors.New("sink queue full (enqueue timeout)")
)

// NewFanout builds workers for all provided sinks and starts their goroutines.
func NewFanout(ctx context.Context, sinkMap map[string]sinks.Sink, opts FanoutOptions) (*Fanout, error) { if sinkMap == nil { return nil, fmt.Errorf("dispatch.NewFanout: sink map is nil") } queueSize := opts.QueueSize if queueSize <= 0 { queueSize = 64 // conservative default; adjust later as needed } f := &Fanout{ workers: make(map[string]*sinkWorker, len(sinkMap)), enqueueTimeout: opts.EnqueueTimeout, logger: opts.Logf, } // Create + start one worker per sink. for name, s := range sinkMap { if s == nil { return nil, fmt.Errorf("dispatch.NewFanout: sink %q is nil", name) } w := &sinkWorker{ name: name, sink: s, ch: make(chan event.Event, queueSize), consumeTimeout: opts.ConsumeTimeout, } f.workers[name] = w f.wg.Add(1) go func(w *sinkWorker) { defer f.wg.Done() f.runWorker(ctx, w) }(w) } return f, nil } // Close stops all workers by closing their channels and waits for them to exit. // It is safe to call multiple times. func (f *Fanout) Close() { if f == nil { return } f.closeOnce.Do(func() { // Closing the per-sink channels tells workers "no more events". for _, w := range f.workers { close(w.ch) } f.wg.Wait() }) } // Publish enqueues an event to a named sink's queue. // // If the sink queue is full, Publish will wait up to f.enqueueTimeout. // If enqueueTimeout is <= 0, Publish will block until it can enqueue or ctx cancels. // // If Publish returns an error, the event has NOT been enqueued for that sink. func (f *Fanout) Publish(ctx context.Context, sinkName string, e event.Event) error { w, ok := f.workers[sinkName] if !ok { return fmt.Errorf("dispatch.Fanout.Publish: unknown sink %q", sinkName) } // Fast path: no timeout configured; block until enqueue or ctx cancels. if f.enqueueTimeout <= 0 { select { case w.ch <- e: return nil case <-ctx.Done(): return ctx.Err() } } // Bounded enqueue: wait up to enqueueTimeout for space in the buffer. 
timer := time.NewTimer(f.enqueueTimeout) defer timer.Stop() select { case w.ch <- e: return nil case <-timer.C: return fmt.Errorf("%w: sink=%q id=%s kind=%s source=%s", ErrSinkQueueFull, sinkName, e.ID, e.Kind, e.Source) case <-ctx.Done(): return ctx.Err() } } func (f *Fanout) runWorker(ctx context.Context, w *sinkWorker) { for { select { case <-ctx.Done(): // Context cancellation means we're shutting down; drop queued work. return case e, ok := <-w.ch: if !ok { // Channel closed: dispatcher is done publishing to this sink. return } f.consumeOne(ctx, w, e) } } } func (f *Fanout) consumeOne(parent context.Context, w *sinkWorker, e event.Event) { // Always give sinks a context they can observe. consumeCtx := parent cancel := func() {} if w.consumeTimeout > 0 { consumeCtx, cancel = context.WithTimeout(parent, w.consumeTimeout) } defer cancel() start := time.Now() // Sinks are adapters. We keep the worker alive even if a sink panics: // it's better to log loudly and continue than to silently lose a sink forever. defer func() { if r := recover(); r != nil { f.logf("dispatch: sink %q PANIC while consuming event (id=%s kind=%s source=%s): %v", w.name, e.ID, e.Kind, e.Source, r) } }() err := w.sink.Consume(consumeCtx, e) elapsed := time.Since(start) if err == nil { return } // If the sink respects context, timeouts should surface as context deadline exceeded. // We log a distinct message because it's operationally useful. if errors.Is(err, context.DeadlineExceeded) { f.logf("dispatch: sink %q timed out after %s consuming event (id=%s kind=%s source=%s)", w.name, elapsed, e.ID, e.Kind, e.Source) return } // Normal errors. f.logf("dispatch: sink %q failed consuming event (id=%s kind=%s source=%s): %v", w.name, e.ID, e.Kind, e.Source, err) } func (f *Fanout) logf(format string, args ...any) { if f == nil || f.logger == nil { return } f.logger(format, args...) }