From eb274864669b0af0a20cf8360570aadcf8ae42ba Mon Sep 17 00:00:00 2001 From: Eric Rakestraw Date: Sat, 28 Mar 2026 10:02:50 -0500 Subject: [PATCH] Moved HTTP polling helpers upstream into feedkit, and updated to feedkit v0.8.0 --- go.mod | 2 +- go.sum | 4 +- internal/sources/common/config.go | 104 -------------------- internal/sources/common/http.go | 76 -------------- internal/sources/nws/alerts.go | 23 +++-- internal/sources/nws/forecast_hourly.go | 23 +++-- internal/sources/nws/forecast_narrative.go | 23 +++-- internal/sources/nws/observation.go | 23 +++-- internal/sources/nws/observation_test.go | 63 ++++++++++++ internal/sources/openmeteo/forecast.go | 23 +++-- internal/sources/openmeteo/observation.go | 23 +++-- internal/sources/openweather/observation.go | 23 +++-- 12 files changed, 171 insertions(+), 239 deletions(-) delete mode 100644 internal/sources/common/config.go delete mode 100644 internal/sources/common/http.go create mode 100644 internal/sources/nws/observation_test.go diff --git a/go.mod b/go.mod index ac8ffc0..70f07af 100644 --- a/go.mod +++ b/go.mod @@ -2,7 +2,7 @@ module gitea.maximumdirect.net/ejr/weatherfeeder go 1.25 -require gitea.maximumdirect.net/ejr/feedkit v0.7.2 +require gitea.maximumdirect.net/ejr/feedkit v0.8.0 require ( github.com/klauspost/compress v1.17.2 // indirect diff --git a/go.sum b/go.sum index 80dcea6..f69edf4 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -gitea.maximumdirect.net/ejr/feedkit v0.7.2 h1:hTg302SgSi7tw11lNzuc+3g7MvHT6jQQziuo2NoARt8= -gitea.maximumdirect.net/ejr/feedkit v0.7.2/go.mod h1:U6xC9xZLN3cL4yi7YBVyzGoHYRLJXusFCAKlj2kdYYQ= +gitea.maximumdirect.net/ejr/feedkit v0.8.0 h1:JdEEy6T3AQ97alLNYcQ3crN3tOEZPLMBD0Qr/MH5/dw= +gitea.maximumdirect.net/ejr/feedkit v0.8.0/go.mod h1:U6xC9xZLN3cL4yi7YBVyzGoHYRLJXusFCAKlj2kdYYQ= github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= diff --git a/internal/sources/common/config.go b/internal/sources/common/config.go deleted file mode 100644 index 7290465..0000000 --- a/internal/sources/common/config.go +++ /dev/null @@ -1,104 +0,0 @@ -// FILE: ./internal/sources/common/config.go -package common - -import ( - "fmt" - "strings" - - "gitea.maximumdirect.net/ejr/feedkit/config" -) - -// This file centralizes small, boring config-validation patterns shared across -// weatherfeeder source drivers. -// -// Goal: keep driver constructors (New*Source) easy to read and consistent, while -// keeping driver-specific options in cfg.Params (feedkit remains domain-agnostic). - -// HTTPSourceConfig is the standard "HTTP-polling source" config shape used across drivers. -type HTTPSourceConfig struct { - Name string - URL string - UserAgent string -} - -// RequireHTTPSourceConfig enforces weatherfeeder's standard HTTP source config: -// -// - cfg.Name must be present -// - cfg.Params must be present -// - params.url must be present (accepts "url" or "URL") -// - params.user_agent must be present (accepts "user_agent" or "userAgent") -// -// We intentionally require a User-Agent for *all* sources, even when upstreams -// do not strictly require one. This keeps config uniform across providers. -func RequireHTTPSourceConfig(driver string, cfg config.SourceConfig) (HTTPSourceConfig, error) { - if strings.TrimSpace(cfg.Name) == "" { - return HTTPSourceConfig{}, fmt.Errorf("%s: name is required", driver) - } - if cfg.Params == nil { - return HTTPSourceConfig{}, fmt.Errorf("%s %q: params are required (need params.url and params.user_agent)", driver, cfg.Name) - } - - url, ok := cfg.ParamString("url", "URL") - if !ok { - return HTTPSourceConfig{}, fmt.Errorf("%s %q: params.url is required", driver, cfg.Name) - } - - ua, ok := cfg.ParamString("user_agent", "userAgent") - if !ok { - return HTTPSourceConfig{}, fmt.Errorf("%s %q: params.user_agent is required", driver, cfg.Name) - } - - return HTTPSourceConfig{ - Name: cfg.Name, - URL: url, - UserAgent: ua, - }, nil -} - -// --- The helpers below remain useful for future drivers; they are not required -// --- by the observation sources after adopting RequireHTTPSourceConfig. - -// RequireName ensures cfg.Name is present and non-whitespace. -func RequireName(driver string, cfg config.SourceConfig) error { - if strings.TrimSpace(cfg.Name) == "" { - return fmt.Errorf("%s: name is required", driver) - } - return nil -} - -// RequireParams ensures cfg.Params is non-nil. The "want" string should be a short -// description of required keys, e.g. "need params.url and params.user_agent". -func RequireParams(driver string, cfg config.SourceConfig, want string) error { - if cfg.Params == nil { - return fmt.Errorf("%s %q: params are required (%s)", driver, cfg.Name, want) - } - return nil -} - -// RequireURL returns the configured URL for a source. -// Canonical key is "url"; we also accept "URL" as a convenience. -func RequireURL(driver string, cfg config.SourceConfig) (string, error) { - if cfg.Params == nil { - return "", fmt.Errorf("%s %q: params are required (need params.url)", driver, cfg.Name) - } - - u, ok := cfg.ParamString("url", "URL") - if !ok { - return "", fmt.Errorf("%s %q: params.url is required", driver, cfg.Name) - } - return u, nil -} - -// RequireUserAgent returns the configured User-Agent for a source. -// Canonical key is "user_agent"; we also accept "userAgent" as a convenience. -func RequireUserAgent(driver string, cfg config.SourceConfig) (string, error) { - if cfg.Params == nil { - return "", fmt.Errorf("%s %q: params are required (need params.user_agent)", driver, cfg.Name) - } - - ua, ok := cfg.ParamString("user_agent", "userAgent") - if !ok { - return "", fmt.Errorf("%s %q: params.user_agent is required", driver, cfg.Name) - } - return ua, nil -} diff --git a/internal/sources/common/http.go b/internal/sources/common/http.go deleted file mode 100644 index 7e24b34..0000000 --- a/internal/sources/common/http.go +++ /dev/null @@ -1,76 +0,0 @@ -// FILE: ./internal/sources/common/http_source.go -package common - -import ( - "context" - "encoding/json" - "fmt" - "net/http" - - "gitea.maximumdirect.net/ejr/feedkit/config" - "gitea.maximumdirect.net/ejr/feedkit/transport" -) - -// HTTPSource is a tiny, reusable "HTTP polling spine" for weatherfeeder sources. -// -// It centralizes the boring parts: -// - standard config shape (url + user_agent) via RequireHTTPSourceConfig -// - a default http.Client with timeout -// - FetchBody / headers / max-body safety limit -// - consistent error wrapping (driver + source name) -// -// Individual drivers remain responsible for: -// - decoding minimal metadata (for Event.ID / EffectiveAt) -// - constructing the event envelope (kind/schema/payload) -type HTTPSource struct { - Driver string - Name string - URL string - UserAgent string - Accept string - Client *http.Client -} - -// NewHTTPSource builds an HTTPSource using weatherfeeder's standard HTTP source -// config (params.url + params.user_agent) and a default HTTP client. -func NewHTTPSource(driver string, cfg config.SourceConfig, accept string) (*HTTPSource, error) { - c, err := RequireHTTPSourceConfig(driver, cfg) - if err != nil { - return nil, err - } - - return &HTTPSource{ - Driver: driver, - Name: c.Name, - URL: c.URL, - UserAgent: c.UserAgent, - Accept: accept, - Client: transport.NewHTTPClient(transport.DefaultHTTPTimeout), - }, nil -} - -// FetchBytes fetches the URL and returns the raw response body bytes. -func (s *HTTPSource) FetchBytes(ctx context.Context) ([]byte, error) { - client := s.Client - if client == nil { - // Defensive: allow tests or callers to nil out Client; keep behavior sane. - client = transport.NewHTTPClient(transport.DefaultHTTPTimeout) - } - - b, err := transport.FetchBody(ctx, client, s.URL, s.UserAgent, s.Accept) - if err != nil { - return nil, fmt.Errorf("%s %q: %w", s.Driver, s.Name, err) - } - return b, nil -} - -// FetchJSON fetches the URL and returns the raw body as json.RawMessage. -// json.Unmarshal accepts json.RawMessage directly, so callers can decode minimal -// metadata without keeping both []byte and RawMessage in their own structs. -func (s *HTTPSource) FetchJSON(ctx context.Context) (json.RawMessage, error) { - b, err := s.FetchBytes(ctx) - if err != nil { - return nil, err - } - return json.RawMessage(b), nil -} diff --git a/internal/sources/nws/alerts.go b/internal/sources/nws/alerts.go index 403c5d8..094238e 100644 --- a/internal/sources/nws/alerts.go +++ b/internal/sources/nws/alerts.go @@ -9,6 +9,7 @@ import ( "gitea.maximumdirect.net/ejr/feedkit/config" "gitea.maximumdirect.net/ejr/feedkit/event" + fksources "gitea.maximumdirect.net/ejr/feedkit/sources" nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws" "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common" "gitea.maximumdirect.net/ejr/weatherfeeder/standards" @@ -22,14 +23,14 @@ import ( // Output schema: // - standards.SchemaRawNWSAlertsV1 type AlertsSource struct { - http *common.HTTPSource + http *fksources.HTTPSource } func NewAlertsSource(cfg config.SourceConfig) (*AlertsSource, error) { const driver = "nws_alerts" // NWS alerts responses are GeoJSON-ish; allow fallback to plain JSON as well. - hs, err := common.NewHTTPSource(driver, cfg, "application/geo+json, application/json") + hs, err := fksources.NewHTTPSource(driver, cfg, "application/geo+json, application/json") if err != nil { return nil, err } @@ -43,10 +44,13 @@ func (s *AlertsSource) Name() string { return s.http.Name } func (s *AlertsSource) Kind() event.Kind { return event.Kind("alert") } func (s *AlertsSource) Poll(ctx context.Context) ([]event.Event, error) { - raw, meta, err := s.fetchRaw(ctx) + raw, meta, changed, err := s.fetchRaw(ctx) if err != nil { return nil, err } + if !changed { + return nil, nil + } // EffectiveAt policy for alerts: // Prefer the collection-level "updated" timestamp (best dedupe signal). @@ -97,16 +101,19 @@ type alertsMeta struct { ParsedLatestFeatureTime time.Time `json:"-"` } -func (s *AlertsSource) fetchRaw(ctx context.Context) (json.RawMessage, alertsMeta, error) { - raw, err := s.http.FetchJSON(ctx) +func (s *AlertsSource) fetchRaw(ctx context.Context) (json.RawMessage, alertsMeta, bool, error) { + raw, changed, err := s.http.FetchJSONIfChanged(ctx) if err != nil { - return nil, alertsMeta{}, err + return nil, alertsMeta{}, false, err + } + if !changed { + return nil, alertsMeta{}, false, nil } var meta alertsMeta if err := json.Unmarshal(raw, &meta); err != nil { // If metadata decode fails, still return raw; Poll will fall back to Source:EmittedAt. - return raw, alertsMeta{}, nil + return raw, alertsMeta{}, true, nil } // Top-level updated (preferred). @@ -143,5 +150,5 @@ func (s *AlertsSource) fetchRaw(ctx context.Context) (json.RawMessage, alertsMet } meta.ParsedLatestFeatureTime = latest - return raw, meta, nil + return raw, meta, true, nil } diff --git a/internal/sources/nws/forecast_hourly.go b/internal/sources/nws/forecast_hourly.go index 886dab6..8e79681 100644 --- a/internal/sources/nws/forecast_hourly.go +++ b/internal/sources/nws/forecast_hourly.go @@ -9,6 +9,7 @@ import ( "gitea.maximumdirect.net/ejr/feedkit/config" "gitea.maximumdirect.net/ejr/feedkit/event" + fksources "gitea.maximumdirect.net/ejr/feedkit/sources" nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws" "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common" "gitea.maximumdirect.net/ejr/weatherfeeder/standards" @@ -22,14 +23,14 @@ import ( // Output schema (current implementation): // - standards.SchemaRawNWSHourlyForecastV1 type HourlyForecastSource struct { - http *common.HTTPSource + http *fksources.HTTPSource } func NewHourlyForecastSource(cfg config.SourceConfig) (*HourlyForecastSource, error) { const driver = "nws_forecast_hourly" // NWS forecast endpoints are GeoJSON (and sometimes also advertise json-ld/json). - hs, err := common.NewHTTPSource(driver, cfg, "application/geo+json, application/json") + hs, err := fksources.NewHTTPSource(driver, cfg, "application/geo+json, application/json") if err != nil { return nil, err } @@ -43,10 +44,13 @@ func (s *HourlyForecastSource) Name() string { return s.http.Name } func (s *HourlyForecastSource) Kind() event.Kind { return event.Kind("forecast") } func (s *HourlyForecastSource) Poll(ctx context.Context) ([]event.Event, error) { - raw, meta, err := s.fetchRaw(ctx) + raw, meta, changed, err := s.fetchRaw(ctx) if err != nil { return nil, err } + if !changed { + return nil, nil + } // EffectiveAt is optional; for forecasts it’s most naturally the run “issued” time. // NWS gridpoint forecasts expose generatedAt (preferred) and updateTime/updated. @@ -94,16 +98,19 @@ type hourlyForecastMeta struct { ParsedUpdateTime time.Time `json:"-"` } -func (s *HourlyForecastSource) fetchRaw(ctx context.Context) (json.RawMessage, hourlyForecastMeta, error) { - raw, err := s.http.FetchJSON(ctx) +func (s *HourlyForecastSource) fetchRaw(ctx context.Context) (json.RawMessage, hourlyForecastMeta, bool, error) { + raw, changed, err := s.http.FetchJSONIfChanged(ctx) if err != nil { - return nil, hourlyForecastMeta{}, err + return nil, hourlyForecastMeta{}, false, err + } + if !changed { + return nil, hourlyForecastMeta{}, false, nil } var meta hourlyForecastMeta if err := json.Unmarshal(raw, &meta); err != nil { // If metadata decode fails, still return raw; Poll will fall back to Source:EmittedAt. - return raw, hourlyForecastMeta{}, nil + return raw, hourlyForecastMeta{}, true, nil } // generatedAt (preferred) @@ -125,5 +132,5 @@ func (s *HourlyForecastSource) fetchRaw(ctx context.Context) (json.RawMessage, h } } - return raw, meta, nil + return raw, meta, true, nil } diff --git a/internal/sources/nws/forecast_narrative.go b/internal/sources/nws/forecast_narrative.go index d112629..69093ee 100644 --- a/internal/sources/nws/forecast_narrative.go +++ b/internal/sources/nws/forecast_narrative.go @@ -9,6 +9,7 @@ import ( "gitea.maximumdirect.net/ejr/feedkit/config" "gitea.maximumdirect.net/ejr/feedkit/event" + fksources "gitea.maximumdirect.net/ejr/feedkit/sources" nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws" "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common" "gitea.maximumdirect.net/ejr/weatherfeeder/standards" @@ -22,14 +23,14 @@ import ( // Output schema: // - standards.SchemaRawNWSNarrativeForecastV1 type NarrativeForecastSource struct { - http *common.HTTPSource + http *fksources.HTTPSource } func NewNarrativeForecastSource(cfg config.SourceConfig) (*NarrativeForecastSource, error) { const driver = "nws_forecast_narrative" // NWS forecast endpoints are GeoJSON (and sometimes also advertise json-ld/json). - hs, err := common.NewHTTPSource(driver, cfg, "application/geo+json, application/json") + hs, err := fksources.NewHTTPSource(driver, cfg, "application/geo+json, application/json") if err != nil { return nil, err } @@ -43,10 +44,13 @@ func (s *NarrativeForecastSource) Name() string { return s.http.Name } func (s *NarrativeForecastSource) Kind() event.Kind { return event.Kind("forecast") } func (s *NarrativeForecastSource) Poll(ctx context.Context) ([]event.Event, error) { - raw, meta, err := s.fetchRaw(ctx) + raw, meta, changed, err := s.fetchRaw(ctx) if err != nil { return nil, err } + if !changed { + return nil, nil + } // EffectiveAt is optional; for forecasts it’s most naturally the run “issued” time. // NWS gridpoint forecasts expose generatedAt (preferred) and updateTime/updated. @@ -94,16 +98,19 @@ type narrativeForecastMeta struct { ParsedUpdateTime time.Time `json:"-"` } -func (s *NarrativeForecastSource) fetchRaw(ctx context.Context) (json.RawMessage, narrativeForecastMeta, error) { - raw, err := s.http.FetchJSON(ctx) +func (s *NarrativeForecastSource) fetchRaw(ctx context.Context) (json.RawMessage, narrativeForecastMeta, bool, error) { + raw, changed, err := s.http.FetchJSONIfChanged(ctx) if err != nil { - return nil, narrativeForecastMeta{}, err + return nil, narrativeForecastMeta{}, false, err + } + if !changed { + return nil, narrativeForecastMeta{}, false, nil } var meta narrativeForecastMeta if err := json.Unmarshal(raw, &meta); err != nil { // If metadata decode fails, still return raw; Poll will fall back to Source:EmittedAt. - return raw, narrativeForecastMeta{}, nil + return raw, narrativeForecastMeta{}, true, nil } // generatedAt (preferred) @@ -125,5 +132,5 @@ func (s *NarrativeForecastSource) fetchRaw(ctx context.Context) (json.RawMessage } } - return raw, meta, nil + return raw, meta, true, nil } diff --git a/internal/sources/nws/observation.go b/internal/sources/nws/observation.go index 25b0b68..caeebd9 100644 --- a/internal/sources/nws/observation.go +++ b/internal/sources/nws/observation.go @@ -9,6 +9,7 @@ import ( "gitea.maximumdirect.net/ejr/feedkit/config" "gitea.maximumdirect.net/ejr/feedkit/event" + fksources "gitea.maximumdirect.net/ejr/feedkit/sources" nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws" "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common" "gitea.maximumdirect.net/ejr/weatherfeeder/standards" @@ -16,13 +17,13 @@ import ( // ObservationSource polls an NWS station observation endpoint and emits a RAW observation Event. type ObservationSource struct { - http *common.HTTPSource + http *fksources.HTTPSource } func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) { const driver = "nws_observation" - hs, err := common.NewHTTPSource(driver, cfg, "application/geo+json, application/json") + hs, err := fksources.NewHTTPSource(driver, cfg, "application/geo+json, application/json") if err != nil { return nil, err } @@ -35,10 +36,13 @@ func (s *ObservationSource) Name() string { return s.http.Name } func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") } func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { - raw, meta, err := s.fetchRaw(ctx) + raw, meta, changed, err := s.fetchRaw(ctx) if err != nil { return nil, err } + if !changed { + return nil, nil + } // EffectiveAt is optional; for observations it’s naturally the observation timestamp. var effectiveAt *time.Time @@ -72,16 +76,19 @@ type observationMeta struct { ParsedTimestamp time.Time `json:"-"` } -func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, observationMeta, error) { - raw, err := s.http.FetchJSON(ctx) +func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, observationMeta, bool, error) { + raw, changed, err := s.http.FetchJSONIfChanged(ctx) if err != nil { - return nil, observationMeta{}, err + return nil, observationMeta{}, false, err + } + if !changed { + return nil, observationMeta{}, false, nil } var meta observationMeta if err := json.Unmarshal(raw, &meta); err != nil { // If metadata decode fails, still return raw; envelope will fall back to Source:EffectiveAt. - return raw, observationMeta{}, nil + return raw, observationMeta{}, true, nil } tsStr := strings.TrimSpace(meta.Properties.Timestamp) @@ -91,5 +98,5 @@ func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, obse } } - return raw, meta, nil + return raw, meta, true, nil } diff --git a/internal/sources/nws/observation_test.go b/internal/sources/nws/observation_test.go new file mode 100644 index 0000000..8104fa1 --- /dev/null +++ b/internal/sources/nws/observation_test.go @@ -0,0 +1,63 @@ +package nws + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" + + "gitea.maximumdirect.net/ejr/feedkit/config" + "gitea.maximumdirect.net/ejr/feedkit/event" +) + +func TestObservationSourcePollReturnsNoEventsOn304(t *testing.T) { + var call int + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + call++ + switch call { + case 1: + w.Header().Set("ETag", `"obs-v1"`) + _, _ = w.Write([]byte(`{"id":"obs-1","properties":{"timestamp":"2026-03-28T12:00:00Z"}}`)) + case 2: + if got := r.Header.Get("If-None-Match"); got != `"obs-v1"` { + t.Fatalf("second request If-None-Match = %q", got) + } + w.WriteHeader(http.StatusNotModified) + default: + t.Fatalf("unexpected call count %d", call) + } + })) + defer srv.Close() + + src, err := NewObservationSource(config.SourceConfig{ + Name: "NWSObservationTest", + Driver: "nws_observation", + Mode: config.SourceModePoll, + Params: map[string]any{ + "url": srv.URL, + "user_agent": "test-agent", + }, + }) + if err != nil { + t.Fatalf("NewObservationSource() error = %v", err) + } + + first, err := src.Poll(context.Background()) + if err != nil { + t.Fatalf("first Poll() error = %v", err) + } + if len(first) != 1 { + t.Fatalf("first Poll() len = %d, want 1", len(first)) + } + if first[0].Kind != event.Kind("observation") { + t.Fatalf("first Poll() kind = %q", first[0].Kind) + } + + second, err := src.Poll(context.Background()) + if err != nil { + t.Fatalf("second Poll() error = %v", err) + } + if len(second) != 0 { + t.Fatalf("second Poll() len = %d, want 0", len(second)) + } +} diff --git a/internal/sources/openmeteo/forecast.go b/internal/sources/openmeteo/forecast.go index 39d240f..4f82463 100644 --- a/internal/sources/openmeteo/forecast.go +++ b/internal/sources/openmeteo/forecast.go @@ -8,6 +8,7 @@ import ( "gitea.maximumdirect.net/ejr/feedkit/config" "gitea.maximumdirect.net/ejr/feedkit/event" + fksources "gitea.maximumdirect.net/ejr/feedkit/sources" "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openmeteo" "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common" "gitea.maximumdirect.net/ejr/weatherfeeder/standards" @@ -15,13 +16,13 @@ import ( // ForecastSource polls an Open-Meteo hourly forecast endpoint and emits one RAW Forecast Event. type ForecastSource struct { - http *common.HTTPSource + http *fksources.HTTPSource } func NewForecastSource(cfg config.SourceConfig) (*ForecastSource, error) { const driver = "openmeteo_forecast" - hs, err := common.NewHTTPSource(driver, cfg, "application/json") + hs, err := fksources.NewHTTPSource(driver, cfg, "application/json") if err != nil { return nil, err } @@ -34,10 +35,13 @@ func (s *ForecastSource) Name() string { return s.http.Name } func (s *ForecastSource) Kind() event.Kind { return event.Kind("forecast") } func (s *ForecastSource) Poll(ctx context.Context) ([]event.Event, error) { - raw, meta, err := s.fetchRaw(ctx) + raw, meta, changed, err := s.fetchRaw(ctx) if err != nil { return nil, err } + if !changed { + return nil, nil + } // Open-Meteo does not expose a true "issued at" timestamp for forecast runs. // We use current.time when present; otherwise we fall back to the first hourly time @@ -79,16 +83,19 @@ type forecastMeta struct { ParsedTimestamp time.Time `json:"-"` } -func (s *ForecastSource) fetchRaw(ctx context.Context) (json.RawMessage, forecastMeta, error) { - raw, err := s.http.FetchJSON(ctx) +func (s *ForecastSource) fetchRaw(ctx context.Context) (json.RawMessage, forecastMeta, bool, error) { + raw, changed, err := s.http.FetchJSONIfChanged(ctx) if err != nil { - return nil, forecastMeta{}, err + return nil, forecastMeta{}, false, err + } + if !changed { + return nil, forecastMeta{}, false, nil } var meta forecastMeta if err := json.Unmarshal(raw, &meta); err != nil { // If metadata decode fails, still return raw; Poll will fall back to Source:EmittedAt. - return raw, forecastMeta{}, nil + return raw, forecastMeta{}, true, nil } ts := strings.TrimSpace(meta.Current.Time) @@ -106,5 +113,5 @@ func (s *ForecastSource) fetchRaw(ctx context.Context) (json.RawMessage, forecas } } - return raw, meta, nil + return raw, meta, true, nil } diff --git a/internal/sources/openmeteo/observation.go b/internal/sources/openmeteo/observation.go index ceea4ff..c07e483 100644 --- a/internal/sources/openmeteo/observation.go +++ b/internal/sources/openmeteo/observation.go @@ -8,6 +8,7 @@ import ( "gitea.maximumdirect.net/ejr/feedkit/config" "gitea.maximumdirect.net/ejr/feedkit/event" + fksources "gitea.maximumdirect.net/ejr/feedkit/sources" "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openmeteo" "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common" "gitea.maximumdirect.net/ejr/weatherfeeder/standards" @@ -15,13 +16,13 @@ import ( // ObservationSource polls an Open-Meteo endpoint and emits one RAW Observation Event. type ObservationSource struct { - http *common.HTTPSource + http *fksources.HTTPSource } func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) { const driver = "openmeteo_observation" - hs, err := common.NewHTTPSource(driver, cfg, "application/json") + hs, err := fksources.NewHTTPSource(driver, cfg, "application/json") if err != nil { return nil, err } @@ -34,10 +35,13 @@ func (s *ObservationSource) Name() string { return s.http.Name } func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") } func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { - raw, meta, err := s.fetchRaw(ctx) + raw, meta, changed, err := s.fetchRaw(ctx) if err != nil { return nil, err } + if !changed { + return nil, nil + } var effectiveAt *time.Time if !meta.ParsedTimestamp.IsZero() { @@ -72,21 +76,24 @@ type openMeteoMeta struct { ParsedTimestamp time.Time `json:"-"` } -func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, openMeteoMeta, error) { - raw, err := s.http.FetchJSON(ctx) +func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, openMeteoMeta, bool, error) { + raw, changed, err := s.http.FetchJSONIfChanged(ctx) if err != nil { - return nil, openMeteoMeta{}, err + return nil, openMeteoMeta{}, false, err + } + if !changed { + return nil, openMeteoMeta{}, false, nil } var meta openMeteoMeta if err := json.Unmarshal(raw, &meta); err != nil { // If metadata decode fails, still return raw; envelope will omit EffectiveAt. - return raw, openMeteoMeta{}, nil + return raw, openMeteoMeta{}, true, nil } if t, err := openmeteo.ParseTime(meta.Current.Time, meta.Timezone, meta.UTCOffsetSeconds); err == nil { meta.ParsedTimestamp = t.UTC() } - return raw, meta, nil + return raw, meta, true, nil } diff --git a/internal/sources/openweather/observation.go b/internal/sources/openweather/observation.go index 9fecbd1..0127cdb 100644 --- a/internal/sources/openweather/observation.go +++ b/internal/sources/openweather/observation.go @@ -9,19 +9,20 @@ import ( "gitea.maximumdirect.net/ejr/feedkit/config" "gitea.maximumdirect.net/ejr/feedkit/event" + fksources "gitea.maximumdirect.net/ejr/feedkit/sources" owcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openweather" "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common" "gitea.maximumdirect.net/ejr/weatherfeeder/standards" ) type ObservationSource struct { - http *common.HTTPSource + http *fksources.HTTPSource } func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) { const driver = "openweather_observation" - hs, err := common.NewHTTPSource(driver, cfg, "application/json") + hs, err := fksources.NewHTTPSource(driver, cfg, "application/json") if err != nil { return nil, err } @@ -42,10 +43,13 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { return nil, fmt.Errorf("%s %q: %w", s.http.Driver, s.http.Name, err) } - raw, meta, err := s.fetchRaw(ctx) + raw, meta, changed, err := s.fetchRaw(ctx) if err != nil { return nil, err } + if !changed { + return nil, nil + } var effectiveAt *time.Time if !meta.ParsedTimestamp.IsZero() { @@ -75,20 +79,23 @@ type openWeatherMeta struct { ParsedTimestamp time.Time `json:"-"` } -func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, openWeatherMeta, error) { - raw, err := s.http.FetchJSON(ctx) +func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, openWeatherMeta, bool, error) { + raw, changed, err := s.http.FetchJSONIfChanged(ctx) if err != nil { - return nil, openWeatherMeta{}, err + return nil, openWeatherMeta{}, false, err + } + if !changed { + return nil, openWeatherMeta{}, false, nil } var meta openWeatherMeta if err := json.Unmarshal(raw, &meta); err != nil { - return raw, openWeatherMeta{}, nil + return raw, openWeatherMeta{}, true, nil } if meta.Dt > 0 { meta.ParsedTimestamp = time.Unix(meta.Dt, 0).UTC() } - return raw, meta, nil + return raw, meta, true, nil }