normalizers: implemented openmeteo forecast normalizer.

This commit is contained in:
2026-01-17 10:16:50 -06:00
parent b47f1b2051
commit c12cf91115
6 changed files with 328 additions and 47 deletions

View File

@@ -11,7 +11,7 @@ import (
//
// Note: encoding/json will not necessarily print trailing zeros (e.g. 1.50 -> 1.5),
// but values will be *rounded* to this number of digits after the decimal point.
const DefaultFloatPrecision = 2
const DefaultFloatPrecision = 4
// RoundFloats returns a copy of v with all float32/float64 values (including pointers,
// slices, arrays, maps, and nested exported-struct fields) rounded to `decimals` digits

View File

@@ -0,0 +1,243 @@
package openmeteo
import (
"context"
"fmt"
"strings"
"time"
"gitea.maximumdirect.net/ejr/feedkit/event"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/model"
normcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/common"
omcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openmeteo"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards"
)
// ForecastNormalizer converts:
//
// standards.SchemaRawOpenMeteoHourlyForecastV1 -> standards.SchemaWeatherForecastV1
//
// It interprets Open-Meteo hourly forecast JSON and maps it into the canonical
// model.WeatherForecastRun representation.
//
// Caveats / assumptions:
// - Open-Meteo does not provide a true "issued at" timestamp; IssuedAt uses
// Event.EmittedAt when present, otherwise the first hourly time.
// - Hourly payloads are array-oriented; missing fields are treated as nil per-period.
// - Snowfall is provided in centimeters and is converted to millimeters.
// - apparent_temperature is ignored (no canonical "feels like" field).
type ForecastNormalizer struct{}
func (ForecastNormalizer) Match(e event.Event) bool {
return strings.TrimSpace(e.Schema) == standards.SchemaRawOpenMeteoHourlyForecastV1
}
func (ForecastNormalizer) Normalize(ctx context.Context, in event.Event) (*event.Event, error) {
_ = ctx // normalization is pure/CPU; keep ctx for future expensive steps
// If present, prefer the existing event EmittedAt as IssuedAt.
var fallbackIssued time.Time
if !in.EmittedAt.IsZero() {
fallbackIssued = in.EmittedAt.UTC()
}
return normcommon.NormalizeJSON(
in,
"openmeteo hourly forecast",
standards.SchemaWeatherForecastV1,
func(parsed omForecastResponse) (model.WeatherForecastRun, time.Time, error) {
return buildForecast(parsed, fallbackIssued)
},
)
}
// buildForecast contains the domain mapping logic (provider -> canonical model).
func buildForecast(parsed omForecastResponse, fallbackIssued time.Time) (model.WeatherForecastRun, time.Time, error) {
times := parsed.Hourly.Time
if len(times) == 0 {
return model.WeatherForecastRun{}, time.Time{}, fmt.Errorf("missing hourly.time")
}
issuedAt := fallbackIssued.UTC()
run := model.WeatherForecastRun{
LocationID: normcommon.SynthStationIDPtr("OPENMETEO", parsed.Latitude, parsed.Longitude),
LocationName: "Open-Meteo",
IssuedAt: time.Time{},
UpdatedAt: nil,
Product: model.ForecastProductHourly,
Latitude: floatCopy(parsed.Latitude),
Longitude: floatCopy(parsed.Longitude),
ElevationMeters: floatCopy(parsed.Elevation),
Periods: nil,
}
periods := make([]model.WeatherForecastPeriod, 0, len(times))
var prevStart time.Time
for i := range times {
start, err := parseHourlyTime(parsed, i)
if err != nil {
return model.WeatherForecastRun{}, time.Time{}, err
}
if issuedAt.IsZero() && i == 0 {
issuedAt = start
}
end, err := parsePeriodEnd(parsed, i, start, prevStart)
if err != nil {
return model.WeatherForecastRun{}, time.Time{}, err
}
prevStart = start
var isDay *bool
if v := intAt(parsed.Hourly.IsDay, i); v != nil {
b := *v == 1
isDay = &b
}
wmo := wmoAt(parsed.Hourly.WeatherCode, i)
canonicalText := standards.WMOText(wmo, isDay)
period := model.WeatherForecastPeriod{
StartTime: start,
EndTime: end,
Name: "",
IsDay: isDay,
ConditionCode: wmo,
ConditionText: canonicalText,
ProviderRawDescription: "",
TextDescription: canonicalText,
DetailedText: "",
IconURL: "",
}
if v := floatAt(parsed.Hourly.Temperature2m, i); v != nil {
period.TemperatureC = v
}
if v := floatAt(parsed.Hourly.DewPoint2m, i); v != nil {
period.DewpointC = v
}
if v := floatAt(parsed.Hourly.RelativeHumidity2m, i); v != nil {
period.RelativeHumidityPercent = v
}
if v := floatAt(parsed.Hourly.WindDirection10m, i); v != nil {
period.WindDirectionDegrees = v
}
if v := floatAt(parsed.Hourly.WindSpeed10m, i); v != nil {
period.WindSpeedKmh = v
}
if v := floatAt(parsed.Hourly.WindGusts10m, i); v != nil {
period.WindGustKmh = v
}
if v := floatAt(parsed.Hourly.Visibility, i); v != nil {
period.VisibilityMeters = v
}
if v := floatAt(parsed.Hourly.CloudCover, i); v != nil {
period.CloudCoverPercent = v
}
if v := floatAt(parsed.Hourly.UVIndex, i); v != nil {
period.UVIndex = v
}
if v := floatAt(parsed.Hourly.PrecipProbability, i); v != nil {
period.ProbabilityOfPrecipitationPercent = v
}
if v := floatAt(parsed.Hourly.Precipitation, i); v != nil {
period.PrecipitationAmountMm = v
}
if v := floatAt(parsed.Hourly.Snowfall, i); v != nil {
mm := *v * 10.0
period.SnowfallDepthMM = &mm
}
if v := floatAt(parsed.Hourly.SurfacePressure, i); v != nil {
pa := normcommon.PressurePaFromHPa(*v)
period.BarometricPressurePa = &pa
} else if v := floatAt(parsed.Hourly.PressureMSL, i); v != nil {
pa := normcommon.PressurePaFromHPa(*v)
period.BarometricPressurePa = &pa
}
periods = append(periods, period)
}
run.Periods = periods
if issuedAt.IsZero() {
return model.WeatherForecastRun{}, time.Time{}, fmt.Errorf("missing issuedAt")
}
run.IssuedAt = issuedAt
// EffectiveAt policy for forecasts: treat IssuedAt as the effective time (dedupe-friendly).
return run, issuedAt, nil
}
func parseHourlyTime(parsed omForecastResponse, idx int) (time.Time, error) {
if idx < 0 || idx >= len(parsed.Hourly.Time) {
return time.Time{}, fmt.Errorf("hourly.time[%d] missing", idx)
}
raw := strings.TrimSpace(parsed.Hourly.Time[idx])
if raw == "" {
return time.Time{}, fmt.Errorf("hourly.time[%d] empty", idx)
}
t, err := omcommon.ParseTime(raw, parsed.Timezone, parsed.UTCOffsetSeconds)
if err != nil {
return time.Time{}, fmt.Errorf("hourly.time[%d] invalid %q: %w", idx, raw, err)
}
return t.UTC(), nil
}
func parsePeriodEnd(parsed omForecastResponse, idx int, start, prevStart time.Time) (time.Time, error) {
if idx+1 < len(parsed.Hourly.Time) {
return parseHourlyTime(parsed, idx+1)
}
step := time.Hour
if !prevStart.IsZero() {
if d := start.Sub(prevStart); d > 0 {
step = d
}
}
return start.Add(step), nil
}
func floatCopy(v *float64) *float64 {
if v == nil {
return nil
}
out := *v
return &out
}
func floatAt(vals []*float64, idx int) *float64 {
if idx < 0 || idx >= len(vals) {
return nil
}
return floatCopy(vals[idx])
}
func intAt(vals []*int, idx int) *int {
if idx < 0 || idx >= len(vals) {
return nil
}
if vals[idx] == nil {
return nil
}
out := *vals[idx]
return &out
}
func wmoAt(vals []*int, idx int) model.WMOCode {
if v := intAt(vals, idx); v != nil {
return model.WMOCode(*v)
}
return model.WMOUnknown
}

View File

@@ -13,4 +13,6 @@ func Register(reg *fknormalize.Registry) {
// Observations
reg.Register(ObservationNormalizer{})
// Forecasts
reg.Register(ForecastNormalizer{})
}

View File

@@ -31,3 +31,38 @@ type omCurrent struct {
IsDay *int `json:"is_day"` // 0/1
}
// omForecastResponse is a minimal-but-sufficient representation of Open-Meteo
// hourly forecast payloads for mapping into model.WeatherForecastRun.
type omForecastResponse struct {
Latitude *float64 `json:"latitude"`
Longitude *float64 `json:"longitude"`
Timezone string `json:"timezone"`
UTCOffsetSeconds int `json:"utc_offset_seconds"`
Elevation *float64 `json:"elevation"`
Hourly omForecastHourly `json:"hourly"`
}
type omForecastHourly struct {
Time []string `json:"time"`
Temperature2m []*float64 `json:"temperature_2m"`
RelativeHumidity2m []*float64 `json:"relative_humidity_2m"`
DewPoint2m []*float64 `json:"dew_point_2m"`
ApparentTemp []*float64 `json:"apparent_temperature"`
PrecipProbability []*float64 `json:"precipitation_probability"`
Precipitation []*float64 `json:"precipitation"`
Snowfall []*float64 `json:"snowfall"`
WeatherCode []*int `json:"weather_code"`
SurfacePressure []*float64 `json:"surface_pressure"`
PressureMSL []*float64 `json:"pressure_msl"`
WindSpeed10m []*float64 `json:"wind_speed_10m"`
WindDirection10m []*float64 `json:"wind_direction_10m"`
WindGusts10m []*float64 `json:"wind_gusts_10m"`
IsDay []*int `json:"is_day"`
CloudCover []*float64 `json:"cloud_cover"`
Visibility []*float64 `json:"visibility"`
UVIndex []*float64 `json:"uv_index"`
}