From 8968b6bdcd637b1fd74a96cf0e9466ea23e69e22 Mon Sep 17 00:00:00 2001 From: Eric Rakestraw Date: Thu, 15 Jan 2026 10:36:18 -0600 Subject: [PATCH] Refactor normalizers: dedupe JSON decode + event finalize Add shared normalizer helpers to centralize payload extraction, JSON decoding, and event finalization/validation. Refactor NWS, Open-Meteo, and OpenWeather observation normalizers to use the shared spine, removing repeated boilerplate while preserving provider-specific mapping logic. --- internal/normalizers/common/finalize.go | 32 +++++++++ internal/normalizers/common/json.go | 68 +++++++++++++++++++ internal/normalizers/nws/observation.go | 36 ++-------- internal/normalizers/openmeteo/observation.go | 36 ++-------- .../normalizers/openweather/observation.go | 36 ++-------- 5 files changed, 118 insertions(+), 90 deletions(-) create mode 100644 internal/normalizers/common/finalize.go create mode 100644 internal/normalizers/common/json.go diff --git a/internal/normalizers/common/finalize.go b/internal/normalizers/common/finalize.go new file mode 100644 index 0000000..0ada934 --- /dev/null +++ b/internal/normalizers/common/finalize.go @@ -0,0 +1,32 @@ +// FILE: ./internal/normalizers/common/finalize.go +package common + +import ( + "time" + + "gitea.maximumdirect.net/ejr/feedkit/event" +) + +// Finalize builds the output event envelope by copying the input and applying the +// canonical schema/payload, plus (optionally) EffectiveAt; the result must pass Validate or an error is returned. +// +// Important behavior: +// - ID/Kind/Source/EmittedAt are preserved by copying the input event. +// - EffectiveAt is only overwritten (with effectiveAt converted to UTC) when effectiveAt is non-zero. +// If effectiveAt is zero, any existing in.EffectiveAt is preserved. 
+func Finalize(in event.Event, outSchema string, outPayload any, effectiveAt time.Time) (*event.Event, error) { + out := in + out.Schema = outSchema + out.Payload = outPayload + + if !effectiveAt.IsZero() { + t := effectiveAt.UTC() + out.EffectiveAt = &t + } + + if err := out.Validate(); err != nil { + return nil, err + } + + return &out, nil +} diff --git a/internal/normalizers/common/json.go b/internal/normalizers/common/json.go new file mode 100644 index 0000000..29a74ff --- /dev/null +++ b/internal/normalizers/common/json.go @@ -0,0 +1,68 @@ +// FILE: ./internal/normalizers/common/json.go +package common + +import ( + "encoding/json" + "fmt" + "time" + + "gitea.maximumdirect.net/ejr/feedkit/event" +) + +// DecodeJSONPayload extracts the event payload as bytes and unmarshals it into T. +// +// This is the shared "spine" used by many normalizers: +// - sources emit raw JSON payloads (typically json.RawMessage) +// - normalizers decode into provider structs +// +// Errors include a small amount of stage context ("extract payload", "decode raw payload"). +// Callers typically wrap these with a provider/kind label; on error the zero value of T is returned. +func DecodeJSONPayload[T any](in event.Event) (T, error) { + var zero T + + b, err := PayloadBytes(in) + if err != nil { + return zero, fmt.Errorf("extract payload: %w", err) + } + + var parsed T + if err := json.Unmarshal(b, &parsed); err != nil { + return zero, fmt.Errorf("decode raw payload: %w", err) + } + + return parsed, nil +} + +// NormalizeJSON is a convenience wrapper for the common JSON-normalizer pattern: +// +// 1. Decode raw JSON payload into provider struct T +// 2. Map T into canonical payload P (plus an EffectiveAt timestamp; a zero time leaves the input's EffectiveAt untouched) +// 3. Finalize the event envelope (schema/payload/effectiveAt) + Validate +// +// label should be short and specific, e.g. "openweather observation". +// outSchema should be the canonical schema constant. +// build should contain ONLY provider/domain mapping logic; its errors are returned as-is (not prefixed with label). 
+func NormalizeJSON[T any, P any]( + in event.Event, + label string, + outSchema string, + build func(parsed T) (P, time.Time, error), +) (*event.Event, error) { + parsed, err := DecodeJSONPayload[T](in) + if err != nil { + return nil, fmt.Errorf("%s normalize: %w", label, err) + } + + payload, effectiveAt, err := build(parsed) + if err != nil { + // Deliberately not wrapped with label: build() should already include provider-specific context where appropriate. + return nil, err + } + + out, err := Finalize(in, outSchema, payload, effectiveAt) + if err != nil { + return nil, fmt.Errorf("%s normalize: %w", label, err) + } + + return out, nil +} diff --git a/internal/normalizers/nws/observation.go b/internal/normalizers/nws/observation.go index ef1693b..d0aaeca 100644 --- a/internal/normalizers/nws/observation.go +++ b/internal/normalizers/nws/observation.go @@ -3,7 +3,6 @@ package nws import ( "context" - "encoding/json" "fmt" "strings" "time" @@ -34,35 +33,12 @@ func (ObservationNormalizer) Match(e event.Event) bool { func (ObservationNormalizer) Normalize(ctx context.Context, in event.Event) (*event.Event, error) { _ = ctx // normalization is pure/CPU; keep ctx for future expensive steps - rawBytes, err := normcommon.PayloadBytes(in) - if err != nil { - return nil, fmt.Errorf("nws observation normalize: %w", err) - } - - var parsed nwsObservationResponse - if err := json.Unmarshal(rawBytes, &parsed); err != nil { - return nil, fmt.Errorf("nws observation normalize: decode raw payload: %w", err) - } - - obs, effectiveAt, err := buildObservation(parsed) - if err != nil { - return nil, err - } - - out := in - out.Schema = standards.SchemaWeatherObservationV1 - out.Payload = obs - - // EffectiveAt is optional; for observations it is naturally the observation timestamp. 
- if !effectiveAt.IsZero() { - t := effectiveAt.UTC() - out.EffectiveAt = &t - } - - if err := out.Validate(); err != nil { - return nil, err - } - return &out, nil + return normcommon.NormalizeJSON( + in, + "nws observation", + standards.SchemaWeatherObservationV1, + buildObservation, + ) } // buildObservation contains the domain mapping logic (provider -> canonical model). diff --git a/internal/normalizers/openmeteo/observation.go b/internal/normalizers/openmeteo/observation.go index 12b2c3a..0a07c27 100644 --- a/internal/normalizers/openmeteo/observation.go +++ b/internal/normalizers/openmeteo/observation.go @@ -3,7 +3,6 @@ package openmeteo import ( "context" - "encoding/json" "fmt" "strings" "time" @@ -42,35 +41,12 @@ func (ObservationNormalizer) Match(e event.Event) bool { func (ObservationNormalizer) Normalize(ctx context.Context, in event.Event) (*event.Event, error) { _ = ctx // normalization is pure/CPU; keep ctx for future expensive steps - rawBytes, err := normcommon.PayloadBytes(in) - if err != nil { - return nil, fmt.Errorf("openmeteo observation normalize: %w", err) - } - - var parsed omResponse - if err := json.Unmarshal(rawBytes, &parsed); err != nil { - return nil, fmt.Errorf("openmeteo observation normalize: decode raw payload: %w", err) - } - - obs, effectiveAt, err := buildObservation(parsed) - if err != nil { - return nil, err - } - - out := in - out.Schema = standards.SchemaWeatherObservationV1 - out.Payload = obs - - // EffectiveAt is optional; for observations it is naturally the observation timestamp. - if !effectiveAt.IsZero() { - t := effectiveAt.UTC() - out.EffectiveAt = &t - } - - if err := out.Validate(); err != nil { - return nil, err - } - return &out, nil + return normcommon.NormalizeJSON( + in, + "openmeteo observation", + standards.SchemaWeatherObservationV1, + buildObservation, + ) } // buildObservation contains the domain mapping logic (provider -> canonical model). 
diff --git a/internal/normalizers/openweather/observation.go b/internal/normalizers/openweather/observation.go index b21190e..19ae86c 100644 --- a/internal/normalizers/openweather/observation.go +++ b/internal/normalizers/openweather/observation.go @@ -3,7 +3,6 @@ package openweather import ( "context" - "encoding/json" "fmt" "strings" "time" @@ -41,35 +40,12 @@ func (ObservationNormalizer) Match(e event.Event) bool { func (ObservationNormalizer) Normalize(ctx context.Context, in event.Event) (*event.Event, error) { _ = ctx // normalization is pure/CPU; keep ctx for future expensive steps - rawBytes, err := normcommon.PayloadBytes(in) - if err != nil { - return nil, fmt.Errorf("openweather observation normalize: %w", err) - } - - var parsed owmResponse - if err := json.Unmarshal(rawBytes, &parsed); err != nil { - return nil, fmt.Errorf("openweather observation normalize: decode raw payload: %w", err) - } - - obs, effectiveAt, err := buildObservation(parsed) - if err != nil { - return nil, err - } - - out := in - out.Schema = standards.SchemaWeatherObservationV1 - out.Payload = obs - - // EffectiveAt is optional; for observations it is naturally the observation timestamp. - if !effectiveAt.IsZero() { - t := effectiveAt.UTC() - out.EffectiveAt = &t - } - - if err := out.Validate(); err != nil { - return nil, err - } - return &out, nil + return normcommon.NormalizeJSON( + in, + "openweather observation", + standards.SchemaWeatherObservationV1, + buildObservation, + ) } // buildObservation contains the domain mapping logic (provider -> canonical model).