diff --git a/config/load.go b/config/load.go index 8f8a96a..0d3bdf8 100644 --- a/config/load.go +++ b/config/load.go @@ -83,9 +83,9 @@ func (c *Config) Validate() error { m.Add(fieldErr(path+".driver", "is required (e.g. openmeteo_observation, rss_feed, ...)")) } - // Every - if s.Every.Duration <= 0 { - m.Add(fieldErr(path+".every", "must be a positive duration (e.g. 15m, 1m, 30s)")) + // Every (optional but if present must be >=0) + if s.Every.Duration < 0 { + m.Add(fieldErr(path+".every", "is optional, but must be a positive duration (e.g. 15m, 1m, 30s) if provided")) } // Kind (optional but if present must be non-empty after trimming) diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index c6d1d72..7d10fc9 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -17,15 +17,27 @@ import ( // one function everywhere without type mismatch friction. type Logger = logging.Logf +// Job describes one scheduler task. +// +// A Job may be backed by either: +// - a polling source (sources.Source): uses Every + jitter and calls Poll() +// - a stream source (sources.StreamSource): ignores Every and calls Run() +// +// Jitter behavior: +// - For polling sources: Jitter is applied at startup and before each poll tick. +// - For stream sources: Jitter is applied once at startup only (optional; useful to avoid +// reconnect storms when many instances start together). type Job struct { - Source sources.Source + Source sources.Input Every time.Duration // Jitter is the maximum additional delay added before each poll. // Example: if Every=15m and Jitter=30s, each poll will occur at: // tick time + random(0..30s) // - // If Jitter == 0, we compute a default jitter based on Every. + // If Jitter == 0 for polling sources, we compute a default jitter based on Every. + // + // For stream sources, Jitter is treated as *startup jitter only*. Jitter time.Duration } @@ -35,8 +47,9 @@ type Scheduler struct { Logf Logger } -// Run starts one polling goroutine per job. -// Each job runs on its own interval and emits 0..N events per poll. +// Run starts one goroutine per job. +// Poll jobs run on their own interval and emit 0..N events per poll. +// Stream jobs run continuously and emit events as they arrive. func (s *Scheduler) Run(ctx context.Context) error { if s.Out == nil { return fmt.Errorf("scheduler.Run: Out channel is nil") @@ -59,17 +72,48 @@ func (s *Scheduler) runJob(ctx context.Context, job Job) { s.logf("scheduler: job has nil source") return } - if job.Every <= 0 { - s.logf("scheduler: job %s has invalid interval", job.Source.Name()) + + // Stream sources: event-driven. + if ss, ok := job.Source.(sources.StreamSource); ok { + s.runStream(ctx, job, ss) return } + // Poll sources: time-based. + ps, ok := job.Source.(sources.Source) + if !ok { + s.logf("scheduler: source %T (%s) implements neither Poll() nor Run()", job.Source, job.Source.Name()) + return + } + if job.Every <= 0 { + s.logf("scheduler: polling job %q missing/invalid interval (sources[].every)", ps.Name()) + return + } + + s.runPoller(ctx, job, ps) +} + +func (s *Scheduler) runStream(ctx context.Context, job Job, src sources.StreamSource) { + // Optional startup jitter: helps avoid reconnect storms if many daemons start at once. + if job.Jitter > 0 { + rng := seededRNG(src.Name()) + if !sleepJitter(ctx, rng, job.Jitter) { + return + } + } + + // Stream sources should block until ctx cancel or fatal error. + if err := src.Run(ctx, s.Out); err != nil && ctx.Err() == nil { + s.logf("scheduler: stream source %q exited with error: %v", src.Name(), err) + } +} + +func (s *Scheduler) runPoller(ctx context.Context, job Job, src sources.Source) { // Compute jitter: either configured per job, or a sensible default. jitter := effectiveJitter(job.Every, job.Jitter) // Each worker gets its own RNG (safe + no lock contention). - seed := time.Now().UnixNano() ^ int64(hashStringFNV32a(job.Source.Name())) - rng := rand.New(rand.NewSource(seed)) + rng := seededRNG(src.Name()) // Optional startup jitter: avoids all jobs firing at the exact moment the daemon starts. if !sleepJitter(ctx, rng, jitter) { @@ -77,7 +121,7 @@ func (s *Scheduler) runJob(ctx context.Context, job Job) { } // Immediate poll at startup (after startup jitter). - s.pollOnce(ctx, job) + s.pollOnce(ctx, src) t := time.NewTicker(job.Every) defer t.Stop() @@ -89,7 +133,7 @@ func (s *Scheduler) runJob(ctx context.Context, job Job) { if !sleepJitter(ctx, rng, jitter) { return } - s.pollOnce(ctx, job) + s.pollOnce(ctx, src) case <-ctx.Done(): return @@ -97,10 +141,10 @@ func (s *Scheduler) runJob(ctx context.Context, job Job) { } } -func (s *Scheduler) pollOnce(ctx context.Context, job Job) { - events, err := job.Source.Poll(ctx) +func (s *Scheduler) pollOnce(ctx context.Context, src sources.Source) { + events, err := src.Poll(ctx) if err != nil { - s.logf("scheduler: poll failed (%s): %v", job.Source.Name(), err) + s.logf("scheduler: poll failed (%s): %v", src.Name(), err) return } @@ -120,6 +164,13 @@ func (s *Scheduler) logf(format string, args ...any) { s.Logf(format, args...) } +// ---- helpers ---- + +func seededRNG(name string) *rand.Rand { + seed := time.Now().UnixNano() ^ int64(hashStringFNV32a(name)) + return rand.New(rand.NewSource(seed)) +} + // effectiveJitter chooses a jitter value. // - If configuredMax > 0, use it (but clamp). // - Else default to min(every/10, 30s). diff --git a/sources/registry.go b/sources/registry.go index 3635d41..e545a40 100644 --- a/sources/registry.go +++ b/sources/registry.go @@ -13,13 +13,18 @@ import ( // domain-specific source drivers (Open-Meteo, NWS, RSS, etc.) while feedkit // remains domain-agnostic. type Factory func(cfg config.SourceConfig) (Source, error) +type StreamFactory func(cfg config.SourceConfig) (StreamSource, error) type Registry struct { - byDriver map[string]Factory + byDriver map[string]Factory + byStreamDriver map[string]StreamFactory } func NewRegistry() *Registry { - return &Registry{byDriver: map[string]Factory{}} + return &Registry{ + byDriver: map[string]Factory{}, + byStreamDriver: map[string]StreamFactory{}, + } } // Register associates a driver name (e.g. "openmeteo_observation") with a factory. @@ -35,10 +40,27 @@ func (r *Registry) Register(driver string, f Factory) { if f == nil { panic(fmt.Sprintf("sources.Registry.Register: factory cannot be nil (driver=%q)", driver)) } - + if _, exists := r.byStreamDriver[driver]; exists { + panic(fmt.Sprintf("sources.Registry.Register: driver %q already registered as a stream source", driver)) + } r.byDriver[driver] = f } +// RegisterStream is the StreamSource equivalent of Register. +func (r *Registry) RegisterStream(driver string, f StreamFactory) { + driver = strings.TrimSpace(driver) + if driver == "" { + panic("sources.Registry.RegisterStream: driver cannot be empty") + } + if f == nil { + panic(fmt.Sprintf("sources.Registry.RegisterStream: factory cannot be nil (driver=%q)", driver)) + } + if _, exists := r.byDriver[driver]; exists { + panic(fmt.Sprintf("sources.Registry.RegisterStream: driver %q already registered as a polling source", driver)) + } + r.byStreamDriver[driver] = f +} + // Build constructs a Source from a SourceConfig by looking up cfg.Driver. func (r *Registry) Build(cfg config.SourceConfig) (Source, error) { f, ok := r.byDriver[cfg.Driver] @@ -47,3 +69,14 @@ func (r *Registry) Build(cfg config.SourceConfig) (Source, error) { } return f(cfg) } + +// BuildInput can return either a polling Source or a StreamSource. +func (r *Registry) BuildInput(cfg config.SourceConfig) (Input, error) { + if f, ok := r.byStreamDriver[cfg.Driver]; ok { + return f(cfg) + } + if f, ok := r.byDriver[cfg.Driver]; ok { + return f(cfg) + } + return nil, fmt.Errorf("unknown source driver: %q", cfg.Driver) +} diff --git a/sources/source.go b/sources/source.go index 997d10c..c0254ae 100644 --- a/sources/source.go +++ b/sources/source.go @@ -6,6 +6,12 @@ import ( "gitea.maximumdirect.net/ejr/feedkit/event" ) +// Input is the common surface shared by all source types. +type Input interface { + Name() string + Kind() event.Kind +} + // Source is a configured polling job that emits 0..N events per poll. // // Source implementations live in domain modules (weatherfeeder/newsfeeder/...) @@ -28,3 +34,12 @@ type Source interface { // Implementations should honor ctx.Done() for network calls and other I/O. Poll(ctx context.Context) ([]event.Event, error) } + +// StreamSource is an event-driven source (NATS/RabbitMQ/MQTT/etc). +// +// Run should block, producing events into `out` until ctx is cancelled or a fatal error occurs. +// It MUST NOT close out (the scheduler/daemon owns the bus). +type StreamSource interface { + Input + Run(ctx context.Context, out chan<- event.Event) error +}