// FILE: ./internal/sources/nws/observation.go package nws import ( "context" "encoding/json" "fmt" "net/http" "strings" "time" "gitea.maximumdirect.net/ejr/feedkit/config" "gitea.maximumdirect.net/ejr/feedkit/event" "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common" "gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards" ) // ObservationSource polls an NWS station observation endpoint and emits a RAW observation Event. // // This corresponds to URLs like: // // https://api.weather.gov/stations/KSTL/observations/latest type ObservationSource struct { name string url string userAgent string client *http.Client } func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) { const driver = "nws_observation" c, err := common.RequireHTTPSourceConfig(driver, cfg) if err != nil { return nil, err } return &ObservationSource{ name: c.Name, url: c.URL, userAgent: c.UserAgent, client: common.NewHTTPClient(common.DefaultHTTPTimeout), }, nil } func (s *ObservationSource) Name() string { return s.name } // Kind is used for routing/policy. // We keep Kind canonical (observation) even for raw events; Schema differentiates raw vs canonical. func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") } // Poll fetches NWS "latest observation" and emits exactly one RAW Event. // The RAW payload is json.RawMessage and Schema is standards.SchemaRawNWSObservationV1. func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { raw, meta, err := s.fetchRaw(ctx) if err != nil { return nil, err } // Event.ID must be set BEFORE normalization (feedkit requires it). // Prefer NWS-provided "id" (stable URL). Fallback to a stable computed key. eventID := strings.TrimSpace(meta.ID) if eventID == "" { ts := meta.ParsedTimestamp if ts.IsZero() { ts = time.Now().UTC() } station := strings.TrimSpace(meta.StationID) if station == "" { station = "UNKNOWN" } eventID = fmt.Sprintf("nws:observation:%s:%s:%s", s.name, station, ts.UTC().Format(time.RFC3339Nano)) } // EffectiveAt is optional; for observations it’s naturally the observation timestamp. var effectiveAt *time.Time if !meta.ParsedTimestamp.IsZero() { t := meta.ParsedTimestamp.UTC() effectiveAt = &t } return common.SingleRawEvent( s.Kind(), s.name, standards.SchemaRawNWSObservationV1, eventID, effectiveAt, raw, ) } // ---- RAW fetch + minimal metadata decode ---- // observationMeta is a *minimal* decode of the NWS payload used only to build // a stable Event.ID and a useful EffectiveAt for the envelope. type observationMeta struct { ID string `json:"id"` Properties struct { StationID string `json:"stationId"` Timestamp string `json:"timestamp"` } `json:"properties"` // Convenience fields populated after decode. ParsedTimestamp time.Time `json:"-"` StationID string `json:"-"` } func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, observationMeta, error) { b, err := common.FetchBody(ctx, s.client, s.url, s.userAgent, "application/geo+json, application/json") if err != nil { return nil, observationMeta{}, fmt.Errorf("nws_observation %q: %w", s.name, err) } raw := json.RawMessage(b) var meta observationMeta if err := json.Unmarshal(b, &meta); err != nil { // If metadata decode fails, still return raw; envelope will fall back to computed ID. return raw, observationMeta{}, nil } meta.StationID = strings.TrimSpace(meta.Properties.StationID) tsStr := strings.TrimSpace(meta.Properties.Timestamp) if tsStr != "" { if t, err := time.Parse(time.RFC3339, tsStr); err == nil { meta.ParsedTimestamp = t } } return raw, meta, nil }