- Introduce internal/providers/nws with shared timestamp parsing used by both NWS sources and normalizers - Migrate NWS observation source + normalizer to use the shared provider helper for consistent RFC3339/RFC3339Nano handling - Introduce internal/providers/openweather with a shared URL invariant helper enforcing units=metric - Remove duplicated OpenWeather URL validation logic from the observation source - Align provider layering: move provider “contract/quirk” logic out of normalizers and into internal/providers - Update normalizer and standards documentation to clearly distinguish: provider helpers (internal/providers) vs canonical mapping logic (internal/normalizers) This refactor reduces duplication between sources and normalizers, clarifies layering boundaries, and establishes a scalable pattern for future forecast and alert implementations.
96 lines
2.4 KiB
Go
96 lines
2.4 KiB
Go
// FILE: ./internal/sources/nws/observation.go
|
||
package nws
|
||
|
||
import (
|
||
"context"
|
||
"encoding/json"
|
||
"strings"
|
||
"time"
|
||
|
||
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
|
||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
|
||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards"
|
||
)
|
||
|
||
// ObservationSource polls an NWS station observation endpoint and emits a RAW observation Event.
|
||
type ObservationSource struct {
|
||
http *common.HTTPSource
|
||
}
|
||
|
||
func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) {
|
||
const driver = "nws_observation"
|
||
|
||
hs, err := common.NewHTTPSource(driver, cfg, "application/geo+json, application/json")
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
return &ObservationSource{http: hs}, nil
|
||
}
|
||
|
||
func (s *ObservationSource) Name() string { return s.http.Name }
|
||
|
||
func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") }
|
||
|
||
func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
|
||
raw, meta, err := s.fetchRaw(ctx)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
// EffectiveAt is optional; for observations it’s naturally the observation timestamp.
|
||
var effectiveAt *time.Time
|
||
if !meta.ParsedTimestamp.IsZero() {
|
||
t := meta.ParsedTimestamp.UTC()
|
||
effectiveAt = &t
|
||
}
|
||
|
||
emittedAt := time.Now().UTC()
|
||
eventID := common.ChooseEventID(meta.ID, s.http.Name, effectiveAt, emittedAt)
|
||
|
||
return common.SingleRawEvent(
|
||
s.Kind(),
|
||
s.http.Name,
|
||
standards.SchemaRawNWSObservationV1,
|
||
eventID,
|
||
emittedAt,
|
||
effectiveAt,
|
||
raw,
|
||
)
|
||
}
|
||
|
||
// ---- RAW fetch + minimal metadata decode ----
|
||
|
||
type observationMeta struct {
|
||
ID string `json:"id"`
|
||
Properties struct {
|
||
Timestamp string `json:"timestamp"`
|
||
} `json:"properties"`
|
||
|
||
ParsedTimestamp time.Time `json:"-"`
|
||
}
|
||
|
||
func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, observationMeta, error) {
|
||
raw, err := s.http.FetchJSON(ctx)
|
||
if err != nil {
|
||
return nil, observationMeta{}, err
|
||
}
|
||
|
||
var meta observationMeta
|
||
if err := json.Unmarshal(raw, &meta); err != nil {
|
||
// If metadata decode fails, still return raw; envelope will fall back to Source:EffectiveAt.
|
||
return raw, observationMeta{}, nil
|
||
}
|
||
|
||
tsStr := strings.TrimSpace(meta.Properties.Timestamp)
|
||
if tsStr != "" {
|
||
if t, err := nwscommon.ParseTime(tsStr); err == nil {
|
||
meta.ParsedTimestamp = t.UTC()
|
||
}
|
||
}
|
||
|
||
return raw, meta, nil
|
||
}
|