Files
feedkit/scheduler/scheduler.go

230 lines
5.5 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package scheduler
import (
"context"
"fmt"
"hash/fnv"
"math/rand"
"time"
"gitea.maximumdirect.net/ejr/feedkit/event"
"gitea.maximumdirect.net/ejr/feedkit/logging"
"gitea.maximumdirect.net/ejr/feedkit/sources"
)
// Logger is a printf-style logger used throughout the scheduler package.
// It is an alias (not a distinct named type) to the shared feedkit logging
// type, so callers can pass one function everywhere without type mismatch
// friction.
type Logger = logging.Logf
// Job describes one scheduler task.
//
// A Job may be backed by either:
//   - a polling source (sources.Source): uses Every + jitter and calls Poll()
//   - a stream source (sources.StreamSource): ignores Every and calls Run()
//
// Jitter behavior:
//   - For polling sources: Jitter is applied at startup and before each poll tick.
//   - For stream sources: Jitter is applied once at startup only (optional; useful to avoid
//     reconnect storms when many instances start together).
type Job struct {
	// Source produces the job's events. It must implement either
	// sources.Source (polling) or sources.StreamSource (streaming);
	// anything else is logged and skipped by runJob.
	Source sources.Input

	// Every is the polling interval. Required (> 0) for polling sources;
	// ignored for stream sources.
	Every time.Duration

	// Jitter is the maximum additional delay added before each poll.
	// Example: if Every=15m and Jitter=30s, each poll will occur at:
	// tick time + random(0..30s)
	//
	// If Jitter == 0 for polling sources, we compute a default jitter based on Every.
	//
	// For stream sources, Jitter is treated as *startup jitter only*.
	Jitter time.Duration
}
// Scheduler runs a set of Jobs concurrently and funnels every event they
// produce into a single output channel.
type Scheduler struct {
	// Jobs is the set of tasks to run; Run starts one goroutine per job.
	Jobs []Job

	// Out receives every event emitted by every job. Required (Run
	// returns an error if nil).
	Out chan<- event.Event

	// Logf, when non-nil, receives diagnostic messages. A nil Logf
	// silently discards them (see logf).
	Logf Logger
}
// Run starts one goroutine per job and then blocks until ctx is cancelled.
// Poll jobs run on their own interval and emit 0..N events per poll.
// Stream jobs run continuously and emit events as they arrive.
//
// Misconfiguration (nil Out, empty Jobs) is reported immediately; otherwise
// Run returns ctx.Err() after cancellation. Worker goroutines observe the
// same ctx but are not awaited, so they may still be unwinding when Run
// returns.
func (s *Scheduler) Run(ctx context.Context) error {
	switch {
	case s.Out == nil:
		return fmt.Errorf("scheduler.Run: Out channel is nil")
	case len(s.Jobs) == 0:
		return fmt.Errorf("scheduler.Run: no jobs configured")
	}
	for i := range s.Jobs {
		// Index into the slice so each goroutine gets its own Job copy.
		go s.runJob(ctx, s.Jobs[i])
	}
	<-ctx.Done()
	return ctx.Err()
}
// runJob dispatches one job to the appropriate driver: event-driven for
// stream sources, time-based for polling sources. Jobs with a nil source,
// an unknown source kind, or a missing poll interval are logged and dropped.
func (s *Scheduler) runJob(ctx context.Context, job Job) {
	if job.Source == nil {
		s.logf("scheduler: job has nil source")
		return
	}
	// Case order matters: a source implementing both interfaces is
	// treated as a stream, matching the original assertion order.
	switch src := job.Source.(type) {
	case sources.StreamSource:
		s.runStream(ctx, job, src)
	case sources.Source:
		if job.Every <= 0 {
			s.logf("scheduler: polling job %q missing/invalid interval (sources[].every)", src.Name())
			return
		}
		s.runPoller(ctx, job, src)
	default:
		s.logf("scheduler: source %T (%s) implements neither Poll() nor Run()", job.Source, job.Source.Name())
	}
}
// runStream drives an event-driven source. An optional startup jitter
// staggers reconnects when many daemons boot at the same moment; after
// that the source's Run is expected to block until ctx cancellation or a
// fatal error.
func (s *Scheduler) runStream(ctx context.Context, job Job, src sources.StreamSource) {
	if job.Jitter > 0 {
		if !sleepJitter(ctx, seededRNG(src.Name()), job.Jitter) {
			return // cancelled while waiting out the startup jitter
		}
	}
	err := src.Run(ctx, s.Out)
	// Only report errors that are not simply the result of shutdown.
	if err != nil && ctx.Err() == nil {
		s.logf("scheduler: stream source %q exited with error: %v", src.Name(), err)
	}
}
// runPoller drives a time-based source: one immediate poll after a randomized
// startup delay, then one poll per tick of job.Every, each preceded by a
// fresh random jitter that spreads calls out within the interval.
func (s *Scheduler) runPoller(ctx context.Context, job Job, src sources.Source) {
	var (
		// Either the configured jitter or a sensible default (see effectiveJitter).
		jitter = effectiveJitter(job.Every, job.Jitter)
		// Dedicated per-worker RNG: safe without locks, no contention.
		rng = seededRNG(src.Name())
	)

	// Startup jitter keeps all jobs from firing the instant the daemon starts.
	if !sleepJitter(ctx, rng, jitter) {
		return
	}
	// First poll happens right away, once the startup jitter has elapsed.
	s.pollOnce(ctx, src)

	ticker := time.NewTicker(job.Every)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if !sleepJitter(ctx, rng, jitter) {
				return
			}
			s.pollOnce(ctx, src)
		}
	}
}
// pollOnce performs a single Poll and forwards the resulting events to Out.
// A poll error is logged and the batch abandoned; forwarding aborts promptly
// on cancellation so a full Out channel can never wedge shutdown.
func (s *Scheduler) pollOnce(ctx context.Context, src sources.Source) {
	batch, err := src.Poll(ctx)
	if err != nil {
		s.logf("scheduler: poll failed (%s): %v", src.Name(), err)
		return
	}
	for _, ev := range batch {
		select {
		case <-ctx.Done():
			return
		case s.Out <- ev:
		}
	}
}
// logf forwards a diagnostic message to s.Logf when one is configured;
// with a nil Logf it is a silent no-op.
func (s *Scheduler) logf(format string, args ...any) {
	if s.Logf != nil {
		s.Logf(format, args...)
	}
}
// ---- helpers ----
// seededRNG builds a private *rand.Rand for one worker. Mixing the wall
// clock with a hash of the source name keeps seeds distinct even when
// several workers are created within the same nanosecond.
func seededRNG(name string) *rand.Rand {
	nameBits := int64(hashStringFNV32a(name))
	seed := time.Now().UnixNano() ^ nameBits
	return rand.New(rand.NewSource(seed))
}
// effectiveJitter chooses a jitter value.
// - If configuredMax > 0, use it (but clamp).
// - Else default to min(every/10, 30s).
// - Clamp to at most every/2 (so jitter cant delay more than half the interval).
func effectiveJitter(every time.Duration, configuredMax time.Duration) time.Duration {
if every <= 0 {
return 0
}
j := configuredMax
if j <= 0 {
j = every / 10
if j > 30*time.Second {
j = 30 * time.Second
}
}
// Clamp jitter so it doesnt dominate the schedule.
maxAllowed := every / 2
if j > maxAllowed {
j = maxAllowed
}
if j < 0 {
j = 0
}
return j
}
// sleepJitter sleeps for a random duration in [0, max].
// Returns false if the context is cancelled while waiting.
func sleepJitter(ctx context.Context, rng *rand.Rand, max time.Duration) bool {
if max <= 0 {
return true
}
// Int63n requires a positive argument.
// We add 1 so max itself is attainable.
n := rng.Int63n(int64(max) + 1)
d := time.Duration(n)
timer := time.NewTimer(d)
defer timer.Stop()
select {
case <-timer.C:
return true
case <-ctx.Done():
return false
}
}
// hashStringFNV32a returns the 32-bit FNV-1a digest of s; it supplies the
// name-dependent bits of the per-source RNG seeds in seededRNG.
func hashStringFNV32a(s string) uint32 {
	digest := fnv.New32a()
	// Write on an fnv hash never fails; the error is deliberately ignored.
	_, _ = digest.Write([]byte(s))
	return digest.Sum32()
}