From e28ff49201cf52efc96c03aba0abd42a238be492 Mon Sep 17 00:00:00 2001
From: Eric Rakestraw
Date: Thu, 15 Jan 2026 08:58:56 -0600
Subject: [PATCH] Moved common HTTP body fetch code into a shared helper function.

---
 internal/model/event.go                     | 212 ++++++++++++++++++++
 internal/normalizers/common/payload.go      |   3 +-
 internal/sources/common/http.go             |  55 +++++
 internal/sources/nws/observation.go         |  27 +--
 internal/sources/openmeteo/observation.go   |  27 +--
 internal/sources/openweather/observation.go |  27 +--
 6 files changed, 277 insertions(+), 74 deletions(-)
 create mode 100644 internal/model/event.go
 create mode 100644 internal/sources/common/http.go

diff --git a/internal/model/event.go b/internal/model/event.go
new file mode 100644
index 0000000..0531348
--- /dev/null
+++ b/internal/model/event.go
@@ -0,0 +1,212 @@
+package model
+
+import (
+	"errors"
+	"fmt"
+	"strings"
+	"time"
+)
+
+// ErrInvalidEvent is a sentinel error used for errors.Is checks.
+var ErrInvalidEvent = errors.New("invalid event")
+
+// EventValidationError reports one or more problems with an Event.
+//
+// We keep this structured because it makes debugging faster than a single
+// "invalid event" string; you get all issues in one pass.
+type EventValidationError struct {
+	Problems []string
+}
+
+func (e *EventValidationError) Error() string {
+	if e == nil || len(e.Problems) == 0 {
+		return "invalid event"
+	}
+	var b strings.Builder
+	b.WriteString("invalid event:\n")
+	for _, p := range e.Problems {
+		b.WriteString(" - ")
+		b.WriteString(p)
+		b.WriteString("\n")
+	}
+	return strings.TrimRight(b.String(), "\n")
+}
+
+// Is lets errors.Is(err, ErrInvalidEvent) work.
+func (e *EventValidationError) Is(target error) bool {
+	return target == ErrInvalidEvent
+}
+
+// Event is the normalized unit your pipeline moves around.
+// It wraps exactly one of Observation/Forecast/Alert plus metadata.
+type Event struct {
+	ID          string // stable dedupe/storage key (source-defined or computed)
+	Kind        Kind
+	Source      string     // configured source name (e.g. "NWSObservationKSTL")
+	EmittedAt   time.Time  // when *your* system emitted this event
+	EffectiveAt *time.Time // optional: “time the event applies”
+
+	// Union payload: EXACTLY ONE must be non-nil.
+	Observation *WeatherObservation
+	Forecast    *WeatherForecast
+	Alert       *WeatherAlert
+}
+
+// Validate enforces Event invariants.
+//
+// This is intentionally strict. If an event is invalid, we want to find out
+// immediately rather than letting it drift into sinks or storage.
+//
+// Invariants enforced:
+//   - ID is non-empty
+//   - Kind is known
+//   - Source is non-empty
+//   - EmittedAt is non-zero
+//   - Exactly one payload pointer is non-nil
+//   - Kind matches the non-nil payload
+func (e Event) Validate() error {
+	var problems []string
+
+	if strings.TrimSpace(e.ID) == "" {
+		problems = append(problems, "ID is required")
+	}
+	if !e.Kind.IsKnown() {
+		problems = append(problems, fmt.Sprintf("Kind %q is not recognized", string(e.Kind)))
+	}
+	if strings.TrimSpace(e.Source) == "" {
+		problems = append(problems, "Source is required")
+	}
+	if e.EmittedAt.IsZero() {
+		problems = append(problems, "EmittedAt must be set (non-zero)")
+	}
+
+	// Count payloads and ensure Kind matches.
+	payloadCount := 0
+	if e.Observation != nil {
+		payloadCount++
+		if e.Kind != KindObservation {
+			problems = append(problems, fmt.Sprintf("Observation payload present but Kind=%q", string(e.Kind)))
+		}
+	}
+	if e.Forecast != nil {
+		payloadCount++
+		if e.Kind != KindForecast {
+			problems = append(problems, fmt.Sprintf("Forecast payload present but Kind=%q", string(e.Kind)))
+		}
+	}
+	if e.Alert != nil {
+		payloadCount++
+		if e.Kind != KindAlert {
+			problems = append(problems, fmt.Sprintf("Alert payload present but Kind=%q", string(e.Kind)))
+		}
+	}
+
+	if payloadCount == 0 {
+		problems = append(problems, "exactly one payload must be set; all payloads are nil")
+	} else if payloadCount > 1 {
+		problems = append(problems, "exactly one payload must be set; multiple payloads are non-nil")
+	}
+
+	if len(problems) > 0 {
+		return &EventValidationError{Problems: problems}
+	}
+	return nil
+}
+
+// NewObservationEvent constructs a valid observation Event.
+//
+// If emittedAt is zero, it defaults to time.Now().UTC().
+// effectiveAt is optional (nil allowed).
+//
+// The returned Event is guaranteed valid (or you get an error).
+func NewObservationEvent(
+	id string,
+	source string,
+	emittedAt time.Time,
+	effectiveAt *time.Time,
+	obs *WeatherObservation,
+) (Event, error) {
+	if obs == nil {
+		return Event{}, fmt.Errorf("%w: observation payload is nil", ErrInvalidEvent)
+	}
+
+	if emittedAt.IsZero() {
+		emittedAt = time.Now().UTC()
+	}
+
+	e := Event{
+		ID:          strings.TrimSpace(id),
+		Kind:        KindObservation,
+		Source:      strings.TrimSpace(source),
+		EmittedAt:   emittedAt,
+		EffectiveAt: effectiveAt,
+		Observation: obs,
+	}
+
+	if err := e.Validate(); err != nil {
+		return Event{}, err
+	}
+	return e, nil
+}
+
+// NewForecastEvent constructs a valid forecast Event.
+func NewForecastEvent(
+	id string,
+	source string,
+	emittedAt time.Time,
+	effectiveAt *time.Time,
+	fc *WeatherForecast,
+) (Event, error) {
+	if fc == nil {
+		return Event{}, fmt.Errorf("%w: forecast payload is nil", ErrInvalidEvent)
+	}
+
+	if emittedAt.IsZero() {
+		emittedAt = time.Now().UTC()
+	}
+
+	e := Event{
+		ID:          strings.TrimSpace(id),
+		Kind:        KindForecast,
+		Source:      strings.TrimSpace(source),
+		EmittedAt:   emittedAt,
+		EffectiveAt: effectiveAt,
+		Forecast:    fc,
+	}
+
+	if err := e.Validate(); err != nil {
+		return Event{}, err
+	}
+	return e, nil
+}
+
+// NewAlertEvent constructs a valid alert Event.
+func NewAlertEvent(
+	id string,
+	source string,
+	emittedAt time.Time,
+	effectiveAt *time.Time,
+	a *WeatherAlert,
+) (Event, error) {
+	if a == nil {
+		return Event{}, fmt.Errorf("%w: alert payload is nil", ErrInvalidEvent)
+	}
+
+	if emittedAt.IsZero() {
+		emittedAt = time.Now().UTC()
+	}
+
+	e := Event{
+		ID:          strings.TrimSpace(id),
+		Kind:        KindAlert,
+		Source:      strings.TrimSpace(source),
+		EmittedAt:   emittedAt,
+		EffectiveAt: effectiveAt,
+		Alert:       a,
+	}
+
+	if err := e.Validate(); err != nil {
+		return Event{}, err
+	}
+	return e, nil
+}
diff --git a/internal/normalizers/common/payload.go b/internal/normalizers/common/payload.go
index 00da9e3..c22ed5d 100644
--- a/internal/normalizers/common/payload.go
+++ b/internal/normalizers/common/payload.go
@@ -1,4 +1,3 @@
-// FILE: ./internal/normalizers/common/payload.go
 package common
 
 import (
@@ -8,7 +7,7 @@ import (
 	"gitea.maximumdirect.net/ejr/feedkit/event"
 )
 
-// PayloadBytes extracts a JSON-ish payload into bytes suitable for json.Unmarshal.
+// PayloadBytes extracts a JSON payload into bytes suitable for json.Unmarshal.
 //
 // Supported payload shapes (weatherfeeder convention):
 //   - json.RawMessage (recommended for raw events)
diff --git a/internal/sources/common/http.go b/internal/sources/common/http.go
new file mode 100644
index 0000000..d686fad
--- /dev/null
+++ b/internal/sources/common/http.go
@@ -0,0 +1,55 @@
+package common
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"net/http"
+)
+
+// maxResponseBodyBytes is a hard safety limit on HTTP response bodies.
+// API responses should be small, so this protects us from accidental
+// or malicious large responses.
+const maxResponseBodyBytes = 2 << 21 // 4 MiB
+
+func FetchBody(ctx context.Context, client *http.Client, url, userAgent, accept string) ([]byte, error) {
+	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
+	if err != nil {
+		return nil, err
+	}
+
+	if userAgent != "" {
+		req.Header.Set("User-Agent", userAgent)
+	}
+	if accept != "" {
+		req.Header.Set("Accept", accept)
+	}
+
+	res, err := client.Do(req)
+	if err != nil {
+		return nil, err
+	}
+	defer res.Body.Close()
+
+	if res.StatusCode < 200 || res.StatusCode >= 300 {
+		return nil, fmt.Errorf("HTTP %s", res.Status)
+	}
+
+	// Read at most maxResponseBodyBytes + 1 so we can detect overflow.
+	limited := io.LimitReader(res.Body, maxResponseBodyBytes+1)
+
+	b, err := io.ReadAll(limited)
+	if err != nil {
+		return nil, err
+	}
+
+	if len(b) == 0 {
+		return nil, fmt.Errorf("empty response body")
+	}
+
+	if len(b) > maxResponseBodyBytes {
+		return nil, fmt.Errorf("response body too large (>%d bytes)", maxResponseBodyBytes)
+	}
+
+	return b, nil
+}
diff --git a/internal/sources/nws/observation.go b/internal/sources/nws/observation.go
index d4b9140..4e7c2b7 100644
--- a/internal/sources/nws/observation.go
+++ b/internal/sources/nws/observation.go
@@ -5,13 +5,13 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
-	"io"
 	"net/http"
 	"strings"
 	"time"
 
 	"gitea.maximumdirect.net/ejr/feedkit/config"
 	"gitea.maximumdirect.net/ejr/feedkit/event"
