diff --git a/cmd/weatherfeeder/main.go b/cmd/weatherfeeder/main.go index a239a70..7dde7f4 100644 --- a/cmd/weatherfeeder/main.go +++ b/cmd/weatherfeeder/main.go @@ -17,6 +17,7 @@ import ( fkevent "gitea.maximumdirect.net/ejr/feedkit/event" fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize" fkpipeline "gitea.maximumdirect.net/ejr/feedkit/pipeline" + fkprocessors "gitea.maximumdirect.net/ejr/feedkit/processors" fkscheduler "gitea.maximumdirect.net/ejr/feedkit/scheduler" fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks" fksources "gitea.maximumdirect.net/ejr/feedkit/sources" @@ -101,17 +102,24 @@ func main() { // --- Normalization (optional) --- // // We install feedkit's normalize.Processor even before any normalizers exist. - // With an empty registry and RequireMatch=false, this is a no-op passthrough. + // With an empty normalizer list and RequireMatch=false, this is a no-op passthrough. // It will begin transforming events as soon as: // 1) sources emit raw schemas (raw.*), and // 2) matching normalizers are registered. - normReg := &fknormalize.Registry{} - wfnormalizers.RegisterBuiltins(normReg) + normalizers := wfnormalizers.RegisterBuiltins(nil) + + procReg := fkprocessors.NewRegistry() + procReg.Register("normalize", func() (fkprocessors.Processor, error) { + return fknormalize.NewProcessor(normalizers, false), nil + }) + + chain, err := procReg.BuildChain([]string{"normalize"}) + if err != nil { + log.Fatalf("build processor chain failed: %v", err) + } pl := &fkpipeline.Pipeline{ - Processors: []fkpipeline.Processor{ - fknormalize.Processor{Registry: normReg}, - }, + Processors: chain, } s := &fkscheduler.Scheduler{ diff --git a/cmd/weatherfeeder/main_test.go b/cmd/weatherfeeder/main_test.go index 69bd020..5776965 100644 --- a/cmd/weatherfeeder/main_test.go +++ b/cmd/weatherfeeder/main_test.go @@ -1,11 +1,19 @@ package main import ( + "context" + "reflect" "strings" "testing" + "time" "gitea.maximumdirect.net/ejr/feedkit/config" fkevent "gitea.maximumdirect.net/ejr/feedkit/event" + fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize" + fkpipeline "gitea.maximumdirect.net/ejr/feedkit/pipeline" + fkprocessors "gitea.maximumdirect.net/ejr/feedkit/processors" + + wfnormalizers "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers" ) type testInput struct { @@ -88,3 +96,64 @@ func TestExampleConfigLoads(t *testing.T) { t.Fatalf("config.Load(config.yml) unexpected error: %v", err) } } + +func TestProcessorRegistryBuildsNormalizeChain(t *testing.T) { + normalizers := wfnormalizers.RegisterBuiltins(nil) + if len(normalizers) == 0 { + t.Fatalf("RegisterBuiltins() returned no normalizers") + } + + procReg := fkprocessors.NewRegistry() + procReg.Register("normalize", func() (fkprocessors.Processor, error) { + return fknormalize.NewProcessor(normalizers, false), nil + }) + + chain, err := procReg.BuildChain([]string{"normalize"}) + if err != nil { + t.Fatalf("BuildChain() unexpected error: %v", err) + } + if len(chain) != 1 { + t.Fatalf("BuildChain() expected 1 processor, got %d", len(chain)) + } + + pl := &fkpipeline.Pipeline{Processors: chain} + if len(pl.Processors) != 1 { + t.Fatalf("pipeline expected 1 processor, got %d", len(pl.Processors)) + } +} + +func TestNormalizeNoMatchPassThrough(t *testing.T) { + normalizers := wfnormalizers.RegisterBuiltins(nil) + procReg := fkprocessors.NewRegistry() + procReg.Register("normalize", func() (fkprocessors.Processor, error) { + return fknormalize.NewProcessor(normalizers, false), nil + }) + + chain, err := procReg.BuildChain([]string{"normalize"}) + if err != nil { + t.Fatalf("BuildChain() unexpected error: %v", err) + } + + pl := &fkpipeline.Pipeline{Processors: chain} + in := fkevent.Event{ + ID: "evt-no-match", + Kind: fkevent.Kind("observation"), + Source: "test", + EmittedAt: time.Now().UTC(), + Schema: "raw.weatherfeeder.unknown.v1", + Payload: map[string]any{ + "ok": true, + }, + } + + out, err := pl.Process(context.Background(), in) + if err != nil { + t.Fatalf("Pipeline.Process() unexpected error: %v", err) + } + if out == nil { + t.Fatalf("Pipeline.Process() returned nil output") + } + if !reflect.DeepEqual(*out, in) { + t.Fatalf("Pipeline.Process() expected passthrough output, got %#v", *out) + } +} diff --git a/internal/normalizers/builtins.go b/internal/normalizers/builtins.go index 05079c9..ba5dc74 100644 --- a/internal/normalizers/builtins.go +++ b/internal/normalizers/builtins.go @@ -16,22 +16,20 @@ import ( // - sources are built by name (cfg.Driver -> factory) // - normalizers are selected by Match() (event.Schema -> first match wins) // -// Registration order matters because feedkit normalize.Registry is “first match wins”. +// Registration order matters because feedkit normalize.Processor is "first match wins". // In weatherfeeder we avoid ambiguity by matching strictly on schema constants, but // we still keep ordering stable as a best practice. -// -// If reg is nil, this function is a no-op. -func RegisterBuiltins(reg *fknormalize.Registry) { - if reg == nil { - return - } +func RegisterBuiltins(in []fknormalize.Normalizer) []fknormalize.Normalizer { + out := in // Keep this intentionally boring: delegate registration to provider subpackages // so main.go stays clean and each provider owns its own mapping logic. // // Order here should be stable across releases to reduce surprises when adding // new normalizers. - nws.Register(reg) - openmeteo.Register(reg) - openweather.Register(reg) + out = nws.Register(out) + out = openmeteo.Register(out) + out = openweather.Register(out) + + return out } diff --git a/internal/normalizers/builtins_test.go b/internal/normalizers/builtins_test.go new file mode 100644 index 0000000..7037fff --- /dev/null +++ b/internal/normalizers/builtins_test.go @@ -0,0 +1,42 @@ +package normalizers + +import ( + "reflect" + "testing" + + fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize" + "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/nws" + "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/openmeteo" + "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/openweather" +) + +func TestRegisterBuiltinsOrder(t *testing.T) { + got := RegisterBuiltins(nil) + if len(got) == 0 { + t.Fatalf("RegisterBuiltins() returned no normalizers") + } + + want := []fknormalize.Normalizer{ + nws.ObservationNormalizer{}, + nws.ForecastNormalizer{}, + nws.AlertsNormalizer{}, + openmeteo.ObservationNormalizer{}, + openmeteo.ForecastNormalizer{}, + openweather.ObservationNormalizer{}, + } + + if len(got) != len(want) { + t.Fatalf("RegisterBuiltins() expected %d normalizers, got %d", len(want), len(got)) + } + + for i := range want { + if reflect.TypeOf(got[i]) != reflect.TypeOf(want[i]) { + t.Fatalf( + "RegisterBuiltins() order mismatch at index %d: got %T, want %T", + i, + got[i], + want[i], + ) + } + } +} diff --git a/internal/normalizers/doc.go b/internal/normalizers/doc.go index 663b8d1..c520a5a 100644 --- a/internal/normalizers/doc.go +++ b/internal/normalizers/doc.go @@ -8,7 +8,7 @@ // transforming provider-specific raw payloads into canonical internal models. // // This package is domain code (weatherfeeder). feedkit’s normalize package is -// infrastructure (registry + processor). +// infrastructure (normalizer contracts + processor). // // Directory layout (required) // --------------------------- @@ -136,21 +136,22 @@ // // Registration pattern // -------------------- -// feedkit normalization uses a match-driven registry (“first match wins”). +// feedkit normalization uses an ordered normalizer list ("first match wins"). // // Provider subpackages should expose: // -// func Register(reg *normalize.Registry) +// func Register(in []normalize.Normalizer) []normalize.Normalizer // // And internal/normalizers/builtins.go should provide one entrypoint: // -// func RegisterBuiltins(reg *normalize.Registry) +// func RegisterBuiltins(in []normalize.Normalizer) []normalize.Normalizer // -// which calls each provider’s Register() in a stable order. +// which appends each provider's normalizers in a stable order and is then passed +// to normalize.NewProcessor(...). // -// Registry ordering +// Normalizer ordering // ----------------------------- -// feedkit normalization uses a match-driven registry (“first match wins”). +// feedkit normalization is "first match wins" by list order. // Therefore order matters: // // - Register more specific normalizers before more general ones. diff --git a/internal/normalizers/nws/register.go b/internal/normalizers/nws/register.go index fb5900f..1980b2b 100644 --- a/internal/normalizers/nws/register.go +++ b/internal/normalizers/nws/register.go @@ -5,18 +5,18 @@ import ( fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize" ) -// Register registers NWS normalizers into the provided registry. -func Register(reg *fknormalize.Registry) { - if reg == nil { - return - } +// Register appends NWS normalizers in stable order. +func Register(in []fknormalize.Normalizer) []fknormalize.Normalizer { + out := in // Observations - reg.Register(ObservationNormalizer{}) + out = append(out, ObservationNormalizer{}) // Forecasts - reg.Register(ForecastNormalizer{}) + out = append(out, ForecastNormalizer{}) // Alerts - reg.Register(AlertsNormalizer{}) + out = append(out, AlertsNormalizer{}) + + return out } diff --git a/internal/normalizers/openmeteo/register.go b/internal/normalizers/openmeteo/register.go index e7484fa..4dabdbe 100644 --- a/internal/normalizers/openmeteo/register.go +++ b/internal/normalizers/openmeteo/register.go @@ -5,14 +5,14 @@ import ( fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize" ) -// Register registers Open-Meteo normalizers into the provided registry. -func Register(reg *fknormalize.Registry) { - if reg == nil { - return - } +// Register appends Open-Meteo normalizers in stable order. +func Register(in []fknormalize.Normalizer) []fknormalize.Normalizer { + out := in // Observations - reg.Register(ObservationNormalizer{}) + out = append(out, ObservationNormalizer{}) // Forecasts - reg.Register(ForecastNormalizer{}) + out = append(out, ForecastNormalizer{}) + + return out } diff --git a/internal/normalizers/openweather/register.go b/internal/normalizers/openweather/register.go index 0e2f16c..877dacf 100644 --- a/internal/normalizers/openweather/register.go +++ b/internal/normalizers/openweather/register.go @@ -5,12 +5,12 @@ import ( fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize" ) -// Register registers OpenWeather normalizers into the provided registry. -func Register(reg *fknormalize.Registry) { - if reg == nil { - return - } +// Register appends OpenWeather normalizers in stable order. +func Register(in []fknormalize.Normalizer) []fknormalize.Normalizer { + out := in // Observations - reg.Register(ObservationNormalizer{}) + out = append(out, ObservationNormalizer{}) + + return out }