Cleaned up documentation and removed stubs and TODOs throughout the application

This commit is contained in:
2026-03-28 13:02:37 -05:00
parent 3ef93faf69
commit 3281368922
18 changed files with 403 additions and 345 deletions

177
README.md
View File

@@ -1,127 +1,92 @@
# feedkit
`feedkit` provides domain-agnostic plumbing for feed-processing daemons.
`feedkit` is a small Go toolkit for building feed-processing daemons.
A daemon built on feedkit typically:
- ingests upstream input (polling APIs or consuming streams)
It gives you the reusable plumbing around collection, processing, routing, and
emission, while leaving domain concepts, schemas, and application wiring in
your daemon. The intended shape is a family of sibling applications such as
`weatherfeeder`, `newsfeeder`, or `earthquakefeeder` that all share the same
infrastructure patterns without sharing domain logic.
## What It Does
A daemon built on `feedkit` typically:
- ingests upstream input by polling HTTP APIs or consuming streams
- emits domain-agnostic `event.Event` values
- applies optional processing (normalization, dedupe, policy)
- routes events to sinks (stdout, NATS, files, databases, etc.)
- optionally processes those events with stages like dedupe or normalization
- routes events to one or more sinks such as stdout, NATS, or Postgres
Conceptually, the pipeline is:
`Collect -> Process -> Route -> Emit`
## Philosophy
feedkit is not a framework. It provides small composable packages and leaves
lifecycle, domain schemas, and domain-specific validation in your daemon.
`feedkit` is intentionally not a framework.
## Conceptual pipeline
It does not try to own:
- your domain payload schemas
- your domain event kinds
- your daemon lifecycle or `main.go`
- your observability stack or deployment model
Collect -> Process (optional stages, including dedupe + normalize) -> Route -> Emit
Instead, it provides small composable packages that are easy to wire together in
different daemons.
| Stage | Package(s) |
|---|---|
| Collect | `sources`, `scheduler` |
| Process | `pipeline`, `processors`, `processors/dedupe`, `processors/normalize` (optional stages) |
| Route | `dispatch` |
| Emit | `sinks` |
| Configure | `config` |
## When To Use It
## Core packages
`feedkit` is a good fit when you want:
- multiple small ingestion daemons with shared infrastructure patterns
- clear separation between raw upstream payloads and normalized canonical models
- reusable routing and sink behavior across domains
- strong config and event-envelope conventions without centralizing domain rules
### `config`
It is a poor fit if you want a monolithic framework that dictates application
structure end-to-end.
Loads YAML config with strict decoding and domain-agnostic validation.
## Built-In Capabilities
`SourceConfig` supports both source modes:
- `mode: poll` requires `every`
- `mode: stream` forbids `every`
- omitted `mode` means auto (inferred from the registered driver type)
`feedkit` currently includes:
- strict YAML config loading and validation
- polling and streaming source abstractions
- scheduler orchestration for configured sources
- optional pipeline processors
- built-in dedupe and normalization processors
- route compilation and sink fanout
- built-in sinks for `stdout`, `nats`, and `postgres`
It also supports optional expected source kinds:
- `kinds: ["observation", "alert"]` (preferred)
- `kind: "observation"` (legacy fallback)
The Postgres sink is intentionally split between feedkit-owned infrastructure
and daemon-owned schema mapping. `feedkit` manages connection setup, DDL,
writes, and pruning; downstream applications define the schema and event mapper.
### `event`
## Typical Wiring
Defines the domain-agnostic event envelope (`event.Event`) used across the system.
### `sources`
Defines source interfaces and driver registry:
```go
type Input interface {
Name() string
}
type PollSource interface {
Input
Poll(ctx context.Context) ([]event.Event, error)
}
type StreamSource interface {
Input
Run(ctx context.Context, out chan<- event.Event) error
}
```
Notes:
- a poll can emit `0..N` events
- stream sources emit events continuously
- a single source may emit multiple event kinds
- driver implementations live in downstream daemons and are registered via `sources.Registry`
### `scheduler`
Runs one goroutine per source job:
- poll sources: cadence driven (`every` + jitter)
- stream sources: continuous run loop
### `pipeline`
Optional processing chain between collection and dispatch.
Processors can transform, drop, or reject events.
### `processors`
Defines the generic processor interface and a named-driver registry used by
daemons to build ordered processor chains.
### `processors/dedupe`
Built-in in-memory LRU dedupe processor that drops repeated events by `Event.ID`.
### `processors/normalize`
Concrete normalization processor implementation. Typical use: sources emit raw
payload events, then a normalize stage maps them to canonical schemas.
### `dispatch`
Compiles routes and fans out events to sinks with per-sink queue/worker isolation.
### `sinks`
Defines sink interface and sink registry. Built-ins include:
- `stdout`
- `nats`
- `postgres`
Detailed Postgres configuration and wiring examples live in package docs:
`sinks/doc.go`.
## Typical wiring
At a high level, a daemon built on `feedkit` does this:
1. Load config.
2. Register/build sources from `cfg.Sources`.
3. Register/build sinks from `cfg.Sinks`.
4. Compile routes.
5. Start scheduler (`sources -> bus`).
6. Start dispatcher (`bus -> pipeline -> sinks`).
2. Register domain-specific source drivers.
3. Register built-in and/or custom sinks.
4. Build sources, sinks, and optional processor chain from config.
5. Compile routes.
6. Start the scheduler and dispatcher.
## Non-goals
The package docs are the better source of truth for code-level details. In
particular, each subpackage `doc.go` describes its external API surface and any
optional helper APIs in `helpers.go`.
feedkit intentionally does not:
- define domain payload schemas
- enforce domain-specific event kinds
- own application lifecycle
- prescribe observability stack choices
## Package Layout
The major packages are:
- `config`: config loading and validation
- `event`: the domain-agnostic event envelope
- `sources`: source interfaces and reusable source helpers
- `scheduler`: source execution and cadence management
- `processors`: processor interfaces and registry
- `processors/dedupe`: built-in in-memory dedupe processor
- `processors/normalize`: built-in normalization processor and helpers
- `pipeline`: optional processor chain
- `dispatch`: route compilation and fanout
- `sinks`: sink interfaces, built-ins, and Postgres registration helpers
The root package docs in `doc.go` provide a concise package-by-package map for
Go documentation consumers.

