feedkit: split the former maximumdirect.net/weatherd project in two.
feedkit now contains a reusable core, while weatherfeeder is a concrete implementation that includes weather-specific functions.
This commit is contained in:
104
dispatch/dispatch.go
Normal file
104
dispatch/dispatch.go
Normal file
@@ -0,0 +1,104 @@
|
||||
package dispatch
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||
"gitea.maximumdirect.net/ejr/feedkit/pipeline"
|
||||
"gitea.maximumdirect.net/ejr/feedkit/sinks"
|
||||
)
|
||||
|
||||
type Dispatcher struct {
|
||||
In <-chan event.Event
|
||||
|
||||
Pipeline *pipeline.Pipeline
|
||||
|
||||
// Sinks by name
|
||||
Sinks map[string]sinks.Sink
|
||||
|
||||
// Routing rules (compiled from config)
|
||||
Routes []Route
|
||||
|
||||
// Fanout knobs (global defaults for now; we can wire these from config later).
|
||||
//
|
||||
// These are intentionally simple: one queue size, one enqueue timeout,
|
||||
// one consume timeout for all sinks.
|
||||
SinkQueueSize int
|
||||
SinkEnqueueTimeout time.Duration
|
||||
SinkConsumeTimeout time.Duration
|
||||
}
|
||||
|
||||
type Route struct {
|
||||
SinkName string
|
||||
Kinds map[event.Kind]bool
|
||||
}
|
||||
|
||||
type Logger func(format string, args ...any)
|
||||
|
||||
func (d *Dispatcher) Run(ctx context.Context, logf Logger) error {
|
||||
if d.In == nil {
|
||||
return fmt.Errorf("dispatcher.Run: In channel is nil")
|
||||
}
|
||||
if d.Sinks == nil {
|
||||
return fmt.Errorf("dispatcher.Run: Sinks map is nil")
|
||||
}
|
||||
if d.Pipeline == nil {
|
||||
d.Pipeline = &pipeline.Pipeline{}
|
||||
}
|
||||
|
||||
// Build and start sink workers.
|
||||
fanout, err := NewFanout(ctx, d.Sinks, FanoutOptions{
|
||||
QueueSize: d.SinkQueueSize,
|
||||
EnqueueTimeout: d.SinkEnqueueTimeout,
|
||||
ConsumeTimeout: d.SinkConsumeTimeout,
|
||||
Logf: logf,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer fanout.Close()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
|
||||
case e, ok := <-d.In:
|
||||
if !ok {
|
||||
// If someone closes the event bus, treat as clean shutdown.
|
||||
return nil
|
||||
}
|
||||
|
||||
out, err := d.Pipeline.Process(ctx, e)
|
||||
if err != nil {
|
||||
if logf != nil {
|
||||
logf("dispatcher: pipeline error: %v", err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
if out == nil {
|
||||
// Dropped by policy.
|
||||
continue
|
||||
}
|
||||
|
||||
d.routeToSinks(ctx, fanout, *out, logf)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (d *Dispatcher) routeToSinks(ctx context.Context, fanout *Fanout, e event.Event, logf Logger) {
|
||||
for _, r := range d.Routes {
|
||||
if len(r.Kinds) > 0 && !r.Kinds[e.Kind] {
|
||||
continue
|
||||
}
|
||||
|
||||
// Publish is now the ONLY thing we do here.
|
||||
// It is bounded (if configured) and does not call sink adapters directly.
|
||||
if err := fanout.Publish(ctx, r.SinkName, e); err != nil && logf != nil {
|
||||
logf("dispatcher: failed to enqueue event for sink %q (id=%s kind=%s source=%s): %v",
|
||||
r.SinkName, e.ID, e.Kind, e.Source, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
222
dispatch/fanout.go
Normal file
222
dispatch/fanout.go
Normal file
@@ -0,0 +1,222 @@
|
||||
package dispatch
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||
"gitea.maximumdirect.net/ejr/feedkit/sinks"
|
||||
)
|
||||
|
||||
// Fanout owns one buffered queue + one worker goroutine per sink.
|
||||
//
|
||||
// The goal is to decouple the dispatcher from slow sinks.
|
||||
// The dispatcher enqueues quickly; each sink consumes at its own pace.
|
||||
//
|
||||
// Design notes:
|
||||
// - We use a global enqueue timeout to prevent unbounded blocking if a sink queue fills.
|
||||
// - We use a per-consume timeout (context.WithTimeout) to bound sink work.
|
||||
// - This timeout only works if the sink honors ctx.Done(). Sinks MUST be written to respect context.
|
||||
// (If a sink ignores context and blocks forever, no safe mechanism exists to "kill" that goroutine.)
|
||||
type Fanout struct {
|
||||
workers map[string]*sinkWorker
|
||||
|
||||
enqueueTimeout time.Duration
|
||||
logger Logger
|
||||
|
||||
closeOnce sync.Once
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
type FanoutOptions struct {
|
||||
// QueueSize is the per-sink channel buffer size.
|
||||
// Larger buffers absorb bursts but can also hide a stuck sink longer.
|
||||
QueueSize int
|
||||
|
||||
// EnqueueTimeout bounds how long Publish() will wait if a sink queue is full.
|
||||
// If this fires, the event is dropped for that sink and an error is returned.
|
||||
EnqueueTimeout time.Duration
|
||||
|
||||
// ConsumeTimeout bounds how long we allow a single sink Consume() call to run.
|
||||
// If your sink respects context, it should return early with context deadline exceeded.
|
||||
ConsumeTimeout time.Duration
|
||||
|
||||
Logf Logger
|
||||
}
|
||||
|
||||
type sinkWorker struct {
|
||||
name string
|
||||
sink sinks.Sink
|
||||
ch chan event.Event
|
||||
consumeTimeout time.Duration
|
||||
}
|
||||
|
||||
var (
|
||||
// ErrSinkQueueFull indicates that a sink queue could not accept the event within EnqueueTimeout.
|
||||
ErrSinkQueueFull = errors.New("sink queue full (enqueue timeout)")
|
||||
)
|
||||
|
||||
// NewFanout builds workers for all provided sinks and starts their goroutines.
|
||||
func NewFanout(ctx context.Context, sinkMap map[string]sinks.Sink, opts FanoutOptions) (*Fanout, error) {
|
||||
if sinkMap == nil {
|
||||
return nil, fmt.Errorf("dispatch.NewFanout: sink map is nil")
|
||||
}
|
||||
|
||||
queueSize := opts.QueueSize
|
||||
if queueSize <= 0 {
|
||||
queueSize = 64 // conservative default; adjust later as needed
|
||||
}
|
||||
|
||||
f := &Fanout{
|
||||
workers: make(map[string]*sinkWorker, len(sinkMap)),
|
||||
enqueueTimeout: opts.EnqueueTimeout,
|
||||
logger: opts.Logf,
|
||||
}
|
||||
|
||||
// Create + start one worker per sink.
|
||||
for name, s := range sinkMap {
|
||||
if s == nil {
|
||||
return nil, fmt.Errorf("dispatch.NewFanout: sink %q is nil", name)
|
||||
}
|
||||
|
||||
w := &sinkWorker{
|
||||
name: name,
|
||||
sink: s,
|
||||
ch: make(chan event.Event, queueSize),
|
||||
consumeTimeout: opts.ConsumeTimeout,
|
||||
}
|
||||
|
||||
f.workers[name] = w
|
||||
|
||||
f.wg.Add(1)
|
||||
go func(w *sinkWorker) {
|
||||
defer f.wg.Done()
|
||||
f.runWorker(ctx, w)
|
||||
}(w)
|
||||
}
|
||||
|
||||
return f, nil
|
||||
}
|
||||
|
||||
// Close stops all workers by closing their channels and waits for them to exit.
|
||||
// It is safe to call multiple times.
|
||||
func (f *Fanout) Close() {
|
||||
if f == nil {
|
||||
return
|
||||
}
|
||||
|
||||
f.closeOnce.Do(func() {
|
||||
// Closing the per-sink channels tells workers "no more events".
|
||||
for _, w := range f.workers {
|
||||
close(w.ch)
|
||||
}
|
||||
f.wg.Wait()
|
||||
})
|
||||
}
|
||||
|
||||
// Publish enqueues an event to a named sink's queue.
|
||||
//
|
||||
// If the sink queue is full, Publish will wait up to f.enqueueTimeout.
|
||||
// If enqueueTimeout is <= 0, Publish will block until it can enqueue or ctx cancels.
|
||||
//
|
||||
// If Publish returns an error, the event has NOT been enqueued for that sink.
|
||||
func (f *Fanout) Publish(ctx context.Context, sinkName string, e event.Event) error {
|
||||
w, ok := f.workers[sinkName]
|
||||
if !ok {
|
||||
return fmt.Errorf("dispatch.Fanout.Publish: unknown sink %q", sinkName)
|
||||
}
|
||||
|
||||
// Fast path: no timeout configured; block until enqueue or ctx cancels.
|
||||
if f.enqueueTimeout <= 0 {
|
||||
select {
|
||||
case w.ch <- e:
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
// Bounded enqueue: wait up to enqueueTimeout for space in the buffer.
|
||||
timer := time.NewTimer(f.enqueueTimeout)
|
||||
defer timer.Stop()
|
||||
|
||||
select {
|
||||
case w.ch <- e:
|
||||
return nil
|
||||
|
||||
case <-timer.C:
|
||||
return fmt.Errorf("%w: sink=%q id=%s kind=%s source=%s",
|
||||
ErrSinkQueueFull, sinkName, e.ID, e.Kind, e.Source)
|
||||
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
func (f *Fanout) runWorker(ctx context.Context, w *sinkWorker) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// Context cancellation means we're shutting down; drop queued work.
|
||||
return
|
||||
|
||||
case e, ok := <-w.ch:
|
||||
if !ok {
|
||||
// Channel closed: dispatcher is done publishing to this sink.
|
||||
return
|
||||
}
|
||||
|
||||
f.consumeOne(ctx, w, e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (f *Fanout) consumeOne(parent context.Context, w *sinkWorker, e event.Event) {
|
||||
// Always give sinks a context they can observe.
|
||||
consumeCtx := parent
|
||||
cancel := func() {}
|
||||
if w.consumeTimeout > 0 {
|
||||
consumeCtx, cancel = context.WithTimeout(parent, w.consumeTimeout)
|
||||
}
|
||||
defer cancel()
|
||||
|
||||
start := time.Now()
|
||||
|
||||
// Sinks are adapters. We keep the worker alive even if a sink panics:
|
||||
// it's better to log loudly and continue than to silently lose a sink forever.
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
f.logf("dispatch: sink %q PANIC while consuming event (id=%s kind=%s source=%s): %v",
|
||||
w.name, e.ID, e.Kind, e.Source, r)
|
||||
}
|
||||
}()
|
||||
|
||||
err := w.sink.Consume(consumeCtx, e)
|
||||
elapsed := time.Since(start)
|
||||
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// If the sink respects context, timeouts should surface as context deadline exceeded.
|
||||
// We log a distinct message because it's operationally useful.
|
||||
if errors.Is(err, context.DeadlineExceeded) {
|
||||
f.logf("dispatch: sink %q timed out after %s consuming event (id=%s kind=%s source=%s)",
|
||||
w.name, elapsed, e.ID, e.Kind, e.Source)
|
||||
return
|
||||
}
|
||||
|
||||
// Normal errors.
|
||||
f.logf("dispatch: sink %q failed consuming event (id=%s kind=%s source=%s): %v",
|
||||
w.name, e.ID, e.Kind, e.Source, err)
|
||||
}
|
||||
|
||||
func (f *Fanout) logf(format string, args ...any) {
|
||||
if f == nil || f.logger == nil {
|
||||
return
|
||||
}
|
||||
f.logger(format, args...)
|
||||
}
|
||||
Reference in New Issue
Block a user