v0.x: add reusable HTTP source spine; fix routing; upstream HTTP transport helper
- fix dispatch route compilation so empty Kinds matches all (nil), not none - introduce internal/sources/common/HTTPSource to centralize HTTP polling boilerplate: - standard cfg parsing (url + user_agent) - default HTTP client + Accept/User-Agent headers - consistent error wrapping - refactor observation sources (nws/openmeteo/openweather) to use HTTPSource - upstream generic HTTP fetch/limits/timeout helper from weatherfeeder to feedkit: - move internal/sources/common/http.go -> feedkit/transport/http.go - keep behavior: status checks, max-body limit, default timeout
This commit is contained in:
@@ -1,70 +1,76 @@
|
|||||||
// FILE: ./internal/sources/common/http.go
|
// FILE: ./internal/sources/common/http_source.go
|
||||||
package common
|
package common
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"time"
|
|
||||||
|
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||||
|
"gitea.maximumdirect.net/ejr/feedkit/transport"
|
||||||
)
|
)
|
||||||
|
|
||||||
// maxResponseBodyBytes is a hard safety limit on HTTP response bodies.
|
// HTTPSource is a tiny, reusable "HTTP polling spine" for weatherfeeder sources.
|
||||||
// API responses should be small, so this protects us from accidental
|
//
|
||||||
// or malicious large responses.
|
// It centralizes the boring parts:
|
||||||
const maxResponseBodyBytes = 2 << 21 // 4 MiB
|
// - standard config shape (url + user_agent) via RequireHTTPSourceConfig
|
||||||
|
// - a default http.Client with timeout
|
||||||
// DefaultHTTPTimeout is the standard timeout used by weatherfeeder HTTP sources.
|
// - FetchBody / headers / max-body safety limit
|
||||||
// Individual drivers may override this if they have a specific need.
|
// - consistent error wrapping (driver + source name)
|
||||||
const DefaultHTTPTimeout = 10 * time.Second
|
//
|
||||||
|
// Individual drivers remain responsible for:
|
||||||
// NewHTTPClient returns a simple http.Client configured with a timeout.
|
// - decoding minimal metadata (for Event.ID / EffectiveAt)
|
||||||
// If timeout <= 0, DefaultHTTPTimeout is used.
|
// - constructing the event envelope (kind/schema/payload)
|
||||||
func NewHTTPClient(timeout time.Duration) *http.Client {
|
type HTTPSource struct {
|
||||||
if timeout <= 0 {
|
Driver string
|
||||||
timeout = DefaultHTTPTimeout
|
Name string
|
||||||
}
|
URL string
|
||||||
return &http.Client{Timeout: timeout}
|
UserAgent string
|
||||||
|
Accept string
|
||||||
|
Client *http.Client
|
||||||
}
|
}
|
||||||
|
|
||||||
func FetchBody(ctx context.Context, client *http.Client, url, userAgent, accept string) ([]byte, error) {
|
// NewHTTPSource builds an HTTPSource using weatherfeeder's standard HTTP source
|
||||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
// config (params.url + params.user_agent) and a default HTTP client.
|
||||||
|
func NewHTTPSource(driver string, cfg config.SourceConfig, accept string) (*HTTPSource, error) {
|
||||||
|
c, err := RequireHTTPSourceConfig(driver, cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if userAgent != "" {
|
return &HTTPSource{
|
||||||
req.Header.Set("User-Agent", userAgent)
|
Driver: driver,
|
||||||
}
|
Name: c.Name,
|
||||||
if accept != "" {
|
URL: c.URL,
|
||||||
req.Header.Set("Accept", accept)
|
UserAgent: c.UserAgent,
|
||||||
|
Accept: accept,
|
||||||
|
Client: transport.NewHTTPClient(transport.DefaultHTTPTimeout),
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
res, err := client.Do(req)
|
// FetchBytes fetches the URL and returns the raw response body bytes.
|
||||||
|
func (s *HTTPSource) FetchBytes(ctx context.Context) ([]byte, error) {
|
||||||
|
client := s.Client
|
||||||
|
if client == nil {
|
||||||
|
// Defensive: allow tests or callers to nil out Client; keep behavior sane.
|
||||||
|
client = transport.NewHTTPClient(transport.DefaultHTTPTimeout)
|
||||||
|
}
|
||||||
|
|
||||||
|
b, err := transport.FetchBody(ctx, client, s.URL, s.UserAgent, s.Accept)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, fmt.Errorf("%s %q: %w", s.Driver, s.Name, err)
|
||||||
}
|
}
|
||||||
defer res.Body.Close()
|
|
||||||
|
|
||||||
if res.StatusCode < 200 || res.StatusCode >= 300 {
|
|
||||||
return nil, fmt.Errorf("HTTP %s", res.Status)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Read at most maxResponseBodyBytes + 1 so we can detect overflow.
|
|
||||||
limited := io.LimitReader(res.Body, maxResponseBodyBytes+1)
|
|
||||||
|
|
||||||
b, err := io.ReadAll(limited)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(b) == 0 {
|
|
||||||
return nil, fmt.Errorf("empty response body")
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(b) > maxResponseBodyBytes {
|
|
||||||
return nil, fmt.Errorf("response body too large (>%d bytes)", maxResponseBodyBytes)
|
|
||||||
}
|
|
||||||
|
|
||||||
return b, nil
|
return b, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FetchJSON fetches the URL and returns the raw body as json.RawMessage.
|
||||||
|
// json.Unmarshal accepts json.RawMessage directly, so callers can decode minimal
|
||||||
|
// metadata without keeping both []byte and RawMessage in their own structs.
|
||||||
|
func (s *HTTPSource) FetchJSON(ctx context.Context) (json.RawMessage, error) {
|
||||||
|
b, err := s.FetchBytes(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return json.RawMessage(b), nil
|
||||||
|
}
|
||||||
|
|||||||
@@ -5,7 +5,6 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -21,29 +20,21 @@ import (
|
|||||||
//
|
//
|
||||||
// https://api.weather.gov/stations/KSTL/observations/latest
|
// https://api.weather.gov/stations/KSTL/observations/latest
|
||||||
type ObservationSource struct {
|
type ObservationSource struct {
|
||||||
name string
|
http *common.HTTPSource
|
||||||
url string
|
|
||||||
userAgent string
|
|
||||||
client *http.Client
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) {
|
func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) {
|
||||||
const driver = "nws_observation"
|
const driver = "nws_observation"
|
||||||
|
|
||||||
c, err := common.RequireHTTPSourceConfig(driver, cfg)
|
hs, err := common.NewHTTPSource(driver, cfg, "application/geo+json, application/json")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return &ObservationSource{
|
return &ObservationSource{http: hs}, nil
|
||||||
name: c.Name,
|
|
||||||
url: c.URL,
|
|
||||||
userAgent: c.UserAgent,
|
|
||||||
client: common.NewHTTPClient(common.DefaultHTTPTimeout),
|
|
||||||
}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ObservationSource) Name() string { return s.name }
|
func (s *ObservationSource) Name() string { return s.http.Name }
|
||||||
|
|
||||||
// Kind is used for routing/policy.
|
// Kind is used for routing/policy.
|
||||||
// We keep Kind canonical (observation) even for raw events; Schema differentiates raw vs canonical.
|
// We keep Kind canonical (observation) even for raw events; Schema differentiates raw vs canonical.
|
||||||
@@ -65,11 +56,13 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
|
|||||||
if ts.IsZero() {
|
if ts.IsZero() {
|
||||||
ts = time.Now().UTC()
|
ts = time.Now().UTC()
|
||||||
}
|
}
|
||||||
|
|
||||||
station := strings.TrimSpace(meta.StationID)
|
station := strings.TrimSpace(meta.StationID)
|
||||||
if station == "" {
|
if station == "" {
|
||||||
station = "UNKNOWN"
|
station = "UNKNOWN"
|
||||||
}
|
}
|
||||||
eventID = fmt.Sprintf("nws:observation:%s:%s:%s", s.name, station, ts.UTC().Format(time.RFC3339Nano))
|
|
||||||
|
eventID = fmt.Sprintf("nws:observation:%s:%s:%s", s.http.Name, station, ts.UTC().Format(time.RFC3339Nano))
|
||||||
}
|
}
|
||||||
|
|
||||||
// EffectiveAt is optional; for observations it’s naturally the observation timestamp.
|
// EffectiveAt is optional; for observations it’s naturally the observation timestamp.
|
||||||
@@ -81,7 +74,7 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
|
|||||||
|
|
||||||
return common.SingleRawEvent(
|
return common.SingleRawEvent(
|
||||||
s.Kind(),
|
s.Kind(),
|
||||||
s.name,
|
s.http.Name,
|
||||||
standards.SchemaRawNWSObservationV1,
|
standards.SchemaRawNWSObservationV1,
|
||||||
eventID,
|
eventID,
|
||||||
effectiveAt,
|
effectiveAt,
|
||||||
@@ -106,15 +99,13 @@ type observationMeta struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, observationMeta, error) {
|
func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, observationMeta, error) {
|
||||||
b, err := common.FetchBody(ctx, s.client, s.url, s.userAgent, "application/geo+json, application/json")
|
raw, err := s.http.FetchJSON(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, observationMeta{}, fmt.Errorf("nws_observation %q: %w", s.name, err)
|
return nil, observationMeta{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
raw := json.RawMessage(b)
|
|
||||||
|
|
||||||
var meta observationMeta
|
var meta observationMeta
|
||||||
if err := json.Unmarshal(b, &meta); err != nil {
|
if err := json.Unmarshal(raw, &meta); err != nil {
|
||||||
// If metadata decode fails, still return raw; envelope will fall back to computed ID.
|
// If metadata decode fails, still return raw; envelope will fall back to computed ID.
|
||||||
return raw, observationMeta{}, nil
|
return raw, observationMeta{}, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,7 +5,6 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -18,10 +17,7 @@ import (
|
|||||||
|
|
||||||
// ObservationSource polls an Open-Meteo endpoint and emits one RAW Observation Event.
|
// ObservationSource polls an Open-Meteo endpoint and emits one RAW Observation Event.
|
||||||
type ObservationSource struct {
|
type ObservationSource struct {
|
||||||
name string
|
http *common.HTTPSource
|
||||||
url string
|
|
||||||
userAgent string
|
|
||||||
client *http.Client
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) {
|
func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) {
|
||||||
@@ -29,20 +25,15 @@ func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) {
|
|||||||
|
|
||||||
// We require params.user_agent for uniformity across sources (even though Open-Meteo
|
// We require params.user_agent for uniformity across sources (even though Open-Meteo
|
||||||
// itself does not strictly require a special User-Agent).
|
// itself does not strictly require a special User-Agent).
|
||||||
c, err := common.RequireHTTPSourceConfig(driver, cfg)
|
hs, err := common.NewHTTPSource(driver, cfg, "application/json")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return &ObservationSource{
|
return &ObservationSource{http: hs}, nil
|
||||||
name: c.Name,
|
|
||||||
url: c.URL,
|
|
||||||
userAgent: c.UserAgent,
|
|
||||||
client: common.NewHTTPClient(common.DefaultHTTPTimeout),
|
|
||||||
}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ObservationSource) Name() string { return s.name }
|
func (s *ObservationSource) Name() string { return s.http.Name }
|
||||||
|
|
||||||
func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") }
|
func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") }
|
||||||
|
|
||||||
@@ -53,10 +44,10 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
eventID := buildEventID(s.name, meta)
|
eventID := buildEventID(s.http.Name, meta)
|
||||||
if strings.TrimSpace(eventID) == "" {
|
if strings.TrimSpace(eventID) == "" {
|
||||||
// Extremely defensive fallback: keep the envelope valid no matter what.
|
// Extremely defensive fallback: keep the envelope valid no matter what.
|
||||||
eventID = fmt.Sprintf("openmeteo:current:%s:%s", s.name, time.Now().UTC().Format(time.RFC3339Nano))
|
eventID = fmt.Sprintf("openmeteo:current:%s:%s", s.http.Name, time.Now().UTC().Format(time.RFC3339Nano))
|
||||||
}
|
}
|
||||||
|
|
||||||
var effectiveAt *time.Time
|
var effectiveAt *time.Time
|
||||||
@@ -67,7 +58,7 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
|
|||||||
|
|
||||||
return common.SingleRawEvent(
|
return common.SingleRawEvent(
|
||||||
s.Kind(),
|
s.Kind(),
|
||||||
s.name,
|
s.http.Name,
|
||||||
standards.SchemaRawOpenMeteoCurrentV1,
|
standards.SchemaRawOpenMeteoCurrentV1,
|
||||||
eventID,
|
eventID,
|
||||||
effectiveAt,
|
effectiveAt,
|
||||||
@@ -91,15 +82,13 @@ type openMeteoMeta struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, openMeteoMeta, error) {
|
func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, openMeteoMeta, error) {
|
||||||
b, err := common.FetchBody(ctx, s.client, s.url, s.userAgent, "application/json")
|
raw, err := s.http.FetchJSON(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, openMeteoMeta{}, fmt.Errorf("openmeteo_observation %q: %w", s.name, err)
|
return nil, openMeteoMeta{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
raw := json.RawMessage(b)
|
|
||||||
|
|
||||||
var meta openMeteoMeta
|
var meta openMeteoMeta
|
||||||
if err := json.Unmarshal(b, &meta); err != nil {
|
if err := json.Unmarshal(raw, &meta); err != nil {
|
||||||
// If metadata decode fails, still return raw; envelope will fall back to computed ID without EffectiveAt.
|
// If metadata decode fails, still return raw; envelope will fall back to computed ID without EffectiveAt.
|
||||||
return raw, openMeteoMeta{}, nil
|
return raw, openMeteoMeta{}, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,7 +5,6 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
|
||||||
"net/url"
|
"net/url"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
@@ -23,40 +22,32 @@ import (
|
|||||||
// system in the response body. To keep normalization deterministic, this driver *requires*
|
// system in the response body. To keep normalization deterministic, this driver *requires*
|
||||||
// `units=metric`. If absent (or non-metric), the driver returns an error.
|
// `units=metric`. If absent (or non-metric), the driver returns an error.
|
||||||
type ObservationSource struct {
|
type ObservationSource struct {
|
||||||
name string
|
http *common.HTTPSource
|
||||||
url string
|
|
||||||
userAgent string
|
|
||||||
client *http.Client
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) {
|
func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) {
|
||||||
const driver = "openweather_observation"
|
const driver = "openweather_observation"
|
||||||
|
|
||||||
c, err := common.RequireHTTPSourceConfig(driver, cfg)
|
hs, err := common.NewHTTPSource(driver, cfg, "application/json")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := requireMetricUnits(c.URL); err != nil {
|
if err := requireMetricUnits(hs.URL); err != nil {
|
||||||
return nil, fmt.Errorf("openweather_observation %q: %w", c.Name, err)
|
return nil, fmt.Errorf("%s %q: %w", hs.Driver, hs.Name, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return &ObservationSource{
|
return &ObservationSource{http: hs}, nil
|
||||||
name: c.Name,
|
|
||||||
url: c.URL,
|
|
||||||
userAgent: c.UserAgent,
|
|
||||||
client: common.NewHTTPClient(common.DefaultHTTPTimeout),
|
|
||||||
}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ObservationSource) Name() string { return s.name }
|
func (s *ObservationSource) Name() string { return s.http.Name }
|
||||||
|
|
||||||
func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") }
|
func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") }
|
||||||
|
|
||||||
func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
|
func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
|
||||||
// Re-check policy defensively (in case the URL is mutated after construction).
|
// Re-check policy defensively (in case the URL is mutated after construction).
|
||||||
if err := requireMetricUnits(s.url); err != nil {
|
if err := requireMetricUnits(s.http.URL); err != nil {
|
||||||
return nil, fmt.Errorf("openweather_observation %q: %w", s.name, err)
|
return nil, fmt.Errorf("%s %q: %w", s.http.Driver, s.http.Name, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
raw, meta, err := s.fetchRaw(ctx)
|
raw, meta, err := s.fetchRaw(ctx)
|
||||||
@@ -64,9 +55,9 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
eventID := buildEventID(s.name, meta)
|
eventID := buildEventID(s.http.Name, meta)
|
||||||
if strings.TrimSpace(eventID) == "" {
|
if strings.TrimSpace(eventID) == "" {
|
||||||
eventID = fmt.Sprintf("openweather:current:%s:%s", s.name, time.Now().UTC().Format(time.RFC3339Nano))
|
eventID = fmt.Sprintf("openweather:current:%s:%s", s.http.Name, time.Now().UTC().Format(time.RFC3339Nano))
|
||||||
}
|
}
|
||||||
|
|
||||||
var effectiveAt *time.Time
|
var effectiveAt *time.Time
|
||||||
@@ -77,7 +68,7 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
|
|||||||
|
|
||||||
return common.SingleRawEvent(
|
return common.SingleRawEvent(
|
||||||
s.Kind(),
|
s.Kind(),
|
||||||
s.name,
|
s.http.Name,
|
||||||
standards.SchemaRawOpenWeatherCurrentV1,
|
standards.SchemaRawOpenWeatherCurrentV1,
|
||||||
eventID,
|
eventID,
|
||||||
effectiveAt,
|
effectiveAt,
|
||||||
@@ -102,15 +93,13 @@ type openWeatherMeta struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, openWeatherMeta, error) {
|
func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, openWeatherMeta, error) {
|
||||||
b, err := common.FetchBody(ctx, s.client, s.url, s.userAgent, "application/json")
|
raw, err := s.http.FetchJSON(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, openWeatherMeta{}, fmt.Errorf("openweather_observation %q: %w", s.name, err)
|
return nil, openWeatherMeta{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
raw := json.RawMessage(b)
|
|
||||||
|
|
||||||
var meta openWeatherMeta
|
var meta openWeatherMeta
|
||||||
if err := json.Unmarshal(b, &meta); err != nil {
|
if err := json.Unmarshal(raw, &meta); err != nil {
|
||||||
return raw, openWeatherMeta{}, nil
|
return raw, openWeatherMeta{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user