From 247937b65ea967b592811fa368ccb7627a7dc3b8 Mon Sep 17 00:00:00 2001 From: Eric Rakestraw Date: Sun, 29 Mar 2026 08:34:35 -0500 Subject: [PATCH] Upgraded feedkit's handling of stream sources --- README.md | 2 +- doc.go | 3 +- scheduler/doc.go | 25 ++ scheduler/helpers.go | 138 ++++++++++ scheduler/scheduler.go | 190 +++++++++++++- scheduler/scheduler_test.go | 472 ++++++++++++++++++++++++++++++++++ sources/doc.go | 2 + sources/source.go | 3 + sources/stream_errors.go | 63 +++++ sources/stream_errors_test.go | 52 ++++ 10 files changed, 934 insertions(+), 16 deletions(-) create mode 100644 scheduler/doc.go create mode 100644 scheduler/helpers.go create mode 100644 scheduler/scheduler_test.go create mode 100644 sources/stream_errors.go create mode 100644 sources/stream_errors_test.go diff --git a/README.md b/README.md index 53e441c..71d1635 100644 --- a/README.md +++ b/README.md @@ -49,7 +49,7 @@ structure end-to-end. `feedkit` currently includes: - strict YAML config loading and validation - polling and streaming source abstractions -- scheduler orchestration for configured sources +- scheduler orchestration for configured sources and supervised stream workers - optional pipeline processors - built-in dedupe and normalization processors - route compilation and sink fanout diff --git a/doc.go b/doc.go index 7f98343..121ba6e 100644 --- a/doc.go +++ b/doc.go @@ -23,7 +23,8 @@ // reusable source helpers. // // - scheduler -// Runs configured sources on a cadence or as long-lived stream workers. +// Runs configured sources on a cadence and supervises long-lived stream +// workers with restart/fatal handling. // // - processors // Defines the generic processor interface and registry used to build diff --git a/scheduler/doc.go b/scheduler/doc.go new file mode 100644 index 0000000..744e105 --- /dev/null +++ b/scheduler/doc.go @@ -0,0 +1,25 @@ +// Package scheduler runs feedkit sources and forwards their events to the +// daemon event bus. +// +// External API surface: +// - Scheduler: runs configured polling and streaming jobs +// - Job: one scheduler task bound to a source +// - StreamExitPolicy: stream supervision policy for non-fatal exits +// - StreamBackoff: restart pacing for supervised stream sources +// +// Optional helpers from helpers.go: +// - JobFromSourceConfig: build a scheduler job from a configured source and +// feedkit-owned scheduling params +// +// Poll sources are run on a fixed cadence with optional jitter. Stream sources +// are supervised long-lived workers. Their generic feedkit controls live under +// sources[].params: +// - stream_exit_policy: restart|stop|fatal (default restart) +// - stream_backoff_initial: positive duration (default 1s) +// - stream_backoff_max: positive duration (default 1m) +// - stream_backoff_jitter: non-negative duration (default 250ms) +// +// Stream sources can classify exits with sources.StreamRetryable and +// sources.StreamFatal. Plain errors are treated as retryable by default, while +// fatal exits are propagated from Scheduler.Run so the daemon can shut down. +package scheduler diff --git a/scheduler/helpers.go b/scheduler/helpers.go new file mode 100644 index 0000000..aa7169e --- /dev/null +++ b/scheduler/helpers.go @@ -0,0 +1,138 @@ +package scheduler + +import ( + "fmt" + "strings" + "time" + + "gitea.maximumdirect.net/ejr/feedkit/config" + "gitea.maximumdirect.net/ejr/feedkit/sources" +) + +// JobFromSourceConfig builds a scheduler Job from a configured source and its +// generic feedkit config. +func JobFromSourceConfig(src sources.Input, cfg config.SourceConfig) (Job, error) { + if src == nil { + return Job{}, fmt.Errorf("scheduler: source %q is nil", cfg.Name) + } + + job := Job{ + Source: src, + Every: cfg.Every.Duration, + } + + if _, ok := src.(sources.StreamSource); ok { + if cfg.Every.Duration > 0 { + return Job{}, fmt.Errorf("source %q: sources[].every must be omitted for stream sources", cfg.Name) + } + + policy, err := parseStreamExitPolicy(cfg) + if err != nil { + return Job{}, err + } + backoff, err := parseStreamBackoff(cfg) + if err != nil { + return Job{}, err + } + + job.StreamExitPolicy = policy + job.StreamBackoff = backoff + return job, nil + } + + if _, ok := src.(sources.PollSource); ok { + if cfg.Every.Duration <= 0 { + return Job{}, fmt.Errorf("source %q: sources[].every must be > 0 for polling sources", cfg.Name) + } + if err := rejectStreamParams(cfg); err != nil { + return Job{}, err + } + return job, nil + } + + return Job{}, fmt.Errorf("scheduler: source %q implements neither PollSource nor StreamSource", cfg.Name) +} + +func parseStreamExitPolicy(cfg config.SourceConfig) (StreamExitPolicy, error) { + const key = "stream_exit_policy" + raw, exists := cfg.Params[key] + if !exists || raw == nil { + return StreamExitPolicyRestart, nil + } + + s, ok := raw.(string) + if !ok { + return "", fmt.Errorf("source %q: params.%s must be one of: restart, stop, fatal", cfg.Name, key) + } + + switch StreamExitPolicy(strings.ToLower(strings.TrimSpace(s))) { + case StreamExitPolicyRestart: + return StreamExitPolicyRestart, nil + case StreamExitPolicyStop: + return StreamExitPolicyStop, nil + case StreamExitPolicyFatal: + return StreamExitPolicyFatal, nil + default: + return "", fmt.Errorf("source %q: params.%s must be one of: restart, stop, fatal", cfg.Name, key) + } +} + +func parseStreamBackoff(cfg config.SourceConfig) (StreamBackoff, error) { + initial, err := parsePositiveOrDefaultDuration(cfg, "stream_backoff_initial", defaultStreamBackoffInitial) + if err != nil { + return StreamBackoff{}, err + } + max, err := parsePositiveOrDefaultDuration(cfg, "stream_backoff_max", defaultStreamBackoffMax) + if err != nil { + return StreamBackoff{}, err + } + jitter, err := parseNonNegativeOrDefaultDuration(cfg, "stream_backoff_jitter", defaultStreamBackoffJitter) + if err != nil { + return StreamBackoff{}, err + } + if max < initial { + return StreamBackoff{}, fmt.Errorf("source %q: params.stream_backoff_max must be >= params.stream_backoff_initial", cfg.Name) + } + return StreamBackoff{ + Initial: initial, + Max: max, + Jitter: jitter, + }, nil +} + +func rejectStreamParams(cfg config.SourceConfig) error { + streamKeys := []string{ + "stream_exit_policy", + "stream_backoff_initial", + "stream_backoff_max", + "stream_backoff_jitter", + } + for _, key := range streamKeys { + if _, ok := cfg.Params[key]; ok { + return fmt.Errorf("source %q: params.%s is only valid for stream sources", cfg.Name, key) + } + } + return nil +} + +func parsePositiveOrDefaultDuration(cfg config.SourceConfig, key string, def time.Duration) (time.Duration, error) { + if _, exists := cfg.Params[key]; !exists { + return def, nil + } + v, ok := cfg.ParamDuration(key) + if !ok || v <= 0 { + return 0, fmt.Errorf("source %q: params.%s must be a positive duration", cfg.Name, key) + } + return v, nil +} + +func parseNonNegativeOrDefaultDuration(cfg config.SourceConfig, key string, def time.Duration) (time.Duration, error) { + if _, exists := cfg.Params[key]; !exists { + return def, nil + } + v, ok := cfg.ParamDuration(key) + if !ok || v < 0 { + return 0, fmt.Errorf("source %q: params.%s must be a non-negative duration", cfg.Name, key) + } + return v, nil +} diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 858b07b..d2f6772 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -5,6 +5,7 @@ import ( "fmt" "hash/fnv" "math/rand" + "sync" "time" "gitea.maximumdirect.net/ejr/feedkit/event" @@ -28,8 +29,10 @@ type Logger = logging.Logf // - For stream sources: Jitter is applied once at startup only (optional; useful to avoid // reconnect storms when many instances start together). type Job struct { - Source sources.Input - Every time.Duration + Source sources.Input + Every time.Duration + StreamExitPolicy StreamExitPolicy + StreamBackoff StreamBackoff // Jitter is the maximum additional delay added before each poll. // Example: if Every=15m and Jitter=30s, each poll will occur at: @@ -41,12 +44,37 @@ type Job struct { Jitter time.Duration } +// StreamExitPolicy controls how the scheduler handles non-fatal stream exits. +type StreamExitPolicy string + +const ( + StreamExitPolicyRestart StreamExitPolicy = "restart" + StreamExitPolicyStop StreamExitPolicy = "stop" + StreamExitPolicyFatal StreamExitPolicy = "fatal" +) + +// StreamBackoff controls restart pacing for stream supervision. +type StreamBackoff struct { + Initial time.Duration + Max time.Duration + Jitter time.Duration +} + type Scheduler struct { Jobs []Job Out chan<- event.Event Logf Logger } +const ( + defaultStreamBackoffInitial = 1 * time.Second + defaultStreamBackoffMax = 1 * time.Minute + defaultStreamBackoffJitter = 250 * time.Millisecond + streamBackoffResetAfter = 5 * time.Minute +) + +var timeNow = time.Now + // Run starts one goroutine per job. // Poll jobs run on their own interval and emit 0..N events per poll. // Stream jobs run continuously and emit events as they arrive. @@ -58,16 +86,38 @@ func (s *Scheduler) Run(ctx context.Context) error { return fmt.Errorf("scheduler.Run: no jobs configured") } + runCtx, cancel := context.WithCancel(ctx) + defer cancel() + + fatalErrCh := make(chan error, 1) + var wg sync.WaitGroup for _, job := range s.Jobs { job := job // capture loop variable - go s.runJob(ctx, job) + wg.Add(1) + go func() { + defer wg.Done() + s.runJob(runCtx, job, fatalErrCh) + }() } - <-ctx.Done() - return ctx.Err() + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case err := <-fatalErrCh: + cancel() + <-done + return err + case <-runCtx.Done(): + <-done + return runCtx.Err() + } } -func (s *Scheduler) runJob(ctx context.Context, job Job) { +func (s *Scheduler) runJob(ctx context.Context, job Job, fatalErrCh chan<- error) { if job.Source == nil { s.logf("scheduler: job has nil source") return @@ -75,7 +125,7 @@ func (s *Scheduler) runJob(ctx context.Context, job Job) { // Stream sources: event-driven. if ss, ok := job.Source.(sources.StreamSource); ok { - s.runStream(ctx, job, ss) + s.runStream(ctx, job, ss, fatalErrCh) return } @@ -93,18 +143,51 @@ func (s *Scheduler) runJob(ctx context.Context, job Job) { s.runPoller(ctx, job, ps) } -func (s *Scheduler) runStream(ctx context.Context, job Job, src sources.StreamSource) { +func (s *Scheduler) runStream(ctx context.Context, job Job, src sources.StreamSource, fatalErrCh chan<- error) { + policy := effectiveStreamExitPolicy(job.StreamExitPolicy) + backoff := effectiveStreamBackoff(job.StreamBackoff) + rng := seededRNG(src.Name()) + // Optional startup jitter: helps avoid reconnect storms if many daemons start at once. if job.Jitter > 0 { - rng := seededRNG(src.Name()) if !sleepJitter(ctx, rng, job.Jitter) { return } } - // Stream sources should block until ctx cancel or fatal error. - if err := src.Run(ctx, s.Out); err != nil && ctx.Err() == nil { - s.logf("scheduler: stream source %q exited with error: %v", src.Name(), err) + nextDelay := backoff.Initial + for { + startedAt := timeNow() + err := src.Run(ctx, s.Out) + if ctx.Err() != nil { + return + } + + normalizedErr := normalizeStreamExitError(src.Name(), err) + if sources.IsStreamFatal(normalizedErr) { + s.reportFatal(fatalErrCh, fmt.Errorf("scheduler: stream source %q exited fatally: %w", src.Name(), normalizedErr)) + return + } + + switch policy { + case StreamExitPolicyStop: + s.logf("scheduler: stream source %q stopped after exit: %v", src.Name(), normalizedErr) + return + case StreamExitPolicyFatal: + s.reportFatal(fatalErrCh, fmt.Errorf("scheduler: stream source %q exited under fatal policy: %w", src.Name(), normalizedErr)) + return + } + + if streamRunWasStable(startedAt, timeNow()) { + nextDelay = backoff.Initial + } + + delay := nextDelay + randomDuration(rng, backoff.Jitter) + s.logf("scheduler: stream source %q exited; restarting in %s: %v", src.Name(), delay, normalizedErr) + if !sleepDuration(ctx, delay) { + return + } + nextDelay = nextStreamBackoff(nextDelay, backoff.Max) } } @@ -164,10 +247,77 @@ func (s *Scheduler) logf(format string, args ...any) { s.Logf(format, args...) } +func (s *Scheduler) reportFatal(ch chan<- error, err error) { + if err == nil { + return + } + select { + case ch <- err: + default: + } +} + // ---- helpers ---- +func effectiveStreamExitPolicy(policy StreamExitPolicy) StreamExitPolicy { + switch policy { + case StreamExitPolicyStop, StreamExitPolicyFatal: + return policy + default: + return StreamExitPolicyRestart + } +} + +func effectiveStreamBackoff(cfg StreamBackoff) StreamBackoff { + out := cfg + if out.Initial <= 0 { + out.Initial = defaultStreamBackoffInitial + } + if out.Max <= 0 { + out.Max = defaultStreamBackoffMax + } + if out.Max < out.Initial { + out.Max = out.Initial + } + if out.Jitter < 0 { + out.Jitter = 0 + } + return out +} + +func normalizeStreamExitError(sourceName string, err error) error { + if err != nil { + return err + } + return sources.StreamRetryable(fmt.Errorf("stream source %q exited unexpectedly without error", sourceName)) +} + +func nextStreamBackoff(current, max time.Duration) time.Duration { + if current <= 0 { + current = defaultStreamBackoffInitial + } + if max <= 0 { + max = defaultStreamBackoffMax + } + if current >= max { + return max + } + next := current * 2 + if next < current || next > max { + return max + } + return next +} + +func streamRunWasStable(startedAt, endedAt time.Time) bool { + if startedAt.IsZero() || endedAt.IsZero() { + return false + } + return endedAt.Sub(startedAt) >= streamBackoffResetAfter +} + func seededRNG(name string) *rand.Rand { - seed := time.Now().UnixNano() ^ int64(hashStringFNV32a(name)) + seed := timeNow().UnixNano() ^ int64(hashStringFNV32a(name)) return rand.New(rand.NewSource(seed)) } @@ -206,11 +356,23 @@ func sleepJitter(ctx context.Context, rng *rand.Rand, max time.Duration) bool { return true } + return sleepDuration(ctx, randomDuration(rng, max)) +} + +func randomDuration(rng *rand.Rand, max time.Duration) time.Duration { + if max <= 0 { + return 0 + } // Int63n requires a positive argument. // We add 1 so max itself is attainable. n := rng.Int63n(int64(max) + 1) - d := time.Duration(n) + return time.Duration(n) +} +func sleepDuration(ctx context.Context, d time.Duration) bool { + if d <= 0 { + return true + } timer := time.NewTimer(d) defer timer.Stop() diff --git a/scheduler/scheduler_test.go b/scheduler/scheduler_test.go new file mode 100644 index 0000000..cdefe76 --- /dev/null +++ b/scheduler/scheduler_test.go @@ -0,0 +1,472 @@ +package scheduler + +import ( + "context" + "errors" + "fmt" + "strings" + "sync" + "testing" + "time" + + "gitea.maximumdirect.net/ejr/feedkit/config" + "gitea.maximumdirect.net/ejr/feedkit/event" + "gitea.maximumdirect.net/ejr/feedkit/sources" +) + +type testPollSource struct { + name string +} + +func (s testPollSource) Name() string { return s.name } + +func (s testPollSource) Poll(context.Context) ([]event.Event, error) { return nil, nil } + +type scriptedStreamSource struct { + name string + mu sync.Mutex + calls int + runs []func(context.Context, chan<- event.Event) error +} + +func (s *scriptedStreamSource) Name() string { return s.name } + +func (s *scriptedStreamSource) Run(ctx context.Context, out chan<- event.Event) error { + s.mu.Lock() + call := s.calls + s.calls++ + var run func(context.Context, chan<- event.Event) error + if call < len(s.runs) { + run = s.runs[call] + } + s.mu.Unlock() + + if run != nil { + return run(ctx, out) + } + + <-ctx.Done() + return ctx.Err() +} + +func (s *scriptedStreamSource) CallCount() int { + s.mu.Lock() + defer s.mu.Unlock() + return s.calls +} + +type capturingLogger struct { + mu sync.Mutex + lines []string +} + +func (l *capturingLogger) Logf(format string, args ...any) { + l.mu.Lock() + defer l.mu.Unlock() + l.lines = append(l.lines, fmt.Sprintf(format, args...)) +} + +func (l *capturingLogger) Contains(substr string) bool { + l.mu.Lock() + defer l.mu.Unlock() + for _, line := range l.lines { + if strings.Contains(line, substr) { + return true + } + } + return false +} + +func TestSchedulerRunRestartsPlainStreamErrors(t *testing.T) { + src := &scriptedStreamSource{ + name: "stream-a", + runs: []func(context.Context, chan<- event.Event) error{ + func(context.Context, chan<- event.Event) error { return errors.New("temporary failure") }, + func(context.Context, chan<- event.Event) error { return errors.New("temporary failure") }, + }, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + s := &Scheduler{ + Jobs: []Job{{ + Source: src, + StreamBackoff: StreamBackoff{ + Initial: time.Millisecond, + Max: time.Millisecond, + }, + }}, + Out: make(chan event.Event, 1), + } + + errCh := make(chan error, 1) + go func() { errCh <- s.Run(ctx) }() + + waitFor(t, func() bool { return src.CallCount() >= 3 }) + cancel() + + err := <-errCh + if !errors.Is(err, context.Canceled) { + t.Fatalf("Scheduler.Run() error = %v, want context canceled", err) + } + if src.CallCount() < 3 { + t.Fatalf("stream call count = %d, want at least 3", src.CallCount()) + } +} + +func TestSchedulerRunFatalStreamErrorReturns(t *testing.T) { + base := errors.New("fatal failure") + src := &scriptedStreamSource{ + name: "stream-fatal", + runs: []func(context.Context, chan<- event.Event) error{ + func(context.Context, chan<- event.Event) error { return sources.StreamFatal(base) }, + }, + } + + s := &Scheduler{ + Jobs: []Job{{Source: src}}, + Out: make(chan event.Event, 1), + } + + err := s.Run(context.Background()) + if err == nil { + t.Fatalf("Scheduler.Run() error = nil, want fatal error") + } + if !sources.IsStreamFatal(err) { + t.Fatalf("Scheduler.Run() error = %v, want fatal classification", err) + } + if !errors.Is(err, base) { + t.Fatalf("Scheduler.Run() error does not wrap base fatal error: %v", err) + } +} + +func TestSchedulerRunStopPolicyStopsOnlyThatSource(t *testing.T) { + src := &scriptedStreamSource{ + name: "stream-stop", + runs: []func(context.Context, chan<- event.Event) error{ + func(context.Context, chan<- event.Event) error { return errors.New("stop now") }, + }, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + s := &Scheduler{ + Jobs: []Job{{ + Source: src, + StreamExitPolicy: StreamExitPolicyStop, + }}, + Out: make(chan event.Event, 1), + } + + errCh := make(chan error, 1) + go func() { errCh <- s.Run(ctx) }() + + waitFor(t, func() bool { return src.CallCount() >= 1 }) + time.Sleep(20 * time.Millisecond) + + select { + case err := <-errCh: + t.Fatalf("Scheduler.Run() returned early: %v", err) + default: + } + + if src.CallCount() != 1 { + t.Fatalf("stream call count = %d, want 1", src.CallCount()) + } + + cancel() + err := <-errCh + if !errors.Is(err, context.Canceled) { + t.Fatalf("Scheduler.Run() error = %v, want context canceled", err) + } +} + +func TestSchedulerRunFatalPolicyTreatsPlainErrorAsFatal(t *testing.T) { + base := errors.New("plain failure") + src := &scriptedStreamSource{ + name: "stream-fatal-policy", + runs: []func(context.Context, chan<- event.Event) error{ + func(context.Context, chan<- event.Event) error { return base }, + }, + } + + s := &Scheduler{ + Jobs: []Job{{ + Source: src, + StreamExitPolicy: StreamExitPolicyFatal, + }}, + Out: make(chan event.Event, 1), + } + + err := s.Run(context.Background()) + if err == nil { + t.Fatalf("Scheduler.Run() error = nil, want fatal-policy error") + } + if !errors.Is(err, base) { + t.Fatalf("Scheduler.Run() error does not wrap base error: %v", err) + } +} + +func TestSchedulerRunNilExitRestartsAsUnexpected(t *testing.T) { + logger := &capturingLogger{} + src := &scriptedStreamSource{ + name: "stream-nil-exit", + runs: []func(context.Context, chan<- event.Event) error{ + func(context.Context, chan<- event.Event) error { return nil }, + }, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + s := &Scheduler{ + Jobs: []Job{{ + Source: src, + StreamBackoff: StreamBackoff{ + Initial: time.Millisecond, + Max: time.Millisecond, + }, + }}, + Out: make(chan event.Event, 1), + Logf: logger.Logf, + } + + errCh := make(chan error, 1) + go func() { errCh <- s.Run(ctx) }() + + waitFor(t, func() bool { return src.CallCount() >= 2 }) + cancel() + + err := <-errCh + if !errors.Is(err, context.Canceled) { + t.Fatalf("Scheduler.Run() error = %v, want context canceled", err) + } + if !logger.Contains("exited unexpectedly without error") { + t.Fatalf("expected log to mention unexpected nil stream exit") + } +} + +func TestSchedulerRunContextCancelDuringBackoff(t *testing.T) { + src := &scriptedStreamSource{ + name: "stream-backoff-cancel", + runs: []func(context.Context, chan<- event.Event) error{ + func(context.Context, chan<- event.Event) error { return errors.New("retry me") }, + }, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + s := &Scheduler{ + Jobs: []Job{{ + Source: src, + StreamBackoff: StreamBackoff{ + Initial: time.Second, + Max: time.Second, + }, + }}, + Out: make(chan event.Event, 1), + } + + errCh := make(chan error, 1) + go func() { errCh <- s.Run(ctx) }() + + waitFor(t, func() bool { return src.CallCount() >= 1 }) + cancel() + + err := <-errCh + if !errors.Is(err, context.Canceled) { + t.Fatalf("Scheduler.Run() error = %v, want context canceled", err) + } + time.Sleep(20 * time.Millisecond) + if src.CallCount() != 1 { + t.Fatalf("stream call count = %d, want 1", src.CallCount()) + } +} + +func TestNextStreamBackoffCapsAtMax(t *testing.T) { + if got := nextStreamBackoff(500*time.Millisecond, 2*time.Second); got != time.Second { + t.Fatalf("nextStreamBackoff() = %s, want 1s", got) + } + if got := nextStreamBackoff(time.Second, 2*time.Second); got != 2*time.Second { + t.Fatalf("nextStreamBackoff() = %s, want 2s", got) + } + if got := nextStreamBackoff(2*time.Second, 2*time.Second); got != 2*time.Second { + t.Fatalf("nextStreamBackoff() = %s, want 2s", got) + } +} + +func TestStreamRunWasStableAfterFiveMinutes(t *testing.T) { + start := time.Date(2026, 3, 29, 12, 0, 0, 0, time.UTC) + if streamRunWasStable(start, start.Add(4*time.Minute+59*time.Second)) { + t.Fatalf("streamRunWasStable() = true, want false") + } + if !streamRunWasStable(start, start.Add(5*time.Minute)) { + t.Fatalf("streamRunWasStable() = false, want true") + } +} + +func TestJobFromSourceConfigPollSource(t *testing.T) { + job, err := JobFromSourceConfig(testPollSource{name: "poll-a"}, config.SourceConfig{ + Name: "poll-a", + Driver: "poll_driver", + Every: config.Duration{Duration: time.Minute}, + }) + if err != nil { + t.Fatalf("JobFromSourceConfig() error = %v", err) + } + if job.Every != time.Minute { + t.Fatalf("Job.Every = %s, want 1m", job.Every) + } +} + +func TestJobFromSourceConfigPollSourceRejectsStreamParams(t *testing.T) { + _, err := JobFromSourceConfig(testPollSource{name: "poll-a"}, config.SourceConfig{ + Name: "poll-a", + Driver: "poll_driver", + Every: config.Duration{Duration: time.Minute}, + Params: map[string]any{ + "stream_exit_policy": "restart", + }, + }) + if err == nil { + t.Fatalf("JobFromSourceConfig() error = nil, want rejection") + } + if !strings.Contains(err.Error(), "only valid for stream sources") { + t.Fatalf("JobFromSourceConfig() error = %q", err) + } +} + +func TestJobFromSourceConfigStreamSourceParsesDefaultsAndOverrides(t *testing.T) { + src := &scriptedStreamSource{name: "stream-a"} + + job, err := JobFromSourceConfig(src, config.SourceConfig{ + Name: "stream-a", + Driver: "stream_driver", + Mode: config.SourceModeStream, + Params: map[string]any{ + "stream_exit_policy": "stop", + "stream_backoff_initial": "2s", + "stream_backoff_max": "10s", + "stream_backoff_jitter": "500ms", + }, + }) + if err != nil { + t.Fatalf("JobFromSourceConfig() error = %v", err) + } + if job.StreamExitPolicy != StreamExitPolicyStop { + t.Fatalf("Job.StreamExitPolicy = %q, want %q", job.StreamExitPolicy, StreamExitPolicyStop) + } + if job.StreamBackoff.Initial != 2*time.Second { + t.Fatalf("Job.StreamBackoff.Initial = %s, want 2s", job.StreamBackoff.Initial) + } + if job.StreamBackoff.Max != 10*time.Second { + t.Fatalf("Job.StreamBackoff.Max = %s, want 10s", job.StreamBackoff.Max) + } + if job.StreamBackoff.Jitter != 500*time.Millisecond { + t.Fatalf("Job.StreamBackoff.Jitter = %s, want 500ms", job.StreamBackoff.Jitter) + } + + defaultJob, err := JobFromSourceConfig(src, config.SourceConfig{ + Name: "stream-default", + Driver: "stream_driver", + Mode: config.SourceModeStream, + }) + if err != nil { + t.Fatalf("JobFromSourceConfig() default error = %v", err) + } + if defaultJob.StreamExitPolicy != StreamExitPolicyRestart { + t.Fatalf("default Job.StreamExitPolicy = %q, want restart", defaultJob.StreamExitPolicy) + } + if defaultJob.StreamBackoff.Initial != defaultStreamBackoffInitial { + t.Fatalf("default Job.StreamBackoff.Initial = %s, want %s", defaultJob.StreamBackoff.Initial, defaultStreamBackoffInitial) + } + if defaultJob.StreamBackoff.Max != defaultStreamBackoffMax { + t.Fatalf("default Job.StreamBackoff.Max = %s, want %s", defaultJob.StreamBackoff.Max, defaultStreamBackoffMax) + } + if defaultJob.StreamBackoff.Jitter != defaultStreamBackoffJitter { + t.Fatalf("default Job.StreamBackoff.Jitter = %s, want %s", defaultJob.StreamBackoff.Jitter, defaultStreamBackoffJitter) + } +} + +func TestJobFromSourceConfigStreamSourceRejectsInvalidSettings(t *testing.T) { + src := &scriptedStreamSource{name: "stream-b"} + + _, err := JobFromSourceConfig(src, config.SourceConfig{ + Name: "stream-b", + Driver: "stream_driver", + Mode: config.SourceModeStream, + Params: map[string]any{ + "stream_exit_policy": "sometimes", + }, + }) + if err == nil { + t.Fatalf("JobFromSourceConfig() error = nil, want invalid policy error") + } + if !strings.Contains(err.Error(), "stream_exit_policy") { + t.Fatalf("JobFromSourceConfig() error = %q", err) + } + + _, err = JobFromSourceConfig(src, config.SourceConfig{ + Name: "stream-b", + Driver: "stream_driver", + Mode: config.SourceModeStream, + Params: map[string]any{ + "stream_backoff_initial": "0s", + }, + }) + if err == nil { + t.Fatalf("JobFromSourceConfig() error = nil, want invalid initial backoff error") + } + if !strings.Contains(err.Error(), "stream_backoff_initial") { + t.Fatalf("JobFromSourceConfig() error = %q", err) + } + + _, err = JobFromSourceConfig(src, config.SourceConfig{ + Name: "stream-b", + Driver: "stream_driver", + Mode: config.SourceModeStream, + Params: map[string]any{ + "stream_backoff_initial": "2s", + "stream_backoff_max": "1s", + }, + }) + if err == nil { + t.Fatalf("JobFromSourceConfig() error = nil, want invalid max backoff error") + } + if !strings.Contains(err.Error(), "stream_backoff_max") { + t.Fatalf("JobFromSourceConfig() error = %q", err) + } +} + +func TestJobFromSourceConfigStreamSourceRejectsEvery(t *testing.T) { + src := &scriptedStreamSource{name: "stream-c"} + + _, err := JobFromSourceConfig(src, config.SourceConfig{ + Name: "stream-c", + Driver: "stream_driver", + Mode: config.SourceModeStream, + Every: config.Duration{Duration: time.Minute}, + }) + if err == nil { + t.Fatalf("JobFromSourceConfig() error = nil, want every rejection") + } + if !strings.Contains(err.Error(), "sources[].every must be omitted") { + t.Fatalf("JobFromSourceConfig() error = %q", err) + } +} + +func waitFor(t *testing.T, cond func() bool) { + t.Helper() + + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + if cond() { + return + } + time.Sleep(10 * time.Millisecond) + } + t.Fatalf("condition not satisfied before timeout") +} diff --git a/sources/doc.go b/sources/doc.go index befd5a6..70d4eb4 100644 --- a/sources/doc.go +++ b/sources/doc.go @@ -5,6 +5,8 @@ // - Input: common source identity surface // - PollSource: polling source interface // - StreamSource: streaming source interface +// - StreamRetryable / StreamFatal / IsStreamRetryable / IsStreamFatal: +// stream exit classification helpers // - Registry / NewRegistry: source driver registry and builders // - HTTPSource / NewHTTPSource: reusable HTTP polling helper // diff --git a/sources/source.go b/sources/source.go index efb954b..9ca4015 100644 --- a/sources/source.go +++ b/sources/source.go @@ -35,6 +35,9 @@ type PollSource interface { // // Run should block, producing events into `out` until ctx is cancelled or a fatal error occurs. // It MUST NOT close out (the scheduler/daemon owns the bus). +// +// Stream sources can classify exits by wrapping errors with StreamRetryable or +// StreamFatal. Plain non-nil errors are treated as retryable by the scheduler. type StreamSource interface { Input Run(ctx context.Context, out chan<- event.Event) error diff --git a/sources/stream_errors.go b/sources/stream_errors.go new file mode 100644 index 0000000..5f33ec6 --- /dev/null +++ b/sources/stream_errors.go @@ -0,0 +1,63 @@ +package sources + +import "errors" + +type streamRetryableError struct { + err error +} + +func (e *streamRetryableError) Error() string { + if e.err == nil { + return "retryable stream error" + } + return e.err.Error() +} + +func (e *streamRetryableError) Unwrap() error { return e.err } + +type streamFatalError struct { + err error +} + +func (e *streamFatalError) Error() string { + if e.err == nil { + return "fatal stream error" + } + return e.err.Error() +} + +func (e *streamFatalError) Unwrap() error { return e.err } + +// StreamRetryable marks a stream-source exit as retryable. +func StreamRetryable(err error) error { + if err == nil { + return nil + } + return &streamRetryableError{err: err} +} + +// StreamFatal marks a stream-source exit as fatal. +func StreamFatal(err error) error { + if err == nil { + return nil + } + return &streamFatalError{err: err} +} + +// IsStreamRetryable reports whether err contains a retryable stream marker. +func IsStreamRetryable(err error) bool { + if err == nil { + return false + } + var target *streamRetryableError + return errors.As(err, &target) +} + +// IsStreamFatal reports whether err contains a fatal stream marker. +func IsStreamFatal(err error) bool { + if err == nil { + return false + } + var target *streamFatalError + return errors.As(err, &target) +} diff --git a/sources/stream_errors_test.go b/sources/stream_errors_test.go new file mode 100644 index 0000000..69e8e75 --- /dev/null +++ b/sources/stream_errors_test.go @@ -0,0 +1,52 @@ +package sources + +import ( + "errors" + "fmt" + "testing" +) + +func TestStreamRetryableWrapsThroughErrorChains(t *testing.T) { + base := errors.New("retry me") + err := fmt.Errorf("outer: %w", StreamRetryable(base)) + + if !IsStreamRetryable(err) { + t.Fatalf("IsStreamRetryable() = false, want true") + } + if IsStreamFatal(err) { + t.Fatalf("IsStreamFatal() = true, want false") + } + if !errors.Is(err, base) { + t.Fatalf("errors.Is(err, base) = false, want true") + } +} + +func TestStreamFatalWrapsThroughErrorChains(t *testing.T) { + base := errors.New("fatal") + err := fmt.Errorf("outer: %w", StreamFatal(base)) + + if !IsStreamFatal(err) { + t.Fatalf("IsStreamFatal() = false, want true") + } + if IsStreamRetryable(err) { + t.Fatalf("IsStreamRetryable() = true, want false") + } + if !errors.Is(err, base) { + t.Fatalf("errors.Is(err, base) = false, want true") + } +} + +func TestStreamErrorHelpersNil(t *testing.T) { + if StreamRetryable(nil) != nil { + t.Fatalf("StreamRetryable(nil) != nil") + } + if StreamFatal(nil) != nil { + t.Fatalf("StreamFatal(nil) != nil") + } + if IsStreamRetryable(nil) { + t.Fatalf("IsStreamRetryable(nil) = true") + } + if IsStreamFatal(nil) { + t.Fatalf("IsStreamFatal(nil) = true") + } +}