// FILE: internal/sources/nws/forecast.go package nws import ( "context" "encoding/json" "strings" "time" "gitea.maximumdirect.net/ejr/feedkit/config" "gitea.maximumdirect.net/ejr/feedkit/event" nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws" "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common" "gitea.maximumdirect.net/ejr/weatherfeeder/standards" ) // ForecastSource polls an NWS forecast endpoint (narrative or hourly) and emits a RAW forecast Event. // // It intentionally emits the *entire* upstream payload as json.RawMessage and only decodes // minimal metadata for Event.EffectiveAt and Event.ID. // // Output schema (current implementation): // - standards.SchemaRawNWSHourlyForecastV1 type ForecastSource struct { http *common.HTTPSource } func NewForecastSource(cfg config.SourceConfig) (*ForecastSource, error) { const driver = "nws_forecast" // NWS forecast endpoints are GeoJSON (and sometimes also advertise json-ld/json). hs, err := common.NewHTTPSource(driver, cfg, "application/geo+json, application/json") if err != nil { return nil, err } return &ForecastSource{http: hs}, nil } func (s *ForecastSource) Name() string { return s.http.Name } // Kind is used for routing/policy. func (s *ForecastSource) Kind() event.Kind { return event.Kind("forecast") } func (s *ForecastSource) Poll(ctx context.Context) ([]event.Event, error) { raw, meta, err := s.fetchRaw(ctx) if err != nil { return nil, err } // EffectiveAt is optional; for forecasts it’s most naturally the run “issued” time. // NWS gridpoint forecasts expose generatedAt (preferred) and updateTime/updated. var effectiveAt *time.Time switch { case !meta.ParsedGeneratedAt.IsZero(): t := meta.ParsedGeneratedAt.UTC() effectiveAt = &t case !meta.ParsedUpdateTime.IsZero(): t := meta.ParsedUpdateTime.UTC() effectiveAt = &t } emittedAt := time.Now().UTC() // NWS gridpoint forecast GeoJSON commonly has a stable "id" equal to the endpoint URL. // That is *not* unique per issued run, so we intentionally do not use it for Event.ID. // Instead we rely on Source:EffectiveAt (or Source:EmittedAt fallback). eventID := common.ChooseEventID("", s.http.Name, effectiveAt, emittedAt) return common.SingleRawEvent( s.Kind(), s.http.Name, standards.SchemaRawNWSHourlyForecastV1, eventID, emittedAt, effectiveAt, raw, ) } // ---- RAW fetch + minimal metadata decode ---- type forecastMeta struct { // Present for GeoJSON Feature responses, but often stable (endpoint URL). ID string `json:"id"` Properties struct { GeneratedAt string `json:"generatedAt"` // preferred “issued/run generated” time UpdateTime string `json:"updateTime"` // last update time of underlying data Updated string `json:"updated"` // deprecated alias for updateTime } `json:"properties"` ParsedGeneratedAt time.Time `json:"-"` ParsedUpdateTime time.Time `json:"-"` } func (s *ForecastSource) fetchRaw(ctx context.Context) (json.RawMessage, forecastMeta, error) { raw, err := s.http.FetchJSON(ctx) if err != nil { return nil, forecastMeta{}, err } var meta forecastMeta if err := json.Unmarshal(raw, &meta); err != nil { // If metadata decode fails, still return raw; Poll will fall back to Source:EmittedAt. return raw, forecastMeta{}, nil } // generatedAt (preferred) genStr := strings.TrimSpace(meta.Properties.GeneratedAt) if genStr != "" { if t, err := nwscommon.ParseTime(genStr); err == nil { meta.ParsedGeneratedAt = t.UTC() } } // updateTime, with fallback to deprecated "updated" updStr := strings.TrimSpace(meta.Properties.UpdateTime) if updStr == "" { updStr = strings.TrimSpace(meta.Properties.Updated) } if updStr != "" { if t, err := nwscommon.ParseTime(updStr); err == nil { meta.ParsedUpdateTime = t.UTC() } } return raw, meta, nil }