From 09bc65e947f486f29b1e8cf12f36e61b2a9b11a9 Mon Sep 17 00:00:00 2001 From: Eric Rakestraw Date: Tue, 13 Jan 2026 14:40:29 -0600 Subject: [PATCH] feedkit: ergonomics pass (shared logger, route compiler, param helpers) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add logging.Logf as the canonical printf-style logger type used across feedkit. - Update scheduler and dispatch to alias their Logger types to logging.Logf. - Eliminates type-mismatch friction when wiring one log function through the system. - Add dispatch.CompileRoutes(*config.Config) ([]dispatch.Route, error) - Compiles config routes into dispatch routes with event.ParseKind normalization. - If routes: is omitted, defaults to “all sinks receive all kinds”. - Expand config param helpers for both SourceConfig and SinkConfig - Add ParamBool/ParamInt/ParamDuration/ParamStringSlice (+ Default variants). - Supports common YAML-decoded types (bool/int/float/string, []any, etc.) - Keeps driver code cleaner and reduces repeated type assertions. - Fix Postgres sink validation error prefix ("postgres sink", not "rabbitmq sink"). --- README.md | 254 ++++++++++++++++++++++++++- config/config.go | 9 - config/params.go | 385 +++++++++++++++++++++++++++++++++++++---- dispatch/dispatch.go | 8 +- dispatch/routes.go | 83 +++++++++ doc.go | 183 ++++++++++++++++++++ logging/logging.go | 8 + scheduler/scheduler.go | 8 +- sinks/postgres.go | 2 +- 9 files changed, 896 insertions(+), 44 deletions(-) create mode 100644 dispatch/routes.go create mode 100644 doc.go create mode 100644 logging/logging.go diff --git a/README.md b/README.md index a162cbf..804cfc7 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,255 @@ # feedkit -Feedkit provides core interfaces and plumbing for applications that ingest and process feeds. \ No newline at end of file +**feedkit** provides the domain-agnostic core plumbing for *feed-processing daemons*. 
+ +A feed daemon is a long-running process that: +- polls one or more upstream providers (HTTP APIs, RSS feeds, etc.) +- normalizes upstream data into a consistent internal representation +- applies lightweight policy (dedupe, rate-limit, filtering) +- emits events to one or more sinks (stdout, files, databases, brokers) + +feedkit is designed to be reused by many concrete daemons (e.g. `weatherfeeder`, +`newsfeeder`, `rssfeeder`) without embedding *any* domain-specific logic. + +--- + +## Philosophy + +feedkit is **not a framework**. + +It does **not**: +- define domain schemas +- enforce allowed event kinds +- hide control flow behind inversion-of-control magic +- own your application lifecycle + +Instead, it provides **small, composable primitives** that concrete daemons wire +together explicitly. The goal is clarity, predictability, and long-term +maintainability. + +--- + +## Conceptual pipeline + +Collect → Normalize → Filter / Policy → Route → Persist / Emit + +In feedkit terms: + +| Stage | Package(s) | +|------------|--------------------------------------| +| Collect | `sources`, `scheduler` | +| Normalize | *(today: domain code; planned: pipeline processor)* | +| Policy | `pipeline` | +| Route | `dispatch` | +| Emit | `sinks` | +| Configure | `config` | + +--- + +## Public API overview + +### `config` — Configuration loading & validation +**Status:** 🟢 Stable + +- Loads YAML configuration +- Strict decoding (`KnownFields(true)`) +- Domain-agnostic validation only (shape, required fields, references) +- Flexible `Params map[string]any` with typed helpers + +Key types: +- `config.Config` +- `config.SourceConfig` +- `config.SinkConfig` +- `config.Load(path)` + +--- + +### `event` — Domain-agnostic event envelope +**Status:** 🟢 Stable + +Defines the canonical event structure that moves through feedkit. 
+ +Includes: +- Stable ID +- Kind (stringly-typed, domain-defined) +- Source name +- Timestamps (`EmittedAt`, optional `EffectiveAt`) +- Optional `Schema` for payload versioning +- Opaque `Payload` + +Key types: +- `event.Event` +- `event.Kind` +- `event.ParseKind` +- `event.Event.Validate` + +feedkit infrastructure never inspects `Payload`. + +--- + +### `sources` — Polling abstraction +**Status:** 🟢 Stable (interface); 🔵 evolving patterns + +Defines the contract implemented by domain-specific polling jobs. + +```go +type Source interface { + Name() string + Kind() event.Kind + Poll(ctx context.Context) ([]event.Event, error) +} +``` +Includes a registry (sources.Registry) so daemons can register drivers +(e.g. openweather_observation, rss_feed) without switch statements. + +Note: Today, most sources both fetch and normalize. A dedicated +normalization hook is planned (see below). + +### `scheduler` — Time-based polling + +**Status:** 🟢 Stable + +Runs one goroutine per source on a configured interval with jitter. + +Features: +- Per-source interval +- Deterministic jitter (avoids thundering herd) +- Immediate poll at startup +- Context-aware shutdown + +Key types: +- `scheduler.Scheduler` +- `scheduler.Job` + +### `pipeline` — Event processing chain +**Status:** 🟡 Partial (API stable, processors evolving) + +Allows events to be transformed, dropped, or rejected between collection +and dispatch. + +```go +type Processor interface { + Process(ctx context.Context, in event.Event) (*event.Event, error) +} +``` + +Current state: +- `pipeline.Pipeline` is fully implemented + +Placeholder files exist for: +- `dedupe` (planned) +- `ratelimit` (planned) + +This is the intended home for: +- normalization +- deduplication +- rate limiting +- lightweight policy enforcement + +### `dispatch` — Routing & fan-out +**Status:** 🟢 Stable + +Routes events to sinks based on kind and isolates slow sinks. 
+ +Features: +- Compiled routing rules +- Per-sink buffered queues +- Bounded enqueue timeouts +- Per-consume timeouts +- Sink panic isolation +- Context-aware shutdown + +Key types: +- `dispatch.Dispatcher` +- `dispatch.Route` +- `dispatch.Fanout` + +### `sinks` — Output adapters +**Status:** 🟡 Mixed + +Defines where events go after processing. + +```go +type Sink interface { + Name() string + Consume(ctx context.Context, e event.Event) error +} +``` + +Registry-based construction allows daemons to opt into any sink drivers. + +Sink Status +stdout 🟢 Implemented +file 🔴 Stub +postgres 🔴 Stub +rabbitmq 🔴 Stub + +All sinks are required to respect context cancellation. + +### Normalization (planned) +**Status:** 🔵 Planned (API design in progress) + +Currently, most domain implementations normalize upstream data inside +`sources.Source.Poll`, which leads to: +- very large source files +- mixed responsibilities (HTTP + mapping) +- duplicated helper code + +The intended evolution is: +- Sources emit raw events (e.g. `json.RawMessage`) +- A dedicated normalization processor runs in the pipeline +- Normalizers are selected by `Event.Schema`, `Kind`, or `Source` + +This keeps: +- `feedkit` domain-agnostic +- `sources` small and focused +- normalization logic centralized and testable + +### Runner helper (planned) +**Status:** 🔵 Planned (optional convenience) + +Most daemons wire together the same steps: +- load config +- build sources +- build sinks +- compile routes +- start scheduler +- start dispatcher + +A small, opt-in `Runner` helper may be added to reduce boilerplate while keeping the system explicit and debuggable. + +This is not intended to become a framework.
+ +## Stability summary +Area Status +Event model 🟢 Stable +Config API 🟢 Stable +Scheduler 🟢 Stable +Dispatcher 🟢 Stable +Source interface 🟢 Stable +Pipeline core 🟡 Partial +Normalization 🔵 Planned +Dedupe/Ratelimit 🔵 Planned +Non-stdout sinks 🔴 Stub + +Legend: +🟢 Stable — API considered solid +🟡 Partial — usable, but incomplete +🔵 Planned — design direction agreed, not yet implemented +🔴 Stub — placeholder only + +## Non-goals +`feedkit` intentionally does not: +- define domain payload schemas +- enforce domain-specific validation +- manage persistence semantics beyond sink adapters +- own observability, metrics, or tracing (left to daemons) + +Those concerns belong in concrete implementations. + +## See also +- NAMING.md — repository and daemon naming conventions +- event/doc.go — detailed event semantics +- **Concrete example:** weatherfeeder (reference implementation) + +--- \ No newline at end of file diff --git a/config/config.go b/config/config.go index f41b979..ef97c73 100644 --- a/config/config.go +++ b/config/config.go @@ -128,12 +128,3 @@ func (d *Duration) UnmarshalYAML(value *yaml.Node) error { // Anything else: reject. return fmt.Errorf("duration must be a string like 15m or an integer minutes, got tag %s", value.Tag) } - -func isAllDigits(s string) bool { - for _, r := range s { - if r < '0' || r > '9' { - return false - } - } - return len(s) > 0 -} diff --git a/config/params.go b/config/params.go index 92c7ad0..bd71c9d 100644 --- a/config/params.go +++ b/config/params.go @@ -1,32 +1,21 @@ // feedkit/config/params.go package config -import "strings" +import ( + "math" + "strconv" + "strings" + "time" +) + +// ---- SourceConfig param helpers ---- // ParamString returns the first non-empty string found for any of the provided keys. // Values must actually be strings in the decoded config; other types are ignored. // // This keeps cfg.Params flexible (map[string]any) while letting callers stay type-safe. 
func (cfg SourceConfig) ParamString(keys ...string) (string, bool) { - if cfg.Params == nil { - return "", false - } - for _, k := range keys { - v, ok := cfg.Params[k] - if !ok || v == nil { - continue - } - s, ok := v.(string) - if !ok { - continue - } - s = strings.TrimSpace(s) - if s == "" { - continue - } - return s, true - } - return "", false + return paramString(cfg.Params, keys...) } // ParamStringDefault returns ParamString(keys...) if present; otherwise it returns def. @@ -38,14 +27,150 @@ func (cfg SourceConfig) ParamStringDefault(def string, keys ...string) string { return strings.TrimSpace(def) } +// ParamBool returns the first boolean found for any of the provided keys. +// +// Accepted types in Params: +// - bool +// - string: parsed via strconv.ParseBool ("true"/"false"/"1"/"0", etc.) +func (cfg SourceConfig) ParamBool(keys ...string) (bool, bool) { + return paramBool(cfg.Params, keys...) +} + +func (cfg SourceConfig) ParamBoolDefault(def bool, keys ...string) bool { + if v, ok := cfg.ParamBool(keys...); ok { + return v + } + return def +} + +// ParamInt returns the first integer-like value found for any of the provided keys. +// +// Accepted types in Params: +// - any integer type (int, int64, uint32, ...) +// - float32/float64 ONLY if it is an exact integer (e.g. 15.0) +// - string: parsed via strconv.Atoi (e.g. "42") +func (cfg SourceConfig) ParamInt(keys ...string) (int, bool) { + return paramInt(cfg.Params, keys...) +} + +func (cfg SourceConfig) ParamIntDefault(def int, keys ...string) int { + if v, ok := cfg.ParamInt(keys...); ok { + return v + } + return def +} + +// ParamDuration returns the first duration-like value found for any of the provided keys. +// +// Accepted types in Params: +// - time.Duration +// - string: parsed via time.ParseDuration (e.g. "250ms", "30s", "5m") +// - if the string is all digits (e.g. "30"), it is interpreted as SECONDS +// - numeric: interpreted as SECONDS (e.g. 
30 => 30s) +// +// Rationale: Param durations are usually timeouts/backoffs; seconds are a sane numeric default. +// If you want minutes/hours, prefer a duration string like "5m" or "1h". +func (cfg SourceConfig) ParamDuration(keys ...string) (time.Duration, bool) { + return paramDuration(cfg.Params, keys...) +} + +func (cfg SourceConfig) ParamDurationDefault(def time.Duration, keys ...string) time.Duration { + if v, ok := cfg.ParamDuration(keys...); ok { + return v + } + return def +} + +// ParamStringSlice returns the first string-slice-like value found for any of the provided keys. +// +// Accepted types in Params: +// - []string +// - []any where each element is a string +// - string: +// - if it contains commas, split on commas (",") and trim each item +// - otherwise treat as a single-item list +// +// Empty/blank items are removed. +func (cfg SourceConfig) ParamStringSlice(keys ...string) ([]string, bool) { + return paramStringSlice(cfg.Params, keys...) +} + +// ---- SinkConfig param helpers ---- + // ParamString returns the first non-empty string found for any of the provided keys // in SinkConfig.Params. (Same rationale as SourceConfig.ParamString.) func (cfg SinkConfig) ParamString(keys ...string) (string, bool) { - if cfg.Params == nil { - return "", false + return paramString(cfg.Params, keys...) +} + +// ParamStringDefault returns ParamString(keys...) if present; otherwise it returns def. +// Symmetric helper for sink implementations. +func (cfg SinkConfig) ParamStringDefault(def string, keys ...string) string { + if s, ok := cfg.ParamString(keys...); ok { + return s + } + return strings.TrimSpace(def) +} + +func (cfg SinkConfig) ParamBool(keys ...string) (bool, bool) { + return paramBool(cfg.Params, keys...) 
+} + +func (cfg SinkConfig) ParamBoolDefault(def bool, keys ...string) bool { + if v, ok := cfg.ParamBool(keys...); ok { + return v + } + return def +} + +func (cfg SinkConfig) ParamInt(keys ...string) (int, bool) { + return paramInt(cfg.Params, keys...) +} + +func (cfg SinkConfig) ParamIntDefault(def int, keys ...string) int { + if v, ok := cfg.ParamInt(keys...); ok { + return v + } + return def +} + +func (cfg SinkConfig) ParamDuration(keys ...string) (time.Duration, bool) { + return paramDuration(cfg.Params, keys...) +} + +func (cfg SinkConfig) ParamDurationDefault(def time.Duration, keys ...string) time.Duration { + if v, ok := cfg.ParamDuration(keys...); ok { + return v + } + return def +} + +func (cfg SinkConfig) ParamStringSlice(keys ...string) ([]string, bool) { + return paramStringSlice(cfg.Params, keys...) +} + +// ---- shared implementations (package-private) ---- + +func paramAny(params map[string]any, keys ...string) (any, bool) { + if params == nil { + return nil, false } for _, k := range keys { - v, ok := cfg.Params[k] + v, ok := params[k] + if !ok || v == nil { + continue + } + return v, true + } + return nil, false +} + +func paramString(params map[string]any, keys ...string) (string, bool) { + for _, k := range keys { + if params == nil { + return "", false + } + v, ok := params[k] if !ok || v == nil { continue } @@ -62,11 +187,213 @@ func (cfg SinkConfig) ParamString(keys ...string) (string, bool) { return "", false } -// ParamStringDefault returns ParamString(keys...) if present; otherwise it returns def. -// Symmetric helper for sink implementations. -func (cfg SinkConfig) ParamStringDefault(def string, keys ...string) string { - if s, ok := cfg.ParamString(keys...); ok { - return s +func paramBool(params map[string]any, keys ...string) (bool, bool) { + v, ok := paramAny(params, keys...) 
+ if !ok { + return false, false + } + + switch t := v.(type) { + case bool: + return t, true + case string: + s := strings.TrimSpace(t) + if s == "" { + return false, false + } + parsed, err := strconv.ParseBool(s) + if err != nil { + return false, false + } + return parsed, true + default: + return false, false } - return strings.TrimSpace(def) +} + +func paramInt(params map[string]any, keys ...string) (int, bool) { + v, ok := paramAny(params, keys...) + if !ok { + return 0, false + } + + switch t := v.(type) { + case int: + return t, true + case int8: + return int(t), true + case int16: + return int(t), true + case int32: + return int(t), true + case int64: + return int(t), true + + case uint: + return int(t), true + case uint8: + return int(t), true + case uint16: + return int(t), true + case uint32: + return int(t), true + case uint64: + return int(t), true + + case float32: + f := float64(t) + if math.IsNaN(f) || math.IsInf(f, 0) { + return 0, false + } + if math.Trunc(f) != f { + return 0, false + } + return int(f), true + + case float64: + if math.IsNaN(t) || math.IsInf(t, 0) { + return 0, false + } + if math.Trunc(t) != t { + return 0, false + } + return int(t), true + + case string: + s := strings.TrimSpace(t) + if s == "" { + return 0, false + } + n, err := strconv.Atoi(s) + if err != nil { + return 0, false + } + return n, true + + default: + return 0, false + } +} + +func paramDuration(params map[string]any, keys ...string) (time.Duration, bool) { + v, ok := paramAny(params, keys...) + if !ok { + return 0, false + } + + switch t := v.(type) { + case time.Duration: + if t <= 0 { + return 0, false + } + return t, true + + case string: + s := strings.TrimSpace(t) + if s == "" { + return 0, false + } + // Numeric strings are interpreted as seconds (see doc comment). 
+ if isAllDigits(s) { + n, err := strconv.Atoi(s) + if err != nil || n <= 0 { + return 0, false + } + return time.Duration(n) * time.Second, true + } + d, err := time.ParseDuration(s) + if err != nil || d <= 0 { + return 0, false + } + return d, true + + case int: + if t <= 0 { + return 0, false + } + return time.Duration(t) * time.Second, true + case int64: + if t <= 0 { + return 0, false + } + return time.Duration(t) * time.Second, true + case float64: + if math.IsNaN(t) || math.IsInf(t, 0) || t <= 0 { + return 0, false + } + // Allow fractional seconds. + secs := t * float64(time.Second) + return time.Duration(secs), true + case float32: + f := float64(t) + if math.IsNaN(f) || math.IsInf(f, 0) || f <= 0 { + return 0, false + } + secs := f * float64(time.Second) + return time.Duration(secs), true + + default: + return 0, false + } +} + +func paramStringSlice(params map[string]any, keys ...string) ([]string, bool) { + v, ok := paramAny(params, keys...) + if !ok { + return nil, false + } + + clean := func(items []string) ([]string, bool) { + out := make([]string, 0, len(items)) + for _, it := range items { + it = strings.TrimSpace(it) + if it == "" { + continue + } + out = append(out, it) + } + if len(out) == 0 { + return nil, false + } + return out, true + } + + switch t := v.(type) { + case []string: + return clean(t) + + case []any: + tmp := make([]string, 0, len(t)) + for _, it := range t { + s, ok := it.(string) + if !ok { + continue + } + tmp = append(tmp, s) + } + return clean(tmp) + + case string: + s := strings.TrimSpace(t) + if s == "" { + return nil, false + } + if strings.Contains(s, ",") { + parts := strings.Split(s, ",") + return clean(parts) + } + return clean([]string{s}) + + default: + return nil, false + } +} + +func isAllDigits(s string) bool { + for _, r := range s { + if r < '0' || r > '9' { + return false + } + } + return len(s) > 0 } diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go index 7c7c8b5..71c91b2 100644 --- 
a/dispatch/dispatch.go +++ b/dispatch/dispatch.go @@ -6,10 +6,16 @@ import ( "time" "gitea.maximumdirect.net/ejr/feedkit/event" + "gitea.maximumdirect.net/ejr/feedkit/logging" "gitea.maximumdirect.net/ejr/feedkit/pipeline" "gitea.maximumdirect.net/ejr/feedkit/sinks" ) +// Logger is a printf-style logger used throughout dispatch. +// It is an alias to the shared feedkit logging type so callers can pass +// one function everywhere without type mismatch friction. +type Logger = logging.Logf + type Dispatcher struct { In <-chan event.Event @@ -35,8 +41,6 @@ type Route struct { Kinds map[event.Kind]bool } -type Logger func(format string, args ...any) - func (d *Dispatcher) Run(ctx context.Context, logf Logger) error { if d.In == nil { return fmt.Errorf("dispatcher.Run: In channel is nil") diff --git a/dispatch/routes.go b/dispatch/routes.go new file mode 100644 index 0000000..98accc7 --- /dev/null +++ b/dispatch/routes.go @@ -0,0 +1,83 @@ +package dispatch + +import ( + "fmt" + "strings" + + "gitea.maximumdirect.net/ejr/feedkit/config" + "gitea.maximumdirect.net/ejr/feedkit/event" +) + +// CompileRoutes converts config.Config routes into dispatch.Route rules. +// +// Behavior: +// - If cfg.Routes is empty, we default to "all sinks receive all kinds". +// (Implemented as one Route per sink with Kinds == nil.) +// - Kind strings are normalized via event.ParseKind (lowercase + trim). +// +// Note: config.Validate() already ensures route.sink references a known sink and +// route.kinds are non-empty strings. We re-check a few invariants here anyway so +// CompileRoutes is safe to call even if a daemon chooses not to call Validate(). +func CompileRoutes(cfg *config.Config) ([]Route, error) { + if cfg == nil { + return nil, fmt.Errorf("dispatch.CompileRoutes: cfg is nil") + } + + if len(cfg.Sinks) == 0 { + return nil, fmt.Errorf("dispatch.CompileRoutes: cfg has no sinks") + } + + // Build a quick lookup of sink names. 
+ sinkNames := make(map[string]bool, len(cfg.Sinks)) + for i, s := range cfg.Sinks { + name := strings.TrimSpace(s.Name) + if name == "" { + return nil, fmt.Errorf("dispatch.CompileRoutes: sinks[%d].name is empty", i) + } + sinkNames[name] = true + } + + // Default routing: everything to every sink. + if len(cfg.Routes) == 0 { + out := make([]Route, 0, len(cfg.Sinks)) + for _, s := range cfg.Sinks { + out = append(out, Route{ + SinkName: s.Name, + Kinds: nil, // nil/empty map means "all kinds" + }) + } + return out, nil + } + + out := make([]Route, 0, len(cfg.Routes)) + + for i, r := range cfg.Routes { + sink := strings.TrimSpace(r.Sink) + if sink == "" { + return nil, fmt.Errorf("dispatch.CompileRoutes: routes[%d].sink is required", i) + } + if !sinkNames[sink] { + return nil, fmt.Errorf("dispatch.CompileRoutes: routes[%d].sink references unknown sink %q", i, sink) + } + + if len(r.Kinds) == 0 { + return nil, fmt.Errorf("dispatch.CompileRoutes: routes[%d].kinds must contain at least one kind", i) + } + + kinds := make(map[event.Kind]bool, len(r.Kinds)) + for j, raw := range r.Kinds { + k, err := event.ParseKind(raw) + if err != nil { + return nil, fmt.Errorf("dispatch.CompileRoutes: routes[%d].kinds[%d]: %w", i, j, err) + } + kinds[k] = true + } + + out = append(out, Route{ + SinkName: sink, + Kinds: kinds, + }) + } + + return out, nil +} diff --git a/doc.go b/doc.go new file mode 100644 index 0000000..392838b --- /dev/null +++ b/doc.go @@ -0,0 +1,183 @@ +// Package feedkit provides domain-agnostic plumbing for "feed processing daemons". +// +// A feed daemon polls one or more upstream providers (HTTP APIs, RSS, etc.), +// converts upstream items into a normalized internal representation, applies +// lightweight policy (dedupe/rate-limit/filters), and emits events to one or +// more sinks (stdout, files, Postgres, brokers, ...). +// +// feedkit is intentionally NOT a framework. 
It supplies small, composable +// primitives that concrete daemons wire together in main.go (or via a small +// optional Runner helper, see "Future additions"). +// +// Conceptual pipeline +// +// Collect → Normalize → Filter/Policy → Route → Persist/Emit +// +// In feedkit today, that maps to: +// +// Collect: sources.Source + scheduler.Scheduler +// Normalize: (today: domain code typically does this inside Source.Poll; +// future: a normalization Processor is a good fit) +// Policy: pipeline.Pipeline (Processor chain; dedupe/ratelimit are planned) +// Emit: dispatch.Dispatcher + dispatch.Fanout +// Sinks: sinks.Sink (+ sinks.Registry to build from config) +// Config: config.Load + config.Config validation +// +// Public packages (API surface) +// +// - config +// YAML configuration types and loader/validator. +// +// - config.Load(path) (*config.Config, error) +// +// - config.Config: Sources, Sinks, Routes +// +// - config.SourceConfig / SinkConfig include Params map[string]any +// with convenience helpers like: +// +// - ParamString / ParamStringDefault +// +// - ParamBool / ParamBoolDefault +// +// - ParamInt / ParamIntDefault +// +// - ParamDuration / ParamDurationDefault +// +// - ParamStringSlice +// +// - event +// Domain-agnostic event envelope moved through the system. +// +// - event.Event includes ID, Kind, Source, timestamps, Schema, Payload +// +// - event.Kind is stringly typed; event.ParseKind normalizes/validates. +// +// - sources +// Extension point for domain-specific polling jobs. +// +// - sources.Source interface: Name(), Kind(), Poll(ctx) +// +// - sources.Registry lets daemons register driver factories and build +// sources from config.SourceConfig. +// +// - scheduler +// Runs sources on a cadence and publishes emitted events onto a channel. +// +// - scheduler.Scheduler{Jobs, Out, Logf}.Run(ctx) +// +// - scheduler.Job: {Source, Every, Jitter} +// +// - pipeline +// Optional processing chain between scheduler and dispatch.
+// +// - pipeline.Pipeline{Processors}.Process(ctx, event) +// +// - pipeline.Processor can mutate, drop (return nil), or error. +// +// - dedupe/ratelimit processors are placeholders (planned). +// +// - sinks +// Extension point for output adapters. +// +// - sinks.Sink interface: Name(), Consume(ctx, event) +// +// - sinks.Registry to register driver factories and build sinks from config +// +// - sinks.RegisterBuiltins registers feedkit-provided sink drivers +// (stdout/file/postgres/rabbitmq; some are currently stubs). +// +// - dispatch +// Routes processed events to sinks, and isolates slow sinks via per-sink queues. +// +// - dispatch.Dispatcher{In, Pipeline, Sinks, Routes, ...}.Run(ctx, logf) +// +// - dispatch.Fanout: one buffered queue + worker goroutine per sink +// +// - dispatch.CompileRoutes(*config.Config) compiles cfg.Routes into []dispatch.Route. +// If routes: is omitted, it defaults to "all sinks receive all kinds". +// +// - logging +// Shared logger type used across feedkit packages. +// +// - logging.Logf is a printf-style logger signature. +// +// Typical wiring (what a daemon does in main.go) +// +// 1. Load config (domain code may add domain-specific validation). +// 2. Register and build sources from config.Sources using sources.Registry. +// 3. Register and build sinks from config.Sinks using sinks.Registry. +// 4. Compile routes (typically via dispatch.CompileRoutes). +// 5. Create an event bus channel. +// 6. Start scheduler (sources → bus). +// 7. Start dispatcher (bus → pipeline → routes → sinks). +// +// A sketch: +// +// cfg, _ := config.Load("config.yml") +// +// // Build sources (domain registers its drivers). +// srcReg := sources.NewRegistry() +// // domain: srcReg.Register("openweather_observation", newOpenWeatherSource) +// // ... 
+// +// var jobs []scheduler.Job +// for _, sc := range cfg.Sources { +// src, _ := srcReg.Build(sc) +// jobs = append(jobs, scheduler.Job{Source: src, Every: sc.Every.Duration}) +// } +// +// // Build sinks (feedkit can register builtins). +// sinkReg := sinks.NewRegistry() +// sinks.RegisterBuiltins(sinkReg) +// builtSinks := map[string]sinks.Sink{} +// for _, sk := range cfg.Sinks { +// s, _ := sinkReg.Build(sk) +// builtSinks[sk.Name] = s +// } +// +// // Compile routes. +// routes, _ := dispatch.CompileRoutes(cfg) +// +// // Event bus. +// bus := make(chan event.Event, 256) +// +// // Scheduler. +// s := &scheduler.Scheduler{Jobs: jobs, Out: bus, Logf: logf} +// +// // Dispatcher. +// d := &dispatch.Dispatcher{ +// In: bus, +// Pipeline: &pipeline.Pipeline{Processors: nil}, +// Sinks: builtSinks, +// Routes: routes, +// } +// +// go s.Run(ctx) +// return d.Run(ctx, logf) +// +// Conventions (recommended, not required) +// +// - Event.ID should be stable for dedupe/storage (often ":"). +// - Event.Kind should be lowercase ("observation", "alert", "article", ...). +// - Event.Schema should identify the payload shape/version +// (e.g. "weather.observation.v1"). +// +// # Context and cancellation +// +// All blocking or I/O work should honor ctx.Done(): +// - sources.Source.Poll should pass ctx to HTTP calls, etc. +// - sinks.Sink.Consume should honor ctx (Fanout timeouts only help if sinks cooperate). +// +// Future additions (likely) +// +// - A small Runner helper that performs the standard wiring (load config, +// build sources/sinks/routes, run scheduler+dispatcher, handle shutdown). +// - A normalization hook (a Pipeline Processor + registry) that allows sources +// to emit "raw" payloads and defer normalization to a dedicated stage. +// +// # Non-goals +// +// feedkit does not define domain payload schemas, does not enforce domain kinds, +// and does not embed domain-specific validation rules. 
Those live in each +// concrete daemon/module (weatherfeeder, newsfeeder, ...). +package feedkit diff --git a/logging/logging.go b/logging/logging.go new file mode 100644 index 0000000..cd547ad --- /dev/null +++ b/logging/logging.go @@ -0,0 +1,8 @@ +package logging + +// Logf is the shared printf-style logger signature used across feedkit. +// +// Keeping this in one place avoids the "scheduler.Logger vs dispatch.Logger" +// friction and makes it trivial for downstream apps to pass a single log +// function throughout the system. +type Logf func(format string, args ...any) diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 2557bd6..c6d1d72 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -8,9 +8,15 @@ import ( "time" "gitea.maximumdirect.net/ejr/feedkit/event" + "gitea.maximumdirect.net/ejr/feedkit/logging" "gitea.maximumdirect.net/ejr/feedkit/sources" ) +// Logger is a printf-style logger used throughout scheduler. +// It is an alias to the shared feedkit logging type so callers can pass +// one function everywhere without type mismatch friction. +type Logger = logging.Logf + type Job struct { Source sources.Source Every time.Duration @@ -23,8 +29,6 @@ type Job struct { Jitter time.Duration } -type Logger func(format string, args ...any) - type Scheduler struct { Jobs []Job Out chan<- event.Event diff --git a/sinks/postgres.go b/sinks/postgres.go index 9567a3a..9b9151b 100644 --- a/sinks/postgres.go +++ b/sinks/postgres.go @@ -29,7 +29,7 @@ func (p *PostgresSink) Consume(ctx context.Context, e event.Event) error { // Boundary validation: if something upstream violated invariants, // surface it loudly rather than printing partial nonsense. if err := e.Validate(); err != nil { - return fmt.Errorf("rabbitmq sink: invalid event: %w", err) + return fmt.Errorf("postgres sink: invalid event: %w", err) } // TODO implement Postgres transaction