Files
feedkit/scheduler/scheduler.go
Eric Rakestraw 09bc65e947 feedkit: ergonomics pass (shared logger, route compiler, param helpers)
- Add logging.Logf as the canonical printf-style logger type used across feedkit.
  - Update scheduler and dispatch to alias their Logger types to logging.Logf.
  - Eliminates type-mismatch friction when wiring one log function through the system.

- Add dispatch.CompileRoutes(*config.Config) ([]dispatch.Route, error)
  - Compiles config routes into dispatch routes with event.ParseKind normalization.
  - If routes: is omitted, defaults to “all sinks receive all kinds”.

- Expand config param helpers for both SourceConfig and SinkConfig
  - Add ParamBool/ParamInt/ParamDuration/ParamStringSlice (+ Default variants).
  - Supports common YAML-decoded types (bool/int/float/string, []any, etc.)
  - Keeps driver code cleaner and reduces repeated type assertions.

- Fix Postgres sink validation error prefix ("postgres sink", not "rabbitmq sink").
2026-01-13 14:40:29 -06:00

179 lines
3.9 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package scheduler
import (
"context"
"fmt"
"hash/fnv"
"math/rand"
"time"
"gitea.maximumdirect.net/ejr/feedkit/event"
"gitea.maximumdirect.net/ejr/feedkit/logging"
"gitea.maximumdirect.net/ejr/feedkit/sources"
)
// Logger is a printf-style logger used throughout scheduler.
// It is an alias to the shared feedkit logging type so callers can pass
// one function everywhere without type mismatch friction.
//
// A nil Logger is tolerated: the scheduler's internal logf helper checks for
// nil and silently drops the message in that case.
type Logger = logging.Logf
// Job describes one polling source together with the schedule it runs on.
type Job struct {
// Source is the feed to poll. A job whose Source is nil is logged and
// skipped by runJob.
Source sources.Source
// Every is the base polling interval. A job whose Every is <= 0 is logged
// and skipped by runJob.
Every time.Duration
// Jitter is the maximum additional delay added before each poll.
// Example: if Every=15m and Jitter=30s, each poll will occur at:
// tick time + random(0..30s)
//
// If Jitter <= 0, a default of min(Every/10, 30s) is used. In every case
// the effective jitter is clamped to at most Every/2 (see effectiveJitter).
Jitter time.Duration
}
// Scheduler polls a set of jobs and forwards the events they return to Out.
type Scheduler struct {
// Jobs is the list of sources to poll. Run returns an error if it is empty.
Jobs []Job
// Out receives every event returned by a successful poll. Run returns an
// error if it is nil.
Out chan<- event.Event
// Logf is an optional logger. When nil, scheduler log messages are dropped.
Logf Logger
}
// Run starts one polling goroutine per job and blocks until ctx is cancelled.
// Each job runs on its own interval and emits 0..N events per poll.
//
// Run never returns nil: it returns a validation error immediately if Out is
// nil or no jobs are configured, and otherwise returns ctx.Err() once the
// context is done.
//
// Run does not return until every polling goroutine has exited, so once it
// returns nothing started here will send on Out again and the caller may
// safely close the channel. Workers observe ctx between polls and while
// sending; a Source whose Poll ignores ctx can therefore delay Run's return
// until that Poll call finishes.
func (s *Scheduler) Run(ctx context.Context) error {
	if s.Out == nil {
		return fmt.Errorf("scheduler.Run: Out channel is nil")
	}
	if len(s.Jobs) == 0 {
		return fmt.Errorf("scheduler.Run: no jobs configured")
	}

	// One completion token per worker. The buffer is sized to the job count so
	// a worker that exits early (for example, an invalid job) never blocks on
	// its send while Run is still waiting for cancellation.
	finished := make(chan struct{}, len(s.Jobs))
	for _, job := range s.Jobs {
		// job is passed as an argument so each goroutine gets its own copy
		// regardless of the Go version's loop-variable semantics.
		go func(job Job) {
			defer func() { finished <- struct{}{} }()
			s.runJob(ctx, job)
		}(job)
	}

	<-ctx.Done()

	// Drain one token per started worker before reporting cancellation.
	for range s.Jobs {
		<-finished
	}
	return ctx.Err()
}
// runJob is the per-job worker loop. It validates the job, waits a random
// startup delay, polls once, and then polls again after every tick of the
// job's interval (plus a fresh random delay) until ctx is cancelled.
func (s *Scheduler) runJob(ctx context.Context, job Job) {
	switch {
	case job.Source == nil:
		s.logf("scheduler: job has nil source")
		return
	case job.Every <= 0:
		s.logf("scheduler: job %s has invalid interval", job.Source.Name())
		return
	}

	// Upper bound for every random delay: the configured value or a default.
	maxDelay := effectiveJitter(job.Every, job.Jitter)

	// A private RNG per worker avoids sharing (and locking) a global source.
	// Mixing in the source name keeps workers started at the same instant
	// from ending up with identical seeds.
	nameHash := hashStringFNV32a(job.Source.Name())
	rng := rand.New(rand.NewSource(time.Now().UnixNano() ^ int64(nameHash)))

	// Startup delay so all jobs do not fire the moment the daemon starts,
	// followed by the first poll.
	if !sleepJitter(ctx, rng, maxDelay) {
		return
	}
	s.pollOnce(ctx, job)

	ticker := time.NewTicker(job.Every)
	defer ticker.Stop()

	for {
		// Wait for the next tick, or stop on cancellation.
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}

		// Per-tick delay spreads the calls out within the interval.
		if !sleepJitter(ctx, rng, maxDelay) {
			return
		}
		s.pollOnce(ctx, job)
	}
}
// pollOnce performs a single poll of the job's source and forwards each
// returned event to s.Out. A poll error is logged and the batch is skipped.
// Forwarding stops early if ctx is cancelled while a send is pending.
func (s *Scheduler) pollOnce(ctx context.Context, job Job) {
	batch, err := job.Source.Poll(ctx)
	if err != nil {
		s.logf("scheduler: poll failed (%s): %v", job.Source.Name(), err)
		return
	}

	for _, ev := range batch {
		select {
		case <-ctx.Done():
			// Stop forwarding; the remaining events in this batch are dropped.
			return
		case s.Out <- ev:
		}
	}
}
// logf forwards a printf-style message to the configured logger.
// It is a no-op when no logger has been set.
func (s *Scheduler) logf(format string, args ...any) {
	if s.Logf != nil {
		s.Logf(format, args...)
	}
}
// effectiveJitter picks the maximum jitter to apply for a polling interval.
//   - A non-positive interval yields 0.
//   - If configuredMax > 0 it is used as the starting value.
//   - Otherwise the starting value is min(every/10, 30s).
//   - The result is clamped to at most every/2, so jitter can't delay a poll
//     by more than half the interval.
func effectiveJitter(every time.Duration, configuredMax time.Duration) time.Duration {
	if every <= 0 {
		return 0
	}

	const defaultCap = 30 * time.Second

	var jitter time.Duration
	switch tenth := every / 10; {
	case configuredMax > 0:
		jitter = configuredMax
	case tenth > defaultCap:
		jitter = defaultCap
	default:
		jitter = tenth
	}

	// Clamp so jitter doesn't dominate the schedule. Both candidates are
	// non-negative here, so no further lower-bound check is needed.
	if limit := every / 2; jitter > limit {
		return limit
	}
	return jitter
}
// sleepJitter waits for a random duration drawn uniformly from [0, max].
// It reports true once the wait has elapsed (immediately when max <= 0, in
// which case rng is not consulted) and false if ctx is cancelled first.
func sleepJitter(ctx context.Context, rng *rand.Rand, max time.Duration) bool {
	if max <= 0 {
		return true
	}

	// Int63n needs a positive bound and excludes it, so max+1 makes max
	// itself a possible outcome.
	delay := time.Duration(rng.Int63n(int64(max) + 1))

	wait := time.NewTimer(delay)
	defer wait.Stop()

	select {
	case <-ctx.Done():
		return false
	case <-wait.C:
		return true
	}
}
// hashStringFNV32a returns the 32-bit FNV-1a hash of s.
func hashStringFNV32a(s string) uint32 {
	hasher := fnv.New32a()
	// hash.Hash documents that Write never returns an error, so the result
	// is intentionally not checked.
	hasher.Write([]byte(s))
	return hasher.Sum32()
}