feat(nws, normalizers): add NWS hourly forecast normalization and enforce canonical float rounding
- Implement full NWS hourly forecast normalizer (raw.nws.hourly.forecast.v1 → weather.forecast.v1) - Add GeoJSON forecast types and helpers for NWS gridpoint hourly payloads - Normalize temperatures, winds, humidity, PoP, and infer WMO condition codes from forecast text/icons - Treat forecast IssuedAt as EffectiveAt for stable, dedupe-friendly event IDs - Introduce project-wide float rounding at normalization finalization - Round all float values in canonical payloads to 2 decimal places - Apply consistently across pointers, slices, maps, and nested structs - Preserve opaque structs (e.g., time.Time) unchanged - Add SchemaRawNWSHourlyForecastV1 and align schema matching/comments - Clean up NWS helper organization and comments - Update documentation to reflect numeric wire-format and normalization policies This establishes a complete, deterministic hourly forecast pipeline for NWS and improves JSON output stability across all canonical weather schemas.
This commit is contained in:
@@ -11,39 +11,9 @@
|
||||
// - minimal abstractions (prefer straightforward functions)
|
||||
// - easy to unit test
|
||||
//
|
||||
// What belongs here
|
||||
// -----------------
|
||||
// Put code in internal/normalizers/common when it is:
|
||||
//
|
||||
// - potentially reusable by more than one provider
|
||||
// - provider-agnostic (no NWS/OpenWeather/Open-Meteo specific assumptions)
|
||||
// - stable, small, and readable
|
||||
//
|
||||
// Typical examples:
|
||||
// - unit conversion helpers (°F <-> °C, m/s <-> km/h, hPa <-> Pa, etc.)
|
||||
// - json.RawMessage payload extraction helpers (with good error messages)
|
||||
// - shared parsing helpers (timestamps, simple numeric coercions)
|
||||
// - generic fallbacks (e.g., mapping a human text description into a coarse canonical code),
|
||||
// so long as the logic truly applies across providers
|
||||
//
|
||||
// What does NOT belong here
|
||||
// -------------------------
|
||||
// Do NOT put the following in this package:
|
||||
//
|
||||
// - Normalizer implementations (types that satisfy feedkit/normalize.Normalizer)
|
||||
// - provider-specific JSON structs or mapping logic (put those under
|
||||
// internal/normalizers/<provider>/)
|
||||
// - network or filesystem I/O (sources fetch; normalizers transform)
|
||||
// - code that depends on event.Source naming, config fields, or driver-specific params
|
||||
//
|
||||
// Style and API guidelines
|
||||
// ------------------------
|
||||
// - Prefer small, single-purpose functions.
|
||||
// - Keep function names explicit (avoid clever generic “DoThing” helpers).
|
||||
// - Return typed errors with context (include schema/field names where helpful).
|
||||
// - Keep dependencies minimal: standard library + weatherfeeder packages only.
|
||||
// - Add unit tests for any non-trivial logic (especially parsing and fallbacks).
|
||||
//
|
||||
// Keeping this clean matters: common is shared by all providers, so complexity here
|
||||
// multiplies across the project.
|
||||
// Numeric wire policy
|
||||
// -------------------
|
||||
// Canonical payloads are intended for sinks/serialization. To keep output stable and readable,
|
||||
// weatherfeeder rounds floating-point values in canonical payloads to a small, fixed precision
|
||||
// at finalization time (see round.go).
|
||||
package common
|
||||
|
||||
@@ -14,10 +14,13 @@ import (
|
||||
// - ID/Kind/Source/EmittedAt are preserved by copying the input event.
|
||||
// - EffectiveAt is only overwritten when effectiveAt is non-zero.
|
||||
// If effectiveAt is zero, any existing in.EffectiveAt is preserved.
|
||||
// - Payload floats are rounded to a stable wire-friendly precision (see round.go).
|
||||
func Finalize(in event.Event, outSchema string, outPayload any, effectiveAt time.Time) (*event.Event, error) {
|
||||
out := in
|
||||
out.Schema = outSchema
|
||||
out.Payload = outPayload
|
||||
|
||||
// Enforce stable numeric presentation for sinks: round floats in the canonical payload.
|
||||
out.Payload = RoundFloats(outPayload, DefaultFloatPrecision)
|
||||
|
||||
if !effectiveAt.IsZero() {
|
||||
t := effectiveAt.UTC()
|
||||
|
||||
215
internal/normalizers/common/round.go
Normal file
215
internal/normalizers/common/round.go
Normal file
@@ -0,0 +1,215 @@
|
||||
// FILE: ./internal/normalizers/common/round.go
|
||||
package common
|
||||
|
||||
import (
|
||||
"math"
|
||||
"reflect"
|
||||
)
|
||||
|
||||
// DefaultFloatPrecision is the project-wide wire-format policy for floating-point
|
||||
// values in canonical payloads (weather.* schemas).
|
||||
//
|
||||
// Note: encoding/json will not necessarily print trailing zeros (e.g. 1.50 -> 1.5),
|
||||
// but values will be *rounded* to this number of digits after the decimal point.
|
||||
const DefaultFloatPrecision = 2
|
||||
|
||||
// RoundFloats returns a copy of v with all float32/float64 values (including pointers,
|
||||
// slices, arrays, maps, and nested exported-struct fields) rounded to `decimals` digits
|
||||
// after the decimal point.
|
||||
//
|
||||
// This is a best-effort helper meant for presentation stability. If reflection hits an
|
||||
// unsupported/opaque type (e.g. structs with unexported fields like time.Time), that
|
||||
// subtree is left unchanged.
|
||||
func RoundFloats(v any, decimals int) any {
|
||||
if v == nil || decimals < 0 {
|
||||
return v
|
||||
}
|
||||
|
||||
defer func() {
|
||||
// Never let presentation formatting crash the pipeline.
|
||||
_ = recover()
|
||||
}()
|
||||
|
||||
rv := reflect.ValueOf(v)
|
||||
out := roundValue(rv, decimals)
|
||||
if !out.IsValid() {
|
||||
return v
|
||||
}
|
||||
return out.Interface()
|
||||
}
|
||||
|
||||
func roundValue(v reflect.Value, decimals int) reflect.Value {
|
||||
if !v.IsValid() {
|
||||
return v
|
||||
}
|
||||
|
||||
// Unwrap interfaces.
|
||||
if v.Kind() == reflect.Interface {
|
||||
if v.IsNil() {
|
||||
return v
|
||||
}
|
||||
elem := roundValue(v.Elem(), decimals)
|
||||
|
||||
// Re-wrap in the same interface type.
|
||||
out := reflect.New(v.Type()).Elem()
|
||||
if elem.IsValid() && elem.Type().AssignableTo(v.Type()) {
|
||||
out.Set(elem)
|
||||
return out
|
||||
}
|
||||
if elem.IsValid() && elem.Type().AssignableTo(v.Type()) {
|
||||
out.Set(elem)
|
||||
return out
|
||||
}
|
||||
if elem.IsValid() && elem.Type().ConvertibleTo(v.Type()) {
|
||||
out.Set(elem.Convert(v.Type()))
|
||||
return out
|
||||
}
|
||||
// If we can't sensibly re-wrap, just keep the original.
|
||||
return v
|
||||
}
|
||||
|
||||
// Copy pointers (and round their targets).
|
||||
if v.Kind() == reflect.Pointer {
|
||||
if v.IsNil() {
|
||||
return v
|
||||
}
|
||||
|
||||
// If the pointed-to type is an opaque struct (e.g. time.Time), keep as-is.
|
||||
if v.Elem().Kind() == reflect.Struct && isOpaqueStruct(v.Elem().Type()) {
|
||||
return v
|
||||
}
|
||||
|
||||
elem := roundValue(v.Elem(), decimals)
|
||||
p := reflect.New(v.Type().Elem())
|
||||
if elem.IsValid() && elem.Type().AssignableTo(v.Type().Elem()) {
|
||||
p.Elem().Set(elem)
|
||||
} else if elem.IsValid() && elem.Type().ConvertibleTo(v.Type().Elem()) {
|
||||
p.Elem().Set(elem.Convert(v.Type().Elem()))
|
||||
} else {
|
||||
p.Elem().Set(v.Elem())
|
||||
}
|
||||
return p
|
||||
}
|
||||
|
||||
switch v.Kind() {
|
||||
case reflect.Float32, reflect.Float64:
|
||||
f := v.Convert(reflect.TypeOf(float64(0))).Float()
|
||||
r := roundFloat64(f, decimals)
|
||||
return reflect.ValueOf(r).Convert(v.Type())
|
||||
|
||||
case reflect.Struct:
|
||||
// Avoid reconstructing opaque structs (time.Time has unexported fields).
|
||||
if isOpaqueStruct(v.Type()) {
|
||||
return v
|
||||
}
|
||||
|
||||
out := reflect.New(v.Type()).Elem()
|
||||
out.Set(v) // start from a copy, then replace rounded fields
|
||||
|
||||
t := v.Type()
|
||||
for i := 0; i < v.NumField(); i++ {
|
||||
sf := t.Field(i)
|
||||
|
||||
// Only exported fields are safely settable across packages.
|
||||
if sf.PkgPath != "" {
|
||||
continue
|
||||
}
|
||||
|
||||
fv := v.Field(i)
|
||||
rf := roundValue(fv, decimals)
|
||||
|
||||
of := out.Field(i)
|
||||
if !of.CanSet() {
|
||||
continue
|
||||
}
|
||||
|
||||
if rf.IsValid() && rf.Type().AssignableTo(of.Type()) {
|
||||
of.Set(rf)
|
||||
} else if rf.IsValid() && rf.Type().ConvertibleTo(of.Type()) {
|
||||
of.Set(rf.Convert(of.Type()))
|
||||
}
|
||||
}
|
||||
return out
|
||||
|
||||
case reflect.Slice:
|
||||
if v.IsNil() {
|
||||
return v
|
||||
}
|
||||
out := reflect.MakeSlice(v.Type(), v.Len(), v.Len())
|
||||
for i := 0; i < v.Len(); i++ {
|
||||
ev := v.Index(i)
|
||||
re := roundValue(ev, decimals)
|
||||
if re.IsValid() && re.Type().AssignableTo(out.Index(i).Type()) {
|
||||
out.Index(i).Set(re)
|
||||
} else if re.IsValid() && re.Type().ConvertibleTo(out.Index(i).Type()) {
|
||||
out.Index(i).Set(re.Convert(out.Index(i).Type()))
|
||||
} else {
|
||||
out.Index(i).Set(ev)
|
||||
}
|
||||
}
|
||||
return out
|
||||
|
||||
case reflect.Array:
|
||||
out := reflect.New(v.Type()).Elem()
|
||||
out.Set(v)
|
||||
for i := 0; i < v.Len(); i++ {
|
||||
ev := v.Index(i)
|
||||
re := roundValue(ev, decimals)
|
||||
if re.IsValid() && re.Type().AssignableTo(out.Index(i).Type()) {
|
||||
out.Index(i).Set(re)
|
||||
} else if re.IsValid() && re.Type().ConvertibleTo(out.Index(i).Type()) {
|
||||
out.Index(i).Set(re.Convert(out.Index(i).Type()))
|
||||
} else {
|
||||
out.Index(i).Set(ev)
|
||||
}
|
||||
}
|
||||
return out
|
||||
|
||||
case reflect.Map:
|
||||
if v.IsNil() {
|
||||
return v
|
||||
}
|
||||
out := reflect.MakeMapWithSize(v.Type(), v.Len())
|
||||
iter := v.MapRange()
|
||||
for iter.Next() {
|
||||
k := iter.Key()
|
||||
mv := iter.Value()
|
||||
rv := roundValue(mv, decimals)
|
||||
|
||||
if rv.IsValid() && rv.Type().AssignableTo(v.Type().Elem()) {
|
||||
out.SetMapIndex(k, rv)
|
||||
} else if rv.IsValid() && rv.Type().ConvertibleTo(v.Type().Elem()) {
|
||||
out.SetMapIndex(k, rv.Convert(v.Type().Elem()))
|
||||
} else {
|
||||
out.SetMapIndex(k, mv)
|
||||
}
|
||||
}
|
||||
return out
|
||||
|
||||
default:
|
||||
// ints, strings, bools, time.Time (handled as opaque), etc.
|
||||
return v
|
||||
}
|
||||
}
|
||||
|
||||
func roundFloat64(f float64, decimals int) float64 {
|
||||
if decimals <= 0 {
|
||||
return math.Round(f)
|
||||
}
|
||||
pow := math.Pow10(decimals)
|
||||
return math.Round(f*pow) / pow
|
||||
}
|
||||
|
||||
// isOpaqueStruct returns true for structs that are unsafe/unhelpful to reconstruct via reflection.
|
||||
// Any struct containing unexported fields (e.g. time.Time) is treated as opaque.
|
||||
func isOpaqueStruct(t reflect.Type) bool {
|
||||
if t.Kind() != reflect.Struct {
|
||||
return false
|
||||
}
|
||||
for i := 0; i < t.NumField(); i++ {
|
||||
if t.Field(i).PkgPath != "" {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
Reference in New Issue
Block a user