+	"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
 	"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards"
 )
 
@@ -133,30 +133,9 @@ type observationMeta struct {
 }
 
 func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, observationMeta, error) {
-	req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.url, nil)
+	b, err := common.FetchBody(ctx, s.client, s.url, s.userAgent, "application/geo+json, application/json")
 	if err != nil {
-		return nil, observationMeta{}, err
-	}
-
-	req.Header.Set("User-Agent", s.userAgent)
-	req.Header.Set("Accept", "application/geo+json, application/json")
-
-	res, err := s.client.Do(req)
-	if err != nil {
-		return nil, observationMeta{}, err
-	}
-	defer res.Body.Close()
-
-	if res.StatusCode < 200 || res.StatusCode >= 300 {
-		return nil, observationMeta{}, fmt.Errorf("nws_observation %q: HTTP %s", s.name, res.Status)
-	}
-
-	b, err := io.ReadAll(res.Body)
-	if err != nil {
-		return nil, observationMeta{}, err
-	}
-	if len(b) == 0 {
-		return nil, observationMeta{}, fmt.Errorf("nws_observation %q: empty response body", s.name)
+		return nil, observationMeta{}, fmt.Errorf("nws_observation %q: %w", s.name, err)
 	}
 
 	raw := json.RawMessage(b)
