Feature addition to support narrative forecast updates from the NWS
All checks were successful
ci/woodpecker/push/build-image Pipeline was successful
All checks were successful
ci/woodpecker/push/build-image Pipeline was successful
This commit is contained in:
@@ -22,6 +22,9 @@ func RegisterBuiltins(r *fksource.Registry) {
|
||||
r.RegisterPoll("nws_forecast_hourly", func(cfg config.SourceConfig) (fksource.PollSource, error) {
|
||||
return nws.NewHourlyForecastSource(cfg)
|
||||
})
|
||||
r.RegisterPoll("nws_forecast_narrative", func(cfg config.SourceConfig) (fksource.PollSource, error) {
|
||||
return nws.NewNarrativeForecastSource(cfg)
|
||||
})
|
||||
|
||||
// Open-Meteo drivers
|
||||
r.RegisterPoll("openmeteo_observation", func(cfg config.SourceConfig) (fksource.PollSource, error) {
|
||||
|
||||
@@ -21,6 +21,19 @@ func TestRegisterBuiltinsRegistersNWSHourlyForecastDriver(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestRegisterBuiltinsRegistersNWSNarrativeForecastDriver(t *testing.T) {
|
||||
reg := fksource.NewRegistry()
|
||||
RegisterBuiltins(reg)
|
||||
|
||||
in, err := reg.BuildInput(sourceConfigForDriver("nws_forecast_narrative"))
|
||||
if err != nil {
|
||||
t.Fatalf("BuildInput(nws_forecast_narrative) error = %v", err)
|
||||
}
|
||||
if _, ok := in.(fksource.PollSource); !ok {
|
||||
t.Fatalf("BuildInput(nws_forecast_narrative) type = %T, want PollSource", in)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRegisterBuiltinsDoesNotRegisterLegacyNWSForecastDriver(t *testing.T) {
|
||||
reg := fksource.NewRegistry()
|
||||
RegisterBuiltins(reg)
|
||||
|
||||
129
internal/sources/nws/forecast_narrative.go
Normal file
129
internal/sources/nws/forecast_narrative.go
Normal file
@@ -0,0 +1,129 @@
|
||||
// FILE: internal/sources/nws/forecast_narrative.go
|
||||
package nws
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
||||
)
|
||||
|
||||
// NarrativeForecastSource polls an NWS narrative forecast endpoint and emits a RAW forecast Event.
|
||||
//
|
||||
// It intentionally emits the *entire* upstream payload as json.RawMessage and only decodes
|
||||
// minimal metadata for Event.EffectiveAt and Event.ID.
|
||||
//
|
||||
// Output schema:
|
||||
// - standards.SchemaRawNWSNarrativeForecastV1
|
||||
type NarrativeForecastSource struct {
|
||||
http *common.HTTPSource
|
||||
}
|
||||
|
||||
func NewNarrativeForecastSource(cfg config.SourceConfig) (*NarrativeForecastSource, error) {
|
||||
const driver = "nws_forecast_narrative"
|
||||
|
||||
// NWS forecast endpoints are GeoJSON (and sometimes also advertise json-ld/json).
|
||||
hs, err := common.NewHTTPSource(driver, cfg, "application/geo+json, application/json")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &NarrativeForecastSource{http: hs}, nil
|
||||
}
|
||||
|
||||
func (s *NarrativeForecastSource) Name() string { return s.http.Name }
|
||||
|
||||
// Kind is used for routing/policy.
|
||||
func (s *NarrativeForecastSource) Kind() event.Kind { return event.Kind("forecast") }
|
||||
|
||||
func (s *NarrativeForecastSource) Poll(ctx context.Context) ([]event.Event, error) {
|
||||
raw, meta, err := s.fetchRaw(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// EffectiveAt is optional; for forecasts it’s most naturally the run “issued” time.
|
||||
// NWS gridpoint forecasts expose generatedAt (preferred) and updateTime/updated.
|
||||
var effectiveAt *time.Time
|
||||
switch {
|
||||
case !meta.ParsedGeneratedAt.IsZero():
|
||||
t := meta.ParsedGeneratedAt.UTC()
|
||||
effectiveAt = &t
|
||||
case !meta.ParsedUpdateTime.IsZero():
|
||||
t := meta.ParsedUpdateTime.UTC()
|
||||
effectiveAt = &t
|
||||
}
|
||||
|
||||
emittedAt := time.Now().UTC()
|
||||
|
||||
// NWS gridpoint forecast GeoJSON commonly has a stable "id" equal to the endpoint URL.
|
||||
// That is *not* unique per issued run, so we intentionally do not use it for Event.ID.
|
||||
// Instead we rely on Source:EffectiveAt (or Source:EmittedAt fallback).
|
||||
eventID := common.ChooseEventID("", s.http.Name, effectiveAt, emittedAt)
|
||||
|
||||
return common.SingleRawEvent(
|
||||
s.Kind(),
|
||||
s.http.Name,
|
||||
standards.SchemaRawNWSNarrativeForecastV1,
|
||||
eventID,
|
||||
emittedAt,
|
||||
effectiveAt,
|
||||
raw,
|
||||
)
|
||||
}
|
||||
|
||||
// ---- RAW fetch + minimal metadata decode ----
|
||||
|
||||
type narrativeForecastMeta struct {
|
||||
// Present for GeoJSON Feature responses, but often stable (endpoint URL).
|
||||
ID string `json:"id"`
|
||||
|
||||
Properties struct {
|
||||
GeneratedAt string `json:"generatedAt"` // preferred “issued/run generated” time
|
||||
UpdateTime string `json:"updateTime"` // last update time of underlying data
|
||||
Updated string `json:"updated"` // deprecated alias for updateTime
|
||||
} `json:"properties"`
|
||||
|
||||
ParsedGeneratedAt time.Time `json:"-"`
|
||||
ParsedUpdateTime time.Time `json:"-"`
|
||||
}
|
||||
|
||||
func (s *NarrativeForecastSource) fetchRaw(ctx context.Context) (json.RawMessage, narrativeForecastMeta, error) {
|
||||
raw, err := s.http.FetchJSON(ctx)
|
||||
if err != nil {
|
||||
return nil, narrativeForecastMeta{}, err
|
||||
}
|
||||
|
||||
var meta narrativeForecastMeta
|
||||
if err := json.Unmarshal(raw, &meta); err != nil {
|
||||
// If metadata decode fails, still return raw; Poll will fall back to Source:EmittedAt.
|
||||
return raw, narrativeForecastMeta{}, nil
|
||||
}
|
||||
|
||||
// generatedAt (preferred)
|
||||
genStr := strings.TrimSpace(meta.Properties.GeneratedAt)
|
||||
if genStr != "" {
|
||||
if t, err := nwscommon.ParseTime(genStr); err == nil {
|
||||
meta.ParsedGeneratedAt = t.UTC()
|
||||
}
|
||||
}
|
||||
|
||||
// updateTime, with fallback to deprecated "updated"
|
||||
updStr := strings.TrimSpace(meta.Properties.UpdateTime)
|
||||
if updStr == "" {
|
||||
updStr = strings.TrimSpace(meta.Properties.Updated)
|
||||
}
|
||||
if updStr != "" {
|
||||
if t, err := nwscommon.ParseTime(updStr); err == nil {
|
||||
meta.ParsedUpdateTime = t.UTC()
|
||||
}
|
||||
}
|
||||
|
||||
return raw, meta, nil
|
||||
}
|
||||
Reference in New Issue
Block a user