refactor(providers): centralize provider-specific parsing and invariants
- Introduce internal/providers/nws with shared timestamp parsing used by both NWS sources and normalizers - Migrate NWS observation source + normalizer to use the shared provider helper for consistent RFC3339/RFC3339Nano handling - Introduce internal/providers/openweather with a shared URL invariant helper enforcing units=metric - Remove duplicated OpenWeather URL validation logic from the observation source - Align provider layering: move provider “contract/quirk” logic out of normalizers and into internal/providers - Update normalizer and standards documentation to clearly distinguish: provider helpers (internal/providers) vs canonical mapping logic (internal/normalizers) This refactor reduces duplication between sources and normalizers, clarifies layering boundaries, and establishes a scalable pattern for future forecast and alert implementations.
This commit is contained in:
@@ -36,6 +36,10 @@
|
|||||||
// 2. Provider-level shared helpers live under the provider directory:
|
// 2. Provider-level shared helpers live under the provider directory:
|
||||||
// internal/normalizers/<provider>/
|
// internal/normalizers/<provider>/
|
||||||
//
|
//
|
||||||
|
// Use this for provider-specific quirks that should be shared by BOTH sources
|
||||||
|
// and normalizers (time parsing, URL/unit invariants, ID normalization, etc.).
|
||||||
|
// Keep these helpers pure (no I/O) and easy to test.
|
||||||
|
//
|
||||||
// You may use multiple helper files (recommended) when it improves clarity:
|
// You may use multiple helper files (recommended) when it improves clarity:
|
||||||
// - types.go (provider JSON structs)
|
// - types.go (provider JSON structs)
|
||||||
// - common.go (provider-shared helpers)
|
// - common.go (provider-shared helpers)
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ import (
|
|||||||
"gitea.maximumdirect.net/ejr/feedkit/event"
|
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/model"
|
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/model"
|
||||||
normcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/common"
|
normcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/common"
|
||||||
|
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
|
||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards"
|
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -46,11 +47,11 @@ func buildObservation(parsed nwsObservationResponse) (model.WeatherObservation,
|
|||||||
// Timestamp (RFC3339)
|
// Timestamp (RFC3339)
|
||||||
var ts time.Time
|
var ts time.Time
|
||||||
if s := strings.TrimSpace(parsed.Properties.Timestamp); s != "" {
|
if s := strings.TrimSpace(parsed.Properties.Timestamp); s != "" {
|
||||||
t, err := time.Parse(time.RFC3339, s)
|
t, err := nwscommon.ParseTime(s)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return model.WeatherObservation{}, time.Time{}, fmt.Errorf("invalid timestamp %q: %w", s, err)
|
return model.WeatherObservation{}, time.Time{}, fmt.Errorf("invalid timestamp %q: %w", s, err)
|
||||||
}
|
}
|
||||||
ts = t
|
ts = t.UTC()
|
||||||
}
|
}
|
||||||
|
|
||||||
cloudLayers := make([]model.CloudLayer, 0, len(parsed.Properties.CloudLayers))
|
cloudLayers := make([]model.CloudLayer, 0, len(parsed.Properties.CloudLayers))
|
||||||
|
|||||||
@@ -1,19 +0,0 @@
|
|||||||
// FILE: ./internal/normalizers/openmeteo/common.go
|
|
||||||
package openmeteo
|
|
||||||
|
|
||||||
import (
|
|
||||||
"time"
|
|
||||||
|
|
||||||
openmeteo "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openmeteo"
|
|
||||||
)
|
|
||||||
|
|
||||||
// parseOpenMeteoTime parses Open-Meteo timestamps.
|
|
||||||
//
|
|
||||||
// The actual parsing logic lives in internal/providers/openmeteo so both the
|
|
||||||
// source (envelope EffectiveAt / event ID) and normalizer (canonical payload)
|
|
||||||
// can share identical timestamp behavior.
|
|
||||||
//
|
|
||||||
// We keep this thin wrapper to avoid churn in the normalizer package.
|
|
||||||
func parseOpenMeteoTime(s string, tz string, utcOffsetSeconds int) (time.Time, error) {
|
|
||||||
return openmeteo.ParseTime(s, tz, utcOffsetSeconds)
|
|
||||||
}
|
|
||||||
@@ -10,6 +10,7 @@ import (
|
|||||||
"gitea.maximumdirect.net/ejr/feedkit/event"
|
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/model"
|
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/model"
|
||||||
normcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/common"
|
normcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/common"
|
||||||
|
omcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openmeteo"
|
||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards"
|
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -54,7 +55,7 @@ func buildObservation(parsed omResponse) (model.WeatherObservation, time.Time, e
|
|||||||
// Parse current.time.
|
// Parse current.time.
|
||||||
var ts time.Time
|
var ts time.Time
|
||||||
if s := strings.TrimSpace(parsed.Current.Time); s != "" {
|
if s := strings.TrimSpace(parsed.Current.Time); s != "" {
|
||||||
t, err := parseOpenMeteoTime(s, parsed.Timezone, parsed.UTCOffsetSeconds)
|
t, err := omcommon.ParseTime(s, parsed.Timezone, parsed.UTCOffsetSeconds)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return model.WeatherObservation{}, time.Time{}, fmt.Errorf("parse time %q: %w", s, err)
|
return model.WeatherObservation{}, time.Time{}, fmt.Errorf("parse time %q: %w", s, err)
|
||||||
}
|
}
|
||||||
|
|||||||
8
internal/providers/nws/doc.go
Normal file
8
internal/providers/nws/doc.go
Normal file
@@ -0,0 +1,8 @@
|
|||||||
|
// Package nws contains provider-specific helper code for the National Weather Service
|
||||||
|
// used by both sources and normalizers.
|
||||||
|
//
|
||||||
|
// Rules:
|
||||||
|
// - No network I/O here (sources fetch; normalizers transform).
|
||||||
|
// - Keep helpers deterministic and easy to unit test.
|
||||||
|
// - Prefer putting provider quirks/parsing here when sources + normalizers both need it.
|
||||||
|
package nws
|
||||||
27
internal/providers/nws/time.go
Normal file
27
internal/providers/nws/time.go
Normal file
@@ -0,0 +1,27 @@
|
|||||||
|
package nws
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ParseTime parses NWS timestamps.
|
||||||
|
//
|
||||||
|
// NWS observation timestamps are typically RFC3339, sometimes with fractional seconds.
|
||||||
|
// We accept RFC3339Nano first, then RFC3339.
|
||||||
|
func ParseTime(s string) (time.Time, error) {
|
||||||
|
s = strings.TrimSpace(s)
|
||||||
|
if s == "" {
|
||||||
|
return time.Time{}, fmt.Errorf("empty time")
|
||||||
|
}
|
||||||
|
|
||||||
|
if t, err := time.Parse(time.RFC3339Nano, s); err == nil {
|
||||||
|
return t, nil
|
||||||
|
}
|
||||||
|
if t, err := time.Parse(time.RFC3339, s); err == nil {
|
||||||
|
return t, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return time.Time{}, fmt.Errorf("unsupported NWS timestamp format: %q", s)
|
||||||
|
}
|
||||||
8
internal/providers/openweather/doc.go
Normal file
8
internal/providers/openweather/doc.go
Normal file
@@ -0,0 +1,8 @@
|
|||||||
|
// Package openweather contains provider-specific helper code for OpenWeather used by
|
||||||
|
// both sources and normalizers.
|
||||||
|
//
|
||||||
|
// Rules:
|
||||||
|
// - No network I/O here.
|
||||||
|
// - Keep helpers deterministic and easy to unit test.
|
||||||
|
// - Put provider invariants here (e.g., units=metric enforcement).
|
||||||
|
package openweather
|
||||||
26
internal/providers/openweather/url.go
Normal file
26
internal/providers/openweather/url.go
Normal file
@@ -0,0 +1,26 @@
|
|||||||
|
package openweather
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"net/url"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
// RequireMetricUnits enforces weatherfeeder's OpenWeather invariant:
|
||||||
|
// the request URL must include units=metric (otherwise temperatures/winds/pressure differ).
|
||||||
|
func RequireMetricUnits(rawURL string) error {
|
||||||
|
u, err := url.Parse(strings.TrimSpace(rawURL))
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("invalid url %q: %w", rawURL, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
units := strings.ToLower(strings.TrimSpace(u.Query().Get("units")))
|
||||||
|
if units != "metric" {
|
||||||
|
if units == "" {
|
||||||
|
units = "(missing; defaults to standard)"
|
||||||
|
}
|
||||||
|
return fmt.Errorf("url must include units=metric (got units=%s)", units)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
@@ -9,6 +9,7 @@ import (
|
|||||||
|
|
||||||
"gitea.maximumdirect.net/ejr/feedkit/config"
|
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||||
"gitea.maximumdirect.net/ejr/feedkit/event"
|
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||||
|
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
|
||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
|
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
|
||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards"
|
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards"
|
||||||
)
|
)
|
||||||
@@ -85,8 +86,8 @@ func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, obse
|
|||||||
|
|
||||||
tsStr := strings.TrimSpace(meta.Properties.Timestamp)
|
tsStr := strings.TrimSpace(meta.Properties.Timestamp)
|
||||||
if tsStr != "" {
|
if tsStr != "" {
|
||||||
if t, err := time.Parse(time.RFC3339, tsStr); err == nil {
|
if t, err := nwscommon.ParseTime(tsStr); err == nil {
|
||||||
meta.ParsedTimestamp = t
|
meta.ParsedTimestamp = t.UTC()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -5,12 +5,11 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/url"
|
|
||||||
"strings"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"gitea.maximumdirect.net/ejr/feedkit/config"
|
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||||
"gitea.maximumdirect.net/ejr/feedkit/event"
|
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||||
|
owcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openweather"
|
||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
|
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
|
||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards"
|
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards"
|
||||||
)
|
)
|
||||||
@@ -27,7 +26,7 @@ func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := requireMetricUnits(hs.URL); err != nil {
|
if err := owcommon.RequireMetricUnits(hs.URL); err != nil {
|
||||||
return nil, fmt.Errorf("%s %q: %w", hs.Driver, hs.Name, err)
|
return nil, fmt.Errorf("%s %q: %w", hs.Driver, hs.Name, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -39,7 +38,7 @@ func (s *ObservationSource) Name() string { return s.http.Name }
|
|||||||
func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") }
|
func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") }
|
||||||
|
|
||||||
func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
|
func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
|
||||||
if err := requireMetricUnits(s.http.URL); err != nil {
|
if err := owcommon.RequireMetricUnits(s.http.URL); err != nil {
|
||||||
return nil, fmt.Errorf("%s %q: %w", s.http.Driver, s.http.Name, err)
|
return nil, fmt.Errorf("%s %q: %w", s.http.Driver, s.http.Name, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -93,20 +92,3 @@ func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, open
|
|||||||
|
|
||||||
return raw, meta, nil
|
return raw, meta, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func requireMetricUnits(rawURL string) error {
|
|
||||||
u, err := url.Parse(strings.TrimSpace(rawURL))
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("invalid url %q: %w", rawURL, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
units := strings.ToLower(strings.TrimSpace(u.Query().Get("units")))
|
|
||||||
if units != "metric" {
|
|
||||||
if units == "" {
|
|
||||||
units = "(missing; defaults to standard)"
|
|
||||||
}
|
|
||||||
return fmt.Errorf("url must include units=metric (got units=%s)", units)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -10,5 +10,6 @@
|
|||||||
// provider-specific logic and free of dependencies on internal/sources/* or
|
// provider-specific logic and free of dependencies on internal/sources/* or
|
||||||
// internal/normalizers/* to avoid import cycles.
|
// internal/normalizers/* to avoid import cycles.
|
||||||
//
|
//
|
||||||
// Provider-specific decoding and mapping lives in internal/normalizers/<provider>.
|
// Provider-specific decoding helpers and quirks live in internal/providers/<provider>.
|
||||||
|
// Normalizer implementations and canonical mapping logic live in internal/normalizers/<provider>.
|
||||||
package standards
|
package standards
|
||||||
|
|||||||
Reference in New Issue
Block a user