diff --git a/internal/sources/openmeteo/observation.go b/internal/sources/openmeteo/observation.go
index 08206f7..524b26d 100644
--- a/internal/sources/openmeteo/observation.go
+++ b/internal/sources/openmeteo/observation.go
@@ -5,13 +5,13 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
-	"io"
 	"net/http"
 	"strings"
 	"time"
 
 	"gitea.maximumdirect.net/ejr/feedkit/config"
 	"gitea.maximumdirect.net/ejr/feedkit/event"
+	"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
 	"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards"
 )
 
@@ -126,30 +126,9 @@ type openMeteoMeta struct {
 }
 
 func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, openMeteoMeta, error) {
-	req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.url, nil)
+	b, err := common.FetchBody(ctx, s.client, s.url, s.userAgent, "application/json")
 	if err != nil {
-		return nil, openMeteoMeta{}, err
-	}
-
-	req.Header.Set("User-Agent", s.userAgent)
-	req.Header.Set("Accept", "application/json")
-
-	res, err := s.client.Do(req)
-	if err != nil {
-		return nil, openMeteoMeta{}, err
-	}
-	defer res.Body.Close()
-
-	if res.StatusCode < 200 || res.StatusCode >= 300 {
-		return nil, openMeteoMeta{}, fmt.Errorf("openmeteo_observation %q: HTTP %s", s.name, res.Status)
-	}
-
-	b, err := io.ReadAll(res.Body)
-	if err != nil {
-		return nil, openMeteoMeta{}, err
-	}
-	if len(b) == 0 {
-		return nil, openMeteoMeta{}, fmt.Errorf("openmeteo_observation %q: empty response body", s.name)
+		return nil, openMeteoMeta{}, fmt.Errorf("openmeteo_observation %q: %w", s.name, err)
 	}
 
 	raw := json.RawMessage(b)
