// FILE: ./internal/sources/nws/observation.go package nws import ( "context" "encoding/json" "fmt" "io" "net/http" "strings" "time" "gitea.maximumdirect.net/ejr/feedkit/config" "gitea.maximumdirect.net/ejr/feedkit/event" "gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards" ) // ObservationSource polls an NWS station observation endpoint and emits a RAW observation Event. // // Key refactor: // - Source responsibility: fetch bytes + emit a valid event envelope. // - Normalizer responsibility: interpret raw JSON + map to canonical domain model. // // This corresponds to URLs like: // // https://api.weather.gov/stations/KSTL/observations/latest type ObservationSource struct { name string url string userAgent string client *http.Client } func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) { if strings.TrimSpace(cfg.Name) == "" { return nil, fmt.Errorf("nws_observation: name is required") } if cfg.Params == nil { return nil, fmt.Errorf("nws_observation %q: params are required (need params.url and params.user_agent)", cfg.Name) } url, ok := cfg.ParamString("url", "URL") if !ok { return nil, fmt.Errorf("nws_observation %q: params.url is required", cfg.Name) } ua, ok := cfg.ParamString("user_agent", "userAgent") if !ok { return nil, fmt.Errorf("nws_observation %q: params.user_agent is required", cfg.Name) } return &ObservationSource{ name: cfg.Name, url: url, userAgent: ua, client: &http.Client{ Timeout: 10 * time.Second, }, }, nil } func (s *ObservationSource) Name() string { return s.name } // Kind is used for routing/policy. // We keep Kind canonical (observation) even for raw events; Schema differentiates raw vs canonical. func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") } // Poll fetches NWS "latest observation" and emits exactly one RAW Event. // The RAW payload is json.RawMessage and Schema is standards.SchemaRawNWSObservationV1. func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { raw, meta, err := s.fetchRaw(ctx) if err != nil { return nil, err } // Event.ID must be set BEFORE normalization (feedkit requires it). // Prefer NWS-provided "id" (stable URL). Fallback to a stable-ish computed key. eventID := strings.TrimSpace(meta.ID) if eventID == "" { ts := meta.ParsedTimestamp if ts.IsZero() { ts = time.Now().UTC() } station := strings.TrimSpace(meta.StationID) if station == "" { station = "UNKNOWN" } eventID = fmt.Sprintf("nws:observation:%s:%s:%s", s.name, station, ts.UTC().Format(time.RFC3339Nano)) } // EffectiveAt is optional; for observations it’s naturally the observation timestamp. var effectiveAt *time.Time if !meta.ParsedTimestamp.IsZero() { t := meta.ParsedTimestamp.UTC() effectiveAt = &t } e := event.Event{ ID: eventID, Kind: s.Kind(), Source: s.name, EmittedAt: time.Now().UTC(), EffectiveAt: effectiveAt, // RAW schema (normalizer matches on this). Schema: standards.SchemaRawNWSObservationV1, // Raw JSON; normalizer will decode and map to canonical model.WeatherObservation. Payload: raw, } if err := e.Validate(); err != nil { return nil, err } return []event.Event{e}, nil } // ---- RAW fetch + minimal metadata decode ---- // observationMeta is a *minimal* decode of the NWS payload used only to build // a stable Event.ID and a useful EffectiveAt for the envelope. type observationMeta struct { ID string `json:"id"` Properties struct { StationID string `json:"stationId"` Timestamp string `json:"timestamp"` } `json:"properties"` // Convenience fields populated after decode. ParsedTimestamp time.Time `json:"-"` StationID string `json:"-"` } func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, observationMeta, error) { req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.url, nil) if err != nil { return nil, observationMeta{}, err } req.Header.Set("User-Agent", s.userAgent) req.Header.Set("Accept", "application/geo+json, application/json") res, err := s.client.Do(req) if err != nil { return nil, observationMeta{}, err } defer res.Body.Close() if res.StatusCode < 200 || res.StatusCode >= 300 { return nil, observationMeta{}, fmt.Errorf("nws_observation %q: HTTP %s", s.name, res.Status) } b, err := io.ReadAll(res.Body) if err != nil { return nil, observationMeta{}, err } if len(b) == 0 { return nil, observationMeta{}, fmt.Errorf("nws_observation %q: empty response body", s.name) } raw := json.RawMessage(b) var meta observationMeta if err := json.Unmarshal(b, &meta); err != nil { // If metadata decode fails, still return raw; envelope will fall back to computed ID. return raw, observationMeta{}, nil } meta.StationID = strings.TrimSpace(meta.Properties.StationID) tsStr := strings.TrimSpace(meta.Properties.Timestamp) if tsStr != "" { if t, err := time.Parse(time.RFC3339, tsStr); err == nil { meta.ParsedTimestamp = t } } return raw, meta, nil }