- Introduce explicit source interfaces: sources.PollSource and sources.StreamSource, with shared sources.Input (Name() only).
- Remove mandatory Kind() from the base source contract to support sources that emit multiple kinds.
- Add config.SourceMode (poll, stream, or omitted/auto) and SourceConfig.Kinds (plural expected kinds), while keeping legacy SourceConfig.Kind for compatibility.
- Enforce mode semantics in config validation (poll requires every, stream forbids every) and detect mode/driver mismatches in sources.Registry.
- Update docs and tests for the new source model and config behavior.
feedkit
feedkit provides domain-agnostic plumbing for feed-processing daemons.
A daemon built on feedkit typically:
- ingests upstream input (polling APIs or consuming streams)
- emits domain-agnostic event.Event values
- applies optional processing (normalization, dedupe, policy)
- routes events to sinks (stdout, NATS, files, databases, etc.)
Philosophy
feedkit is not a framework. It provides small composable packages and leaves lifecycle, domain schemas, and domain-specific validation in your daemon.
Conceptual pipeline
Collect -> Normalize (optional) -> Policy -> Route -> Emit
| Stage | Package(s) |
|---|---|
| Collect | sources, scheduler |
| Normalize | normalize (optional in pipeline) |
| Policy | pipeline |
| Route | dispatch |
| Emit | sinks |
| Configure | config |
Core packages
config
Loads YAML config with strict decoding and domain-agnostic validation.
SourceConfig supports both source modes:
- mode: poll requires every
- mode: stream forbids every
- omitted mode means auto (inferred from the registered driver type)
It also supports optional expected source kinds:
- kinds: ["observation", "alert"] (preferred)
- kind: "observation" (legacy fallback)
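The mode and kind rules above can be sketched as two small functions. This is a minimal illustration, not feedkit's actual validation code: SourceConfig here is a local stand-in whose field names are assumed from the YAML keys, and the constant names, validateSource, and expectedKinds are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// SourceMode mirrors the three documented states; the constant names are assumptions.
type SourceMode string

const (
	ModeAuto   SourceMode = "" // omitted in YAML: inferred later from the driver type
	ModePoll   SourceMode = "poll"
	ModeStream SourceMode = "stream"
)

// SourceConfig is a local stand-in, not the real config.SourceConfig.
type SourceConfig struct {
	Name  string
	Mode  SourceMode
	Every time.Duration // zero means "not set"
	Kinds []string      // preferred
	Kind  string        // legacy fallback
}

// validateSource enforces: poll requires every, stream forbids every.
func validateSource(c SourceConfig) error {
	switch c.Mode {
	case ModePoll:
		if c.Every <= 0 {
			return errors.New("mode poll requires every")
		}
	case ModeStream:
		if c.Every != 0 {
			return errors.New("mode stream forbids every")
		}
	case ModeAuto:
		// Nothing to check yet; the mode is resolved once the driver is known.
	default:
		return fmt.Errorf("unknown mode %q", c.Mode)
	}
	return nil
}

// expectedKinds prefers Kinds and falls back to the legacy Kind field.
func expectedKinds(c SourceConfig) []string {
	if len(c.Kinds) > 0 {
		return c.Kinds
	}
	if c.Kind != "" {
		return []string{c.Kind}
	}
	return nil
}

func main() {
	bad := SourceConfig{Name: "alerts", Mode: ModeStream, Every: time.Minute}
	fmt.Println(validateSource(bad))

	ok := SourceConfig{Name: "obs", Mode: ModePoll, Every: time.Minute, Kind: "observation"}
	fmt.Println(validateSource(ok), expectedKinds(ok))
}
```

Treating a zero Every as "not set" keeps the sketch short; a real decoder may need to distinguish an absent key from an explicit zero.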
event
Defines the domain-agnostic event envelope (event.Event) used across the system.
sources
Defines source interfaces and driver registry:
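// Input is the shared base contract (Name() only).
type Input interface {
	Name() string
}

// PollSource is polled on a cadence; each poll can return 0..N events.
type PollSource interface {
	Input
	Poll(ctx context.Context) ([]event.Event, error)
}

// StreamSource runs continuously and pushes events to out.
type StreamSource interface {
	Input
	Run(ctx context.Context, out chan<- event.Event) error
}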
Notes:
- a poll can emit 0..N events
- stream sources emit events continuously
- a single source may emit multiple event kinds
- driver implementations live in downstream daemons and are registered via sources.Registry
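To make the two contracts concrete, here is a self-contained sketch of one driver of each type. It does not import feedkit: Event is a local stand-in for event.Event with assumed fields, the interfaces are copied locally, and staticPoll, countStream, and inferMode are hypothetical names. inferMode only illustrates how an omitted mode could be resolved from the driver type; the real sources.Registry logic may differ.

```go
package main

import (
	"context"
	"fmt"
)

// Event is a local stand-in for event.Event; its fields are assumptions.
type Event struct {
	Kind    string
	Payload string
}

type Input interface{ Name() string }

type PollSource interface {
	Input
	Poll(ctx context.Context) ([]Event, error)
}

type StreamSource interface {
	Input
	Run(ctx context.Context, out chan<- Event) error
}

// staticPoll returns a fixed batch containing two different kinds on every poll.
type staticPoll struct{}

func (staticPoll) Name() string { return "static" }

func (staticPoll) Poll(ctx context.Context) ([]Event, error) {
	return []Event{
		{Kind: "observation", Payload: "t=21C"},
		{Kind: "alert", Payload: "frost"},
	}, nil
}

// countStream emits n events and returns; a real stream would run until ctx ends.
type countStream struct{ n int }

func (countStream) Name() string { return "count" }

func (s countStream) Run(ctx context.Context, out chan<- Event) error {
	for i := 0; i < s.n; i++ {
		select {
		case out <- Event{Kind: "tick", Payload: fmt.Sprint(i)}:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}

// inferMode resolves an omitted mode from the driver's concrete type.
func inferMode(in Input) string {
	switch in.(type) {
	case PollSource:
		return "poll"
	case StreamSource:
		return "stream"
	default:
		return "unknown"
	}
}

func main() {
	evs, _ := staticPoll{}.Poll(context.Background())
	fmt.Println(len(evs), inferMode(staticPoll{}), inferMode(countStream{n: 3}))
}
```

Because neither interface has a Kind() method, nothing stops staticPoll from returning events of several kinds in one batch.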
scheduler
Runs one goroutine per source job:
- poll sources: cadence driven (every + jitter)
- stream sources: continuous run loop
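A cadence-driven poll job can be sketched as a loop around a timer. The names nextDelay, runPollJob, and countPolls are hypothetical, and two behaviours are assumptions rather than documented feedkit behaviour: the job polls once immediately, and jitter is a uniform random extra delay in [0, jitter].

```go
package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// pollFunc stands in for PollSource.Poll; it returns how many events it produced.
type pollFunc func(ctx context.Context) (int, error)

// nextDelay adds a random extra delay in [0, jitter] to the base interval.
func nextDelay(every, jitter time.Duration) time.Duration {
	if jitter <= 0 {
		return every
	}
	return every + time.Duration(rand.Int63n(int64(jitter)+1))
}

// runPollJob polls, waits every+jitter, and repeats until ctx is cancelled.
func runPollJob(ctx context.Context, every, jitter time.Duration, poll pollFunc) {
	for {
		if ctx.Err() != nil {
			return
		}
		if _, err := poll(ctx); err != nil {
			fmt.Println("poll error:", err) // a real daemon would log and keep going
		}
		timer := time.NewTimer(nextDelay(every, jitter))
		select {
		case <-ctx.Done():
			timer.Stop()
			return
		case <-timer.C:
		}
	}
}

// countPolls runs one job in its own goroutine and cancels it after want polls.
func countPolls(want int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var wg sync.WaitGroup
	calls := 0
	wg.Add(1)
	go func() {
		defer wg.Done()
		runPollJob(ctx, time.Millisecond, time.Millisecond, func(context.Context) (int, error) {
			calls++
			if calls == want {
				cancel()
			}
			return 0, nil
		})
	}()
	wg.Wait() // calls is only read after the job goroutine has exited
	return calls
}

func main() {
	fmt.Println("polls:", countPolls(3))
}
```

A stream job needs no timer: its goroutine would simply call Run(ctx, out) and return when Run does.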
pipeline
Optional processing chain between collection and dispatch. Processors can transform, drop, or reject events.
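The three outcomes (transform, drop, reject) can be modelled with a single function type. The shape below is an assumption for illustration, not the pipeline package's real interface: a hypothetical Processor returns a new event to transform, (nil, nil) to drop, and an error to reject. Event is a local stand-in for event.Event.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Event is a local stand-in for event.Event; its fields are assumptions.
type Event struct {
	Kind    string
	Payload string
}

// Processor is a hypothetical shape: (nil, nil) drops, a non-nil error rejects.
type Processor func(Event) (*Event, error)

// runChain applies processors in order and stops at the first drop or rejection.
func runChain(ev Event, chain ...Processor) (*Event, error) {
	cur := &ev
	for _, p := range chain {
		next, err := p(*cur)
		if err != nil {
			return nil, err // rejected
		}
		if next == nil {
			return nil, nil // dropped
		}
		cur = next
	}
	return cur, nil
}

// normalizeKind transforms: it trims and lowercases the kind.
func normalizeKind(ev Event) (*Event, error) {
	ev.Kind = strings.ToLower(strings.TrimSpace(ev.Kind))
	return &ev, nil
}

// requirePayload rejects events with an empty payload.
func requirePayload(ev Event) (*Event, error) {
	if ev.Payload == "" {
		return nil, errors.New("empty payload")
	}
	return &ev, nil
}

// newDedupe drops events it has already seen (exact match on all fields).
func newDedupe() Processor {
	seen := map[Event]bool{}
	return func(ev Event) (*Event, error) {
		if seen[ev] {
			return nil, nil
		}
		seen[ev] = true
		return &ev, nil
	}
}

func main() {
	chain := []Processor{normalizeKind, requirePayload, newDedupe()}
	for _, ev := range []Event{{" Alert ", "frost"}, {"alert", "frost"}, {"alert", ""}} {
		out, err := runChain(ev, chain...)
		fmt.Println(out, err)
	}
}
```

Placing normalizeKind before the dedupe step means " Alert " and "alert" are treated as the same event, which is one reason to normalize early in the chain.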
normalize
Optional normalization package (already implemented). Typical use: sources emit raw payload events, then normalize to canonical schemas in a pipeline stage.
dispatch
Compiles routes and fans out events to sinks with per-sink queue/worker isolation.
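Per-sink isolation can be pictured as one bounded queue and one worker goroutine per sink. The sketch below is illustrative only: sinkWorker, route, and dispatcher are hypothetical types, matching routes on event kind alone is an assumption, and dropping on a full queue is just one possible overflow policy. The sink names are plain labels; nothing here talks to NATS.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is a local stand-in for event.Event; its fields are assumptions.
type Event struct {
	Kind    string
	Payload string
}

// sinkWorker owns one bounded queue and one goroutine, so a slow sink only
// backs up its own queue.
type sinkWorker struct {
	queue chan Event
	got   []Event // stands in for the sink's real output
	done  sync.WaitGroup
}

func newSinkWorker(size int) *sinkWorker {
	w := &sinkWorker{queue: make(chan Event, size)}
	w.done.Add(1)
	go func() {
		defer w.done.Done()
		for ev := range w.queue {
			w.got = append(w.got, ev)
		}
	}()
	return w
}

// route maps an event kind to the names of the sinks that should receive it.
type route struct {
	kind  string
	sinks []string
}

type dispatcher struct {
	routes  []route
	workers map[string]*sinkWorker
}

// dispatch enqueues without blocking and returns how many enqueues were
// dropped because a sink queue was full.
func (d *dispatcher) dispatch(ev Event) (dropped int) {
	for _, r := range d.routes {
		if r.kind != ev.Kind {
			continue
		}
		for _, name := range r.sinks {
			w, ok := d.workers[name]
			if !ok {
				continue // route names an unknown sink
			}
			select {
			case w.queue <- ev:
			default:
				dropped++
			}
		}
	}
	return dropped
}

// shutdown closes every queue and waits for the workers to drain and exit.
func (d *dispatcher) shutdown() {
	for _, w := range d.workers {
		close(w.queue)
		w.done.Wait()
	}
}

// demo routes alerts to both sinks and observations to stdout only.
func demo() (stdout, nats int) {
	d := &dispatcher{
		routes: []route{
			{"alert", []string{"stdout", "nats"}},
			{"observation", []string{"stdout"}},
		},
		workers: map[string]*sinkWorker{
			"stdout": newSinkWorker(8),
			"nats":   newSinkWorker(8),
		},
	}
	d.dispatch(Event{"alert", "frost"})
	d.dispatch(Event{"observation", "t=21C"})
	d.shutdown()
	return len(d.workers["stdout"].got), len(d.workers["nats"].got)
}

func main() {
	fmt.Println(demo())
}
```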
sinks
Defines sink interface and sink registry. Built-ins include stdout and nats, with
additional sink implementations at varying maturity.
Typical wiring
- Load config.
- Register/build sources from cfg.Sources.
- Register/build sinks from cfg.Sinks.
- Compile routes.
- Start scheduler (sources -> bus).
- Start dispatcher (bus -> pipeline -> sinks).
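The last two steps describe a producer/consumer arrangement around a shared bus. The sketch below shows that shape with plain channels; it skips config loading and registration, uses no feedkit packages, and every name in it (Event, run, wire) is a stand-in chosen for illustration.

```go
package main

import (
	"context"
	"fmt"
	"strings"
	"sync"
)

// Event is a local stand-in for event.Event; its fields are assumptions.
type Event struct{ Kind, Payload string }

// run wires stand-in stages together and returns what the single sink received.
func run(ctx context.Context, polled []Event) []string {
	bus := make(chan Event, 16)

	// "Scheduler" side: one goroutine per source writes into the bus.
	var producers sync.WaitGroup
	producers.Add(1)
	go func() {
		defer producers.Done()
		for _, ev := range polled {
			select {
			case bus <- ev:
			case <-ctx.Done():
				return
			}
		}
	}()

	// Close the bus once every source has stopped so the consumer can drain and exit.
	go func() {
		producers.Wait()
		close(bus)
	}()

	// "Dispatcher" side: bus -> pipeline -> sinks.
	var sink []string
	for ev := range bus {
		ev.Kind = strings.ToLower(ev.Kind) // pipeline stage
		if ev.Kind == "alert" {            // route
			sink = append(sink, ev.Kind+":"+ev.Payload) // sink write
		}
	}
	return sink
}

// wire runs the sketch with two sample events.
func wire() []string {
	return run(context.Background(), []Event{{"ALERT", "frost"}, {"observation", "t=21C"}})
}

func main() {
	fmt.Println(wire())
}
```

Closing the bus only after all producers finish is what lets the consumer loop end cleanly; in a long-running daemon that moment would normally follow context cancellation.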
Non-goals
feedkit intentionally does not:
- define domain payload schemas
- enforce domain-specific event kinds
- own application lifecycle
- prescribe observability stack choices