Refactor normalizers: dedupe JSON decode + event finalize
Add shared normalizer helpers to centralize payload extraction, JSON decoding, and event finalization/validation. Refactor NWS, Open-Meteo, and OpenWeather observation normalizers to use the shared spine, removing repeated boilerplate while preserving provider-specific mapping logic.
This commit is contained in:
32
internal/normalizers/common/finalize.go
Normal file
32
internal/normalizers/common/finalize.go
Normal file
@@ -0,0 +1,32 @@
|
|||||||
|
// FILE: ./internal/normalizers/common/finalize.go
|
||||||
|
package common
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Finalize builds the output event envelope by copying the input and applying the
|
||||||
|
// canonical schema/payload, plus (optionally) EffectiveAt.
|
||||||
|
//
|
||||||
|
// Important behavior:
|
||||||
|
// - ID/Kind/Source/EmittedAt are preserved by copying the input event.
|
||||||
|
// - EffectiveAt is only overwritten when effectiveAt is non-zero.
|
||||||
|
// If effectiveAt is zero, any existing in.EffectiveAt is preserved.
|
||||||
|
func Finalize(in event.Event, outSchema string, outPayload any, effectiveAt time.Time) (*event.Event, error) {
|
||||||
|
out := in
|
||||||
|
out.Schema = outSchema
|
||||||
|
out.Payload = outPayload
|
||||||
|
|
||||||
|
if !effectiveAt.IsZero() {
|
||||||
|
t := effectiveAt.UTC()
|
||||||
|
out.EffectiveAt = &t
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := out.Validate(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &out, nil
|
||||||
|
}
|
||||||
68
internal/normalizers/common/json.go
Normal file
68
internal/normalizers/common/json.go
Normal file
@@ -0,0 +1,68 @@
|
|||||||
|
// FILE: ./internal/normalizers/common/json.go
|
||||||
|
package common
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||||
|
)
|
||||||
|
|
||||||
|
// DecodeJSONPayload extracts the event payload as bytes and unmarshals it into T.
|
||||||
|
//
|
||||||
|
// This is the shared "spine" used by many normalizers:
|
||||||
|
// - sources emit raw JSON payloads (typically json.RawMessage)
|
||||||
|
// - normalizers decode into provider structs
|
||||||
|
//
|
||||||
|
// Errors include a small amount of stage context ("extract payload", "decode raw payload").
|
||||||
|
// Callers typically wrap these with a provider/kind label.
|
||||||
|
func DecodeJSONPayload[T any](in event.Event) (T, error) {
|
||||||
|
var zero T
|
||||||
|
|
||||||
|
b, err := PayloadBytes(in)
|
||||||
|
if err != nil {
|
||||||
|
return zero, fmt.Errorf("extract payload: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var parsed T
|
||||||
|
if err := json.Unmarshal(b, &parsed); err != nil {
|
||||||
|
return zero, fmt.Errorf("decode raw payload: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return parsed, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// NormalizeJSON is a convenience wrapper for the common JSON-normalizer pattern:
|
||||||
|
//
|
||||||
|
// 1. Decode raw JSON payload into provider struct T
|
||||||
|
// 2. Map T into canonical payload P (plus an EffectiveAt timestamp)
|
||||||
|
// 3. Finalize the event envelope (schema/payload/effectiveAt) + Validate
|
||||||
|
//
|
||||||
|
// label should be short and specific, e.g. "openweather observation".
|
||||||
|
// outSchema should be the canonical schema constant.
|
||||||
|
// build should contain ONLY provider/domain mapping logic.
|
||||||
|
func NormalizeJSON[T any, P any](
|
||||||
|
in event.Event,
|
||||||
|
label string,
|
||||||
|
outSchema string,
|
||||||
|
build func(parsed T) (P, time.Time, error),
|
||||||
|
) (*event.Event, error) {
|
||||||
|
parsed, err := DecodeJSONPayload[T](in)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("%s normalize: %w", label, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
payload, effectiveAt, err := build(parsed)
|
||||||
|
if err != nil {
|
||||||
|
// build() should already include provider-specific context where appropriate.
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
out, err := Finalize(in, outSchema, payload, effectiveAt)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("%s normalize: %w", label, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
@@ -3,7 +3,6 @@ package nws
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
@@ -34,35 +33,12 @@ func (ObservationNormalizer) Match(e event.Event) bool {
|
|||||||
func (ObservationNormalizer) Normalize(ctx context.Context, in event.Event) (*event.Event, error) {
|
func (ObservationNormalizer) Normalize(ctx context.Context, in event.Event) (*event.Event, error) {
|
||||||
_ = ctx // normalization is pure/CPU; keep ctx for future expensive steps
|
_ = ctx // normalization is pure/CPU; keep ctx for future expensive steps
|
||||||
|
|
||||||
rawBytes, err := normcommon.PayloadBytes(in)
|
return normcommon.NormalizeJSON(
|
||||||
if err != nil {
|
in,
|
||||||
return nil, fmt.Errorf("nws observation normalize: %w", err)
|
"nws observation",
|
||||||
}
|
standards.SchemaWeatherObservationV1,
|
||||||
|
buildObservation,
|
||||||
var parsed nwsObservationResponse
|
)
|
||||||
if err := json.Unmarshal(rawBytes, &parsed); err != nil {
|
|
||||||
return nil, fmt.Errorf("nws observation normalize: decode raw payload: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
obs, effectiveAt, err := buildObservation(parsed)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
out := in
|
|
||||||
out.Schema = standards.SchemaWeatherObservationV1
|
|
||||||
out.Payload = obs
|
|
||||||
|
|
||||||
// EffectiveAt is optional; for observations it is naturally the observation timestamp.
|
|
||||||
if !effectiveAt.IsZero() {
|
|
||||||
t := effectiveAt.UTC()
|
|
||||||
out.EffectiveAt = &t
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := out.Validate(); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return &out, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// buildObservation contains the domain mapping logic (provider -> canonical model).
|
// buildObservation contains the domain mapping logic (provider -> canonical model).
|
||||||
|
|||||||
@@ -3,7 +3,6 @@ package openmeteo
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
@@ -42,35 +41,12 @@ func (ObservationNormalizer) Match(e event.Event) bool {
|
|||||||
func (ObservationNormalizer) Normalize(ctx context.Context, in event.Event) (*event.Event, error) {
|
func (ObservationNormalizer) Normalize(ctx context.Context, in event.Event) (*event.Event, error) {
|
||||||
_ = ctx // normalization is pure/CPU; keep ctx for future expensive steps
|
_ = ctx // normalization is pure/CPU; keep ctx for future expensive steps
|
||||||
|
|
||||||
rawBytes, err := normcommon.PayloadBytes(in)
|
return normcommon.NormalizeJSON(
|
||||||
if err != nil {
|
in,
|
||||||
return nil, fmt.Errorf("openmeteo observation normalize: %w", err)
|
"openmeteo observation",
|
||||||
}
|
standards.SchemaWeatherObservationV1,
|
||||||
|
buildObservation,
|
||||||
var parsed omResponse
|
)
|
||||||
if err := json.Unmarshal(rawBytes, &parsed); err != nil {
|
|
||||||
return nil, fmt.Errorf("openmeteo observation normalize: decode raw payload: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
obs, effectiveAt, err := buildObservation(parsed)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
out := in
|
|
||||||
out.Schema = standards.SchemaWeatherObservationV1
|
|
||||||
out.Payload = obs
|
|
||||||
|
|
||||||
// EffectiveAt is optional; for observations it is naturally the observation timestamp.
|
|
||||||
if !effectiveAt.IsZero() {
|
|
||||||
t := effectiveAt.UTC()
|
|
||||||
out.EffectiveAt = &t
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := out.Validate(); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return &out, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// buildObservation contains the domain mapping logic (provider -> canonical model).
|
// buildObservation contains the domain mapping logic (provider -> canonical model).
|
||||||
|
|||||||
@@ -3,7 +3,6 @@ package openweather
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
@@ -41,35 +40,12 @@ func (ObservationNormalizer) Match(e event.Event) bool {
|
|||||||
func (ObservationNormalizer) Normalize(ctx context.Context, in event.Event) (*event.Event, error) {
|
func (ObservationNormalizer) Normalize(ctx context.Context, in event.Event) (*event.Event, error) {
|
||||||
_ = ctx // normalization is pure/CPU; keep ctx for future expensive steps
|
_ = ctx // normalization is pure/CPU; keep ctx for future expensive steps
|
||||||
|
|
||||||
rawBytes, err := normcommon.PayloadBytes(in)
|
return normcommon.NormalizeJSON(
|
||||||
if err != nil {
|
in,
|
||||||
return nil, fmt.Errorf("openweather observation normalize: %w", err)
|
"openweather observation",
|
||||||
}
|
standards.SchemaWeatherObservationV1,
|
||||||
|
buildObservation,
|
||||||
var parsed owmResponse
|
)
|
||||||
if err := json.Unmarshal(rawBytes, &parsed); err != nil {
|
|
||||||
return nil, fmt.Errorf("openweather observation normalize: decode raw payload: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
obs, effectiveAt, err := buildObservation(parsed)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
out := in
|
|
||||||
out.Schema = standards.SchemaWeatherObservationV1
|
|
||||||
out.Payload = obs
|
|
||||||
|
|
||||||
// EffectiveAt is optional; for observations it is naturally the observation timestamp.
|
|
||||||
if !effectiveAt.IsZero() {
|
|
||||||
t := effectiveAt.UTC()
|
|
||||||
out.EffectiveAt = &t
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := out.Validate(); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return &out, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// buildObservation contains the domain mapping logic (provider -> canonical model).
|
// buildObservation contains the domain mapping logic (provider -> canonical model).
|
||||||
|
|||||||
Reference in New Issue
Block a user