Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| d0b58a4734 | |||
| 6cd823f528 |
@@ -15,9 +15,10 @@ import (
|
|||||||
"gitea.maximumdirect.net/ejr/feedkit/config"
|
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||||
fkdispatch "gitea.maximumdirect.net/ejr/feedkit/dispatch"
|
fkdispatch "gitea.maximumdirect.net/ejr/feedkit/dispatch"
|
||||||
fkevent "gitea.maximumdirect.net/ejr/feedkit/event"
|
fkevent "gitea.maximumdirect.net/ejr/feedkit/event"
|
||||||
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
|
|
||||||
fkpipeline "gitea.maximumdirect.net/ejr/feedkit/pipeline"
|
fkpipeline "gitea.maximumdirect.net/ejr/feedkit/pipeline"
|
||||||
fkprocessors "gitea.maximumdirect.net/ejr/feedkit/processors"
|
fkprocessors "gitea.maximumdirect.net/ejr/feedkit/processors"
|
||||||
|
fkdedupe "gitea.maximumdirect.net/ejr/feedkit/processors/dedupe"
|
||||||
|
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
|
||||||
fkscheduler "gitea.maximumdirect.net/ejr/feedkit/scheduler"
|
fkscheduler "gitea.maximumdirect.net/ejr/feedkit/scheduler"
|
||||||
fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks"
|
fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks"
|
||||||
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
|
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
|
||||||
@@ -27,6 +28,8 @@ import (
|
|||||||
wfsources "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources"
|
wfsources "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const dedupeMaxEntries = 2048
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
log.SetFlags(log.LstdFlags | log.Lmicroseconds)
|
log.SetFlags(log.LstdFlags | log.Lmicroseconds)
|
||||||
|
|
||||||
@@ -103,9 +106,9 @@ func main() {
|
|||||||
|
|
||||||
events := make(chan fkevent.Event, 256)
|
events := make(chan fkevent.Event, 256)
|
||||||
|
|
||||||
// --- Normalization (optional) ---
|
// --- Processors ---
|
||||||
//
|
//
|
||||||
// We install feedkit's normalize.Processor even before any normalizers exist.
|
// We install feedkit's processors/normalize.Processor even before any normalizers exist.
|
||||||
// With an empty normalizer list and RequireMatch=false, this is a no-op passthrough.
|
// With an empty normalizer list and RequireMatch=false, this is a no-op passthrough.
|
||||||
// It will begin transforming events as soon as:
|
// It will begin transforming events as soon as:
|
||||||
// 1) sources emit raw schemas (raw.*), and
|
// 1) sources emit raw schemas (raw.*), and
|
||||||
@@ -116,8 +119,9 @@ func main() {
|
|||||||
procReg.Register("normalize", func() (fkprocessors.Processor, error) {
|
procReg.Register("normalize", func() (fkprocessors.Processor, error) {
|
||||||
return fknormalize.NewProcessor(normalizers, false), nil
|
return fknormalize.NewProcessor(normalizers, false), nil
|
||||||
})
|
})
|
||||||
|
procReg.Register("dedupe", fkdedupe.Factory(dedupeMaxEntries))
|
||||||
|
|
||||||
chain, err := procReg.BuildChain([]string{"normalize"})
|
chain, err := procReg.BuildChain([]string{"normalize", "dedupe"})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("build processor chain failed: %v", err)
|
log.Fatalf("build processor chain failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -9,9 +9,10 @@ import (
|
|||||||
|
|
||||||
"gitea.maximumdirect.net/ejr/feedkit/config"
|
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||||
fkevent "gitea.maximumdirect.net/ejr/feedkit/event"
|
fkevent "gitea.maximumdirect.net/ejr/feedkit/event"
|
||||||
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
|
|
||||||
fkpipeline "gitea.maximumdirect.net/ejr/feedkit/pipeline"
|
fkpipeline "gitea.maximumdirect.net/ejr/feedkit/pipeline"
|
||||||
fkprocessors "gitea.maximumdirect.net/ejr/feedkit/processors"
|
fkprocessors "gitea.maximumdirect.net/ejr/feedkit/processors"
|
||||||
|
fkdedupe "gitea.maximumdirect.net/ejr/feedkit/processors/dedupe"
|
||||||
|
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
|
||||||
|
|
||||||
wfnormalizers "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers"
|
wfnormalizers "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers"
|
||||||
)
|
)
|
||||||
@@ -97,39 +98,23 @@ func TestExampleConfigLoads(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestProcessorRegistryBuildsNormalizeChain(t *testing.T) {
|
func TestProcessorRegistryBuildsNormalizeThenDedupeChain(t *testing.T) {
|
||||||
normalizers := wfnormalizers.RegisterBuiltins(nil)
|
chain, err := buildProcessorChainForTests()
|
||||||
if len(normalizers) == 0 {
|
|
||||||
t.Fatalf("RegisterBuiltins() returned no normalizers")
|
|
||||||
}
|
|
||||||
|
|
||||||
procReg := fkprocessors.NewRegistry()
|
|
||||||
procReg.Register("normalize", func() (fkprocessors.Processor, error) {
|
|
||||||
return fknormalize.NewProcessor(normalizers, false), nil
|
|
||||||
})
|
|
||||||
|
|
||||||
chain, err := procReg.BuildChain([]string{"normalize"})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("BuildChain() unexpected error: %v", err)
|
t.Fatalf("BuildChain() unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
if len(chain) != 1 {
|
if len(chain) != 2 {
|
||||||
t.Fatalf("BuildChain() expected 1 processor, got %d", len(chain))
|
t.Fatalf("BuildChain() expected 2 processors, got %d", len(chain))
|
||||||
}
|
}
|
||||||
|
|
||||||
pl := &fkpipeline.Pipeline{Processors: chain}
|
pl := &fkpipeline.Pipeline{Processors: chain}
|
||||||
if len(pl.Processors) != 1 {
|
if len(pl.Processors) != 2 {
|
||||||
t.Fatalf("pipeline expected 1 processor, got %d", len(pl.Processors))
|
t.Fatalf("pipeline expected 2 processors, got %d", len(pl.Processors))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestNormalizeNoMatchPassThrough(t *testing.T) {
|
func TestNormalizeNoMatchPassThrough(t *testing.T) {
|
||||||
normalizers := wfnormalizers.RegisterBuiltins(nil)
|
chain, err := buildProcessorChainForTests()
|
||||||
procReg := fkprocessors.NewRegistry()
|
|
||||||
procReg.Register("normalize", func() (fkprocessors.Processor, error) {
|
|
||||||
return fknormalize.NewProcessor(normalizers, false), nil
|
|
||||||
})
|
|
||||||
|
|
||||||
chain, err := procReg.BuildChain([]string{"normalize"})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("BuildChain() unexpected error: %v", err)
|
t.Fatalf("BuildChain() unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@@ -157,3 +142,50 @@ func TestNormalizeNoMatchPassThrough(t *testing.T) {
|
|||||||
t.Fatalf("Pipeline.Process() expected passthrough output, got %#v", *out)
|
t.Fatalf("Pipeline.Process() expected passthrough output, got %#v", *out)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestDedupeDropsSecondEventWithSameID(t *testing.T) {
|
||||||
|
chain, err := buildProcessorChainForTests()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("BuildChain() unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
pl := &fkpipeline.Pipeline{Processors: chain}
|
||||||
|
in := fkevent.Event{
|
||||||
|
ID: "evt-dedupe-1",
|
||||||
|
Kind: fkevent.Kind("observation"),
|
||||||
|
Source: "test",
|
||||||
|
EmittedAt: time.Now().UTC(),
|
||||||
|
Schema: "raw.weatherfeeder.unknown.v1",
|
||||||
|
Payload: map[string]any{
|
||||||
|
"ok": true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
first, err := pl.Process(context.Background(), in)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("first Pipeline.Process() unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if first == nil {
|
||||||
|
t.Fatalf("first Pipeline.Process() unexpectedly dropped event")
|
||||||
|
}
|
||||||
|
|
||||||
|
second, err := pl.Process(context.Background(), in)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("second Pipeline.Process() unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if second != nil {
|
||||||
|
t.Fatalf("second Pipeline.Process() expected dedupe drop, got %#v", *second)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildProcessorChainForTests() ([]fkprocessors.Processor, error) {
|
||||||
|
normalizers := wfnormalizers.RegisterBuiltins(nil)
|
||||||
|
|
||||||
|
procReg := fkprocessors.NewRegistry()
|
||||||
|
procReg.Register("normalize", func() (fkprocessors.Processor, error) {
|
||||||
|
return fknormalize.NewProcessor(normalizers, false), nil
|
||||||
|
})
|
||||||
|
procReg.Register("dedupe", fkdedupe.Factory(dedupeMaxEntries))
|
||||||
|
|
||||||
|
return procReg.BuildChain([]string{"normalize", "dedupe"})
|
||||||
|
}
|
||||||
|
|||||||
3
go.mod
3
go.mod
@@ -2,10 +2,11 @@ module gitea.maximumdirect.net/ejr/weatherfeeder
|
|||||||
|
|
||||||
go 1.25
|
go 1.25
|
||||||
|
|
||||||
require gitea.maximumdirect.net/ejr/feedkit v0.7.0
|
require gitea.maximumdirect.net/ejr/feedkit v0.7.2
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/klauspost/compress v1.17.2 // indirect
|
github.com/klauspost/compress v1.17.2 // indirect
|
||||||
|
github.com/lib/pq v1.10.9 // indirect
|
||||||
github.com/nats-io/nats.go v1.34.0 // indirect
|
github.com/nats-io/nats.go v1.34.0 // indirect
|
||||||
github.com/nats-io/nkeys v0.4.7 // indirect
|
github.com/nats-io/nkeys v0.4.7 // indirect
|
||||||
github.com/nats-io/nuid v1.0.1 // indirect
|
github.com/nats-io/nuid v1.0.1 // indirect
|
||||||
|
|||||||
6
go.sum
6
go.sum
@@ -1,7 +1,9 @@
|
|||||||
gitea.maximumdirect.net/ejr/feedkit v0.7.0 h1:qXbsD30BH1HkKf579B4Qu3pDiT9mr+8DmDwzd3IXUoo=
|
gitea.maximumdirect.net/ejr/feedkit v0.7.2 h1:hTg302SgSi7tw11lNzuc+3g7MvHT6jQQziuo2NoARt8=
|
||||||
gitea.maximumdirect.net/ejr/feedkit v0.7.0/go.mod h1:wYtA10GouvSe7L/8e1UEC+tqcp32HJofExIo1k+Wjls=
|
gitea.maximumdirect.net/ejr/feedkit v0.7.2/go.mod h1:U6xC9xZLN3cL4yi7YBVyzGoHYRLJXusFCAKlj2kdYYQ=
|
||||||
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
|
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
|
||||||
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
|
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
|
||||||
|
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
|
||||||
|
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
|
||||||
github.com/nats-io/nats.go v1.34.0 h1:fnxnPCNiwIG5w08rlMcEKTUw4AV/nKyGCOJE8TdhSPk=
|
github.com/nats-io/nats.go v1.34.0 h1:fnxnPCNiwIG5w08rlMcEKTUw4AV/nKyGCOJE8TdhSPk=
|
||||||
github.com/nats-io/nats.go v1.34.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
|
github.com/nats-io/nats.go v1.34.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
|
||||||
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
|
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
|
||||||
|
|||||||
@@ -2,7 +2,7 @@
|
|||||||
package normalizers
|
package normalizers
|
||||||
|
|
||||||
import (
|
import (
|
||||||
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
|
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
|
||||||
|
|
||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/nws"
|
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/nws"
|
||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/openmeteo"
|
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/openmeteo"
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ import (
|
|||||||
"reflect"
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
|
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
|
||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/nws"
|
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/nws"
|
||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/openmeteo"
|
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/openmeteo"
|
||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/openweather"
|
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/openweather"
|
||||||
|
|||||||
@@ -29,7 +29,7 @@
|
|||||||
//
|
//
|
||||||
// 1. One normalizer per file.
|
// 1. One normalizer per file.
|
||||||
// Each file contains exactly one Normalizer implementation (one type that
|
// Each file contains exactly one Normalizer implementation (one type that
|
||||||
// satisfies feedkit/normalize.Normalizer).
|
// satisfies feedkit/processors/normalize.Normalizer).
|
||||||
// Helper files are encouraged (types.go, common.go, mapping.go, etc.) as long
|
// Helper files are encouraged (types.go, common.go, mapping.go, etc.) as long
|
||||||
// as they do not define additional Normalizer types.
|
// as they do not define additional Normalizer types.
|
||||||
//
|
//
|
||||||
|
|||||||
@@ -2,7 +2,7 @@
|
|||||||
package nws
|
package nws
|
||||||
|
|
||||||
import (
|
import (
|
||||||
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
|
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Register appends NWS normalizers in stable order.
|
// Register appends NWS normalizers in stable order.
|
||||||
|
|||||||
@@ -2,7 +2,7 @@
|
|||||||
package openmeteo
|
package openmeteo
|
||||||
|
|
||||||
import (
|
import (
|
||||||
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
|
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Register appends Open-Meteo normalizers in stable order.
|
// Register appends Open-Meteo normalizers in stable order.
|
||||||
|
|||||||
@@ -2,7 +2,7 @@
|
|||||||
package openweather
|
package openweather
|
||||||
|
|
||||||
import (
|
import (
|
||||||
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
|
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Register appends OpenWeather normalizers in stable order.
|
// Register appends OpenWeather normalizers in stable order.
|
||||||
|
|||||||
Reference in New Issue
Block a user