Files
weatherfeeder/internal/sources/nws/observation.go

133 lines
3.7 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// FILE: ./internal/sources/nws/observation.go
package nws
import (
"context"
"encoding/json"
"fmt"
"net/http"
"strings"
"time"
"gitea.maximumdirect.net/ejr/feedkit/config"
"gitea.maximumdirect.net/ejr/feedkit/event"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards"
)
// ObservationSource polls an NWS station observation endpoint and emits a RAW observation Event.
//
// This corresponds to URLs like:
//
// https://api.weather.gov/stations/KSTL/observations/latest
type ObservationSource struct {
name string
url string
userAgent string
client *http.Client
}
func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) {
const driver = "nws_observation"
c, err := common.RequireHTTPSourceConfig(driver, cfg)
if err != nil {
return nil, err
}
return &ObservationSource{
name: c.Name,
url: c.URL,
userAgent: c.UserAgent,
client: common.NewHTTPClient(common.DefaultHTTPTimeout),
}, nil
}
func (s *ObservationSource) Name() string { return s.name }
// Kind is used for routing/policy.
// We keep Kind canonical (observation) even for raw events; Schema differentiates raw vs canonical.
func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") }
// Poll fetches NWS "latest observation" and emits exactly one RAW Event.
// The RAW payload is json.RawMessage and Schema is standards.SchemaRawNWSObservationV1.
func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
raw, meta, err := s.fetchRaw(ctx)
if err != nil {
return nil, err
}
// Event.ID must be set BEFORE normalization (feedkit requires it).
// Prefer NWS-provided "id" (stable URL). Fallback to a stable computed key.
eventID := strings.TrimSpace(meta.ID)
if eventID == "" {
ts := meta.ParsedTimestamp
if ts.IsZero() {
ts = time.Now().UTC()
}
station := strings.TrimSpace(meta.StationID)
if station == "" {
station = "UNKNOWN"
}
eventID = fmt.Sprintf("nws:observation:%s:%s:%s", s.name, station, ts.UTC().Format(time.RFC3339Nano))
}
// EffectiveAt is optional; for observations its naturally the observation timestamp.
var effectiveAt *time.Time
if !meta.ParsedTimestamp.IsZero() {
t := meta.ParsedTimestamp.UTC()
effectiveAt = &t
}
return common.SingleRawEvent(
s.Kind(),
s.name,
standards.SchemaRawNWSObservationV1,
eventID,
effectiveAt,
raw,
)
}
// ---- RAW fetch + minimal metadata decode ----
// observationMeta is a *minimal* decode of the NWS payload used only to build
// a stable Event.ID and a useful EffectiveAt for the envelope.
type observationMeta struct {
ID string `json:"id"`
Properties struct {
StationID string `json:"stationId"`
Timestamp string `json:"timestamp"`
} `json:"properties"`
// Convenience fields populated after decode.
ParsedTimestamp time.Time `json:"-"`
StationID string `json:"-"`
}
func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, observationMeta, error) {
b, err := common.FetchBody(ctx, s.client, s.url, s.userAgent, "application/geo+json, application/json")
if err != nil {
return nil, observationMeta{}, fmt.Errorf("nws_observation %q: %w", s.name, err)
}
raw := json.RawMessage(b)
var meta observationMeta
if err := json.Unmarshal(b, &meta); err != nil {
// If metadata decode fails, still return raw; envelope will fall back to computed ID.
return raw, observationMeta{}, nil
}
meta.StationID = strings.TrimSpace(meta.Properties.StationID)
tsStr := strings.TrimSpace(meta.Properties.Timestamp)
if tsStr != "" {
if t, err := time.Parse(time.RFC3339, tsStr); err == nil {
meta.ParsedTimestamp = t
}
}
return raw, meta, nil
}