View File

@@ -101,7 +101,7 @@ func (cfg SourceConfig) ExpectedKinds() []string {
// SinkConfig describes one output sink adapter.
type SinkConfig struct {
Name string `yaml:"name"`
Driver string `yaml:"driver"` // "stdout", "file", "postgres", "rabbitmq", ...
Driver string `yaml:"driver"` // "stdout", "nats", "postgres", ...
Params map[string]any `yaml:"params"` // sink-specific settings
}

View File

@@ -141,7 +141,7 @@ func (c *Config) Validate() error {
}
if strings.TrimSpace(s.Driver) == "" {
m.Add(fieldErr(path+".driver", "is required (stdout|file|postgres|rabbitmq|...)"))
m.Add(fieldErr(path+".driver", "is required (stdout|nats|postgres|...)"))
}
// Params can be nil; that's fine.

136
doc.go
View File

@@ -1,130 +1,56 @@
// Package feedkit provides domain-agnostic plumbing for feed-processing daemons.
// Package feedkit provides a high-level map of the feedkit package set.
//
// A feed daemon ingests upstream input, turns it into event.Event values, applies
// optional processing, and emits to sinks.
// Most real applications do not import the root package directly. Instead, they
// compose the subpackages that handle configuration, collection, processing,
// routing, and sinks.
//
// Conceptual flow:
// The usual flow through feedkit is:
//
// Collect -> Process (optional stages, including dedupe + normalize) -> Route -> Emit
// Collect -> Process -> Route -> Emit
//
// In feedkit this maps to:
//
// Collect: sources + scheduler
// Process: pipeline + processors + processors/dedupe + processors/normalize (optional stages)
// Route: dispatch
// Emit: sinks
// Config: config
//
// feedkit intentionally does not define domain payload schemas or domain-specific
// validation rules. Those belong in each concrete daemon.
//
// Public packages
// That flow maps to packages like this:
//
// - config
// YAML config loading/validation (strict decode + domain-agnostic checks).
//
// SourceConfig supports both polling and streaming sources:
//
// - mode: "poll" | "stream" | omitted (auto by driver type)
//
// - every: poll interval (required for mode="poll")
//
// - kinds: optional expected emitted kinds
//
// - kind: legacy singular fallback
//
// - params: driver-specific settings
// Loads and validates daemon config. This package owns domain-agnostic
// config shape and consistency checks.
//
// - event
// Domain-agnostic event envelope (ID, Kind, Source, EmittedAt, Schema, Payload).
// Defines the event.Event envelope shared across sources, processors,
// dispatch, and sinks.
//
// - sources
// Source abstractions and source-driver registry.
//
// There are two source interfaces:
//
// - PollSource: Poll(ctx) ([]event.Event, error)
//
// - StreamSource: Run(ctx, out) error
//
// Both share Input{Name()}. A source may emit 0..N events per poll/run step,
// and may emit multiple event kinds.
//
// For HTTP-backed polling sources, sources.NewHTTPSource provides a shared
// helper for generic params:
//
// - params.url
//
// - params.user_agent
//
// - params.conditional (optional, default true)
//
// When conditional polling is enabled, feedkit opportunistically uses ETag
// and Last-Modified validators. A 304 Not Modified response is treated as a
// successful poll that emits no events.
// Defines polling and streaming source interfaces, the source registry, and
// reusable source helpers.
//
// - scheduler
// Runs one goroutine per job:
//
// - PollSource jobs run on Every (+ jitter)
//
// - StreamSource jobs run continuously
//
// - pipeline
// Processor chain between scheduler and dispatch.
// Processors can transform, drop, or reject events.
// Runs configured sources on a cadence or as long-lived stream workers.
//
// - processors
// Generic processor interface and named factory registry for wiring chains.
// Defines the generic processor interface and registry used to build
// ordered processor chains.
//
// - processors/dedupe
// Built-in in-memory LRU dedupe processor keyed by Event.ID.
// Built-in in-memory dedupe processor keyed by Event.ID.
//
// - processors/normalize
// Concrete pipeline processor for raw->canonical mapping.
// If no normalizer matches, the event passes through unchanged by default.
// Built-in normalization processor plus helper APIs for raw-to-canonical
// event mapping.
//
// - pipeline
// Applies an ordered processor chain between collection and dispatch.
//
// - dispatch
// Routes events to sinks and isolates slow sinks via per-sink queues/workers.
// Compiles routes and fans events out to sinks with per-sink isolation.
//
// - sinks
// Sink abstractions + sink registry.
// Built-ins include stdout, NATS, and Postgres. For Postgres, downstream
// code registers table schemas/mappers while feedkit manages DDL, writes,
// optional automatic retention pruning (via sink params.prune), and
// manual prune helpers. Postgres table schemas must declare PruneColumn.
// Defines sink interfaces, the sink registry, built-in sinks, and Postgres
// schema registration helpers.
//
// Typical wiring (daemon main.go)
// feedkit is intentionally domain-agnostic. Domain schemas, domain event kinds,
// upstream-specific parsing, and daemon lifecycle remain the responsibility of
// each concrete application.
//
// 1. Load config.
// 2. Register source drivers and build sources from config.Sources.
// 3. Register sink drivers and build sinks from config.Sinks.
// 4. Compile routes.
// 5. Start scheduler (sources -> bus) and dispatcher (bus -> pipeline -> sinks).
//
// Sketch:
//
// cfg, _ := config.Load("config.yml")
// srcReg := sources.NewRegistry()
// // domain registers poll/stream drivers...
//
// var jobs []scheduler.Job
// for _, sc := range cfg.Sources {
// src, _ := srcReg.BuildInput(sc)
// jobs = append(jobs, scheduler.Job{
// Source: src,
// Every: sc.Every.Duration,
// })
// }
//
// bus := make(chan event.Event, 256)
// s := &scheduler.Scheduler{Jobs: jobs, Out: bus, Logf: logf}
// // start dispatcher similarly...
//
// # Context and cancellation
//
// All blocking work should honor context cancellation:
// - source polling/streaming I/O
// - sink consumption
// - any expensive processor work
// For repository-level overview and usage narrative, see README.md. For
// code-level details, each subpackage doc.go is the source of truth for that
// package's public API surface and optional helpers.
package feedkit

View File

@@ -1,5 +0,0 @@
package pipeline
// Placeholder for rate limit processor:
// - per source/kind sink routing limits
// - cooldown windows

View File

@@ -1,16 +1,18 @@
// Package normalize provides a concrete normalization processor for feedkit pipelines.
// Package normalize provides the feedkit normalization processor and related
// helper APIs for raw-to-canonical event mapping.
//
// Motivation:
// Many daemons have sources that:
// 1. fetch raw upstream data (often JSON), and
// 2. transform it into a domain's normalized payload format.
// External API surface:
// - Processor: concrete processors.Processor implementation
// - Normalizer / Func: normalization interface and ergonomic function adapter
//
// Doing both steps inside Source.Poll works, but tends to make sources large and
// encourages duplication (unit conversions, common mapping helpers, etc.).
// Optional helpers from helpers.go:
// - PayloadJSONBytes: extract supported JSON-shaped payloads into bytes
// - DecodeJSONPayload: decode an event payload into a typed struct
// - FinalizeEvent: copy the input event envelope onto a normalized output
//
// This package lets a source emit a "raw" event (e.g., Schema="raw.openweather.current.v1",
// Payload=json.RawMessage), and then a normalize.Processor can convert it into a
// normalized event (e.g., Schema="weather.observation.v1", Payload=WeatherObservation{}).
// Typical usage:
// sources emit raw events (often with json.RawMessage payloads), then a
// normalize.Processor converts matching raw schemas into canonical payloads.
//
// Key property: normalization is optional.
// If no Normalizer matches an event, Processor passes it through unchanged by default.

View File

@@ -1,7 +0,0 @@
package scheduler
// Placeholder for per-source worker logic:
// - ticker loop
// - jitter
// - backoff on errors
// - emits events into scheduler.Out

View File

@@ -1,11 +1,6 @@
package sinks
import (
"fmt"
"strings"
"gitea.maximumdirect.net/ejr/feedkit/config"
)
import "gitea.maximumdirect.net/ejr/feedkit/config"
// RegisterBuiltins registers sink drivers included in this binary.
//
@@ -17,11 +12,6 @@ func RegisterBuiltins(r *Registry) {
return NewStdoutSink(cfg.Name), nil
})
// File sink: writes/archives events somewhere on disk.
r.Register("file", func(cfg config.SinkConfig) (Sink, error) {
return NewFileSinkFromConfig(cfg)
})
// Postgres sink: persists events durably.
r.Register("postgres", func(cfg config.SinkConfig) (Sink, error) {
return NewPostgresSinkFromConfig(cfg)
@@ -32,24 +22,3 @@ func RegisterBuiltins(r *Registry) {
return NewNATSSinkFromConfig(cfg)
})
}
// ---- helpers for validating sink params ----
//
// These helpers live in sinks (not config) on purpose:
// - config is domain-agnostic and should not embed driver-specific validation helpers.
// - sinks are adapters; validating their own params here keeps the logic near the driver.
func requireStringParam(cfg config.SinkConfig, key string) (string, error) {
v, ok := cfg.Params[key]
if !ok {
return "", fmt.Errorf("sink %q: params.%s is required", cfg.Name, key)
}
s, ok := v.(string)
if !ok {
return "", fmt.Errorf("sink %q: params.%s must be a string", cfg.Name, key)
}
if strings.TrimSpace(s) == "" {
return "", fmt.Errorf("sink %q: params.%s cannot be empty", cfg.Name, key)
}
return s, nil
}

