- Add logging.Logf as the canonical printf-style logger type used across feedkit.
- Update scheduler and dispatch to alias their Logger types to logging.Logf.
- Eliminates type-mismatch friction when wiring one log function through the system.
- Add dispatch.CompileRoutes(*config.Config) ([]dispatch.Route, error)
- Compiles config routes into dispatch routes with event.ParseKind normalization.
- If the `routes:` key is omitted from the config, it defaults to “all sinks receive all kinds”.
- Expand config param helpers for both SourceConfig and SinkConfig
- Add ParamBool/ParamInt/ParamDuration/ParamStringSlice (+ Default variants).
- Supports common YAML-decoded types (bool/int/float/string, []any, etc.)
- Keeps driver code cleaner and reduces repeated type assertions.
- Fix Postgres sink validation error prefix ("postgres sink", not "rabbitmq sink").
109 lines
2.5 KiB
Go
109 lines
2.5 KiB
Go
package dispatch
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"gitea.maximumdirect.net/ejr/feedkit/event"
|
|
"gitea.maximumdirect.net/ejr/feedkit/logging"
|
|
"gitea.maximumdirect.net/ejr/feedkit/pipeline"
|
|
"gitea.maximumdirect.net/ejr/feedkit/sinks"
|
|
)
|
|
|
|
// Logger is a printf-style logger used throughout dispatch.
|
|
// It is an alias to the shared feedkit logging type so callers can pass
|
|
// one function everywhere without type mismatch friction.
|
|
type Logger = logging.Logf
|
|
|
|
type Dispatcher struct {
|
|
In <-chan event.Event
|
|
|
|
Pipeline *pipeline.Pipeline
|
|
|
|
// Sinks by name
|
|
Sinks map[string]sinks.Sink
|
|
|
|
// Routing rules (compiled from config)
|
|
Routes []Route
|
|
|
|
// Fanout knobs (global defaults for now; we can wire these from config later).
|
|
//
|
|
// These are intentionally simple: one queue size, one enqueue timeout,
|
|
// one consume timeout for all sinks.
|
|
SinkQueueSize int
|
|
SinkEnqueueTimeout time.Duration
|
|
SinkConsumeTimeout time.Duration
|
|
}
|
|
|
|
type Route struct {
|
|
SinkName string
|
|
Kinds map[event.Kind]bool
|
|
}
|
|
|
|
func (d *Dispatcher) Run(ctx context.Context, logf Logger) error {
|
|
if d.In == nil {
|
|
return fmt.Errorf("dispatcher.Run: In channel is nil")
|
|
}
|
|
if d.Sinks == nil {
|
|
return fmt.Errorf("dispatcher.Run: Sinks map is nil")
|
|
}
|
|
if d.Pipeline == nil {
|
|
d.Pipeline = &pipeline.Pipeline{}
|
|
}
|
|
|
|
// Build and start sink workers.
|
|
fanout, err := NewFanout(ctx, d.Sinks, FanoutOptions{
|
|
QueueSize: d.SinkQueueSize,
|
|
EnqueueTimeout: d.SinkEnqueueTimeout,
|
|
ConsumeTimeout: d.SinkConsumeTimeout,
|
|
Logf: logf,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer fanout.Close()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
|
|
case e, ok := <-d.In:
|
|
if !ok {
|
|
// If someone closes the event bus, treat as clean shutdown.
|
|
return nil
|
|
}
|
|
|
|
out, err := d.Pipeline.Process(ctx, e)
|
|
if err != nil {
|
|
if logf != nil {
|
|
logf("dispatcher: pipeline error: %v", err)
|
|
}
|
|
continue
|
|
}
|
|
if out == nil {
|
|
// Dropped by policy.
|
|
continue
|
|
}
|
|
|
|
d.routeToSinks(ctx, fanout, *out, logf)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (d *Dispatcher) routeToSinks(ctx context.Context, fanout *Fanout, e event.Event, logf Logger) {
|
|
for _, r := range d.Routes {
|
|
if len(r.Kinds) > 0 && !r.Kinds[e.Kind] {
|
|
continue
|
|
}
|
|
|
|
// Publish is now the ONLY thing we do here.
|
|
// It is bounded (if configured) and does not call sink adapters directly.
|
|
if err := fanout.Publish(ctx, r.SinkName, e); err != nil && logf != nil {
|
|
logf("dispatcher: failed to enqueue event for sink %q (id=%s kind=%s source=%s): %v",
|
|
r.SinkName, e.ID, e.Kind, e.Source, err)
|
|
}
|
|
}
|
|
}
|