5 Commits

Author SHA1 Message Date
129cebd94d Updated the normalized observation schema to remove duplicate and/or unnecessary fields
All checks were successful
ci/woodpecker/push/build-image Pipeline was successful
2026-03-17 11:04:51 -05:00
e42f2bc9de Remove incorrect 'internal/' prefix from model package file header comments 2026-03-17 09:46:39 -05:00
9ddcf5e0df Document the PostgreSQL schema contract in doc.go
All checks were successful
ci/woodpecker/push/build-image Pipeline was successful
2026-03-17 09:33:07 -05:00
d0b58a4734 Updates to track feedkit v0.7.2 and to add a dedupe processor
All checks were successful
ci/woodpecker/push/build-image Pipeline was successful
2026-03-16 18:35:44 -05:00
6cd823f528 Update go.mod
All checks were successful
ci/woodpecker/push/build-image Pipeline was successful
2026-03-16 15:37:31 -05:00
30 changed files with 498 additions and 230 deletions

22
API.md
View File

@@ -80,34 +80,18 @@ A `WeatherObservation` represents a point-in-time observation for a station/loca
| `stationName` | string | no | Human station name | | `stationName` | string | no | Human station name |
| `timestamp` | timestamp string | yes | Observation timestamp | | `timestamp` | timestamp string | yes | Observation timestamp |
| `conditionCode` | int | yes | WMO code (`-1` unknown) | | `conditionCode` | int | yes | WMO code (`-1` unknown) |
| `conditionText` | string | no | Canonical short condition text |
| `isDay` | bool | no | Day/night hint | | `isDay` | bool | no | Day/night hint |
| `providerRawDescription` | string | no | Provider-specific evidence text | | `textDescription` | string | no | Human-facing short description |
| `textDescription` | string | no | Legacy/transitional text description |
| `iconUrl` | string | no | Legacy/transitional icon URL |
| `temperatureC` | number | no | Celsius | | `temperatureC` | number | no | Celsius |
| `dewpointC` | number | no | Celsius | | `dewpointC` | number | no | Celsius |
| `windDirectionDegrees` | number | no | Degrees | | `windDirectionDegrees` | number | no | Degrees |
| `windSpeedKmh` | number | no | km/h | | `windSpeedKmh` | number | no | km/h |
| `windGustKmh` | number | no | km/h | | `windGustKmh` | number | no | km/h |
| `barometricPressurePa` | number | no | Pascals | | `barometricPressurePa` | number | no | Pascals |
| `seaLevelPressurePa` | number | no | Pascals |
| `visibilityMeters` | number | no | Meters | | `visibilityMeters` | number | no | Meters |
| `relativeHumidityPercent` | number | no | Percent | | `relativeHumidityPercent` | number | no | Percent |
| `apparentTemperatureC` | number | no | Celsius | | `apparentTemperatureC` | number | no | Celsius |
| `elevationMeters` | number | no | Meters |
| `rawMessage` | string | no | Provider raw message (for example METAR) |
| `presentWeather` | array | no | Provider-specific structured weather fragments | | `presentWeather` | array | no | Provider-specific structured weather fragments |
| `cloudLayers` | array | no | Cloud layer details |
### Nested: `cloudLayers[]`
Each `cloudLayers[]` element:
| Field | Type | Required | Notes |
|---|---:|:---:|---|
| `baseMeters` | number | no | Cloud base altitude in meters |
| `amount` | string | no | Provider string (e.g. FEW/SCT/BKN/OVC) |
### Nested: `presentWeather[]` ### Nested: `presentWeather[]`
@@ -241,7 +225,7 @@ A run may contain zero, one, or many alerts.
- Consumers **must** ignore unknown fields. - Consumers **must** ignore unknown fields.
- Producers (weatherfeeder) prefer **additive changes** within a schema version. - Producers (weatherfeeder) prefer **additive changes** within a schema version.
- Renames/removals/semantic breaks require a **schema version bump** (`weather.*.v2`). - Renames/removals/semantic breaks normally require a **schema version bump** (`weather.*.v2`); pre-1.0 projects may choose in-place changes.
--- ---
@@ -259,7 +243,7 @@ A run may contain zero, one, or many alerts.
"stationId": "KSTL", "stationId": "KSTL",
"timestamp": "2026-01-17T14:00:00Z", "timestamp": "2026-01-17T14:00:00Z",
"conditionCode": 1, "conditionCode": 1,
"conditionText": "Mainly Sunny", "textDescription": "Mainly Sunny",
"temperatureC": 3.25, "temperatureC": 3.25,
"windSpeedKmh": 18.5 "windSpeedKmh": 18.5
} }

View File

