Remove global Postgres schema registration in favor of explicit schema-aware sink factory wiring, and update weatherfeeder to register the Postgres sink explicitly. Add optional per-source HTTP timeout and response body limit overrides while keeping feedkit defaults. Remove remaining legacy source/config compatibility surfaces, including singular kind support and old source registry/type aliases, and migrate weatherfeeder sources to plural `Kinds()` metadata. Clean up related docs, tests, and sample config to match the new Postgres, HTTP, and NATS configuration model.
113 lines
3.1 KiB
Go
113 lines
3.1 KiB
Go
package sources
|
|
|
|
import (
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"gitea.maximumdirect.net/ejr/feedkit/config"
|
|
"gitea.maximumdirect.net/ejr/feedkit/event"
|
|
)
|
|
|
|
// testInput is a minimal test double that carries a name and exposes it
// through Name.
type testInput struct {
	name string
}

// Name reports the name the double was constructed with.
func (in testInput) Name() string {
	return in.name
}
|
|
|
|
type testKindsSource struct {
|
|
testInput
|
|
kinds []event.Kind
|
|
}
|
|
|
|
func (s testKindsSource) Kinds() []event.Kind { return s.kinds }
|
|
|
|
func TestValidateExpectedKindsSubsetAllowed(t *testing.T) {
|
|
cfg := config.SourceConfig{Kinds: []string{"observation"}}
|
|
in := testKindsSource{
|
|
testInput: testInput{name: "test"},
|
|
kinds: []event.Kind{"observation", "forecast"},
|
|
}
|
|
|
|
if err := ValidateExpectedKinds(cfg, in); err != nil {
|
|
t.Fatalf("ValidateExpectedKinds() unexpected error: %v", err)
|
|
}
|
|
}
|
|
|
|
func TestValidateExpectedKindsMismatchFails(t *testing.T) {
|
|
cfg := config.SourceConfig{Kinds: []string{"alert"}}
|
|
in := testKindsSource{
|
|
testInput: testInput{name: "test"},
|
|
kinds: []event.Kind{"observation", "forecast"},
|
|
}
|
|
|
|
err := ValidateExpectedKinds(cfg, in)
|
|
if err == nil {
|
|
t.Fatalf("ValidateExpectedKinds() expected mismatch error, got nil")
|
|
}
|
|
if !strings.Contains(err.Error(), "configured expected kind") {
|
|
t.Fatalf("ValidateExpectedKinds() error %q does not include expected message", err)
|
|
}
|
|
}
|
|
|
|
func TestValidateExpectedKindsNoMetadataSkipsCheck(t *testing.T) {
|
|
cfg := config.SourceConfig{Kinds: []string{"alert"}}
|
|
in := testInput{name: "test"}
|
|
|
|
if err := ValidateExpectedKinds(cfg, in); err != nil {
|
|
t.Fatalf("ValidateExpectedKinds() unexpected error: %v", err)
|
|
}
|
|
}
|
|
|
|
func TestDefaultEventIDUsesUpstreamID(t *testing.T) {
|
|
emittedAt := time.Date(2026, 3, 28, 15, 4, 5, 123, time.UTC)
|
|
got := DefaultEventID(" upstream-id ", "source", nil, emittedAt)
|
|
if got != "upstream-id" {
|
|
t.Fatalf("DefaultEventID() = %q, want upstream-id", got)
|
|
}
|
|
}
|
|
|
|
func TestDefaultEventIDPrefersEffectiveAt(t *testing.T) {
|
|
effectiveAt := time.Date(2026, 3, 28, 16, 4, 5, 987654321, time.FixedZone("x", -6*3600))
|
|
emittedAt := time.Date(2026, 3, 28, 15, 4, 5, 123, time.UTC)
|
|
|
|
got := DefaultEventID("", "source", &effectiveAt, emittedAt)
|
|
want := "source:" + effectiveAt.UTC().Format(time.RFC3339Nano)
|
|
if got != want {
|
|
t.Fatalf("DefaultEventID() = %q, want %q", got, want)
|
|
}
|
|
}
|
|
|
|
func TestDefaultEventIDFallsBackToEmittedAt(t *testing.T) {
|
|
emittedAt := time.Date(2026, 3, 28, 15, 4, 5, 123456789, time.FixedZone("y", 3*3600))
|
|
got := DefaultEventID("", "source", nil, emittedAt)
|
|
want := "source:" + emittedAt.UTC().Format(time.RFC3339Nano)
|
|
if got != want {
|
|
t.Fatalf("DefaultEventID() = %q, want %q", got, want)
|
|
}
|
|
}
|
|
|
|
func TestSingleEventBuildsValidatedSlice(t *testing.T) {
|
|
effectiveAt := time.Date(2026, 3, 28, 16, 0, 0, 0, time.UTC)
|
|
emittedAt := time.Date(2026, 3, 28, 15, 0, 0, 0, time.FixedZone("z", -5*3600))
|
|
|
|
got, err := SingleEvent(
|
|
event.Kind("observation"),
|
|
"source-a",
|
|
"raw.example.v1",
|
|
"evt-1",
|
|
emittedAt,
|
|
&effectiveAt,
|
|
map[string]any{"ok": true},
|
|
)
|
|
if err != nil {
|
|
t.Fatalf("SingleEvent() unexpected error: %v", err)
|
|
}
|
|
if len(got) != 1 {
|
|
t.Fatalf("SingleEvent() len = %d, want 1", len(got))
|
|
}
|
|
if got[0].EmittedAt != emittedAt.UTC() {
|
|
t.Fatalf("SingleEvent() emittedAt = %s, want %s", got[0].EmittedAt, emittedAt.UTC())
|
|
}
|
|
}
|