package scheduler import ( "fmt" "strings" "time" "gitea.maximumdirect.net/ejr/feedkit/config" "gitea.maximumdirect.net/ejr/feedkit/sources" ) // JobFromSourceConfig builds a scheduler Job from a configured source and its // generic feedkit config. func JobFromSourceConfig(src sources.Input, cfg config.SourceConfig) (Job, error) { if src == nil { return Job{}, fmt.Errorf("scheduler: source %q is nil", cfg.Name) } job := Job{ Source: src, Every: cfg.Every.Duration, } if _, ok := src.(sources.StreamSource); ok { if cfg.Every.Duration > 0 { return Job{}, fmt.Errorf("source %q: sources[].every must be omitted for stream sources", cfg.Name) } policy, err := parseStreamExitPolicy(cfg) if err != nil { return Job{}, err } backoff, err := parseStreamBackoff(cfg) if err != nil { return Job{}, err } job.StreamExitPolicy = policy job.StreamBackoff = backoff return job, nil } if _, ok := src.(sources.PollSource); ok { if cfg.Every.Duration <= 0 { return Job{}, fmt.Errorf("source %q: sources[].every must be > 0 for polling sources", cfg.Name) } if err := rejectStreamParams(cfg); err != nil { return Job{}, err } return job, nil } return Job{}, fmt.Errorf("scheduler: source %q implements neither PollSource nor StreamSource", cfg.Name) } func parseStreamExitPolicy(cfg config.SourceConfig) (StreamExitPolicy, error) { const key = "stream_exit_policy" raw, exists := cfg.Params[key] if !exists || raw == nil { return StreamExitPolicyRestart, nil } s, ok := raw.(string) if !ok { return "", fmt.Errorf("source %q: params.%s must be one of: restart, stop, fatal", cfg.Name, key) } switch StreamExitPolicy(strings.ToLower(strings.TrimSpace(s))) { case StreamExitPolicyRestart: return StreamExitPolicyRestart, nil case StreamExitPolicyStop: return StreamExitPolicyStop, nil case StreamExitPolicyFatal: return StreamExitPolicyFatal, nil default: return "", fmt.Errorf("source %q: params.%s must be one of: restart, stop, fatal", cfg.Name, key) } } func parseStreamBackoff(cfg config.SourceConfig) (StreamBackoff, error) { initial, err := parsePositiveOrDefaultDuration(cfg, "stream_backoff_initial", defaultStreamBackoffInitial) if err != nil { return StreamBackoff{}, err } max, err := parsePositiveOrDefaultDuration(cfg, "stream_backoff_max", defaultStreamBackoffMax) if err != nil { return StreamBackoff{}, err } jitter, err := parseNonNegativeOrDefaultDuration(cfg, "stream_backoff_jitter", defaultStreamBackoffJitter) if err != nil { return StreamBackoff{}, err } if max < initial { return StreamBackoff{}, fmt.Errorf("source %q: params.stream_backoff_max must be >= params.stream_backoff_initial", cfg.Name) } return StreamBackoff{ Initial: initial, Max: max, Jitter: jitter, }, nil } func rejectStreamParams(cfg config.SourceConfig) error { streamKeys := []string{ "stream_exit_policy", "stream_backoff_initial", "stream_backoff_max", "stream_backoff_jitter", } for _, key := range streamKeys { if _, ok := cfg.Params[key]; ok { return fmt.Errorf("source %q: params.%s is only valid for stream sources", cfg.Name, key) } } return nil } func parsePositiveOrDefaultDuration(cfg config.SourceConfig, key string, def time.Duration) (time.Duration, error) { if _, exists := cfg.Params[key]; !exists { return def, nil } v, ok := cfg.ParamDuration(key) if !ok || v <= 0 { return 0, fmt.Errorf("source %q: params.%s must be a positive duration", cfg.Name, key) } return v, nil } func parseNonNegativeOrDefaultDuration(cfg config.SourceConfig, key string, def time.Duration) (time.Duration, error) { if _, exists := cfg.Params[key]; !exists { return def, nil } v, ok := cfg.ParamDuration(key) if !ok || v < 0 { return 0, fmt.Errorf("source %q: params.%s must be a non-negative duration", cfg.Name, key) } return v, nil }