From 32813689221971f679f22261c1fc1761e3900a29 Mon Sep 17 00:00:00 2001 From: Eric Rakestraw Date: Sat, 28 Mar 2026 13:02:37 -0500 Subject: [PATCH] Cleaned up documentation and removed stubs and TODOs throughout the application --- README.md | 177 +++++++++++++++--------------------- config/config.go | 2 +- config/load.go | 2 +- doc.go | 136 +++++++-------------------- pipeline/ratelimit.go | 5 - processors/normalize/doc.go | 22 +++-- scheduler/worker.go | 7 -- sinks/builtins.go | 33 +------ sinks/doc.go | 17 +++- sinks/file.go | 30 ------ sinks/helpers.go | 19 ++++ sinks/nats.go | 12 +-- sinks/nats_test.go | 47 ++++++++++ sinks/registry.go | 36 +++++++- sinks/registry_test.go | 126 +++++++++++++++++++++++++ sources/doc.go | 20 ++-- sources/http.go | 35 ++----- sources/http_test.go | 22 +++++ 18 files changed, 403 insertions(+), 345 deletions(-) delete mode 100644 pipeline/ratelimit.go delete mode 100644 scheduler/worker.go delete mode 100644 sinks/file.go create mode 100644 sinks/nats_test.go create mode 100644 sinks/registry_test.go diff --git a/README.md b/README.md index 73a302f..3bfa696 100644 --- a/README.md +++ b/README.md @@ -1,127 +1,92 @@ # feedkit -`feedkit` provides domain-agnostic plumbing for feed-processing daemons. +`feedkit` is a small Go toolkit for building feed-processing daemons. -A daemon built on feedkit typically: -- ingests upstream input (polling APIs or consuming streams) +It gives you the reusable plumbing around collection, processing, routing, and +emission, while leaving domain concepts, schemas, and application wiring in +your daemon. The intended shape is a family of sibling applications such as +`weatherfeeder`, `newsfeeder`, or `earthquakefeeder` that all share the same +infrastructure patterns without sharing domain logic. + +## What It Does + +A daemon built on `feedkit` typically: +- ingests upstream input by polling HTTP APIs or consuming streams - emits domain-agnostic `event.Event` values -- applies optional processing (normalization, dedupe, policy) -- routes events to sinks (stdout, NATS, files, databases, etc.) +- optionally processes those events with stages like dedupe or normalization +- routes events to one or more sinks such as stdout, NATS, or Postgres + +Conceptually, the pipeline is: + +`Collect -> Process -> Route -> Emit` ## Philosophy -feedkit is not a framework. It provides small composable packages and leaves -lifecycle, domain schemas, and domain-specific validation in your daemon. +`feedkit` is intentionally not a framework. -## Conceptual pipeline +It does not try to own: +- your domain payload schemas +- your domain event kinds +- your daemon lifecycle or `main.go` +- your observability stack or deployment model -Collect -> Process (optional stages, including dedupe + normalize) -> Route -> Emit +Instead, it provides small composable packages that are easy to wire together in +different daemons. -| Stage | Package(s) | -|---|---| -| Collect | `sources`, `scheduler` | -| Process | `pipeline`, `processors`, `processors/dedupe`, `processors/normalize` (optional stages) | -| Route | `dispatch` | -| Emit | `sinks` | -| Configure | `config` | +## When To Use It -## Core packages +`feedkit` is a good fit when you want: +- multiple small ingestion daemons with shared infrastructure patterns +- clear separation between raw upstream payloads and normalized canonical models +- reusable routing and sink behavior across domains +- strong config and event-envelope conventions without centralizing domain rules -### `config` +It is a poor fit if you want a monolithic framework that dictates application +structure end-to-end. -Loads YAML config with strict decoding and domain-agnostic validation. +## Built-In Capabilities -`SourceConfig` supports both source modes: -- `mode: poll` requires `every` -- `mode: stream` forbids `every` -- omitted `mode` means auto (inferred from the registered driver type) +`feedkit` currently includes: +- strict YAML config loading and validation +- polling and streaming source abstractions +- scheduler orchestration for configured sources +- optional pipeline processors +- built-in dedupe and normalization processors +- route compilation and sink fanout +- built-in sinks for `stdout`, `nats`, and `postgres` -It also supports optional expected source kinds: -- `kinds: ["observation", "alert"]` (preferred) -- `kind: "observation"` (legacy fallback) +The Postgres sink is intentionally split between feedkit-owned infrastructure +and daemon-owned schema mapping. `feedkit` manages connection setup, DDL, +writes, and pruning; downstream applications define the schema and event mapper. -### `event` +## Typical Wiring -Defines the domain-agnostic event envelope (`event.Event`) used across the system. - -### `sources` - -Defines source interfaces and driver registry: - -```go -type Input interface { - Name() string -} - -type PollSource interface { - Input - Poll(ctx context.Context) ([]event.Event, error) -} - -type StreamSource interface { - Input - Run(ctx context.Context, out chan<- event.Event) error -} -``` - -Notes: -- a poll can emit `0..N` events -- stream sources emit events continuously -- a single source may emit multiple event kinds -- driver implementations live in downstream daemons and are registered via `sources.Registry` - -### `scheduler` - -Runs one goroutine per source job: -- poll sources: cadence driven (`every` + jitter) -- stream sources: continuous run loop - -### `pipeline` - -Optional processing chain between collection and dispatch. -Processors can transform, drop, or reject events. - -### `processors` - -Defines the generic processor interface and a named-driver registry used by -daemons to build ordered processor chains. - -### `processors/dedupe` - -Built-in in-memory LRU dedupe processor that drops repeated events by `Event.ID`. - -### `processors/normalize` - -Concrete normalization processor implementation. Typical use: sources emit raw -payload events, then a normalize stage maps them to canonical schemas. - -### `dispatch` - -Compiles routes and fans out events to sinks with per-sink queue/worker isolation. - -### `sinks` - -Defines sink interface and sink registry. Built-ins include: -- `stdout` -- `nats` -- `postgres` - -Detailed Postgres configuration and wiring examples live in package docs: -`sinks/doc.go`. - -## Typical wiring +At a high level, a daemon built on `feedkit` does this: 1. Load config. -2. Register/build sources from `cfg.Sources`. -3. Register/build sinks from `cfg.Sinks`. -4. Compile routes. -5. Start scheduler (`sources -> bus`). -6. Start dispatcher (`bus -> pipeline -> sinks`). +2. Register domain-specific source drivers. +3. Register built-in and/or custom sinks. +4. Build sources, sinks, and optional processor chain from config. +5. Compile routes. +6. Start the scheduler and dispatcher. -## Non-goals +The package docs are the better source of truth for code-level details. In +particular, each subpackage `doc.go` describes its external API surface and any +optional helper APIs in `helpers.go`. -feedkit intentionally does not: -- define domain payload schemas -- enforce domain-specific event kinds -- own application lifecycle -- prescribe observability stack choices +## Package Layout + +The major packages are: +- `config`: config loading and validation +- `event`: the domain-agnostic event envelope +- `sources`: source interfaces and reusable source helpers +- `scheduler`: source execution and cadence management +- `processors`: processor interfaces and registry +- `processors/dedupe`: built-in in-memory dedupe processor +- `processors/normalize`: built-in normalization processor and helpers +- `pipeline`: optional processor chain +- `dispatch`: route compilation and fanout +- `sinks`: sink interfaces, built-ins, and Postgres registration helpers + +The root package docs in `doc.go` provide a concise package-by-package map for +Go documentation consumers. diff --git a/config/config.go b/config/config.go index 7afef7d..f5f8468 100644 --- a/config/config.go +++ b/config/config.go @@ -101,7 +101,7 @@ func (cfg SourceConfig) ExpectedKinds() []string { // SinkConfig describes one output sink adapter. type SinkConfig struct { Name string `yaml:"name"` - Driver string `yaml:"driver"` // "stdout", "file", "postgres", "rabbitmq", ... + Driver string `yaml:"driver"` // "stdout", "nats", "postgres", ... Params map[string]any `yaml:"params"` // sink-specific settings } diff --git a/config/load.go b/config/load.go index 120cf30..1c15f86 100644 --- a/config/load.go +++ b/config/load.go @@ -141,7 +141,7 @@ func (c *Config) Validate() error { } if strings.TrimSpace(s.Driver) == "" { - m.Add(fieldErr(path+".driver", "is required (stdout|file|postgres|rabbitmq|...)")) + m.Add(fieldErr(path+".driver", "is required (stdout|nats|postgres|...)")) } // Params can be nil; that's fine. diff --git a/doc.go b/doc.go index 047cff0..7d077e1 100644 --- a/doc.go +++ b/doc.go @@ -1,130 +1,56 @@ -// Package feedkit provides domain-agnostic plumbing for feed-processing daemons. +// Package feedkit provides a high-level map of the feedkit package set. // -// A feed daemon ingests upstream input, turns it into event.Event values, applies -// optional processing, and emits to sinks. +// Most real applications do not import the root package directly. Instead, they +// compose the subpackages that handle configuration, collection, processing, +// routing, and sinks. // -// Conceptual flow: +// The usual flow through feedkit is: // -// Collect -> Process (optional stages, including dedupe + normalize) -> Route -> Emit +// Collect -> Process -> Route -> Emit // -// In feedkit this maps to: -// -// Collect: sources + scheduler -// Process: pipeline + processors + processors/dedupe + processors/normalize (optional stages) -// Route: dispatch -// Emit: sinks -// Config: config -// -// feedkit intentionally does not define domain payload schemas or domain-specific -// validation rules. Those belong in each concrete daemon. -// -// Public packages +// That flow maps to packages like this: // // - config -// YAML config loading/validation (strict decode + domain-agnostic checks). -// -// SourceConfig supports both polling and streaming sources: -// -// - mode: "poll" | "stream" | omitted (auto by driver type) -// -// - every: poll interval (required for mode="poll") -// -// - kinds: optional expected emitted kinds -// -// - kind: legacy singular fallback -// -// - params: driver-specific settings +// Loads and validates daemon config. This package owns domain-agnostic +// config shape and consistency checks. // // - event -// Domain-agnostic event envelope (ID, Kind, Source, EmittedAt, Schema, Payload). +// Defines the event.Event envelope shared across sources, processors, +// dispatch, and sinks. // // - sources -// Source abstractions and source-driver registry. -// -// There are two source interfaces: -// -// - PollSource: Poll(ctx) ([]event.Event, error) -// -// - StreamSource: Run(ctx, out) error -// -// Both share Input{Name()}. A source may emit 0..N events per poll/run step, -// and may emit multiple event kinds. -// -// For HTTP-backed polling sources, sources.NewHTTPSource provides a shared -// helper for generic params: -// -// - params.url -// -// - params.user_agent -// -// - params.conditional (optional, default true) -// -// When conditional polling is enabled, feedkit opportunistically uses ETag -// and Last-Modified validators. A 304 Not Modified response is treated as a -// successful poll that emits no events. +// Defines polling and streaming source interfaces, the source registry, and +// reusable source helpers. // // - scheduler -// Runs one goroutine per job: -// -// - PollSource jobs run on Every (+ jitter) -// -// - StreamSource jobs run continuously -// -// - pipeline -// Processor chain between scheduler and dispatch. -// Processors can transform, drop, or reject events. +// Runs configured sources on a cadence or as long-lived stream workers. // // - processors -// Generic processor interface and named factory registry for wiring chains. +// Defines the generic processor interface and registry used to build +// ordered processor chains. // // - processors/dedupe -// Built-in in-memory LRU dedupe processor keyed by Event.ID. +// Built-in in-memory dedupe processor keyed by Event.ID. // // - processors/normalize -// Concrete pipeline processor for raw->canonical mapping. -// If no normalizer matches, the event passes through unchanged by default. +// Built-in normalization processor plus helper APIs for raw-to-canonical +// event mapping. +// +// - pipeline +// Applies an ordered processor chain between collection and dispatch. // // - dispatch -// Routes events to sinks and isolates slow sinks via per-sink queues/workers. +// Compiles routes and fans events out to sinks with per-sink isolation. // // - sinks -// Sink abstractions + sink registry. -// Built-ins include stdout, NATS, and Postgres. For Postgres, downstream -// code registers table schemas/mappers while feedkit manages DDL, writes, -// optional automatic retention pruning (via sink params.prune), and -// manual prune helpers. Postgres table schemas must declare PruneColumn. +// Defines sink interfaces, the sink registry, built-in sinks, and Postgres +// schema registration helpers. // -// Typical wiring (daemon main.go) +// feedkit is intentionally domain-agnostic. Domain schemas, domain event kinds, +// upstream-specific parsing, and daemon lifecycle remain the responsibility of +// each concrete application. // -// 1. Load config. -// 2. Register source drivers and build sources from config.Sources. -// 3. Register sink drivers and build sinks from config.Sinks. -// 4. Compile routes. -// 5. Start scheduler (sources -> bus) and dispatcher (bus -> pipeline -> sinks). -// -// Sketch: -// -// cfg, _ := config.Load("config.yml") -// srcReg := sources.NewRegistry() -// // domain registers poll/stream drivers... -// -// var jobs []scheduler.Job -// for _, sc := range cfg.Sources { -// src, _ := srcReg.BuildInput(sc) -// jobs = append(jobs, scheduler.Job{ -// Source: src, -// Every: sc.Every.Duration, -// }) -// } -// -// bus := make(chan event.Event, 256) -// s := &scheduler.Scheduler{Jobs: jobs, Out: bus, Logf: logf} -// // start dispatcher similarly... -// -// # Context and cancellation -// -// All blocking work should honor context cancellation: -// - source polling/streaming I/O -// - sink consumption -// - any expensive processor work +// For repository-level overview and usage narrative, see README.md. For +// code-level details, each subpackage doc.go is the source of truth for that +// package's public API surface and optional helpers. package feedkit diff --git a/pipeline/ratelimit.go b/pipeline/ratelimit.go deleted file mode 100644 index 1235aa5..0000000 --- a/pipeline/ratelimit.go +++ /dev/null @@ -1,5 +0,0 @@ -package pipeline - -// Placeholder for rate limit processor: -// - per source/kind sink routing limits -// - cooldown windows diff --git a/processors/normalize/doc.go b/processors/normalize/doc.go index a93a5d4..1245898 100644 --- a/processors/normalize/doc.go +++ b/processors/normalize/doc.go @@ -1,16 +1,18 @@ -// Package normalize provides a concrete normalization processor for feedkit pipelines. +// Package normalize provides the feedkit normalization processor and related +// helper APIs for raw-to-canonical event mapping. // -// Motivation: -// Many daemons have sources that: -// 1. fetch raw upstream data (often JSON), and -// 2. transform it into a domain's normalized payload format. +// External API surface: +// - Processor: concrete processors.Processor implementation +// - Normalizer / Func: normalization interface and ergonomic function adapter // -// Doing both steps inside Source.Poll works, but tends to make sources large and -// encourages duplication (unit conversions, common mapping helpers, etc.). +// Optional helpers from helpers.go: +// - PayloadJSONBytes: extract supported JSON-shaped payloads into bytes +// - DecodeJSONPayload: decode an event payload into a typed struct +// - FinalizeEvent: copy the input event envelope onto a normalized output // -// This package lets a source emit a "raw" event (e.g., Schema="raw.openweather.current.v1", -// Payload=json.RawMessage), and then a normalize.Processor can convert it into a -// normalized event (e.g., Schema="weather.observation.v1", Payload=WeatherObservation{}). +// Typical usage: +// sources emit raw events (often with json.RawMessage payloads), then a +// normalize.Processor converts matching raw schemas into canonical payloads. // // Key property: normalization is optional. // If no Normalizer matches an event, Processor passes it through unchanged by default. diff --git a/scheduler/worker.go b/scheduler/worker.go deleted file mode 100644 index 8830daa..0000000 --- a/scheduler/worker.go +++ /dev/null @@ -1,7 +0,0 @@ -package scheduler - -// Placeholder for per-source worker logic: -// - ticker loop -// - jitter -// - backoff on errors -// - emits events into scheduler.Out diff --git a/sinks/builtins.go b/sinks/builtins.go index 59974b2..5049c96 100644 --- a/sinks/builtins.go +++ b/sinks/builtins.go @@ -1,11 +1,6 @@ package sinks -import ( - "fmt" - "strings" - - "gitea.maximumdirect.net/ejr/feedkit/config" -) +import "gitea.maximumdirect.net/ejr/feedkit/config" // RegisterBuiltins registers sink drivers included in this binary. // @@ -17,11 +12,6 @@ func RegisterBuiltins(r *Registry) { return NewStdoutSink(cfg.Name), nil }) - // File sink: writes/archives events somewhere on disk. - r.Register("file", func(cfg config.SinkConfig) (Sink, error) { - return NewFileSinkFromConfig(cfg) - }) - // Postgres sink: persists events durably. r.Register("postgres", func(cfg config.SinkConfig) (Sink, error) { return NewPostgresSinkFromConfig(cfg) @@ -32,24 +22,3 @@ func RegisterBuiltins(r *Registry) { return NewNATSSinkFromConfig(cfg) }) } - -// ---- helpers for validating sink params ---- -// -// These helpers live in sinks (not config) on purpose: -// - config is domain-agnostic and should not embed driver-specific validation helpers. -// - sinks are adapters; validating their own params here keeps the logic near the driver. - -func requireStringParam(cfg config.SinkConfig, key string) (string, error) { - v, ok := cfg.Params[key] - if !ok { - return "", fmt.Errorf("sink %q: params.%s is required", cfg.Name, key) - } - s, ok := v.(string) - if !ok { - return "", fmt.Errorf("sink %q: params.%s must be a string", cfg.Name, key) - } - if strings.TrimSpace(s) == "" { - return "", fmt.Errorf("sink %q: params.%s cannot be empty", cfg.Name, key) - } - return s, nil -} diff --git a/sinks/doc.go b/sinks/doc.go index ecd4926..7962d5c 100644 --- a/sinks/doc.go +++ b/sinks/doc.go @@ -1,18 +1,27 @@ -// Package sinks provides sink abstractions, a sink driver registry, and several -// built-in sink drivers. +// Package sinks defines the feedkit sink interface, sink driver registry, and +// built-in infrastructure sinks. +// +// External API surface: +// - Sink: adapter interface that consumes event.Event values +// - Registry / NewRegistry: named sink factory registry +// - RegisterBuiltins: registers the built-in sink drivers in this binary // // Built-in drivers: // - stdout // - nats // - postgres // +// Optional helpers from helpers.go: +// - RegisterPostgresSchemaForConfiguredSinks: registers one Postgres schema +// for each configured sink using driver=postgres +// // # NATS built-in overview // // The NATS sink publishes each event as JSON to a configured subject. // // Required params: // - url: NATS server URL (for example, nats://localhost:4222) -// - exchange: NATS subject to publish to +// - subject: NATS subject to publish to // // Example config: // @@ -21,7 +30,7 @@ // driver: nats // params: // url: nats://localhost:4222 -// exchange: feedkit.events +// subject: feedkit.events // // # Postgres built-in overview // diff --git a/sinks/file.go b/sinks/file.go deleted file mode 100644 index ce582e8..0000000 --- a/sinks/file.go +++ /dev/null @@ -1,30 +0,0 @@ -package sinks - -import ( - "context" - "fmt" - - "gitea.maximumdirect.net/ejr/feedkit/config" - "gitea.maximumdirect.net/ejr/feedkit/event" -) - -type FileSink struct { - name string - path string -} - -func NewFileSinkFromConfig(cfg config.SinkConfig) (Sink, error) { - path, err := requireStringParam(cfg, "path") - if err != nil { - return nil, err - } - return &FileSink{name: cfg.Name, path: path}, nil -} - -func (s *FileSink) Name() string { return s.name } - -func (s *FileSink) Consume(ctx context.Context, e event.Event) error { - _ = ctx - _ = e - return fmt.Errorf("file sink: TODO implement (path=%s)", s.path) -} diff --git a/sinks/helpers.go b/sinks/helpers.go index 3188f94..0f1a933 100644 --- a/sinks/helpers.go +++ b/sinks/helpers.go @@ -7,6 +7,25 @@ import ( "gitea.maximumdirect.net/ejr/feedkit/config" ) +// requireStringParam returns a non-empty string sink param. +// +// This helper is intentionally local to sinks rather than config so +// driver-specific validation stays close to the adapters that use it. +func requireStringParam(cfg config.SinkConfig, key string) (string, error) { + v, ok := cfg.Params[key] + if !ok { + return "", fmt.Errorf("sink %q: params.%s is required", cfg.Name, key) + } + s, ok := v.(string) + if !ok { + return "", fmt.Errorf("sink %q: params.%s must be a string", cfg.Name, key) + } + if strings.TrimSpace(s) == "" { + return "", fmt.Errorf("sink %q: params.%s cannot be empty", cfg.Name, key) + } + return s, nil +} + // RegisterPostgresSchemaForConfiguredSinks registers one Postgres schema for each // configured sink using driver=postgres. func RegisterPostgresSchemaForConfiguredSinks(cfg *config.Config, schema PostgresSchema) error { diff --git a/sinks/nats.go b/sinks/nats.go index 20cbea7..c751977 100644 --- a/sinks/nats.go +++ b/sinks/nats.go @@ -13,9 +13,9 @@ import ( ) type NATSSink struct { - name string - url string - exchange string + name string + url string + subject string mu sync.Mutex conn *nats.Conn @@ -26,11 +26,11 @@ func NewNATSSinkFromConfig(cfg config.SinkConfig) (Sink, error) { if err != nil { return nil, err } - ex, err := requireStringParam(cfg, "exchange") + subject, err := requireStringParam(cfg, "subject") if err != nil { return nil, err } - return &NATSSink{name: cfg.Name, url: url, exchange: ex}, nil + return &NATSSink{name: cfg.Name, url: url, subject: subject}, nil } func (r *NATSSink) Name() string { return r.name } @@ -59,7 +59,7 @@ func (r *NATSSink) Consume(ctx context.Context, e event.Event) error { if err := ctx.Err(); err != nil { return err } - if err := conn.Publish(r.exchange, b); err != nil { + if err := conn.Publish(r.subject, b); err != nil { return fmt.Errorf("NATS sink: publish: %w", err) } return nil diff --git a/sinks/nats_test.go b/sinks/nats_test.go new file mode 100644 index 0000000..7c06aff --- /dev/null +++ b/sinks/nats_test.go @@ -0,0 +1,47 @@ +package sinks + +import ( + "strings" + "testing" + + "gitea.maximumdirect.net/ejr/feedkit/config" +) + +func TestNewNATSSinkFromConfigRequiresSubject(t *testing.T) { + sink, err := NewNATSSinkFromConfig(config.SinkConfig{ + Name: "nats-main", + Driver: "nats", + Params: map[string]any{ + "url": "nats://localhost:4222", + "subject": "feedkit.events", + }, + }) + if err != nil { + t.Fatalf("NewNATSSinkFromConfig() error = %v", err) + } + + natsSink, ok := sink.(*NATSSink) + if !ok { + t.Fatalf("sink type = %T, want *NATSSink", sink) + } + if natsSink.subject != "feedkit.events" { + t.Fatalf("subject = %q, want feedkit.events", natsSink.subject) + } +} + +func TestNewNATSSinkFromConfigRejectsLegacyExchange(t *testing.T) { + _, err := NewNATSSinkFromConfig(config.SinkConfig{ + Name: "nats-main", + Driver: "nats", + Params: map[string]any{ + "url": "nats://localhost:4222", + "exchange": "feedkit.events", + }, + }) + if err == nil { + t.Fatalf("NewNATSSinkFromConfig() expected error") + } + if !strings.Contains(err.Error(), "params.subject is required") { + t.Fatalf("error = %q, want params.subject is required", err) + } +} diff --git a/sinks/registry.go b/sinks/registry.go index c3e8b9e..4967cae 100644 --- a/sinks/registry.go +++ b/sinks/registry.go @@ -2,6 +2,7 @@ package sinks import ( "fmt" + "strings" "gitea.maximumdirect.net/ejr/feedkit/config" ) @@ -21,13 +22,40 @@ func NewRegistry() *Registry { } func (r *Registry) Register(driver string, f Factory) { + if r == nil { + panic("sinks.Registry.Register: registry cannot be nil") + } + driver = strings.TrimSpace(driver) + if driver == "" { + panic("sinks.Registry.Register: driver cannot be empty") + } + if f == nil { + panic(fmt.Sprintf("sinks.Registry.Register: factory cannot be nil (driver=%q)", driver)) + } + if r.byDriver == nil { + r.byDriver = map[string]Factory{} + } + if _, exists := r.byDriver[driver]; exists { + panic(fmt.Sprintf("sinks.Registry.Register: driver %q already registered", driver)) + } r.byDriver[driver] = f } func (r *Registry) Build(cfg config.SinkConfig) (Sink, error) { - f, ok := r.byDriver[cfg.Driver] - if !ok { - return nil, fmt.Errorf("unknown sink driver: %q", cfg.Driver) + if r == nil { + return nil, fmt.Errorf("sinks registry is nil") } - return f(cfg) + driver := strings.TrimSpace(cfg.Driver) + f, ok := r.byDriver[driver] + if !ok { + return nil, fmt.Errorf("unknown sink driver: %q", driver) + } + sink, err := f(cfg) + if err != nil { + return nil, fmt.Errorf("build sink %q: %w", driver, err) + } + if sink == nil { + return nil, fmt.Errorf("build sink %q: factory returned nil sink", driver) + } + return sink, nil } diff --git a/sinks/registry_test.go b/sinks/registry_test.go new file mode 100644 index 0000000..ba18d00 --- /dev/null +++ b/sinks/registry_test.go @@ -0,0 +1,126 @@ +package sinks + +import ( + "context" + "errors" + "strings" + "testing" + + "gitea.maximumdirect.net/ejr/feedkit/config" + "gitea.maximumdirect.net/ejr/feedkit/event" +) + +type testSink struct{ name string } + +func (s testSink) Name() string { return s.name } + +func (s testSink) Consume(context.Context, event.Event) error { return nil } + +func TestRegistryRegisterPanicsOnNilRegistry(t *testing.T) { + var r *Registry + defer func() { + if recover() == nil { + t.Fatalf("Register() expected panic on nil registry") + } + }() + r.Register("stdout", func(config.SinkConfig) (Sink, error) { return testSink{name: "stdout"}, nil }) +} + +func TestRegistryRegisterPanicsOnEmptyDriver(t *testing.T) { + r := NewRegistry() + defer func() { + if recover() == nil { + t.Fatalf("Register() expected panic on empty driver") + } + }() + r.Register(" ", func(config.SinkConfig) (Sink, error) { return testSink{name: "x"}, nil }) +} + +func TestRegistryRegisterPanicsOnNilFactory(t *testing.T) { + r := NewRegistry() + defer func() { + if recover() == nil { + t.Fatalf("Register() expected panic on nil factory") + } + }() + r.Register("stdout", nil) +} + +func TestRegistryRegisterPanicsOnDuplicateDriver(t *testing.T) { + r := NewRegistry() + r.Register("stdout", func(config.SinkConfig) (Sink, error) { return testSink{name: "a"}, nil }) + + defer func() { + if recover() == nil { + t.Fatalf("Register() expected panic on duplicate driver") + } + }() + r.Register("stdout", func(config.SinkConfig) (Sink, error) { return testSink{name: "b"}, nil }) +} + +func TestRegistryBuildNilRegistryFails(t *testing.T) { + var r *Registry + _, err := r.Build(config.SinkConfig{Driver: "stdout"}) + if err == nil { + t.Fatalf("Build() expected error for nil registry") + } + if !strings.Contains(err.Error(), "registry is nil") { + t.Fatalf("Build() error = %q, want registry is nil", err) + } +} + +func TestRegistryBuildTrimsDriver(t *testing.T) { + r := NewRegistry() + r.Register("stdout", func(config.SinkConfig) (Sink, error) { return testSink{name: "stdout"}, nil }) + + sink, err := r.Build(config.SinkConfig{Name: "sink1", Driver: " stdout "}) + if err != nil { + t.Fatalf("Build() error = %v", err) + } + if sink.Name() != "stdout" { + t.Fatalf("Build() sink name = %q, want stdout", sink.Name()) + } +} + +func TestRegistryBuildWrapsFactoryError(t *testing.T) { + r := NewRegistry() + r.Register("broken", func(config.SinkConfig) (Sink, error) { return nil, errors.New("boom") }) + + _, err := r.Build(config.SinkConfig{Driver: "broken"}) + if err == nil { + t.Fatalf("Build() expected error") + } + if !strings.Contains(err.Error(), `build sink "broken": boom`) { + t.Fatalf("Build() error = %q", err) + } +} + +func TestRegistryBuildRejectsNilSink(t *testing.T) { + r := NewRegistry() + r.Register("nil_sink", func(config.SinkConfig) (Sink, error) { return nil, nil }) + + _, err := r.Build(config.SinkConfig{Driver: "nil_sink"}) + if err == nil { + t.Fatalf("Build() expected nil sink error") + } + if !strings.Contains(err.Error(), "factory returned nil sink") { + t.Fatalf("Build() error = %q", err) + } +} + +func TestRegisterBuiltinsExposesExpectedDrivers(t *testing.T) { + r := NewRegistry() + RegisterBuiltins(r) + + if len(r.byDriver) != 3 { + t.Fatalf("len(byDriver) = %d, want 3", len(r.byDriver)) + } + for _, driver := range []string{"stdout", "nats", "postgres"} { + if _, ok := r.byDriver[driver]; !ok { + t.Fatalf("builtins missing driver %q", driver) + } + } + if _, ok := r.byDriver["file"]; ok { + t.Fatalf("builtins unexpectedly registered file driver") + } +} diff --git a/sources/doc.go b/sources/doc.go index 7bf9f19..007c2b2 100644 --- a/sources/doc.go +++ b/sources/doc.go @@ -1,10 +1,12 @@ -// Package sources defines feedkit's input-source abstraction. +// Package sources defines feedkit's input-source abstractions and source +// registry. // -// A source ingests upstream input and emits one or more event.Event values. -// -// feedkit supports two source modes: -// - PollSource: scheduler invokes Poll on a cadence. -// - StreamSource: source runs continuously and pushes events as input arrives. +// External API surface: +// - Input: common source identity surface +// - PollSource: polling source interface +// - StreamSource: streaming source interface +// - Registry / NewRegistry: source driver registry and builders +// - HTTPSource / NewHTTPSource: reusable HTTP polling helper // // Source drivers are domain-specific and registered into Registry by driver name. // Registry can then build configured sources from config.SourceConfig. @@ -12,6 +14,12 @@ // A single source may emit 0..N events per poll or stream iteration, and those // events may span multiple event kinds. // +// Optional helpers from helpers.go: +// - DefaultEventID: default event ID policy for source implementations +// - SingleEvent: construct and validate a one-element event slice +// - ValidateExpectedKinds: compare configured expected kinds against source +// advertised kinds when metadata is available +// // HTTP-backed polling sources can share NewHTTPSource for generic HTTP config // parsing and conditional GET behavior. The helper understands: // - params.url diff --git a/sources/http.go b/sources/http.go index f4b636d..c124e9d 100644 --- a/sources/http.go +++ b/sources/http.go @@ -5,7 +5,6 @@ import ( "encoding/json" "fmt" "net/http" - "strconv" "strings" "sync" @@ -60,9 +59,13 @@ func NewHTTPSource(driver string, cfg config.SourceConfig, accept string) (*HTTP return nil, fmt.Errorf("%s %q: params.user_agent is required", driver, cfg.Name) } - conditional, err := parseConditionalParam(cfg) - if err != nil { - return nil, err + conditional := true + if _, exists := cfg.Params["conditional"]; exists { + var ok bool + conditional, ok = cfg.ParamBool("conditional") + if !ok { + return nil, fmt.Errorf("source %q: params.conditional must be a boolean", cfg.Name) + } } return &HTTPSource{ @@ -121,27 +124,3 @@ func (s *HTTPSource) FetchJSONIfChanged(ctx context.Context) (json.RawMessage, b } return json.RawMessage(body), true, nil } - -func parseConditionalParam(cfg config.SourceConfig) (bool, error) { - raw, ok := cfg.Params["conditional"] - if !ok || raw == nil { - return true, nil - } - - switch v := raw.(type) { - case bool: - return v, nil - case string: - s := strings.TrimSpace(v) - if s == "" { - return false, fmt.Errorf("source %q: params.conditional must be a boolean", cfg.Name) - } - parsed, err := strconv.ParseBool(s) - if err != nil { - return false, fmt.Errorf("source %q: params.conditional must be a boolean", cfg.Name) - } - return parsed, nil - default: - return false, fmt.Errorf("source %q: params.conditional must be a boolean", cfg.Name) - } -} diff --git a/sources/http_test.go b/sources/http_test.go index 5b664b7..2b8341b 100644 --- a/sources/http_test.go +++ b/sources/http_test.go @@ -4,6 +4,7 @@ import ( "context" "net/http" "net/http/httptest" + "strings" "testing" "gitea.maximumdirect.net/ejr/feedkit/config" @@ -39,6 +40,27 @@ func TestNewHTTPSourceRejectsInvalidConditional(t *testing.T) { if err == nil { t.Fatalf("NewHTTPSource() error = nil, want error") } + if !strings.Contains(err.Error(), "params.conditional must be a boolean") { + t.Fatalf("NewHTTPSource() error = %q, want params.conditional must be a boolean", err) + } +} + +func TestNewHTTPSourceConditionalCanBeExplicitlyFalse(t *testing.T) { + src, err := NewHTTPSource("test_driver", config.SourceConfig{ + Name: "test-source", + Driver: "test_driver", + Params: map[string]any{ + "url": "https://example.invalid", + "user_agent": "test-agent", + "conditional": false, + }, + }, "application/json") + if err != nil { + t.Fatalf("NewHTTPSource() error = %v", err) + } + if src.Conditional { + t.Fatalf("Conditional = true, want false") + } } func TestHTTPSourceFetchJSONIfChanged(t *testing.T) {