@@ -15,9 +15,10 @@ import (
"gitea.maximumdirect.net/ejr/feedkit/config" "gitea.maximumdirect.net/ejr/feedkit/config"
fkdispatch "gitea.maximumdirect.net/ejr/feedkit/dispatch" fkdispatch "gitea.maximumdirect.net/ejr/feedkit/dispatch"
fkevent "gitea.maximumdirect.net/ejr/feedkit/event" fkevent "gitea.maximumdirect.net/ejr/feedkit/event"
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
fkpipeline "gitea.maximumdirect.net/ejr/feedkit/pipeline" fkpipeline "gitea.maximumdirect.net/ejr/feedkit/pipeline"
fkprocessors "gitea.maximumdirect.net/ejr/feedkit/processors" fkprocessors "gitea.maximumdirect.net/ejr/feedkit/processors"
fkdedupe "gitea.maximumdirect.net/ejr/feedkit/processors/dedupe"
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
fkscheduler "gitea.maximumdirect.net/ejr/feedkit/scheduler" fkscheduler "gitea.maximumdirect.net/ejr/feedkit/scheduler"
fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks" fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks"
fksources "gitea.maximumdirect.net/ejr/feedkit/sources" fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
@@ -27,6 +28,8 @@ import (
wfsources "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources" wfsources "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources"
) )
const dedupeMaxEntries = 2048
func main() { func main() {
log.SetFlags(log.LstdFlags | log.Lmicroseconds) log.SetFlags(log.LstdFlags | log.Lmicroseconds)
@@ -103,9 +106,9 @@ func main() {
events := make(chan fkevent.Event, 256) events := make(chan fkevent.Event, 256)
// --- Normalization (optional) --- // --- Processors ---
// //
// We install feedkit's normalize.Processor even before any normalizers exist. // We install feedkit's processors/normalize.Processor even before any normalizers exist.
// With an empty normalizer list and RequireMatch=false, this is a no-op passthrough. // With an empty normalizer list and RequireMatch=false, this is a no-op passthrough.
// It will begin transforming events as soon as: // It will begin transforming events as soon as:
// 1) sources emit raw schemas (raw.*), and // 1) sources emit raw schemas (raw.*), and
@@ -116,8 +119,9 @@ func main() {
procReg.Register("normalize", func() (fkprocessors.Processor, error) { procReg.Register("normalize", func() (fkprocessors.Processor, error) {
return fknormalize.NewProcessor(normalizers, false), nil return fknormalize.NewProcessor(normalizers, false), nil
}) })
procReg.Register("dedupe", fkdedupe.Factory(dedupeMaxEntries))
chain, err := procReg.BuildChain([]string{"normalize"}) chain, err := procReg.BuildChain([]string{"normalize", "dedupe"})
if err != nil { if err != nil {
log.Fatalf("build processor chain failed: %v", err) log.Fatalf("build processor chain failed: %v", err)
} }

View File

@@ -9,9 +9,10 @@ import (
"gitea.maximumdirect.net/ejr/feedkit/config" "gitea.maximumdirect.net/ejr/feedkit/config"
fkevent "gitea.maximumdirect.net/ejr/feedkit/event" fkevent "gitea.maximumdirect.net/ejr/feedkit/event"
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
fkpipeline "gitea.maximumdirect.net/ejr/feedkit/pipeline" fkpipeline "gitea.maximumdirect.net/ejr/feedkit/pipeline"
fkprocessors "gitea.maximumdirect.net/ejr/feedkit/processors" fkprocessors "gitea.maximumdirect.net/ejr/feedkit/processors"
fkdedupe "gitea.maximumdirect.net/ejr/feedkit/processors/dedupe"
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
wfnormalizers "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers" wfnormalizers "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers"
) )
@@ -97,39 +98,23 @@ func TestExampleConfigLoads(t *testing.T) {
} }
} }
func TestProcessorRegistryBuildsNormalizeChain(t *testing.T) { func TestProcessorRegistryBuildsNormalizeThenDedupeChain(t *testing.T) {
normalizers := wfnormalizers.RegisterBuiltins(nil) chain, err := buildProcessorChainForTests()
if len(normalizers) == 0 {
t.Fatalf("RegisterBuiltins() returned no normalizers")
}
procReg := fkprocessors.NewRegistry()
procReg.Register("normalize", func() (fkprocessors.Processor, error) {
return fknormalize.NewProcessor(normalizers, false), nil
})
chain, err := procReg.BuildChain([]string{"normalize"})
if err != nil { if err != nil {
t.Fatalf("BuildChain() unexpected error: %v", err) t.Fatalf("BuildChain() unexpected error: %v", err)
} }
if len(chain) != 1 { if len(chain) != 2 {
t.Fatalf("BuildChain() expected 1 processor, got %d", len(chain)) t.Fatalf("BuildChain() expected 2 processors, got %d", len(chain))
} }
pl := &fkpipeline.Pipeline{Processors: chain} pl := &fkpipeline.Pipeline{Processors: chain}
if len(pl.Processors) != 1 { if len(pl.Processors) != 2 {
t.Fatalf("pipeline expected 1 processor, got %d", len(pl.Processors)) t.Fatalf("pipeline expected 2 processors, got %d", len(pl.Processors))
} }
} }
func TestNormalizeNoMatchPassThrough(t *testing.T) { func TestNormalizeNoMatchPassThrough(t *testing.T) {
normalizers := wfnormalizers.RegisterBuiltins(nil) chain, err := buildProcessorChainForTests()
procReg := fkprocessors.NewRegistry()
procReg.Register("normalize", func() (fkprocessors.Processor, error) {
return fknormalize.NewProcessor(normalizers, false), nil
})
chain, err := procReg.BuildChain([]string{"normalize"})
if err != nil { if err != nil {
t.Fatalf("BuildChain() unexpected error: %v", err) t.Fatalf("BuildChain() unexpected error: %v", err)
} }
@@ -157,3 +142,50 @@ func TestNormalizeNoMatchPassThrough(t *testing.T) {
t.Fatalf("Pipeline.Process() expected passthrough output, got %#v", *out) t.Fatalf("Pipeline.Process() expected passthrough output, got %#v", *out)
} }
} }
func TestDedupeDropsSecondEventWithSameID(t *testing.T) {
chain, err := buildProcessorChainForTests()
if err != nil {
t.Fatalf("BuildChain() unexpected error: %v", err)
}
pl := &fkpipeline.Pipeline{Processors: chain}
in := fkevent.Event{
ID: "evt-dedupe-1",
Kind: fkevent.Kind("observation"),
Source: "test",
EmittedAt: time.Now().UTC(),
Schema: "raw.weatherfeeder.unknown.v1",
Payload: map[string]any{
"ok": true,
},
}
first, err := pl.Process(context.Background(), in)
if err != nil {
t.Fatalf("first Pipeline.Process() unexpected error: %v", err)
}
if first == nil {
t.Fatalf("first Pipeline.Process() unexpectedly dropped event")
}
second, err := pl.Process(context.Background(), in)
if err != nil {
t.Fatalf("second Pipeline.Process() unexpected error: %v", err)
}
if second != nil {
t.Fatalf("second Pipeline.Process() expected dedupe drop, got %#v", *second)
}
}
func buildProcessorChainForTests() ([]fkprocessors.Processor, error) {
normalizers := wfnormalizers.RegisterBuiltins(nil)
procReg := fkprocessors.NewRegistry()
procReg.Register("normalize", func() (fkprocessors.Processor, error) {
return fknormalize.NewProcessor(normalizers, false), nil
})
procReg.Register("dedupe", fkdedupe.Factory(dedupeMaxEntries))
return procReg.BuildChain([]string{"normalize", "dedupe"})
}

3
go.mod
View File

