Files
feedkit/dispatch/dispatch.go
Eric Rakestraw 09bc65e947 feedkit: ergonomics pass (shared logger, route compiler, param helpers)
- Add logging.Logf as the canonical printf-style logger type used across feedkit.
  - Update scheduler and dispatch to alias their Logger types to logging.Logf.
  - Eliminates type-mismatch friction when wiring one log function through the system.

- Add dispatch.CompileRoutes(*config.Config) ([]dispatch.Route, error)
  - Compiles config routes into dispatch routes with event.ParseKind normalization.
  - If routes: is omitted, defaults to “all sinks receive all kinds”.

- Expand config param helpers for both SourceConfig and SinkConfig
  - Add ParamBool/ParamInt/ParamDuration/ParamStringSlice (+ Default variants).
  - Supports common YAML-decoded types (bool/int/float/string, []any, etc.)
  - Keeps driver code cleaner and reduces repeated type assertions.

- Fix Postgres sink validation error prefix ("postgres sink", not "rabbitmq sink").
2026-01-13 14:40:29 -06:00

109 lines
2.5 KiB
Go

package dispatch
import (
"context"
"fmt"
"time"
"gitea.maximumdirect.net/ejr/feedkit/event"
"gitea.maximumdirect.net/ejr/feedkit/logging"
"gitea.maximumdirect.net/ejr/feedkit/pipeline"
"gitea.maximumdirect.net/ejr/feedkit/sinks"
)
// Logger is the printf-style logging function accepted by dispatch.
//
// It is declared as an alias (not a new named type) of the shared feedkit
// logging.Logf type, so a single log function can be handed to every feedkit
// component without conversions.
//
// The dispatcher methods in this file check for nil before calling it, so
// passing a nil Logger to them simply disables their log output.
type Logger = logging.Logf
// Dispatcher reads events from In, passes each one through Pipeline, and
// publishes the events that survive processing to the sinks selected by
// Routes. Populate the fields and call Run to start it.
type Dispatcher struct {
// In is the channel Run receives events from. Run rejects a nil channel
// and returns nil once the channel has been closed.
In <-chan event.Event
// Pipeline processes every event before it is routed. If nil, Run replaces
// it with an empty pipeline.Pipeline.
Pipeline *pipeline.Pipeline
// Sinks by name. Run rejects a nil map; otherwise the map is handed to
// NewFanout.
Sinks map[string]sinks.Sink
// Routing rules (compiled from config). Every processed event is checked
// against each route, in slice order.
Routes []Route
// Fanout knobs (global defaults for now; we can wire these from config later).
//
// These are intentionally simple: one queue size, one enqueue timeout,
// one consume timeout for all sinks. Run passes them unchanged to NewFanout
// through FanoutOptions.
SinkQueueSize int
SinkEnqueueTimeout time.Duration
SinkConsumeTimeout time.Duration
}
// Route selects which events are published to one named sink.
type Route struct {
// SinkName is the name passed to Fanout.Publish for matching events
// (presumably a key of Dispatcher.Sinks; confirm against Fanout).
SinkName string
// Kinds is the set of event kinds this route accepts: a kind matches when
// it maps to true. A nil or empty map matches every kind.
Kinds map[event.Kind]bool
}
// Run drives the dispatch loop until ctx is cancelled or d.In is closed.
//
// It refuses to start, returning an error, when d.In or d.Sinks is nil, and it
// installs an empty pipeline.Pipeline when d.Pipeline is nil. A Fanout is then
// created over d.Sinks with the dispatcher's queue/timeout settings and is
// closed again when Run returns; an error from NewFanout is returned as-is.
//
// Each event received from d.In goes through d.Pipeline.Process:
//   - a processing error is reported through logf (when non-nil) and the
//     event is skipped;
//   - a nil result means the event was dropped, so nothing is routed;
//   - otherwise the processed event is handed to routeToSinks.
//
// Run returns ctx.Err() on cancellation and nil when d.In is closed.
func (d *Dispatcher) Run(ctx context.Context, logf Logger) error {
	// Validate required wiring before touching any state.
	switch {
	case d.In == nil:
		return fmt.Errorf("dispatcher.Run: In channel is nil")
	case d.Sinks == nil:
		return fmt.Errorf("dispatcher.Run: Sinks map is nil")
	}

	if d.Pipeline == nil {
		d.Pipeline = &pipeline.Pipeline{}
	}

	// One shared set of fanout settings applies to every sink.
	opts := FanoutOptions{
		QueueSize:      d.SinkQueueSize,
		EnqueueTimeout: d.SinkEnqueueTimeout,
		ConsumeTimeout: d.SinkConsumeTimeout,
		Logf:           logf,
	}
	fanout, err := NewFanout(ctx, d.Sinks, opts)
	if err != nil {
		return err
	}
	defer fanout.Close()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()

		case ev, open := <-d.In:
			if !open {
				// A closed event bus is treated as a clean shutdown.
				return nil
			}

			processed, procErr := d.Pipeline.Process(ctx, ev)
			switch {
			case procErr != nil:
				// Log and move on; one bad event must not stop the loop.
				if logf != nil {
					logf("dispatcher: pipeline error: %v", procErr)
				}
			case processed == nil:
				// Dropped by policy; nothing to route.
			default:
				d.routeToSinks(ctx, fanout, *processed, logf)
			}
		}
	}
}
// routeToSinks offers event e to every route in d.Routes, in order.
//
// A route accepts the event when its Kinds set is empty (match everything) or
// maps e.Kind to true. For each accepting route the event is published to the
// route's sink through fanout.Publish; this method never calls a sink directly.
// A publish error is reported through logf (when non-nil) and does not stop the
// remaining routes from being tried.
func (d *Dispatcher) routeToSinks(ctx context.Context, fanout *Fanout, e event.Event, logf Logger) {
	for _, route := range d.Routes {
		wanted := len(route.Kinds) == 0 || route.Kinds[e.Kind]
		if !wanted {
			continue
		}

		// Enqueue only: Publish is bounded (if configured) and hands the event
		// off rather than invoking the sink adapter here.
		pubErr := fanout.Publish(ctx, route.SinkName, e)
		if pubErr == nil {
			continue
		}
		if logf == nil {
			continue
		}
		logf("dispatcher: failed to enqueue event for sink %q (id=%s kind=%s source=%s): %v",
			route.SinkName, e.ID, e.Kind, e.Source, pubErr)
	}
}