Files
weatherfeeder/internal/sources/nws/observation.go
Eric Rakestraw f13f43cf56 refactor(providers): centralize provider-specific parsing and invariants
- Introduce internal/providers/nws with shared timestamp parsing used by both
  NWS sources and normalizers
- Migrate NWS observation source + normalizer to use the shared provider helper
  for consistent RFC3339/RFC3339Nano handling
- Introduce internal/providers/openweather with a shared URL invariant helper
  enforcing units=metric
- Remove duplicated OpenWeather URL validation logic from the observation source
- Align provider layering: move provider “contract/quirk” logic out of
  normalizers and into internal/providers
- Update normalizer and standards documentation to clearly distinguish:
  provider helpers (internal/providers) vs canonical mapping logic
  (internal/normalizers)

This refactor reduces duplication between sources and normalizers, clarifies
layering boundaries, and establishes a scalable pattern for future forecast
and alert implementations.
2026-01-15 20:40:53 -06:00

96 lines
2.4 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// FILE: ./internal/sources/nws/observation.go
package nws
import (
"context"
"encoding/json"
"strings"
"time"
"gitea.maximumdirect.net/ejr/feedkit/config"
"gitea.maximumdirect.net/ejr/feedkit/event"
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards"
)
// ObservationSource polls an NWS station observation endpoint and emits a RAW observation Event.
type ObservationSource struct {
http *common.HTTPSource
}
func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) {
const driver = "nws_observation"
hs, err := common.NewHTTPSource(driver, cfg, "application/geo+json, application/json")
if err != nil {
return nil, err
}
return &ObservationSource{http: hs}, nil
}
func (s *ObservationSource) Name() string { return s.http.Name }
func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") }
func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
raw, meta, err := s.fetchRaw(ctx)
if err != nil {
return nil, err
}
// EffectiveAt is optional; for observations its naturally the observation timestamp.
var effectiveAt *time.Time
if !meta.ParsedTimestamp.IsZero() {
t := meta.ParsedTimestamp.UTC()
effectiveAt = &t
}
emittedAt := time.Now().UTC()
eventID := common.ChooseEventID(meta.ID, s.http.Name, effectiveAt, emittedAt)
return common.SingleRawEvent(
s.Kind(),
s.http.Name,
standards.SchemaRawNWSObservationV1,
eventID,
emittedAt,
effectiveAt,
raw,
)
}
// ---- RAW fetch + minimal metadata decode ----
type observationMeta struct {
ID string `json:"id"`
Properties struct {
Timestamp string `json:"timestamp"`
} `json:"properties"`
ParsedTimestamp time.Time `json:"-"`
}
func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, observationMeta, error) {
raw, err := s.http.FetchJSON(ctx)
if err != nil {
return nil, observationMeta{}, err
}
var meta observationMeta
if err := json.Unmarshal(raw, &meta); err != nil {
// If metadata decode fails, still return raw; envelope will fall back to Source:EffectiveAt.
return raw, observationMeta{}, nil
}
tsStr := strings.TrimSpace(meta.Properties.Timestamp)
if tsStr != "" {
if t, err := nwscommon.ParseTime(tsStr); err == nil {
meta.ParsedTimestamp = t.UTC()
}
}
return raw, meta, nil
}