Remove global Postgres schema registration in favor of explicit schema-aware sink factory wiring, and update weatherfeeder to register the Postgres sink explicitly. Add optional per-source HTTP timeout and response body limit overrides while keeping feedkit defaults. Remove remaining legacy source/config compatibility surfaces, including singular kind support and old source registry/type aliases, and migrate weatherfeeder sources to plural `Kinds()` metadata. Clean up related docs, tests, and sample config to match the new Postgres, HTTP, and NATS configuration model.
122 lines
4.2 KiB
Go
122 lines
4.2 KiB
Go
package sources
|
|
|
|
import (
|
|
"fmt"
|
|
"strings"
|
|
|
|
"gitea.maximumdirect.net/ejr/feedkit/config"
|
|
)
|
|
|
|
// PollFactory constructs a configured PollSource instance from config.
|
|
//
|
|
// This is how concrete daemons (weatherfeeder/newsfeeder/...) register their
|
|
// domain-specific source drivers (Open-Meteo, NWS, RSS, etc.) while feedkit
|
|
// remains domain-agnostic.
|
|
type PollFactory func(cfg config.SourceConfig) (PollSource, error)
|
|
type StreamFactory func(cfg config.SourceConfig) (StreamSource, error)
|
|
|
|
type Registry struct {
|
|
byPollDriver map[string]PollFactory
|
|
byStreamDriver map[string]StreamFactory
|
|
}
|
|
|
|
func NewRegistry() *Registry {
|
|
return &Registry{
|
|
byPollDriver: map[string]PollFactory{},
|
|
byStreamDriver: map[string]StreamFactory{},
|
|
}
|
|
}
|
|
|
|
// RegisterPoll associates a driver name with a polling-source factory.
|
|
func (r *Registry) RegisterPoll(driver string, f PollFactory) {
|
|
driver = strings.TrimSpace(driver)
|
|
if driver == "" {
|
|
// Panic is appropriate here: registering an empty driver is always a programmer error,
|
|
// and it will lead to extremely confusing runtime behavior if allowed.
|
|
panic("sources.Registry.RegisterPoll: driver cannot be empty")
|
|
}
|
|
if f == nil {
|
|
panic(fmt.Sprintf("sources.Registry.RegisterPoll: factory cannot be nil (driver=%q)", driver))
|
|
}
|
|
if _, exists := r.byStreamDriver[driver]; exists {
|
|
panic(fmt.Sprintf("sources.Registry.RegisterPoll: driver %q already registered as a stream source", driver))
|
|
}
|
|
if _, exists := r.byPollDriver[driver]; exists {
|
|
panic(fmt.Sprintf("sources.Registry.RegisterPoll: driver %q already registered as a polling source", driver))
|
|
}
|
|
r.byPollDriver[driver] = f
|
|
}
|
|
|
|
// RegisterStream is the StreamSource equivalent of Register.
|
|
func (r *Registry) RegisterStream(driver string, f StreamFactory) {
|
|
driver = strings.TrimSpace(driver)
|
|
if driver == "" {
|
|
panic("sources.Registry.RegisterStream: driver cannot be empty")
|
|
}
|
|
if f == nil {
|
|
panic(fmt.Sprintf("sources.Registry.RegisterStream: factory cannot be nil (driver=%q)", driver))
|
|
}
|
|
if _, exists := r.byPollDriver[driver]; exists {
|
|
panic(fmt.Sprintf("sources.Registry.RegisterStream: driver %q already registered as a polling source", driver))
|
|
}
|
|
if _, exists := r.byStreamDriver[driver]; exists {
|
|
panic(fmt.Sprintf("sources.Registry.RegisterStream: driver %q already registered as a stream source", driver))
|
|
}
|
|
r.byStreamDriver[driver] = f
|
|
}
|
|
|
|
// BuildPoll constructs a polling source from a SourceConfig by looking up cfg.Driver.
|
|
func (r *Registry) BuildPoll(cfg config.SourceConfig) (PollSource, error) {
|
|
driver := strings.TrimSpace(cfg.Driver)
|
|
if cfg.Mode.Normalize() == config.SourceModeStream {
|
|
return nil, fmt.Errorf("source %q mode=stream cannot be built as polling source", cfg.Name)
|
|
}
|
|
|
|
f, ok := r.byPollDriver[driver]
|
|
if !ok {
|
|
if _, streamExists := r.byStreamDriver[driver]; streamExists {
|
|
return nil, fmt.Errorf("source driver %q is stream-only; cannot build as polling source", driver)
|
|
}
|
|
return nil, fmt.Errorf("unknown source driver: %q", driver)
|
|
}
|
|
return f(cfg)
|
|
}
|
|
|
|
// BuildInput can return either a polling Source or a StreamSource.
|
|
func (r *Registry) BuildInput(cfg config.SourceConfig) (Input, error) {
|
|
driver := strings.TrimSpace(cfg.Driver)
|
|
mode := cfg.Mode.Normalize()
|
|
if mode != config.SourceModeAuto && mode != config.SourceModePoll && mode != config.SourceModeStream {
|
|
return nil, fmt.Errorf("source %q has invalid mode %q (expected \"poll\" or \"stream\")", cfg.Name, cfg.Mode)
|
|
}
|
|
|
|
switch mode {
|
|
case config.SourceModePoll:
|
|
f, ok := r.byPollDriver[driver]
|
|
if !ok {
|
|
if _, streamExists := r.byStreamDriver[driver]; streamExists {
|
|
return nil, fmt.Errorf("source %q mode=poll conflicts with stream-only driver %q", cfg.Name, driver)
|
|
}
|
|
return nil, fmt.Errorf("unknown source driver: %q", driver)
|
|
}
|
|
return f(cfg)
|
|
case config.SourceModeStream:
|
|
f, ok := r.byStreamDriver[driver]
|
|
if !ok {
|
|
if _, pollExists := r.byPollDriver[driver]; pollExists {
|
|
return nil, fmt.Errorf("source %q mode=stream conflicts with polling driver %q", cfg.Name, driver)
|
|
}
|
|
return nil, fmt.Errorf("unknown source driver: %q", driver)
|
|
}
|
|
return f(cfg)
|
|
}
|
|
|
|
if f, ok := r.byStreamDriver[driver]; ok {
|
|
return f(cfg)
|
|
}
|
|
if f, ok := r.byPollDriver[driver]; ok {
|
|
return f(cfg)
|
|
}
|
|
return nil, fmt.Errorf("unknown source driver: %q", driver)
|
|
}
|