Code cleanup and deduplication pass through weatherfeeder
All checks were successful
ci/woodpecker/push/build-image Pipeline was successful
ci/woodpecker/manual/build-image Pipeline was successful

This commit is contained in:
2026-03-28 12:01:07 -05:00
parent 2c1278a70a
commit c76088c38c
12 changed files with 533 additions and 311 deletions

View File

@@ -9,33 +9,30 @@ import (
fksource "gitea.maximumdirect.net/ejr/feedkit/sources"
)
type pollDriverRegistration struct {
driver string
factory func(config.SourceConfig) (fksource.PollSource, error)
}
var pollDriverRegistrations = []pollDriverRegistration{
{driver: "nws_observation", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) { return nws.NewObservationSource(cfg) }},
{driver: "nws_alerts", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) { return nws.NewAlertsSource(cfg) }},
{driver: "nws_forecast_hourly", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) { return nws.NewHourlyForecastSource(cfg) }},
{driver: "nws_forecast_narrative", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) { return nws.NewNarrativeForecastSource(cfg) }},
{driver: "openmeteo_observation", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) { return openmeteo.NewObservationSource(cfg) }},
{driver: "openmeteo_forecast", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) { return openmeteo.NewForecastSource(cfg) }},
{driver: "openweather_observation", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) {
return openweather.NewObservationSource(cfg)
}},
}
// RegisterBuiltins registers the source drivers that ship with this binary.
// Keeping this in one place makes main.go very readable.
func RegisterBuiltins(r *fksource.Registry) {
// NWS drivers
r.RegisterPoll("nws_observation", func(cfg config.SourceConfig) (fksource.PollSource, error) {
return nws.NewObservationSource(cfg)
})
r.RegisterPoll("nws_alerts", func(cfg config.SourceConfig) (fksource.PollSource, error) {
return nws.NewAlertsSource(cfg)
})
r.RegisterPoll("nws_forecast_hourly", func(cfg config.SourceConfig) (fksource.PollSource, error) {
return nws.NewHourlyForecastSource(cfg)
})
r.RegisterPoll("nws_forecast_narrative", func(cfg config.SourceConfig) (fksource.PollSource, error) {
return nws.NewNarrativeForecastSource(cfg)
})
// Open-Meteo drivers
r.RegisterPoll("openmeteo_observation", func(cfg config.SourceConfig) (fksource.PollSource, error) {
return openmeteo.NewObservationSource(cfg)
})
r.RegisterPoll("openmeteo_forecast", func(cfg config.SourceConfig) (fksource.PollSource, error) {
return openmeteo.NewForecastSource(cfg)
})
// OpenWeatherMap drivers
r.RegisterPoll("openweather_observation", func(cfg config.SourceConfig) (fksource.PollSource, error) {
return openweather.NewObservationSource(cfg)
})
for _, reg := range pollDriverRegistrations {
reg := reg
r.RegisterPoll(reg.driver, func(cfg config.SourceConfig) (fksource.PollSource, error) {
return reg.factory(cfg)
})
}
}

View File

