Files
feedkit/README.md
Eric Rakestraw 96039f6530 refactor!: introduce generic processors registry and remove normalize registry adapter
- add new `processors` package with canonical `Processor` interface
- add `processors.Registry` with Register/Build/BuildChain factory model
- switch `pipeline.Pipeline` to `[]processors.Processor`
- replace `normalize.Registry` + registry adapter with direct `normalize.Processor`
- remove `normalize/registry.go`
- update root docs to position normalize as one optional processing stage
- add tests for processors registry, normalize processor behavior, and pipeline flow

BREAKING CHANGE:
- `pipeline.Processor` removed; use `processors.Processor`
- `normalize.Registry` and old normalize processor adapter APIs removed
- downstream daemons must update processor wiring to new `processors.Registry`
  and `normalize.NewProcessor(...)`
2026-03-16 13:14:24 -05:00

3.1 KiB

feedkit

feedkit provides domain-agnostic plumbing for feed-processing daemons.

A daemon built on feedkit typically:

  • ingests upstream input (polling APIs or consuming streams)
  • emits domain-agnostic event.Event values
  • applies optional processing (normalization, dedupe, policy)
  • routes events to sinks (stdout, NATS, files, databases, etc.)

Philosophy

feedkit is not a framework. It provides small composable packages and leaves lifecycle, domain schemas, and domain-specific validation in your daemon.

Conceptual pipeline

Collect -> Process (optional stages, including normalize) -> Route -> Emit

Stage Package(s)
Collect sources, scheduler
Process pipeline, processors, normalize (optional stage)
Route dispatch
Emit sinks
Configure config

Core packages

config

Loads YAML config with strict decoding and domain-agnostic validation.

SourceConfig supports both source modes:

  • mode: poll requires every
  • mode: stream forbids every
  • omitted mode means auto (inferred from the registered driver type)

It also supports optional expected source kinds:

  • kinds: ["observation", "alert"] (preferred)
  • kind: "observation" (legacy fallback)

event

Defines the domain-agnostic event envelope (event.Event) used across the system.

sources

Defines source interfaces and driver registry:

type Input interface {
    Name() string
}

type PollSource interface {
    Input
    Poll(ctx context.Context) ([]event.Event, error)
}

type StreamSource interface {
    Input
    Run(ctx context.Context, out chan<- event.Event) error
}

Notes:

  • a poll can emit 0..N events
  • stream sources emit events continuously
  • a single source may emit multiple event kinds
  • driver implementations live in downstream daemons and are registered via sources.Registry

scheduler

Runs one goroutine per source job:

  • poll sources: cadence driven (every + jitter)
  • stream sources: continuous run loop

pipeline

Optional processing chain between collection and dispatch. Processors can transform, drop, or reject events.

processors

Defines the generic processor interface and a named-driver registry used by daemons to build ordered processor chains.

normalize

Concrete normalization processor implementation. Typical use: sources emit raw payload events, then a normalize stage maps them to canonical schemas.

dispatch

Compiles routes and fans out events to sinks with per-sink queue/worker isolation.

sinks

Defines sink interface and sink registry. Built-ins include stdout and nats, with additional sink implementations at varying maturity.

Typical wiring

  1. Load config.
  2. Register/build sources from cfg.Sources.
  3. Register/build sinks from cfg.Sinks.
  4. Compile routes.
  5. Start scheduler (sources -> bus).
  6. Start dispatcher (bus -> pipeline -> sinks).

Non-goals

feedkit intentionally does not:

  • define domain payload schemas
  • enforce domain-specific event kinds
  • own application lifecycle
  • prescribe observability stack choices