Updated processor/normalizer wiring to track Feedkit v0.7.0
Some checks failed
ci/woodpecker/push/build-image Pipeline failed

This commit is contained in:
2026-03-16 13:35:51 -05:00
parent 38bc162918
commit ea113e2dcc
8 changed files with 162 additions and 44 deletions

View File

@@ -17,6 +17,7 @@ import (
fkevent "gitea.maximumdirect.net/ejr/feedkit/event" fkevent "gitea.maximumdirect.net/ejr/feedkit/event"
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize" fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
fkpipeline "gitea.maximumdirect.net/ejr/feedkit/pipeline" fkpipeline "gitea.maximumdirect.net/ejr/feedkit/pipeline"
fkprocessors "gitea.maximumdirect.net/ejr/feedkit/processors"
fkscheduler "gitea.maximumdirect.net/ejr/feedkit/scheduler" fkscheduler "gitea.maximumdirect.net/ejr/feedkit/scheduler"
fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks" fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks"
fksources "gitea.maximumdirect.net/ejr/feedkit/sources" fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
@@ -101,17 +102,24 @@ func main() {
// --- Normalization (optional) --- // --- Normalization (optional) ---
// //
// We install feedkit's normalize.Processor even before any normalizers exist. // We install feedkit's normalize.Processor even before any normalizers exist.
// With an empty registry and RequireMatch=false, this is a no-op passthrough. // With an empty normalizer list and RequireMatch=false, this is a no-op passthrough.
// It will begin transforming events as soon as: // It will begin transforming events as soon as:
// 1) sources emit raw schemas (raw.*), and // 1) sources emit raw schemas (raw.*), and
// 2) matching normalizers are registered. // 2) matching normalizers are registered.
normReg := &fknormalize.Registry{} normalizers := wfnormalizers.RegisterBuiltins(nil)
wfnormalizers.RegisterBuiltins(normReg)
procReg := fkprocessors.NewRegistry()
procReg.Register("normalize", func() (fkprocessors.Processor, error) {
return fknormalize.NewProcessor(normalizers, false), nil
})
chain, err := procReg.BuildChain([]string{"normalize"})
if err != nil {
log.Fatalf("build processor chain failed: %v", err)
}
pl := &fkpipeline.Pipeline{ pl := &fkpipeline.Pipeline{
Processors: []fkpipeline.Processor{ Processors: chain,
fknormalize.Processor{Registry: normReg},
},
} }
s := &fkscheduler.Scheduler{ s := &fkscheduler.Scheduler{

View File

@@ -1,11 +1,19 @@
package main package main
import ( import (
"context"
"reflect"
"strings" "strings"
"testing" "testing"
"time"
"gitea.maximumdirect.net/ejr/feedkit/config" "gitea.maximumdirect.net/ejr/feedkit/config"
fkevent "gitea.maximumdirect.net/ejr/feedkit/event" fkevent "gitea.maximumdirect.net/ejr/feedkit/event"
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
fkpipeline "gitea.maximumdirect.net/ejr/feedkit/pipeline"
fkprocessors "gitea.maximumdirect.net/ejr/feedkit/processors"
wfnormalizers "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers"
) )
type testInput struct { type testInput struct {
@@ -88,3 +96,64 @@ func TestExampleConfigLoads(t *testing.T) {
t.Fatalf("config.Load(config.yml) unexpected error: %v", err) t.Fatalf("config.Load(config.yml) unexpected error: %v", err)
} }
} }
func TestProcessorRegistryBuildsNormalizeChain(t *testing.T) {
normalizers := wfnormalizers.RegisterBuiltins(nil)
if len(normalizers) == 0 {
t.Fatalf("RegisterBuiltins() returned no normalizers")
}
procReg := fkprocessors.NewRegistry()
procReg.Register("normalize", func() (fkprocessors.Processor, error) {
return fknormalize.NewProcessor(normalizers, false), nil
})
chain, err := procReg.BuildChain([]string{"normalize"})
if err != nil {
t.Fatalf("BuildChain() unexpected error: %v", err)
}
if len(chain) != 1 {
t.Fatalf("BuildChain() expected 1 processor, got %d", len(chain))
}
pl := &fkpipeline.Pipeline{Processors: chain}
if len(pl.Processors) != 1 {
t.Fatalf("pipeline expected 1 processor, got %d", len(pl.Processors))
}
}
func TestNormalizeNoMatchPassThrough(t *testing.T) {
normalizers := wfnormalizers.RegisterBuiltins(nil)
procReg := fkprocessors.NewRegistry()
procReg.Register("normalize", func() (fkprocessors.Processor, error) {
return fknormalize.NewProcessor(normalizers, false), nil
})
chain, err := procReg.BuildChain([]string{"normalize"})
if err != nil {
t.Fatalf("BuildChain() unexpected error: %v", err)
}
pl := &fkpipeline.Pipeline{Processors: chain}
in := fkevent.Event{
ID: "evt-no-match",
Kind: fkevent.Kind("observation"),
Source: "test",
EmittedAt: time.Now().UTC(),
Schema: "raw.weatherfeeder.unknown.v1",
Payload: map[string]any{
"ok": true,
},
}
out, err := pl.Process(context.Background(), in)
if err != nil {
t.Fatalf("Pipeline.Process() unexpected error: %v", err)
}
if out == nil {
t.Fatalf("Pipeline.Process() returned nil output")
}
if !reflect.DeepEqual(*out, in) {
t.Fatalf("Pipeline.Process() expected passthrough output, got %#v", *out)
}
}

View File

@@ -16,22 +16,20 @@ import (
// - sources are built by name (cfg.Driver -> factory) // - sources are built by name (cfg.Driver -> factory)
// - normalizers are selected by Match() (event.Schema -> first match wins) // - normalizers are selected by Match() (event.Schema -> first match wins)
// //
// Registration order matters because feedkit normalize.Registry is first match wins. // Registration order matters because feedkit normalize.Processor is "first match wins".
// In weatherfeeder we avoid ambiguity by matching strictly on schema constants, but // In weatherfeeder we avoid ambiguity by matching strictly on schema constants, but
// we still keep ordering stable as a best practice. // we still keep ordering stable as a best practice.
// func RegisterBuiltins(in []fknormalize.Normalizer) []fknormalize.Normalizer {
// If reg is nil, this function is a no-op. out := in
func RegisterBuiltins(reg *fknormalize.Registry) {
if reg == nil {
return
}
// Keep this intentionally boring: delegate registration to provider subpackages // Keep this intentionally boring: delegate registration to provider subpackages
// so main.go stays clean and each provider owns its own mapping logic. // so main.go stays clean and each provider owns its own mapping logic.
// //
// Order here should be stable across releases to reduce surprises when adding // Order here should be stable across releases to reduce surprises when adding
// new normalizers. // new normalizers.
nws.Register(reg) out = nws.Register(out)
openmeteo.Register(reg) out = openmeteo.Register(out)
openweather.Register(reg) out = openweather.Register(out)
return out
} }

View File

@@ -0,0 +1,42 @@
package normalizers
import (
"reflect"
"testing"
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/nws"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/openmeteo"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/openweather"
)
func TestRegisterBuiltinsOrder(t *testing.T) {
got := RegisterBuiltins(nil)
if len(got) == 0 {
t.Fatalf("RegisterBuiltins() returned no normalizers")
}
want := []fknormalize.Normalizer{
nws.ObservationNormalizer{},
nws.ForecastNormalizer{},
nws.AlertsNormalizer{},
openmeteo.ObservationNormalizer{},
openmeteo.ForecastNormalizer{},
openweather.ObservationNormalizer{},
}
if len(got) != len(want) {
t.Fatalf("RegisterBuiltins() expected %d normalizers, got %d", len(want), len(got))
}
for i := range want {
if reflect.TypeOf(got[i]) != reflect.TypeOf(want[i]) {
t.Fatalf(
"RegisterBuiltins() order mismatch at index %d: got %T, want %T",
i,
got[i],
want[i],
)
}
}
}

View File

@@ -8,7 +8,7 @@
// transforming provider-specific raw payloads into canonical internal models. // transforming provider-specific raw payloads into canonical internal models.
// //
// This package is domain code (weatherfeeder). feedkits normalize package is // This package is domain code (weatherfeeder). feedkits normalize package is
// infrastructure (registry + processor). // infrastructure (normalizer contracts + processor).
// //
// Directory layout (required) // Directory layout (required)
// --------------------------- // ---------------------------
@@ -136,21 +136,22 @@
// //
// Registration pattern // Registration pattern
// -------------------- // --------------------
// feedkit normalization uses a match-driven registry (first match wins). // feedkit normalization uses an ordered normalizer list ("first match wins").
// //
// Provider subpackages should expose: // Provider subpackages should expose:
// //
// func Register(reg *normalize.Registry) // func Register(in []normalize.Normalizer) []normalize.Normalizer
// //
// And internal/normalizers/builtins.go should provide one entrypoint: // And internal/normalizers/builtins.go should provide one entrypoint:
// //
// func RegisterBuiltins(reg *normalize.Registry) // func RegisterBuiltins(in []normalize.Normalizer) []normalize.Normalizer
// //
// which calls each providers Register() in a stable order. // which appends each provider's normalizers in a stable order and is then passed
// to normalize.NewProcessor(...).
// //
// Registry ordering // Normalizer ordering
// ----------------------------- // -----------------------------
// feedkit normalization uses a match-driven registry (“first match wins”). // feedkit normalization is "first match wins" by list order.
// Therefore order matters: // Therefore order matters:
// //
// - Register more specific normalizers before more general ones. // - Register more specific normalizers before more general ones.

View File

@@ -5,18 +5,18 @@ import (
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize" fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
) )
// Register registers NWS normalizers into the provided registry. // Register appends NWS normalizers in stable order.
func Register(reg *fknormalize.Registry) { func Register(in []fknormalize.Normalizer) []fknormalize.Normalizer {
if reg == nil { out := in
return
}
// Observations // Observations
reg.Register(ObservationNormalizer{}) out = append(out, ObservationNormalizer{})
// Forecasts // Forecasts
reg.Register(ForecastNormalizer{}) out = append(out, ForecastNormalizer{})
// Alerts // Alerts
reg.Register(AlertsNormalizer{}) out = append(out, AlertsNormalizer{})
return out
} }

View File

@@ -5,14 +5,14 @@ import (
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize" fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
) )
// Register registers Open-Meteo normalizers into the provided registry. // Register appends Open-Meteo normalizers in stable order.
func Register(reg *fknormalize.Registry) { func Register(in []fknormalize.Normalizer) []fknormalize.Normalizer {
if reg == nil { out := in
return
}
// Observations // Observations
reg.Register(ObservationNormalizer{}) out = append(out, ObservationNormalizer{})
// Forecasts // Forecasts
reg.Register(ForecastNormalizer{}) out = append(out, ForecastNormalizer{})
return out
} }

View File

@@ -5,12 +5,12 @@ import (
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize" fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
) )
// Register registers OpenWeather normalizers into the provided registry. // Register appends OpenWeather normalizers in stable order.
func Register(reg *fknormalize.Registry) { func Register(in []fknormalize.Normalizer) []fknormalize.Normalizer {
if reg == nil { out := in
return
}
// Observations // Observations
reg.Register(ObservationNormalizer{}) out = append(out, ObservationNormalizer{})
return out
} }