// Package scheduler polls sources on fixed intervals (with random jitter)
// and forwards the events they produce to a single output channel.
package scheduler

import (
	"context"
	"fmt"
	"hash/fnv"
	"math/rand"
	"sync"
	"time"

	"gitea.maximumdirect.net/ejr/feedkit/event"
	"gitea.maximumdirect.net/ejr/feedkit/sources"
)

// Job describes one source to poll and how often to poll it.
type Job struct {
	// Source is polled once at startup (after startup jitter) and then once
	// per Every tick. A nil Source causes the job to be logged and skipped.
	Source sources.Source

	// Every is the polling interval. Values <= 0 cause the job to be logged
	// and skipped.
	Every time.Duration

	// Jitter is the maximum additional delay added before each poll.
	// Example: if Every=15m and Jitter=30s, each poll will occur at:
	//   tick time + random(0..30s)
	//
	// If Jitter <= 0, a default of min(Every/10, 30s) is used. In all cases
	// the effective jitter is clamped to at most Every/2.
	Jitter time.Duration
}

// Logger is a printf-style logging function used for scheduler diagnostics.
type Logger func(format string, args ...any)

// Scheduler polls each configured Job on its own goroutine and sends every
// resulting event to Out.
type Scheduler struct {
	Jobs []Job
	Out  chan<- event.Event
	// Logf receives diagnostics; a nil Logf disables logging.
	Logf Logger
}

// Run starts one polling goroutine per job and blocks until ctx is
// cancelled. Each job runs on its own interval and emits 0..N events per poll.
//
// After ctx is cancelled, Run waits for every job goroutine to exit before
// returning ctx.Err(). Once Run returns, nothing sends on Out again, so the
// caller may safely close it. Shutdown therefore relies on Source.Poll
// returning promptly once its ctx is cancelled.
//
// Run returns an error immediately if Out is nil or no jobs are configured.
func (s *Scheduler) Run(ctx context.Context) error {
	if s.Out == nil {
		return fmt.Errorf("scheduler.Run: Out channel is nil")
	}
	if len(s.Jobs) == 0 {
		return fmt.Errorf("scheduler.Run: no jobs configured")
	}

	var wg sync.WaitGroup
	for _, job := range s.Jobs {
		wg.Add(1)
		// job is passed as an argument (evaluated at the go statement), so
		// each goroutine owns its own copy regardless of Go version.
		go func(j Job) {
			defer wg.Done()
			s.runJob(ctx, j)
		}(job)
	}

	<-ctx.Done()
	// Wait so no goroutine can still be sending on Out after we return.
	wg.Wait()
	return ctx.Err()
}

// runJob polls job.Source once after an initial random delay. It then polls
// once per job.Every tick, each preceded by a fresh random delay, until ctx
// is done. Jobs with a nil Source or a non-positive interval are logged and
// skipped.
func (s *Scheduler) runJob(ctx context.Context, job Job) {
	if job.Source == nil {
		s.logf("scheduler: job has nil source")
		return
	}
	if job.Every <= 0 {
		s.logf("scheduler: job %s has invalid interval", job.Source.Name())
		return
	}

	// Compute jitter: either configured per job, or a sensible default.
	jitter := effectiveJitter(job.Every, job.Jitter)

	// *rand.Rand is not safe for concurrent use, so each worker gets its own
	// (which also avoids contention on a shared source). Mixing in a hash of
	// the source name gives differently named jobs distinct seeds even if
	// they start in the same nanosecond.
	seed := time.Now().UnixNano() ^ int64(hashStringFNV32a(job.Source.Name()))
	rng := rand.New(rand.NewSource(seed))

	// Startup jitter: avoids all jobs firing at the exact moment the daemon starts.
	if !sleepJitter(ctx, rng, jitter) {
		return
	}

	// Immediate poll at startup (after startup jitter).
	s.pollOnce(ctx, job)

	t := time.NewTicker(job.Every)
	defer t.Stop()

	for {
		select {
		case <-t.C:
			// Per-tick jitter: spreads calls out within the interval.
			if !sleepJitter(ctx, rng, jitter) {
				return
			}
			s.pollOnce(ctx, job)
		case <-ctx.Done():
			return
		}
	}
}

// pollOnce calls job.Source.Poll once and forwards each returned event to
// s.Out. It stops early if ctx is cancelled. Poll errors are logged, not
// returned.
func (s *Scheduler) pollOnce(ctx context.Context, job Job) {
	// A tick and ctx.Done can be ready at the same time, and zero jitter does
	// not re-check ctx, so don't start a new poll once shutdown has begun.
	if ctx.Err() != nil {
		return
	}

	events, err := job.Source.Poll(ctx)
	if err != nil {
		s.logf("scheduler: poll failed (%s): %v", job.Source.Name(), err)
		return
	}

	for _, e := range events {
		select {
		case s.Out <- e:
		case <-ctx.Done():
			return
		}
	}
}

// logf forwards to s.Logf when one is configured; otherwise it is a no-op.
func (s *Scheduler) logf(format string, args ...any) {
	if s.Logf == nil {
		return
	}
	s.Logf(format, args...)
}

// effectiveJitter chooses a jitter value.
//   - If configuredMax > 0, use it (but clamp).
//   - Else default to min(every/10, 30s).
//   - Clamp to at most every/2 (so jitter can't delay more than half the interval).
//
// Returns 0 when every <= 0. The result is always in [0, every/2].
func effectiveJitter(every time.Duration, configuredMax time.Duration) time.Duration {
	if every <= 0 {
		return 0
	}

	j := configuredMax
	if j <= 0 {
		j = every / 10
		if j > 30*time.Second {
			j = 30 * time.Second
		}
	}

	// Clamp jitter so it doesn't dominate the schedule. Every path above
	// yields j >= 0 (every > 0 here), so no lower clamp is needed.
	if maxAllowed := every / 2; j > maxAllowed {
		j = maxAllowed
	}
	return j
}

// sleepJitter sleeps for a random duration in [0, maxDelay].
// It returns true immediately when maxDelay <= 0, and returns false if ctx
// is cancelled while waiting.
//
// maxDelay must be less than the maximum time.Duration, because the +1 below
// would overflow. Values produced by effectiveJitter always satisfy this.
func sleepJitter(ctx context.Context, rng *rand.Rand, maxDelay time.Duration) bool {
	if maxDelay <= 0 {
		return true
	}

	// Int63n requires a positive argument; adding 1 makes maxDelay itself attainable.
	d := time.Duration(rng.Int63n(int64(maxDelay) + 1))

	timer := time.NewTimer(d)
	defer timer.Stop()

	select {
	case <-timer.C:
		return true
	case <-ctx.Done():
		return false
	}
}

// hashStringFNV32a returns the 32-bit FNV-1a hash of s.
func hashStringFNV32a(s string) uint32 {
	h := fnv.New32a()
	// hash.Hash.Write never returns an error.
	_, _ = h.Write([]byte(s))
	return h.Sum32()
}