feat(sources)!: split source contracts into PollSource/StreamSource and add mode-aware source config
- Introduce explicit source interfaces: sources.PollSource and sources.StreamSource, with shared sources.Input (Name() only). - Remove mandatory Kind() from the base source contract to support sources that emit multiple kinds. - Add config.SourceMode (poll, stream, or omitted/auto) and SourceConfig.Kinds (plural expected kinds), while keeping legacy SourceConfig.Kind for compatibility. - Enforce mode semantics in config validation (poll requires every, stream forbids every) and detect mode/driver mismatches in sources.Registry. - Update docs and tests for the new source model and config behavior.
This commit is contained in:
14
sources/doc.go
Normal file
14
sources/doc.go
Normal file
@@ -0,0 +1,14 @@
|
||||
// Package sources defines feedkit's input-source abstraction.
|
||||
//
|
||||
// A source ingests upstream input and emits one or more event.Event values.
|
||||
//
|
||||
// feedkit supports two source modes:
|
||||
// - PollSource: scheduler invokes Poll on a cadence.
|
||||
// - StreamSource: source runs continuously and pushes events as input arrives.
|
||||
//
|
||||
// Source drivers are domain-specific and registered into Registry by driver name.
|
||||
// Registry can then build configured sources from config.SourceConfig.
|
||||
//
|
||||
// A single source may emit 0..N events per poll or stream iteration, and those
|
||||
// events may span multiple event kinds.
|
||||
package sources
|
||||
@@ -7,22 +7,25 @@ import (
|
||||
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||
)
|
||||
|
||||
// Factory constructs a configured Source instance from config.
|
||||
// PollFactory constructs a configured PollSource instance from config.
|
||||
//
|
||||
// This is how concrete daemons (weatherfeeder/newsfeeder/...) register their
|
||||
// domain-specific source drivers (Open-Meteo, NWS, RSS, etc.) while feedkit
|
||||
// remains domain-agnostic.
|
||||
type Factory func(cfg config.SourceConfig) (Source, error)
|
||||
type PollFactory func(cfg config.SourceConfig) (PollSource, error)
|
||||
type StreamFactory func(cfg config.SourceConfig) (StreamSource, error)
|
||||
|
||||
// Factory is the legacy alias for poll source factories.
|
||||
type Factory = PollFactory
|
||||
|
||||
type Registry struct {
|
||||
byDriver map[string]Factory
|
||||
byPollDriver map[string]PollFactory
|
||||
byStreamDriver map[string]StreamFactory
|
||||
}
|
||||
|
||||
func NewRegistry() *Registry {
|
||||
return &Registry{
|
||||
byDriver: map[string]Factory{},
|
||||
byPollDriver: map[string]PollFactory{},
|
||||
byStreamDriver: map[string]StreamFactory{},
|
||||
}
|
||||
}
|
||||
@@ -30,20 +33,28 @@ func NewRegistry() *Registry {
|
||||
// Register associates a driver name (e.g. "openmeteo_observation") with a factory.
|
||||
//
|
||||
// The driver string is the "lookup key" used by config.sources[].driver.
|
||||
func (r *Registry) Register(driver string, f Factory) {
|
||||
func (r *Registry) Register(driver string, f PollFactory) {
|
||||
r.RegisterPoll(driver, f)
|
||||
}
|
||||
|
||||
// RegisterPoll associates a driver name with a polling-source factory.
|
||||
func (r *Registry) RegisterPoll(driver string, f PollFactory) {
|
||||
driver = strings.TrimSpace(driver)
|
||||
if driver == "" {
|
||||
// Panic is appropriate here: registering an empty driver is always a programmer error,
|
||||
// and it will lead to extremely confusing runtime behavior if allowed.
|
||||
panic("sources.Registry.Register: driver cannot be empty")
|
||||
panic("sources.Registry.RegisterPoll: driver cannot be empty")
|
||||
}
|
||||
if f == nil {
|
||||
panic(fmt.Sprintf("sources.Registry.Register: factory cannot be nil (driver=%q)", driver))
|
||||
panic(fmt.Sprintf("sources.Registry.RegisterPoll: factory cannot be nil (driver=%q)", driver))
|
||||
}
|
||||
if _, exists := r.byStreamDriver[driver]; exists {
|
||||
panic(fmt.Sprintf("sources.Registry.Register: driver %q already registered as a stream source", driver))
|
||||
panic(fmt.Sprintf("sources.Registry.RegisterPoll: driver %q already registered as a stream source", driver))
|
||||
}
|
||||
r.byDriver[driver] = f
|
||||
if _, exists := r.byPollDriver[driver]; exists {
|
||||
panic(fmt.Sprintf("sources.Registry.RegisterPoll: driver %q already registered as a polling source", driver))
|
||||
}
|
||||
r.byPollDriver[driver] = f
|
||||
}
|
||||
|
||||
// RegisterStream is the StreamSource equivalent of Register.
|
||||
@@ -55,28 +66,71 @@ func (r *Registry) RegisterStream(driver string, f StreamFactory) {
|
||||
if f == nil {
|
||||
panic(fmt.Sprintf("sources.Registry.RegisterStream: factory cannot be nil (driver=%q)", driver))
|
||||
}
|
||||
if _, exists := r.byDriver[driver]; exists {
|
||||
if _, exists := r.byPollDriver[driver]; exists {
|
||||
panic(fmt.Sprintf("sources.Registry.RegisterStream: driver %q already registered as a polling source", driver))
|
||||
}
|
||||
if _, exists := r.byStreamDriver[driver]; exists {
|
||||
panic(fmt.Sprintf("sources.Registry.RegisterStream: driver %q already registered as a stream source", driver))
|
||||
}
|
||||
r.byStreamDriver[driver] = f
|
||||
}
|
||||
|
||||
// Build constructs a Source from a SourceConfig by looking up cfg.Driver.
|
||||
func (r *Registry) Build(cfg config.SourceConfig) (Source, error) {
|
||||
f, ok := r.byDriver[cfg.Driver]
|
||||
// Build constructs a polling source from a SourceConfig by looking up cfg.Driver.
|
||||
func (r *Registry) Build(cfg config.SourceConfig) (PollSource, error) {
|
||||
return r.BuildPoll(cfg)
|
||||
}
|
||||
|
||||
// BuildPoll constructs a polling source from a SourceConfig by looking up cfg.Driver.
|
||||
func (r *Registry) BuildPoll(cfg config.SourceConfig) (PollSource, error) {
|
||||
driver := strings.TrimSpace(cfg.Driver)
|
||||
if cfg.Mode.Normalize() == config.SourceModeStream {
|
||||
return nil, fmt.Errorf("source %q mode=stream cannot be built as polling source", cfg.Name)
|
||||
}
|
||||
|
||||
f, ok := r.byPollDriver[driver]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unknown source driver: %q", cfg.Driver)
|
||||
if _, streamExists := r.byStreamDriver[driver]; streamExists {
|
||||
return nil, fmt.Errorf("source driver %q is stream-only; cannot build as polling source", driver)
|
||||
}
|
||||
return nil, fmt.Errorf("unknown source driver: %q", driver)
|
||||
}
|
||||
return f(cfg)
|
||||
}
|
||||
|
||||
// BuildInput can return either a polling Source or a StreamSource.
|
||||
func (r *Registry) BuildInput(cfg config.SourceConfig) (Input, error) {
|
||||
if f, ok := r.byStreamDriver[cfg.Driver]; ok {
|
||||
driver := strings.TrimSpace(cfg.Driver)
|
||||
mode := cfg.Mode.Normalize()
|
||||
if mode != config.SourceModeAuto && mode != config.SourceModePoll && mode != config.SourceModeStream {
|
||||
return nil, fmt.Errorf("source %q has invalid mode %q (expected \"poll\" or \"stream\")", cfg.Name, cfg.Mode)
|
||||
}
|
||||
|
||||
switch mode {
|
||||
case config.SourceModePoll:
|
||||
f, ok := r.byPollDriver[driver]
|
||||
if !ok {
|
||||
if _, streamExists := r.byStreamDriver[driver]; streamExists {
|
||||
return nil, fmt.Errorf("source %q mode=poll conflicts with stream-only driver %q", cfg.Name, driver)
|
||||
}
|
||||
return nil, fmt.Errorf("unknown source driver: %q", driver)
|
||||
}
|
||||
return f(cfg)
|
||||
case config.SourceModeStream:
|
||||
f, ok := r.byStreamDriver[driver]
|
||||
if !ok {
|
||||
if _, pollExists := r.byPollDriver[driver]; pollExists {
|
||||
return nil, fmt.Errorf("source %q mode=stream conflicts with polling driver %q", cfg.Name, driver)
|
||||
}
|
||||
return nil, fmt.Errorf("unknown source driver: %q", driver)
|
||||
}
|
||||
return f(cfg)
|
||||
}
|
||||
if f, ok := r.byDriver[cfg.Driver]; ok {
|
||||
|
||||
if f, ok := r.byStreamDriver[driver]; ok {
|
||||
return f(cfg)
|
||||
}
|
||||
return nil, fmt.Errorf("unknown source driver: %q", cfg.Driver)
|
||||
if f, ok := r.byPollDriver[driver]; ok {
|
||||
return f(cfg)
|
||||
}
|
||||
return nil, fmt.Errorf("unknown source driver: %q", driver)
|
||||
}
|
||||
|
||||
84
sources/registry_test.go
Normal file
84
sources/registry_test.go
Normal file
@@ -0,0 +1,84 @@
|
||||
package sources
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||
)
|
||||
|
||||
type testPollSource struct{ name string }
|
||||
|
||||
func (s testPollSource) Name() string { return s.name }
|
||||
func (s testPollSource) Poll(context.Context) ([]event.Event, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
type testStreamSource struct{ name string }
|
||||
|
||||
func (s testStreamSource) Name() string { return s.name }
|
||||
func (s testStreamSource) Run(context.Context, chan<- event.Event) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestRegistryBuildInputModeConflicts(t *testing.T) {
|
||||
r := NewRegistry()
|
||||
r.RegisterPoll("poll_driver", func(cfg config.SourceConfig) (PollSource, error) {
|
||||
return testPollSource{name: cfg.Name}, nil
|
||||
})
|
||||
r.RegisterStream("stream_driver", func(cfg config.SourceConfig) (StreamSource, error) {
|
||||
return testStreamSource{name: cfg.Name}, nil
|
||||
})
|
||||
|
||||
_, err := r.BuildInput(config.SourceConfig{
|
||||
Name: "s1",
|
||||
Driver: "stream_driver",
|
||||
Mode: config.SourceModePoll,
|
||||
})
|
||||
if err == nil {
|
||||
t.Fatalf("expected mode conflict error, got nil")
|
||||
}
|
||||
if !strings.Contains(err.Error(), "mode=poll") {
|
||||
t.Fatalf("expected poll conflict error, got: %v", err)
|
||||
}
|
||||
|
||||
_, err = r.BuildInput(config.SourceConfig{
|
||||
Name: "s2",
|
||||
Driver: "poll_driver",
|
||||
Mode: config.SourceModeStream,
|
||||
})
|
||||
if err == nil {
|
||||
t.Fatalf("expected mode conflict error, got nil")
|
||||
}
|
||||
if !strings.Contains(err.Error(), "mode=stream") {
|
||||
t.Fatalf("expected stream conflict error, got: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRegistryBuildInputAutoByDriverType(t *testing.T) {
|
||||
r := NewRegistry()
|
||||
r.RegisterPoll("poll_driver", func(cfg config.SourceConfig) (PollSource, error) {
|
||||
return testPollSource{name: cfg.Name}, nil
|
||||
})
|
||||
r.RegisterStream("stream_driver", func(cfg config.SourceConfig) (StreamSource, error) {
|
||||
return testStreamSource{name: cfg.Name}, nil
|
||||
})
|
||||
|
||||
src, err := r.BuildInput(config.SourceConfig{Name: "p", Driver: "poll_driver"})
|
||||
if err != nil {
|
||||
t.Fatalf("BuildInput poll auto failed: %v", err)
|
||||
}
|
||||
if _, ok := src.(PollSource); !ok {
|
||||
t.Fatalf("expected PollSource, got %T", src)
|
||||
}
|
||||
|
||||
src, err = r.BuildInput(config.SourceConfig{Name: "s", Driver: "stream_driver"})
|
||||
if err != nil {
|
||||
t.Fatalf("BuildInput stream auto failed: %v", err)
|
||||
}
|
||||
if _, ok := src.(StreamSource); !ok {
|
||||
t.Fatalf("expected StreamSource, got %T", src)
|
||||
}
|
||||
}
|
||||
@@ -7,34 +7,33 @@ import (
|
||||
)
|
||||
|
||||
// Input is the common surface shared by all source types.
|
||||
//
|
||||
// A source may be polling (PollSource) or event-driven (StreamSource).
|
||||
// Both source types emit domain-agnostic event.Event values.
|
||||
type Input interface {
|
||||
Name() string
|
||||
Kind() event.Kind
|
||||
}
|
||||
|
||||
// Source is a configured polling job that emits 0..N events per poll.
|
||||
// PollSource is a configured polling source that emits 0..N events per poll.
|
||||
//
|
||||
// Source implementations live in domain modules (weatherfeeder/newsfeeder/...)
|
||||
// PollSource implementations live in domain modules (weatherfeeder/newsfeeder/...)
|
||||
// and are registered into a feedkit sources.Registry.
|
||||
//
|
||||
// feedkit infrastructure treats Source as opaque; it just calls Poll()
|
||||
// feedkit infrastructure treats PollSource as opaque; it just calls Poll()
|
||||
// on the configured cadence and publishes the resulting events.
|
||||
type Source interface {
|
||||
type PollSource interface {
|
||||
// Name is the configured source name (used for logs and included in emitted events).
|
||||
Name() string
|
||||
|
||||
// Kind is the "primary kind" emitted by this source.
|
||||
//
|
||||
// This is mainly useful as a *safety check* (e.g. config says kind=forecast but
|
||||
// driver emits observation). Some future sources may emit multiple kinds; if/when
|
||||
// that happens, we can evolve this interface (e.g., make Kind optional, or remove it).
|
||||
Kind() event.Kind
|
||||
|
||||
// Poll fetches from upstream and returns 0..N events.
|
||||
// Poll fetches/processes one input batch and returns 0..N events.
|
||||
// A single poll can emit multiple event kinds.
|
||||
// Implementations should honor ctx.Done() for network calls and other I/O.
|
||||
Poll(ctx context.Context) ([]event.Event, error)
|
||||
}
|
||||
|
||||
// Source is a compatibility alias for the legacy polling-source name.
|
||||
type Source = PollSource
|
||||
|
||||
// StreamSource is an event-driven source (NATS/RabbitMQ/MQTT/etc).
|
||||
//
|
||||
// Run should block, producing events into `out` until ctx is cancelled or a fatal error occurs.
|
||||
@@ -43,3 +42,14 @@ type StreamSource interface {
|
||||
Input
|
||||
Run(ctx context.Context, out chan<- event.Event) error
|
||||
}
|
||||
|
||||
// KindSource is an optional interface for sources that advertise one "primary" kind.
|
||||
// This is legacy-friendly but no longer required.
|
||||
type KindSource interface {
|
||||
Kind() event.Kind
|
||||
}
|
||||
|
||||
// KindsSource is an optional interface for sources that advertise multiple kinds.
|
||||
type KindsSource interface {
|
||||
Kinds() []event.Kind
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user