From 2c1278a70a615dd62d6489f5d2b10d3b1ac2d364 Mon Sep 17 00:00:00 2001 From: Eric Rakestraw Date: Sat, 28 Mar 2026 11:30:20 -0500 Subject: [PATCH] Moved generic and broadly useful helper functions upstream into feedkit --- cmd/weatherfeeder/main.go | 138 +------------------ cmd/weatherfeeder/main_test.go | 25 ++-- go.mod | 2 +- go.sum | 4 +- internal/normalizers/common/finalize.go | 20 +-- internal/normalizers/common/finalize_test.go | 36 +++++ internal/normalizers/common/json.go | 16 +-- internal/normalizers/common/payload.go | 53 ------- internal/sinks/postgres/map_test.go | 2 +- internal/sinks/postgres/schema.go | 31 +---- internal/sinks/postgres/schema_test.go | 15 +- internal/sources/common/event.go | 54 -------- internal/sources/common/id.go | 39 ------ internal/sources/nws/alerts.go | 5 +- internal/sources/nws/forecast_hourly.go | 5 +- internal/sources/nws/forecast_narrative.go | 5 +- internal/sources/nws/observation.go | 5 +- internal/sources/openmeteo/forecast.go | 5 +- internal/sources/openmeteo/observation.go | 5 +- internal/sources/openweather/observation.go | 5 +- 20 files changed, 84 insertions(+), 386 deletions(-) create mode 100644 internal/normalizers/common/finalize_test.go delete mode 100644 internal/normalizers/common/payload.go delete mode 100644 internal/sources/common/event.go delete mode 100644 internal/sources/common/id.go diff --git a/cmd/weatherfeeder/main.go b/cmd/weatherfeeder/main.go index 9a41887..f8f77c5 100644 --- a/cmd/weatherfeeder/main.go +++ b/cmd/weatherfeeder/main.go @@ -3,14 +3,10 @@ package main import ( "context" "errors" - "fmt" "log" "os" "os/signal" - "sort" - "strings" "syscall" - "time" "gitea.maximumdirect.net/ejr/feedkit/config" fkdispatch "gitea.maximumdirect.net/ejr/feedkit/dispatch" @@ -41,7 +37,7 @@ func main() { if err != nil { log.Fatalf("config load failed: %v", err) } - if err := wfpgsink.RegisterPostgresSchemas(cfg); err != nil { + if err := fksinks.RegisterPostgresSchemaForConfiguredSinks(cfg, wfpgsink.PostgresSchema()); err != nil { log.Fatalf("postgres schema registration failed: %v", err) } @@ -51,15 +47,7 @@ func main() { // Compile stdout, Postgres, and NATS sinks for weatherfeeder. The former is useful for debugging and the latter are the main intended outputs. sinkReg := fksinks.NewRegistry() - sinkReg.Register("stdout", func(cfg config.SinkConfig) (fksinks.Sink, error) { - return fksinks.NewStdoutSink(cfg.Name), nil - }) - sinkReg.Register("postgres", func(cfg config.SinkConfig) (fksinks.Sink, error) { - return fksinks.NewPostgresSinkFromConfig(cfg) - }) - sinkReg.Register("nats", func(cfg config.SinkConfig) (fksinks.Sink, error) { - return fksinks.NewNATSSinkFromConfig(cfg) - }) + fksinks.RegisterBuiltins(sinkReg) // --- Build sources into scheduler jobs --- var jobs []fkscheduler.Job @@ -69,7 +57,7 @@ func main() { log.Fatalf("build source failed (sources[%d] name=%q driver=%q): %v", i, sc.Name, sc.Driver, err) } - if err := validateSourceExpectedKinds(sc, in); err != nil { + if err := fksources.ValidateExpectedKinds(sc, in); err != nil { log.Fatalf("source expected kinds validation failed (sources[%d] name=%q driver=%q): %v", i, sc.Name, sc.Driver, err) } @@ -99,7 +87,7 @@ func main() { } // --- Compile routes --- - routes, err := compileRoutes(cfg, builtSinks) + routes, err := fkdispatch.CompileRoutes(cfg) if err != nil { log.Fatalf("compile routes failed: %v", err) } @@ -160,124 +148,6 @@ func main() { log.Printf("shutdown complete") } -func compileRoutes(cfg *config.Config, builtSinks map[string]fksinks.Sink) ([]fkdispatch.Route, error) { - if len(cfg.Routes) == 0 { - return defaultRoutes(builtSinks), nil - } - - var routes []fkdispatch.Route - for i, r := range cfg.Routes { - if strings.TrimSpace(r.Sink) == "" { - return nil, fmt.Errorf("routes[%d].sink is empty", i) - } - if _, ok := builtSinks[r.Sink]; !ok { - return nil, fmt.Errorf("routes[%d].sink references unknown sink %q", i, r.Sink) - } - - kinds := map[fkevent.Kind]bool{} - for j, k := range r.Kinds { - kind, err := fkevent.ParseKind(k) - if err != nil { - return nil, fmt.Errorf("routes[%d].kinds[%d]: %w", i, j, err) - } - kinds[kind] = true - } - - routes = append(routes, fkdispatch.Route{ - SinkName: r.Sink, - Kinds: kinds, - }) - } - - return routes, nil -} - -func defaultRoutes(builtSinks map[string]fksinks.Sink) []fkdispatch.Route { - // nil Kinds means "match all kinds" by convention - var allKinds map[fkevent.Kind]bool = nil - - routes := make([]fkdispatch.Route, 0, len(builtSinks)) - for name := range builtSinks { - routes = append(routes, fkdispatch.Route{ - SinkName: name, - Kinds: allKinds, - }) - } - return routes -} - func isContextShutdown(err error) bool { return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) } - -func validateSourceExpectedKinds(sc config.SourceConfig, in fksources.Input) error { - expectedKinds, err := parseExpectedKinds(sc.ExpectedKinds()) - if err != nil { - return err - } - if len(expectedKinds) == 0 { - return nil - } - - advertisedKinds := advertisedSourceKinds(in) - if len(advertisedKinds) == 0 { - return nil - } - - for kind := range expectedKinds { - if !advertisedKinds[kind] { - return fmt.Errorf( - "configured expected kind %q not advertised by source (configured=%v advertised=%v)", - kind, - sortedKinds(expectedKinds), - sortedKinds(advertisedKinds), - ) - } - } - return nil -} - -func parseExpectedKinds(raw []string) (map[fkevent.Kind]bool, error) { - kinds := map[fkevent.Kind]bool{} - for i, k := range raw { - kind, err := fkevent.ParseKind(k) - if err != nil { - return nil, fmt.Errorf("invalid expected kind at index %d (%q): %w", i, k, err) - } - kinds[kind] = true - } - return kinds, nil -} - -func advertisedSourceKinds(in fksources.Input) map[fkevent.Kind]bool { - if in == nil { - return nil - } - - kinds := map[fkevent.Kind]bool{} - if ks, ok := in.(fksources.KindsSource); ok { - for _, kind := range ks.Kinds() { - kinds[kind] = true - } - return kinds - } - - if ks, ok := in.(fksources.KindSource); ok { - kinds[ks.Kind()] = true - return kinds - } - - return nil -} - -func sortedKinds(kindSet map[fkevent.Kind]bool) []string { - out := make([]string, 0, len(kindSet)) - for kind := range kindSet { - out = append(out, string(kind)) - } - sort.Strings(out) - return out -} - -// keep time imported (mirrors your previous main.go defensive trick) -var _ = time.Second diff --git a/cmd/weatherfeeder/main_test.go b/cmd/weatherfeeder/main_test.go index eb0a6dd..2cfbef9 100644 --- a/cmd/weatherfeeder/main_test.go +++ b/cmd/weatherfeeder/main_test.go @@ -13,6 +13,7 @@ import ( fkprocessors "gitea.maximumdirect.net/ejr/feedkit/processors" fkdedupe "gitea.maximumdirect.net/ejr/feedkit/processors/dedupe" fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize" + fksources "gitea.maximumdirect.net/ejr/feedkit/sources" wfnormalizers "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers" ) @@ -44,8 +45,8 @@ func TestValidateSourceExpectedKindsLegacyKindFallback(t *testing.T) { kind: fkevent.Kind("observation"), } - if err := validateSourceExpectedKinds(sc, in); err != nil { - t.Fatalf("validateSourceExpectedKinds() unexpected error: %v", err) + if err := fksources.ValidateExpectedKinds(sc, in); err != nil { + t.Fatalf("ValidateExpectedKinds() unexpected error: %v", err) } } @@ -56,8 +57,8 @@ func TestValidateSourceExpectedKindsSubsetAllowed(t *testing.T) { kinds: []fkevent.Kind{"observation", "forecast"}, } - if err := validateSourceExpectedKinds(sc, in); err != nil { - t.Fatalf("validateSourceExpectedKinds() unexpected error: %v", err) + if err := fksources.ValidateExpectedKinds(sc, in); err != nil { + t.Fatalf("ValidateExpectedKinds() unexpected error: %v", err) } } @@ -68,12 +69,12 @@ func TestValidateSourceExpectedKindsMismatchFails(t *testing.T) { kinds: []fkevent.Kind{"observation", "forecast"}, } - err := validateSourceExpectedKinds(sc, in) + err := fksources.ValidateExpectedKinds(sc, in) if err == nil { - t.Fatalf("validateSourceExpectedKinds() expected mismatch error, got nil") + t.Fatalf("ValidateExpectedKinds() expected mismatch error, got nil") } if !strings.Contains(err.Error(), "configured expected kind") { - t.Fatalf("validateSourceExpectedKinds() error %q does not include expected message", err) + t.Fatalf("ValidateExpectedKinds() error %q does not include expected message", err) } } @@ -81,14 +82,8 @@ func TestValidateSourceExpectedKindsNoMetadataSkipsCheck(t *testing.T) { sc := config.SourceConfig{Kinds: []string{"alert"}} in := testInput{name: "test"} - if err := validateSourceExpectedKinds(sc, in); err != nil { - t.Fatalf("validateSourceExpectedKinds() unexpected error: %v", err) - } -} - -func TestParseExpectedKindsRejectsEmptyValues(t *testing.T) { - if _, err := parseExpectedKinds([]string{""}); err == nil { - t.Fatalf("parseExpectedKinds() expected error for empty kind") + if err := fksources.ValidateExpectedKinds(sc, in); err != nil { + t.Fatalf("ValidateExpectedKinds() unexpected error: %v", err) } } diff --git a/go.mod b/go.mod index 70f07af..f5bb809 100644 --- a/go.mod +++ b/go.mod @@ -2,7 +2,7 @@ module gitea.maximumdirect.net/ejr/weatherfeeder go 1.25 -require gitea.maximumdirect.net/ejr/feedkit v0.8.0 +require gitea.maximumdirect.net/ejr/feedkit v0.8.1 require ( github.com/klauspost/compress v1.17.2 // indirect diff --git a/go.sum b/go.sum index f69edf4..6a836dc 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -gitea.maximumdirect.net/ejr/feedkit v0.8.0 h1:JdEEy6T3AQ97alLNYcQ3crN3tOEZPLMBD0Qr/MH5/dw= -gitea.maximumdirect.net/ejr/feedkit v0.8.0/go.mod h1:U6xC9xZLN3cL4yi7YBVyzGoHYRLJXusFCAKlj2kdYYQ= +gitea.maximumdirect.net/ejr/feedkit v0.8.1 h1:gSZ5J/mYHfNZ6dp27TS0V9RQ0JuP5ZAHXhSeCJaBu60= +gitea.maximumdirect.net/ejr/feedkit v0.8.1/go.mod h1:U6xC9xZLN3cL4yi7YBVyzGoHYRLJXusFCAKlj2kdYYQ= github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= diff --git a/internal/normalizers/common/finalize.go b/internal/normalizers/common/finalize.go index 368460e..5e3ffc9 100644 --- a/internal/normalizers/common/finalize.go +++ b/internal/normalizers/common/finalize.go @@ -5,6 +5,7 @@ import ( "time" "gitea.maximumdirect.net/ejr/feedkit/event" + fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize" ) // Finalize builds the output event envelope by copying the input and applying the @@ -16,20 +17,7 @@ import ( // If effectiveAt is zero, any existing in.EffectiveAt is preserved. // - Payload floats are rounded to a stable wire-friendly precision (see round.go). func Finalize(in event.Event, outSchema string, outPayload any, effectiveAt time.Time) (*event.Event, error) { - out := in - out.Schema = outSchema - - // Enforce stable numeric presentation for sinks: round floats in the canonical payload. - out.Payload = RoundFloats(outPayload, DefaultFloatPrecision) - - if !effectiveAt.IsZero() { - t := effectiveAt.UTC() - out.EffectiveAt = &t - } - - if err := out.Validate(); err != nil { - return nil, err - } - - return &out, nil + // Enforce stable numeric presentation for weather payloads before delegating to feedkit's + // generic envelope finalizer. + return fknormalize.FinalizeEvent(in, outSchema, RoundFloats(outPayload, DefaultFloatPrecision), effectiveAt) } diff --git a/internal/normalizers/common/finalize_test.go b/internal/normalizers/common/finalize_test.go new file mode 100644 index 0000000..d32e898 --- /dev/null +++ b/internal/normalizers/common/finalize_test.go @@ -0,0 +1,36 @@ +package common + +import ( + "testing" + "time" + + "gitea.maximumdirect.net/ejr/feedkit/event" +) + +func TestFinalizeRoundsWeatherPayloadFloats(t *testing.T) { + type payload struct { + Value float64 + } + + in := event.Event{ + ID: "evt-1", + Kind: event.Kind("observation"), + Source: "source-a", + EmittedAt: time.Date(2026, 3, 28, 12, 0, 0, 0, time.UTC), + Schema: "raw.example.v1", + Payload: map[string]any{"old": true}, + } + + out, err := Finalize(in, "weather.example.v1", payload{Value: 1.234567}, time.Time{}) + if err != nil { + t.Fatalf("Finalize() unexpected error: %v", err) + } + + got, ok := out.Payload.(payload) + if !ok { + t.Fatalf("Finalize() payload type = %T, want payload", out.Payload) + } + if got.Value != 1.2346 { + t.Fatalf("Finalize() rounded value = %v, want 1.2346", got.Value) + } +} diff --git a/internal/normalizers/common/json.go b/internal/normalizers/common/json.go index 1f7f483..590283f 100644 --- a/internal/normalizers/common/json.go +++ b/internal/normalizers/common/json.go @@ -2,11 +2,11 @@ package common import ( - "encoding/json" "fmt" "time" "gitea.maximumdirect.net/ejr/feedkit/event" + fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize" ) // DecodeJSONPayload extracts the event payload as bytes and unmarshals it into T. @@ -18,19 +18,7 @@ import ( // Errors include a small amount of stage context ("extract payload", "decode raw payload"). // Callers typically wrap these with a provider/kind label. func DecodeJSONPayload[T any](in event.Event) (T, error) { - var zero T - - b, err := PayloadBytes(in) - if err != nil { - return zero, fmt.Errorf("extract payload: %w", err) - } - - var parsed T - if err := json.Unmarshal(b, &parsed); err != nil { - return zero, fmt.Errorf("decode raw payload: %w", err) - } - - return parsed, nil + return fknormalize.DecodeJSONPayload[T](in) } // NormalizeJSON is a convenience wrapper for the common JSON-normalizer pattern: diff --git a/internal/normalizers/common/payload.go b/internal/normalizers/common/payload.go deleted file mode 100644 index c22ed5d..0000000 --- a/internal/normalizers/common/payload.go +++ /dev/null @@ -1,53 +0,0 @@ -package common - -import ( - "encoding/json" - "fmt" - - "gitea.maximumdirect.net/ejr/feedkit/event" -) - -// PayloadBytes extracts a JSON payload into bytes suitable for json.Unmarshal. -// -// Supported payload shapes (weatherfeeder convention): -// - json.RawMessage (recommended for raw events) -// - []byte -// - string (assumed to contain JSON) -// - map[string]any (re-marshaled to JSON) -// -// If you add other raw representations later, extend this function. -func PayloadBytes(e event.Event) ([]byte, error) { - if e.Payload == nil { - return nil, fmt.Errorf("payload is nil") - } - - switch v := e.Payload.(type) { - case json.RawMessage: - if len(v) == 0 { - return nil, fmt.Errorf("payload is empty json.RawMessage") - } - return []byte(v), nil - - case []byte: - if len(v) == 0 { - return nil, fmt.Errorf("payload is empty []byte") - } - return v, nil - - case string: - if v == "" { - return nil, fmt.Errorf("payload is empty string") - } - return []byte(v), nil - - case map[string]any: - b, err := json.Marshal(v) - if err != nil { - return nil, fmt.Errorf("marshal map payload: %w", err) - } - return b, nil - - default: - return nil, fmt.Errorf("unsupported payload type %T", e.Payload) - } -} diff --git a/internal/sinks/postgres/map_test.go b/internal/sinks/postgres/map_test.go index 843a5f8..1e91fcf 100644 --- a/internal/sinks/postgres/map_test.go +++ b/internal/sinks/postgres/map_test.go @@ -238,7 +238,7 @@ func assertAllWritesIncludeAllColumns(t *testing.T, writes []fksinks.PostgresWri } func tableColumnCounts() map[string]int { - s := weatherPostgresSchema() + s := PostgresSchema() m := make(map[string]int, len(s.Tables)) for _, tbl := range s.Tables { m[tbl.Name] = len(tbl.Columns) diff --git a/internal/sinks/postgres/schema.go b/internal/sinks/postgres/schema.go index c54c134..38e51ba 100644 --- a/internal/sinks/postgres/schema.go +++ b/internal/sinks/postgres/schema.go @@ -1,10 +1,6 @@ package postgres import ( - "fmt" - "strings" - - "gitea.maximumdirect.net/ejr/feedkit/config" fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks" ) @@ -18,31 +14,8 @@ const ( tableAlertReferences = "alert_references" ) -// RegisterPostgresSchemas registers weatherfeeder's Postgres schema for each -// configured sink using driver=postgres. -func RegisterPostgresSchemas(cfg *config.Config) error { - if cfg == nil { - return fmt.Errorf("register postgres schemas: config is nil") - } - - schema := weatherPostgresSchema() - for i, sk := range cfg.Sinks { - if !isPostgresDriver(sk.Driver) { - continue - } - if err := fksinks.RegisterPostgresSchema(sk.Name, schema); err != nil { - return fmt.Errorf("register postgres schema for sinks[%d] name=%q: %w", i, sk.Name, err) - } - } - - return nil -} - -func isPostgresDriver(driver string) bool { - return strings.EqualFold(strings.TrimSpace(driver), "postgres") -} - -func weatherPostgresSchema() fksinks.PostgresSchema { +// PostgresSchema returns weatherfeeder's Postgres schema definition. +func PostgresSchema() fksinks.PostgresSchema { return fksinks.PostgresSchema{ Tables: []fksinks.PostgresTable{ { diff --git a/internal/sinks/postgres/schema_test.go b/internal/sinks/postgres/schema_test.go index cfad385..70763b0 100644 --- a/internal/sinks/postgres/schema_test.go +++ b/internal/sinks/postgres/schema_test.go @@ -7,10 +7,11 @@ import ( "time" "gitea.maximumdirect.net/ejr/feedkit/config" + fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks" ) func TestRegisterPostgresSchemasNilConfig(t *testing.T) { - err := RegisterPostgresSchemas(nil) + err := fksinks.RegisterPostgresSchemaForConfiguredSinks(nil, PostgresSchema()) if err == nil { t.Fatalf("RegisterPostgresSchemas(nil) expected error") } @@ -27,7 +28,7 @@ func TestRegisterPostgresSchemasNonPostgresNoOp(t *testing.T) { }, } - if err := RegisterPostgresSchemas(cfg); err != nil { + if err := fksinks.RegisterPostgresSchemaForConfiguredSinks(cfg, PostgresSchema()); err != nil { t.Fatalf("RegisterPostgresSchemas(non-postgres) error = %v", err) } } @@ -40,11 +41,11 @@ func TestRegisterPostgresSchemasDuplicateRegistrationFails(t *testing.T) { }, } - if err := RegisterPostgresSchemas(cfg); err != nil { + if err := fksinks.RegisterPostgresSchemaForConfiguredSinks(cfg, PostgresSchema()); err != nil { t.Fatalf("first RegisterPostgresSchemas() error = %v", err) } - err := RegisterPostgresSchemas(cfg) + err := fksinks.RegisterPostgresSchemaForConfiguredSinks(cfg, PostgresSchema()) if err == nil { t.Fatalf("second RegisterPostgresSchemas() expected duplicate error") } @@ -54,9 +55,9 @@ func TestRegisterPostgresSchemasDuplicateRegistrationFails(t *testing.T) { } func TestWeatherPostgresSchemaShape(t *testing.T) { - s := weatherPostgresSchema() + s := PostgresSchema() if s.MapEvent == nil { - t.Fatalf("weatherPostgresSchema().MapEvent is nil") + t.Fatalf("PostgresSchema().MapEvent is nil") } wantTables := map[string]bool{ @@ -70,7 +71,7 @@ func TestWeatherPostgresSchemaShape(t *testing.T) { } if len(s.Tables) != len(wantTables) { - t.Fatalf("weatherPostgresSchema().Tables len = %d, want %d", len(s.Tables), len(wantTables)) + t.Fatalf("PostgresSchema().Tables len = %d, want %d", len(s.Tables), len(wantTables)) } seenIndexes := map[string]bool{} diff --git a/internal/sources/common/event.go b/internal/sources/common/event.go deleted file mode 100644 index 16ae1bd..0000000 --- a/internal/sources/common/event.go +++ /dev/null @@ -1,54 +0,0 @@ -// FILE: ./internal/sources/common/event.go -package common - -import ( - "time" - - "gitea.maximumdirect.net/ejr/feedkit/event" -) - -// SingleRawEvent constructs, validates, and returns a slice containing exactly one event. -// -// This removes repetitive "event envelope ceremony" from individual sources. -// Sources remain responsible for: -// - fetching bytes (raw payload) -// - choosing Schema (raw schema identifier) -// - computing Event.ID and (optional) EffectiveAt -// -// emittedAt is explicit so callers can compute IDs using the same timestamp (or -// so tests can provide a stable value). -func SingleRawEvent( - kind event.Kind, - sourceName string, - schema string, - id string, - emittedAt time.Time, - effectiveAt *time.Time, - payload any, -) ([]event.Event, error) { - if emittedAt.IsZero() { - emittedAt = time.Now().UTC() - } else { - emittedAt = emittedAt.UTC() - } - - e := event.Event{ - ID: id, - Kind: kind, - Source: sourceName, - EmittedAt: emittedAt, - EffectiveAt: effectiveAt, - - // RAW schema (normalizer matches on this). - Schema: schema, - - // Raw payload (usually json.RawMessage). Normalizer will decode and map to canonical model. - Payload: payload, - } - - if err := e.Validate(); err != nil { - return nil, err - } - - return []event.Event{e}, nil -} diff --git a/internal/sources/common/id.go b/internal/sources/common/id.go deleted file mode 100644 index 2bfd5ac..0000000 --- a/internal/sources/common/id.go +++ /dev/null @@ -1,39 +0,0 @@ -// FILE: ./internal/sources/common/id.go -package common - -import ( - "fmt" - "strings" - "time" -) - -// ChooseEventID applies weatherfeeder's opinionated Event.ID policy: -// -// - If upstream provides an ID, use it (trimmed). -// - Otherwise, ID is ":" when available. -// - If EffectiveAt is unavailable, fall back to ":". -// -// Timestamps are encoded as RFC3339Nano in UTC. -func ChooseEventID(upstreamID, sourceName string, effectiveAt *time.Time, emittedAt time.Time) string { - if id := strings.TrimSpace(upstreamID); id != "" { - return id - } - - src := strings.TrimSpace(sourceName) - if src == "" { - src = "UNKNOWN_SOURCE" - } - - // Prefer EffectiveAt for dedupe friendliness. - if effectiveAt != nil && !effectiveAt.IsZero() { - return fmt.Sprintf("%s:%s", src, effectiveAt.UTC().Format(time.RFC3339Nano)) - } - - // Fall back to EmittedAt (still stable within a poll invocation). - t := emittedAt.UTC() - if t.IsZero() { - t = time.Now().UTC() - } - - return fmt.Sprintf("%s:%s", src, t.Format(time.RFC3339Nano)) -} diff --git a/internal/sources/nws/alerts.go b/internal/sources/nws/alerts.go index 094238e..10804cf 100644 --- a/internal/sources/nws/alerts.go +++ b/internal/sources/nws/alerts.go @@ -11,7 +11,6 @@ import ( "gitea.maximumdirect.net/ejr/feedkit/event" fksources "gitea.maximumdirect.net/ejr/feedkit/sources" nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws" - "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common" "gitea.maximumdirect.net/ejr/weatherfeeder/standards" ) @@ -69,9 +68,9 @@ func (s *AlertsSource) Poll(ctx context.Context) ([]event.Event, error) { // NWS alerts collections do not provide a stable per-snapshot ID. // Use Source:EffectiveAt (or Source:EmittedAt fallback) for dedupe friendliness. - eventID := common.ChooseEventID("", s.http.Name, effectiveAt, emittedAt) + eventID := fksources.DefaultEventID("", s.http.Name, effectiveAt, emittedAt) - return common.SingleRawEvent( + return fksources.SingleEvent( s.Kind(), s.http.Name, standards.SchemaRawNWSAlertsV1, diff --git a/internal/sources/nws/forecast_hourly.go b/internal/sources/nws/forecast_hourly.go index 8e79681..ccf4feb 100644 --- a/internal/sources/nws/forecast_hourly.go +++ b/internal/sources/nws/forecast_hourly.go @@ -11,7 +11,6 @@ import ( "gitea.maximumdirect.net/ejr/feedkit/event" fksources "gitea.maximumdirect.net/ejr/feedkit/sources" nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws" - "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common" "gitea.maximumdirect.net/ejr/weatherfeeder/standards" ) @@ -69,9 +68,9 @@ func (s *HourlyForecastSource) Poll(ctx context.Context) ([]event.Event, error) // NWS gridpoint forecast GeoJSON commonly has a stable "id" equal to the endpoint URL. // That is *not* unique per issued run, so we intentionally do not use it for Event.ID. // Instead we rely on Source:EffectiveAt (or Source:EmittedAt fallback). - eventID := common.ChooseEventID("", s.http.Name, effectiveAt, emittedAt) + eventID := fksources.DefaultEventID("", s.http.Name, effectiveAt, emittedAt) - return common.SingleRawEvent( + return fksources.SingleEvent( s.Kind(), s.http.Name, standards.SchemaRawNWSHourlyForecastV1, diff --git a/internal/sources/nws/forecast_narrative.go b/internal/sources/nws/forecast_narrative.go index 69093ee..16b70ec 100644 --- a/internal/sources/nws/forecast_narrative.go +++ b/internal/sources/nws/forecast_narrative.go @@ -11,7 +11,6 @@ import ( "gitea.maximumdirect.net/ejr/feedkit/event" fksources "gitea.maximumdirect.net/ejr/feedkit/sources" nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws" - "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common" "gitea.maximumdirect.net/ejr/weatherfeeder/standards" ) @@ -69,9 +68,9 @@ func (s *NarrativeForecastSource) Poll(ctx context.Context) ([]event.Event, erro // NWS gridpoint forecast GeoJSON commonly has a stable "id" equal to the endpoint URL. // That is *not* unique per issued run, so we intentionally do not use it for Event.ID. // Instead we rely on Source:EffectiveAt (or Source:EmittedAt fallback). - eventID := common.ChooseEventID("", s.http.Name, effectiveAt, emittedAt) + eventID := fksources.DefaultEventID("", s.http.Name, effectiveAt, emittedAt) - return common.SingleRawEvent( + return fksources.SingleEvent( s.Kind(), s.http.Name, standards.SchemaRawNWSNarrativeForecastV1, diff --git a/internal/sources/nws/observation.go b/internal/sources/nws/observation.go index caeebd9..4cd0ce8 100644 --- a/internal/sources/nws/observation.go +++ b/internal/sources/nws/observation.go @@ -11,7 +11,6 @@ import ( "gitea.maximumdirect.net/ejr/feedkit/event" fksources "gitea.maximumdirect.net/ejr/feedkit/sources" nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws" - "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common" "gitea.maximumdirect.net/ejr/weatherfeeder/standards" ) @@ -52,9 +51,9 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { } emittedAt := time.Now().UTC() - eventID := common.ChooseEventID(meta.ID, s.http.Name, effectiveAt, emittedAt) + eventID := fksources.DefaultEventID(meta.ID, s.http.Name, effectiveAt, emittedAt) - return common.SingleRawEvent( + return fksources.SingleEvent( s.Kind(), s.http.Name, standards.SchemaRawNWSObservationV1, diff --git a/internal/sources/openmeteo/forecast.go b/internal/sources/openmeteo/forecast.go index 4f82463..d01980a 100644 --- a/internal/sources/openmeteo/forecast.go +++ b/internal/sources/openmeteo/forecast.go @@ -10,7 +10,6 @@ import ( "gitea.maximumdirect.net/ejr/feedkit/event" fksources "gitea.maximumdirect.net/ejr/feedkit/sources" "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openmeteo" - "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common" "gitea.maximumdirect.net/ejr/weatherfeeder/standards" ) @@ -53,9 +52,9 @@ func (s *ForecastSource) Poll(ctx context.Context) ([]event.Event, error) { } emittedAt := time.Now().UTC() - eventID := common.ChooseEventID("", s.http.Name, effectiveAt, emittedAt) + eventID := fksources.DefaultEventID("", s.http.Name, effectiveAt, emittedAt) - return common.SingleRawEvent( + return fksources.SingleEvent( s.Kind(), s.http.Name, standards.SchemaRawOpenMeteoHourlyForecastV1, diff --git a/internal/sources/openmeteo/observation.go b/internal/sources/openmeteo/observation.go index c07e483..15639b8 100644 --- a/internal/sources/openmeteo/observation.go +++ b/internal/sources/openmeteo/observation.go @@ -10,7 +10,6 @@ import ( "gitea.maximumdirect.net/ejr/feedkit/event" fksources "gitea.maximumdirect.net/ejr/feedkit/sources" "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openmeteo" - "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common" "gitea.maximumdirect.net/ejr/weatherfeeder/standards" ) @@ -50,9 +49,9 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { } emittedAt := time.Now().UTC() - eventID := common.ChooseEventID("", s.http.Name, effectiveAt, emittedAt) + eventID := fksources.DefaultEventID("", s.http.Name, effectiveAt, emittedAt) - return common.SingleRawEvent( + return fksources.SingleEvent( s.Kind(), s.http.Name, standards.SchemaRawOpenMeteoCurrentV1, diff --git a/internal/sources/openweather/observation.go b/internal/sources/openweather/observation.go index 0127cdb..25b6264 100644 --- a/internal/sources/openweather/observation.go +++ b/internal/sources/openweather/observation.go @@ -11,7 +11,6 @@ import ( "gitea.maximumdirect.net/ejr/feedkit/event" fksources "gitea.maximumdirect.net/ejr/feedkit/sources" owcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openweather" - "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common" "gitea.maximumdirect.net/ejr/weatherfeeder/standards" ) @@ -58,9 +57,9 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { } emittedAt := time.Now().UTC() - eventID := common.ChooseEventID("", s.http.Name, effectiveAt, emittedAt) + eventID := fksources.DefaultEventID("", s.http.Name, effectiveAt, emittedAt) - return common.SingleRawEvent( + return fksources.SingleEvent( s.Kind(), s.http.Name, standards.SchemaRawOpenWeatherCurrentV1,