// Package feedkit provides domain-agnostic plumbing for "feed processing daemons".
//
// A feed daemon polls one or more upstream providers (HTTP APIs, RSS, etc.),
// converts upstream items into a normalized internal representation, applies
// lightweight policy (dedupe/rate-limit/filters), and emits events to one or
// more sinks (stdout, files, Postgres, brokers, ...).
//
// feedkit is intentionally NOT a framework. It supplies small, composable
// primitives that concrete daemons wire together in main.go (or via a small
// optional Runner helper, see "Future additions").
//
// # Conceptual pipeline
//
//	Collect → Normalize → Filter/Policy → Persist/Emit → Signal
//
// In feedkit today, that maps to:
//
//	Collect:   sources.Source + scheduler.Scheduler
//	Normalize: (today: domain code typically does this inside Source.Poll;
//	           future: a normalization Processor is a good fit)
//	Policy:    pipeline.Pipeline (Processor chain; dedupe/ratelimit are planned)
//	Emit:      dispatch.Dispatcher + dispatch.Fanout
//	Sinks:     sinks.Sink (+ sinks.Registry to build from config)
//	Config:    config.Load + config.Config validation
//
// # Public packages (API surface)
//
//   - config
//     YAML configuration types and loader/validator.
//
//   - config.Load(path) (*config.Config, error)
//
//   - config.Config: Sources, Sinks, Routes
//
//   - config.SourceConfig / SinkConfig include Params map[string]any
//     with convenience helpers like:
//
//   - ParamString / ParamStringDefault
//
//   - ParamBool / ParamBoolDefault
//
//   - ParamInt / ParamIntDefault
//
//   - ParamDuration / ParamDurationDefault
//
//   - ParamStringSlice
//
//   - event
//     Domain-agnostic event envelope moved through the system.
//
//   - event.Event includes ID, Kind, Source, timestamps, Schema, Payload
//
//   - event.Kind is stringly typed; event.ParseKind normalizes/validates.
//
//   - sources
//     Extension point for domain-specific polling jobs.
//
//   - sources.Source interface: Name(), Kind(), Poll(ctx)
//
//   - sources.Registry lets daemons register driver factories and build
//     sources from config.SourceConfig.
//
//   - scheduler
//     Runs sources on a cadence and publishes emitted events onto a channel.
//
//   - scheduler.Scheduler{Jobs, Out, Logf}.Run(ctx)
//
//   - scheduler.Job: {Source, Every, Jitter}
//
//   - pipeline
//     Optional processing chain between scheduler and dispatch.
//
//   - pipeline.Pipeline{Processors}.Process(ctx, event)
//
//   - pipeline.Processor can mutate, drop (return nil), or error.
//
//   - dedupe/ratelimit processors are placeholders (planned).
//
//   - sinks
//     Extension point for output adapters.
//
//   - sinks.Sink interface: Name(), Consume(ctx, event)
//
//   - sinks.Registry lets daemons register driver factories and build
//     sinks from config.
//
//   - sinks.RegisterBuiltins registers feedkit-provided sink drivers
//     (stdout/file/postgres/rabbitmq; some are currently stubs).
//
//   - dispatch
//     Routes processed events to sinks, and isolates slow sinks via per-sink queues.
//
//   - dispatch.Dispatcher{In, Pipeline, Sinks, Routes, ...}.Run(ctx, logf)
//
//   - dispatch.Fanout: one buffered queue + worker goroutine per sink
//
//   - dispatch.CompileRoutes(*config.Config) compiles cfg.Routes into []dispatch.Route.
//     If routes: is omitted, it defaults to "all sinks receive all kinds".
//
//   - logging
//     Shared logger type used across feedkit packages.
//
//   - logging.Logf is a printf-style logger signature.
//
// # Typical wiring (what a daemon does in main.go)
//
//  1. Load config (domain code may add domain-specific validation).
//  2. Register and build sources from config.Sources using sources.Registry.
//  3. Register and build sinks from config.Sinks using sinks.Registry.
//  4. Compile routes (typically via dispatch.CompileRoutes).
//  5. Create an event bus channel.
//  6. Start scheduler (sources → bus).
//  7. Start dispatcher (bus → pipeline → routes → sinks).
//
// A sketch (error handling elided for brevity; ctx and logf are assumed to be in scope):
//
//	cfg, _ := config.Load("config.yml")
//
//	// Build sources (domain registers its drivers).
//	srcReg := sources.NewRegistry()
//	// domain: srcReg.Register("openweather_observation", newOpenWeatherSource)
//	// ...
//
//	var jobs []scheduler.Job
//	for _, sc := range cfg.Sources {
//		src, _ := srcReg.Build(sc)
//		jobs = append(jobs, scheduler.Job{Source: src, Every: sc.Every.Duration})
//	}
//
//	// Build sinks (feedkit can register builtins).
//	sinkReg := sinks.NewRegistry()
//	sinks.RegisterBuiltins(sinkReg)
//	builtSinks := map[string]sinks.Sink{}
//	for _, sk := range cfg.Sinks {
//		s, _ := sinkReg.Build(sk)
//		builtSinks[sk.Name] = s
//	}
//
//	// Compile routes.
//	routes, _ := dispatch.CompileRoutes(cfg)
//
//	// Event bus.
//	bus := make(chan event.Event, 256)
//
//	// Scheduler.
//	s := &scheduler.Scheduler{Jobs: jobs, Out: bus, Logf: logf}
//
//	// Dispatcher.
//	d := &dispatch.Dispatcher{
//		In:       bus,
//		Pipeline: &pipeline.Pipeline{Processors: nil},
//		Sinks:    builtSinks,
//		Routes:   routes,
//	}
//
//	go s.Run(ctx)
//	return d.Run(ctx, logf)
//
// # Conventions (recommended, not required)
//
//   - Event.ID should be stable for dedupe/storage (often "<source>:<upstream-id>").
//   - Event.Kind should be lowercase ("observation", "alert", "article", ...).
//   - Event.Schema should identify the payload shape/version
//     (e.g. "weather.observation.v1").
//
// # Context and cancellation
//
// All blocking or I/O work should honor ctx.Done():
//   - sources.Source.Poll should pass ctx to HTTP calls, etc.
//   - sinks.Sink.Consume should honor ctx (Fanout timeouts only help if sinks cooperate).
//
// # Future additions (likely)
//
//   - A small Runner helper that performs the standard wiring (load config,
//     build sources/sinks/routes, run scheduler+dispatcher, handle shutdown).
//   - A normalization hook (a Pipeline Processor + registry) that allows sources
//     to emit "raw" payloads and defer normalization to a dedicated stage.
//
// # Non-goals
//
// feedkit does not define domain payload schemas, does not enforce domain kinds,
// and does not embed domain-specific validation rules. Those live in each
// concrete daemon/module (weatherfeeder, newsfeeder, ...).
package feedkit