// Package scheduler runs a set of polling and streaming feed sources,
// fanning every produced event into a single output channel.
package scheduler

import (
	"context"
	"fmt"
	"hash/fnv"
	"math/rand"
	"time"

	"gitea.maximumdirect.net/ejr/feedkit/event"
	"gitea.maximumdirect.net/ejr/feedkit/logging"
	"gitea.maximumdirect.net/ejr/feedkit/sources"
)

// Logger is a printf-style logger used throughout scheduler.
// It is an alias to the shared feedkit logging type so callers can pass
// one function everywhere without type mismatch friction.
type Logger = logging.Logf

// Job describes one scheduler task.
//
// A Job may be backed by either:
//   - a polling source (sources.Source): uses Every + jitter and calls Poll()
//   - a stream source (sources.StreamSource): ignores Every and calls Run()
//
// Jitter behavior:
//   - For polling sources: Jitter is applied at startup and before each poll tick.
//   - For stream sources: Jitter is applied once at startup only (optional; useful
//     to avoid reconnect storms when many instances start together).
type Job struct {
	// Source is the backing input. It must implement sources.Source (polling)
	// or sources.StreamSource (streaming); anything else is logged and skipped.
	Source sources.Input

	// Every is the polling interval. Must be > 0 for polling sources;
	// it is ignored for stream sources.
	Every time.Duration

	// Jitter is the maximum additional delay added before each poll.
	// Example: if Every=15m and Jitter=30s, each poll will occur at:
	//   tick time + random(0..30s)
	//
	// If Jitter == 0 for polling sources, a default jitter is computed from
	// Every (min(Every/10, 30s), clamped to Every/2).
	//
	// For stream sources, Jitter is treated as *startup jitter only*.
	Jitter time.Duration
}

// Scheduler fans events from many source jobs into one output channel.
type Scheduler struct {
	Jobs []Job              // tasks to run; each gets its own goroutine
	Out  chan<- event.Event // destination for all emitted events; must be non-nil
	Logf Logger             // optional logger; nil silently disables logging
}

// Run starts one goroutine per job and blocks until ctx is cancelled.
// Poll jobs run on their own interval and emit 0..N events per poll.
// Stream jobs run continuously and emit events as they arrive.
func (s *Scheduler) Run(ctx context.Context) error { if s.Out == nil { return fmt.Errorf("scheduler.Run: Out channel is nil") } if len(s.Jobs) == 0 { return fmt.Errorf("scheduler.Run: no jobs configured") } for _, job := range s.Jobs { job := job // capture loop variable go s.runJob(ctx, job) } <-ctx.Done() return ctx.Err() } func (s *Scheduler) runJob(ctx context.Context, job Job) { if job.Source == nil { s.logf("scheduler: job has nil source") return } // Stream sources: event-driven. if ss, ok := job.Source.(sources.StreamSource); ok { s.runStream(ctx, job, ss) return } // Poll sources: time-based. ps, ok := job.Source.(sources.Source) if !ok { s.logf("scheduler: source %T (%s) implements neither Poll() nor Run()", job.Source, job.Source.Name()) return } if job.Every <= 0 { s.logf("scheduler: polling job %q missing/invalid interval (sources[].every)", ps.Name()) return } s.runPoller(ctx, job, ps) } func (s *Scheduler) runStream(ctx context.Context, job Job, src sources.StreamSource) { // Optional startup jitter: helps avoid reconnect storms if many daemons start at once. if job.Jitter > 0 { rng := seededRNG(src.Name()) if !sleepJitter(ctx, rng, job.Jitter) { return } } // Stream sources should block until ctx cancel or fatal error. if err := src.Run(ctx, s.Out); err != nil && ctx.Err() == nil { s.logf("scheduler: stream source %q exited with error: %v", src.Name(), err) } } func (s *Scheduler) runPoller(ctx context.Context, job Job, src sources.Source) { // Compute jitter: either configured per job, or a sensible default. jitter := effectiveJitter(job.Every, job.Jitter) // Each worker gets its own RNG (safe + no lock contention). rng := seededRNG(src.Name()) // Optional startup jitter: avoids all jobs firing at the exact moment the daemon starts. if !sleepJitter(ctx, rng, jitter) { return } // Immediate poll at startup (after startup jitter). 
s.pollOnce(ctx, src) t := time.NewTicker(job.Every) defer t.Stop() for { select { case <-t.C: // Per-tick jitter: spreads calls out within the interval. if !sleepJitter(ctx, rng, jitter) { return } s.pollOnce(ctx, src) case <-ctx.Done(): return } } } func (s *Scheduler) pollOnce(ctx context.Context, src sources.Source) { events, err := src.Poll(ctx) if err != nil { s.logf("scheduler: poll failed (%s): %v", src.Name(), err) return } for _, e := range events { select { case s.Out <- e: case <-ctx.Done(): return } } } func (s *Scheduler) logf(format string, args ...any) { if s.Logf == nil { return } s.Logf(format, args...) } // ---- helpers ---- func seededRNG(name string) *rand.Rand { seed := time.Now().UnixNano() ^ int64(hashStringFNV32a(name)) return rand.New(rand.NewSource(seed)) } // effectiveJitter chooses a jitter value. // - If configuredMax > 0, use it (but clamp). // - Else default to min(every/10, 30s). // - Clamp to at most every/2 (so jitter can’t delay more than half the interval). func effectiveJitter(every time.Duration, configuredMax time.Duration) time.Duration { if every <= 0 { return 0 } j := configuredMax if j <= 0 { j = every / 10 if j > 30*time.Second { j = 30 * time.Second } } // Clamp jitter so it doesn’t dominate the schedule. maxAllowed := every / 2 if j > maxAllowed { j = maxAllowed } if j < 0 { j = 0 } return j } // sleepJitter sleeps for a random duration in [0, max]. // Returns false if the context is cancelled while waiting. func sleepJitter(ctx context.Context, rng *rand.Rand, max time.Duration) bool { if max <= 0 { return true } // Int63n requires a positive argument. // We add 1 so max itself is attainable. n := rng.Int63n(int64(max) + 1) d := time.Duration(n) timer := time.NewTimer(d) defer timer.Stop() select { case <-timer.C: return true case <-ctx.Done(): return false } } func hashStringFNV32a(s string) uint32 { h := fnv.New32a() _, _ = h.Write([]byte(s)) return h.Sum32() }