Refactored the scheduler and source interfaces to accommodate both polling (e.g., HTTP) sources and streaming (e.g., message queue) sources.
This commit is contained in:
@@ -17,15 +17,27 @@ import (
|
||||
// one function everywhere without type mismatch friction.
|
||||
type Logger = logging.Logf
|
||||
|
||||
// Job describes one scheduler task.
|
||||
//
|
||||
// A Job may be backed by either:
|
||||
// - a polling source (sources.Source): uses Every + jitter and calls Poll()
|
||||
// - a stream source (sources.StreamSource): ignores Every and calls Run()
|
||||
//
|
||||
// Jitter behavior:
|
||||
// - For polling sources: Jitter is applied at startup and before each poll tick.
|
||||
// - For stream sources: Jitter is applied once at startup only (optional; useful to avoid
|
||||
// reconnect storms when many instances start together).
|
||||
type Job struct {
|
||||
Source sources.Source
|
||||
Source sources.Input
|
||||
Every time.Duration
|
||||
|
||||
// Jitter is the maximum additional delay added before each poll.
|
||||
// Example: if Every=15m and Jitter=30s, each poll will occur at:
|
||||
// tick time + random(0..30s)
|
||||
//
|
||||
// If Jitter == 0, we compute a default jitter based on Every.
|
||||
// If Jitter == 0 for polling sources, we compute a default jitter based on Every.
|
||||
//
|
||||
// For stream sources, Jitter is treated as *startup jitter only*.
|
||||
Jitter time.Duration
|
||||
}
|
||||
|
||||
@@ -35,8 +47,9 @@ type Scheduler struct {
|
||||
Logf Logger
|
||||
}
|
||||
|
||||
// Run starts one polling goroutine per job.
|
||||
// Each job runs on its own interval and emits 0..N events per poll.
|
||||
// Run starts one goroutine per job.
|
||||
// Poll jobs run on their own interval and emit 0..N events per poll.
|
||||
// Stream jobs run continuously and emit events as they arrive.
|
||||
func (s *Scheduler) Run(ctx context.Context) error {
|
||||
if s.Out == nil {
|
||||
return fmt.Errorf("scheduler.Run: Out channel is nil")
|
||||
@@ -59,17 +72,48 @@ func (s *Scheduler) runJob(ctx context.Context, job Job) {
|
||||
s.logf("scheduler: job has nil source")
|
||||
return
|
||||
}
|
||||
if job.Every <= 0 {
|
||||
s.logf("scheduler: job %s has invalid interval", job.Source.Name())
|
||||
|
||||
// Stream sources: event-driven.
|
||||
if ss, ok := job.Source.(sources.StreamSource); ok {
|
||||
s.runStream(ctx, job, ss)
|
||||
return
|
||||
}
|
||||
|
||||
// Poll sources: time-based.
|
||||
ps, ok := job.Source.(sources.Source)
|
||||
if !ok {
|
||||
s.logf("scheduler: source %T (%s) implements neither Poll() nor Run()", job.Source, job.Source.Name())
|
||||
return
|
||||
}
|
||||
if job.Every <= 0 {
|
||||
s.logf("scheduler: polling job %q missing/invalid interval (sources[].every)", ps.Name())
|
||||
return
|
||||
}
|
||||
|
||||
s.runPoller(ctx, job, ps)
|
||||
}
|
||||
|
||||
func (s *Scheduler) runStream(ctx context.Context, job Job, src sources.StreamSource) {
|
||||
// Optional startup jitter: helps avoid reconnect storms if many daemons start at once.
|
||||
if job.Jitter > 0 {
|
||||
rng := seededRNG(src.Name())
|
||||
if !sleepJitter(ctx, rng, job.Jitter) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Stream sources should block until ctx cancel or fatal error.
|
||||
if err := src.Run(ctx, s.Out); err != nil && ctx.Err() == nil {
|
||||
s.logf("scheduler: stream source %q exited with error: %v", src.Name(), err)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Scheduler) runPoller(ctx context.Context, job Job, src sources.Source) {
|
||||
// Compute jitter: either configured per job, or a sensible default.
|
||||
jitter := effectiveJitter(job.Every, job.Jitter)
|
||||
|
||||
// Each worker gets its own RNG (safe + no lock contention).
|
||||
seed := time.Now().UnixNano() ^ int64(hashStringFNV32a(job.Source.Name()))
|
||||
rng := rand.New(rand.NewSource(seed))
|
||||
rng := seededRNG(src.Name())
|
||||
|
||||
// Optional startup jitter: avoids all jobs firing at the exact moment the daemon starts.
|
||||
if !sleepJitter(ctx, rng, jitter) {
|
||||
@@ -77,7 +121,7 @@ func (s *Scheduler) runJob(ctx context.Context, job Job) {
|
||||
}
|
||||
|
||||
// Immediate poll at startup (after startup jitter).
|
||||
s.pollOnce(ctx, job)
|
||||
s.pollOnce(ctx, src)
|
||||
|
||||
t := time.NewTicker(job.Every)
|
||||
defer t.Stop()
|
||||
@@ -89,7 +133,7 @@ func (s *Scheduler) runJob(ctx context.Context, job Job) {
|
||||
if !sleepJitter(ctx, rng, jitter) {
|
||||
return
|
||||
}
|
||||
s.pollOnce(ctx, job)
|
||||
s.pollOnce(ctx, src)
|
||||
|
||||
case <-ctx.Done():
|
||||
return
|
||||
@@ -97,10 +141,10 @@ func (s *Scheduler) runJob(ctx context.Context, job Job) {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Scheduler) pollOnce(ctx context.Context, job Job) {
|
||||
events, err := job.Source.Poll(ctx)
|
||||
func (s *Scheduler) pollOnce(ctx context.Context, src sources.Source) {
|
||||
events, err := src.Poll(ctx)
|
||||
if err != nil {
|
||||
s.logf("scheduler: poll failed (%s): %v", job.Source.Name(), err)
|
||||
s.logf("scheduler: poll failed (%s): %v", src.Name(), err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -120,6 +164,13 @@ func (s *Scheduler) logf(format string, args ...any) {
|
||||
s.Logf(format, args...)
|
||||
}
|
||||
|
||||
// ---- helpers ----
|
||||
|
||||
func seededRNG(name string) *rand.Rand {
|
||||
seed := time.Now().UnixNano() ^ int64(hashStringFNV32a(name))
|
||||
return rand.New(rand.NewSource(seed))
|
||||
}
|
||||
|
||||
// effectiveJitter chooses a jitter value.
|
||||
// - If configuredMax > 0, use it (but clamp).
|
||||
// - Else default to min(every/10, 30s).
|
||||
|
||||
Reference in New Issue
Block a user