From d0b58a4734821addb78ded4ede2fdb05998da626 Mon Sep 17 00:00:00 2001 From: Eric Rakestraw Date: Mon, 16 Mar 2026 18:35:44 -0500 Subject: [PATCH] Updates to track feedkit v0.7.2 and to add a dedupe processor --- cmd/weatherfeeder/main.go | 12 ++- cmd/weatherfeeder/main_test.go | 80 ++++++++++++++------ go.mod | 2 +- go.sum | 4 +- internal/normalizers/builtins.go | 2 +- internal/normalizers/builtins_test.go | 2 +- internal/normalizers/doc.go | 2 +- internal/normalizers/nws/register.go | 2 +- internal/normalizers/openmeteo/register.go | 2 +- internal/normalizers/openweather/register.go | 2 +- 10 files changed, 73 insertions(+), 37 deletions(-) diff --git a/cmd/weatherfeeder/main.go b/cmd/weatherfeeder/main.go index 4d58c35..9a41887 100644 --- a/cmd/weatherfeeder/main.go +++ b/cmd/weatherfeeder/main.go @@ -15,9 +15,10 @@ import ( "gitea.maximumdirect.net/ejr/feedkit/config" fkdispatch "gitea.maximumdirect.net/ejr/feedkit/dispatch" fkevent "gitea.maximumdirect.net/ejr/feedkit/event" - fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize" fkpipeline "gitea.maximumdirect.net/ejr/feedkit/pipeline" fkprocessors "gitea.maximumdirect.net/ejr/feedkit/processors" + fkdedupe "gitea.maximumdirect.net/ejr/feedkit/processors/dedupe" + fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize" fkscheduler "gitea.maximumdirect.net/ejr/feedkit/scheduler" fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks" fksources "gitea.maximumdirect.net/ejr/feedkit/sources" @@ -27,6 +28,8 @@ import ( wfsources "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources" ) +const dedupeMaxEntries = 2048 + func main() { log.SetFlags(log.LstdFlags | log.Lmicroseconds) @@ -103,9 +106,9 @@ func main() { events := make(chan fkevent.Event, 256) - // --- Normalization (optional) --- + // --- Processors --- // - // We install feedkit's normalize.Processor even before any normalizers exist. + // We install feedkit's processors/normalize.Processor even before any normalizers exist. // With an empty normalizer list and RequireMatch=false, this is a no-op passthrough. // It will begin transforming events as soon as: // 1) sources emit raw schemas (raw.*), and @@ -116,8 +119,9 @@ func main() { procReg.Register("normalize", func() (fkprocessors.Processor, error) { return fknormalize.NewProcessor(normalizers, false), nil }) + procReg.Register("dedupe", fkdedupe.Factory(dedupeMaxEntries)) - chain, err := procReg.BuildChain([]string{"normalize"}) + chain, err := procReg.BuildChain([]string{"normalize", "dedupe"}) if err != nil { log.Fatalf("build processor chain failed: %v", err) } diff --git a/cmd/weatherfeeder/main_test.go b/cmd/weatherfeeder/main_test.go index 5776965..eb0a6dd 100644 --- a/cmd/weatherfeeder/main_test.go +++ b/cmd/weatherfeeder/main_test.go @@ -9,9 +9,10 @@ import ( "gitea.maximumdirect.net/ejr/feedkit/config" fkevent "gitea.maximumdirect.net/ejr/feedkit/event" - fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize" fkpipeline "gitea.maximumdirect.net/ejr/feedkit/pipeline" fkprocessors "gitea.maximumdirect.net/ejr/feedkit/processors" + fkdedupe "gitea.maximumdirect.net/ejr/feedkit/processors/dedupe" + fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize" wfnormalizers "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers" ) @@ -97,39 +98,23 @@ func TestExampleConfigLoads(t *testing.T) { } } -func TestProcessorRegistryBuildsNormalizeChain(t *testing.T) { - normalizers := wfnormalizers.RegisterBuiltins(nil) - if len(normalizers) == 0 { - t.Fatalf("RegisterBuiltins() returned no normalizers") - } - - procReg := fkprocessors.NewRegistry() - procReg.Register("normalize", func() (fkprocessors.Processor, error) { - return fknormalize.NewProcessor(normalizers, false), nil - }) - - chain, err := procReg.BuildChain([]string{"normalize"}) +func TestProcessorRegistryBuildsNormalizeThenDedupeChain(t *testing.T) { + chain, err := buildProcessorChainForTests() if err != nil { t.Fatalf("BuildChain() unexpected error: %v", err) } - if len(chain) != 1 { - t.Fatalf("BuildChain() expected 1 processor, got %d", len(chain)) + if len(chain) != 2 { + t.Fatalf("BuildChain() expected 2 processors, got %d", len(chain)) } pl := &fkpipeline.Pipeline{Processors: chain} - if len(pl.Processors) != 1 { - t.Fatalf("pipeline expected 1 processor, got %d", len(pl.Processors)) + if len(pl.Processors) != 2 { + t.Fatalf("pipeline expected 2 processors, got %d", len(pl.Processors)) } } func TestNormalizeNoMatchPassThrough(t *testing.T) { - normalizers := wfnormalizers.RegisterBuiltins(nil) - procReg := fkprocessors.NewRegistry() - procReg.Register("normalize", func() (fkprocessors.Processor, error) { - return fknormalize.NewProcessor(normalizers, false), nil - }) - - chain, err := procReg.BuildChain([]string{"normalize"}) + chain, err := buildProcessorChainForTests() if err != nil { t.Fatalf("BuildChain() unexpected error: %v", err) } @@ -157,3 +142,50 @@ func TestNormalizeNoMatchPassThrough(t *testing.T) { t.Fatalf("Pipeline.Process() expected passthrough output, got %#v", *out) } } + +func TestDedupeDropsSecondEventWithSameID(t *testing.T) { + chain, err := buildProcessorChainForTests() + if err != nil { + t.Fatalf("BuildChain() unexpected error: %v", err) + } + + pl := &fkpipeline.Pipeline{Processors: chain} + in := fkevent.Event{ + ID: "evt-dedupe-1", + Kind: fkevent.Kind("observation"), + Source: "test", + EmittedAt: time.Now().UTC(), + Schema: "raw.weatherfeeder.unknown.v1", + Payload: map[string]any{ + "ok": true, + }, + } + + first, err := pl.Process(context.Background(), in) + if err != nil { + t.Fatalf("first Pipeline.Process() unexpected error: %v", err) + } + if first == nil { + t.Fatalf("first Pipeline.Process() unexpectedly dropped event") + } + + second, err := pl.Process(context.Background(), in) + if err != nil { + t.Fatalf("second Pipeline.Process() unexpected error: %v", err) + } + if second != nil { + t.Fatalf("second Pipeline.Process() expected dedupe drop, got %#v", *second) + } +} + +func buildProcessorChainForTests() ([]fkprocessors.Processor, error) { + normalizers := wfnormalizers.RegisterBuiltins(nil) + + procReg := fkprocessors.NewRegistry() + procReg.Register("normalize", func() (fkprocessors.Processor, error) { + return fknormalize.NewProcessor(normalizers, false), nil + }) + procReg.Register("dedupe", fkdedupe.Factory(dedupeMaxEntries)) + + return procReg.BuildChain([]string{"normalize", "dedupe"}) +} diff --git a/go.mod b/go.mod index aaf9349..ac8ffc0 100644 --- a/go.mod +++ b/go.mod @@ -2,7 +2,7 @@ module gitea.maximumdirect.net/ejr/weatherfeeder go 1.25 -require gitea.maximumdirect.net/ejr/feedkit v0.7.1 +require gitea.maximumdirect.net/ejr/feedkit v0.7.2 require ( github.com/klauspost/compress v1.17.2 // indirect diff --git a/go.sum b/go.sum index f774b7d..80dcea6 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -gitea.maximumdirect.net/ejr/feedkit v0.7.1 h1:DNjGKN9e6n1+bEJw9r4O/9Nao8OFp9AZQyzPfH6eNb8= -gitea.maximumdirect.net/ejr/feedkit v0.7.1/go.mod h1:U6xC9xZLN3cL4yi7YBVyzGoHYRLJXusFCAKlj2kdYYQ= +gitea.maximumdirect.net/ejr/feedkit v0.7.2 h1:hTg302SgSi7tw11lNzuc+3g7MvHT6jQQziuo2NoARt8= +gitea.maximumdirect.net/ejr/feedkit v0.7.2/go.mod h1:U6xC9xZLN3cL4yi7YBVyzGoHYRLJXusFCAKlj2kdYYQ= github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= diff --git a/internal/normalizers/builtins.go b/internal/normalizers/builtins.go index ba5dc74..ab77673 100644 --- a/internal/normalizers/builtins.go +++ b/internal/normalizers/builtins.go @@ -2,7 +2,7 @@ package normalizers import ( - fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize" + fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize" "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/nws" "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/openmeteo" diff --git a/internal/normalizers/builtins_test.go b/internal/normalizers/builtins_test.go index 7037fff..e33507c 100644 --- a/internal/normalizers/builtins_test.go +++ b/internal/normalizers/builtins_test.go @@ -4,7 +4,7 @@ import ( "reflect" "testing" - fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize" + fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize" "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/nws" "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/openmeteo" "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/openweather" diff --git a/internal/normalizers/doc.go b/internal/normalizers/doc.go index c520a5a..0839e96 100644 --- a/internal/normalizers/doc.go +++ b/internal/normalizers/doc.go @@ -29,7 +29,7 @@ // // 1. One normalizer per file. // Each file contains exactly one Normalizer implementation (one type that -// satisfies feedkit/normalize.Normalizer). +// satisfies feedkit/processors/normalize.Normalizer). // Helper files are encouraged (types.go, common.go, mapping.go, etc.) as long // as they do not define additional Normalizer types. // diff --git a/internal/normalizers/nws/register.go b/internal/normalizers/nws/register.go index 1980b2b..1b4475b 100644 --- a/internal/normalizers/nws/register.go +++ b/internal/normalizers/nws/register.go @@ -2,7 +2,7 @@ package nws import ( - fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize" + fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize" ) // Register appends NWS normalizers in stable order. diff --git a/internal/normalizers/openmeteo/register.go b/internal/normalizers/openmeteo/register.go index 4dabdbe..915cab2 100644 --- a/internal/normalizers/openmeteo/register.go +++ b/internal/normalizers/openmeteo/register.go @@ -2,7 +2,7 @@ package openmeteo import ( - fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize" + fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize" ) // Register appends Open-Meteo normalizers in stable order. diff --git a/internal/normalizers/openweather/register.go b/internal/normalizers/openweather/register.go index 877dacf..fe6f87a 100644 --- a/internal/normalizers/openweather/register.go +++ b/internal/normalizers/openweather/register.go @@ -2,7 +2,7 @@ package openweather import ( - fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize" + fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize" ) // Register appends OpenWeather normalizers in stable order.