Upgraded feedkit's handling of stream sources
This commit is contained in:
@@ -49,7 +49,7 @@ structure end-to-end.
|
|||||||
`feedkit` currently includes:
|
`feedkit` currently includes:
|
||||||
- strict YAML config loading and validation
|
- strict YAML config loading and validation
|
||||||
- polling and streaming source abstractions
|
- polling and streaming source abstractions
|
||||||
- scheduler orchestration for configured sources
|
- scheduler orchestration for configured sources and supervised stream workers
|
||||||
- optional pipeline processors
|
- optional pipeline processors
|
||||||
- built-in dedupe and normalization processors
|
- built-in dedupe and normalization processors
|
||||||
- route compilation and sink fanout
|
- route compilation and sink fanout
|
||||||
|
|||||||
3
doc.go
3
doc.go
@@ -23,7 +23,8 @@
|
|||||||
// reusable source helpers.
|
// reusable source helpers.
|
||||||
//
|
//
|
||||||
// - scheduler
|
// - scheduler
|
||||||
// Runs configured sources on a cadence or as long-lived stream workers.
|
// Runs configured sources on a cadence and supervises long-lived stream
|
||||||
|
// workers with restart/fatal handling.
|
||||||
//
|
//
|
||||||
// - processors
|
// - processors
|
||||||
// Defines the generic processor interface and registry used to build
|
// Defines the generic processor interface and registry used to build
|
||||||
|
|||||||
25
scheduler/doc.go
Normal file
25
scheduler/doc.go
Normal file
@@ -0,0 +1,25 @@
|
|||||||
|
// Package scheduler runs feedkit sources and forwards their events to the
|
||||||
|
// daemon event bus.
|
||||||
|
//
|
||||||
|
// External API surface:
|
||||||
|
// - Scheduler: runs configured polling and streaming jobs
|
||||||
|
// - Job: one scheduler task bound to a source
|
||||||
|
// - StreamExitPolicy: stream supervision policy for non-fatal exits
|
||||||
|
// - StreamBackoff: restart pacing for supervised stream sources
|
||||||
|
//
|
||||||
|
// Optional helpers from helpers.go:
|
||||||
|
// - JobFromSourceConfig: build a scheduler job from a configured source and
|
||||||
|
// feedkit-owned scheduling params
|
||||||
|
//
|
||||||
|
// Poll sources are run on a fixed cadence with optional jitter. Stream sources
|
||||||
|
// are supervised long-lived workers. Their generic feedkit controls live under
|
||||||
|
// sources[].params:
|
||||||
|
// - stream_exit_policy: restart|stop|fatal (default restart)
|
||||||
|
// - stream_backoff_initial: positive duration (default 1s)
|
||||||
|
// - stream_backoff_max: positive duration (default 1m)
|
||||||
|
// - stream_backoff_jitter: non-negative duration (default 250ms)
|
||||||
|
//
|
||||||
|
// Stream sources can classify exits with sources.StreamRetryable and
|
||||||
|
// sources.StreamFatal. Plain errors are treated as retryable by default, while
|
||||||
|
// fatal exits are propagated from Scheduler.Run so the daemon can shut down.
|
||||||
|
package scheduler
|
||||||
138
scheduler/helpers.go
Normal file
138
scheduler/helpers.go
Normal file
@@ -0,0 +1,138 @@
|
|||||||
|
package scheduler
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||||
|
"gitea.maximumdirect.net/ejr/feedkit/sources"
|
||||||
|
)
|
||||||
|
|
||||||
|
// JobFromSourceConfig builds a scheduler Job from a configured source and its
|
||||||
|
// generic feedkit config.
|
||||||
|
func JobFromSourceConfig(src sources.Input, cfg config.SourceConfig) (Job, error) {
|
||||||
|
if src == nil {
|
||||||
|
return Job{}, fmt.Errorf("scheduler: source %q is nil", cfg.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
job := Job{
|
||||||
|
Source: src,
|
||||||
|
Every: cfg.Every.Duration,
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, ok := src.(sources.StreamSource); ok {
|
||||||
|
if cfg.Every.Duration > 0 {
|
||||||
|
return Job{}, fmt.Errorf("source %q: sources[].every must be omitted for stream sources", cfg.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
policy, err := parseStreamExitPolicy(cfg)
|
||||||
|
if err != nil {
|
||||||
|
return Job{}, err
|
||||||
|
}
|
||||||
|
backoff, err := parseStreamBackoff(cfg)
|
||||||
|
if err != nil {
|
||||||
|
return Job{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
job.StreamExitPolicy = policy
|
||||||
|
job.StreamBackoff = backoff
|
||||||
|
return job, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, ok := src.(sources.PollSource); ok {
|
||||||
|
if cfg.Every.Duration <= 0 {
|
||||||
|
return Job{}, fmt.Errorf("source %q: sources[].every must be > 0 for polling sources", cfg.Name)
|
||||||
|
}
|
||||||
|
if err := rejectStreamParams(cfg); err != nil {
|
||||||
|
return Job{}, err
|
||||||
|
}
|
||||||
|
return job, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return Job{}, fmt.Errorf("scheduler: source %q implements neither PollSource nor StreamSource", cfg.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseStreamExitPolicy(cfg config.SourceConfig) (StreamExitPolicy, error) {
|
||||||
|
const key = "stream_exit_policy"
|
||||||
|
raw, exists := cfg.Params[key]
|
||||||
|
if !exists || raw == nil {
|
||||||
|
return StreamExitPolicyRestart, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
s, ok := raw.(string)
|
||||||
|
if !ok {
|
||||||
|
return "", fmt.Errorf("source %q: params.%s must be one of: restart, stop, fatal", cfg.Name, key)
|
||||||
|
}
|
||||||
|
|
||||||
|
switch StreamExitPolicy(strings.ToLower(strings.TrimSpace(s))) {
|
||||||
|
case StreamExitPolicyRestart:
|
||||||
|
return StreamExitPolicyRestart, nil
|
||||||
|
case StreamExitPolicyStop:
|
||||||
|
return StreamExitPolicyStop, nil
|
||||||
|
case StreamExitPolicyFatal:
|
||||||
|
return StreamExitPolicyFatal, nil
|
||||||
|
default:
|
||||||
|
return "", fmt.Errorf("source %q: params.%s must be one of: restart, stop, fatal", cfg.Name, key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseStreamBackoff(cfg config.SourceConfig) (StreamBackoff, error) {
|
||||||
|
initial, err := parsePositiveOrDefaultDuration(cfg, "stream_backoff_initial", defaultStreamBackoffInitial)
|
||||||
|
if err != nil {
|
||||||
|
return StreamBackoff{}, err
|
||||||
|
}
|
||||||
|
max, err := parsePositiveOrDefaultDuration(cfg, "stream_backoff_max", defaultStreamBackoffMax)
|
||||||
|
if err != nil {
|
||||||
|
return StreamBackoff{}, err
|
||||||
|
}
|
||||||
|
jitter, err := parseNonNegativeOrDefaultDuration(cfg, "stream_backoff_jitter", defaultStreamBackoffJitter)
|
||||||
|
if err != nil {
|
||||||
|
return StreamBackoff{}, err
|
||||||
|
}
|
||||||
|
if max < initial {
|
||||||
|
return StreamBackoff{}, fmt.Errorf("source %q: params.stream_backoff_max must be >= params.stream_backoff_initial", cfg.Name)
|
||||||
|
}
|
||||||
|
return StreamBackoff{
|
||||||
|
Initial: initial,
|
||||||
|
Max: max,
|
||||||
|
Jitter: jitter,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func rejectStreamParams(cfg config.SourceConfig) error {
|
||||||
|
streamKeys := []string{
|
||||||
|
"stream_exit_policy",
|
||||||
|
"stream_backoff_initial",
|
||||||
|
"stream_backoff_max",
|
||||||
|
"stream_backoff_jitter",
|
||||||
|
}
|
||||||
|
for _, key := range streamKeys {
|
||||||
|
if _, ok := cfg.Params[key]; ok {
|
||||||
|
return fmt.Errorf("source %q: params.%s is only valid for stream sources", cfg.Name, key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func parsePositiveOrDefaultDuration(cfg config.SourceConfig, key string, def time.Duration) (time.Duration, error) {
|
||||||
|
if _, exists := cfg.Params[key]; !exists {
|
||||||
|
return def, nil
|
||||||
|
}
|
||||||
|
v, ok := cfg.ParamDuration(key)
|
||||||
|
if !ok || v <= 0 {
|
||||||
|
return 0, fmt.Errorf("source %q: params.%s must be a positive duration", cfg.Name, key)
|
||||||
|
}
|
||||||
|
return v, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseNonNegativeOrDefaultDuration(cfg config.SourceConfig, key string, def time.Duration) (time.Duration, error) {
|
||||||
|
if _, exists := cfg.Params[key]; !exists {
|
||||||
|
return def, nil
|
||||||
|
}
|
||||||
|
v, ok := cfg.ParamDuration(key)
|
||||||
|
if !ok || v < 0 {
|
||||||
|
return 0, fmt.Errorf("source %q: params.%s must be a non-negative duration", cfg.Name, key)
|
||||||
|
}
|
||||||
|
return v, nil
|
||||||
|
}
|
||||||
@@ -5,6 +5,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"hash/fnv"
|
"hash/fnv"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"gitea.maximumdirect.net/ejr/feedkit/event"
|
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||||
@@ -28,8 +29,10 @@ type Logger = logging.Logf
|
|||||||
// - For stream sources: Jitter is applied once at startup only (optional; useful to avoid
|
// - For stream sources: Jitter is applied once at startup only (optional; useful to avoid
|
||||||
// reconnect storms when many instances start together).
|
// reconnect storms when many instances start together).
|
||||||
type Job struct {
|
type Job struct {
|
||||||
Source sources.Input
|
Source sources.Input
|
||||||
Every time.Duration
|
Every time.Duration
|
||||||
|
StreamExitPolicy StreamExitPolicy
|
||||||
|
StreamBackoff StreamBackoff
|
||||||
|
|
||||||
// Jitter is the maximum additional delay added before each poll.
|
// Jitter is the maximum additional delay added before each poll.
|
||||||
// Example: if Every=15m and Jitter=30s, each poll will occur at:
|
// Example: if Every=15m and Jitter=30s, each poll will occur at:
|
||||||
@@ -41,12 +44,37 @@ type Job struct {
|
|||||||
Jitter time.Duration
|
Jitter time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// StreamExitPolicy controls how the scheduler handles non-fatal stream exits.
|
||||||
|
type StreamExitPolicy string
|
||||||
|
|
||||||
|
const (
|
||||||
|
StreamExitPolicyRestart StreamExitPolicy = "restart"
|
||||||
|
StreamExitPolicyStop StreamExitPolicy = "stop"
|
||||||
|
StreamExitPolicyFatal StreamExitPolicy = "fatal"
|
||||||
|
)
|
||||||
|
|
||||||
|
// StreamBackoff controls restart pacing for stream supervision.
|
||||||
|
type StreamBackoff struct {
|
||||||
|
Initial time.Duration
|
||||||
|
Max time.Duration
|
||||||
|
Jitter time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
type Scheduler struct {
|
type Scheduler struct {
|
||||||
Jobs []Job
|
Jobs []Job
|
||||||
Out chan<- event.Event
|
Out chan<- event.Event
|
||||||
Logf Logger
|
Logf Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
defaultStreamBackoffInitial = 1 * time.Second
|
||||||
|
defaultStreamBackoffMax = 1 * time.Minute
|
||||||
|
defaultStreamBackoffJitter = 250 * time.Millisecond
|
||||||
|
streamBackoffResetAfter = 5 * time.Minute
|
||||||
|
)
|
||||||
|
|
||||||
|
var timeNow = time.Now
|
||||||
|
|
||||||
// Run starts one goroutine per job.
|
// Run starts one goroutine per job.
|
||||||
// Poll jobs run on their own interval and emit 0..N events per poll.
|
// Poll jobs run on their own interval and emit 0..N events per poll.
|
||||||
// Stream jobs run continuously and emit events as they arrive.
|
// Stream jobs run continuously and emit events as they arrive.
|
||||||
@@ -58,16 +86,38 @@ func (s *Scheduler) Run(ctx context.Context) error {
|
|||||||
return fmt.Errorf("scheduler.Run: no jobs configured")
|
return fmt.Errorf("scheduler.Run: no jobs configured")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
runCtx, cancel := context.WithCancel(ctx)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
fatalErrCh := make(chan error, 1)
|
||||||
|
var wg sync.WaitGroup
|
||||||
for _, job := range s.Jobs {
|
for _, job := range s.Jobs {
|
||||||
job := job // capture loop variable
|
job := job // capture loop variable
|
||||||
go s.runJob(ctx, job)
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
s.runJob(runCtx, job, fatalErrCh)
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
<-ctx.Done()
|
done := make(chan struct{})
|
||||||
return ctx.Err()
|
go func() {
|
||||||
|
wg.Wait()
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case err := <-fatalErrCh:
|
||||||
|
cancel()
|
||||||
|
<-done
|
||||||
|
return err
|
||||||
|
case <-runCtx.Done():
|
||||||
|
<-done
|
||||||
|
return runCtx.Err()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Scheduler) runJob(ctx context.Context, job Job) {
|
func (s *Scheduler) runJob(ctx context.Context, job Job, fatalErrCh chan<- error) {
|
||||||
if job.Source == nil {
|
if job.Source == nil {
|
||||||
s.logf("scheduler: job has nil source")
|
s.logf("scheduler: job has nil source")
|
||||||
return
|
return
|
||||||
@@ -75,7 +125,7 @@ func (s *Scheduler) runJob(ctx context.Context, job Job) {
|
|||||||
|
|
||||||
// Stream sources: event-driven.
|
// Stream sources: event-driven.
|
||||||
if ss, ok := job.Source.(sources.StreamSource); ok {
|
if ss, ok := job.Source.(sources.StreamSource); ok {
|
||||||
s.runStream(ctx, job, ss)
|
s.runStream(ctx, job, ss, fatalErrCh)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -93,18 +143,51 @@ func (s *Scheduler) runJob(ctx context.Context, job Job) {
|
|||||||
s.runPoller(ctx, job, ps)
|
s.runPoller(ctx, job, ps)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Scheduler) runStream(ctx context.Context, job Job, src sources.StreamSource) {
|
func (s *Scheduler) runStream(ctx context.Context, job Job, src sources.StreamSource, fatalErrCh chan<- error) {
|
||||||
|
policy := effectiveStreamExitPolicy(job.StreamExitPolicy)
|
||||||
|
backoff := effectiveStreamBackoff(job.StreamBackoff)
|
||||||
|
rng := seededRNG(src.Name())
|
||||||
|
|
||||||
// Optional startup jitter: helps avoid reconnect storms if many daemons start at once.
|
// Optional startup jitter: helps avoid reconnect storms if many daemons start at once.
|
||||||
if job.Jitter > 0 {
|
if job.Jitter > 0 {
|
||||||
rng := seededRNG(src.Name())
|
|
||||||
if !sleepJitter(ctx, rng, job.Jitter) {
|
if !sleepJitter(ctx, rng, job.Jitter) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stream sources should block until ctx cancel or fatal error.
|
nextDelay := backoff.Initial
|
||||||
if err := src.Run(ctx, s.Out); err != nil && ctx.Err() == nil {
|
for {
|
||||||
s.logf("scheduler: stream source %q exited with error: %v", src.Name(), err)
|
startedAt := timeNow()
|
||||||
|
err := src.Run(ctx, s.Out)
|
||||||
|
if ctx.Err() != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
normalizedErr := normalizeStreamExitError(src.Name(), err)
|
||||||
|
if sources.IsStreamFatal(normalizedErr) {
|
||||||
|
s.reportFatal(fatalErrCh, fmt.Errorf("scheduler: stream source %q exited fatally: %w", src.Name(), normalizedErr))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
switch policy {
|
||||||
|
case StreamExitPolicyStop:
|
||||||
|
s.logf("scheduler: stream source %q stopped after exit: %v", src.Name(), normalizedErr)
|
||||||
|
return
|
||||||
|
case StreamExitPolicyFatal:
|
||||||
|
s.reportFatal(fatalErrCh, fmt.Errorf("scheduler: stream source %q exited under fatal policy: %w", src.Name(), normalizedErr))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if streamRunWasStable(startedAt, timeNow()) {
|
||||||
|
nextDelay = backoff.Initial
|
||||||
|
}
|
||||||
|
|
||||||
|
delay := nextDelay + randomDuration(rng, backoff.Jitter)
|
||||||
|
s.logf("scheduler: stream source %q exited; restarting in %s: %v", src.Name(), delay, normalizedErr)
|
||||||
|
if !sleepDuration(ctx, delay) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
nextDelay = nextStreamBackoff(nextDelay, backoff.Max)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -164,10 +247,77 @@ func (s *Scheduler) logf(format string, args ...any) {
|
|||||||
s.Logf(format, args...)
|
s.Logf(format, args...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Scheduler) reportFatal(ch chan<- error, err error) {
|
||||||
|
if err == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case ch <- err:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ---- helpers ----
|
// ---- helpers ----
|
||||||
|
|
||||||
|
func effectiveStreamExitPolicy(policy StreamExitPolicy) StreamExitPolicy {
|
||||||
|
switch policy {
|
||||||
|
case StreamExitPolicyStop, StreamExitPolicyFatal:
|
||||||
|
return policy
|
||||||
|
default:
|
||||||
|
return StreamExitPolicyRestart
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func effectiveStreamBackoff(cfg StreamBackoff) StreamBackoff {
|
||||||
|
out := cfg
|
||||||
|
if out.Initial <= 0 {
|
||||||
|
out.Initial = defaultStreamBackoffInitial
|
||||||
|
}
|
||||||
|
if out.Max <= 0 {
|
||||||
|
out.Max = defaultStreamBackoffMax
|
||||||
|
}
|
||||||
|
if out.Max < out.Initial {
|
||||||
|
out.Max = out.Initial
|
||||||
|
}
|
||||||
|
if out.Jitter < 0 {
|
||||||
|
out.Jitter = 0
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
func normalizeStreamExitError(sourceName string, err error) error {
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return sources.StreamRetryable(fmt.Errorf("stream source %q exited unexpectedly without error", sourceName))
|
||||||
|
}
|
||||||
|
|
||||||
|
func nextStreamBackoff(current, max time.Duration) time.Duration {
|
||||||
|
if current <= 0 {
|
||||||
|
current = defaultStreamBackoffInitial
|
||||||
|
}
|
||||||
|
if max <= 0 {
|
||||||
|
max = defaultStreamBackoffMax
|
||||||
|
}
|
||||||
|
if current >= max {
|
||||||
|
return max
|
||||||
|
}
|
||||||
|
next := current * 2
|
||||||
|
if next < current || next > max {
|
||||||
|
return max
|
||||||
|
}
|
||||||
|
return next
|
||||||
|
}
|
||||||
|
|
||||||
|
func streamRunWasStable(startedAt, endedAt time.Time) bool {
|
||||||
|
if startedAt.IsZero() || endedAt.IsZero() {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return endedAt.Sub(startedAt) >= streamBackoffResetAfter
|
||||||
|
}
|
||||||
|
|
||||||
func seededRNG(name string) *rand.Rand {
|
func seededRNG(name string) *rand.Rand {
|
||||||
seed := time.Now().UnixNano() ^ int64(hashStringFNV32a(name))
|
seed := timeNow().UnixNano() ^ int64(hashStringFNV32a(name))
|
||||||
return rand.New(rand.NewSource(seed))
|
return rand.New(rand.NewSource(seed))
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -206,11 +356,23 @@ func sleepJitter(ctx context.Context, rng *rand.Rand, max time.Duration) bool {
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return sleepDuration(ctx, randomDuration(rng, max))
|
||||||
|
}
|
||||||
|
|
||||||
|
func randomDuration(rng *rand.Rand, max time.Duration) time.Duration {
|
||||||
|
if max <= 0 {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
// Int63n requires a positive argument.
|
// Int63n requires a positive argument.
|
||||||
// We add 1 so max itself is attainable.
|
// We add 1 so max itself is attainable.
|
||||||
n := rng.Int63n(int64(max) + 1)
|
n := rng.Int63n(int64(max) + 1)
|
||||||
d := time.Duration(n)
|
return time.Duration(n)
|
||||||
|
}
|
||||||
|
|
||||||
|
func sleepDuration(ctx context.Context, d time.Duration) bool {
|
||||||
|
if d <= 0 {
|
||||||
|
return true
|
||||||
|
}
|
||||||
timer := time.NewTimer(d)
|
timer := time.NewTimer(d)
|
||||||
defer timer.Stop()
|
defer timer.Stop()
|
||||||
|
|
||||||
|
|||||||
472
scheduler/scheduler_test.go
Normal file
472
scheduler/scheduler_test.go
Normal file
@@ -0,0 +1,472 @@
|
|||||||
|
package scheduler
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||||
|
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||||
|
"gitea.maximumdirect.net/ejr/feedkit/sources"
|
||||||
|
)
|
||||||
|
|
||||||
|
type testPollSource struct {
|
||||||
|
name string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s testPollSource) Name() string { return s.name }
|
||||||
|
|
||||||
|
func (s testPollSource) Poll(context.Context) ([]event.Event, error) { return nil, nil }
|
||||||
|
|
||||||
|
type scriptedStreamSource struct {
|
||||||
|
name string
|
||||||
|
mu sync.Mutex
|
||||||
|
calls int
|
||||||
|
runs []func(context.Context, chan<- event.Event) error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *scriptedStreamSource) Name() string { return s.name }
|
||||||
|
|
||||||
|
func (s *scriptedStreamSource) Run(ctx context.Context, out chan<- event.Event) error {
|
||||||
|
s.mu.Lock()
|
||||||
|
call := s.calls
|
||||||
|
s.calls++
|
||||||
|
var run func(context.Context, chan<- event.Event) error
|
||||||
|
if call < len(s.runs) {
|
||||||
|
run = s.runs[call]
|
||||||
|
}
|
||||||
|
s.mu.Unlock()
|
||||||
|
|
||||||
|
if run != nil {
|
||||||
|
return run(ctx, out)
|
||||||
|
}
|
||||||
|
|
||||||
|
<-ctx.Done()
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *scriptedStreamSource) CallCount() int {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
return s.calls
|
||||||
|
}
|
||||||
|
|
||||||
|
type capturingLogger struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
lines []string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *capturingLogger) Logf(format string, args ...any) {
|
||||||
|
l.mu.Lock()
|
||||||
|
defer l.mu.Unlock()
|
||||||
|
l.lines = append(l.lines, fmt.Sprintf(format, args...))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *capturingLogger) Contains(substr string) bool {
|
||||||
|
l.mu.Lock()
|
||||||
|
defer l.mu.Unlock()
|
||||||
|
for _, line := range l.lines {
|
||||||
|
if strings.Contains(line, substr) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSchedulerRunRestartsPlainStreamErrors(t *testing.T) {
|
||||||
|
src := &scriptedStreamSource{
|
||||||
|
name: "stream-a",
|
||||||
|
runs: []func(context.Context, chan<- event.Event) error{
|
||||||
|
func(context.Context, chan<- event.Event) error { return errors.New("temporary failure") },
|
||||||
|
func(context.Context, chan<- event.Event) error { return errors.New("temporary failure") },
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
s := &Scheduler{
|
||||||
|
Jobs: []Job{{
|
||||||
|
Source: src,
|
||||||
|
StreamBackoff: StreamBackoff{
|
||||||
|
Initial: time.Millisecond,
|
||||||
|
Max: time.Millisecond,
|
||||||
|
},
|
||||||
|
}},
|
||||||
|
Out: make(chan event.Event, 1),
|
||||||
|
}
|
||||||
|
|
||||||
|
errCh := make(chan error, 1)
|
||||||
|
go func() { errCh <- s.Run(ctx) }()
|
||||||
|
|
||||||
|
waitFor(t, func() bool { return src.CallCount() >= 3 })
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
err := <-errCh
|
||||||
|
if !errors.Is(err, context.Canceled) {
|
||||||
|
t.Fatalf("Scheduler.Run() error = %v, want context canceled", err)
|
||||||
|
}
|
||||||
|
if src.CallCount() < 3 {
|
||||||
|
t.Fatalf("stream call count = %d, want at least 3", src.CallCount())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSchedulerRunFatalStreamErrorReturns(t *testing.T) {
|
||||||
|
base := errors.New("fatal failure")
|
||||||
|
src := &scriptedStreamSource{
|
||||||
|
name: "stream-fatal",
|
||||||
|
runs: []func(context.Context, chan<- event.Event) error{
|
||||||
|
func(context.Context, chan<- event.Event) error { return sources.StreamFatal(base) },
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
s := &Scheduler{
|
||||||
|
Jobs: []Job{{Source: src}},
|
||||||
|
Out: make(chan event.Event, 1),
|
||||||
|
}
|
||||||
|
|
||||||
|
err := s.Run(context.Background())
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("Scheduler.Run() error = nil, want fatal error")
|
||||||
|
}
|
||||||
|
if !sources.IsStreamFatal(err) {
|
||||||
|
t.Fatalf("Scheduler.Run() error = %v, want fatal classification", err)
|
||||||
|
}
|
||||||
|
if !errors.Is(err, base) {
|
||||||
|
t.Fatalf("Scheduler.Run() error does not wrap base fatal error: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSchedulerRunStopPolicyStopsOnlyThatSource(t *testing.T) {
|
||||||
|
src := &scriptedStreamSource{
|
||||||
|
name: "stream-stop",
|
||||||
|
runs: []func(context.Context, chan<- event.Event) error{
|
||||||
|
func(context.Context, chan<- event.Event) error { return errors.New("stop now") },
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
s := &Scheduler{
|
||||||
|
Jobs: []Job{{
|
||||||
|
Source: src,
|
||||||
|
StreamExitPolicy: StreamExitPolicyStop,
|
||||||
|
}},
|
||||||
|
Out: make(chan event.Event, 1),
|
||||||
|
}
|
||||||
|
|
||||||
|
errCh := make(chan error, 1)
|
||||||
|
go func() { errCh <- s.Run(ctx) }()
|
||||||
|
|
||||||
|
waitFor(t, func() bool { return src.CallCount() >= 1 })
|
||||||
|
time.Sleep(20 * time.Millisecond)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case err := <-errCh:
|
||||||
|
t.Fatalf("Scheduler.Run() returned early: %v", err)
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
if src.CallCount() != 1 {
|
||||||
|
t.Fatalf("stream call count = %d, want 1", src.CallCount())
|
||||||
|
}
|
||||||
|
|
||||||
|
cancel()
|
||||||
|
err := <-errCh
|
||||||
|
if !errors.Is(err, context.Canceled) {
|
||||||
|
t.Fatalf("Scheduler.Run() error = %v, want context canceled", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSchedulerRunFatalPolicyTreatsPlainErrorAsFatal(t *testing.T) {
|
||||||
|
base := errors.New("plain failure")
|
||||||
|
src := &scriptedStreamSource{
|
||||||
|
name: "stream-fatal-policy",
|
||||||
|
runs: []func(context.Context, chan<- event.Event) error{
|
||||||
|
func(context.Context, chan<- event.Event) error { return base },
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
s := &Scheduler{
|
||||||
|
Jobs: []Job{{
|
||||||
|
Source: src,
|
||||||
|
StreamExitPolicy: StreamExitPolicyFatal,
|
||||||
|
}},
|
||||||
|
Out: make(chan event.Event, 1),
|
||||||
|
}
|
||||||
|
|
||||||
|
err := s.Run(context.Background())
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("Scheduler.Run() error = nil, want fatal-policy error")
|
||||||
|
}
|
||||||
|
if !errors.Is(err, base) {
|
||||||
|
t.Fatalf("Scheduler.Run() error does not wrap base error: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSchedulerRunNilExitRestartsAsUnexpected(t *testing.T) {
|
||||||
|
logger := &capturingLogger{}
|
||||||
|
src := &scriptedStreamSource{
|
||||||
|
name: "stream-nil-exit",
|
||||||
|
runs: []func(context.Context, chan<- event.Event) error{
|
||||||
|
func(context.Context, chan<- event.Event) error { return nil },
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
s := &Scheduler{
|
||||||
|
Jobs: []Job{{
|
||||||
|
Source: src,
|
||||||
|
StreamBackoff: StreamBackoff{
|
||||||
|
Initial: time.Millisecond,
|
||||||
|
Max: time.Millisecond,
|
||||||
|
},
|
||||||
|
}},
|
||||||
|
Out: make(chan event.Event, 1),
|
||||||
|
Logf: logger.Logf,
|
||||||
|
}
|
||||||
|
|
||||||
|
errCh := make(chan error, 1)
|
||||||
|
go func() { errCh <- s.Run(ctx) }()
|
||||||
|
|
||||||
|
waitFor(t, func() bool { return src.CallCount() >= 2 })
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
err := <-errCh
|
||||||
|
if !errors.Is(err, context.Canceled) {
|
||||||
|
t.Fatalf("Scheduler.Run() error = %v, want context canceled", err)
|
||||||
|
}
|
||||||
|
if !logger.Contains("exited unexpectedly without error") {
|
||||||
|
t.Fatalf("expected log to mention unexpected nil stream exit")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSchedulerRunContextCancelDuringBackoff(t *testing.T) {
|
||||||
|
src := &scriptedStreamSource{
|
||||||
|
name: "stream-backoff-cancel",
|
||||||
|
runs: []func(context.Context, chan<- event.Event) error{
|
||||||
|
func(context.Context, chan<- event.Event) error { return errors.New("retry me") },
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
s := &Scheduler{
|
||||||
|
Jobs: []Job{{
|
||||||
|
Source: src,
|
||||||
|
StreamBackoff: StreamBackoff{
|
||||||
|
Initial: time.Second,
|
||||||
|
Max: time.Second,
|
||||||
|
},
|
||||||
|
}},
|
||||||
|
Out: make(chan event.Event, 1),
|
||||||
|
}
|
||||||
|
|
||||||
|
errCh := make(chan error, 1)
|
||||||
|
go func() { errCh <- s.Run(ctx) }()
|
||||||
|
|
||||||
|
waitFor(t, func() bool { return src.CallCount() >= 1 })
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
err := <-errCh
|
||||||
|
if !errors.Is(err, context.Canceled) {
|
||||||
|
t.Fatalf("Scheduler.Run() error = %v, want context canceled", err)
|
||||||
|
}
|
||||||
|
time.Sleep(20 * time.Millisecond)
|
||||||
|
if src.CallCount() != 1 {
|
||||||
|
t.Fatalf("stream call count = %d, want 1", src.CallCount())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNextStreamBackoffCapsAtMax(t *testing.T) {
|
||||||
|
if got := nextStreamBackoff(500*time.Millisecond, 2*time.Second); got != time.Second {
|
||||||
|
t.Fatalf("nextStreamBackoff() = %s, want 1s", got)
|
||||||
|
}
|
||||||
|
if got := nextStreamBackoff(time.Second, 2*time.Second); got != 2*time.Second {
|
||||||
|
t.Fatalf("nextStreamBackoff() = %s, want 2s", got)
|
||||||
|
}
|
||||||
|
if got := nextStreamBackoff(2*time.Second, 2*time.Second); got != 2*time.Second {
|
||||||
|
t.Fatalf("nextStreamBackoff() = %s, want 2s", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStreamRunWasStableAfterFiveMinutes(t *testing.T) {
|
||||||
|
start := time.Date(2026, 3, 29, 12, 0, 0, 0, time.UTC)
|
||||||
|
if streamRunWasStable(start, start.Add(4*time.Minute+59*time.Second)) {
|
||||||
|
t.Fatalf("streamRunWasStable() = true, want false")
|
||||||
|
}
|
||||||
|
if !streamRunWasStable(start, start.Add(5*time.Minute)) {
|
||||||
|
t.Fatalf("streamRunWasStable() = false, want true")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestJobFromSourceConfigPollSource(t *testing.T) {
|
||||||
|
job, err := JobFromSourceConfig(testPollSource{name: "poll-a"}, config.SourceConfig{
|
||||||
|
Name: "poll-a",
|
||||||
|
Driver: "poll_driver",
|
||||||
|
Every: config.Duration{Duration: time.Minute},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("JobFromSourceConfig() error = %v", err)
|
||||||
|
}
|
||||||
|
if job.Every != time.Minute {
|
||||||
|
t.Fatalf("Job.Every = %s, want 1m", job.Every)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestJobFromSourceConfigPollSourceRejectsStreamParams(t *testing.T) {
|
||||||
|
_, err := JobFromSourceConfig(testPollSource{name: "poll-a"}, config.SourceConfig{
|
||||||
|
Name: "poll-a",
|
||||||
|
Driver: "poll_driver",
|
||||||
|
Every: config.Duration{Duration: time.Minute},
|
||||||
|
Params: map[string]any{
|
||||||
|
"stream_exit_policy": "restart",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("JobFromSourceConfig() error = nil, want rejection")
|
||||||
|
}
|
||||||
|
if !strings.Contains(err.Error(), "only valid for stream sources") {
|
||||||
|
t.Fatalf("JobFromSourceConfig() error = %q", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestJobFromSourceConfigStreamSourceParsesDefaultsAndOverrides(t *testing.T) {
|
||||||
|
src := &scriptedStreamSource{name: "stream-a"}
|
||||||
|
|
||||||
|
job, err := JobFromSourceConfig(src, config.SourceConfig{
|
||||||
|
Name: "stream-a",
|
||||||
|
Driver: "stream_driver",
|
||||||
|
Mode: config.SourceModeStream,
|
||||||
|
Params: map[string]any{
|
||||||
|
"stream_exit_policy": "stop",
|
||||||
|
"stream_backoff_initial": "2s",
|
||||||
|
"stream_backoff_max": "10s",
|
||||||
|
"stream_backoff_jitter": "500ms",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("JobFromSourceConfig() error = %v", err)
|
||||||
|
}
|
||||||
|
if job.StreamExitPolicy != StreamExitPolicyStop {
|
||||||
|
t.Fatalf("Job.StreamExitPolicy = %q, want %q", job.StreamExitPolicy, StreamExitPolicyStop)
|
||||||
|
}
|
||||||
|
if job.StreamBackoff.Initial != 2*time.Second {
|
||||||
|
t.Fatalf("Job.StreamBackoff.Initial = %s, want 2s", job.StreamBackoff.Initial)
|
||||||
|
}
|
||||||
|
if job.StreamBackoff.Max != 10*time.Second {
|
||||||
|
t.Fatalf("Job.StreamBackoff.Max = %s, want 10s", job.StreamBackoff.Max)
|
||||||
|
}
|
||||||
|
if job.StreamBackoff.Jitter != 500*time.Millisecond {
|
||||||
|
t.Fatalf("Job.StreamBackoff.Jitter = %s, want 500ms", job.StreamBackoff.Jitter)
|
||||||
|
}
|
||||||
|
|
||||||
|
defaultJob, err := JobFromSourceConfig(src, config.SourceConfig{
|
||||||
|
Name: "stream-default",
|
||||||
|
Driver: "stream_driver",
|
||||||
|
Mode: config.SourceModeStream,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("JobFromSourceConfig() default error = %v", err)
|
||||||
|
}
|
||||||
|
if defaultJob.StreamExitPolicy != StreamExitPolicyRestart {
|
||||||
|
t.Fatalf("default Job.StreamExitPolicy = %q, want restart", defaultJob.StreamExitPolicy)
|
||||||
|
}
|
||||||
|
if defaultJob.StreamBackoff.Initial != defaultStreamBackoffInitial {
|
||||||
|
t.Fatalf("default Job.StreamBackoff.Initial = %s, want %s", defaultJob.StreamBackoff.Initial, defaultStreamBackoffInitial)
|
||||||
|
}
|
||||||
|
if defaultJob.StreamBackoff.Max != defaultStreamBackoffMax {
|
||||||
|
t.Fatalf("default Job.StreamBackoff.Max = %s, want %s", defaultJob.StreamBackoff.Max, defaultStreamBackoffMax)
|
||||||
|
}
|
||||||
|
if defaultJob.StreamBackoff.Jitter != defaultStreamBackoffJitter {
|
||||||
|
t.Fatalf("default Job.StreamBackoff.Jitter = %s, want %s", defaultJob.StreamBackoff.Jitter, defaultStreamBackoffJitter)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestJobFromSourceConfigStreamSourceRejectsInvalidSettings(t *testing.T) {
|
||||||
|
src := &scriptedStreamSource{name: "stream-b"}
|
||||||
|
|
||||||
|
_, err := JobFromSourceConfig(src, config.SourceConfig{
|
||||||
|
Name: "stream-b",
|
||||||
|
Driver: "stream_driver",
|
||||||
|
Mode: config.SourceModeStream,
|
||||||
|
Params: map[string]any{
|
||||||
|
"stream_exit_policy": "sometimes",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("JobFromSourceConfig() error = nil, want invalid policy error")
|
||||||
|
}
|
||||||
|
if !strings.Contains(err.Error(), "stream_exit_policy") {
|
||||||
|
t.Fatalf("JobFromSourceConfig() error = %q", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = JobFromSourceConfig(src, config.SourceConfig{
|
||||||
|
Name: "stream-b",
|
||||||
|
Driver: "stream_driver",
|
||||||
|
Mode: config.SourceModeStream,
|
||||||
|
Params: map[string]any{
|
||||||
|
"stream_backoff_initial": "0s",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("JobFromSourceConfig() error = nil, want invalid initial backoff error")
|
||||||
|
}
|
||||||
|
if !strings.Contains(err.Error(), "stream_backoff_initial") {
|
||||||
|
t.Fatalf("JobFromSourceConfig() error = %q", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = JobFromSourceConfig(src, config.SourceConfig{
|
||||||
|
Name: "stream-b",
|
||||||
|
Driver: "stream_driver",
|
||||||
|
Mode: config.SourceModeStream,
|
||||||
|
Params: map[string]any{
|
||||||
|
"stream_backoff_initial": "2s",
|
||||||
|
"stream_backoff_max": "1s",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("JobFromSourceConfig() error = nil, want invalid max backoff error")
|
||||||
|
}
|
||||||
|
if !strings.Contains(err.Error(), "stream_backoff_max") {
|
||||||
|
t.Fatalf("JobFromSourceConfig() error = %q", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestJobFromSourceConfigStreamSourceRejectsEvery(t *testing.T) {
|
||||||
|
src := &scriptedStreamSource{name: "stream-c"}
|
||||||
|
|
||||||
|
_, err := JobFromSourceConfig(src, config.SourceConfig{
|
||||||
|
Name: "stream-c",
|
||||||
|
Driver: "stream_driver",
|
||||||
|
Mode: config.SourceModeStream,
|
||||||
|
Every: config.Duration{Duration: time.Minute},
|
||||||
|
})
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("JobFromSourceConfig() error = nil, want every rejection")
|
||||||
|
}
|
||||||
|
if !strings.Contains(err.Error(), "sources[].every must be omitted") {
|
||||||
|
t.Fatalf("JobFromSourceConfig() error = %q", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func waitFor(t *testing.T, cond func() bool) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
deadline := time.Now().Add(2 * time.Second)
|
||||||
|
for time.Now().Before(deadline) {
|
||||||
|
if cond() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
}
|
||||||
|
t.Fatalf("condition not satisfied before timeout")
|
||||||
|
}
|
||||||
@@ -5,6 +5,8 @@
|
|||||||
// - Input: common source identity surface
|
// - Input: common source identity surface
|
||||||
// - PollSource: polling source interface
|
// - PollSource: polling source interface
|
||||||
// - StreamSource: streaming source interface
|
// - StreamSource: streaming source interface
|
||||||
|
// - StreamRetryable / StreamFatal / IsStreamRetryable / IsStreamFatal:
|
||||||
|
// stream exit classification helpers
|
||||||
// - Registry / NewRegistry: source driver registry and builders
|
// - Registry / NewRegistry: source driver registry and builders
|
||||||
// - HTTPSource / NewHTTPSource: reusable HTTP polling helper
|
// - HTTPSource / NewHTTPSource: reusable HTTP polling helper
|
||||||
//
|
//
|
||||||
|
|||||||
@@ -35,6 +35,9 @@ type PollSource interface {
|
|||||||
//
|
//
|
||||||
// Run should block, producing events into `out` until ctx is cancelled or a fatal error occurs.
|
// Run should block, producing events into `out` until ctx is cancelled or a fatal error occurs.
|
||||||
// It MUST NOT close out (the scheduler/daemon owns the bus).
|
// It MUST NOT close out (the scheduler/daemon owns the bus).
|
||||||
|
//
|
||||||
|
// Stream sources can classify exits by wrapping errors with StreamRetryable or
|
||||||
|
// StreamFatal. Plain non-nil errors are treated as retryable by the scheduler.
|
||||||
type StreamSource interface {
|
type StreamSource interface {
|
||||||
Input
|
Input
|
||||||
Run(ctx context.Context, out chan<- event.Event) error
|
Run(ctx context.Context, out chan<- event.Event) error
|
||||||
|
|||||||
63
sources/stream_errors.go
Normal file
63
sources/stream_errors.go
Normal file
@@ -0,0 +1,63 @@
|
|||||||
|
package sources
|
||||||
|
|
||||||
|
import "errors"
|
||||||
|
|
||||||
|
type streamRetryableError struct {
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *streamRetryableError) Error() string {
|
||||||
|
if e.err == nil {
|
||||||
|
return "retryable stream error"
|
||||||
|
}
|
||||||
|
return e.err.Error()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *streamRetryableError) Unwrap() error { return e.err }
|
||||||
|
|
||||||
|
type streamFatalError struct {
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *streamFatalError) Error() string {
|
||||||
|
if e.err == nil {
|
||||||
|
return "fatal stream error"
|
||||||
|
}
|
||||||
|
return e.err.Error()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *streamFatalError) Unwrap() error { return e.err }
|
||||||
|
|
||||||
|
// StreamRetryable marks a stream-source exit as retryable.
|
||||||
|
func StreamRetryable(err error) error {
|
||||||
|
if err == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return &streamRetryableError{err: err}
|
||||||
|
}
|
||||||
|
|
||||||
|
// StreamFatal marks a stream-source exit as fatal.
|
||||||
|
func StreamFatal(err error) error {
|
||||||
|
if err == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return &streamFatalError{err: err}
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsStreamRetryable reports whether err contains a retryable stream marker.
|
||||||
|
func IsStreamRetryable(err error) bool {
|
||||||
|
if err == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
var target *streamRetryableError
|
||||||
|
return errors.As(err, &target)
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsStreamFatal reports whether err contains a fatal stream marker.
|
||||||
|
func IsStreamFatal(err error) bool {
|
||||||
|
if err == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
var target *streamFatalError
|
||||||
|
return errors.As(err, &target)
|
||||||
|
}
|
||||||
52
sources/stream_errors_test.go
Normal file
52
sources/stream_errors_test.go
Normal file
@@ -0,0 +1,52 @@
|
|||||||
|
package sources
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestStreamRetryableWrapsThroughErrorChains(t *testing.T) {
|
||||||
|
base := errors.New("retry me")
|
||||||
|
err := fmt.Errorf("outer: %w", StreamRetryable(base))
|
||||||
|
|
||||||
|
if !IsStreamRetryable(err) {
|
||||||
|
t.Fatalf("IsStreamRetryable() = false, want true")
|
||||||
|
}
|
||||||
|
if IsStreamFatal(err) {
|
||||||
|
t.Fatalf("IsStreamFatal() = true, want false")
|
||||||
|
}
|
||||||
|
if !errors.Is(err, base) {
|
||||||
|
t.Fatalf("errors.Is(err, base) = false, want true")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStreamFatalWrapsThroughErrorChains(t *testing.T) {
|
||||||
|
base := errors.New("fatal")
|
||||||
|
err := fmt.Errorf("outer: %w", StreamFatal(base))
|
||||||
|
|
||||||
|
if !IsStreamFatal(err) {
|
||||||
|
t.Fatalf("IsStreamFatal() = false, want true")
|
||||||
|
}
|
||||||
|
if IsStreamRetryable(err) {
|
||||||
|
t.Fatalf("IsStreamRetryable() = true, want false")
|
||||||
|
}
|
||||||
|
if !errors.Is(err, base) {
|
||||||
|
t.Fatalf("errors.Is(err, base) = false, want true")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStreamErrorHelpersNil(t *testing.T) {
|
||||||
|
if StreamRetryable(nil) != nil {
|
||||||
|
t.Fatalf("StreamRetryable(nil) != nil")
|
||||||
|
}
|
||||||
|
if StreamFatal(nil) != nil {
|
||||||
|
t.Fatalf("StreamFatal(nil) != nil")
|
||||||
|
}
|
||||||
|
if IsStreamRetryable(nil) {
|
||||||
|
t.Fatalf("IsStreamRetryable(nil) = true")
|
||||||
|
}
|
||||||
|
if IsStreamFatal(nil) {
|
||||||
|
t.Fatalf("IsStreamFatal(nil) = true")
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user