View File

@@ -1,18 +1,27 @@
// Package sinks provides sink abstractions, a sink driver registry, and several
// built-in sink drivers.
// Package sinks defines the feedkit sink interface, sink driver registry, and
// built-in infrastructure sinks.
//
// External API surface:
// - Sink: adapter interface that consumes event.Event values
// - Registry / NewRegistry: named sink factory registry
// - RegisterBuiltins: registers the built-in sink drivers in this binary
//
// Built-in drivers:
// - stdout
// - nats
// - postgres
//
// Optional helpers from helpers.go:
// - RegisterPostgresSchemaForConfiguredSinks: registers one Postgres schema
// for each configured sink using driver=postgres
//
// # NATS built-in overview
//
// The NATS sink publishes each event as JSON to a configured subject.
//
// Required params:
// - url: NATS server URL (for example, nats://localhost:4222)
// - exchange: NATS subject to publish to
// - subject: NATS subject to publish to
//
// Example config:
//
@@ -21,7 +30,7 @@
// driver: nats
// params:
// url: nats://localhost:4222
// exchange: feedkit.events
// subject: feedkit.events
//
// # Postgres built-in overview
//

View File

@@ -1,30 +0,0 @@
package sinks
import (
"context"
"fmt"
"gitea.maximumdirect.net/ejr/feedkit/config"
"gitea.maximumdirect.net/ejr/feedkit/event"
)
type FileSink struct {
name string
path string
}
func NewFileSinkFromConfig(cfg config.SinkConfig) (Sink, error) {
path, err := requireStringParam(cfg, "path")
if err != nil {
return nil, err
}
return &FileSink{name: cfg.Name, path: path}, nil
}
func (s *FileSink) Name() string { return s.name }
func (s *FileSink) Consume(ctx context.Context, e event.Event) error {
_ = ctx
_ = e
return fmt.Errorf("file sink: TODO implement (path=%s)", s.path)
}

