refactor(normalizers): deduplicate synthetic station ID generation
- Add common SynthStationID helpers for coordinate-based providers - Use shared helper for Open-Meteo and OpenWeather station ID synthesis - Require both lat/lon when generating synthetic IDs to avoid misleading defaults - Remove unused Open-Meteo normalizer wrapper code This reduces cross-provider duplication while keeping provider-specific mapping logic explicit and readable.
This commit is contained in:
23
internal/normalizers/common/id.go
Normal file
23
internal/normalizers/common/id.go
Normal file
@@ -0,0 +1,23 @@
|
||||
// FILE: internal/normalizers/common/id.go
|
||||
package common
|
||||
|
||||
import "fmt"
|
||||
|
||||
// SynthStationID formats a stable synthetic station identifier for providers that are
|
||||
// coordinate-based rather than station-based.
|
||||
//
|
||||
// Example output:
|
||||
//
|
||||
// OPENMETEO(38.62700,-90.19940)
|
||||
func SynthStationID(prefix string, lat, lon float64) string {
|
||||
return fmt.Sprintf("%s(%.5f,%.5f)", prefix, lat, lon)
|
||||
}
|
||||
|
||||
// SynthStationIDPtr is the pointer-friendly variant.
|
||||
// If either coordinate is missing, it returns "" (unknown).
|
||||
func SynthStationIDPtr(prefix string, lat, lon *float64) string {
|
||||
if lat == nil || lon == nil {
|
||||
return ""
|
||||
}
|
||||
return SynthStationID(prefix, *lat, *lon)
|
||||
}
|
||||
@@ -56,14 +56,11 @@ func roundValue(v reflect.Value, decimals int) reflect.Value {
|
||||
out.Set(elem)
|
||||
return out
|
||||
}
|
||||
if elem.IsValid() && elem.Type().AssignableTo(v.Type()) {
|
||||
out.Set(elem)
|
||||
return out
|
||||
}
|
||||
if elem.IsValid() && elem.Type().ConvertibleTo(v.Type()) {
|
||||
out.Set(elem.Convert(v.Type()))
|
||||
return out
|
||||
}
|
||||
|
||||
// If we can't sensibly re-wrap, just keep the original.
|
||||
return v
|
||||
}
|
||||
|
||||
@@ -39,34 +39,26 @@ func (AlertsNormalizer) Match(e event.Event) bool {
|
||||
func (AlertsNormalizer) Normalize(ctx context.Context, in event.Event) (*event.Event, error) {
|
||||
_ = ctx // normalization is pure/CPU; keep ctx for future expensive steps
|
||||
|
||||
parsed, err := normcommon.DecodeJSONPayload[nwsAlertsResponse](in)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("nws alerts normalize: %w", err)
|
||||
}
|
||||
|
||||
// If we can't derive AsOf from the payload, fall back to the existing event envelope.
|
||||
fallbackAsOf := in.EmittedAt.UTC()
|
||||
if in.EffectiveAt != nil && !in.EffectiveAt.IsZero() {
|
||||
fallbackAsOf = in.EffectiveAt.UTC()
|
||||
}
|
||||
|
||||
payload, effectiveAt, err := buildAlerts(parsed, fallbackAsOf)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("nws alerts normalize: build: %w", err)
|
||||
}
|
||||
|
||||
out, err := normcommon.Finalize(in, standards.SchemaWeatherAlertV1, payload, effectiveAt)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("nws alerts normalize: %w", err)
|
||||
}
|
||||
|
||||
return out, nil
|
||||
return normcommon.NormalizeJSON(
|
||||
in,
|
||||
"nws alerts",
|
||||
standards.SchemaWeatherAlertV1,
|
||||
func(parsed nwsAlertsResponse) (model.WeatherAlertRun, time.Time, error) {
|
||||
return buildAlerts(parsed, fallbackAsOf)
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
// buildAlerts contains the domain mapping logic (provider -> canonical model).
|
||||
func buildAlerts(parsed nwsAlertsResponse, fallbackAsOf time.Time) (model.WeatherAlertRun, time.Time, error) {
|
||||
// 1) Determine AsOf (required by canonical model; also used as EffectiveAt).
|
||||
asOf := parseNWSTimeUTC(parsed.Updated)
|
||||
asOf := nwscommon.ParseTimeBestEffort(parsed.Updated)
|
||||
if asOf.IsZero() {
|
||||
asOf = latestAlertTimestamp(parsed.Features)
|
||||
}
|
||||
@@ -98,14 +90,14 @@ func buildAlerts(parsed nwsAlertsResponse, fallbackAsOf time.Time) (model.Weathe
|
||||
id = fmt.Sprintf("nws:alert:%s:%d", asOf.UTC().Format(time.RFC3339Nano), i)
|
||||
}
|
||||
|
||||
sent := parseNWSTimePtr(p.Sent)
|
||||
effective := parseNWSTimePtr(p.Effective)
|
||||
onset := parseNWSTimePtr(p.Onset)
|
||||
sent := nwscommon.ParseTimePtr(p.Sent)
|
||||
effective := nwscommon.ParseTimePtr(p.Effective)
|
||||
onset := nwscommon.ParseTimePtr(p.Onset)
|
||||
|
||||
// Expires: prefer "expires"; fall back to "ends" if present.
|
||||
expires := parseNWSTimePtr(p.Expires)
|
||||
expires := nwscommon.ParseTimePtr(p.Expires)
|
||||
if expires == nil {
|
||||
expires = parseNWSTimePtr(p.Ends)
|
||||
expires = nwscommon.ParseTimePtr(p.Ends)
|
||||
}
|
||||
|
||||
refs := parseNWSAlertReferences(p.References)
|
||||
@@ -168,7 +160,7 @@ func latestAlertTimestamp(features []nwsAlertFeature) time.Time {
|
||||
p.Onset,
|
||||
}
|
||||
for _, s := range candidates {
|
||||
t := parseNWSTimeUTC(s)
|
||||
t := nwscommon.ParseTimeBestEffort(s)
|
||||
if t.IsZero() {
|
||||
continue
|
||||
}
|
||||
@@ -180,28 +172,6 @@ func latestAlertTimestamp(features []nwsAlertFeature) time.Time {
|
||||
return latest
|
||||
}
|
||||
|
||||
// parseNWSTimeUTC parses an NWS timestamp string into UTC time.Time.
|
||||
// Returns zero time on empty/unparseable input (best-effort; alerts should be resilient).
|
||||
func parseNWSTimeUTC(s string) time.Time {
|
||||
s = strings.TrimSpace(s)
|
||||
if s == "" {
|
||||
return time.Time{}
|
||||
}
|
||||
t, err := nwscommon.ParseTime(s)
|
||||
if err != nil {
|
||||
return time.Time{}
|
||||
}
|
||||
return t.UTC()
|
||||
}
|
||||
|
||||
func parseNWSTimePtr(s string) *time.Time {
|
||||
t := parseNWSTimeUTC(s)
|
||||
if t.IsZero() {
|
||||
return nil
|
||||
}
|
||||
return &t
|
||||
}
|
||||
|
||||
func firstNonEmpty(vals ...string) string {
|
||||
for _, v := range vals {
|
||||
if strings.TrimSpace(v) != "" {
|
||||
@@ -228,7 +198,7 @@ func parseNWSAlertReferences(raw json.RawMessage) []model.AlertReference {
|
||||
ID: strings.TrimSpace(r.ID),
|
||||
Identifier: strings.TrimSpace(r.Identifier),
|
||||
Sender: strings.TrimSpace(r.Sender),
|
||||
Sent: parseNWSTimePtr(r.Sent),
|
||||
Sent: nwscommon.ParseTimePtr(r.Sent),
|
||||
}
|
||||
// If only Identifier is present, preserve it as ID too (useful downstream).
|
||||
if ref.ID == "" && ref.Identifier != "" {
|
||||
|
||||
@@ -1,19 +0,0 @@
|
||||
// FILE: ./internal/normalizers/openmeteo/common.go
|
||||
package openmeteo
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
openmeteo "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openmeteo"
|
||||
)
|
||||
|
||||
// parseOpenMeteoTime parses Open-Meteo timestamps.
|
||||
//
|
||||
// The actual parsing logic lives in internal/providers/openmeteo so both the
|
||||
// source (envelope EffectiveAt / event ID) and normalizer (canonical payload)
|
||||
// can share identical timestamp behavior.
|
||||
//
|
||||
// We keep this thin wrapper to avoid churn in the normalizer package.
|
||||
func parseOpenMeteoTime(s string, tz string, utcOffsetSeconds int) (time.Time, error) {
|
||||
return openmeteo.ParseTime(s, tz, utcOffsetSeconds)
|
||||
}
|
||||
@@ -78,18 +78,8 @@ func buildObservation(parsed omResponse) (model.WeatherObservation, time.Time, e
|
||||
canonicalText := standards.WMOText(wmo, isDay)
|
||||
|
||||
// Station identity: Open-Meteo is not a station feed; synthesize from coordinates.
|
||||
stationID := ""
|
||||
if parsed.Latitude != nil || parsed.Longitude != nil {
|
||||
lat := 0.0
|
||||
lon := 0.0
|
||||
if parsed.Latitude != nil {
|
||||
lat = *parsed.Latitude
|
||||
}
|
||||
if parsed.Longitude != nil {
|
||||
lon = *parsed.Longitude
|
||||
}
|
||||
stationID = fmt.Sprintf("OPENMETEO(%.5f,%.5f)", lat, lon)
|
||||
}
|
||||
// Require BOTH lat/lon to avoid misleading OPENMETEO(0.00000,...) IDs.
|
||||
stationID := normcommon.SynthStationIDPtr("OPENMETEO", parsed.Latitude, parsed.Longitude)
|
||||
|
||||
obs := model.WeatherObservation{
|
||||
StationID: stationID,
|
||||
|
||||
@@ -4,6 +4,8 @@ package openweather
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
normcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/common"
|
||||
)
|
||||
|
||||
// This file holds provider-specific helpers that are shared across multiple
|
||||
@@ -67,5 +69,5 @@ func openWeatherStationID(parsed owmResponse) string {
|
||||
return fmt.Sprintf("OPENWEATHER(%d)", parsed.ID)
|
||||
}
|
||||
// Fallback: synthesize from coordinates.
|
||||
return fmt.Sprintf("OPENWEATHER(%.5f,%.5f)", parsed.Coord.Lat, parsed.Coord.Lon)
|
||||
return normcommon.SynthStationID("OPENWEATHER", parsed.Coord.Lat, parsed.Coord.Lon)
|
||||
}
|
||||
|
||||
@@ -25,3 +25,26 @@ func ParseTime(s string) (time.Time, error) {
|
||||
|
||||
return time.Time{}, fmt.Errorf("unsupported NWS timestamp format: %q", s)
|
||||
}
|
||||
|
||||
// ParseTimeBestEffort parses an NWS timestamp and returns it in UTC.
|
||||
//
|
||||
// This is a convenience for normalizers that want "best effort" parsing:
|
||||
// invalid/empty strings do not fail the entire normalization; they return zero time.
|
||||
func ParseTimeBestEffort(s string) time.Time {
|
||||
t, err := ParseTime(s)
|
||||
if err != nil {
|
||||
return time.Time{}
|
||||
}
|
||||
return t.UTC()
|
||||
}
|
||||
|
||||
// ParseTimePtr parses an NWS timestamp and returns a UTC *time.Time.
|
||||
//
|
||||
// Empty/unparseable input returns nil. This is useful for optional CAP fields.
|
||||
func ParseTimePtr(s string) *time.Time {
|
||||
t := ParseTimeBestEffort(s)
|
||||
if t.IsZero() {
|
||||
return nil
|
||||
}
|
||||
return &t
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user