2 Commits

Author SHA1 Message Date
ea113e2dcc Updated processor/normalizer wiring to track Feedkit v0.7.0
Some checks failed
ci/woodpecker/push/build-image Pipeline failed
2026-03-16 13:35:51 -05:00
38bc162918 Updated go.sum
All checks were successful
ci/woodpecker/push/build-image Pipeline was successful
2026-03-15 19:26:51 -05:00
9 changed files with 164 additions and 46 deletions

View File

@@ -17,6 +17,7 @@ import (
fkevent "gitea.maximumdirect.net/ejr/feedkit/event"
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
fkpipeline "gitea.maximumdirect.net/ejr/feedkit/pipeline"
fkprocessors "gitea.maximumdirect.net/ejr/feedkit/processors"
fkscheduler "gitea.maximumdirect.net/ejr/feedkit/scheduler"
fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks"
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
@@ -101,17 +102,24 @@ func main() {
// --- Normalization (optional) ---
//
// We install feedkit's normalize.Processor even before any normalizers exist.
// With an empty registry and RequireMatch=false, this is a no-op passthrough.
// With an empty normalizer list and RequireMatch=false, this is a no-op passthrough.
// It will begin transforming events as soon as:
// 1) sources emit raw schemas (raw.*), and
// 2) matching normalizers are registered.
normReg := &fknormalize.Registry{}
wfnormalizers.RegisterBuiltins(normReg)
normalizers := wfnormalizers.RegisterBuiltins(nil)
procReg := fkprocessors.NewRegistry()
procReg.Register("normalize", func() (fkprocessors.Processor, error) {
return fknormalize.NewProcessor(normalizers, false), nil
})
chain, err := procReg.BuildChain([]string{"normalize"})
if err != nil {
log.Fatalf("build processor chain failed: %v", err)
}
pl := &fkpipeline.Pipeline{
Processors: []fkpipeline.Processor{
fknormalize.Processor{Registry: normReg},
},
Processors: chain,
}
s := &fkscheduler.Scheduler{

View File

@@ -1,11 +1,19 @@
package main
import (
"context"
"reflect"
"strings"
"testing"
"time"
"gitea.maximumdirect.net/ejr/feedkit/config"
fkevent "gitea.maximumdirect.net/ejr/feedkit/event"
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
fkpipeline "gitea.maximumdirect.net/ejr/feedkit/pipeline"
fkprocessors "gitea.maximumdirect.net/ejr/feedkit/processors"
wfnormalizers "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers"
)
type testInput struct {
@@ -88,3 +96,64 @@ func TestExampleConfigLoads(t *testing.T) {
t.Fatalf("config.Load(config.yml) unexpected error: %v", err)
}
}
func TestProcessorRegistryBuildsNormalizeChain(t *testing.T) {
normalizers := wfnormalizers.RegisterBuiltins(nil)
if len(normalizers) == 0 {
t.Fatalf("RegisterBuiltins() returned no normalizers")
}
procReg := fkprocessors.NewRegistry()
procReg.Register("normalize", func() (fkprocessors.Processor, error) {
return fknormalize.NewProcessor(normalizers, false), nil
})
chain, err := procReg.BuildChain([]string{"normalize"})
if err != nil {
t.Fatalf("BuildChain() unexpected error: %v", err)
}
if len(chain) != 1 {
t.Fatalf("BuildChain() expected 1 processor, got %d", len(chain))
}
pl := &fkpipeline.Pipeline{Processors: chain}
if len(pl.Processors) != 1 {
t.Fatalf("pipeline expected 1 processor, got %d", len(pl.Processors))
}
}
func TestNormalizeNoMatchPassThrough(t *testing.T) {
normalizers := wfnormalizers.RegisterBuiltins(nil)
procReg := fkprocessors.NewRegistry()
procReg.Register("normalize", func() (fkprocessors.Processor, error) {
return fknormalize.NewProcessor(normalizers, false), nil
})
chain, err := procReg.BuildChain([]string{"normalize"})
if err != nil {
t.Fatalf("BuildChain() unexpected error: %v", err)
}
pl := &fkpipeline.Pipeline{Processors: chain}
in := fkevent.Event{
ID: "evt-no-match",
Kind: fkevent.Kind("observation"),
Source: "test",
EmittedAt: time.Now().UTC(),
Schema: "raw.weatherfeeder.unknown.v1",
Payload: map[string]any{
"ok": true,
},
}
out, err := pl.Process(context.Background(), in)
if err != nil {
t.Fatalf("Pipeline.Process() unexpected error: %v", err)
}
if out == nil {
t.Fatalf("Pipeline.Process() returned nil output")
}
if !reflect.DeepEqual(*out, in) {
t.Fatalf("Pipeline.Process() expected passthrough output, got %#v", *out)
}
}

4
go.sum
View File

@@ -1,5 +1,5 @@
gitea.maximumdirect.net/ejr/feedkit v0.5.0 h1:T4pRTo9Tj/o7TbZYUbp8UE7cQVLmIucUrYmD6G8E8ZQ=
gitea.maximumdirect.net/ejr/feedkit v0.5.0/go.mod h1:wYtA10GouvSe7L/8e1UEC+tqcp32HJofExIo1k+Wjls=
gitea.maximumdirect.net/ejr/feedkit v0.6.0 h1:GXwyNKvPp1sWN8TS5E5NDGFgimpyHlzerO5E+/qoTXg=
gitea.maximumdirect.net/ejr/feedkit v0.6.0/go.mod h1:wYtA10GouvSe7L/8e1UEC+tqcp32HJofExIo1k+Wjls=
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/nats-io/nats.go v1.34.0 h1:fnxnPCNiwIG5w08rlMcEKTUw4AV/nKyGCOJE8TdhSPk=

View File

@@ -16,22 +16,20 @@ import (
// - sources are built by name (cfg.Driver -> factory)
// - normalizers are selected by Match() (event.Schema -> first match wins)
//
// Registration order matters because feedkit normalize.Registry is first match wins.
// Registration order matters because feedkit normalize.Processor is "first match wins".
// In weatherfeeder we avoid ambiguity by matching strictly on schema constants, but
// we still keep ordering stable as a best practice.
//
// If reg is nil, this function is a no-op.
func RegisterBuiltins(reg *fknormalize.Registry) {
if reg == nil {
return
}
func RegisterBuiltins(in []fknormalize.Normalizer) []fknormalize.Normalizer {
out := in
// Keep this intentionally boring: delegate registration to provider subpackages
// so main.go stays clean and each provider owns its own mapping logic.
//
// Order here should be stable across releases to reduce surprises when adding
// new normalizers.
nws.Register(reg)
openmeteo.Register(reg)
openweather.Register(reg)
out = nws.Register(out)
out = openmeteo.Register(out)
out = openweather.Register(out)
return out
}

View File

@@ -0,0 +1,42 @@
package normalizers
import (
"reflect"
"testing"
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/nws"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/openmeteo"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/openweather"
)
func TestRegisterBuiltinsOrder(t *testing.T) {
got := RegisterBuiltins(nil)
if len(got) == 0 {
t.Fatalf("RegisterBuiltins() returned no normalizers")
}
want := []fknormalize.Normalizer{
nws.ObservationNormalizer{},
nws.ForecastNormalizer{},
nws.AlertsNormalizer{},
openmeteo.ObservationNormalizer{},
openmeteo.ForecastNormalizer{},
openweather.ObservationNormalizer{},
}
if len(got) != len(want) {
t.Fatalf("RegisterBuiltins() expected %d normalizers, got %d", len(want), len(got))
}
for i := range want {
if reflect.TypeOf(got[i]) != reflect.TypeOf(want[i]) {
t.Fatalf(
"RegisterBuiltins() order mismatch at index %d: got %T, want %T",
i,
got[i],
want[i],
)
}
}
}

View File

@@ -8,7 +8,7 @@
// transforming provider-specific raw payloads into canonical internal models.
//
// This package is domain code (weatherfeeder). feedkits normalize package is
// infrastructure (registry + processor).
// infrastructure (normalizer contracts + processor).
//
// Directory layout (required)
// ---------------------------
@@ -136,21 +136,22 @@
//
// Registration pattern
// --------------------
// feedkit normalization uses a match-driven registry (first match wins).
// feedkit normalization uses an ordered normalizer list ("first match wins").
//
// Provider subpackages should expose:
//
// func Register(reg *normalize.Registry)
// func Register(in []normalize.Normalizer) []normalize.Normalizer
//
// And internal/normalizers/builtins.go should provide one entrypoint:
//
// func RegisterBuiltins(reg *normalize.Registry)
// func RegisterBuiltins(in []normalize.Normalizer) []normalize.Normalizer
//
// which calls each providers Register() in a stable order.
// which appends each provider's normalizers in a stable order and is then passed
// to normalize.NewProcessor(...).
//
// Registry ordering
// Normalizer ordering
// -----------------------------
// feedkit normalization uses a match-driven registry (“first match wins”).
// feedkit normalization is "first match wins" by list order.
// Therefore order matters:
//
// - Register more specific normalizers before more general ones.

View File

@@ -5,18 +5,18 @@ import (
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
)
// Register registers NWS normalizers into the provided registry.
func Register(reg *fknormalize.Registry) {
if reg == nil {
return
}
// Register appends NWS normalizers in stable order.
func Register(in []fknormalize.Normalizer) []fknormalize.Normalizer {
out := in
// Observations
reg.Register(ObservationNormalizer{})
out = append(out, ObservationNormalizer{})
// Forecasts
reg.Register(ForecastNormalizer{})
out = append(out, ForecastNormalizer{})
// Alerts
reg.Register(AlertsNormalizer{})
out = append(out, AlertsNormalizer{})
return out
}

View File

@@ -5,14 +5,14 @@ import (
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
)
// Register registers Open-Meteo normalizers into the provided registry.
func Register(reg *fknormalize.Registry) {
if reg == nil {
return
}
// Register appends Open-Meteo normalizers in stable order.
func Register(in []fknormalize.Normalizer) []fknormalize.Normalizer {
out := in
// Observations
reg.Register(ObservationNormalizer{})
out = append(out, ObservationNormalizer{})
// Forecasts
reg.Register(ForecastNormalizer{})
out = append(out, ForecastNormalizer{})
return out
}

View File

@@ -5,12 +5,12 @@ import (
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
)
// Register registers OpenWeather normalizers into the provided registry.
func Register(reg *fknormalize.Registry) {
if reg == nil {
return
}
// Register appends OpenWeather normalizers in stable order.
func Register(in []fknormalize.Normalizer) []fknormalize.Normalizer {
out := in
// Observations
reg.Register(ObservationNormalizer{})
out = append(out, ObservationNormalizer{})
return out
}