View File

@@ -7,6 +7,25 @@ import (
"gitea.maximumdirect.net/ejr/feedkit/config"
)
// requireStringParam returns a non-empty string sink param.
//
// This helper is intentionally local to sinks rather than config so
// driver-specific validation stays close to the adapters that use it.
func requireStringParam(cfg config.SinkConfig, key string) (string, error) {
v, ok := cfg.Params[key]
if !ok {
return "", fmt.Errorf("sink %q: params.%s is required", cfg.Name, key)
}
s, ok := v.(string)
if !ok {
return "", fmt.Errorf("sink %q: params.%s must be a string", cfg.Name, key)
}
if strings.TrimSpace(s) == "" {
return "", fmt.Errorf("sink %q: params.%s cannot be empty", cfg.Name, key)
}
return s, nil
}
// RegisterPostgresSchemaForConfiguredSinks registers one Postgres schema for each
// configured sink using driver=postgres.
func RegisterPostgresSchemaForConfiguredSinks(cfg *config.Config, schema PostgresSchema) error {

View File

@@ -15,7 +15,7 @@ import (
type NATSSink struct {
name string
url string
exchange string
subject string
mu sync.Mutex
conn *nats.Conn
@@ -26,11 +26,11 @@ func NewNATSSinkFromConfig(cfg config.SinkConfig) (Sink, error) {
if err != nil {
return nil, err
}
ex, err := requireStringParam(cfg, "exchange")
subject, err := requireStringParam(cfg, "subject")
if err != nil {
return nil, err
}
return &NATSSink{name: cfg.Name, url: url, exchange: ex}, nil
return &NATSSink{name: cfg.Name, url: url, subject: subject}, nil
}
func (r *NATSSink) Name() string { return r.name }
@@ -59,7 +59,7 @@ func (r *NATSSink) Consume(ctx context.Context, e event.Event) error {
if err := ctx.Err(); err != nil {
return err
}
if err := conn.Publish(r.exchange, b); err != nil {
if err := conn.Publish(r.subject, b); err != nil {
return fmt.Errorf("NATS sink: publish: %w", err)
}
return nil

47
sinks/nats_test.go Normal file
View File

@@ -0,0 +1,47 @@
package sinks
import (
"strings"
"testing"
"gitea.maximumdirect.net/ejr/feedkit/config"
)
func TestNewNATSSinkFromConfigRequiresSubject(t *testing.T) {
sink, err := NewNATSSinkFromConfig(config.SinkConfig{
Name: "nats-main",
Driver: "nats",
Params: map[string]any{
"url": "nats://localhost:4222",
"subject": "feedkit.events",
},
})
if err != nil {
t.Fatalf("NewNATSSinkFromConfig() error = %v", err)
}
natsSink, ok := sink.(*NATSSink)
if !ok {
t.Fatalf("sink type = %T, want *NATSSink", sink)
}
if natsSink.subject != "feedkit.events" {
t.Fatalf("subject = %q, want feedkit.events", natsSink.subject)
}
}
func TestNewNATSSinkFromConfigRejectsLegacyExchange(t *testing.T) {
_, err := NewNATSSinkFromConfig(config.SinkConfig{
Name: "nats-main",
Driver: "nats",
Params: map[string]any{
"url": "nats://localhost:4222",
"exchange": "feedkit.events",
},
})
if err == nil {
t.Fatalf("NewNATSSinkFromConfig() expected error")
}
if !strings.Contains(err.Error(), "params.subject is required") {
t.Fatalf("error = %q, want params.subject is required", err)
}
}

View File

@@ -2,6 +2,7 @@ package sinks
import (
"fmt"
"strings"
"gitea.maximumdirect.net/ejr/feedkit/config"
)
@@ -21,13 +22,40 @@ func NewRegistry() *Registry {
}
func (r *Registry) Register(driver string, f Factory) {
if r == nil {
panic("sinks.Registry.Register: registry cannot be nil")
}
driver = strings.TrimSpace(driver)
if driver == "" {
panic("sinks.Registry.Register: driver cannot be empty")
}
if f == nil {
panic(fmt.Sprintf("sinks.Registry.Register: factory cannot be nil (driver=%q)", driver))
}
if r.byDriver == nil {
r.byDriver = map[string]Factory{}
}
if _, exists := r.byDriver[driver]; exists {
panic(fmt.Sprintf("sinks.Registry.Register: driver %q already registered", driver))
}
r.byDriver[driver] = f
}
func (r *Registry) Build(cfg config.SinkConfig) (Sink, error) {
f, ok := r.byDriver[cfg.Driver]
if !ok {
return nil, fmt.Errorf("unknown sink driver: %q", cfg.Driver)
if r == nil {
return nil, fmt.Errorf("sinks registry is nil")
}
return f(cfg)
driver := strings.TrimSpace(cfg.Driver)
f, ok := r.byDriver[driver]
if !ok {
return nil, fmt.Errorf("unknown sink driver: %q", driver)
}
sink, err := f(cfg)
if err != nil {
return nil, fmt.Errorf("build sink %q: %w", driver, err)
}
if sink == nil {
return nil, fmt.Errorf("build sink %q: factory returned nil sink", driver)
}
return sink, nil
}

126
sinks/registry_test.go Normal file
View File

@@ -0,0 +1,126 @@
package sinks
import (
"context"
"errors"
"strings"
"testing"
"gitea.maximumdirect.net/ejr/feedkit/config"
"gitea.maximumdirect.net/ejr/feedkit/event"
)
type testSink struct{ name string }
func (s testSink) Name() string { return s.name }
func (s testSink) Consume(context.Context, event.Event) error { return nil }
func TestRegistryRegisterPanicsOnNilRegistry(t *testing.T) {
var r *Registry
defer func() {
if recover() == nil {
t.Fatalf("Register() expected panic on nil registry")
}
}()
r.Register("stdout", func(config.SinkConfig) (Sink, error) { return testSink{name: "stdout"}, nil })
}
func TestRegistryRegisterPanicsOnEmptyDriver(t *testing.T) {
r := NewRegistry()
defer func() {
if recover() == nil {
t.Fatalf("Register() expected panic on empty driver")
}
}()
r.Register(" ", func(config.SinkConfig) (Sink, error) { return testSink{name: "x"}, nil })
}
func TestRegistryRegisterPanicsOnNilFactory(t *testing.T) {
r := NewRegistry()
defer func() {
if recover() == nil {
t.Fatalf("Register() expected panic on nil factory")
}
}()
r.Register("stdout", nil)
}
func TestRegistryRegisterPanicsOnDuplicateDriver(t *testing.T) {
r := NewRegistry()
r.Register("stdout", func(config.SinkConfig) (Sink, error) { return testSink{name: "a"}, nil })
defer func() {
if recover() == nil {
t.Fatalf("Register() expected panic on duplicate driver")
}
}()
r.Register("stdout", func(config.SinkConfig) (Sink, error) { return testSink{name: "b"}, nil })
}
func TestRegistryBuildNilRegistryFails(t *testing.T) {
var r *Registry
_, err := r.Build(config.SinkConfig{Driver: "stdout"})
if err == nil {
t.Fatalf("Build() expected error for nil registry")
}
if !strings.Contains(err.Error(), "registry is nil") {
t.Fatalf("Build() error = %q, want registry is nil", err)
}
}
func TestRegistryBuildTrimsDriver(t *testing.T) {
r := NewRegistry()
r.Register("stdout", func(config.SinkConfig) (Sink, error) { return testSink{name: "stdout"}, nil })
sink, err := r.Build(config.SinkConfig{Name: "sink1", Driver: " stdout "})
if err != nil {
t.Fatalf("Build() error = %v", err)
}
if sink.Name() != "stdout" {
t.Fatalf("Build() sink name = %q, want stdout", sink.Name())
}
}
func TestRegistryBuildWrapsFactoryError(t *testing.T) {
r := NewRegistry()
r.Register("broken", func(config.SinkConfig) (Sink, error) { return nil, errors.New("boom") })
_, err := r.Build(config.SinkConfig{Driver: "broken"})
if err == nil {
t.Fatalf("Build() expected error")
}
if !strings.Contains(err.Error(), `build sink "broken": boom`) {
t.Fatalf("Build() error = %q", err)
}
}
func TestRegistryBuildRejectsNilSink(t *testing.T) {
r := NewRegistry()
r.Register("nil_sink", func(config.SinkConfig) (Sink, error) { return nil, nil })
_, err := r.Build(config.SinkConfig{Driver: "nil_sink"})
if err == nil {
t.Fatalf("Build() expected nil sink error")
}
if !strings.Contains(err.Error(), "factory returned nil sink") {
t.Fatalf("Build() error = %q", err)
}
}
func TestRegisterBuiltinsExposesExpectedDrivers(t *testing.T) {
r := NewRegistry()
RegisterBuiltins(r)
if len(r.byDriver) != 3 {
t.Fatalf("len(byDriver) = %d, want 3", len(r.byDriver))
}
for _, driver := range []string{"stdout", "nats", "postgres"} {
if _, ok := r.byDriver[driver]; !ok {
t.Fatalf("builtins missing driver %q", driver)
}
}
if _, ok := r.byDriver["file"]; ok {
t.Fatalf("builtins unexpectedly registered file driver")
}
}

View File

@@ -1,10 +1,12 @@
// Package sources defines feedkit's input-source abstraction.
// Package sources defines feedkit's input-source abstractions and source
// registry.
//
// A source ingests upstream input and emits one or more event.Event values.
//
// feedkit supports two source modes:
// - PollSource: scheduler invokes Poll on a cadence.
// - StreamSource: source runs continuously and pushes events as input arrives.
// External API surface:
// - Input: common source identity surface
// - PollSource: polling source interface
// - StreamSource: streaming source interface
// - Registry / NewRegistry: source driver registry and builders
// - HTTPSource / NewHTTPSource: reusable HTTP polling helper
//
// Source drivers are domain-specific and registered into Registry by driver name.
// Registry can then build configured sources from config.SourceConfig.
@@ -12,6 +14,12 @@
// A single source may emit 0..N events per poll or stream iteration, and those
// events may span multiple event kinds.
//
// Optional helpers from helpers.go:
// - DefaultEventID: default event ID policy for source implementations
// - SingleEvent: construct and validate a one-element event slice
// - ValidateExpectedKinds: compare configured expected kinds against source
// advertised kinds when metadata is available
//
// HTTP-backed polling sources can share NewHTTPSource for generic HTTP config
// parsing and conditional GET behavior. The helper understands:
// - params.url

View File

@@ -5,7 +5,6 @@ import (
"encoding/json"
"fmt"
"net/http"
"strconv"
"strings"
"sync"
@@ -60,9 +59,13 @@ func NewHTTPSource(driver string, cfg config.SourceConfig, accept string) (*HTTP
return nil, fmt.Errorf("%s %q: params.user_agent is required", driver, cfg.Name)
}
conditional, err := parseConditionalParam(cfg)
if err != nil {
return nil, err
conditional := true
if _, exists := cfg.Params["conditional"]; exists {
var ok bool
conditional, ok = cfg.ParamBool("conditional")
if !ok {
return nil, fmt.Errorf("source %q: params.conditional must be a boolean", cfg.Name)
}
}
return &HTTPSource{
@@ -121,27 +124,3 @@ func (s *HTTPSource) FetchJSONIfChanged(ctx context.Context) (json.RawMessage, b
}
return json.RawMessage(body), true, nil
}
func parseConditionalParam(cfg config.SourceConfig) (bool, error) {
raw, ok := cfg.Params["conditional"]
if !ok || raw == nil {
return true, nil
}
switch v := raw.(type) {
case bool:
return v, nil
case string:
s := strings.TrimSpace(v)
if s == "" {
return false, fmt.Errorf("source %q: params.conditional must be a boolean", cfg.Name)
}
parsed, err := strconv.ParseBool(s)
if err != nil {
return false, fmt.Errorf("source %q: params.conditional must be a boolean", cfg.Name)
}
return parsed, nil
default:
return false, fmt.Errorf("source %q: params.conditional must be a boolean", cfg.Name)
}
}

View File

@@ -4,6 +4,7 @@ import (
"context"
"net/http"
"net/http/httptest"
"strings"
"testing"
"gitea.maximumdirect.net/ejr/feedkit/config"
@@ -39,6 +40,27 @@ func TestNewHTTPSourceRejectsInvalidConditional(t *testing.T) {
if err == nil {
t.Fatalf("NewHTTPSource() error = nil, want error")
}
if !strings.Contains(err.Error(), "params.conditional must be a boolean") {
t.Fatalf("NewHTTPSource() error = %q, want params.conditional must be a boolean", err)
}
}
func TestNewHTTPSourceConditionalCanBeExplicitlyFalse(t *testing.T) {
src, err := NewHTTPSource("test_driver", config.SourceConfig{
Name: "test-source",
Driver: "test_driver",
Params: map[string]any{
"url": "https://example.invalid",
"user_agent": "test-agent",
"conditional": false,
},
}, "application/json")
if err != nil {
t.Fatalf("NewHTTPSource() error = %v", err)
}
if src.Conditional {
t.Fatalf("Conditional = true, want false")
}
}
func TestHTTPSourceFetchJSONIfChanged(t *testing.T) {