sources: standardize Event.ID on Source:EffectiveAt; simplify raw event helper

- Adopt an opinionated Event.ID policy across sources:
  - use upstream-provided ID when available
  - otherwise derive a stable ID from Source:EffectiveAt (RFC3339Nano, UTC)
  - fall back to Source:EmittedAt when EffectiveAt is unavailable
- Add common/id helper to centralize ID selection logic and keep sources consistent
- Simplify common event construction by collapsing SingleRawEventAt/SingleRawEvent
  into a single explicit SingleRawEvent helper (emittedAt passed in)
- Update NWS/Open-Meteo/OpenWeather observation sources to:
  - compute EffectiveAt first
  - generate IDs via the shared helper
  - build envelopes via the unified SingleRawEvent helper
- Improve determinism and dedupe-friendliness without changing schemas or payloads
This commit is contained in:
2026-01-15 19:38:15 -06:00
parent d9474b5a5b
commit d8db58c004
5 changed files with 76 additions and 110 deletions

View File

@@ -15,12 +15,6 @@ import (
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards"
)
// ObservationSource polls the OpenWeatherMap "Current weather" endpoint and emits a RAW observation Event.
//
// IMPORTANT UNIT POLICY (weatherfeeder convention):
// OpenWeather changes units based on the `units` query parameter but does NOT include the unit
// system in the response body. To keep normalization deterministic, this driver *requires*
// `units=metric`. If absent (or non-metric), the driver returns an error.
//
// The unit policy is re-validated against the configured URL on every Poll, not only at
// construction time, so a URL mutated afterwards is still rejected.
type ObservationSource struct {
// http supplies the configured Name, Driver, and URL that this source reads when
// naming itself, validating the unit policy, and building error messages.
http *common.HTTPSource
}
@@ -45,7 +39,6 @@ func (s *ObservationSource) Name() string { return s.http.Name }
// Kind reports the kind of event this source produces, which is always "observation".
func (s *ObservationSource) Kind() event.Kind {
	return event.Kind("observation")
}
func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
// Re-check policy defensively (in case the URL is mutated after construction).
if err := requireMetricUnits(s.http.URL); err != nil {
return nil, fmt.Errorf("%s %q: %w", s.http.Driver, s.http.Name, err)
}
@@ -55,22 +48,21 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
return nil, err
}
eventID := buildEventID(s.http.Name, meta)
if strings.TrimSpace(eventID) == "" {
eventID = fmt.Sprintf("openweather:current:%s:%s", s.http.Name, time.Now().UTC().Format(time.RFC3339Nano))
}
var effectiveAt *time.Time
if !meta.ParsedTimestamp.IsZero() {
t := meta.ParsedTimestamp.UTC()
effectiveAt = &t
}
emittedAt := time.Now().UTC()
eventID := common.ChooseEventID("", s.http.Name, effectiveAt, emittedAt)
return common.SingleRawEvent(
s.Kind(),
s.http.Name,
standards.SchemaRawOpenWeatherCurrentV1,
eventID,
emittedAt,
effectiveAt,
raw,
)
@@ -81,14 +73,6 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
// openWeatherMeta is the small subset of the OpenWeather response that this driver
// decodes for its own bookkeeping (event identity and timing); the full response is
// carried separately as the raw payload.
type openWeatherMeta struct {
Dt int64 `json:"dt"` // observation time as unix seconds, UTC
// ID is the upstream "id" value; zero is treated as "not provided" when a
// location key is built from this struct.
ID int64 `json:"id"`
// Name is the upstream "name" value (presumably the location name — confirm against the API docs).
Name string `json:"name"`
// Coord is the upstream coordinate pair; a 0/0 pair is treated as "not provided"
// when a location key is built from this struct.
Coord struct {
Lon float64 `json:"lon"`
Lat float64 `json:"lat"`
} `json:"coord"`
// ParsedTimestamp is never read from or written to JSON (tag "-"). Poll uses it as the
// event's effective time when it is non-zero.
// NOTE(review): presumably populated from Dt while fetching — confirm in fetchRaw.
ParsedTimestamp time.Time `json:"-"`
}
@@ -110,24 +94,6 @@ func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, open
return raw, meta, nil
}
// buildEventID derives an Event ID for an OpenWeather "current" observation.
//
// The ID has the form
//
//	openweather:current:<sourceName>:<locKey>:<timestamp>
//
// where locKey is chosen in priority order:
//   - "city:<id>" when the upstream ID is non-zero,
//   - "coord:<lat>,<lon>" (5 decimal places) when either coordinate is non-zero,
//   - "loc:unknown" otherwise,
//
// and timestamp is meta.ParsedTimestamp rendered as RFC3339Nano in UTC. When
// ParsedTimestamp is the zero time, the current wall-clock time is used instead,
// so the ID is only stable across polls when the upstream timestamp was parsed.
func buildEventID(sourceName string, meta openWeatherMeta) string {
	var locKey string
	switch {
	case meta.ID != 0:
		locKey = fmt.Sprintf("city:%d", meta.ID)
	case meta.Coord.Lat != 0 || meta.Coord.Lon != 0:
		locKey = fmt.Sprintf("coord:%.5f,%.5f", meta.Coord.Lat, meta.Coord.Lon)
	default:
		locKey = "loc:unknown"
	}

	ts := meta.ParsedTimestamp
	if ts.IsZero() {
		ts = time.Now()
	}

	// Always normalise to UTC before formatting: the same instant must yield the same
	// ID regardless of the time.Location the timestamp happens to carry.
	return fmt.Sprintf("openweather:current:%s:%s:%s", sourceName, locKey, ts.UTC().Format(time.RFC3339Nano))
}
func requireMetricUnits(rawURL string) error {
u, err := url.Parse(strings.TrimSpace(rawURL))
if err != nil {