@@ -2,10 +2,11 @@ module gitea.maximumdirect.net/ejr/weatherfeeder
go 1.25 go 1.25
require gitea.maximumdirect.net/ejr/feedkit v0.7.0 require gitea.maximumdirect.net/ejr/feedkit v0.7.2
require ( require (
github.com/klauspost/compress v1.17.2 // indirect github.com/klauspost/compress v1.17.2 // indirect
github.com/lib/pq v1.10.9 // indirect
github.com/nats-io/nats.go v1.34.0 // indirect github.com/nats-io/nats.go v1.34.0 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect github.com/nats-io/nuid v1.0.1 // indirect

6
go.sum
View File

@@ -1,7 +1,9 @@
gitea.maximumdirect.net/ejr/feedkit v0.7.0 h1:qXbsD30BH1HkKf579B4Qu3pDiT9mr+8DmDwzd3IXUoo= gitea.maximumdirect.net/ejr/feedkit v0.7.2 h1:hTg302SgSi7tw11lNzuc+3g7MvHT6jQQziuo2NoARt8=
gitea.maximumdirect.net/ejr/feedkit v0.7.0/go.mod h1:wYtA10GouvSe7L/8e1UEC+tqcp32HJofExIo1k+Wjls= gitea.maximumdirect.net/ejr/feedkit v0.7.2/go.mod h1:U6xC9xZLN3cL4yi7YBVyzGoHYRLJXusFCAKlj2kdYYQ=
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/nats-io/nats.go v1.34.0 h1:fnxnPCNiwIG5w08rlMcEKTUw4AV/nKyGCOJE8TdhSPk= github.com/nats-io/nats.go v1.34.0 h1:fnxnPCNiwIG5w08rlMcEKTUw4AV/nKyGCOJE8TdhSPk=
github.com/nats-io/nats.go v1.34.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= github.com/nats-io/nats.go v1.34.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=

View File

@@ -2,7 +2,7 @@
package normalizers package normalizers
import ( import (
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize" fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/nws" "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/nws"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/openmeteo" "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/openmeteo"

View File

@@ -4,7 +4,7 @@ import (
"reflect" "reflect"
"testing" "testing"
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize" fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/nws" "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/nws"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/openmeteo" "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/openmeteo"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/openweather" "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/openweather"

View File

@@ -29,7 +29,7 @@
// //
// 1. One normalizer per file. // 1. One normalizer per file.
// Each file contains exactly one Normalizer implementation (one type that // Each file contains exactly one Normalizer implementation (one type that
// satisfies feedkit/normalize.Normalizer). // satisfies feedkit/processors/normalize.Normalizer).
// Helper files are encouraged (types.go, common.go, mapping.go, etc.) as long // Helper files are encouraged (types.go, common.go, mapping.go, etc.) as long
// as they do not define additional Normalizer types. // as they do not define additional Normalizer types.
// //

View File

@@ -54,14 +54,6 @@ func buildObservation(parsed nwsObservationResponse) (model.WeatherObservation,
ts = t.UTC() ts = t.UTC()
} }
cloudLayers := make([]model.CloudLayer, 0, len(parsed.Properties.CloudLayers))
for _, cl := range parsed.Properties.CloudLayers {
cloudLayers = append(cloudLayers, model.CloudLayer{
BaseMeters: cl.Base.Value,
Amount: cl.Amount,
})
}
// Preserve raw presentWeather objects (for troubleshooting / drift analysis). // Preserve raw presentWeather objects (for troubleshooting / drift analysis).
present := make([]model.PresentWeather, 0, len(parsed.Properties.PresentWeather)) present := make([]model.PresentWeather, 0, len(parsed.Properties.PresentWeather))
for _, pw := range parsed.Properties.PresentWeather { for _, pw := range parsed.Properties.PresentWeather {
@@ -70,6 +62,7 @@ func buildObservation(parsed nwsObservationResponse) (model.WeatherObservation,
// Decode presentWeather into typed METAR phenomena for mapping. // Decode presentWeather into typed METAR phenomena for mapping.
phenomena := decodeMetarPhenomena(parsed.Properties.PresentWeather) phenomena := decodeMetarPhenomena(parsed.Properties.PresentWeather)
cloudLayers := parsed.Properties.CloudLayers
providerDesc := strings.TrimSpace(parsed.Properties.TextDescription) providerDesc := strings.TrimSpace(parsed.Properties.TextDescription)
@@ -81,9 +74,6 @@ func buildObservation(parsed nwsObservationResponse) (model.WeatherObservation,
isDay = isDayFromLatLonTime(*lat, *lon, ts) isDay = isDayFromLatLonTime(*lat, *lon, ts)
} }
// Canonical condition text comes from our WMO table.
canonicalText := standards.WMOText(wmo, isDay)
// Apparent temperature: prefer wind chill when both are supplied. // Apparent temperature: prefer wind chill when both are supplied.
var apparentC *float64 var apparentC *float64
if parsed.Properties.WindChill.Value != nil { if parsed.Properties.WindChill.Value != nil {
@@ -98,15 +88,9 @@ func buildObservation(parsed nwsObservationResponse) (model.WeatherObservation,
Timestamp: ts, Timestamp: ts,
ConditionCode: wmo, ConditionCode: wmo,
ConditionText: canonicalText,
IsDay: isDay, IsDay: isDay,
ProviderRawDescription: providerDesc, TextDescription: providerDesc,
// Transitional / human-facing:
// keep output consistent by populating TextDescription from canonical text.
TextDescription: canonicalText,
IconURL: parsed.Properties.Icon,
TemperatureC: parsed.Properties.Temperature.Value, TemperatureC: parsed.Properties.Temperature.Value,
DewpointC: parsed.Properties.Dewpoint.Value, DewpointC: parsed.Properties.Dewpoint.Value,
@@ -115,19 +99,21 @@ func buildObservation(parsed nwsObservationResponse) (model.WeatherObservation,
WindSpeedKmh: parsed.Properties.WindSpeed.Value, WindSpeedKmh: parsed.Properties.WindSpeed.Value,
WindGustKmh: parsed.Properties.WindGust.Value, WindGustKmh: parsed.Properties.WindGust.Value,
BarometricPressurePa: parsed.Properties.BarometricPressure.Value, BarometricPressurePa: pressurePrecedenceNWS(parsed.Properties.SeaLevelPressure.Value, parsed.Properties.BarometricPressure.Value),
SeaLevelPressurePa: parsed.Properties.SeaLevelPressure.Value,
VisibilityMeters: parsed.Properties.Visibility.Value, VisibilityMeters: parsed.Properties.Visibility.Value,
RelativeHumidityPercent: parsed.Properties.RelativeHumidity.Value, RelativeHumidityPercent: parsed.Properties.RelativeHumidity.Value,
ApparentTemperatureC: apparentC, ApparentTemperatureC: apparentC,
ElevationMeters: parsed.Properties.Elevation.Value,
RawMessage: parsed.Properties.RawMessage,
PresentWeather: present, PresentWeather: present,
CloudLayers: cloudLayers,
} }
return obs, ts, nil return obs, ts, nil
} }
func pressurePrecedenceNWS(seaLevelPa, barometricPa *float64) *float64 {
if seaLevelPa != nil {
return seaLevelPa
}
return barometricPa
}

View File

@@ -0,0 +1,51 @@
package nws
import "testing"
func TestBuildObservationTextDescriptionAndPressurePrecedence(t *testing.T) {
barometric := 100200.0
seaLevel := 101400.0
parsed := nwsObservationResponse{}
parsed.Properties.Timestamp = "2026-03-16T19:00:00Z"
parsed.Properties.TextDescription = " Overcast "
parsed.Properties.BarometricPressure.Value = &barometric
parsed.Properties.SeaLevelPressure.Value = &seaLevel
obs, _, err := buildObservation(parsed)
if err != nil {
t.Fatalf("buildObservation() error = %v", err)
}
if got, want := obs.TextDescription, "Overcast"; got != want {
t.Fatalf("TextDescription = %q, want %q", got, want)
}
if obs.BarometricPressurePa == nil {
t.Fatalf("BarometricPressurePa = nil, want non-nil")
}
if got, want := *obs.BarometricPressurePa, seaLevel; got != want {
t.Fatalf("BarometricPressurePa = %v, want %v", got, want)
}
}
func TestBuildObservationPressureFallbackToBarometric(t *testing.T) {
barometric := 99900.0
parsed := nwsObservationResponse{}
parsed.Properties.Timestamp = "2026-03-16T19:00:00Z"
parsed.Properties.TextDescription = "Cloudy"
parsed.Properties.BarometricPressure.Value = &barometric
obs, _, err := buildObservation(parsed)
if err != nil {
t.Fatalf("buildObservation() error = %v", err)
}
if obs.BarometricPressurePa == nil {
t.Fatalf("BarometricPressurePa = nil, want non-nil")
}
if got, want := *obs.BarometricPressurePa, barometric; got != want {
t.Fatalf("BarometricPressurePa = %v, want %v", got, want)
}
}

View File

@@ -2,7 +2,7 @@
package nws package nws
import ( import (
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize" fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
) )
// Register appends NWS normalizers in stable order. // Register appends NWS normalizers in stable order.

View File

@@ -86,14 +86,16 @@ type nwsObservationResponse struct {
// We decode these as generic maps, then optionally interpret them in metar.go. // We decode these as generic maps, then optionally interpret them in metar.go.
PresentWeather []map[string]any `json:"presentWeather"` PresentWeather []map[string]any `json:"presentWeather"`
CloudLayers []struct { CloudLayers []nwsCloudLayer `json:"cloudLayers"`
} `json:"properties"`
}
type nwsCloudLayer struct {
Base struct { Base struct {
UnitCode string `json:"unitCode"` UnitCode string `json:"unitCode"`
Value *float64 `json:"value"` Value *float64 `json:"value"`
} `json:"base"` } `json:"base"`
Amount string `json:"amount"` Amount string `json:"amount"`
} `json:"cloudLayers"`
} `json:"properties"`
} }
// nwsForecastResponse is a minimal-but-sufficient representation of the NWS // nwsForecastResponse is a minimal-but-sufficient representation of the NWS

View File

@@ -14,7 +14,7 @@ import (
// 1. METAR phenomena (presentWeather) — most reliable for precip/hazards // 1. METAR phenomena (presentWeather) — most reliable for precip/hazards
// 2. textDescription keywords — weaker, but reusable across providers // 2. textDescription keywords — weaker, but reusable across providers
// 3. cloud layers fallback — only for sky-only conditions // 3. cloud layers fallback — only for sky-only conditions
func mapNWSToWMO(providerDesc string, cloudLayers []model.CloudLayer, phenomena []metarPhenomenon) model.WMOCode { func mapNWSToWMO(providerDesc string, cloudLayers []nwsCloudLayer, phenomena []metarPhenomenon) model.WMOCode {
// 1) Prefer METAR phenomena if present. // 1) Prefer METAR phenomena if present.
if code := wmoFromPhenomena(phenomena); code != model.WMOUnknown { if code := wmoFromPhenomena(phenomena); code != model.WMOUnknown {
return code return code
@@ -167,7 +167,7 @@ func wmoFromPhenomena(phenomena []metarPhenomenon) model.WMOCode {
return model.WMOUnknown return model.WMOUnknown
} }
func wmoFromCloudLayers(cloudLayers []model.CloudLayer) model.WMOCode { func wmoFromCloudLayers(cloudLayers []nwsCloudLayer) model.WMOCode {
// NWS cloud layer amount values commonly include: // NWS cloud layer amount values commonly include:
// OVC, BKN, SCT, FEW, SKC, CLR, VV (vertical visibility / obscured sky) // OVC, BKN, SCT, FEW, SKC, CLR, VV (vertical visibility / obscured sky)
// //

View File

@@ -87,19 +87,8 @@ func buildObservation(parsed omResponse) (model.WeatherObservation, time.Time, e
Timestamp: ts, Timestamp: ts,
ConditionCode: wmo, ConditionCode: wmo,
ConditionText: canonicalText,
IsDay: isDay, IsDay: isDay,
// Open-Meteo does not provide a separate human text description for "current"
// when using weather_code; we leave provider evidence empty.
ProviderRawDescription: "",
// Transitional / human-facing:
// keep output consistent by populating TextDescription from canonical text.
TextDescription: canonicalText, TextDescription: canonicalText,
// IconURL: Open-Meteo does not provide an icon URL in this endpoint.
IconURL: "",
} }
// Measurements (all optional; only set when present). // Measurements (all optional; only set when present).
@@ -132,20 +121,13 @@ func buildObservation(parsed omResponse) (model.WeatherObservation, time.Time, e
obs.WindGustKmh = &v obs.WindGustKmh = &v
} }
if parsed.Current.SurfacePressure != nil { if parsed.Current.PressureMSL != nil {
v := normcommon.PressurePaFromHPa(*parsed.Current.PressureMSL)
obs.BarometricPressurePa = &v
} else if parsed.Current.SurfacePressure != nil {
v := normcommon.PressurePaFromHPa(*parsed.Current.SurfacePressure) v := normcommon.PressurePaFromHPa(*parsed.Current.SurfacePressure)
obs.BarometricPressurePa = &v obs.BarometricPressurePa = &v
} }
if parsed.Current.PressureMSL != nil {
v := normcommon.PressurePaFromHPa(*parsed.Current.PressureMSL)
obs.SeaLevelPressurePa = &v
}
if parsed.Elevation != nil {
v := *parsed.Elevation
obs.ElevationMeters = &v
}
return obs, ts, nil return obs, ts, nil
} }

View File

@@ -0,0 +1,61 @@
package openmeteo
import "testing"
func TestBuildObservationTextDescriptionAndPressurePrecedence(t *testing.T) {
weatherCode := 2
pressureMSL := 1016.0
surfacePressure := 1009.0
parsed := omResponse{
Timezone: "UTC",
UTCOffsetSeconds: 0,
Current: omCurrent{
Time: "2026-03-16T19:00",
WeatherCode: &weatherCode,
PressureMSL: &pressureMSL,
SurfacePressure: &surfacePressure,
},
}
obs, _, err := buildObservation(parsed)
if err != nil {
t.Fatalf("buildObservation() error = %v", err)
}
if got, want := obs.TextDescription, "Partly Cloudy"; got != want {
t.Fatalf("TextDescription = %q, want %q", got, want)
}
if obs.BarometricPressurePa == nil {
t.Fatalf("BarometricPressurePa = nil, want non-nil")
}
if got, want := *obs.BarometricPressurePa, 101600.0; got != want {
t.Fatalf("BarometricPressurePa = %v, want %v", got, want)
}
}
func TestBuildObservationPressureFallbackToSurface(t *testing.T) {
surfacePressure := 1008.0
parsed := omResponse{
Timezone: "UTC",
UTCOffsetSeconds: 0,
Current: omCurrent{
Time: "2026-03-16T19:00",
SurfacePressure: &surfacePressure,
},
}
obs, _, err := buildObservation(parsed)
if err != nil {
t.Fatalf("buildObservation() error = %v", err)
}
if obs.BarometricPressurePa == nil {
t.Fatalf("BarometricPressurePa = nil, want non-nil")
}
if got, want := *obs.BarometricPressurePa, 100800.0; got != want {
t.Fatalf("BarometricPressurePa = %v, want %v", got, want)
}
}

View File

@@ -2,7 +2,7 @@
package openmeteo package openmeteo
import ( import (
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize" fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
) )
// Register appends Open-Meteo normalizers in stable order. // Register appends Open-Meteo normalizers in stable order.

View File

@@ -53,15 +53,6 @@ func inferIsDay(icon string, dt, sunrise, sunset int64) *bool {
return nil return nil
} }
// openWeatherIconURL builds the standard OpenWeather icon URL for the given icon code.
func openWeatherIconURL(icon string) string {
icon = strings.TrimSpace(icon)
if icon == "" {
return ""
}
return fmt.Sprintf("https://openweathermap.org/img/wn/%s@2x.png", icon)
}
// openWeatherStationID returns a stable station identifier for the given response. // openWeatherStationID returns a stable station identifier for the given response.
// Prefer the OpenWeather city ID when present; otherwise, fall back to coordinates. // Prefer the OpenWeather city ID when present; otherwise, fall back to coordinates.
func openWeatherStationID(parsed owmResponse) string { func openWeatherStationID(parsed owmResponse) string {

View File

@@ -75,10 +75,10 @@ func buildObservation(parsed owmResponse) (model.WeatherObservation, time.Time,
} }
surfacePa := normcommon.PressurePaFromHPa(parsed.Main.Pressure) surfacePa := normcommon.PressurePaFromHPa(parsed.Main.Pressure)
var seaLevelPa *float64 barometricPa := &surfacePa
if parsed.Main.SeaLevel != nil { if parsed.Main.SeaLevel != nil {
v := normcommon.PressurePaFromHPa(*parsed.Main.SeaLevel) v := normcommon.PressurePaFromHPa(*parsed.Main.SeaLevel)
seaLevelPa = &v barometricPa = &v
} }
wsKmh := normcommon.SpeedKmhFromMps(parsed.Wind.Speed) wsKmh := normcommon.SpeedKmhFromMps(parsed.Wind.Speed)
@@ -96,9 +96,6 @@ func buildObservation(parsed owmResponse) (model.WeatherObservation, time.Time,
// Condition mapping: OpenWeather condition IDs -> canonical WMO code vocabulary. // Condition mapping: OpenWeather condition IDs -> canonical WMO code vocabulary.
wmo := mapOpenWeatherToWMO(owmID) wmo := mapOpenWeatherToWMO(owmID)
canonicalText := standards.WMOText(wmo, isDay)
iconURL := openWeatherIconURL(icon)
stationID := openWeatherStationID(parsed) stationID := openWeatherStationID(parsed)
stationName := strings.TrimSpace(parsed.Name) stationName := strings.TrimSpace(parsed.Name)
@@ -112,14 +109,8 @@ func buildObservation(parsed owmResponse) (model.WeatherObservation, time.Time,
Timestamp: ts, Timestamp: ts,
ConditionCode: wmo, ConditionCode: wmo,
ConditionText: canonicalText,
IsDay: isDay, IsDay: isDay,
TextDescription: rawDesc,
ProviderRawDescription: rawDesc,
// Human-facing legacy fields: populate with canonical text for consistency.
TextDescription: canonicalText,
IconURL: iconURL,
TemperatureC: &tempC, TemperatureC: &tempC,
ApparentTemperatureC: apparentC, ApparentTemperatureC: apparentC,
@@ -128,8 +119,7 @@ func buildObservation(parsed owmResponse) (model.WeatherObservation, time.Time,
WindSpeedKmh: &wsKmh, WindSpeedKmh: &wsKmh,
WindGustKmh: wgKmh, WindGustKmh: wgKmh,
BarometricPressurePa: &surfacePa, BarometricPressurePa: barometricPa,
SeaLevelPressurePa: seaLevelPa,
VisibilityMeters: visM, VisibilityMeters: visM,
RelativeHumidityPercent: &rh, RelativeHumidityPercent: &rh,

View File

@@ -0,0 +1,58 @@
package openweather
import "testing"
func TestBuildObservationTextDescriptionAndPressurePrecedence(t *testing.T) {
seaLevel := 1018.0
parsed := owmResponse{}
parsed.Dt = 1710000000
parsed.Main.Temp = 20.0
parsed.Main.Humidity = 45.0
parsed.Main.Pressure = 1000.0
parsed.Main.SeaLevel = &seaLevel
parsed.Wind.Speed = 3.0
parsed.Weather = []owmWeather{
{ID: 801, Main: "Clouds", Description: "few clouds", Icon: "02d"},
}
obs, _, err := buildObservation(parsed)
if err != nil {
t.Fatalf("buildObservation() error = %v", err)
}
if obs.TextDescription != "few clouds" {
t.Fatalf("TextDescription = %q, want %q", obs.TextDescription, "few clouds")
}
if obs.BarometricPressurePa == nil {
t.Fatalf("BarometricPressurePa = nil, want non-nil")
}
if got, want := *obs.BarometricPressurePa, 101800.0; got != want {
t.Fatalf("BarometricPressurePa = %v, want %v", got, want)
}
}
func TestBuildObservationPressureFallbackToSurface(t *testing.T) {
parsed := owmResponse{}
parsed.Dt = 1710000000
parsed.Main.Temp = 20.0
parsed.Main.Humidity = 45.0
parsed.Main.Pressure = 1001.0
parsed.Wind.Speed = 3.0
parsed.Weather = []owmWeather{
{ID: 800, Description: "clear sky", Icon: "01d"},
}
obs, _, err := buildObservation(parsed)
if err != nil {
t.Fatalf("buildObservation() error = %v", err)
}
if obs.BarometricPressurePa == nil {
t.Fatalf("BarometricPressurePa = nil, want non-nil")
}
if got, want := *obs.BarometricPressurePa, 100100.0; got != want {
t.Fatalf("BarometricPressurePa = %v, want %v", got, want)
}
}

View File

@@ -2,7 +2,7 @@
package openweather package openweather
import ( import (
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize" fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
) )
// Register appends OpenWeather normalizers in stable order. // Register appends OpenWeather normalizers in stable order.

View File

@@ -0,0 +1,192 @@
// Package postgres documents weatherfeeder's PostgreSQL sink contract for
// downstream SQL consumers.
//
// This package wires weatherfeeder canonical events into normalized relational
// tables. Downstream consumers can reconstruct the same canonical JSON objects
// that were written by joining parent/child tables as described below.
//
// Canonical input schemas:
// - weather.observation.v1 -> model.WeatherObservation
// - weather.forecast.v1 -> model.WeatherForecastRun
// - weather.alert.v1 -> model.WeatherAlertRun
//
// Parent/child relationships:
// - observations.event_id -> observation_present_weather.event_id
// - forecasts.event_id -> forecast_periods.run_event_id
// - alert_runs.event_id -> alerts.run_event_id
// - alerts.(run_event_id, alert_index) -> alert_references.(run_event_id, alert_index)
//
// Dedupe and retention behavior:
// - Parent primary keys (event_id): observations, forecasts, alert_runs.
// - Child primary keys use positional indexes to preserve payload order.
// - Prune columns:
// - observations.observed_at
// - observation_present_weather.observed_at
// - forecasts.issued_at
// - forecast_periods.issued_at
// - alert_runs.as_of
// - alerts.as_of
// - alert_references.as_of
//
// Envelope field mapping (shared parent columns)
//
// These columns exist on observations, forecasts, and alert_runs:
// - event_id TEXT -> event.id
// - event_kind TEXT -> event.kind
// - event_source TEXT -> event.source
// - event_schema TEXT -> event.schema
// - event_emitted_at TIMESTAMPTZ -> event.emitted_at
// - event_effective_at TIMESTAMPTZ NULL -> event.effective_at
//
// Table contract
//
// 1. observations (PK: event_id)
//
// - event_id TEXT -> event.id
// - event_kind TEXT -> event.kind
// - event_source TEXT -> event.source
// - event_schema TEXT -> event.schema
// - event_emitted_at TIMESTAMPTZ -> event.emitted_at
// - event_effective_at TIMESTAMPTZ NULL -> event.effective_at
// - station_id TEXT NULL -> payload.stationId
// - station_name TEXT NULL -> payload.stationName
// - observed_at TIMESTAMPTZ -> payload.timestamp
// - condition_code INTEGER -> payload.conditionCode
// - is_day BOOLEAN NULL -> payload.isDay
// - text_description TEXT NULL -> payload.textDescription
// - temperature_c DOUBLE PRECISION NULL -> payload.temperatureC
// - dewpoint_c DOUBLE PRECISION NULL -> payload.dewpointC
// - wind_direction_degrees DOUBLE PRECISION NULL -> payload.windDirectionDegrees
// - wind_speed_kmh DOUBLE PRECISION NULL -> payload.windSpeedKmh
// - wind_gust_kmh DOUBLE PRECISION NULL -> payload.windGustKmh
// - barometric_pressure_pa DOUBLE PRECISION NULL -> payload.barometricPressurePa
// - visibility_meters DOUBLE PRECISION NULL -> payload.visibilityMeters
// - relative_humidity_percent DOUBLE PRECISION NULL -> payload.relativeHumidityPercent
// - apparent_temperature_c DOUBLE PRECISION NULL -> payload.apparentTemperatureC
//
// 2. observation_present_weather (PK: event_id, weather_index)
//
// - event_id TEXT -> observations.event_id / payload.presentWeather[i]
// - weather_index INTEGER -> i (array position in payload.presentWeather)
// - observed_at TIMESTAMPTZ -> payload.timestamp
// - raw_text TEXT NULL -> JSON-encoded payload.presentWeather[i].raw
//
// Note: raw_text stores compact JSON text. Consumers that need the original
// object should parse raw_text as JSON.
//
// 3. forecasts (PK: event_id)
//
// - event_id TEXT -> event.id
// - event_kind TEXT -> event.kind
// - event_source TEXT -> event.source
// - event_schema TEXT -> event.schema
// - event_emitted_at TIMESTAMPTZ -> event.emitted_at
// - event_effective_at TIMESTAMPTZ NULL -> event.effective_at
// - location_id TEXT NULL -> payload.locationId
// - location_name TEXT NULL -> payload.locationName
// - issued_at TIMESTAMPTZ -> payload.issuedAt
// - updated_at TIMESTAMPTZ NULL -> payload.updatedAt
// - product TEXT -> payload.product
// - latitude DOUBLE PRECISION NULL -> payload.latitude
// - longitude DOUBLE PRECISION NULL -> payload.longitude
// - elevation_meters DOUBLE PRECISION NULL -> payload.elevationMeters
// - period_count INTEGER -> len(payload.periods)
//
// 4. forecast_periods (PK: run_event_id, period_index)
//
// - run_event_id TEXT -> forecasts.event_id / payload.periods[i]
// - period_index INTEGER -> i (array position in payload.periods)
// - issued_at TIMESTAMPTZ -> payload.issuedAt (copied from parent)
// - start_time TIMESTAMPTZ -> payload.periods[i].startTime
// - end_time TIMESTAMPTZ -> payload.periods[i].endTime
// - name TEXT NULL -> payload.periods[i].name
// - is_day BOOLEAN NULL -> payload.periods[i].isDay
// - condition_code INTEGER -> payload.periods[i].conditionCode
// - condition_text TEXT NULL -> payload.periods[i].conditionText
// - provider_raw_description TEXT NULL -> payload.periods[i].providerRawDescription
// - text_description TEXT NULL -> payload.periods[i].textDescription
// - detailed_text TEXT NULL -> payload.periods[i].detailedText
// - icon_url TEXT NULL -> payload.periods[i].iconUrl
// - temperature_c DOUBLE PRECISION NULL -> payload.periods[i].temperatureC
// - temperature_c_min DOUBLE PRECISION NULL -> payload.periods[i].temperatureCMin
// - temperature_c_max DOUBLE PRECISION NULL -> payload.periods[i].temperatureCMax
// - dewpoint_c DOUBLE PRECISION NULL -> payload.periods[i].dewpointC
// - relative_humidity_percent DOUBLE PRECISION NULL -> payload.periods[i].relativeHumidityPercent
// - wind_direction_degrees DOUBLE PRECISION NULL -> payload.periods[i].windDirectionDegrees
// - wind_speed_kmh DOUBLE PRECISION NULL -> payload.periods[i].windSpeedKmh
// - wind_gust_kmh DOUBLE PRECISION NULL -> payload.periods[i].windGustKmh
// - barometric_pressure_pa DOUBLE PRECISION NULL -> payload.periods[i].barometricPressurePa
// - visibility_meters DOUBLE PRECISION NULL -> payload.periods[i].visibilityMeters
// - apparent_temperature_c DOUBLE PRECISION NULL -> payload.periods[i].apparentTemperatureC
// - cloud_cover_percent DOUBLE PRECISION NULL -> payload.periods[i].cloudCoverPercent
// - probability_of_precipitation_percent DOUBLE PRECISION NULL -> payload.periods[i].probabilityOfPrecipitationPercent
// - precipitation_amount_mm DOUBLE PRECISION NULL -> payload.periods[i].precipitationAmountMm
// - snowfall_depth_mm DOUBLE PRECISION NULL -> payload.periods[i].snowfallDepthMm
// - uv_index DOUBLE PRECISION NULL -> payload.periods[i].uvIndex
//
// 5. alert_runs (PK: event_id)
//
// - event_id TEXT -> event.id
// - event_kind TEXT -> event.kind
// - event_source TEXT -> event.source
// - event_schema TEXT -> event.schema
// - event_emitted_at TIMESTAMPTZ -> event.emitted_at
// - event_effective_at TIMESTAMPTZ NULL -> event.effective_at
// - location_id TEXT NULL -> payload.locationId
// - location_name TEXT NULL -> payload.locationName
// - as_of TIMESTAMPTZ -> payload.asOf
// - latitude DOUBLE PRECISION NULL -> payload.latitude
// - longitude DOUBLE PRECISION NULL -> payload.longitude
// - alert_count INTEGER -> len(payload.alerts)
//
// 6. alerts (PK: run_event_id, alert_index)
//
// - run_event_id TEXT -> alert_runs.event_id / payload.alerts[i]
// - alert_index INTEGER -> i (array position in payload.alerts)
// - as_of TIMESTAMPTZ -> payload.asOf (copied from parent)
// - alert_id TEXT -> payload.alerts[i].id
// - event TEXT NULL -> payload.alerts[i].event
// - headline TEXT NULL -> payload.alerts[i].headline
// - severity TEXT NULL -> payload.alerts[i].severity
// - urgency TEXT NULL -> payload.alerts[i].urgency
// - certainty TEXT NULL -> payload.alerts[i].certainty
// - status TEXT NULL -> payload.alerts[i].status
// - message_type TEXT NULL -> payload.alerts[i].messageType
// - category TEXT NULL -> payload.alerts[i].category
// - response TEXT NULL -> payload.alerts[i].response
// - description TEXT NULL -> payload.alerts[i].description
// - instruction TEXT NULL -> payload.alerts[i].instruction
// - sent TIMESTAMPTZ NULL -> payload.alerts[i].sent
// - effective TIMESTAMPTZ NULL -> payload.alerts[i].effective
// - onset TIMESTAMPTZ NULL -> payload.alerts[i].onset
// - expires TIMESTAMPTZ NULL -> payload.alerts[i].expires
// - area_description TEXT NULL -> payload.alerts[i].areaDescription
// - sender_name TEXT NULL -> payload.alerts[i].senderName
// - reference_count INTEGER -> len(payload.alerts[i].references)
//
// 7. alert_references (PK: run_event_id, alert_index, reference_index)
//
// - run_event_id TEXT -> alert_runs.event_id / payload.alerts[i].references[j]
// - alert_index INTEGER -> i (array position in payload.alerts)
// - reference_index INTEGER -> j (array position in payload.alerts[i].references)
// - as_of TIMESTAMPTZ -> payload.asOf (copied from parent)
// - id TEXT NULL -> payload.alerts[i].references[j].id
// - identifier TEXT NULL -> payload.alerts[i].references[j].identifier
// - sender TEXT NULL -> payload.alerts[i].references[j].sender
// - sent TIMESTAMPTZ NULL -> payload.alerts[i].references[j].sent
//
// Reconstructing canonical JSON payloads
//
// - WeatherObservation:
// read one row from observations, then join child rows by event_id ordered by
// weather_index to rebuild presentWeather arrays.
//
// - WeatherForecastRun:
// read one row from forecasts, then join forecast_periods by run_event_id
// ordered by period_index to rebuild periods.
//
// - WeatherAlertRun:
// read one row from alert_runs, join alerts by run_event_id ordered by
// alert_index, then join alert_references by (run_event_id, alert_index)
// ordered by reference_index to rebuild references per alert.
package postgres

View File

@@ -37,7 +37,7 @@ func mapObservationEvent(e fkevent.Event) ([]fksinks.PostgresWrite, error) {
} }
observedAt := obs.Timestamp.UTC() observedAt := obs.Timestamp.UTC()
writes := make([]fksinks.PostgresWrite, 0, 1+len(obs.CloudLayers)+len(obs.PresentWeather)) writes := make([]fksinks.PostgresWrite, 0, 1+len(obs.PresentWeather))
writes = append(writes, fksinks.PostgresWrite{ writes = append(writes, fksinks.PostgresWrite{
Table: tableObservations, Table: tableObservations,
@@ -52,39 +52,20 @@ func mapObservationEvent(e fkevent.Event) ([]fksinks.PostgresWrite, error) {
"station_name": nullableString(obs.StationName), "station_name": nullableString(obs.StationName),
"observed_at": observedAt, "observed_at": observedAt,
"condition_code": int(obs.ConditionCode), "condition_code": int(obs.ConditionCode),
"condition_text": nullableString(obs.ConditionText),
"is_day": nullableBool(obs.IsDay), "is_day": nullableBool(obs.IsDay),
"provider_raw_description": nullableString(obs.ProviderRawDescription),
"text_description": nullableString(obs.TextDescription), "text_description": nullableString(obs.TextDescription),
"icon_url": nullableString(obs.IconURL),
"temperature_c": nullableFloat64(obs.TemperatureC), "temperature_c": nullableFloat64(obs.TemperatureC),
"dewpoint_c": nullableFloat64(obs.DewpointC), "dewpoint_c": nullableFloat64(obs.DewpointC),
"wind_direction_degrees": nullableFloat64(obs.WindDirectionDegrees), "wind_direction_degrees": nullableFloat64(obs.WindDirectionDegrees),
"wind_speed_kmh": nullableFloat64(obs.WindSpeedKmh), "wind_speed_kmh": nullableFloat64(obs.WindSpeedKmh),
"wind_gust_kmh": nullableFloat64(obs.WindGustKmh), "wind_gust_kmh": nullableFloat64(obs.WindGustKmh),
"barometric_pressure_pa": nullableFloat64(obs.BarometricPressurePa), "barometric_pressure_pa": nullableFloat64(obs.BarometricPressurePa),
"sea_level_pressure_pa": nullableFloat64(obs.SeaLevelPressurePa),
"visibility_meters": nullableFloat64(obs.VisibilityMeters), "visibility_meters": nullableFloat64(obs.VisibilityMeters),
"relative_humidity_percent": nullableFloat64(obs.RelativeHumidityPercent), "relative_humidity_percent": nullableFloat64(obs.RelativeHumidityPercent),
"apparent_temperature_c": nullableFloat64(obs.ApparentTemperatureC), "apparent_temperature_c": nullableFloat64(obs.ApparentTemperatureC),
"elevation_meters": nullableFloat64(obs.ElevationMeters),
"raw_message": nullableString(obs.RawMessage),
}, },
}) })
for i, cl := range obs.CloudLayers {
writes = append(writes, fksinks.PostgresWrite{
Table: tableObservationCloudLayers,
Values: map[string]any{
"event_id": e.ID,
"layer_index": i,
"observed_at": observedAt,
"base_meters": nullableFloat64(cl.BaseMeters),
"amount": nullableString(cl.Amount),
},
})
}
for i, pw := range obs.PresentWeather { for i, pw := range obs.PresentWeather {
rawText, err := compactJSONText(pw.Raw) rawText, err := compactJSONText(pw.Raw)
if err != nil { if err != nil {

View File

@@ -16,19 +16,14 @@ import (
func TestMapPostgresEventObservationStructPayload(t *testing.T) { func TestMapPostgresEventObservationStructPayload(t *testing.T) {
isDay := true isDay := true
temp := 21.5 temp := 21.5
base := 1200.0
obs := model.WeatherObservation{ obs := model.WeatherObservation{
StationID: "KSTL", StationID: "KSTL",
StationName: "St. Louis", StationName: "St. Louis",
Timestamp: time.Date(2026, 3, 16, 19, 0, 0, 0, time.UTC), Timestamp: time.Date(2026, 3, 16, 19, 0, 0, 0, time.UTC),
ConditionCode: model.WMOCode(1), ConditionCode: model.WMOCode(1),
ConditionText: "Mainly Sunny",
IsDay: &isDay, IsDay: &isDay,
ProviderRawDescription: "few clouds", TextDescription: "few clouds",
TextDescription: "Mainly Sunny",
IconURL: "https://example/icon.png",
TemperatureC: &temp, TemperatureC: &temp,
CloudLayers: []model.CloudLayer{{BaseMeters: &base, Amount: "FEW"}},
PresentWeather: []model.PresentWeather{{Raw: map[string]any{"a": 1, "b": "x"}}}, PresentWeather: []model.PresentWeather{{Raw: map[string]any{"a": 1, "b": "x"}}},
} }
@@ -36,8 +31,8 @@ func TestMapPostgresEventObservationStructPayload(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("mapPostgresEvent() error = %v", err) t.Fatalf("mapPostgresEvent() error = %v", err)
} }
if len(writes) != 3 { if len(writes) != 2 {
t.Fatalf("mapPostgresEvent() writes len = %d, want 3", len(writes)) t.Fatalf("mapPostgresEvent() writes len = %d, want 2", len(writes))
} }
if writes[0].Table != tableObservations { if writes[0].Table != tableObservations {
t.Fatalf("writes[0].Table = %q, want %q", writes[0].Table, tableObservations) t.Fatalf("writes[0].Table = %q, want %q", writes[0].Table, tableObservations)
@@ -45,13 +40,10 @@ func TestMapPostgresEventObservationStructPayload(t *testing.T) {
if got := writes[0].Values["station_id"]; got != "KSTL" { if got := writes[0].Values["station_id"]; got != "KSTL" {
t.Fatalf("observations station_id = %#v, want KSTL", got) t.Fatalf("observations station_id = %#v, want KSTL", got)
} }
if writes[1].Table != tableObservationCloudLayers { if writes[1].Table != tableObservationPresentWeather {
t.Fatalf("writes[1].Table = %q, want %q", writes[1].Table, tableObservationCloudLayers) t.Fatalf("writes[1].Table = %q, want %q", writes[1].Table, tableObservationPresentWeather)
} }
if writes[2].Table != tableObservationPresentWeather { if got := writes[1].Values["raw_text"]; got != `{"a":1,"b":"x"}` {
t.Fatalf("writes[2].Table = %q, want %q", writes[2].Table, tableObservationPresentWeather)
}
if got := writes[2].Values["raw_text"]; got != `{"a":1,"b":"x"}` {
t.Fatalf("present_weather raw_text = %#v, want compact JSON", got) t.Fatalf("present_weather raw_text = %#v, want compact JSON", got)
} }

View File

@@ -10,7 +10,6 @@ import (
const ( const (
tableObservations = "observations" tableObservations = "observations"
tableObservationCloudLayers = "observation_cloud_layers"
tableObservationPresentWeather = "observation_present_weather" tableObservationPresentWeather = "observation_present_weather"
tableForecasts = "forecasts" tableForecasts = "forecasts"
tableForecastPeriods = "forecast_periods" tableForecastPeriods = "forecast_periods"
@@ -59,23 +58,17 @@ func weatherPostgresSchema() fksinks.PostgresSchema {
{Name: "station_name", Type: "TEXT", Nullable: true}, {Name: "station_name", Type: "TEXT", Nullable: true},
{Name: "observed_at", Type: "TIMESTAMPTZ", Nullable: false}, {Name: "observed_at", Type: "TIMESTAMPTZ", Nullable: false},
{Name: "condition_code", Type: "INTEGER", Nullable: false}, {Name: "condition_code", Type: "INTEGER", Nullable: false},
{Name: "condition_text", Type: "TEXT", Nullable: true},
{Name: "is_day", Type: "BOOLEAN", Nullable: true}, {Name: "is_day", Type: "BOOLEAN", Nullable: true},
{Name: "provider_raw_description", Type: "TEXT", Nullable: true},
{Name: "text_description", Type: "TEXT", Nullable: true}, {Name: "text_description", Type: "TEXT", Nullable: true},
{Name: "icon_url", Type: "TEXT", Nullable: true},
{Name: "temperature_c", Type: "DOUBLE PRECISION", Nullable: true}, {Name: "temperature_c", Type: "DOUBLE PRECISION", Nullable: true},
{Name: "dewpoint_c", Type: "DOUBLE PRECISION", Nullable: true}, {Name: "dewpoint_c", Type: "DOUBLE PRECISION", Nullable: true},
{Name: "wind_direction_degrees", Type: "DOUBLE PRECISION", Nullable: true}, {Name: "wind_direction_degrees", Type: "DOUBLE PRECISION", Nullable: true},
{Name: "wind_speed_kmh", Type: "DOUBLE PRECISION", Nullable: true}, {Name: "wind_speed_kmh", Type: "DOUBLE PRECISION", Nullable: true},
{Name: "wind_gust_kmh", Type: "DOUBLE PRECISION", Nullable: true}, {Name: "wind_gust_kmh", Type: "DOUBLE PRECISION", Nullable: true},
{Name: "barometric_pressure_pa", Type: "DOUBLE PRECISION", Nullable: true}, {Name: "barometric_pressure_pa", Type: "DOUBLE PRECISION", Nullable: true},
{Name: "sea_level_pressure_pa", Type: "DOUBLE PRECISION", Nullable: true},
{Name: "visibility_meters", Type: "DOUBLE PRECISION", Nullable: true}, {Name: "visibility_meters", Type: "DOUBLE PRECISION", Nullable: true},
{Name: "relative_humidity_percent", Type: "DOUBLE PRECISION", Nullable: true}, {Name: "relative_humidity_percent", Type: "DOUBLE PRECISION", Nullable: true},
{Name: "apparent_temperature_c", Type: "DOUBLE PRECISION", Nullable: true}, {Name: "apparent_temperature_c", Type: "DOUBLE PRECISION", Nullable: true},
{Name: "elevation_meters", Type: "DOUBLE PRECISION", Nullable: true},
{Name: "raw_message", Type: "TEXT", Nullable: true},
}, },
PrimaryKey: []string{"event_id"}, PrimaryKey: []string{"event_id"},
PruneColumn: "observed_at", PruneColumn: "observed_at",
@@ -85,21 +78,6 @@ func weatherPostgresSchema() fksinks.PostgresSchema {
{Name: "idx_wf_obs_condition_code", Columns: []string{"condition_code"}}, {Name: "idx_wf_obs_condition_code", Columns: []string{"condition_code"}},
}, },
}, },
{
Name: tableObservationCloudLayers,
Columns: []fksinks.PostgresColumn{
{Name: "event_id", Type: "TEXT REFERENCES observations(event_id) ON DELETE CASCADE", Nullable: false},
{Name: "layer_index", Type: "INTEGER", Nullable: false},
{Name: "observed_at", Type: "TIMESTAMPTZ", Nullable: false},
{Name: "base_meters", Type: "DOUBLE PRECISION", Nullable: true},
{Name: "amount", Type: "TEXT", Nullable: true},
},
PrimaryKey: []string{"event_id", "layer_index"},
PruneColumn: "observed_at",
Indexes: []fksinks.PostgresIndex{
{Name: "idx_wf_obs_cloud_observed_at", Columns: []string{"observed_at"}},
},
},
{ {
Name: tableObservationPresentWeather, Name: tableObservationPresentWeather,
Columns: []fksinks.PostgresColumn{ Columns: []fksinks.PostgresColumn{

View File

@@ -61,7 +61,6 @@ func TestWeatherPostgresSchemaShape(t *testing.T) {
wantTables := map[string]bool{ wantTables := map[string]bool{
tableObservations: true, tableObservations: true,
tableObservationCloudLayers: true,
tableObservationPresentWeather: true, tableObservationPresentWeather: true,
tableForecasts: true, tableForecasts: true,
tableForecastPeriods: true, tableForecastPeriods: true,

View File

@@ -1,4 +1,4 @@
// FILE: internal/model/alert.go // FILE: model/alert.go
package model package model
import "time" import "time"

View File

@@ -1,4 +1,4 @@
// FILE: internal/model/doc.go // FILE: model/doc.go
// Package model defines weatherfeeder's canonical domain payload types. // Package model defines weatherfeeder's canonical domain payload types.
// //
// These structs are emitted as the Payload of canonical events (schemas "weather.*.vN"). // These structs are emitted as the Payload of canonical events (schemas "weather.*.vN").

View File

@@ -1,4 +1,4 @@
// FILE: internal/model/forecast.go // FILE: model/forecast.go
package model package model
import "time" import "time"

View File

@@ -1,4 +1,4 @@
// FILE: internal/model/observation.go // FILE: model/observation.go
package model package model
import "time" import "time"
@@ -11,18 +11,9 @@ type WeatherObservation struct {
// Canonical internal representation (provider-independent). // Canonical internal representation (provider-independent).
ConditionCode WMOCode `json:"conditionCode"` ConditionCode WMOCode `json:"conditionCode"`
ConditionText string `json:"conditionText,omitempty"`
IsDay *bool `json:"isDay,omitempty"` IsDay *bool `json:"isDay,omitempty"`
// Provider-specific “evidence” for troubleshooting mapping and drift.
ProviderRawDescription string `json:"providerRawDescription,omitempty"`
// Human-facing (legacy / transitional)
TextDescription string `json:"textDescription,omitempty"` TextDescription string `json:"textDescription,omitempty"`
// Provider-specific (legacy / transitional)
IconURL string `json:"iconUrl,omitempty"`
// Core measurements (nullable) // Core measurements (nullable)
TemperatureC *float64 `json:"temperatureC,omitempty"` TemperatureC *float64 `json:"temperatureC,omitempty"`
DewpointC *float64 `json:"dewpointC,omitempty"` DewpointC *float64 `json:"dewpointC,omitempty"`
@@ -32,22 +23,12 @@ type WeatherObservation struct {
WindGustKmh *float64 `json:"windGustKmh,omitempty"` WindGustKmh *float64 `json:"windGustKmh,omitempty"`
BarometricPressurePa *float64 `json:"barometricPressurePa,omitempty"` BarometricPressurePa *float64 `json:"barometricPressurePa,omitempty"`
SeaLevelPressurePa *float64 `json:"seaLevelPressurePa,omitempty"`
VisibilityMeters *float64 `json:"visibilityMeters,omitempty"` VisibilityMeters *float64 `json:"visibilityMeters,omitempty"`
RelativeHumidityPercent *float64 `json:"relativeHumidityPercent,omitempty"` RelativeHumidityPercent *float64 `json:"relativeHumidityPercent,omitempty"`
ApparentTemperatureC *float64 `json:"apparentTemperatureC,omitempty"` ApparentTemperatureC *float64 `json:"apparentTemperatureC,omitempty"`
ElevationMeters *float64 `json:"elevationMeters,omitempty"`
RawMessage string `json:"rawMessage,omitempty"`
PresentWeather []PresentWeather `json:"presentWeather,omitempty"` PresentWeather []PresentWeather `json:"presentWeather,omitempty"`
CloudLayers []CloudLayer `json:"cloudLayers,omitempty"`
}
type CloudLayer struct {
BaseMeters *float64 `json:"baseMeters,omitempty"`
Amount string `json:"amount,omitempty"`
} }
type PresentWeather struct { type PresentWeather struct {

View File

@@ -21,7 +21,7 @@ type WMODescription struct {
} }
// WMODescriptions is the canonical internal mapping of WMO code -> day/night text. // WMODescriptions is the canonical internal mapping of WMO code -> day/night text.
// These are used to populate model.WeatherObservation.ConditionText. // These are used to populate canonical text fields derived from WMO codes.
var WMODescriptions = map[model.WMOCode]WMODescription{ var WMODescriptions = map[model.WMOCode]WMODescription{
0: {Day: "Sunny", Night: "Clear"}, 0: {Day: "Sunny", Night: "Clear"},
1: {Day: "Mainly Sunny", Night: "Mainly Clear"}, 1: {Day: "Mainly Sunny", Night: "Mainly Clear"},
@@ -56,7 +56,8 @@ var WMODescriptions = map[model.WMOCode]WMODescription{
// WMOText returns the canonical text description for a WMO code. // WMOText returns the canonical text description for a WMO code.
// If isDay is nil, it prefers the Day description (if present). // If isDay is nil, it prefers the Day description (if present).
// //
// This is intended to be used by drivers after they set ConditionCode. // This is intended to be used by drivers after they set ConditionCode when they
// need a human-readable description.
func WMOText(code model.WMOCode, isDay *bool) string { func WMOText(code model.WMOCode, isDay *bool) string {
if code == model.WMOUnknown { if code == model.WMOUnknown {
return "Unknown" return "Unknown"