// FILE: ./internal/sources/nws/observation.go
|
||
package nws
|
||
|
||
import (
|
||
"context"
|
||
"encoding/json"
|
||
"fmt"
|
||
"io"
|
||
"net/http"
|
||
"strings"
|
||
"time"
|
||
|
||
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards"
|
||
)
|
||
|
||
// ObservationSource polls a single NWS station observation endpoint and emits
// the response as a RAW observation Event.
//
// Division of labour:
//   - The source fetches the bytes and wraps them in a valid event envelope.
//   - The normalizer interprets the raw JSON and maps it to the canonical
//     domain model.
//
// A typical endpoint looks like:
//
//	https://api.weather.gov/stations/KSTL/observations/latest
type ObservationSource struct {
	// name identifies this source; it is reported by Name and stamped on
	// every emitted event.
	name string
	// url is the endpoint requested on each poll.
	url string
	// userAgent is sent as the User-Agent header on every request.
	userAgent string
	// client performs the HTTP requests.
	client *http.Client
}
|
||
|
||
func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) {
|
||
if strings.TrimSpace(cfg.Name) == "" {
|
||
return nil, fmt.Errorf("nws_observation: name is required")
|
||
}
|
||
if cfg.Params == nil {
|
||
return nil, fmt.Errorf("nws_observation %q: params are required (need params.url and params.user_agent)", cfg.Name)
|
||
}
|
||
|
||
url, ok := cfg.ParamString("url", "URL")
|
||
if !ok {
|
||
return nil, fmt.Errorf("nws_observation %q: params.url is required", cfg.Name)
|
||
}
|
||
|
||
ua, ok := cfg.ParamString("user_agent", "userAgent")
|
||
if !ok {
|
||
return nil, fmt.Errorf("nws_observation %q: params.user_agent is required", cfg.Name)
|
||
}
|
||
|
||
return &ObservationSource{
|
||
name: cfg.Name,
|
||
url: url,
|
||
userAgent: ua,
|
||
client: &http.Client{
|
||
Timeout: 10 * time.Second,
|
||
},
|
||
}, nil
|
||
}
|
||
|
||
// Name returns the name this source was configured with.
func (s *ObservationSource) Name() string {
	return s.name
}
|
||
|
||
// Kind is used for routing/policy.
|
||
// We keep Kind canonical (observation) even for raw events; Schema differentiates raw vs canonical.
|
||
func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") }
|
||
|
||
// Poll fetches NWS "latest observation" and emits exactly one RAW Event.
|
||
// The RAW payload is json.RawMessage and Schema is standards.SchemaRawNWSObservationV1.
|
||
func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
|
||
raw, meta, err := s.fetchRaw(ctx)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
// Event.ID must be set BEFORE normalization (feedkit requires it).
|
||
// Prefer NWS-provided "id" (stable URL). Fallback to a stable-ish computed key.
|
||
eventID := strings.TrimSpace(meta.ID)
|
||
if eventID == "" {
|
||
ts := meta.ParsedTimestamp
|
||
if ts.IsZero() {
|
||
ts = time.Now().UTC()
|
||
}
|
||
station := strings.TrimSpace(meta.StationID)
|
||
if station == "" {
|
||
station = "UNKNOWN"
|
||
}
|
||
eventID = fmt.Sprintf("nws:observation:%s:%s:%s", s.name, station, ts.UTC().Format(time.RFC3339Nano))
|
||
}
|
||
|
||
// EffectiveAt is optional; for observations it’s naturally the observation timestamp.
|
||
var effectiveAt *time.Time
|
||
if !meta.ParsedTimestamp.IsZero() {
|
||
t := meta.ParsedTimestamp.UTC()
|
||
effectiveAt = &t
|
||
}
|
||
|
||
e := event.Event{
|
||
ID: eventID,
|
||
Kind: s.Kind(),
|
||
Source: s.name,
|
||
EmittedAt: time.Now().UTC(),
|
||
EffectiveAt: effectiveAt,
|
||
|
||
// RAW schema (normalizer matches on this).
|
||
Schema: standards.SchemaRawNWSObservationV1,
|
||
|
||
// Raw JSON; normalizer will decode and map to canonical model.WeatherObservation.
|
||
Payload: raw,
|
||
}
|
||
|
||
if err := e.Validate(); err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
return []event.Event{e}, nil
|
||
}
|
||
|
||
// ---- RAW fetch + minimal metadata decode ----
|
||
|
||
// observationMeta is a deliberately *minimal* view of the NWS payload. It is
// decoded only to derive a stable Event.ID and a useful EffectiveAt for the
// event envelope; full interpretation of the document is left to the
// normalizer.
type observationMeta struct {
	// ID is the top-level "id" of the NWS document.
	ID string `json:"id"`

	// Properties holds the few fields of the "properties" object that the
	// envelope needs.
	Properties struct {
		StationID string `json:"stationId"`
		Timestamp string `json:"timestamp"`
	} `json:"properties"`

	// The fields below are never read from or written to JSON; they are
	// filled in after decoding for the caller's convenience.

	// ParsedTimestamp is Properties.Timestamp parsed as a time, or the zero
	// time when it was absent or unparseable.
	ParsedTimestamp time.Time `json:"-"`
	// StationID is the whitespace-trimmed Properties.StationID.
	StationID string `json:"-"`
}
|
||
|
||
func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, observationMeta, error) {
|
||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.url, nil)
|
||
if err != nil {
|
||
return nil, observationMeta{}, err
|
||
}
|
||
|
||
req.Header.Set("User-Agent", s.userAgent)
|
||
req.Header.Set("Accept", "application/geo+json, application/json")
|
||
|
||
res, err := s.client.Do(req)
|
||
if err != nil {
|
||
return nil, observationMeta{}, err
|
||
}
|
||
defer res.Body.Close()
|
||
|
||
if res.StatusCode < 200 || res.StatusCode >= 300 {
|
||
return nil, observationMeta{}, fmt.Errorf("nws_observation %q: HTTP %s", s.name, res.Status)
|
||
}
|
||
|
||
b, err := io.ReadAll(res.Body)
|
||
if err != nil {
|
||
return nil, observationMeta{}, err
|
||
}
|
||
if len(b) == 0 {
|
||
return nil, observationMeta{}, fmt.Errorf("nws_observation %q: empty response body", s.name)
|
||
}
|
||
|
||
raw := json.RawMessage(b)
|
||
|
||
var meta observationMeta
|
||
if err := json.Unmarshal(b, &meta); err != nil {
|
||
// If metadata decode fails, still return raw; envelope will fall back to computed ID.
|
||
return raw, observationMeta{}, nil
|
||
}
|
||
|
||
meta.StationID = strings.TrimSpace(meta.Properties.StationID)
|
||
|
||
tsStr := strings.TrimSpace(meta.Properties.Timestamp)
|
||
if tsStr != "" {
|
||
if t, err := time.Parse(time.RFC3339, tsStr); err == nil {
|
||
meta.ParsedTimestamp = t
|
||
}
|
||
}
|
||
|
||
return raw, meta, nil
|
||
}
|