From 59111a1c82befbebf8217e6088517a606f4ee358 Mon Sep 17 00:00:00 2001 From: Eric Rakestraw Date: Thu, 15 Jan 2026 09:43:22 -0600 Subject: [PATCH] sources: standardize HTTP source config + factor raw-event boilerplate - Require params.user_agent for all HTTP sources (uniform config across providers) - Add common.RequireHTTPSourceConfig() to validate name/url/user_agent in one call - Add common.NewHTTPClient() with DefaultHTTPTimeout for consistent client setup - Add common.SingleRawEvent() to centralize event envelope construction + validation - Refactor NWS/Open-Meteo/OpenWeather observation sources to use new helpers --- internal/sources/common/config.go | 104 ++++++++++++++++++++ internal/sources/common/event.go | 37 +++++++ internal/sources/common/http.go | 15 +++ internal/sources/nws/observation.go | 55 +++-------- internal/sources/openmeteo/observation.go | 89 ++++------------- internal/sources/openweather/observation.go | 89 ++++------------- 6 files changed, 212 insertions(+), 177 deletions(-) create mode 100644 internal/sources/common/config.go create mode 100644 internal/sources/common/event.go diff --git a/internal/sources/common/config.go b/internal/sources/common/config.go new file mode 100644 index 0000000..7290465 --- /dev/null +++ b/internal/sources/common/config.go @@ -0,0 +1,104 @@ +// FILE: ./internal/sources/common/config.go +package common + +import ( + "fmt" + "strings" + + "gitea.maximumdirect.net/ejr/feedkit/config" +) + +// This file centralizes small, boring config-validation patterns shared across +// weatherfeeder source drivers. +// +// Goal: keep driver constructors (New*Source) easy to read and consistent, while +// keeping driver-specific options in cfg.Params (feedkit remains domain-agnostic). + +// HTTPSourceConfig is the standard "HTTP-polling source" config shape used across drivers. +type HTTPSourceConfig struct { + Name string + URL string + UserAgent string +} + +// RequireHTTPSourceConfig enforces weatherfeeder's standard HTTP source config: +// +// - cfg.Name must be present +// - cfg.Params must be present +// - params.url must be present (accepts "url" or "URL") +// - params.user_agent must be present (accepts "user_agent" or "userAgent") +// +// We intentionally require a User-Agent for *all* sources, even when upstreams +// do not strictly require one. This keeps config uniform across providers. +func RequireHTTPSourceConfig(driver string, cfg config.SourceConfig) (HTTPSourceConfig, error) { + if strings.TrimSpace(cfg.Name) == "" { + return HTTPSourceConfig{}, fmt.Errorf("%s: name is required", driver) + } + if cfg.Params == nil { + return HTTPSourceConfig{}, fmt.Errorf("%s %q: params are required (need params.url and params.user_agent)", driver, cfg.Name) + } + + url, ok := cfg.ParamString("url", "URL") + if !ok { + return HTTPSourceConfig{}, fmt.Errorf("%s %q: params.url is required", driver, cfg.Name) + } + + ua, ok := cfg.ParamString("user_agent", "userAgent") + if !ok { + return HTTPSourceConfig{}, fmt.Errorf("%s %q: params.user_agent is required", driver, cfg.Name) + } + + return HTTPSourceConfig{ + Name: cfg.Name, + URL: url, + UserAgent: ua, + }, nil +} + +// --- The helpers below remain useful for future drivers; they are not required +// --- by the observation sources after adopting RequireHTTPSourceConfig. + +// RequireName ensures cfg.Name is present and non-whitespace. +func RequireName(driver string, cfg config.SourceConfig) error { + if strings.TrimSpace(cfg.Name) == "" { + return fmt.Errorf("%s: name is required", driver) + } + return nil +} + +// RequireParams ensures cfg.Params is non-nil. The "want" string should be a short +// description of required keys, e.g. "need params.url and params.user_agent". +func RequireParams(driver string, cfg config.SourceConfig, want string) error { + if cfg.Params == nil { + return fmt.Errorf("%s %q: params are required (%s)", driver, cfg.Name, want) + } + return nil +} + +// RequireURL returns the configured URL for a source. +// Canonical key is "url"; we also accept "URL" as a convenience. +func RequireURL(driver string, cfg config.SourceConfig) (string, error) { + if cfg.Params == nil { + return "", fmt.Errorf("%s %q: params are required (need params.url)", driver, cfg.Name) + } + + u, ok := cfg.ParamString("url", "URL") + if !ok { + return "", fmt.Errorf("%s %q: params.url is required", driver, cfg.Name) + } + return u, nil +} + +// RequireUserAgent returns the configured User-Agent for a source. +// Canonical key is "user_agent"; we also accept "userAgent" as a convenience. +func RequireUserAgent(driver string, cfg config.SourceConfig) (string, error) { + if cfg.Params == nil { + return "", fmt.Errorf("%s %q: params are required (need params.user_agent)", driver, cfg.Name) + } + + ua, ok := cfg.ParamString("user_agent", "userAgent") + if !ok { + return "", fmt.Errorf("%s %q: params.user_agent is required", driver, cfg.Name) + } + return ua, nil +} diff --git a/internal/sources/common/event.go b/internal/sources/common/event.go new file mode 100644 index 0000000..584487d --- /dev/null +++ b/internal/sources/common/event.go @@ -0,0 +1,37 @@ +// FILE: ./internal/sources/common/event.go +package common + +import ( + "time" + + "gitea.maximumdirect.net/ejr/feedkit/event" +) + +// SingleRawEvent constructs, validates, and returns a slice containing exactly one event. +// +// This removes the repetitive "event envelope ceremony" from individual sources. +// Sources remain responsible for: +// - fetching bytes (raw payload) +// - choosing Schema (raw schema identifier) +// - computing a stable Event.ID and (optional) EffectiveAt +func SingleRawEvent(kind event.Kind, sourceName string, schema string, id string, effectiveAt *time.Time, payload any) ([]event.Event, error) { + e := event.Event{ + ID: id, + Kind: kind, + Source: sourceName, + EmittedAt: time.Now().UTC(), + EffectiveAt: effectiveAt, + + // RAW schema (normalizer matches on this). + Schema: schema, + + // Raw payload (usually json.RawMessage). Normalizer will decode and map to canonical model. + Payload: payload, + } + + if err := e.Validate(); err != nil { + return nil, err + } + + return []event.Event{e}, nil +} diff --git a/internal/sources/common/http.go b/internal/sources/common/http.go index d686fad..b582c2f 100644 --- a/internal/sources/common/http.go +++ b/internal/sources/common/http.go @@ -1,3 +1,4 @@ +// FILE: ./internal/sources/common/http.go package common import ( @@ -5,6 +6,7 @@ import ( "fmt" "io" "net/http" + "time" ) // maxResponseBodyBytes is a hard safety limit on HTTP response bodies. @@ -12,6 +14,19 @@ import ( // or malicious large responses. const maxResponseBodyBytes = 2 << 21 // 4 MiB +// DefaultHTTPTimeout is the standard timeout used by weatherfeeder HTTP sources. +// Individual drivers may override this if they have a specific need. +const DefaultHTTPTimeout = 10 * time.Second + +// NewHTTPClient returns a simple http.Client configured with a timeout. +// If timeout <= 0, DefaultHTTPTimeout is used. +func NewHTTPClient(timeout time.Duration) *http.Client { + if timeout <= 0 { + timeout = DefaultHTTPTimeout + } + return &http.Client{Timeout: timeout} +} + func FetchBody(ctx context.Context, client *http.Client, url, userAgent, accept string) ([]byte, error) { req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { diff --git a/internal/sources/nws/observation.go b/internal/sources/nws/observation.go index 4e7c2b7..7984444 100644 --- a/internal/sources/nws/observation.go +++ b/internal/sources/nws/observation.go @@ -32,30 +32,18 @@ type ObservationSource struct { } func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) { - if strings.TrimSpace(cfg.Name) == "" { - return nil, fmt.Errorf("nws_observation: name is required") - } - if cfg.Params == nil { - return nil, fmt.Errorf("nws_observation %q: params are required (need params.url and params.user_agent)", cfg.Name) - } + const driver = "nws_observation" - url, ok := cfg.ParamString("url", "URL") - if !ok { - return nil, fmt.Errorf("nws_observation %q: params.url is required", cfg.Name) - } - - ua, ok := cfg.ParamString("user_agent", "userAgent") - if !ok { - return nil, fmt.Errorf("nws_observation %q: params.user_agent is required", cfg.Name) + c, err := common.RequireHTTPSourceConfig(driver, cfg) + if err != nil { + return nil, err } return &ObservationSource{ - name: cfg.Name, - url: url, - userAgent: ua, - client: &http.Client{ - Timeout: 10 * time.Second, - }, + name: c.Name, + url: c.URL, + userAgent: c.UserAgent, + client: common.NewHTTPClient(common.DefaultHTTPTimeout), }, nil } @@ -95,25 +83,14 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { effectiveAt = &t } - e := event.Event{ - ID: eventID, - Kind: s.Kind(), - Source: s.name, - EmittedAt: time.Now().UTC(), - EffectiveAt: effectiveAt, - - // RAW schema (normalizer matches on this). - Schema: standards.SchemaRawNWSObservationV1, - - // Raw JSON; normalizer will decode and map to canonical model.WeatherObservation. - Payload: raw, - } - - if err := e.Validate(); err != nil { - return nil, err - } - - return []event.Event{e}, nil + return common.SingleRawEvent( + s.Kind(), + s.name, + standards.SchemaRawNWSObservationV1, + eventID, + effectiveAt, + raw, + ) } // ---- RAW fetch + minimal metadata decode ---- diff --git a/internal/sources/openmeteo/observation.go b/internal/sources/openmeteo/observation.go index 524b26d..d10454f 100644 --- a/internal/sources/openmeteo/observation.go +++ b/internal/sources/openmeteo/observation.go @@ -16,14 +16,6 @@ import ( ) // ObservationSource polls an Open-Meteo endpoint and emits one RAW Observation Event. -// -// Refactor (mirrors NWS/OpenWeather): -// - Source responsibility: fetch bytes + emit a valid event envelope -// - Normalizer responsibility: decode JSON + map to canonical model.WeatherObservation -// -// Typical URL shape (you provide this via config): -// -// https://api.open-meteo.com/v1/forecast?latitude=...&longitude=...¤t=temperature_2m,relative_humidity_2m,weather_code,wind_speed_10m,wind_direction_10m,wind_gusts_10m,surface_pressure,pressure_msl&timezone=GMT type ObservationSource struct { name string url string @@ -32,39 +24,28 @@ type ObservationSource struct { } func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) { - if strings.TrimSpace(cfg.Name) == "" { - return nil, fmt.Errorf("openmeteo_observation: name is required") - } - if cfg.Params == nil { - return nil, fmt.Errorf("openmeteo_observation %q: params are required (need params.url)", cfg.Name) - } + const driver = "openmeteo_observation" - url, ok := cfg.ParamString("url", "URL") - if !ok { - return nil, fmt.Errorf("openmeteo_observation %q: params.url is required", cfg.Name) + // We require params.user_agent for uniformity across sources (even though Open-Meteo + // itself does not strictly require a special User-Agent). + c, err := common.RequireHTTPSourceConfig(driver, cfg) + if err != nil { + return nil, err } - // Open-Meteo doesn't require a special User-Agent, but including one is polite. - ua := cfg.ParamStringDefault("weatherfeeder (open-meteo client)", "user_agent", "userAgent") - return &ObservationSource{ - name: cfg.Name, - url: url, - userAgent: ua, - client: &http.Client{ - Timeout: 10 * time.Second, - }, + name: c.Name, + url: c.URL, + userAgent: c.UserAgent, + client: common.NewHTTPClient(common.DefaultHTTPTimeout), }, nil } func (s *ObservationSource) Name() string { return s.name } -// Kind is used for routing/policy. -// We keep Kind canonical (observation) even for raw events; Schema differentiates raw vs canonical. func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") } // Poll fetches Open-Meteo "current" and emits exactly one RAW Event. -// The RAW payload is json.RawMessage and Schema is standards.SchemaRawOpenMeteoCurrentV1. func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { raw, meta, err := s.fetchRaw(ctx) if err != nil { @@ -77,40 +58,24 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { eventID = fmt.Sprintf("openmeteo:current:%s:%s", s.name, time.Now().UTC().Format(time.RFC3339Nano)) } - // EffectiveAt is optional; for observations it’s naturally the observation timestamp. var effectiveAt *time.Time if !meta.ParsedTimestamp.IsZero() { t := meta.ParsedTimestamp.UTC() effectiveAt = &t } - e := event.Event{ - ID: eventID, - Kind: s.Kind(), - Source: s.name, - EmittedAt: time.Now().UTC(), - EffectiveAt: effectiveAt, - - // RAW schema (normalizer matches on this). - Schema: standards.SchemaRawOpenMeteoCurrentV1, - - // Raw JSON; normalizer will decode and map to canonical model.WeatherObservation. - Payload: raw, - } - - if err := e.Validate(); err != nil { - return nil, err - } - - return []event.Event{e}, nil + return common.SingleRawEvent( + s.Kind(), + s.name, + standards.SchemaRawOpenMeteoCurrentV1, + eventID, + effectiveAt, + raw, + ) } // ---- RAW fetch + minimal metadata decode ---- -// openMeteoMeta is a *minimal* decode of the Open-Meteo payload used only to build -// a stable Event.ID and a useful EffectiveAt for the envelope. -// -// Important: this is NOT where we map into internal/model. That belongs in the normalizer. type openMeteoMeta struct { Latitude float64 `json:"latitude"` Longitude float64 `json:"longitude"` @@ -118,10 +83,9 @@ type openMeteoMeta struct { UTCOffsetSeconds int `json:"utc_offset_seconds"` Current struct { - Time string `json:"time"` // e.g. "2026-01-10T12:30" (often no timezone suffix) + Time string `json:"time"` } `json:"current"` - // Convenience fields populated after decode. ParsedTimestamp time.Time `json:"-"` } @@ -139,8 +103,6 @@ func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, open return raw, openMeteoMeta{}, nil } - // Best-effort parse of current.time so the envelope carries a meaningful EffectiveAt. - // This duplicates the parsing logic in the normalizer, but ONLY for envelope metadata. if t, err := parseOpenMeteoTime(meta.Current.Time, meta.Timezone, meta.UTCOffsetSeconds); err == nil { meta.ParsedTimestamp = t.UTC() } @@ -149,7 +111,6 @@ func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, open } func buildEventID(sourceName string, meta openMeteoMeta) string { - // Prefer stable location key from lat/lon if present. locKey := "" if meta.Latitude != 0 || meta.Longitude != 0 { locKey = fmt.Sprintf("coord:%.5f,%.5f", meta.Latitude, meta.Longitude) @@ -159,42 +120,30 @@ func buildEventID(sourceName string, meta openMeteoMeta) string { ts := meta.ParsedTimestamp if ts.IsZero() { - // If we couldn't parse current.time, use "now" so we still emit. ts = time.Now().UTC() } - // Example: - // openmeteo:current::coord:38.62390,-90.35710:2026-01-14T18:00:00.000Z return fmt.Sprintf("openmeteo:current:%s:%s:%s", sourceName, locKey, ts.Format(time.RFC3339Nano)) } -// parseOpenMeteoTime parses Open-Meteo "current.time" using timezone/offset hints. -// -// Open-Meteo commonly returns "YYYY-MM-DDTHH:MM" (no timezone suffix) when timezone -// is provided separately. When a timezone suffix is present (RFC3339), we accept it too. func parseOpenMeteoTime(s string, tz string, utcOffsetSeconds int) (time.Time, error) { s = strings.TrimSpace(s) if s == "" { return time.Time{}, fmt.Errorf("empty time") } - // If the server returned an RFC3339 timestamp with timezone, take it as authoritative. if t, err := time.Parse(time.RFC3339, s); err == nil { return t, nil } - // Typical Open-Meteo format: "2006-01-02T15:04" const layout = "2006-01-02T15:04" - // Best effort: try to load the timezone as an IANA name. - // Examples Open-Meteo might return: "GMT", "America/Chicago". if tz != "" { if loc, err := time.LoadLocation(tz); err == nil { return time.ParseInLocation(layout, s, loc) } } - // Fallback: use the offset seconds to create a fixed zone. (If offset is 0, this is UTC.) loc := time.FixedZone("open-meteo", utcOffsetSeconds) return time.ParseInLocation(layout, s, loc) } diff --git a/internal/sources/openweather/observation.go b/internal/sources/openweather/observation.go index 1190ccf..185d752 100644 --- a/internal/sources/openweather/observation.go +++ b/internal/sources/openweather/observation.go @@ -18,14 +18,6 @@ import ( // ObservationSource polls the OpenWeatherMap "Current weather" endpoint and emits a RAW observation Event. // -// Refactor (mirrors NWS): -// - Source responsibility: fetch bytes + emit a valid event envelope. -// - Normalizer responsibility: decode JSON + map to canonical model.WeatherObservation. -// -// Typical URL shape (provided via config): -// -// https://api.openweathermap.org/data/2.5/weather?lat=...&lon=...&appid=...&units=metric -// // IMPORTANT UNIT POLICY (weatherfeeder convention): // OpenWeather changes units based on the `units` query parameter but does NOT include the unit // system in the response body. To keep normalization deterministic, this driver *requires* @@ -38,43 +30,29 @@ type ObservationSource struct { } func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) { - if strings.TrimSpace(cfg.Name) == "" { - return nil, fmt.Errorf("openweather_observation: name is required") - } - if cfg.Params == nil { - return nil, fmt.Errorf("openweather_observation %q: params are required (need params.url)", cfg.Name) + const driver = "openweather_observation" + + c, err := common.RequireHTTPSourceConfig(driver, cfg) + if err != nil { + return nil, err } - rawURL, ok := cfg.ParamString("url", "URL") - if !ok { - return nil, fmt.Errorf("openweather_observation %q: params.url is required", cfg.Name) + if err := requireMetricUnits(c.URL); err != nil { + return nil, fmt.Errorf("openweather_observation %q: %w", c.Name, err) } - // Fail fast: enforce deterministic unit system. - if err := requireMetricUnits(rawURL); err != nil { - return nil, fmt.Errorf("openweather_observation %q: %w", cfg.Name, err) - } - - ua := cfg.ParamStringDefault("weatherfeeder (openweather client)", "user_agent", "userAgent") - return &ObservationSource{ - name: cfg.Name, - url: rawURL, - userAgent: ua, - client: &http.Client{ - Timeout: 10 * time.Second, - }, + name: c.Name, + url: c.URL, + userAgent: c.UserAgent, + client: common.NewHTTPClient(common.DefaultHTTPTimeout), }, nil } func (s *ObservationSource) Name() string { return s.name } -// Kind is used for routing/policy. -// We keep Kind canonical (observation) even for raw events; Schema differentiates raw vs canonical. func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") } -// Poll fetches OpenWeather "current weather" and emits exactly one RAW Event. -// The RAW payload is json.RawMessage and Schema is standards.SchemaRawOpenWeatherCurrentV1. func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { // Re-check policy defensively (in case the URL is mutated after construction). if err := requireMetricUnits(s.url); err != nil { @@ -88,7 +66,6 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { eventID := buildEventID(s.name, meta) if strings.TrimSpace(eventID) == "" { - // Extremely defensive fallback: should never happen, but keep the envelope valid. eventID = fmt.Sprintf("openweather:current:%s:%s", s.name, time.Now().UTC().Format(time.RFC3339Nano)) } @@ -98,43 +75,29 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { effectiveAt = &t } - e := event.Event{ - ID: eventID, - Kind: s.Kind(), - Source: s.name, - EmittedAt: time.Now().UTC(), - EffectiveAt: effectiveAt, - - // RAW schema (normalizer matches on this). - Schema: standards.SchemaRawOpenWeatherCurrentV1, - - // Raw JSON; normalizer will decode and map to canonical model.WeatherObservation. - Payload: raw, - } - - if err := e.Validate(); err != nil { - return nil, err - } - - return []event.Event{e}, nil + return common.SingleRawEvent( + s.Kind(), + s.name, + standards.SchemaRawOpenWeatherCurrentV1, + eventID, + effectiveAt, + raw, + ) } // ---- RAW fetch + minimal metadata decode ---- -// openWeatherMeta is a *minimal* decode of the OpenWeather payload used only to build -// a stable Event.ID and a useful EffectiveAt for the envelope. type openWeatherMeta struct { Dt int64 `json:"dt"` // unix seconds, UTC - ID int64 `json:"id"` // city id (if present) - Name string `json:"name"` // city name (optional) + ID int64 `json:"id"` + Name string `json:"name"` Coord struct { Lon float64 `json:"lon"` Lat float64 `json:"lat"` } `json:"coord"` - // Convenience fields populated after decode. ParsedTimestamp time.Time `json:"-"` } @@ -148,7 +111,6 @@ func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, open var meta openWeatherMeta if err := json.Unmarshal(b, &meta); err != nil { - // If metadata decode fails, still return raw; envelope will fall back to computed ID. return raw, openWeatherMeta{}, nil } @@ -160,7 +122,6 @@ func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, open } func buildEventID(sourceName string, meta openWeatherMeta) string { - // Prefer provider city ID if present; otherwise fall back to lat/lon. locKey := "" if meta.ID != 0 { locKey = fmt.Sprintf("city:%d", meta.ID) @@ -172,19 +133,12 @@ func buildEventID(sourceName string, meta openWeatherMeta) string { ts := meta.ParsedTimestamp if ts.IsZero() { - // We prefer stable IDs, but if the payload didn't decode, use "now" so we still emit. ts = time.Now().UTC() } - // Example: - // openweather:current::city:12345:2026-01-14T17:00:00.123Z return fmt.Sprintf("openweather:current:%s:%s:%s", sourceName, locKey, ts.Format(time.RFC3339Nano)) } -// requireMetricUnits enforces weatherfeeder's OpenWeather unit policy. -// -// OpenWeather does not tell us the unit system in the response body. We therefore enforce that -// the request URL explicitly contains units=metric; otherwise normalization would be ambiguous. func requireMetricUnits(rawURL string) error { u, err := url.Parse(strings.TrimSpace(rawURL)) if err != nil { @@ -193,7 +147,6 @@ func requireMetricUnits(rawURL string) error { units := strings.ToLower(strings.TrimSpace(u.Query().Get("units"))) if units != "metric" { - // Treat missing units ("" -> standard) as non-compliant too. if units == "" { units = "(missing; defaults to standard)" }