From d9474b5a5b922668350832146ded92c1eec8f5d6 Mon Sep 17 00:00:00 2001 From: Eric Rakestraw Date: Thu, 15 Jan 2026 19:11:58 -0600 Subject: [PATCH] v0.x: add reusable HTTP source spine; fix routing; upstream HTTP transport helper - fix dispatch route compilation so empty Kinds matches all (nil), not none - introduce internal/sources/common/HTTPSource to centralize HTTP polling boilerplate: - standard cfg parsing (url + user_agent) - default HTTP client + Accept/User-Agent headers - consistent error wrapping - refactor observation sources (nws/openmeteo/openweather) to use HTTPSource - upstream generic HTTP fetch/limits/timeout helper from weatherfeeder to feedkit: - move internal/sources/common/http.go -> feedkit/transport/http.go - keep behavior: status checks, max-body limit, default timeout --- internal/sources/common/http.go | 106 +++++++++++--------- internal/sources/nws/observation.go | 31 ++---- internal/sources/openmeteo/observation.go | 31 ++---- internal/sources/openweather/observation.go | 39 +++---- 4 files changed, 91 insertions(+), 116 deletions(-) diff --git a/internal/sources/common/http.go b/internal/sources/common/http.go index b582c2f..7e24b34 100644 --- a/internal/sources/common/http.go +++ b/internal/sources/common/http.go @@ -1,70 +1,76 @@ -// FILE: ./internal/sources/common/http.go +// FILE: ./internal/sources/common/http_source.go package common import ( "context" + "encoding/json" "fmt" - "io" "net/http" - "time" + + "gitea.maximumdirect.net/ejr/feedkit/config" + "gitea.maximumdirect.net/ejr/feedkit/transport" ) -// maxResponseBodyBytes is a hard safety limit on HTTP response bodies. -// API responses should be small, so this protects us from accidental -// or malicious large responses. -const maxResponseBodyBytes = 2 << 21 // 4 MiB - -// DefaultHTTPTimeout is the standard timeout used by weatherfeeder HTTP sources. -// Individual drivers may override this if they have a specific need. -const DefaultHTTPTimeout = 10 * time.Second - -// NewHTTPClient returns a simple http.Client configured with a timeout. -// If timeout <= 0, DefaultHTTPTimeout is used. -func NewHTTPClient(timeout time.Duration) *http.Client { - if timeout <= 0 { - timeout = DefaultHTTPTimeout - } - return &http.Client{Timeout: timeout} +// HTTPSource is a tiny, reusable "HTTP polling spine" for weatherfeeder sources. +// +// It centralizes the boring parts: +// - standard config shape (url + user_agent) via RequireHTTPSourceConfig +// - a default http.Client with timeout +// - FetchBody / headers / max-body safety limit +// - consistent error wrapping (driver + source name) +// +// Individual drivers remain responsible for: +// - decoding minimal metadata (for Event.ID / EffectiveAt) +// - constructing the event envelope (kind/schema/payload) +type HTTPSource struct { + Driver string + Name string + URL string + UserAgent string + Accept string + Client *http.Client } -func FetchBody(ctx context.Context, client *http.Client, url, userAgent, accept string) ([]byte, error) { - req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) +// NewHTTPSource builds an HTTPSource using weatherfeeder's standard HTTP source +// config (params.url + params.user_agent) and a default HTTP client. +func NewHTTPSource(driver string, cfg config.SourceConfig, accept string) (*HTTPSource, error) { + c, err := RequireHTTPSourceConfig(driver, cfg) if err != nil { return nil, err } - if userAgent != "" { - req.Header.Set("User-Agent", userAgent) - } - if accept != "" { - req.Header.Set("Accept", accept) + return &HTTPSource{ + Driver: driver, + Name: c.Name, + URL: c.URL, + UserAgent: c.UserAgent, + Accept: accept, + Client: transport.NewHTTPClient(transport.DefaultHTTPTimeout), + }, nil +} + +// FetchBytes fetches the URL and returns the raw response body bytes. +func (s *HTTPSource) FetchBytes(ctx context.Context) ([]byte, error) { + client := s.Client + if client == nil { + // Defensive: allow tests or callers to nil out Client; keep behavior sane. + client = transport.NewHTTPClient(transport.DefaultHTTPTimeout) } - res, err := client.Do(req) + b, err := transport.FetchBody(ctx, client, s.URL, s.UserAgent, s.Accept) if err != nil { - return nil, err + return nil, fmt.Errorf("%s %q: %w", s.Driver, s.Name, err) } - defer res.Body.Close() - - if res.StatusCode < 200 || res.StatusCode >= 300 { - return nil, fmt.Errorf("HTTP %s", res.Status) - } - - // Read at most maxResponseBodyBytes + 1 so we can detect overflow. - limited := io.LimitReader(res.Body, maxResponseBodyBytes+1) - - b, err := io.ReadAll(limited) - if err != nil { - return nil, err - } - - if len(b) == 0 { - return nil, fmt.Errorf("empty response body") - } - - if len(b) > maxResponseBodyBytes { - return nil, fmt.Errorf("response body too large (>%d bytes)", maxResponseBodyBytes) - } - return b, nil } + +// FetchJSON fetches the URL and returns the raw body as json.RawMessage. +// json.Unmarshal accepts json.RawMessage directly, so callers can decode minimal +// metadata without keeping both []byte and RawMessage in their own structs. +func (s *HTTPSource) FetchJSON(ctx context.Context) (json.RawMessage, error) { + b, err := s.FetchBytes(ctx) + if err != nil { + return nil, err + } + return json.RawMessage(b), nil +} diff --git a/internal/sources/nws/observation.go b/internal/sources/nws/observation.go index 5d6b01e..ce20c9c 100644 --- a/internal/sources/nws/observation.go +++ b/internal/sources/nws/observation.go @@ -5,7 +5,6 @@ import ( "context" "encoding/json" "fmt" - "net/http" "strings" "time" @@ -21,29 +20,21 @@ import ( // // https://api.weather.gov/stations/KSTL/observations/latest type ObservationSource struct { - name string - url string - userAgent string - client *http.Client + http *common.HTTPSource } func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) { const driver = "nws_observation" - c, err := common.RequireHTTPSourceConfig(driver, cfg) + hs, err := common.NewHTTPSource(driver, cfg, "application/geo+json, application/json") if err != nil { return nil, err } - return &ObservationSource{ - name: c.Name, - url: c.URL, - userAgent: c.UserAgent, - client: common.NewHTTPClient(common.DefaultHTTPTimeout), - }, nil + return &ObservationSource{http: hs}, nil } -func (s *ObservationSource) Name() string { return s.name } +func (s *ObservationSource) Name() string { return s.http.Name } // Kind is used for routing/policy. // We keep Kind canonical (observation) even for raw events; Schema differentiates raw vs canonical. @@ -65,11 +56,13 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { if ts.IsZero() { ts = time.Now().UTC() } + station := strings.TrimSpace(meta.StationID) if station == "" { station = "UNKNOWN" } - eventID = fmt.Sprintf("nws:observation:%s:%s:%s", s.name, station, ts.UTC().Format(time.RFC3339Nano)) + + eventID = fmt.Sprintf("nws:observation:%s:%s:%s", s.http.Name, station, ts.UTC().Format(time.RFC3339Nano)) } // EffectiveAt is optional; for observations it’s naturally the observation timestamp. @@ -81,7 +74,7 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { return common.SingleRawEvent( s.Kind(), - s.name, + s.http.Name, standards.SchemaRawNWSObservationV1, eventID, effectiveAt, @@ -106,15 +99,13 @@ type observationMeta struct { } func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, observationMeta, error) { - b, err := common.FetchBody(ctx, s.client, s.url, s.userAgent, "application/geo+json, application/json") + raw, err := s.http.FetchJSON(ctx) if err != nil { - return nil, observationMeta{}, fmt.Errorf("nws_observation %q: %w", s.name, err) + return nil, observationMeta{}, err } - raw := json.RawMessage(b) - var meta observationMeta - if err := json.Unmarshal(b, &meta); err != nil { + if err := json.Unmarshal(raw, &meta); err != nil { // If metadata decode fails, still return raw; envelope will fall back to computed ID. return raw, observationMeta{}, nil } diff --git a/internal/sources/openmeteo/observation.go b/internal/sources/openmeteo/observation.go index f819beb..2318d1b 100644 --- a/internal/sources/openmeteo/observation.go +++ b/internal/sources/openmeteo/observation.go @@ -5,7 +5,6 @@ import ( "context" "encoding/json" "fmt" - "net/http" "strings" "time" @@ -18,10 +17,7 @@ import ( // ObservationSource polls an Open-Meteo endpoint and emits one RAW Observation Event. type ObservationSource struct { - name string - url string - userAgent string - client *http.Client + http *common.HTTPSource } func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) { @@ -29,20 +25,15 @@ func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) { // We require params.user_agent for uniformity across sources (even though Open-Meteo // itself does not strictly require a special User-Agent). - c, err := common.RequireHTTPSourceConfig(driver, cfg) + hs, err := common.NewHTTPSource(driver, cfg, "application/json") if err != nil { return nil, err } - return &ObservationSource{ - name: c.Name, - url: c.URL, - userAgent: c.UserAgent, - client: common.NewHTTPClient(common.DefaultHTTPTimeout), - }, nil + return &ObservationSource{http: hs}, nil } -func (s *ObservationSource) Name() string { return s.name } +func (s *ObservationSource) Name() string { return s.http.Name } func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") } @@ -53,10 +44,10 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { return nil, err } - eventID := buildEventID(s.name, meta) + eventID := buildEventID(s.http.Name, meta) if strings.TrimSpace(eventID) == "" { // Extremely defensive fallback: keep the envelope valid no matter what. - eventID = fmt.Sprintf("openmeteo:current:%s:%s", s.name, time.Now().UTC().Format(time.RFC3339Nano)) + eventID = fmt.Sprintf("openmeteo:current:%s:%s", s.http.Name, time.Now().UTC().Format(time.RFC3339Nano)) } var effectiveAt *time.Time @@ -67,7 +58,7 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { return common.SingleRawEvent( s.Kind(), - s.name, + s.http.Name, standards.SchemaRawOpenMeteoCurrentV1, eventID, effectiveAt, @@ -91,15 +82,13 @@ type openMeteoMeta struct { } func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, openMeteoMeta, error) { - b, err := common.FetchBody(ctx, s.client, s.url, s.userAgent, "application/json") + raw, err := s.http.FetchJSON(ctx) if err != nil { - return nil, openMeteoMeta{}, fmt.Errorf("openmeteo_observation %q: %w", s.name, err) + return nil, openMeteoMeta{}, err } - raw := json.RawMessage(b) - var meta openMeteoMeta - if err := json.Unmarshal(b, &meta); err != nil { + if err := json.Unmarshal(raw, &meta); err != nil { // If metadata decode fails, still return raw; envelope will fall back to computed ID without EffectiveAt. return raw, openMeteoMeta{}, nil } diff --git a/internal/sources/openweather/observation.go b/internal/sources/openweather/observation.go index 185d752..998eca4 100644 --- a/internal/sources/openweather/observation.go +++ b/internal/sources/openweather/observation.go @@ -5,7 +5,6 @@ import ( "context" "encoding/json" "fmt" - "net/http" "net/url" "strings" "time" @@ -23,40 +22,32 @@ import ( // system in the response body. To keep normalization deterministic, this driver *requires* // `units=metric`. If absent (or non-metric), the driver returns an error. type ObservationSource struct { - name string - url string - userAgent string - client *http.Client + http *common.HTTPSource } func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) { const driver = "openweather_observation" - c, err := common.RequireHTTPSourceConfig(driver, cfg) + hs, err := common.NewHTTPSource(driver, cfg, "application/json") if err != nil { return nil, err } - if err := requireMetricUnits(c.URL); err != nil { - return nil, fmt.Errorf("openweather_observation %q: %w", c.Name, err) + if err := requireMetricUnits(hs.URL); err != nil { + return nil, fmt.Errorf("%s %q: %w", hs.Driver, hs.Name, err) } - return &ObservationSource{ - name: c.Name, - url: c.URL, - userAgent: c.UserAgent, - client: common.NewHTTPClient(common.DefaultHTTPTimeout), - }, nil + return &ObservationSource{http: hs}, nil } -func (s *ObservationSource) Name() string { return s.name } +func (s *ObservationSource) Name() string { return s.http.Name } func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") } func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { // Re-check policy defensively (in case the URL is mutated after construction). - if err := requireMetricUnits(s.url); err != nil { - return nil, fmt.Errorf("openweather_observation %q: %w", s.name, err) + if err := requireMetricUnits(s.http.URL); err != nil { + return nil, fmt.Errorf("%s %q: %w", s.http.Driver, s.http.Name, err) } raw, meta, err := s.fetchRaw(ctx) @@ -64,9 +55,9 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { return nil, err } - eventID := buildEventID(s.name, meta) + eventID := buildEventID(s.http.Name, meta) if strings.TrimSpace(eventID) == "" { - eventID = fmt.Sprintf("openweather:current:%s:%s", s.name, time.Now().UTC().Format(time.RFC3339Nano)) + eventID = fmt.Sprintf("openweather:current:%s:%s", s.http.Name, time.Now().UTC().Format(time.RFC3339Nano)) } var effectiveAt *time.Time @@ -77,7 +68,7 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { return common.SingleRawEvent( s.Kind(), - s.name, + s.http.Name, standards.SchemaRawOpenWeatherCurrentV1, eventID, effectiveAt, @@ -102,15 +93,13 @@ type openWeatherMeta struct { } func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, openWeatherMeta, error) { - b, err := common.FetchBody(ctx, s.client, s.url, s.userAgent, "application/json") + raw, err := s.http.FetchJSON(ctx) if err != nil { - return nil, openWeatherMeta{}, fmt.Errorf("openweather_observation %q: %w", s.name, err) + return nil, openWeatherMeta{}, err } - raw := json.RawMessage(b) - var meta openWeatherMeta - if err := json.Unmarshal(b, &meta); err != nil { + if err := json.Unmarshal(raw, &meta); err != nil { return raw, openWeatherMeta{}, nil }