sources: standardize HTTP source config + factor raw-event boilerplate
- Require params.user_agent for all HTTP sources (uniform config across providers)
- Add common.RequireHTTPSourceConfig() to validate name/url/user_agent in one call
- Add common.NewHTTPClient() with DefaultHTTPTimeout for consistent client setup
- Add common.SingleRawEvent() to centralize event envelope construction + validation
- Refactor NWS/Open-Meteo/OpenWeather observation sources to use new helpers
This commit is contained in:
104
internal/sources/common/config.go
Normal file
104
internal/sources/common/config.go
Normal file
@@ -0,0 +1,104 @@
|
||||
// FILE: ./internal/sources/common/config.go
|
||||
package common
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||
)
|
||||
|
||||
// This file centralizes small, boring config-validation patterns shared across
|
||||
// weatherfeeder source drivers.
|
||||
//
|
||||
// Goal: keep driver constructors (New*Source) easy to read and consistent, while
|
||||
// keeping driver-specific options in cfg.Params (feedkit remains domain-agnostic).
|
||||
|
||||
// HTTPSourceConfig holds the three settings every HTTP-polling source driver
// needs. It is the value returned by RequireHTTPSourceConfig.
type HTTPSourceConfig struct {
	// Name is the source name, copied from the source config as-is.
	Name string

	// URL is the endpoint to poll, read from params ("url" or "URL").
	URL string

	// UserAgent is the User-Agent value, read from params
	// ("user_agent" or "userAgent").
	UserAgent string
}
|
||||
|
||||
// RequireHTTPSourceConfig enforces weatherfeeder's standard HTTP source config:
|
||||
//
|
||||
// - cfg.Name must be present
|
||||
// - cfg.Params must be present
|
||||
// - params.url must be present (accepts "url" or "URL")
|
||||
// - params.user_agent must be present (accepts "user_agent" or "userAgent")
|
||||
//
|
||||
// We intentionally require a User-Agent for *all* sources, even when upstreams
|
||||
// do not strictly require one. This keeps config uniform across providers.
|
||||
func RequireHTTPSourceConfig(driver string, cfg config.SourceConfig) (HTTPSourceConfig, error) {
|
||||
if strings.TrimSpace(cfg.Name) == "" {
|
||||
return HTTPSourceConfig{}, fmt.Errorf("%s: name is required", driver)
|
||||
}
|
||||
if cfg.Params == nil {
|
||||
return HTTPSourceConfig{}, fmt.Errorf("%s %q: params are required (need params.url and params.user_agent)", driver, cfg.Name)
|
||||
}
|
||||
|
||||
url, ok := cfg.ParamString("url", "URL")
|
||||
if !ok {
|
||||
return HTTPSourceConfig{}, fmt.Errorf("%s %q: params.url is required", driver, cfg.Name)
|
||||
}
|
||||
|
||||
ua, ok := cfg.ParamString("user_agent", "userAgent")
|
||||
if !ok {
|
||||
return HTTPSourceConfig{}, fmt.Errorf("%s %q: params.user_agent is required", driver, cfg.Name)
|
||||
}
|
||||
|
||||
return HTTPSourceConfig{
|
||||
Name: cfg.Name,
|
||||
URL: url,
|
||||
UserAgent: ua,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// --- The helpers below remain useful for future drivers; they are not required
|
||||
// --- by the observation sources after adopting RequireHTTPSourceConfig.
|
||||
|
||||
// RequireName ensures cfg.Name is present and non-whitespace.
|
||||
func RequireName(driver string, cfg config.SourceConfig) error {
|
||||
if strings.TrimSpace(cfg.Name) == "" {
|
||||
return fmt.Errorf("%s: name is required", driver)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// RequireParams ensures cfg.Params is non-nil. The "want" string should be a short
|
||||
// description of required keys, e.g. "need params.url and params.user_agent".
|
||||
func RequireParams(driver string, cfg config.SourceConfig, want string) error {
|
||||
if cfg.Params == nil {
|
||||
return fmt.Errorf("%s %q: params are required (%s)", driver, cfg.Name, want)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// RequireURL returns the configured URL for a source.
|
||||
// Canonical key is "url"; we also accept "URL" as a convenience.
|
||||
func RequireURL(driver string, cfg config.SourceConfig) (string, error) {
|
||||
if cfg.Params == nil {
|
||||
return "", fmt.Errorf("%s %q: params are required (need params.url)", driver, cfg.Name)
|
||||
}
|
||||
|
||||
u, ok := cfg.ParamString("url", "URL")
|
||||
if !ok {
|
||||
return "", fmt.Errorf("%s %q: params.url is required", driver, cfg.Name)
|
||||
}
|
||||
return u, nil
|
||||
}
|
||||
|
||||
// RequireUserAgent returns the configured User-Agent for a source.
|
||||
// Canonical key is "user_agent"; we also accept "userAgent" as a convenience.
|
||||
func RequireUserAgent(driver string, cfg config.SourceConfig) (string, error) {
|
||||
if cfg.Params == nil {
|
||||
return "", fmt.Errorf("%s %q: params are required (need params.user_agent)", driver, cfg.Name)
|
||||
}
|
||||
|
||||
ua, ok := cfg.ParamString("user_agent", "userAgent")
|
||||
if !ok {
|
||||
return "", fmt.Errorf("%s %q: params.user_agent is required", driver, cfg.Name)
|
||||
}
|
||||
return ua, nil
|
||||
}
|
||||
37
internal/sources/common/event.go
Normal file
37
internal/sources/common/event.go
Normal file
@@ -0,0 +1,37 @@
|
||||
// FILE: ./internal/sources/common/event.go
|
||||
package common
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||
)
|
||||
|
||||
// SingleRawEvent constructs, validates, and returns a slice containing exactly one event.
|
||||
//
|
||||
// This removes the repetitive "event envelope ceremony" from individual sources.
|
||||
// Sources remain responsible for:
|
||||
// - fetching bytes (raw payload)
|
||||
// - choosing Schema (raw schema identifier)
|
||||
// - computing a stable Event.ID and (optional) EffectiveAt
|
||||
func SingleRawEvent(kind event.Kind, sourceName string, schema string, id string, effectiveAt *time.Time, payload any) ([]event.Event, error) {
|
||||
e := event.Event{
|
||||
ID: id,
|
||||
Kind: kind,
|
||||
Source: sourceName,
|
||||
EmittedAt: time.Now().UTC(),
|
||||
EffectiveAt: effectiveAt,
|
||||
|
||||
// RAW schema (normalizer matches on this).
|
||||
Schema: schema,
|
||||
|
||||
// Raw payload (usually json.RawMessage). Normalizer will decode and map to canonical model.
|
||||
Payload: payload,
|
||||
}
|
||||
|
||||
if err := e.Validate(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return []event.Event{e}, nil
|
||||
}
|
||||
@@ -1,3 +1,4 @@
|
||||
// FILE: ./internal/sources/common/http.go
|
||||
package common
|
||||
|
||||
import (
|
||||
@@ -5,6 +6,7 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
// maxResponseBodyBytes is a hard safety limit on HTTP response bodies.
|
||||
@@ -12,6 +14,19 @@ import (
|
||||
// or malicious large responses.
|
||||
const maxResponseBodyBytes = 2 << 21 // 4 MiB
|
||||
|
||||
// DefaultHTTPTimeout is the timeout weatherfeeder HTTP sources use unless a
// driver has a specific reason to pick a different one.
const DefaultHTTPTimeout = 10 * time.Second

// NewHTTPClient returns a new http.Client whose only configured setting is its
// Timeout. A zero or negative timeout selects DefaultHTTPTimeout.
func NewHTTPClient(timeout time.Duration) *http.Client {
	// Start from the default and override only for a usable caller value.
	client := &http.Client{Timeout: DefaultHTTPTimeout}
	if timeout > 0 {
		client.Timeout = timeout
	}
	return client
}
|
||||
|
||||
func FetchBody(ctx context.Context, client *http.Client, url, userAgent, accept string) ([]byte, error) {
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user