diff --git a/README.md b/README.md
index dda20c3..fcdc9b6 100644
--- a/README.md
+++ b/README.md
@@ -1,255 +1,114 @@
 # feedkit
-**feedkit** provides the domain-agnostic core plumbing for *feed-processing daemons*.
+`feedkit` provides domain-agnostic plumbing for feed-processing daemons.
-A feed daemon is a long-running process that:
-- polls one or more upstream providers (HTTP APIs, RSS feeds, etc.)
-- normalizes upstream data into a consistent internal representation
-- applies lightweight policy (dedupe, rate-limit, filtering)
-- emits events to one or more sinks (stdout, files, databases, brokers)
-
-feedkit is designed to be reused by many concrete daemons (e.g. `weatherfeeder`,
-`newsfeeder`, `rssfeeder`) without embedding *any* domain-specific logic.
-
----
+A daemon built on feedkit typically:
+- ingests upstream input (polling APIs or consuming streams)
+- emits domain-agnostic `event.Event` values
+- applies optional processing (normalization, dedupe, policy)
+- routes events to sinks (stdout, NATS, files, databases, etc.)
 ## Philosophy
-feedkit is **not a framework**.
-
-It does **not**:
-- define domain schemas
-- enforce allowed event kinds
-- hide control flow behind inversion-of-control magic
-- own your application lifecycle
-
-Instead, it provides **small, composable primitives** that concrete daemons wire
-together explicitly. The goal is clarity, predictability, and long-term
-maintainability.
-
----
+feedkit is not a framework. It provides small, composable packages and leaves
+lifecycle, domain schemas, and domain-specific validation in your daemon.
 ## Conceptual pipeline
-Collect → Normalize → Filter / Policy → Route → Persist / Emit
+Collect -> Normalize (optional) -> Policy -> Route -> Emit
-In feedkit terms:
+| Stage | Package(s) |
+|---|---|
+| Collect | `sources`, `scheduler` |
+| Normalize | `normalize` (optional in `pipeline`) |
+| Policy | `pipeline` |
+| Route | `dispatch` |
+| Emit | `sinks` |
+| Configure | `config` |
-| Stage | Package(s) |
-|------------|--------------------------------------|
-| Collect | `sources`, `scheduler` |
-| Normalize | *(today: domain code; planned: pipeline processor)* |
-| Policy | `pipeline` |
-| Route | `dispatch` |
-| Emit | `sinks` |
-| Configure | `config` |
+## Core packages
----
+### `config`
-## Public API overview
+Loads YAML config with strict decoding and domain-agnostic validation.
-### `config` — Configuration loading & validation
-**Status:** 🟢 Stable
+`SourceConfig` supports both source modes:
+- `mode: poll` requires `every`
+- `mode: stream` forbids `every`
+- omitted `mode` means auto (inferred from the registered driver type)
-- Loads YAML configuration
-- Strict decoding (`KnownFields(true)`)
-- Domain-agnostic validation only (shape, required fields, references)
-- Flexible `Params map[string]any` with typed helpers
+It also supports optional expected source kinds:
+- `kinds: ["observation", "alert"]` (preferred)
+- `kind: "observation"` (legacy fallback)
-Key types:
-- `config.Config`
-- `config.SourceConfig`
-- `config.SinkConfig`
-- `config.Load(path)`
+### `event`
----
+Defines the domain-agnostic event envelope (`event.Event`) used across the system.
-### `event` — Domain-agnostic event envelope
-**Status:** 🟢 Stable
+### `sources`
-Defines the canonical event structure that moves through feedkit.
-
-Includes:
-- Stable ID
-- Kind (stringly-typed, domain-defined)
-- Source name
-- Timestamps (`EmittedAt`, optional `EffectiveAt`)
-- Optional `Schema` for payload versioning
-- Opaque `Payload`
-
-Key types:
-- `event.Event`
-- `event.Kind`
-- `event.ParseKind`
-- `event.Event.Validate`
-
-feedkit infrastructure never inspects `Payload`.
-
----
-
-### `sources` — Polling abstraction
-**Status:** 🟢 Stable (interface); 🔵 evolving patterns
-
-Defines the contract implemented by domain-specific polling jobs.
+Defines source interfaces and driver registry:
 ```go
-type Source interface {
+type Input interface {
 	Name() string
-	Kind() event.Kind
+}
+
+type PollSource interface {
+	Input
 	Poll(ctx context.Context) ([]event.Event, error)
 }
-```
-Includes a registry (sources.Registry) so daemons can register drivers
-(e.g. openweather_observation, rss_feed) without switch statements.
-Note: Today, most sources both fetch and normalize. A dedicated
-normalization hook is planned (see below).
-
-### `scheduler` — Time-based polling
-
-**Status:** 🟢 Stable
-
-Runs one goroutine per source on a configured interval with jitter.
-
-Features:
-- Per-source interval
-- Deterministic jitter (avoids thundering herd)
-- Immediate poll at startup
-- Context-aware shutdown
-
-Key types:
-- `scheduler.Scheduler`
-- `scheduler.Job`
-
-### `pipeline` — Event processing chain
-**Status:** 🟡 Partial (API stable, processors evolving)
-
-Allows events to be transformed, dropped, or rejected between collection
-and dispatch.
-
-```go
-type Processor interface {
-	Process(ctx context.Context, in event.Event) (*event.Event, error)
+type StreamSource interface {
+	Input
+	Run(ctx context.Context, out chan<- event.Event) error
 }
 ```
-Current state:
-- `pipeline.Pipeline` is fully implemented
+Notes:
+- a poll can emit `0..N` events
+- stream sources emit events continuously
+- a single source may emit multiple event kinds
+- driver implementations live in downstream daemons and are registered via `sources.Registry`
-Placeholder files exist for:
-- `dedupe` (planned)
-- `ratelimit` (planned)
+### `scheduler`
-This is the intended home for:
-- normalization
-- deduplication
-- rate limiting
-- lightweight policy enforcement
+Runs one goroutine per source job:
+- poll sources: cadence driven (`every` + jitter)
+- stream sources: continuous run loop
-### `dispatch` — Routing & fan-out
-**Status:** 🟢 Stable
+### `pipeline`
-Routes events to sinks based on kind and isolates slow sinks.
+Optional processing chain between collection and dispatch.
+Processors can transform, drop, or reject events.
-Features:
-- Compiled routing rules
-- Per-sink buffered queues
-- Bounded enqueue timeouts
-- Per-consume timeouts
-- Sink panic isolation
-- Context-aware shutdown
+### `normalize`
-Key types:
-- `dispatch.Dispatcher`
-- `dispatch.Route`
-- `dispatch.Fanout`
+Optional normalization package. Typical use: sources emit raw
+payload events, which a pipeline stage then normalizes to canonical schemas.
-### `sinks` — Output adapters
-***Status:*** 🟡 Mixed
+### `dispatch`
-Defines where events go after processing.
+Compiles routes and fans out events to sinks with per-sink queue/worker isolation.
-```go
-type Sink interface {
-	Name() string
-	Consume(ctx context.Context, e event.Event) error
-}
-```
+### `sinks`
-Registry-based construction allows daemons to opt into any sink drivers.
+Defines the sink interface and sink registry.
Built-ins include `stdout` and `nats`, with +additional sink implementations at varying maturity. -Sink Status -stdout 🟢 Implemented -nats 🟢 Implemented -file 🔴 Stub -postgres 🔴 Stub +## Typical wiring -All sinks are required to respect context cancellation. - -### Normalization (planned) -**Status:** 🔵 Planned (API design in progress) - -Currently, most domain implementations normalize upstream data inside -`sources.Source.Poll`, which leads to: -- very large source files -- mixed responsibilities (HTTP + mapping) -- duplicated helper code - -The intended evolution is: -- Sources emit raw events (e.g. `json.RawMessage`) -- A dedicated normalization processor runs in the pipeline -- Normalizers are selected by `Event.Schema`, `Kind`, or `Source` - -This keeps: -- `feedkit` domain-agnostic -- `sources` small and focused -- normalization logic centralized and testable - -### Runner helper (planned) -**Status:** 🔵 Planned (optional convenience) - -Most daemons wire together the same steps: -- load config -- build sources -- build sinks -- compile routes -- start scheduler -- start dispatcher - -A small, opt-in `Runner` helper may be added to reduce boilerplate while keeping the system explicit and debuggable. - -This is not intended to become a framework. - -## Stability summary -Area Status -Event model 🟢 Stable -Config API 🟢 Stable -Scheduler 🟢 Stable -Dispatcher 🟢 Stable -Source interface 🟢 Stable -Pipeline core 🟡 Partial -Normalization 🔵 Planned -Dedupe/Ratelimit 🔵 Planned -Non-stdout sinks 🔴 Stub - -Legend: -🟢 Stable — API considered solid -🟡 Partial — usable, but incomplete -🔵 Planned — design direction agreed, not yet implemented -🔴 Stub — placeholder only +1. Load config. +2. Register/build sources from `cfg.Sources`. +3. Register/build sinks from `cfg.Sinks`. +4. Compile routes. +5. Start scheduler (`sources -> bus`). +6. Start dispatcher (`bus -> pipeline -> sinks`). 
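+A hypothetical config sketch for the wiring above. The driver names, params,
+and kind values here are illustrative (source drivers are registered by the
+daemon, not shipped with feedkit); `stdout` is a built-in sink:
+
+```yaml
+sources:
+  - name: local_weather
+    driver: openmeteo_observation # example poll driver
+    mode: poll                    # poll mode requires "every"
+    every: 15m
+    kinds: ["observation"]        # optional expected kinds
+    params:                       # driver-specific settings
+      station_id: "KXYZ"
+
+  - name: alerts
+    driver: nws_alert_stream      # hypothetical stream driver
+    mode: stream                  # stream mode forbids "every"
+    kinds: ["alert"]
+
+sinks:
+  - name: console
+    driver: stdout
+
+# routes omitted: all sinks receive all kinds
+```
+
+Omitting `mode` is also valid; feedkit then infers poll vs stream from the
+registered driver type.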
 ## Non-goals
-`feedkit` intentionally does not:
+
+feedkit intentionally does not:
 - define domain payload schemas
-- enforce domain-specific validation
-- manage persistence semantics beyond sink adapters
-- own observability, metrics, or tracing (left to daemons)
-
-Those concerns belong in concrete implementations.
-
-## See also
-- NAMING.md — repository and daemon naming conventions
-- event/doc.go — detailed event semantics
-- **Concrete example:** weatherfeeder (reference implementation)
-
----
\ No newline at end of file
+- enforce domain-specific event kinds
+- own application lifecycle
+- prescribe observability stack choices
diff --git a/config/config.go b/config/config.go
index bace862..7afef7d 100644
--- a/config/config.go
+++ b/config/config.go
@@ -21,20 +21,56 @@ type Config struct {
 	Routes []RouteConfig `yaml:"routes"`
 }
-// SourceConfig describes one polling job.
+// SourceMode selects how a source receives upstream input.
+//
+// Empty mode means "auto": feedkit infers mode from the registered driver type.
+type SourceMode string
+
+const (
+	SourceModeAuto   SourceMode = ""
+	SourceModePoll   SourceMode = "poll"
+	SourceModeStream SourceMode = "stream"
+)
+
+// Normalize trims and lowercases the mode.
+func (m SourceMode) Normalize() SourceMode {
+	switch strings.ToLower(strings.TrimSpace(string(m))) {
+	case "":
+		return SourceModeAuto
+	case string(SourceModePoll):
+		return SourceModePoll
+	case string(SourceModeStream):
+		return SourceModeStream
+	default:
+		return SourceMode(strings.ToLower(strings.TrimSpace(string(m))))
+	}
+}
+
+// SourceConfig describes one input source.
 //
 // This is intentionally generic:
 // - driver-specific knobs belong in Params.
-// - "kind" is allowed (useful for safety checks / routing), but feedkit does not
-//   restrict the allowed values.
+// - mode controls polling vs streaming behavior.
+// - expected emitted kinds are optional and domain-defined.
 type SourceConfig struct {
 	Name   string `yaml:"name"`
 	Driver string `yaml:"driver"` // e.g. "openmeteo_observation", "rss_feed", etc.
-	Every  Duration `yaml:"every"` // "15m", "1m", etc.
+	// Mode is optional:
+	// - "poll": Every must be set (>0)
+	// - "stream": Every must be omitted/zero
+	// - empty: infer from driver registration type (poll vs stream)
+	Mode SourceMode `yaml:"mode"`
 
-	// Kind is optional and domain-defined. If set, it should be a non-empty string.
-	// Domains commonly use it to enforce "this source should only emit kind X".
+	// Every is the poll cadence for poll-mode sources ("15m", "1m", etc.).
+	Every Duration `yaml:"every"`
+
+	// Kinds is optional and domain-defined.
+	// If set, it describes the expected emitted event kinds for this source.
+	Kinds []string `yaml:"kinds"`
+
+	// Kind is the legacy singular form. Prefer "kinds".
+	// If both kind and kinds are set, validation fails.
 	Kind string `yaml:"kind"`
 
 	// Params are driver-specific settings (URL, headers, station IDs, API keys, etc.).
@@ -42,6 +78,26 @@ type SourceConfig struct {
 	Params map[string]any `yaml:"params"`
 }
 
+// ExpectedKinds returns normalized expected kinds from config.
+// "kinds" takes precedence; "kind" is used as a legacy fallback.
+func (cfg SourceConfig) ExpectedKinds() []string {
+	if len(cfg.Kinds) > 0 {
+		out := make([]string, 0, len(cfg.Kinds))
+		for _, k := range cfg.Kinds {
+			k = strings.TrimSpace(k)
+			if k == "" {
+				continue
+			}
+			out = append(out, k)
+		}
+		return out
+	}
+	if k := strings.TrimSpace(cfg.Kind); k != "" {
+		return []string{k}
+	}
+	return nil
+}
+
 // SinkConfig describes one output sink adapter.
type SinkConfig struct { Name string `yaml:"name"` diff --git a/config/config_test.go b/config/config_test.go new file mode 100644 index 0000000..6dd26f7 --- /dev/null +++ b/config/config_test.go @@ -0,0 +1,56 @@ +package config + +import ( + "reflect" + "testing" +) + +func TestSourceConfigExpectedKinds(t *testing.T) { + tests := []struct { + name string + cfg SourceConfig + want []string + }{ + { + name: "plural kinds preferred", + cfg: SourceConfig{ + Kinds: []string{" observation ", "forecast"}, + Kind: "alert", + }, + want: []string{"observation", "forecast"}, + }, + { + name: "legacy singular fallback", + cfg: SourceConfig{ + Kind: " alert ", + }, + want: []string{"alert"}, + }, + { + name: "empty kinds", + cfg: SourceConfig{}, + want: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := tt.cfg.ExpectedKinds() + if !reflect.DeepEqual(got, tt.want) { + t.Fatalf("ExpectedKinds() = %#v, want %#v", got, tt.want) + } + }) + } +} + +func TestSourceModeNormalize(t *testing.T) { + if got := SourceMode(" Poll ").Normalize(); got != SourceModePoll { + t.Fatalf("Normalize poll = %q, want %q", got, SourceModePoll) + } + if got := SourceMode("STREAM").Normalize(); got != SourceModeStream { + t.Fatalf("Normalize stream = %q, want %q", got, SourceModeStream) + } + if got := SourceMode("").Normalize(); got != SourceModeAuto { + t.Fatalf("Normalize auto = %q, want %q", got, SourceModeAuto) + } +} diff --git a/config/load.go b/config/load.go index 0d3bdf8..120cf30 100644 --- a/config/load.go +++ b/config/load.go @@ -83,15 +83,41 @@ func (c *Config) Validate() error { m.Add(fieldErr(path+".driver", "is required (e.g. openmeteo_observation, rss_feed, ...)")) } - // Every (optional but if present must be >=0) - if s.Every.Duration < 0 { - m.Add(fieldErr(path+".every", "is optional, but must be a positive duration (e.g. 
15m, 1m, 30s) if provided"))
+	// Mode
+	mode := s.Mode.Normalize()
+	if s.Mode != SourceModeAuto && mode != SourceModePoll && mode != SourceModeStream {
+		m.Add(fieldErr(path+".mode", `must be one of: "poll", "stream" (or omit for auto)`))
 	}
-	// Kind (optional but if present must be non-empty after trimming)
+	// Every
+	if s.Every.Duration < 0 {
+		m.Add(fieldErr(path+".every", "is optional, but must be a positive duration (e.g. 15m, 1m, 30s) if provided"))
+	} else {
+		switch mode {
+		case SourceModePoll:
+			if s.Every.Duration <= 0 {
+				m.Add(fieldErr(path+".every", `is required when mode="poll" (e.g. 15m, 1m, 30s)`))
+			}
+		case SourceModeStream:
+			if s.Every.Duration > 0 {
+				m.Add(fieldErr(path+".every", `must be omitted when mode="stream"`))
+			}
+		}
+	}
+
+	// Kind/Kinds (optional)
+	if s.Kind != "" && len(s.Kinds) > 0 {
+		m.Add(fieldErr(path+".kind", `cannot be set when "kinds" is provided (use only "kinds")`))
+	}
 	if s.Kind != "" && strings.TrimSpace(s.Kind) == "" {
 		m.Add(fieldErr(path+".kind", "cannot be blank (omit it entirely, or provide a non-empty string)"))
 	}
+	for j, k := range s.Kinds {
+		kpath := fmt.Sprintf("%s.kinds[%d]", path, j)
+		if strings.TrimSpace(k) == "" {
+			m.Add(fieldErr(kpath, "kind cannot be empty"))
+		}
+	}
 	// Params can be nil; that's fine.
} diff --git a/config/validate_test.go b/config/validate_test.go index a7d14da..d4b09fd 100644 --- a/config/validate_test.go +++ b/config/validate_test.go @@ -46,3 +46,119 @@ func TestValidate_RouteKindsRejectsBlankEntries(t *testing.T) { t.Fatalf("expected error to mention blank kind entry, got: %v", err) } } + +func TestValidate_SourceModePollRequiresEvery(t *testing.T) { + cfg := &Config{ + Sources: []SourceConfig{ + {Name: "src1", Driver: "driver1", Mode: SourceModePoll}, + }, + Sinks: []SinkConfig{ + {Name: "sink1", Driver: "stdout"}, + }, + } + + err := cfg.Validate() + if err == nil { + t.Fatalf("expected error, got nil") + } + if !strings.Contains(err.Error(), `sources[0].every`) { + t.Fatalf("expected error to mention sources[0].every, got: %v", err) + } +} + +func TestValidate_SourceModeStreamRejectsEvery(t *testing.T) { + cfg := &Config{ + Sources: []SourceConfig{ + { + Name: "src1", + Driver: "driver1", + Mode: SourceModeStream, + Every: Duration{Duration: time.Minute}, + }, + }, + Sinks: []SinkConfig{ + {Name: "sink1", Driver: "stdout"}, + }, + } + + err := cfg.Validate() + if err == nil { + t.Fatalf("expected error, got nil") + } + if !strings.Contains(err.Error(), `sources[0].every`) { + t.Fatalf("expected error to mention sources[0].every, got: %v", err) + } +} + +func TestValidate_SourceModeRejectsUnknownValue(t *testing.T) { + cfg := &Config{ + Sources: []SourceConfig{ + { + Name: "src1", + Driver: "driver1", + Mode: SourceMode("batch"), + Every: Duration{Duration: time.Minute}, + }, + }, + Sinks: []SinkConfig{ + {Name: "sink1", Driver: "stdout"}, + }, + } + + err := cfg.Validate() + if err == nil { + t.Fatalf("expected error, got nil") + } + if !strings.Contains(err.Error(), `sources[0].mode`) { + t.Fatalf("expected error to mention sources[0].mode, got: %v", err) + } +} + +func TestValidate_SourceKindAndKindsConflict(t *testing.T) { + cfg := &Config{ + Sources: []SourceConfig{ + { + Name: "src1", + Driver: "driver1", + Every: Duration{Duration: 
time.Minute}, + Kind: "observation", + Kinds: []string{"forecast"}, + }, + }, + Sinks: []SinkConfig{ + {Name: "sink1", Driver: "stdout"}, + }, + } + + err := cfg.Validate() + if err == nil { + t.Fatalf("expected error, got nil") + } + if !strings.Contains(err.Error(), `sources[0].kind`) { + t.Fatalf("expected error to mention sources[0].kind, got: %v", err) + } +} + +func TestValidate_SourceKindsRejectBlankEntries(t *testing.T) { + cfg := &Config{ + Sources: []SourceConfig{ + { + Name: "src1", + Driver: "driver1", + Every: Duration{Duration: time.Minute}, + Kinds: []string{"observation", " "}, + }, + }, + Sinks: []SinkConfig{ + {Name: "sink1", Driver: "stdout"}, + }, + } + + err := cfg.Validate() + if err == nil { + t.Fatalf("expected error, got nil") + } + if !strings.Contains(err.Error(), `sources[0].kinds[1]`) { + t.Fatalf("expected error to mention sources[0].kinds[1], got: %v", err) + } +} diff --git a/doc.go b/doc.go index a0b1937..940c638 100644 --- a/doc.go +++ b/doc.go @@ -1,334 +1,108 @@ -// Package feedkit provides domain-agnostic plumbing for "feed processing daemons". +// Package feedkit provides domain-agnostic plumbing for feed-processing daemons. // -// A feed daemon polls one or more upstream providers (HTTP APIs, RSS, etc.), -// converts upstream items into a normalized internal representation, applies -// lightweight policy (dedupe/rate-limit/filters), and emits events to one or -// more sinks (stdout, files, Postgres, brokers, ...). +// A feed daemon ingests upstream input, turns it into event.Event values, applies +// optional processing, and emits to sinks. // -// feedkit is intentionally NOT a framework. It supplies small, composable -// primitives that concrete daemons wire together in main.go (or via a small -// optional Runner helper, see "Future additions"). 
+// Conceptual flow: // -// Conceptual pipeline +// Collect -> Normalize (optional) -> Policy -> Route -> Emit // -// Collect → Normalize → Filter/Policy → Persist/Emit → Signal +// In feedkit this maps to: // -// In feedkit today, that maps to: +// Collect: sources + scheduler +// Normalize: normalize (optional pipeline stage) +// Policy: pipeline +// Route: dispatch +// Emit: sinks +// Config: config // -// Collect: sources.Source + scheduler.Scheduler -// Normalize: (optional) normalize.Processor (or domain code inside Source.Poll) -// Policy: pipeline.Pipeline (Processor chain; dedupe/ratelimit are planned) -// Emit: dispatch.Dispatcher + dispatch.Fanout -// Sinks: sinks.Sink (+ sinks.Registry to build from config) -// Config: config.Load + config.Config validation +// feedkit intentionally does not define domain payload schemas or domain-specific +// validation rules. Those belong in each concrete daemon. // -// Public packages (API surface) +// Public packages // // - config -// YAML configuration types and loader/validator. +// YAML config loading/validation (strict decode + domain-agnostic checks). // -// - config.Load(path) (*config.Config, error) +// SourceConfig supports both polling and streaming sources: // -// - config.Config: Sources, Sinks, Routes +// - mode: "poll" | "stream" | omitted (auto by driver type) // -// - config.SourceConfig / SinkConfig include Params map[string]any -// with convenience helpers like: +// - every: poll interval (required for mode="poll") // -// - ParamString / ParamStringDefault +// - kinds: optional expected emitted kinds // -// - ParamBool / ParamBoolDefault +// - kind: legacy singular fallback // -// - ParamInt / ParamIntDefault -// -// - ParamDuration / ParamDurationDefault -// -// - ParamStringSlice +// - params: driver-specific settings // // - event -// Domain-agnostic event envelope moved through the system. 
-// -// - event.Event includes ID, Kind, Source, timestamps, Schema, Payload -// -// - event.Kind is stringly typed; event.ParseKind normalizes/validates. +// Domain-agnostic event envelope (ID, Kind, Source, EmittedAt, Schema, Payload). // // - sources -// Extension point for domain-specific polling jobs. +// Source abstractions and source-driver registry. // -// - sources.Source interface: Name(), Kind(), Poll(ctx) +// There are two source interfaces: // -// - sources.Registry lets daemons register driver factories and build -// sources from config.SourceConfig. +// - PollSource: Poll(ctx) ([]event.Event, error) +// +// - StreamSource: Run(ctx, out) error +// +// Both share Input{Name()}. A source may emit 0..N events per poll/run step, +// and may emit multiple event kinds. // // - scheduler -// Runs sources on a cadence and publishes emitted events onto a channel. +// Runs one goroutine per job: // -// - scheduler.Scheduler{Jobs, Out, Logf}.Run(ctx) +// - PollSource jobs run on Every (+ jitter) // -// - scheduler.Job: {Source, Every, Jitter} +// - StreamSource jobs run continuously // // - pipeline -// Optional processing chain between scheduler and dispatch. -// -// - pipeline.Pipeline{Processors}.Process(ctx, event) -// -// - pipeline.Processor can mutate, drop (return nil), or error. -// -// - dedupe/ratelimit processors are placeholders (planned). +// Processor chain between scheduler and dispatch. +// Processors can transform, drop, or reject events. // // - normalize -// Optional normalization hook for splitting "fetch" from "transform". -// -// Many domains (like weather) ingest multiple upstream providers whose payloads -// differ. A common evolution is to keep sources small and focused on polling, -// and move mapping/normalization into a dedicated stage. 
-// -// feedkit provides this as an OPTIONAL pipeline processor: -// -// - normalize.Normalizer: domain-implemented mapping logic -// -// - normalize.Registry: holds normalizers and selects one by Match() -// -// - normalize.Processor: adapts Registry into a pipeline.Processor -// -// Normalization is NOT required: -// -// - If you do all normalization inside Source.Poll, you can ignore this package. -// -// - If normalize.Processor is not installed in your pipeline, nothing changes. -// -// - If normalize.Processor is installed but no Normalizer matches an event, -// the event passes through unchanged. -// -// The key types: -// -// type Normalizer interface { -// // Match returns true if this normalizer should handle the event. -// // Matching is intentionally flexible: match on Schema, Kind, Source, -// // or any combination. -// Match(e event.Event) bool -// -// // Normalize converts the incoming event into a new (or modified) event. -// // -// // Return values: -// // - (out, nil) where out != nil: emit the normalized event -// // - (nil, nil): drop the event (policy drop) -// // - (nil, err): fail the pipeline -// Normalize(ctx context.Context, in event.Event) (*event.Event, error) -// } -// -// type Registry struct { ... } -// -// func (r *Registry) Register(n Normalizer) -// -// // Normalize finds the first matching normalizer (in registration order) and applies it. -// // If none match, it returns the input event unchanged. -// func (r *Registry) Normalize(ctx context.Context, in event.Event) (*event.Event, error) -// -// // Processor implements pipeline.Processor and calls into the Registry. -// // Optional behavior: -// // - If Registry is nil, Processor is a no-op pass-through. -// // - If RequireMatch is false (default), non-matching events pass through. -// // - If RequireMatch is true, non-matching events are treated as errors. 
-// type Processor struct { -// Registry *Registry -// RequireMatch bool -// } -// -// "First match wins": -// Registry applies the first Normalizer whose Match() returns true. -// This is intentional: normalization is usually a single mapping step from a -// raw schema into a canonical schema. If you want multiple sequential transforms, -// model them as multiple pipeline processors. -// -// Recommended convention: match by Event.Schema -// ------------------------------------------------ -// Schema gives you a versionable selector that doesn't depend on source names. -// -// A common pattern is: -// -// - sources emit "raw" events with Schema like: -// "raw.openweather.current.v1" -// "raw.openmeteo.current.v1" -// "raw.nws.observation.v1" -// -// - normalizers transform them into canonical domain schemas like: -// "weather.observation.v1" -// "weather.forecast.v1" -// "weather.alert.v1" -// -// What is a "raw event"? -// ------------------------------------------------ -// feedkit does not prescribe the raw payload representation. -// A raw payload is typically one of: -// -// - json.RawMessage (recommended for JSON APIs) -// -// - []byte (raw bytes) -// -// - map[string]any (already-decoded but untyped JSON) -// -// The only hard requirement enforced by feedkit is Event.Validate(): -// -// - ID, Kind, Source, EmittedAt must be set -// -// - Payload must be non-nil -// -// If you use raw events, you still must provide Event.Kind. -// Typical approaches: -// -// - set Kind to the intended canonical kind (e.g. "observation") even before normalization -// -// - or set Kind to a domain-defined "raw_*" kind and normalize it later -// -// The simplest approach is: set Kind to the final kind early, and use Schema -// to describe the raw-vs-normalized payload shape. 
-// -// Wiring example (daemon main.go) -// ------------------------------------------------ -// Install normalize.Processor at the front of your pipeline: -// -// normReg := &normalize.Registry{} -// -// normReg.Register(normalize.Func{ -// Name: "openweather current -> weather.observation.v1", -// MatchFn: func(e event.Event) bool { -// return e.Schema == "raw.openweather.current.v1" -// }, -// NormalizeFn: func(ctx context.Context, in event.Event) (*event.Event, error) { -// // 1) interpret in.Payload (json.RawMessage / []byte / map) -// // 2) build canonical domain payload -// // 3) return updated event -// -// out := in -// out.Schema = "weather.observation.v1" -// // Optionally adjust Kind, EffectiveAt, etc. -// out.Payload = /* canonical weather observation struct */ -// return &out, nil -// }, -// }) -// -// p := &pipeline.Pipeline{ -// Processors: []pipeline.Processor{ -// normalize.Processor{Registry: normReg}, // optional stage -// // dedupe.New(...), ratelimit.New(...), ... -// }, -// } -// -// If the event does not match any normalizer, it passes through unmodified. -// -// - sinks -// Extension point for output adapters. -// -// - sinks.Sink interface: Name(), Consume(ctx, event) -// -// - sinks.Registry to register driver factories and build sinks from config -// -// - sinks.RegisterBuiltins registers feedkit-provided sink drivers -// (stdout/file/postgres/rabbitmq; some are currently stubs). +// Optional pipeline processor for raw->canonical mapping. +// If no normalizer matches, the event passes through unchanged by default. // // - dispatch -// Routes processed events to sinks, and isolates slow sinks via per-sink queues. +// Routes events to sinks and isolates slow sinks via per-sink queues/workers. // -// - dispatch.Dispatcher{In, Pipeline, Sinks, Routes, ...}.Run(ctx, logf) +// - sinks +// Sink abstractions + sink registry. 
// -// - dispatch.Fanout: one buffered queue + worker goroutine per sink +// Typical wiring (daemon main.go) // -// - dispatch.CompileRoutes(*config.Config) compiles cfg.Routes into []dispatch.Route. -// If routes: is omitted, it defaults to "all sinks receive all kinds". If a route -// omits kinds: (or sets it empty), that route matches all kinds. +// 1. Load config. +// 2. Register source drivers and build sources from config.Sources. +// 3. Register sink drivers and build sinks from config.Sinks. +// 4. Compile routes. +// 5. Start scheduler (sources -> bus) and dispatcher (bus -> pipeline -> sinks). // -// - logging -// Shared logger type used across feedkit packages. -// -// - logging.Logf is a printf-style logger signature. -// -// Typical wiring (what a daemon does in main.go) -// -// 1. Load config (domain code may add domain-specific validation). -// 2. Register and build sources from config.Sources using sources.Registry. -// 3. Register and build sinks from config.Sinks using sinks.Registry. -// 4. Compile routes (typically via dispatch.CompileRoutes). -// 5. Create an event bus channel. -// 6. Start scheduler (sources → bus). -// 7. Start dispatcher (bus → pipeline → routes → sinks). -// -// A sketch: +// Sketch: // // cfg, _ := config.Load("config.yml") -// -// // Build sources (domain registers its drivers). // srcReg := sources.NewRegistry() -// // domain: srcReg.Register("openweather_observation", newOpenWeatherSource) -// // ... +// // domain registers poll/stream drivers... // // var jobs []scheduler.Job // for _, sc := range cfg.Sources { -// src, _ := srcReg.Build(sc) -// jobs = append(jobs, scheduler.Job{Source: src, Every: sc.Every.Duration}) +// src, _ := srcReg.BuildInput(sc) +// jobs = append(jobs, scheduler.Job{ +// Source: src, +// Every: sc.Every.Duration, +// }) // } // -// // Build sinks (feedkit can register builtins). 
-//	sinkReg := sinks.NewRegistry()
-//	sinks.RegisterBuiltins(sinkReg)
-//	builtSinks := map[string]sinks.Sink{}
-//	for _, sk := range cfg.Sinks {
-//		s, _ := sinkReg.Build(sk)
-//		builtSinks[sk.Name] = s
-//	}
-//
-//	// Compile routes.
-//	routes, _ := dispatch.CompileRoutes(cfg)
-//
-//	// Event bus.
 //	bus := make(chan event.Event, 256)
-//
-//	// Optional normalization registry + pipeline.
-//	normReg := &normalize.Registry{}
-//	// domain registers normalizers into normReg...
-//
-//	p := &pipeline.Pipeline{
-//		Processors: []pipeline.Processor{
-//			normalize.Processor{Registry: normReg}, // optional
-//			// dedupe/ratelimit/etc...
-//		},
-//	}
-//
-//	// Scheduler.
 //	s := &scheduler.Scheduler{Jobs: jobs, Out: bus, Logf: logf}
-//
-//	// Dispatcher.
-//	d := &dispatch.Dispatcher{
-//		In:       bus,
-//		Pipeline: p,
-//		Sinks:    builtSinks,
-//		Routes:   routes,
-//	}
-//
-//	go s.Run(ctx)
-//	return d.Run(ctx, logf)
-//
-// Conventions (recommended, not required)
-//
-// - Event.ID should be stable for dedupe/storage (often ":").
-// - Event.Kind should be lowercase ("observation", "alert", "article", ...).
-// - Event.Schema should identify the payload shape/version
-//   (e.g. "weather.observation.v1").
+//	// start dispatcher similarly...
 //
 // # Context and cancellation
 //
-// All blocking or I/O work should honor ctx.Done():
-// - sources.Source.Poll should pass ctx to HTTP calls, etc.
-// - sinks.Sink.Consume should honor ctx (Fanout timeouts only help if sinks cooperate).
-// - normalizers should honor ctx if they do expensive work (rare; usually pure transforms).
-//
-// Future additions (likely)
-//
-// - A small Runner helper that performs the standard wiring (load config,
-//   build sources/sinks/routes, run scheduler+dispatcher, handle shutdown).
-//
-// # Non-goals
-//
-// feedkit does not define domain payload schemas, does not enforce domain kinds,
-// and does not embed domain-specific validation rules. Those live in each
-// concrete daemon/module (weatherfeeder, newsfeeder, ...).
+// All blocking work should honor context cancellation:
+// - source polling/streaming I/O
+// - sink consumption
+// - any expensive processor work
 package feedkit
diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go
index 7d10fc9..858b07b 100644
--- a/scheduler/scheduler.go
+++ b/scheduler/scheduler.go
@@ -20,7 +20,7 @@ type Logger = logging.Logf
 // Job describes one scheduler task.
 //
 // A Job may be backed by either:
-// - a polling source (sources.Source): uses Every + jitter and calls Poll()
+// - a polling source (sources.PollSource): uses Every + jitter and calls Poll()
 // - a stream source (sources.StreamSource): ignores Every and calls Run()
 //
 // Jitter behavior:
@@ -80,7 +80,7 @@ func (s *Scheduler) runJob(ctx context.Context, job Job) {
 	}
 
 	// Poll sources: time-based.
-	ps, ok := job.Source.(sources.Source)
+	ps, ok := job.Source.(sources.PollSource)
 	if !ok {
 		s.logf("scheduler: source %T (%s) implements neither Poll() nor Run()", job.Source, job.Source.Name())
 		return
@@ -108,7 +108,7 @@ func (s *Scheduler) runStream(ctx context.Context, job Job, src sources.StreamSo
 	}
 }
 
-func (s *Scheduler) runPoller(ctx context.Context, job Job, src sources.Source) {
+func (s *Scheduler) runPoller(ctx context.Context, job Job, src sources.PollSource) {
 	// Compute jitter: either configured per job, or a sensible default.
 	jitter := effectiveJitter(job.Every, job.Jitter)
@@ -141,7 +141,7 @@ func (s *Scheduler) runPoller(ctx context.Context, job Job, src sources.Source)
 	}
 }
 
-func (s *Scheduler) pollOnce(ctx context.Context, src sources.Source) {
+func (s *Scheduler) pollOnce(ctx context.Context, src sources.PollSource) {
 	events, err := src.Poll(ctx)
 	if err != nil {
 		s.logf("scheduler: poll failed (%s): %v", src.Name(), err)
diff --git a/sources/doc.go b/sources/doc.go
new file mode 100644
index 0000000..939eb48
--- /dev/null
+++ b/sources/doc.go
@@ -0,0 +1,14 @@
+// Package sources defines feedkit's input-source abstraction.
+//
+// A source ingests upstream input and emits one or more event.Event values.
+//
+// feedkit supports two source modes:
+// - PollSource: scheduler invokes Poll on a cadence.
+// - StreamSource: source runs continuously and pushes events as input arrives.
+//
+// Source drivers are domain-specific and registered into Registry by driver name.
+// Registry can then build configured sources from config.SourceConfig.
+//
+// A single source may emit 0..N events per poll or stream iteration, and those
+// events may span multiple event kinds.
+package sources
diff --git a/sources/registry.go b/sources/registry.go
index e545a40..c141933 100644
--- a/sources/registry.go
+++ b/sources/registry.go
@@ -7,22 +7,25 @@ import (
 	"gitea.maximumdirect.net/ejr/feedkit/config"
 )
 
-// Factory constructs a configured Source instance from config.
+// PollFactory constructs a configured PollSource instance from config.
 //
 // This is how concrete daemons (weatherfeeder/newsfeeder/...) register their
 // domain-specific source drivers (Open-Meteo, NWS, RSS, etc.) while feedkit
 // remains domain-agnostic.
-type Factory func(cfg config.SourceConfig) (Source, error)
+type PollFactory func(cfg config.SourceConfig) (PollSource, error)
 
 type StreamFactory func(cfg config.SourceConfig) (StreamSource, error)
 
+// Factory is the legacy alias for poll source factories.
+type Factory = PollFactory
+
 type Registry struct {
-	byDriver       map[string]Factory
+	byPollDriver   map[string]PollFactory
 	byStreamDriver map[string]StreamFactory
 }
 
 func NewRegistry() *Registry {
 	return &Registry{
-		byDriver:       map[string]Factory{},
+		byPollDriver:   map[string]PollFactory{},
 		byStreamDriver: map[string]StreamFactory{},
 	}
 }
@@ -30,20 +33,28 @@ func NewRegistry() *Registry {
 // Register associates a driver name (e.g. "openmeteo_observation") with a factory.
 //
 // The driver string is the "lookup key" used by config.sources[].driver.
-func (r *Registry) Register(driver string, f Factory) {
+func (r *Registry) Register(driver string, f PollFactory) {
+	r.RegisterPoll(driver, f)
+}
+
+// RegisterPoll associates a driver name with a polling-source factory.
+func (r *Registry) RegisterPoll(driver string, f PollFactory) {
 	driver = strings.TrimSpace(driver)
 	if driver == "" {
 		// Panic is appropriate here: registering an empty driver is always a programmer error,
 		// and it will lead to extremely confusing runtime behavior if allowed.
-		panic("sources.Registry.Register: driver cannot be empty")
+		panic("sources.Registry.RegisterPoll: driver cannot be empty")
 	}
 	if f == nil {
-		panic(fmt.Sprintf("sources.Registry.Register: factory cannot be nil (driver=%q)", driver))
+		panic(fmt.Sprintf("sources.Registry.RegisterPoll: factory cannot be nil (driver=%q)", driver))
 	}
 	if _, exists := r.byStreamDriver[driver]; exists {
-		panic(fmt.Sprintf("sources.Registry.Register: driver %q already registered as a stream source", driver))
+		panic(fmt.Sprintf("sources.Registry.RegisterPoll: driver %q already registered as a stream source", driver))
 	}
-	r.byDriver[driver] = f
+	if _, exists := r.byPollDriver[driver]; exists {
+		panic(fmt.Sprintf("sources.Registry.RegisterPoll: driver %q already registered as a polling source", driver))
+	}
+	r.byPollDriver[driver] = f
 }
 
 // RegisterStream is the StreamSource equivalent of Register.
@@ -55,28 +66,71 @@ func (r *Registry) RegisterStream(driver string, f StreamFactory) {
 	if f == nil {
 		panic(fmt.Sprintf("sources.Registry.RegisterStream: factory cannot be nil (driver=%q)", driver))
 	}
-	if _, exists := r.byDriver[driver]; exists {
+	if _, exists := r.byPollDriver[driver]; exists {
 		panic(fmt.Sprintf("sources.Registry.RegisterStream: driver %q already registered as a polling source", driver))
 	}
+	if _, exists := r.byStreamDriver[driver]; exists {
+		panic(fmt.Sprintf("sources.Registry.RegisterStream: driver %q already registered as a stream source", driver))
+	}
 	r.byStreamDriver[driver] = f
 }
 
-// Build constructs a Source from a SourceConfig by looking up cfg.Driver.
-func (r *Registry) Build(cfg config.SourceConfig) (Source, error) {
-	f, ok := r.byDriver[cfg.Driver]
+// Build constructs a polling source from a SourceConfig by looking up cfg.Driver.
+func (r *Registry) Build(cfg config.SourceConfig) (PollSource, error) {
+	return r.BuildPoll(cfg)
+}
+
+// BuildPoll constructs a polling source from a SourceConfig by looking up cfg.Driver.
+func (r *Registry) BuildPoll(cfg config.SourceConfig) (PollSource, error) {
+	driver := strings.TrimSpace(cfg.Driver)
+	if cfg.Mode.Normalize() == config.SourceModeStream {
+		return nil, fmt.Errorf("source %q mode=stream cannot be built as polling source", cfg.Name)
+	}
+
+	f, ok := r.byPollDriver[driver]
 	if !ok {
-		return nil, fmt.Errorf("unknown source driver: %q", cfg.Driver)
+		if _, streamExists := r.byStreamDriver[driver]; streamExists {
+			return nil, fmt.Errorf("source driver %q is stream-only; cannot build as polling source", driver)
+		}
+		return nil, fmt.Errorf("unknown source driver: %q", driver)
 	}
 	return f(cfg)
 }
 
 // BuildInput can return either a polling Source or a StreamSource.
 func (r *Registry) BuildInput(cfg config.SourceConfig) (Input, error) {
-	if f, ok := r.byStreamDriver[cfg.Driver]; ok {
+	driver := strings.TrimSpace(cfg.Driver)
+	mode := cfg.Mode.Normalize()
+	if mode != config.SourceModeAuto && mode != config.SourceModePoll && mode != config.SourceModeStream {
+		return nil, fmt.Errorf("source %q has invalid mode %q (expected \"poll\" or \"stream\")", cfg.Name, cfg.Mode)
+	}
+
+	switch mode {
+	case config.SourceModePoll:
+		f, ok := r.byPollDriver[driver]
+		if !ok {
+			if _, streamExists := r.byStreamDriver[driver]; streamExists {
+				return nil, fmt.Errorf("source %q mode=poll conflicts with stream-only driver %q", cfg.Name, driver)
+			}
+			return nil, fmt.Errorf("unknown source driver: %q", driver)
+		}
+		return f(cfg)
+	case config.SourceModeStream:
+		f, ok := r.byStreamDriver[driver]
+		if !ok {
+			if _, pollExists := r.byPollDriver[driver]; pollExists {
+				return nil, fmt.Errorf("source %q mode=stream conflicts with polling driver %q", cfg.Name, driver)
+			}
+			return nil, fmt.Errorf("unknown source driver: %q", driver)
+		}
 		return f(cfg)
 	}
-	if f, ok := r.byDriver[cfg.Driver]; ok {
+
+	if f, ok := r.byStreamDriver[driver]; ok {
 		return f(cfg)
 	}
-	return nil, fmt.Errorf("unknown source driver: %q", cfg.Driver)
+	if f, ok := r.byPollDriver[driver]; ok {
+		return f(cfg)
+	}
+	return nil, fmt.Errorf("unknown source driver: %q", driver)
 }
diff --git a/sources/registry_test.go b/sources/registry_test.go
new file mode 100644
index 0000000..77e518c
--- /dev/null
+++ b/sources/registry_test.go
@@ -0,0 +1,84 @@
+package sources
+
+import (
+	"context"
+	"strings"
+	"testing"
+
+	"gitea.maximumdirect.net/ejr/feedkit/config"
+	"gitea.maximumdirect.net/ejr/feedkit/event"
+)
+
+type testPollSource struct{ name string }
+
+func (s testPollSource) Name() string { return s.name }
+func (s testPollSource) Poll(context.Context) ([]event.Event, error) {
+	return nil, nil
+}
+
+type testStreamSource struct{ name string }
+
+func (s testStreamSource) Name() string { return s.name }
+func (s testStreamSource) Run(context.Context, chan<- event.Event) error {
+	return nil
+}
+
+func TestRegistryBuildInputModeConflicts(t *testing.T) {
+	r := NewRegistry()
+	r.RegisterPoll("poll_driver", func(cfg config.SourceConfig) (PollSource, error) {
+		return testPollSource{name: cfg.Name}, nil
+	})
+	r.RegisterStream("stream_driver", func(cfg config.SourceConfig) (StreamSource, error) {
+		return testStreamSource{name: cfg.Name}, nil
+	})
+
+	_, err := r.BuildInput(config.SourceConfig{
+		Name:   "s1",
+		Driver: "stream_driver",
+		Mode:   config.SourceModePoll,
+	})
+	if err == nil {
+		t.Fatalf("expected mode conflict error, got nil")
+	}
+	if !strings.Contains(err.Error(), "mode=poll") {
+		t.Fatalf("expected poll conflict error, got: %v", err)
+	}
+
+	_, err = r.BuildInput(config.SourceConfig{
+		Name:   "s2",
+		Driver: "poll_driver",
+		Mode:   config.SourceModeStream,
+	})
+	if err == nil {
+		t.Fatalf("expected mode conflict error, got nil")
+	}
+	if !strings.Contains(err.Error(), "mode=stream") {
+		t.Fatalf("expected stream conflict error, got: %v", err)
+	}
+}
+
+func TestRegistryBuildInputAutoByDriverType(t *testing.T) {
+	r := NewRegistry()
+	r.RegisterPoll("poll_driver", func(cfg config.SourceConfig) (PollSource, error) {
+		return testPollSource{name: cfg.Name}, nil
+	})
+	r.RegisterStream("stream_driver", func(cfg config.SourceConfig) (StreamSource, error) {
+		return testStreamSource{name: cfg.Name}, nil
+	})
+
+	src, err := r.BuildInput(config.SourceConfig{Name: "p", Driver: "poll_driver"})
+	if err != nil {
+		t.Fatalf("BuildInput poll auto failed: %v", err)
+	}
+	if _, ok := src.(PollSource); !ok {
+		t.Fatalf("expected PollSource, got %T", src)
+	}
+
+	src, err = r.BuildInput(config.SourceConfig{Name: "s", Driver: "stream_driver"})
+	if err != nil {
+		t.Fatalf("BuildInput stream auto failed: %v", err)
+	}
+	if _, ok := src.(StreamSource); !ok {
+		t.Fatalf("expected StreamSource, got %T", src)
+	}
+}
diff --git a/sources/source.go b/sources/source.go
index c0254ae..908df44 100644
--- a/sources/source.go
+++ b/sources/source.go
@@ -7,34 +7,33 @@ import (
 )
 
 // Input is the common surface shared by all source types.
+//
+// A source may be polling (PollSource) or event-driven (StreamSource).
+// Both source types emit domain-agnostic event.Event values.
 type Input interface {
 	Name() string
-	Kind() event.Kind
 }
 
-// Source is a configured polling job that emits 0..N events per poll.
+// PollSource is a configured polling source that emits 0..N events per poll.
 //
-// Source implementations live in domain modules (weatherfeeder/newsfeeder/...)
+// PollSource implementations live in domain modules (weatherfeeder/newsfeeder/...)
 // and are registered into a feedkit sources.Registry.
 //
-// feedkit infrastructure treats Source as opaque; it just calls Poll()
+// feedkit infrastructure treats PollSource as opaque; it just calls Poll()
 // on the configured cadence and publishes the resulting events.
-type Source interface {
+type PollSource interface {
 	// Name is the configured source name (used for logs and included in emitted events).
 	Name() string
 
-	// Kind is the "primary kind" emitted by this source.
-	//
-	// This is mainly useful as a *safety check* (e.g. config says kind=forecast but
-	// driver emits observation). Some future sources may emit multiple kinds; if/when
-	// that happens, we can evolve this interface (e.g., make Kind optional, or remove it).
-	Kind() event.Kind
-
-	// Poll fetches from upstream and returns 0..N events.
+	// Poll fetches/processes one input batch and returns 0..N events.
+	// A single poll can emit multiple event kinds.
 	// Implementations should honor ctx.Done() for network calls and other I/O.
 	Poll(ctx context.Context) ([]event.Event, error)
 }
 
+// Source is a compatibility alias for the legacy polling-source name.
+type Source = PollSource
+
 // StreamSource is an event-driven source (NATS/RabbitMQ/MQTT/etc).
 //
 // Run should block, producing events into `out` until ctx is cancelled or a fatal error occurs.
@@ -43,3 +42,14 @@ type StreamSource interface {
 	Input
 	Run(ctx context.Context, out chan<- event.Event) error
 }
+
+// KindSource is an optional interface for sources that advertise one "primary" kind.
+// This is legacy-friendly but no longer required.
+type KindSource interface {
+	Kind() event.Kind
+}
+
+// KindsSource is an optional interface for sources that advertise multiple kinds.
+type KindsSource interface {
+	Kinds() []event.Kind
+}
diff --git a/transport/http.go b/transport/http.go
index d844516..03c1b48 100644
--- a/transport/http.go
+++ b/transport/http.go
@@ -14,7 +14,7 @@ import (
 // or malicious large responses.
 const maxResponseBodyBytes = 2 << 21 // 4 MiB
 
-// DefaultHTTPTimeout is the standard timeout used by weatherfeeder HTTP sources.
+// DefaultHTTPTimeout is the standard timeout used by HTTP sources.
 // Individual drivers may override this if they have a specific need.
 const DefaultHTTPTimeout = 10 * time.Second
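
The mode-resolution rule the new `BuildInput` implements can be sketched in isolation: an explicit `mode` must match the driver's registered kind, while `auto` infers the kind from whichever table the driver appears in (stream checked first). This is a simplified, hypothetical model with toy types — not feedkit's actual API — and it folds unknown-mode handling into the `auto` branch, whereas the real code rejects invalid modes outright:

```go
package main

import "fmt"

// Simplified, hypothetical model of the registry's driver tables:
// a driver name is registered as either polling or streaming.
type registry struct {
	poll, stream map[string]bool
}

// buildInput mirrors BuildInput's resolution rule: an explicit mode
// must match the driver's registered kind; otherwise infer the kind
// from whichever table the driver appears in (stream checked first).
func (r registry) buildInput(driver, mode string) (string, error) {
	switch mode {
	case "poll":
		if !r.poll[driver] {
			if r.stream[driver] {
				return "", fmt.Errorf("mode=poll conflicts with stream-only driver %q", driver)
			}
			return "", fmt.Errorf("unknown source driver: %q", driver)
		}
		return "poll", nil
	case "stream":
		if !r.stream[driver] {
			if r.poll[driver] {
				return "", fmt.Errorf("mode=stream conflicts with polling driver %q", driver)
			}
			return "", fmt.Errorf("unknown source driver: %q", driver)
		}
		return "stream", nil
	default: // auto (the real code additionally rejects unknown mode strings)
		if r.stream[driver] {
			return "stream", nil
		}
		if r.poll[driver] {
			return "poll", nil
		}
		return "", fmt.Errorf("unknown source driver: %q", driver)
	}
}

func main() {
	r := registry{
		poll:   map[string]bool{"openmeteo_observation": true},
		stream: map[string]bool{"nats_alerts": true},
	}
	got, _ := r.buildInput("nats_alerts", "") // auto: inferred from registration
	fmt.Println(got)                          // stream
	_, err := r.buildInput("nats_alerts", "poll")
	fmt.Println(err != nil) // true: explicit mode conflicts with driver kind
}
```

This matches the config contract described above: `mode: poll`/`mode: stream` are hard constraints, and an omitted mode defers to the registered driver type.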