Updates to track feedkit v0.7.2 and to add a dedupe processor
All checks were successful
ci/woodpecker/push/build-image Pipeline was successful
All checks were successful
ci/woodpecker/push/build-image Pipeline was successful
This commit is contained in:
@@ -15,9 +15,10 @@ import (
|
||||
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||
fkdispatch "gitea.maximumdirect.net/ejr/feedkit/dispatch"
|
||||
fkevent "gitea.maximumdirect.net/ejr/feedkit/event"
|
||||
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
|
||||
fkpipeline "gitea.maximumdirect.net/ejr/feedkit/pipeline"
|
||||
fkprocessors "gitea.maximumdirect.net/ejr/feedkit/processors"
|
||||
fkdedupe "gitea.maximumdirect.net/ejr/feedkit/processors/dedupe"
|
||||
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
|
||||
fkscheduler "gitea.maximumdirect.net/ejr/feedkit/scheduler"
|
||||
fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks"
|
||||
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
|
||||
@@ -27,6 +28,8 @@ import (
|
||||
wfsources "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources"
|
||||
)
|
||||
|
||||
const dedupeMaxEntries = 2048
|
||||
|
||||
func main() {
|
||||
log.SetFlags(log.LstdFlags | log.Lmicroseconds)
|
||||
|
||||
@@ -103,9 +106,9 @@ func main() {
|
||||
|
||||
events := make(chan fkevent.Event, 256)
|
||||
|
||||
// --- Normalization (optional) ---
|
||||
// --- Processors ---
|
||||
//
|
||||
// We install feedkit's normalize.Processor even before any normalizers exist.
|
||||
// We install feedkit's processors/normalize.Processor even before any normalizers exist.
|
||||
// With an empty normalizer list and RequireMatch=false, this is a no-op passthrough.
|
||||
// It will begin transforming events as soon as:
|
||||
// 1) sources emit raw schemas (raw.*), and
|
||||
@@ -116,8 +119,9 @@ func main() {
|
||||
procReg.Register("normalize", func() (fkprocessors.Processor, error) {
|
||||
return fknormalize.NewProcessor(normalizers, false), nil
|
||||
})
|
||||
procReg.Register("dedupe", fkdedupe.Factory(dedupeMaxEntries))
|
||||
|
||||
chain, err := procReg.BuildChain([]string{"normalize"})
|
||||
chain, err := procReg.BuildChain([]string{"normalize", "dedupe"})
|
||||
if err != nil {
|
||||
log.Fatalf("build processor chain failed: %v", err)
|
||||
}
|
||||
|
||||
@@ -9,9 +9,10 @@ import (
|
||||
|
||||
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||
fkevent "gitea.maximumdirect.net/ejr/feedkit/event"
|
||||
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
|
||||
fkpipeline "gitea.maximumdirect.net/ejr/feedkit/pipeline"
|
||||
fkprocessors "gitea.maximumdirect.net/ejr/feedkit/processors"
|
||||
fkdedupe "gitea.maximumdirect.net/ejr/feedkit/processors/dedupe"
|
||||
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
|
||||
|
||||
wfnormalizers "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers"
|
||||
)
|
||||
@@ -97,39 +98,23 @@ func TestExampleConfigLoads(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestProcessorRegistryBuildsNormalizeChain(t *testing.T) {
|
||||
normalizers := wfnormalizers.RegisterBuiltins(nil)
|
||||
if len(normalizers) == 0 {
|
||||
t.Fatalf("RegisterBuiltins() returned no normalizers")
|
||||
}
|
||||
|
||||
procReg := fkprocessors.NewRegistry()
|
||||
procReg.Register("normalize", func() (fkprocessors.Processor, error) {
|
||||
return fknormalize.NewProcessor(normalizers, false), nil
|
||||
})
|
||||
|
||||
chain, err := procReg.BuildChain([]string{"normalize"})
|
||||
func TestProcessorRegistryBuildsNormalizeThenDedupeChain(t *testing.T) {
|
||||
chain, err := buildProcessorChainForTests()
|
||||
if err != nil {
|
||||
t.Fatalf("BuildChain() unexpected error: %v", err)
|
||||
}
|
||||
if len(chain) != 1 {
|
||||
t.Fatalf("BuildChain() expected 1 processor, got %d", len(chain))
|
||||
if len(chain) != 2 {
|
||||
t.Fatalf("BuildChain() expected 2 processors, got %d", len(chain))
|
||||
}
|
||||
|
||||
pl := &fkpipeline.Pipeline{Processors: chain}
|
||||
if len(pl.Processors) != 1 {
|
||||
t.Fatalf("pipeline expected 1 processor, got %d", len(pl.Processors))
|
||||
if len(pl.Processors) != 2 {
|
||||
t.Fatalf("pipeline expected 2 processors, got %d", len(pl.Processors))
|
||||
}
|
||||
}
|
||||
|
||||
func TestNormalizeNoMatchPassThrough(t *testing.T) {
|
||||
normalizers := wfnormalizers.RegisterBuiltins(nil)
|
||||
procReg := fkprocessors.NewRegistry()
|
||||
procReg.Register("normalize", func() (fkprocessors.Processor, error) {
|
||||
return fknormalize.NewProcessor(normalizers, false), nil
|
||||
})
|
||||
|
||||
chain, err := procReg.BuildChain([]string{"normalize"})
|
||||
chain, err := buildProcessorChainForTests()
|
||||
if err != nil {
|
||||
t.Fatalf("BuildChain() unexpected error: %v", err)
|
||||
}
|
||||
@@ -157,3 +142,50 @@ func TestNormalizeNoMatchPassThrough(t *testing.T) {
|
||||
t.Fatalf("Pipeline.Process() expected passthrough output, got %#v", *out)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDedupeDropsSecondEventWithSameID(t *testing.T) {
|
||||
chain, err := buildProcessorChainForTests()
|
||||
if err != nil {
|
||||
t.Fatalf("BuildChain() unexpected error: %v", err)
|
||||
}
|
||||
|
||||
pl := &fkpipeline.Pipeline{Processors: chain}
|
||||
in := fkevent.Event{
|
||||
ID: "evt-dedupe-1",
|
||||
Kind: fkevent.Kind("observation"),
|
||||
Source: "test",
|
||||
EmittedAt: time.Now().UTC(),
|
||||
Schema: "raw.weatherfeeder.unknown.v1",
|
||||
Payload: map[string]any{
|
||||
"ok": true,
|
||||
},
|
||||
}
|
||||
|
||||
first, err := pl.Process(context.Background(), in)
|
||||
if err != nil {
|
||||
t.Fatalf("first Pipeline.Process() unexpected error: %v", err)
|
||||
}
|
||||
if first == nil {
|
||||
t.Fatalf("first Pipeline.Process() unexpectedly dropped event")
|
||||
}
|
||||
|
||||
second, err := pl.Process(context.Background(), in)
|
||||
if err != nil {
|
||||
t.Fatalf("second Pipeline.Process() unexpected error: %v", err)
|
||||
}
|
||||
if second != nil {
|
||||
t.Fatalf("second Pipeline.Process() expected dedupe drop, got %#v", *second)
|
||||
}
|
||||
}
|
||||
|
||||
func buildProcessorChainForTests() ([]fkprocessors.Processor, error) {
|
||||
normalizers := wfnormalizers.RegisterBuiltins(nil)
|
||||
|
||||
procReg := fkprocessors.NewRegistry()
|
||||
procReg.Register("normalize", func() (fkprocessors.Processor, error) {
|
||||
return fknormalize.NewProcessor(normalizers, false), nil
|
||||
})
|
||||
procReg.Register("dedupe", fkdedupe.Factory(dedupeMaxEntries))
|
||||
|
||||
return procReg.BuildChain([]string{"normalize", "dedupe"})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user