diff --git a/internal/sources/openweather/observation.go b/internal/sources/openweather/observation.go
index 231858a..1190ccf 100644
--- a/internal/sources/openweather/observation.go
+++ b/internal/sources/openweather/observation.go
@@ -5,7 +5,6 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
-	"io"
 	"net/http"
 	"net/url"
 	"strings"
@@ -13,6 +12,7 @@ import (
 
 	"gitea.maximumdirect.net/ejr/feedkit/config"
 	"gitea.maximumdirect.net/ejr/feedkit/event"
+	"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
 	"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards"
 )
 
@@ -139,30 +139,9 @@ type openWeatherMeta struct {
 }
 
 func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, openWeatherMeta, error) {
-	req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.url, nil)
+	b, err := common.FetchBody(ctx, s.client, s.url, s.userAgent, "application/json")
 	if err != nil {
-		return nil, openWeatherMeta{}, err
-	}
-
-	req.Header.Set("User-Agent", s.userAgent)
-	req.Header.Set("Accept", "application/json")
-
-	res, err := s.client.Do(req)
-	if err != nil {
-		return nil, openWeatherMeta{}, err
-	}
-	defer res.Body.Close()
-
-	if res.StatusCode < 200 || res.StatusCode >= 300 {
-		return nil, openWeatherMeta{}, fmt.Errorf("openweather_observation %q: HTTP %s", s.name, res.Status)
-	}
-
-	b, err := io.ReadAll(res.Body)
-	if err != nil {
-		return nil, openWeatherMeta{}, err
-	}
-	if len(b) == 0 {
-		return nil, openWeatherMeta{}, fmt.Errorf("openweather_observation %q: empty response body", s.name)
+		return nil, openWeatherMeta{}, fmt.Errorf("openweather_observation %q: %w", s.name, err)
 	}
 
 	raw := json.RawMessage(b)