feedkit: ergonomics pass (shared logger, route compiler, param helpers)

- Add logging.Logf as the canonical printf-style logger type used across feedkit.
  - Update scheduler and dispatch to alias their Logger types to logging.Logf.
  - Eliminates type-mismatch friction when wiring one log function through the system.

- Add dispatch.CompileRoutes(*config.Config) ([]dispatch.Route, error)
  - Compiles config routes into dispatch routes with event.ParseKind normalization.
  - If routes: is omitted, defaults to “all sinks receive all kinds”.

- Expand config param helpers for both SourceConfig and SinkConfig
  - Add ParamBool/ParamInt/ParamDuration/ParamStringSlice (+ Default variants).
  - Supports common YAML-decoded types (bool/int/float/string, []any, etc.)
  - Keeps driver code cleaner and reduces repeated type assertions.

- Fix Postgres sink validation error prefix ("postgres sink", not "rabbitmq sink").
This commit is contained in:
2026-01-13 14:40:29 -06:00
parent 0cc2862170
commit 09bc65e947
9 changed files with 896 additions and 44 deletions

View File

@@ -6,10 +6,16 @@ import (
"time"
"gitea.maximumdirect.net/ejr/feedkit/event"
"gitea.maximumdirect.net/ejr/feedkit/logging"
"gitea.maximumdirect.net/ejr/feedkit/pipeline"
"gitea.maximumdirect.net/ejr/feedkit/sinks"
)
// Logger is a printf-style logger used throughout dispatch.
// It is an alias to the shared feedkit logging type so callers can pass
// one function everywhere without type mismatch friction.
type Logger = logging.Logf
type Dispatcher struct {
In <-chan event.Event
@@ -35,8 +41,6 @@ type Route struct {
Kinds map[event.Kind]bool
}
type Logger func(format string, args ...any)
func (d *Dispatcher) Run(ctx context.Context, logf Logger) error {
if d.In == nil {
return fmt.Errorf("dispatcher.Run: In channel is nil")

83
dispatch/routes.go Normal file
View File

@@ -0,0 +1,83 @@
package dispatch
import (
"fmt"
"strings"
"gitea.maximumdirect.net/ejr/feedkit/config"
"gitea.maximumdirect.net/ejr/feedkit/event"
)
// CompileRoutes converts config.Config routes into dispatch.Route rules.
//
// Behavior:
// - If cfg.Routes is empty, we default to "all sinks receive all kinds".
// (Implemented as one Route per sink with Kinds == nil.)
// - Kind strings are normalized via event.ParseKind (lowercase + trim).
//
// Note: config.Validate() already ensures route.sink references a known sink and
// route.kinds are non-empty strings. We re-check a few invariants here anyway so
// CompileRoutes is safe to call even if a daemon chooses not to call Validate().
func CompileRoutes(cfg *config.Config) ([]Route, error) {
if cfg == nil {
return nil, fmt.Errorf("dispatch.CompileRoutes: cfg is nil")
}
if len(cfg.Sinks) == 0 {
return nil, fmt.Errorf("dispatch.CompileRoutes: cfg has no sinks")
}
// Build a quick lookup of sink names.
sinkNames := make(map[string]bool, len(cfg.Sinks))
for i, s := range cfg.Sinks {
name := strings.TrimSpace(s.Name)
if name == "" {
return nil, fmt.Errorf("dispatch.CompileRoutes: sinks[%d].name is empty", i)
}
sinkNames[name] = true
}
// Default routing: everything to every sink.
if len(cfg.Routes) == 0 {
out := make([]Route, 0, len(cfg.Sinks))
for _, s := range cfg.Sinks {
out = append(out, Route{
SinkName: s.Name,
Kinds: nil, // nil/empty map means "all kinds"
})
}
return out, nil
}
out := make([]Route, 0, len(cfg.Routes))
for i, r := range cfg.Routes {
sink := strings.TrimSpace(r.Sink)
if sink == "" {
return nil, fmt.Errorf("dispatch.CompileRoutes: routes[%d].sink is required", i)
}
if !sinkNames[sink] {
return nil, fmt.Errorf("dispatch.CompileRoutes: routes[%d].sink references unknown sink %q", i, sink)
}
if len(r.Kinds) == 0 {
return nil, fmt.Errorf("dispatch.CompileRoutes: routes[%d].kinds must contain at least one kind", i)
}
kinds := make(map[event.Kind]bool, len(r.Kinds))
for j, raw := range r.Kinds {
k, err := event.ParseKind(raw)
if err != nil {
return nil, fmt.Errorf("dispatch.CompileRoutes: routes[%d].kinds[%d]: %w", i, j, err)
}
kinds[k] = true
}
out = append(out, Route{
SinkName: sink,
Kinds: kinds,
})
}
return out, nil
}