@@ -47,13 +47,42 @@ func TestRegisterBuiltinsDoesNotRegisterLegacyNWSForecastDriver(t *testing.T) {
}
}
func TestRegisterBuiltinsRegistersAllCurrentDrivers(t *testing.T) {
reg := fksource.NewRegistry()
RegisterBuiltins(reg)
drivers := []string{
"nws_observation",
"nws_alerts",
"nws_forecast_hourly",
"nws_forecast_narrative",
"openmeteo_observation",
"openmeteo_forecast",
"openweather_observation",
}
for _, driver := range drivers {
in, err := reg.BuildInput(sourceConfigForDriver(driver))
if err != nil {
t.Fatalf("BuildInput(%s) error = %v", driver, err)
}
if _, ok := in.(fksource.PollSource); !ok {
t.Fatalf("BuildInput(%s) type = %T, want PollSource", driver, in)
}
}
}
func sourceConfigForDriver(driver string) config.SourceConfig {
url := "https://example.invalid"
if driver == "openweather_observation" {
url = "https://example.invalid?units=metric"
}
return config.SourceConfig{
Name: "test-source",
Driver: driver,
Mode: config.SourceModePoll,
Params: map[string]any{
"url": "https://example.invalid",
"url": url,
"user_agent": "test-agent",
},
}

View File

@@ -0,0 +1,114 @@
package nws
import (
"context"
"encoding/json"
"strings"
"time"
"gitea.maximumdirect.net/ejr/feedkit/config"
"gitea.maximumdirect.net/ejr/feedkit/event"
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
)
const nwsForecastAccept = "application/geo+json, application/json"
type forecastSource struct {
http *fksources.HTTPSource
rawSchema string
}
type forecastMeta struct {
Properties struct {
GeneratedAt string `json:"generatedAt"`
UpdateTime string `json:"updateTime"`
Updated string `json:"updated"`
} `json:"properties"`
ParsedGeneratedAt time.Time `json:"-"`
ParsedUpdateTime time.Time `json:"-"`
}
func newForecastSource(cfg config.SourceConfig, driver, rawSchema string) (*forecastSource, error) {
hs, err := fksources.NewHTTPSource(driver, cfg, nwsForecastAccept)
if err != nil {
return nil, err
}
return &forecastSource{
http: hs,
rawSchema: rawSchema,
}, nil
}
func (s *forecastSource) Name() string { return s.http.Name }
func (s *forecastSource) Kind() event.Kind { return event.Kind("forecast") }
func (s *forecastSource) Poll(ctx context.Context) ([]event.Event, error) {
raw, meta, changed, err := s.fetchRaw(ctx)
if err != nil {
return nil, err
}
if !changed {
return nil, nil
}
var effectiveAt *time.Time
switch {
case !meta.ParsedGeneratedAt.IsZero():
t := meta.ParsedGeneratedAt.UTC()
effectiveAt = &t
case !meta.ParsedUpdateTime.IsZero():
t := meta.ParsedUpdateTime.UTC()
effectiveAt = &t
}
emittedAt := time.Now().UTC()
eventID := fksources.DefaultEventID("", s.http.Name, effectiveAt, emittedAt)
return fksources.SingleEvent(
s.Kind(),
s.http.Name,
s.rawSchema,
eventID,
emittedAt,
effectiveAt,
raw,
)
}
func (s *forecastSource) fetchRaw(ctx context.Context) (json.RawMessage, forecastMeta, bool, error) {
raw, changed, err := s.http.FetchJSONIfChanged(ctx)
if err != nil {
return nil, forecastMeta{}, false, err
}
if !changed {
return nil, forecastMeta{}, false, nil
}
var meta forecastMeta
if err := json.Unmarshal(raw, &meta); err != nil {
return raw, forecastMeta{}, true, nil
}
genStr := strings.TrimSpace(meta.Properties.GeneratedAt)
if genStr != "" {
if t, err := nwscommon.ParseTime(genStr); err == nil {
meta.ParsedGeneratedAt = t.UTC()
}
}
updStr := strings.TrimSpace(meta.Properties.UpdateTime)
if updStr == "" {
updStr = strings.TrimSpace(meta.Properties.Updated)
}
if updStr != "" {
if t, err := nwscommon.ParseTime(updStr); err == nil {
meta.ParsedUpdateTime = t.UTC()
}
}
return raw, meta, true, nil
}

View File

@@ -2,15 +2,7 @@
package nws
import (
"context"
"encoding/json"
"strings"
"time"
"gitea.maximumdirect.net/ejr/feedkit/config"
"gitea.maximumdirect.net/ejr/feedkit/event"
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
)
@@ -22,114 +14,15 @@ import (
// Output schema (current implementation):
// - standards.SchemaRawNWSHourlyForecastV1
type HourlyForecastSource struct {
http *fksources.HTTPSource
*forecastSource
}
func NewHourlyForecastSource(cfg config.SourceConfig) (*HourlyForecastSource, error) {
const driver = "nws_forecast_hourly"
// NWS forecast endpoints are GeoJSON (and sometimes also advertise json-ld/json).
hs, err := fksources.NewHTTPSource(driver, cfg, "application/geo+json, application/json")
src, err := newForecastSource(cfg, driver, standards.SchemaRawNWSHourlyForecastV1)
if err != nil {
return nil, err
}
return &HourlyForecastSource{http: hs}, nil
}
func (s *HourlyForecastSource) Name() string { return s.http.Name }
// Kind is used for routing/policy.
func (s *HourlyForecastSource) Kind() event.Kind { return event.Kind("forecast") }
func (s *HourlyForecastSource) Poll(ctx context.Context) ([]event.Event, error) {
raw, meta, changed, err := s.fetchRaw(ctx)
if err != nil {
return nil, err
}
if !changed {
return nil, nil
}
// EffectiveAt is optional; for forecasts its most naturally the run “issued” time.
// NWS gridpoint forecasts expose generatedAt (preferred) and updateTime/updated.
var effectiveAt *time.Time
switch {
case !meta.ParsedGeneratedAt.IsZero():
t := meta.ParsedGeneratedAt.UTC()
effectiveAt = &t
case !meta.ParsedUpdateTime.IsZero():
t := meta.ParsedUpdateTime.UTC()
effectiveAt = &t
}
emittedAt := time.Now().UTC()
// NWS gridpoint forecast GeoJSON commonly has a stable "id" equal to the endpoint URL.
// That is *not* unique per issued run, so we intentionally do not use it for Event.ID.
// Instead we rely on Source:EffectiveAt (or Source:EmittedAt fallback).
eventID := fksources.DefaultEventID("", s.http.Name, effectiveAt, emittedAt)
return fksources.SingleEvent(
s.Kind(),
s.http.Name,
standards.SchemaRawNWSHourlyForecastV1,
eventID,
emittedAt,
effectiveAt,
raw,
)
}
// ---- RAW fetch + minimal metadata decode ----
type hourlyForecastMeta struct {
// Present for GeoJSON Feature responses, but often stable (endpoint URL).
ID string `json:"id"`
Properties struct {
GeneratedAt string `json:"generatedAt"` // preferred “issued/run generated” time
UpdateTime string `json:"updateTime"` // last update time of underlying data
Updated string `json:"updated"` // deprecated alias for updateTime
} `json:"properties"`
ParsedGeneratedAt time.Time `json:"-"`
ParsedUpdateTime time.Time `json:"-"`
}
func (s *HourlyForecastSource) fetchRaw(ctx context.Context) (json.RawMessage, hourlyForecastMeta, bool, error) {
raw, changed, err := s.http.FetchJSONIfChanged(ctx)
if err != nil {
return nil, hourlyForecastMeta{}, false, err
}
if !changed {
return nil, hourlyForecastMeta{}, false, nil
}
var meta hourlyForecastMeta
if err := json.Unmarshal(raw, &meta); err != nil {
// If metadata decode fails, still return raw; Poll will fall back to Source:EmittedAt.
return raw, hourlyForecastMeta{}, true, nil
}
// generatedAt (preferred)
genStr := strings.TrimSpace(meta.Properties.GeneratedAt)
if genStr != "" {
if t, err := nwscommon.ParseTime(genStr); err == nil {
meta.ParsedGeneratedAt = t.UTC()
}
}
// updateTime, with fallback to deprecated "updated"
updStr := strings.TrimSpace(meta.Properties.UpdateTime)
if updStr == "" {
updStr = strings.TrimSpace(meta.Properties.Updated)
}
if updStr != "" {
if t, err := nwscommon.ParseTime(updStr); err == nil {
meta.ParsedUpdateTime = t.UTC()
}
}
return raw, meta, true, nil
return &HourlyForecastSource{forecastSource: src}, nil
}

View File

@@ -2,15 +2,7 @@
package nws
import (
"context"
"encoding/json"
"strings"
"time"
"gitea.maximumdirect.net/ejr/feedkit/config"
"gitea.maximumdirect.net/ejr/feedkit/event"
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
)
@@ -22,114 +14,15 @@ import (
// Output schema:
// - standards.SchemaRawNWSNarrativeForecastV1
type NarrativeForecastSource struct {
http *fksources.HTTPSource
*forecastSource
}
func NewNarrativeForecastSource(cfg config.SourceConfig) (*NarrativeForecastSource, error) {
const driver = "nws_forecast_narrative"
// NWS forecast endpoints are GeoJSON (and sometimes also advertise json-ld/json).
hs, err := fksources.NewHTTPSource(driver, cfg, "application/geo+json, application/json")
src, err := newForecastSource(cfg, driver, standards.SchemaRawNWSNarrativeForecastV1)
if err != nil {
return nil, err
}
return &NarrativeForecastSource{http: hs}, nil
}
func (s *NarrativeForecastSource) Name() string { return s.http.Name }
// Kind is used for routing/policy.
func (s *NarrativeForecastSource) Kind() event.Kind { return event.Kind("forecast") }
func (s *NarrativeForecastSource) Poll(ctx context.Context) ([]event.Event, error) {
raw, meta, changed, err := s.fetchRaw(ctx)
if err != nil {
return nil, err
}
if !changed {
return nil, nil
}
// EffectiveAt is optional; for forecasts its most naturally the run “issued” time.
// NWS gridpoint forecasts expose generatedAt (preferred) and updateTime/updated.
var effectiveAt *time.Time
switch {
case !meta.ParsedGeneratedAt.IsZero():
t := meta.ParsedGeneratedAt.UTC()
effectiveAt = &t
case !meta.ParsedUpdateTime.IsZero():
t := meta.ParsedUpdateTime.UTC()
effectiveAt = &t
}
emittedAt := time.Now().UTC()
// NWS gridpoint forecast GeoJSON commonly has a stable "id" equal to the endpoint URL.
// That is *not* unique per issued run, so we intentionally do not use it for Event.ID.
// Instead we rely on Source:EffectiveAt (or Source:EmittedAt fallback).
eventID := fksources.DefaultEventID("", s.http.Name, effectiveAt, emittedAt)
return fksources.SingleEvent(
s.Kind(),
s.http.Name,
standards.SchemaRawNWSNarrativeForecastV1,
eventID,
emittedAt,
effectiveAt,
raw,
)
}
// ---- RAW fetch + minimal metadata decode ----
type narrativeForecastMeta struct {
// Present for GeoJSON Feature responses, but often stable (endpoint URL).
ID string `json:"id"`
Properties struct {
GeneratedAt string `json:"generatedAt"` // preferred “issued/run generated” time
UpdateTime string `json:"updateTime"` // last update time of underlying data
Updated string `json:"updated"` // deprecated alias for updateTime
} `json:"properties"`
ParsedGeneratedAt time.Time `json:"-"`
ParsedUpdateTime time.Time `json:"-"`
}
func (s *NarrativeForecastSource) fetchRaw(ctx context.Context) (json.RawMessage, narrativeForecastMeta, bool, error) {
raw, changed, err := s.http.FetchJSONIfChanged(ctx)
if err != nil {
return nil, narrativeForecastMeta{}, false, err
}
if !changed {
return nil, narrativeForecastMeta{}, false, nil
}
var meta narrativeForecastMeta
if err := json.Unmarshal(raw, &meta); err != nil {
// If metadata decode fails, still return raw; Poll will fall back to Source:EmittedAt.
return raw, narrativeForecastMeta{}, true, nil
}
// generatedAt (preferred)
genStr := strings.TrimSpace(meta.Properties.GeneratedAt)
if genStr != "" {
if t, err := nwscommon.ParseTime(genStr); err == nil {
meta.ParsedGeneratedAt = t.UTC()
}
}
// updateTime, with fallback to deprecated "updated"
updStr := strings.TrimSpace(meta.Properties.UpdateTime)
if updStr == "" {
updStr = strings.TrimSpace(meta.Properties.Updated)
}
if updStr != "" {
if t, err := nwscommon.ParseTime(updStr); err == nil {
meta.ParsedUpdateTime = t.UTC()
}
}
return raw, meta, true, nil
return &NarrativeForecastSource{forecastSource: src}, nil
}

View File

@@ -0,0 +1,184 @@
package nws
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"time"
"gitea.maximumdirect.net/ejr/feedkit/config"
"gitea.maximumdirect.net/ejr/feedkit/event"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
)
type forecastPoller interface {
Poll(ctx context.Context) ([]event.Event, error)
}
func TestForecastSourcesEmitExpectedSchemaAndPreferGeneratedAt(t *testing.T) {
tests := []struct {
name string
driver string
wantSchema string
newSource func(config.SourceConfig) (forecastPoller, error)
}{
{
name: "hourly",
driver: "nws_forecast_hourly",
wantSchema: standards.SchemaRawNWSHourlyForecastV1,
newSource: func(cfg config.SourceConfig) (forecastPoller, error) {
return NewHourlyForecastSource(cfg)
},
},
{
name: "narrative",
driver: "nws_forecast_narrative",
wantSchema: standards.SchemaRawNWSNarrativeForecastV1,
newSource: func(cfg config.SourceConfig) (forecastPoller, error) {
return NewNarrativeForecastSource(cfg)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(`{"properties":{"generatedAt":"2026-03-28T12:00:00Z","updateTime":"2026-03-28T11:00:00Z"}}`))
}))
defer srv.Close()
src, err := tt.newSource(forecastSourceConfig(tt.driver, srv.URL))
if err != nil {
t.Fatalf("newSource() error = %v", err)
}
got, err := src.Poll(context.Background())
if err != nil {
t.Fatalf("Poll() error = %v", err)
}
if len(got) != 1 {
t.Fatalf("Poll() len = %d, want 1", len(got))
}
if got[0].Schema != tt.wantSchema {
t.Fatalf("Poll() schema = %q, want %q", got[0].Schema, tt.wantSchema)
}
if got[0].Kind != event.Kind("forecast") {
t.Fatalf("Poll() kind = %q, want forecast", got[0].Kind)
}
wantEffectiveAt := time.Date(2026, 3, 28, 12, 0, 0, 0, time.UTC)
if got[0].EffectiveAt == nil || !got[0].EffectiveAt.Equal(wantEffectiveAt) {
t.Fatalf("Poll() effectiveAt = %v, want %s", got[0].EffectiveAt, wantEffectiveAt)
}
})
}
}
func TestForecastSourcePollEffectiveAtFallbackOrder(t *testing.T) {
tests := []struct {
name string
body string
wantEffectiveAt *time.Time
}{
{
name: "updateTime fallback",
body: `{"properties":{"updateTime":"2026-03-28T11:00:00Z"}}`,
wantEffectiveAt: func() *time.Time {
t := time.Date(2026, 3, 28, 11, 0, 0, 0, time.UTC)
return &t
}(),
},
{
name: "updated fallback",
body: `{"properties":{"updated":"2026-03-28T10:00:00Z"}}`,
wantEffectiveAt: func() *time.Time {
t := time.Date(2026, 3, 28, 10, 0, 0, 0, time.UTC)
return &t
}(),
},
{
name: "omitted when metadata lacks timestamps",
body: `{"properties":{}}`,
wantEffectiveAt: nil,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(tt.body))
}))
defer srv.Close()
src, err := NewHourlyForecastSource(forecastSourceConfig("nws_forecast_hourly", srv.URL))
if err != nil {
t.Fatalf("NewHourlyForecastSource() error = %v", err)
}
got, err := src.Poll(context.Background())
if err != nil {
t.Fatalf("Poll() error = %v", err)
}
if len(got) != 1 {
t.Fatalf("Poll() len = %d, want 1", len(got))
}
if tt.wantEffectiveAt == nil {
if got[0].EffectiveAt != nil {
t.Fatalf("Poll() effectiveAt = %v, want nil", got[0].EffectiveAt)
}
return
}
if got[0].EffectiveAt == nil || !got[0].EffectiveAt.Equal(*tt.wantEffectiveAt) {
t.Fatalf("Poll() effectiveAt = %v, want %s", got[0].EffectiveAt, *tt.wantEffectiveAt)
}
})
}
}
func TestForecastSourcePollMetadataDecodeFailureStillEmitsRawEvent(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(`not-json`))
}))
defer srv.Close()
src, err := NewNarrativeForecastSource(forecastSourceConfig("nws_forecast_narrative", srv.URL))
if err != nil {
t.Fatalf("NewNarrativeForecastSource() error = %v", err)
}
got, err := src.Poll(context.Background())
if err != nil {
t.Fatalf("Poll() error = %v", err)
}
if len(got) != 1 {
t.Fatalf("Poll() len = %d, want 1", len(got))
}
if got[0].EffectiveAt != nil {
t.Fatalf("Poll() effectiveAt = %v, want nil", got[0].EffectiveAt)
}
if got[0].Schema != standards.SchemaRawNWSNarrativeForecastV1 {
t.Fatalf("Poll() schema = %q, want %q", got[0].Schema, standards.SchemaRawNWSNarrativeForecastV1)
}
raw, ok := got[0].Payload.(json.RawMessage)
if !ok {
t.Fatalf("Poll() payload type = %T, want json.RawMessage", got[0].Payload)
}
if string(raw) != "not-json" {
t.Fatalf("Poll() payload = %q, want %q", string(raw), "not-json")
}
}
func forecastSourceConfig(driver, url string) config.SourceConfig {
return config.SourceConfig{
Name: "test-forecast-source",
Driver: driver,
Mode: config.SourceModePoll,
Params: map[string]any{
"url": url,
"user_agent": "test-agent",
},
}
}