package scheduler import ( "context" "errors" "fmt" "strings" "sync" "testing" "time" "gitea.maximumdirect.net/ejr/feedkit/config" "gitea.maximumdirect.net/ejr/feedkit/event" "gitea.maximumdirect.net/ejr/feedkit/sources" ) type testPollSource struct { name string } func (s testPollSource) Name() string { return s.name } func (s testPollSource) Poll(context.Context) ([]event.Event, error) { return nil, nil } type scriptedStreamSource struct { name string mu sync.Mutex calls int runs []func(context.Context, chan<- event.Event) error } func (s *scriptedStreamSource) Name() string { return s.name } func (s *scriptedStreamSource) Run(ctx context.Context, out chan<- event.Event) error { s.mu.Lock() call := s.calls s.calls++ var run func(context.Context, chan<- event.Event) error if call < len(s.runs) { run = s.runs[call] } s.mu.Unlock() if run != nil { return run(ctx, out) } <-ctx.Done() return ctx.Err() } func (s *scriptedStreamSource) CallCount() int { s.mu.Lock() defer s.mu.Unlock() return s.calls } type capturingLogger struct { mu sync.Mutex lines []string } func (l *capturingLogger) Logf(format string, args ...any) { l.mu.Lock() defer l.mu.Unlock() l.lines = append(l.lines, fmt.Sprintf(format, args...)) } func (l *capturingLogger) Contains(substr string) bool { l.mu.Lock() defer l.mu.Unlock() for _, line := range l.lines { if strings.Contains(line, substr) { return true } } return false } func TestSchedulerRunRestartsPlainStreamErrors(t *testing.T) { src := &scriptedStreamSource{ name: "stream-a", runs: []func(context.Context, chan<- event.Event) error{ func(context.Context, chan<- event.Event) error { return errors.New("temporary failure") }, func(context.Context, chan<- event.Event) error { return errors.New("temporary failure") }, }, } ctx, cancel := context.WithCancel(context.Background()) defer cancel() s := &Scheduler{ Jobs: []Job{{ Source: src, StreamBackoff: StreamBackoff{ Initial: time.Millisecond, Max: time.Millisecond, }, }}, Out: make(chan event.Event, 1), } errCh := make(chan error, 1) go func() { errCh <- s.Run(ctx) }() waitFor(t, func() bool { return src.CallCount() >= 3 }) cancel() err := <-errCh if !errors.Is(err, context.Canceled) { t.Fatalf("Scheduler.Run() error = %v, want context canceled", err) } if src.CallCount() < 3 { t.Fatalf("stream call count = %d, want at least 3", src.CallCount()) } } func TestSchedulerRunFatalStreamErrorReturns(t *testing.T) { base := errors.New("fatal failure") src := &scriptedStreamSource{ name: "stream-fatal", runs: []func(context.Context, chan<- event.Event) error{ func(context.Context, chan<- event.Event) error { return sources.StreamFatal(base) }, }, } s := &Scheduler{ Jobs: []Job{{Source: src}}, Out: make(chan event.Event, 1), } err := s.Run(context.Background()) if err == nil { t.Fatalf("Scheduler.Run() error = nil, want fatal error") } if !sources.IsStreamFatal(err) { t.Fatalf("Scheduler.Run() error = %v, want fatal classification", err) } if !errors.Is(err, base) { t.Fatalf("Scheduler.Run() error does not wrap base fatal error: %v", err) } } func TestSchedulerRunStopPolicyStopsOnlyThatSource(t *testing.T) { src := &scriptedStreamSource{ name: "stream-stop", runs: []func(context.Context, chan<- event.Event) error{ func(context.Context, chan<- event.Event) error { return errors.New("stop now") }, }, } ctx, cancel := context.WithCancel(context.Background()) defer cancel() s := &Scheduler{ Jobs: []Job{{ Source: src, StreamExitPolicy: StreamExitPolicyStop, }}, Out: make(chan event.Event, 1), } errCh := make(chan error, 1) go func() { errCh <- s.Run(ctx) }() waitFor(t, func() bool { return src.CallCount() >= 1 }) time.Sleep(20 * time.Millisecond) select { case err := <-errCh: t.Fatalf("Scheduler.Run() returned early: %v", err) default: } if src.CallCount() != 1 { t.Fatalf("stream call count = %d, want 1", src.CallCount()) } cancel() err := <-errCh if !errors.Is(err, context.Canceled) { t.Fatalf("Scheduler.Run() error = %v, want context canceled", err) } } func TestSchedulerRunFatalPolicyTreatsPlainErrorAsFatal(t *testing.T) { base := errors.New("plain failure") src := &scriptedStreamSource{ name: "stream-fatal-policy", runs: []func(context.Context, chan<- event.Event) error{ func(context.Context, chan<- event.Event) error { return base }, }, } s := &Scheduler{ Jobs: []Job{{ Source: src, StreamExitPolicy: StreamExitPolicyFatal, }}, Out: make(chan event.Event, 1), } err := s.Run(context.Background()) if err == nil { t.Fatalf("Scheduler.Run() error = nil, want fatal-policy error") } if !errors.Is(err, base) { t.Fatalf("Scheduler.Run() error does not wrap base error: %v", err) } } func TestSchedulerRunNilExitRestartsAsUnexpected(t *testing.T) { logger := &capturingLogger{} src := &scriptedStreamSource{ name: "stream-nil-exit", runs: []func(context.Context, chan<- event.Event) error{ func(context.Context, chan<- event.Event) error { return nil }, }, } ctx, cancel := context.WithCancel(context.Background()) defer cancel() s := &Scheduler{ Jobs: []Job{{ Source: src, StreamBackoff: StreamBackoff{ Initial: time.Millisecond, Max: time.Millisecond, }, }}, Out: make(chan event.Event, 1), Logf: logger.Logf, } errCh := make(chan error, 1) go func() { errCh <- s.Run(ctx) }() waitFor(t, func() bool { return src.CallCount() >= 2 }) cancel() err := <-errCh if !errors.Is(err, context.Canceled) { t.Fatalf("Scheduler.Run() error = %v, want context canceled", err) } if !logger.Contains("exited unexpectedly without error") { t.Fatalf("expected log to mention unexpected nil stream exit") } } func TestSchedulerRunContextCancelDuringBackoff(t *testing.T) { src := &scriptedStreamSource{ name: "stream-backoff-cancel", runs: []func(context.Context, chan<- event.Event) error{ func(context.Context, chan<- event.Event) error { return errors.New("retry me") }, }, } ctx, cancel := context.WithCancel(context.Background()) defer cancel() s := &Scheduler{ Jobs: []Job{{ Source: src, StreamBackoff: StreamBackoff{ Initial: time.Second, Max: time.Second, }, }}, Out: make(chan event.Event, 1), } errCh := make(chan error, 1) go func() { errCh <- s.Run(ctx) }() waitFor(t, func() bool { return src.CallCount() >= 1 }) cancel() err := <-errCh if !errors.Is(err, context.Canceled) { t.Fatalf("Scheduler.Run() error = %v, want context canceled", err) } time.Sleep(20 * time.Millisecond) if src.CallCount() != 1 { t.Fatalf("stream call count = %d, want 1", src.CallCount()) } } func TestNextStreamBackoffCapsAtMax(t *testing.T) { if got := nextStreamBackoff(500*time.Millisecond, 2*time.Second); got != time.Second { t.Fatalf("nextStreamBackoff() = %s, want 1s", got) } if got := nextStreamBackoff(time.Second, 2*time.Second); got != 2*time.Second { t.Fatalf("nextStreamBackoff() = %s, want 2s", got) } if got := nextStreamBackoff(2*time.Second, 2*time.Second); got != 2*time.Second { t.Fatalf("nextStreamBackoff() = %s, want 2s", got) } } func TestStreamRunWasStableAfterFiveMinutes(t *testing.T) { start := time.Date(2026, 3, 29, 12, 0, 0, 0, time.UTC) if streamRunWasStable(start, start.Add(4*time.Minute+59*time.Second)) { t.Fatalf("streamRunWasStable() = true, want false") } if !streamRunWasStable(start, start.Add(5*time.Minute)) { t.Fatalf("streamRunWasStable() = false, want true") } } func TestJobFromSourceConfigPollSource(t *testing.T) { job, err := JobFromSourceConfig(testPollSource{name: "poll-a"}, config.SourceConfig{ Name: "poll-a", Driver: "poll_driver", Every: config.Duration{Duration: time.Minute}, }) if err != nil { t.Fatalf("JobFromSourceConfig() error = %v", err) } if job.Every != time.Minute { t.Fatalf("Job.Every = %s, want 1m", job.Every) } } func TestJobFromSourceConfigPollSourceRejectsStreamParams(t *testing.T) { _, err := JobFromSourceConfig(testPollSource{name: "poll-a"}, config.SourceConfig{ Name: "poll-a", Driver: "poll_driver", Every: config.Duration{Duration: time.Minute}, Params: map[string]any{ "stream_exit_policy": "restart", }, }) if err == nil { t.Fatalf("JobFromSourceConfig() error = nil, want rejection") } if !strings.Contains(err.Error(), "only valid for stream sources") { t.Fatalf("JobFromSourceConfig() error = %q", err) } } func TestJobFromSourceConfigStreamSourceParsesDefaultsAndOverrides(t *testing.T) { src := &scriptedStreamSource{name: "stream-a"} job, err := JobFromSourceConfig(src, config.SourceConfig{ Name: "stream-a", Driver: "stream_driver", Mode: config.SourceModeStream, Params: map[string]any{ "stream_exit_policy": "stop", "stream_backoff_initial": "2s", "stream_backoff_max": "10s", "stream_backoff_jitter": "500ms", }, }) if err != nil { t.Fatalf("JobFromSourceConfig() error = %v", err) } if job.StreamExitPolicy != StreamExitPolicyStop { t.Fatalf("Job.StreamExitPolicy = %q, want %q", job.StreamExitPolicy, StreamExitPolicyStop) } if job.StreamBackoff.Initial != 2*time.Second { t.Fatalf("Job.StreamBackoff.Initial = %s, want 2s", job.StreamBackoff.Initial) } if job.StreamBackoff.Max != 10*time.Second { t.Fatalf("Job.StreamBackoff.Max = %s, want 10s", job.StreamBackoff.Max) } if job.StreamBackoff.Jitter != 500*time.Millisecond { t.Fatalf("Job.StreamBackoff.Jitter = %s, want 500ms", job.StreamBackoff.Jitter) } defaultJob, err := JobFromSourceConfig(src, config.SourceConfig{ Name: "stream-default", Driver: "stream_driver", Mode: config.SourceModeStream, }) if err != nil { t.Fatalf("JobFromSourceConfig() default error = %v", err) } if defaultJob.StreamExitPolicy != StreamExitPolicyRestart { t.Fatalf("default Job.StreamExitPolicy = %q, want restart", defaultJob.StreamExitPolicy) } if defaultJob.StreamBackoff.Initial != defaultStreamBackoffInitial { t.Fatalf("default Job.StreamBackoff.Initial = %s, want %s", defaultJob.StreamBackoff.Initial, defaultStreamBackoffInitial) } if defaultJob.StreamBackoff.Max != defaultStreamBackoffMax { t.Fatalf("default Job.StreamBackoff.Max = %s, want %s", defaultJob.StreamBackoff.Max, defaultStreamBackoffMax) } if defaultJob.StreamBackoff.Jitter != defaultStreamBackoffJitter { t.Fatalf("default Job.StreamBackoff.Jitter = %s, want %s", defaultJob.StreamBackoff.Jitter, defaultStreamBackoffJitter) } } func TestJobFromSourceConfigStreamSourceRejectsInvalidSettings(t *testing.T) { src := &scriptedStreamSource{name: "stream-b"} _, err := JobFromSourceConfig(src, config.SourceConfig{ Name: "stream-b", Driver: "stream_driver", Mode: config.SourceModeStream, Params: map[string]any{ "stream_exit_policy": "sometimes", }, }) if err == nil { t.Fatalf("JobFromSourceConfig() error = nil, want invalid policy error") } if !strings.Contains(err.Error(), "stream_exit_policy") { t.Fatalf("JobFromSourceConfig() error = %q", err) } _, err = JobFromSourceConfig(src, config.SourceConfig{ Name: "stream-b", Driver: "stream_driver", Mode: config.SourceModeStream, Params: map[string]any{ "stream_backoff_initial": "0s", }, }) if err == nil { t.Fatalf("JobFromSourceConfig() error = nil, want invalid initial backoff error") } if !strings.Contains(err.Error(), "stream_backoff_initial") { t.Fatalf("JobFromSourceConfig() error = %q", err) } _, err = JobFromSourceConfig(src, config.SourceConfig{ Name: "stream-b", Driver: "stream_driver", Mode: config.SourceModeStream, Params: map[string]any{ "stream_backoff_initial": "2s", "stream_backoff_max": "1s", }, }) if err == nil { t.Fatalf("JobFromSourceConfig() error = nil, want invalid max backoff error") } if !strings.Contains(err.Error(), "stream_backoff_max") { t.Fatalf("JobFromSourceConfig() error = %q", err) } } func TestJobFromSourceConfigStreamSourceRejectsEvery(t *testing.T) { src := &scriptedStreamSource{name: "stream-c"} _, err := JobFromSourceConfig(src, config.SourceConfig{ Name: "stream-c", Driver: "stream_driver", Mode: config.SourceModeStream, Every: config.Duration{Duration: time.Minute}, }) if err == nil { t.Fatalf("JobFromSourceConfig() error = nil, want every rejection") } if !strings.Contains(err.Error(), "sources[].every must be omitted") { t.Fatalf("JobFromSourceConfig() error = %q", err) } } func waitFor(t *testing.T, cond func() bool) { t.Helper() deadline := time.Now().Add(2 * time.Second) for time.Now().Before(deadline) { if cond() { return } time.Sleep(10 * time.Millisecond) } t.Fatalf("condition not satisfied before timeout") }