Compare commits
9 Commits
v0.6.0
...
129cebd94d
| Author | SHA1 | Date | |
|---|---|---|---|
| 129cebd94d | |||
| e42f2bc9de | |||
| 9ddcf5e0df | |||
| d0b58a4734 | |||
| 6cd823f528 | |||
| 84da2bb689 | |||
| 859ee9dd5c | |||
| ea113e2dcc | |||
| 38bc162918 |
22
API.md
22
API.md
@@ -80,34 +80,18 @@ A `WeatherObservation` represents a point-in-time observation for a station/loca
|
|||||||
| `stationName` | string | no | Human station name |
|
| `stationName` | string | no | Human station name |
|
||||||
| `timestamp` | timestamp string | yes | Observation timestamp |
|
| `timestamp` | timestamp string | yes | Observation timestamp |
|
||||||
| `conditionCode` | int | yes | WMO code (`-1` unknown) |
|
| `conditionCode` | int | yes | WMO code (`-1` unknown) |
|
||||||
| `conditionText` | string | no | Canonical short condition text |
|
|
||||||
| `isDay` | bool | no | Day/night hint |
|
| `isDay` | bool | no | Day/night hint |
|
||||||
| `providerRawDescription` | string | no | Provider-specific evidence text |
|
| `textDescription` | string | no | Human-facing short description |
|
||||||
| `textDescription` | string | no | Legacy/transitional text description |
|
|
||||||
| `iconUrl` | string | no | Legacy/transitional icon URL |
|
|
||||||
| `temperatureC` | number | no | Celsius |
|
| `temperatureC` | number | no | Celsius |
|
||||||
| `dewpointC` | number | no | Celsius |
|
| `dewpointC` | number | no | Celsius |
|
||||||
| `windDirectionDegrees` | number | no | Degrees |
|
| `windDirectionDegrees` | number | no | Degrees |
|
||||||
| `windSpeedKmh` | number | no | km/h |
|
| `windSpeedKmh` | number | no | km/h |
|
||||||
| `windGustKmh` | number | no | km/h |
|
| `windGustKmh` | number | no | km/h |
|
||||||
| `barometricPressurePa` | number | no | Pascals |
|
| `barometricPressurePa` | number | no | Pascals |
|
||||||
| `seaLevelPressurePa` | number | no | Pascals |
|
|
||||||
| `visibilityMeters` | number | no | Meters |
|
| `visibilityMeters` | number | no | Meters |
|
||||||
| `relativeHumidityPercent` | number | no | Percent |
|
| `relativeHumidityPercent` | number | no | Percent |
|
||||||
| `apparentTemperatureC` | number | no | Celsius |
|
| `apparentTemperatureC` | number | no | Celsius |
|
||||||
| `elevationMeters` | number | no | Meters |
|
|
||||||
| `rawMessage` | string | no | Provider raw message (for example METAR) |
|
|
||||||
| `presentWeather` | array | no | Provider-specific structured weather fragments |
|
| `presentWeather` | array | no | Provider-specific structured weather fragments |
|
||||||
| `cloudLayers` | array | no | Cloud layer details |
|
|
||||||
|
|
||||||
### Nested: `cloudLayers[]`
|
|
||||||
|
|
||||||
Each `cloudLayers[]` element:
|
|
||||||
|
|
||||||
| Field | Type | Required | Notes |
|
|
||||||
|---|---:|:---:|---|
|
|
||||||
| `baseMeters` | number | no | Cloud base altitude in meters |
|
|
||||||
| `amount` | string | no | Provider string (e.g. FEW/SCT/BKN/OVC) |
|
|
||||||
|
|
||||||
### Nested: `presentWeather[]`
|
### Nested: `presentWeather[]`
|
||||||
|
|
||||||
@@ -241,7 +225,7 @@ A run may contain zero, one, or many alerts.
|
|||||||
|
|
||||||
- Consumers **must** ignore unknown fields.
|
- Consumers **must** ignore unknown fields.
|
||||||
- Producers (weatherfeeder) prefer **additive changes** within a schema version.
|
- Producers (weatherfeeder) prefer **additive changes** within a schema version.
|
||||||
- Renames/removals/semantic breaks require a **schema version bump** (`weather.*.v2`).
|
- Renames/removals/semantic breaks normally require a **schema version bump** (`weather.*.v2`); pre-1.0 projects may choose in-place changes.
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
@@ -259,7 +243,7 @@ A run may contain zero, one, or many alerts.
|
|||||||
"stationId": "KSTL",
|
"stationId": "KSTL",
|
||||||
"timestamp": "2026-01-17T14:00:00Z",
|
"timestamp": "2026-01-17T14:00:00Z",
|
||||||
"conditionCode": 1,
|
"conditionCode": 1,
|
||||||
"conditionText": "Mainly Sunny",
|
"textDescription": "Mainly Sunny",
|
||||||
"temperatureC": 3.25,
|
"temperatureC": 3.25,
|
||||||
"windSpeedKmh": 18.5
|
"windSpeedKmh": 18.5
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -15,16 +15,21 @@ import (
|
|||||||
"gitea.maximumdirect.net/ejr/feedkit/config"
|
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||||
fkdispatch "gitea.maximumdirect.net/ejr/feedkit/dispatch"
|
fkdispatch "gitea.maximumdirect.net/ejr/feedkit/dispatch"
|
||||||
fkevent "gitea.maximumdirect.net/ejr/feedkit/event"
|
fkevent "gitea.maximumdirect.net/ejr/feedkit/event"
|
||||||
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
|
|
||||||
fkpipeline "gitea.maximumdirect.net/ejr/feedkit/pipeline"
|
fkpipeline "gitea.maximumdirect.net/ejr/feedkit/pipeline"
|
||||||
|
fkprocessors "gitea.maximumdirect.net/ejr/feedkit/processors"
|
||||||
|
fkdedupe "gitea.maximumdirect.net/ejr/feedkit/processors/dedupe"
|
||||||
|
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
|
||||||
fkscheduler "gitea.maximumdirect.net/ejr/feedkit/scheduler"
|
fkscheduler "gitea.maximumdirect.net/ejr/feedkit/scheduler"
|
||||||
fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks"
|
fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks"
|
||||||
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
|
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
|
||||||
|
|
||||||
wfnormalizers "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers"
|
wfnormalizers "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers"
|
||||||
|
wfpgsink "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sinks/postgres"
|
||||||
wfsources "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources"
|
wfsources "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const dedupeMaxEntries = 2048
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
log.SetFlags(log.LstdFlags | log.Lmicroseconds)
|
log.SetFlags(log.LstdFlags | log.Lmicroseconds)
|
||||||
|
|
||||||
@@ -36,6 +41,9 @@ func main() {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("config load failed: %v", err)
|
log.Fatalf("config load failed: %v", err)
|
||||||
}
|
}
|
||||||
|
if err := wfpgsink.RegisterPostgresSchemas(cfg); err != nil {
|
||||||
|
log.Fatalf("postgres schema registration failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
// --- Registries ---
|
// --- Registries ---
|
||||||
srcReg := fksources.NewRegistry()
|
srcReg := fksources.NewRegistry()
|
||||||
@@ -98,20 +106,28 @@ func main() {
|
|||||||
|
|
||||||
events := make(chan fkevent.Event, 256)
|
events := make(chan fkevent.Event, 256)
|
||||||
|
|
||||||
// --- Normalization (optional) ---
|
// --- Processors ---
|
||||||
//
|
//
|
||||||
// We install feedkit's normalize.Processor even before any normalizers exist.
|
// We install feedkit's processors/normalize.Processor even before any normalizers exist.
|
||||||
// With an empty registry and RequireMatch=false, this is a no-op passthrough.
|
// With an empty normalizer list and RequireMatch=false, this is a no-op passthrough.
|
||||||
// It will begin transforming events as soon as:
|
// It will begin transforming events as soon as:
|
||||||
// 1) sources emit raw schemas (raw.*), and
|
// 1) sources emit raw schemas (raw.*), and
|
||||||
// 2) matching normalizers are registered.
|
// 2) matching normalizers are registered.
|
||||||
normReg := &fknormalize.Registry{}
|
normalizers := wfnormalizers.RegisterBuiltins(nil)
|
||||||
wfnormalizers.RegisterBuiltins(normReg)
|
|
||||||
|
procReg := fkprocessors.NewRegistry()
|
||||||
|
procReg.Register("normalize", func() (fkprocessors.Processor, error) {
|
||||||
|
return fknormalize.NewProcessor(normalizers, false), nil
|
||||||
|
})
|
||||||
|
procReg.Register("dedupe", fkdedupe.Factory(dedupeMaxEntries))
|
||||||
|
|
||||||
|
chain, err := procReg.BuildChain([]string{"normalize", "dedupe"})
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("build processor chain failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
pl := &fkpipeline.Pipeline{
|
pl := &fkpipeline.Pipeline{
|
||||||
Processors: []fkpipeline.Processor{
|
Processors: chain,
|
||||||
fknormalize.Processor{Registry: normReg},
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
s := &fkscheduler.Scheduler{
|
s := &fkscheduler.Scheduler{
|
||||||
|
|||||||
@@ -1,11 +1,20 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
"reflect"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"gitea.maximumdirect.net/ejr/feedkit/config"
|
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||||
fkevent "gitea.maximumdirect.net/ejr/feedkit/event"
|
fkevent "gitea.maximumdirect.net/ejr/feedkit/event"
|
||||||
|
fkpipeline "gitea.maximumdirect.net/ejr/feedkit/pipeline"
|
||||||
|
fkprocessors "gitea.maximumdirect.net/ejr/feedkit/processors"
|
||||||
|
fkdedupe "gitea.maximumdirect.net/ejr/feedkit/processors/dedupe"
|
||||||
|
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
|
||||||
|
|
||||||
|
wfnormalizers "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers"
|
||||||
)
|
)
|
||||||
|
|
||||||
type testInput struct {
|
type testInput struct {
|
||||||
@@ -88,3 +97,95 @@ func TestExampleConfigLoads(t *testing.T) {
|
|||||||
t.Fatalf("config.Load(config.yml) unexpected error: %v", err)
|
t.Fatalf("config.Load(config.yml) unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestProcessorRegistryBuildsNormalizeThenDedupeChain(t *testing.T) {
|
||||||
|
chain, err := buildProcessorChainForTests()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("BuildChain() unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if len(chain) != 2 {
|
||||||
|
t.Fatalf("BuildChain() expected 2 processors, got %d", len(chain))
|
||||||
|
}
|
||||||
|
|
||||||
|
pl := &fkpipeline.Pipeline{Processors: chain}
|
||||||
|
if len(pl.Processors) != 2 {
|
||||||
|
t.Fatalf("pipeline expected 2 processors, got %d", len(pl.Processors))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNormalizeNoMatchPassThrough(t *testing.T) {
|
||||||
|
chain, err := buildProcessorChainForTests()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("BuildChain() unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
pl := &fkpipeline.Pipeline{Processors: chain}
|
||||||
|
in := fkevent.Event{
|
||||||
|
ID: "evt-no-match",
|
||||||
|
Kind: fkevent.Kind("observation"),
|
||||||
|
Source: "test",
|
||||||
|
EmittedAt: time.Now().UTC(),
|
||||||
|
Schema: "raw.weatherfeeder.unknown.v1",
|
||||||
|
Payload: map[string]any{
|
||||||
|
"ok": true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
out, err := pl.Process(context.Background(), in)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Pipeline.Process() unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if out == nil {
|
||||||
|
t.Fatalf("Pipeline.Process() returned nil output")
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(*out, in) {
|
||||||
|
t.Fatalf("Pipeline.Process() expected passthrough output, got %#v", *out)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDedupeDropsSecondEventWithSameID(t *testing.T) {
|
||||||
|
chain, err := buildProcessorChainForTests()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("BuildChain() unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
pl := &fkpipeline.Pipeline{Processors: chain}
|
||||||
|
in := fkevent.Event{
|
||||||
|
ID: "evt-dedupe-1",
|
||||||
|
Kind: fkevent.Kind("observation"),
|
||||||
|
Source: "test",
|
||||||
|
EmittedAt: time.Now().UTC(),
|
||||||
|
Schema: "raw.weatherfeeder.unknown.v1",
|
||||||
|
Payload: map[string]any{
|
||||||
|
"ok": true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
first, err := pl.Process(context.Background(), in)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("first Pipeline.Process() unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if first == nil {
|
||||||
|
t.Fatalf("first Pipeline.Process() unexpectedly dropped event")
|
||||||
|
}
|
||||||
|
|
||||||
|
second, err := pl.Process(context.Background(), in)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("second Pipeline.Process() unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if second != nil {
|
||||||
|
t.Fatalf("second Pipeline.Process() expected dedupe drop, got %#v", *second)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildProcessorChainForTests() ([]fkprocessors.Processor, error) {
|
||||||
|
normalizers := wfnormalizers.RegisterBuiltins(nil)
|
||||||
|
|
||||||
|
procReg := fkprocessors.NewRegistry()
|
||||||
|
procReg.Register("normalize", func() (fkprocessors.Processor, error) {
|
||||||
|
return fknormalize.NewProcessor(normalizers, false), nil
|
||||||
|
})
|
||||||
|
procReg.Register("dedupe", fkdedupe.Factory(dedupeMaxEntries))
|
||||||
|
|
||||||
|
return procReg.BuildChain([]string{"normalize", "dedupe"})
|
||||||
|
}
|
||||||
|
|||||||
3
go.mod
3
go.mod
@@ -2,10 +2,11 @@ module gitea.maximumdirect.net/ejr/weatherfeeder
|
|||||||
|
|
||||||
go 1.25
|
go 1.25
|
||||||
|
|
||||||
require gitea.maximumdirect.net/ejr/feedkit v0.6.0
|
require gitea.maximumdirect.net/ejr/feedkit v0.7.2
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/klauspost/compress v1.17.2 // indirect
|
github.com/klauspost/compress v1.17.2 // indirect
|
||||||
|
github.com/lib/pq v1.10.9 // indirect
|
||||||
github.com/nats-io/nats.go v1.34.0 // indirect
|
github.com/nats-io/nats.go v1.34.0 // indirect
|
||||||
github.com/nats-io/nkeys v0.4.7 // indirect
|
github.com/nats-io/nkeys v0.4.7 // indirect
|
||||||
github.com/nats-io/nuid v1.0.1 // indirect
|
github.com/nats-io/nuid v1.0.1 // indirect
|
||||||
|
|||||||
6
go.sum
6
go.sum
@@ -1,7 +1,9 @@
|
|||||||
gitea.maximumdirect.net/ejr/feedkit v0.5.0 h1:T4pRTo9Tj/o7TbZYUbp8UE7cQVLmIucUrYmD6G8E8ZQ=
|
gitea.maximumdirect.net/ejr/feedkit v0.7.2 h1:hTg302SgSi7tw11lNzuc+3g7MvHT6jQQziuo2NoARt8=
|
||||||
gitea.maximumdirect.net/ejr/feedkit v0.5.0/go.mod h1:wYtA10GouvSe7L/8e1UEC+tqcp32HJofExIo1k+Wjls=
|
gitea.maximumdirect.net/ejr/feedkit v0.7.2/go.mod h1:U6xC9xZLN3cL4yi7YBVyzGoHYRLJXusFCAKlj2kdYYQ=
|
||||||
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
|
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
|
||||||
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
|
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
|
||||||
|
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
|
||||||
|
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
|
||||||
github.com/nats-io/nats.go v1.34.0 h1:fnxnPCNiwIG5w08rlMcEKTUw4AV/nKyGCOJE8TdhSPk=
|
github.com/nats-io/nats.go v1.34.0 h1:fnxnPCNiwIG5w08rlMcEKTUw4AV/nKyGCOJE8TdhSPk=
|
||||||
github.com/nats-io/nats.go v1.34.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
|
github.com/nats-io/nats.go v1.34.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
|
||||||
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
|
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
|
||||||
|
|||||||
@@ -2,7 +2,7 @@
|
|||||||
package normalizers
|
package normalizers
|
||||||
|
|
||||||
import (
|
import (
|
||||||
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
|
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
|
||||||
|
|
||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/nws"
|
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/nws"
|
||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/openmeteo"
|
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/openmeteo"
|
||||||
@@ -16,22 +16,20 @@ import (
|
|||||||
// - sources are built by name (cfg.Driver -> factory)
|
// - sources are built by name (cfg.Driver -> factory)
|
||||||
// - normalizers are selected by Match() (event.Schema -> first match wins)
|
// - normalizers are selected by Match() (event.Schema -> first match wins)
|
||||||
//
|
//
|
||||||
// Registration order matters because feedkit normalize.Registry is “first match wins”.
|
// Registration order matters because feedkit normalize.Processor is "first match wins".
|
||||||
// In weatherfeeder we avoid ambiguity by matching strictly on schema constants, but
|
// In weatherfeeder we avoid ambiguity by matching strictly on schema constants, but
|
||||||
// we still keep ordering stable as a best practice.
|
// we still keep ordering stable as a best practice.
|
||||||
//
|
func RegisterBuiltins(in []fknormalize.Normalizer) []fknormalize.Normalizer {
|
||||||
// If reg is nil, this function is a no-op.
|
out := in
|
||||||
func RegisterBuiltins(reg *fknormalize.Registry) {
|
|
||||||
if reg == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Keep this intentionally boring: delegate registration to provider subpackages
|
// Keep this intentionally boring: delegate registration to provider subpackages
|
||||||
// so main.go stays clean and each provider owns its own mapping logic.
|
// so main.go stays clean and each provider owns its own mapping logic.
|
||||||
//
|
//
|
||||||
// Order here should be stable across releases to reduce surprises when adding
|
// Order here should be stable across releases to reduce surprises when adding
|
||||||
// new normalizers.
|
// new normalizers.
|
||||||
nws.Register(reg)
|
out = nws.Register(out)
|
||||||
openmeteo.Register(reg)
|
out = openmeteo.Register(out)
|
||||||
openweather.Register(reg)
|
out = openweather.Register(out)
|
||||||
|
|
||||||
|
return out
|
||||||
}
|
}
|
||||||
|
|||||||
42
internal/normalizers/builtins_test.go
Normal file
42
internal/normalizers/builtins_test.go
Normal file
@@ -0,0 +1,42 @@
|
|||||||
|
package normalizers
|
||||||
|
|
||||||
|
import (
|
||||||
|
"reflect"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
|
||||||
|
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/nws"
|
||||||
|
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/openmeteo"
|
||||||
|
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/openweather"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestRegisterBuiltinsOrder(t *testing.T) {
|
||||||
|
got := RegisterBuiltins(nil)
|
||||||
|
if len(got) == 0 {
|
||||||
|
t.Fatalf("RegisterBuiltins() returned no normalizers")
|
||||||
|
}
|
||||||
|
|
||||||
|
want := []fknormalize.Normalizer{
|
||||||
|
nws.ObservationNormalizer{},
|
||||||
|
nws.ForecastNormalizer{},
|
||||||
|
nws.AlertsNormalizer{},
|
||||||
|
openmeteo.ObservationNormalizer{},
|
||||||
|
openmeteo.ForecastNormalizer{},
|
||||||
|
openweather.ObservationNormalizer{},
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(got) != len(want) {
|
||||||
|
t.Fatalf("RegisterBuiltins() expected %d normalizers, got %d", len(want), len(got))
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range want {
|
||||||
|
if reflect.TypeOf(got[i]) != reflect.TypeOf(want[i]) {
|
||||||
|
t.Fatalf(
|
||||||
|
"RegisterBuiltins() order mismatch at index %d: got %T, want %T",
|
||||||
|
i,
|
||||||
|
got[i],
|
||||||
|
want[i],
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -8,7 +8,7 @@
|
|||||||
// transforming provider-specific raw payloads into canonical internal models.
|
// transforming provider-specific raw payloads into canonical internal models.
|
||||||
//
|
//
|
||||||
// This package is domain code (weatherfeeder). feedkit’s normalize package is
|
// This package is domain code (weatherfeeder). feedkit’s normalize package is
|
||||||
// infrastructure (registry + processor).
|
// infrastructure (normalizer contracts + processor).
|
||||||
//
|
//
|
||||||
// Directory layout (required)
|
// Directory layout (required)
|
||||||
// ---------------------------
|
// ---------------------------
|
||||||
@@ -29,7 +29,7 @@
|
|||||||
//
|
//
|
||||||
// 1. One normalizer per file.
|
// 1. One normalizer per file.
|
||||||
// Each file contains exactly one Normalizer implementation (one type that
|
// Each file contains exactly one Normalizer implementation (one type that
|
||||||
// satisfies feedkit/normalize.Normalizer).
|
// satisfies feedkit/processors/normalize.Normalizer).
|
||||||
// Helper files are encouraged (types.go, common.go, mapping.go, etc.) as long
|
// Helper files are encouraged (types.go, common.go, mapping.go, etc.) as long
|
||||||
// as they do not define additional Normalizer types.
|
// as they do not define additional Normalizer types.
|
||||||
//
|
//
|
||||||
@@ -136,21 +136,22 @@
|
|||||||
//
|
//
|
||||||
// Registration pattern
|
// Registration pattern
|
||||||
// --------------------
|
// --------------------
|
||||||
// feedkit normalization uses a match-driven registry (“first match wins”).
|
// feedkit normalization uses an ordered normalizer list ("first match wins").
|
||||||
//
|
//
|
||||||
// Provider subpackages should expose:
|
// Provider subpackages should expose:
|
||||||
//
|
//
|
||||||
// func Register(reg *normalize.Registry)
|
// func Register(in []normalize.Normalizer) []normalize.Normalizer
|
||||||
//
|
//
|
||||||
// And internal/normalizers/builtins.go should provide one entrypoint:
|
// And internal/normalizers/builtins.go should provide one entrypoint:
|
||||||
//
|
//
|
||||||
// func RegisterBuiltins(reg *normalize.Registry)
|
// func RegisterBuiltins(in []normalize.Normalizer) []normalize.Normalizer
|
||||||
//
|
//
|
||||||
// which calls each provider’s Register() in a stable order.
|
// which appends each provider's normalizers in a stable order and is then passed
|
||||||
|
// to normalize.NewProcessor(...).
|
||||||
//
|
//
|
||||||
// Registry ordering
|
// Normalizer ordering
|
||||||
// -----------------------------
|
// -----------------------------
|
||||||
// feedkit normalization uses a match-driven registry (“first match wins”).
|
// feedkit normalization is "first match wins" by list order.
|
||||||
// Therefore order matters:
|
// Therefore order matters:
|
||||||
//
|
//
|
||||||
// - Register more specific normalizers before more general ones.
|
// - Register more specific normalizers before more general ones.
|
||||||
|
|||||||
@@ -54,14 +54,6 @@ func buildObservation(parsed nwsObservationResponse) (model.WeatherObservation,
|
|||||||
ts = t.UTC()
|
ts = t.UTC()
|
||||||
}
|
}
|
||||||
|
|
||||||
cloudLayers := make([]model.CloudLayer, 0, len(parsed.Properties.CloudLayers))
|
|
||||||
for _, cl := range parsed.Properties.CloudLayers {
|
|
||||||
cloudLayers = append(cloudLayers, model.CloudLayer{
|
|
||||||
BaseMeters: cl.Base.Value,
|
|
||||||
Amount: cl.Amount,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// Preserve raw presentWeather objects (for troubleshooting / drift analysis).
|
// Preserve raw presentWeather objects (for troubleshooting / drift analysis).
|
||||||
present := make([]model.PresentWeather, 0, len(parsed.Properties.PresentWeather))
|
present := make([]model.PresentWeather, 0, len(parsed.Properties.PresentWeather))
|
||||||
for _, pw := range parsed.Properties.PresentWeather {
|
for _, pw := range parsed.Properties.PresentWeather {
|
||||||
@@ -70,6 +62,7 @@ func buildObservation(parsed nwsObservationResponse) (model.WeatherObservation,
|
|||||||
|
|
||||||
// Decode presentWeather into typed METAR phenomena for mapping.
|
// Decode presentWeather into typed METAR phenomena for mapping.
|
||||||
phenomena := decodeMetarPhenomena(parsed.Properties.PresentWeather)
|
phenomena := decodeMetarPhenomena(parsed.Properties.PresentWeather)
|
||||||
|
cloudLayers := parsed.Properties.CloudLayers
|
||||||
|
|
||||||
providerDesc := strings.TrimSpace(parsed.Properties.TextDescription)
|
providerDesc := strings.TrimSpace(parsed.Properties.TextDescription)
|
||||||
|
|
||||||
@@ -81,9 +74,6 @@ func buildObservation(parsed nwsObservationResponse) (model.WeatherObservation,
|
|||||||
isDay = isDayFromLatLonTime(*lat, *lon, ts)
|
isDay = isDayFromLatLonTime(*lat, *lon, ts)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Canonical condition text comes from our WMO table.
|
|
||||||
canonicalText := standards.WMOText(wmo, isDay)
|
|
||||||
|
|
||||||
// Apparent temperature: prefer wind chill when both are supplied.
|
// Apparent temperature: prefer wind chill when both are supplied.
|
||||||
var apparentC *float64
|
var apparentC *float64
|
||||||
if parsed.Properties.WindChill.Value != nil {
|
if parsed.Properties.WindChill.Value != nil {
|
||||||
@@ -98,15 +88,9 @@ func buildObservation(parsed nwsObservationResponse) (model.WeatherObservation,
|
|||||||
Timestamp: ts,
|
Timestamp: ts,
|
||||||
|
|
||||||
ConditionCode: wmo,
|
ConditionCode: wmo,
|
||||||
ConditionText: canonicalText,
|
|
||||||
IsDay: isDay,
|
IsDay: isDay,
|
||||||
|
|
||||||
ProviderRawDescription: providerDesc,
|
TextDescription: providerDesc,
|
||||||
|
|
||||||
// Transitional / human-facing:
|
|
||||||
// keep output consistent by populating TextDescription from canonical text.
|
|
||||||
TextDescription: canonicalText,
|
|
||||||
IconURL: parsed.Properties.Icon,
|
|
||||||
|
|
||||||
TemperatureC: parsed.Properties.Temperature.Value,
|
TemperatureC: parsed.Properties.Temperature.Value,
|
||||||
DewpointC: parsed.Properties.Dewpoint.Value,
|
DewpointC: parsed.Properties.Dewpoint.Value,
|
||||||
@@ -115,19 +99,21 @@ func buildObservation(parsed nwsObservationResponse) (model.WeatherObservation,
|
|||||||
WindSpeedKmh: parsed.Properties.WindSpeed.Value,
|
WindSpeedKmh: parsed.Properties.WindSpeed.Value,
|
||||||
WindGustKmh: parsed.Properties.WindGust.Value,
|
WindGustKmh: parsed.Properties.WindGust.Value,
|
||||||
|
|
||||||
BarometricPressurePa: parsed.Properties.BarometricPressure.Value,
|
BarometricPressurePa: pressurePrecedenceNWS(parsed.Properties.SeaLevelPressure.Value, parsed.Properties.BarometricPressure.Value),
|
||||||
SeaLevelPressurePa: parsed.Properties.SeaLevelPressure.Value,
|
|
||||||
VisibilityMeters: parsed.Properties.Visibility.Value,
|
VisibilityMeters: parsed.Properties.Visibility.Value,
|
||||||
|
|
||||||
RelativeHumidityPercent: parsed.Properties.RelativeHumidity.Value,
|
RelativeHumidityPercent: parsed.Properties.RelativeHumidity.Value,
|
||||||
ApparentTemperatureC: apparentC,
|
ApparentTemperatureC: apparentC,
|
||||||
|
|
||||||
ElevationMeters: parsed.Properties.Elevation.Value,
|
|
||||||
RawMessage: parsed.Properties.RawMessage,
|
|
||||||
|
|
||||||
PresentWeather: present,
|
PresentWeather: present,
|
||||||
CloudLayers: cloudLayers,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return obs, ts, nil
|
return obs, ts, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func pressurePrecedenceNWS(seaLevelPa, barometricPa *float64) *float64 {
|
||||||
|
if seaLevelPa != nil {
|
||||||
|
return seaLevelPa
|
||||||
|
}
|
||||||
|
return barometricPa
|
||||||
|
}
|
||||||
|
|||||||
51
internal/normalizers/nws/observation_test.go
Normal file
51
internal/normalizers/nws/observation_test.go
Normal file
@@ -0,0 +1,51 @@
|
|||||||
|
package nws
|
||||||
|
|
||||||
|
import "testing"
|
||||||
|
|
||||||
|
func TestBuildObservationTextDescriptionAndPressurePrecedence(t *testing.T) {
|
||||||
|
barometric := 100200.0
|
||||||
|
seaLevel := 101400.0
|
||||||
|
|
||||||
|
parsed := nwsObservationResponse{}
|
||||||
|
parsed.Properties.Timestamp = "2026-03-16T19:00:00Z"
|
||||||
|
parsed.Properties.TextDescription = " Overcast "
|
||||||
|
parsed.Properties.BarometricPressure.Value = &barometric
|
||||||
|
parsed.Properties.SeaLevelPressure.Value = &seaLevel
|
||||||
|
|
||||||
|
obs, _, err := buildObservation(parsed)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("buildObservation() error = %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if got, want := obs.TextDescription, "Overcast"; got != want {
|
||||||
|
t.Fatalf("TextDescription = %q, want %q", got, want)
|
||||||
|
}
|
||||||
|
|
||||||
|
if obs.BarometricPressurePa == nil {
|
||||||
|
t.Fatalf("BarometricPressurePa = nil, want non-nil")
|
||||||
|
}
|
||||||
|
if got, want := *obs.BarometricPressurePa, seaLevel; got != want {
|
||||||
|
t.Fatalf("BarometricPressurePa = %v, want %v", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBuildObservationPressureFallbackToBarometric(t *testing.T) {
|
||||||
|
barometric := 99900.0
|
||||||
|
|
||||||
|
parsed := nwsObservationResponse{}
|
||||||
|
parsed.Properties.Timestamp = "2026-03-16T19:00:00Z"
|
||||||
|
parsed.Properties.TextDescription = "Cloudy"
|
||||||
|
parsed.Properties.BarometricPressure.Value = &barometric
|
||||||
|
|
||||||
|
obs, _, err := buildObservation(parsed)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("buildObservation() error = %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if obs.BarometricPressurePa == nil {
|
||||||
|
t.Fatalf("BarometricPressurePa = nil, want non-nil")
|
||||||
|
}
|
||||||
|
if got, want := *obs.BarometricPressurePa, barometric; got != want {
|
||||||
|
t.Fatalf("BarometricPressurePa = %v, want %v", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -2,21 +2,21 @@
|
|||||||
package nws
|
package nws
|
||||||
|
|
||||||
import (
|
import (
|
||||||
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
|
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Register registers NWS normalizers into the provided registry.
|
// Register appends NWS normalizers in stable order.
|
||||||
func Register(reg *fknormalize.Registry) {
|
func Register(in []fknormalize.Normalizer) []fknormalize.Normalizer {
|
||||||
if reg == nil {
|
out := in
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Observations
|
// Observations
|
||||||
reg.Register(ObservationNormalizer{})
|
out = append(out, ObservationNormalizer{})
|
||||||
|
|
||||||
// Forecasts
|
// Forecasts
|
||||||
reg.Register(ForecastNormalizer{})
|
out = append(out, ForecastNormalizer{})
|
||||||
|
|
||||||
// Alerts
|
// Alerts
|
||||||
reg.Register(AlertsNormalizer{})
|
out = append(out, AlertsNormalizer{})
|
||||||
|
|
||||||
|
return out
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -86,16 +86,18 @@ type nwsObservationResponse struct {
|
|||||||
// We decode these as generic maps, then optionally interpret them in metar.go.
|
// We decode these as generic maps, then optionally interpret them in metar.go.
|
||||||
PresentWeather []map[string]any `json:"presentWeather"`
|
PresentWeather []map[string]any `json:"presentWeather"`
|
||||||
|
|
||||||
CloudLayers []struct {
|
CloudLayers []nwsCloudLayer `json:"cloudLayers"`
|
||||||
Base struct {
|
|
||||||
UnitCode string `json:"unitCode"`
|
|
||||||
Value *float64 `json:"value"`
|
|
||||||
} `json:"base"`
|
|
||||||
Amount string `json:"amount"`
|
|
||||||
} `json:"cloudLayers"`
|
|
||||||
} `json:"properties"`
|
} `json:"properties"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type nwsCloudLayer struct {
|
||||||
|
Base struct {
|
||||||
|
UnitCode string `json:"unitCode"`
|
||||||
|
Value *float64 `json:"value"`
|
||||||
|
} `json:"base"`
|
||||||
|
Amount string `json:"amount"`
|
||||||
|
}
|
||||||
|
|
||||||
// nwsForecastResponse is a minimal-but-sufficient representation of the NWS
|
// nwsForecastResponse is a minimal-but-sufficient representation of the NWS
|
||||||
// gridpoint forecast GeoJSON payload needed for mapping into model.WeatherForecastRun.
|
// gridpoint forecast GeoJSON payload needed for mapping into model.WeatherForecastRun.
|
||||||
//
|
//
|
||||||
|
|||||||
@@ -14,7 +14,7 @@ import (
|
|||||||
// 1. METAR phenomena (presentWeather) — most reliable for precip/hazards
|
// 1. METAR phenomena (presentWeather) — most reliable for precip/hazards
|
||||||
// 2. textDescription keywords — weaker, but reusable across providers
|
// 2. textDescription keywords — weaker, but reusable across providers
|
||||||
// 3. cloud layers fallback — only for sky-only conditions
|
// 3. cloud layers fallback — only for sky-only conditions
|
||||||
func mapNWSToWMO(providerDesc string, cloudLayers []model.CloudLayer, phenomena []metarPhenomenon) model.WMOCode {
|
func mapNWSToWMO(providerDesc string, cloudLayers []nwsCloudLayer, phenomena []metarPhenomenon) model.WMOCode {
|
||||||
// 1) Prefer METAR phenomena if present.
|
// 1) Prefer METAR phenomena if present.
|
||||||
if code := wmoFromPhenomena(phenomena); code != model.WMOUnknown {
|
if code := wmoFromPhenomena(phenomena); code != model.WMOUnknown {
|
||||||
return code
|
return code
|
||||||
@@ -167,7 +167,7 @@ func wmoFromPhenomena(phenomena []metarPhenomenon) model.WMOCode {
|
|||||||
return model.WMOUnknown
|
return model.WMOUnknown
|
||||||
}
|
}
|
||||||
|
|
||||||
func wmoFromCloudLayers(cloudLayers []model.CloudLayer) model.WMOCode {
|
func wmoFromCloudLayers(cloudLayers []nwsCloudLayer) model.WMOCode {
|
||||||
// NWS cloud layer amount values commonly include:
|
// NWS cloud layer amount values commonly include:
|
||||||
// OVC, BKN, SCT, FEW, SKC, CLR, VV (vertical visibility / obscured sky)
|
// OVC, BKN, SCT, FEW, SKC, CLR, VV (vertical visibility / obscured sky)
|
||||||
//
|
//
|
||||||
|
|||||||
@@ -86,20 +86,9 @@ func buildObservation(parsed omResponse) (model.WeatherObservation, time.Time, e
|
|||||||
StationName: "Open-Meteo",
|
StationName: "Open-Meteo",
|
||||||
Timestamp: ts,
|
Timestamp: ts,
|
||||||
|
|
||||||
ConditionCode: wmo,
|
ConditionCode: wmo,
|
||||||
ConditionText: canonicalText,
|
IsDay: isDay,
|
||||||
IsDay: isDay,
|
|
||||||
|
|
||||||
// Open-Meteo does not provide a separate human text description for "current"
|
|
||||||
// when using weather_code; we leave provider evidence empty.
|
|
||||||
ProviderRawDescription: "",
|
|
||||||
|
|
||||||
// Transitional / human-facing:
|
|
||||||
// keep output consistent by populating TextDescription from canonical text.
|
|
||||||
TextDescription: canonicalText,
|
TextDescription: canonicalText,
|
||||||
|
|
||||||
// IconURL: Open-Meteo does not provide an icon URL in this endpoint.
|
|
||||||
IconURL: "",
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Measurements (all optional; only set when present).
|
// Measurements (all optional; only set when present).
|
||||||
@@ -132,20 +121,13 @@ func buildObservation(parsed omResponse) (model.WeatherObservation, time.Time, e
|
|||||||
obs.WindGustKmh = &v
|
obs.WindGustKmh = &v
|
||||||
}
|
}
|
||||||
|
|
||||||
if parsed.Current.SurfacePressure != nil {
|
if parsed.Current.PressureMSL != nil {
|
||||||
|
v := normcommon.PressurePaFromHPa(*parsed.Current.PressureMSL)
|
||||||
|
obs.BarometricPressurePa = &v
|
||||||
|
} else if parsed.Current.SurfacePressure != nil {
|
||||||
v := normcommon.PressurePaFromHPa(*parsed.Current.SurfacePressure)
|
v := normcommon.PressurePaFromHPa(*parsed.Current.SurfacePressure)
|
||||||
obs.BarometricPressurePa = &v
|
obs.BarometricPressurePa = &v
|
||||||
}
|
}
|
||||||
|
|
||||||
if parsed.Current.PressureMSL != nil {
|
|
||||||
v := normcommon.PressurePaFromHPa(*parsed.Current.PressureMSL)
|
|
||||||
obs.SeaLevelPressurePa = &v
|
|
||||||
}
|
|
||||||
|
|
||||||
if parsed.Elevation != nil {
|
|
||||||
v := *parsed.Elevation
|
|
||||||
obs.ElevationMeters = &v
|
|
||||||
}
|
|
||||||
|
|
||||||
return obs, ts, nil
|
return obs, ts, nil
|
||||||
}
|
}
|
||||||
|
|||||||
61
internal/normalizers/openmeteo/observation_test.go
Normal file
61
internal/normalizers/openmeteo/observation_test.go
Normal file
@@ -0,0 +1,61 @@
|
|||||||
|
package openmeteo
|
||||||
|
|
||||||
|
import "testing"
|
||||||
|
|
||||||
|
func TestBuildObservationTextDescriptionAndPressurePrecedence(t *testing.T) {
|
||||||
|
weatherCode := 2
|
||||||
|
pressureMSL := 1016.0
|
||||||
|
surfacePressure := 1009.0
|
||||||
|
|
||||||
|
parsed := omResponse{
|
||||||
|
Timezone: "UTC",
|
||||||
|
UTCOffsetSeconds: 0,
|
||||||
|
Current: omCurrent{
|
||||||
|
Time: "2026-03-16T19:00",
|
||||||
|
WeatherCode: &weatherCode,
|
||||||
|
PressureMSL: &pressureMSL,
|
||||||
|
SurfacePressure: &surfacePressure,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
obs, _, err := buildObservation(parsed)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("buildObservation() error = %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if got, want := obs.TextDescription, "Partly Cloudy"; got != want {
|
||||||
|
t.Fatalf("TextDescription = %q, want %q", got, want)
|
||||||
|
}
|
||||||
|
|
||||||
|
if obs.BarometricPressurePa == nil {
|
||||||
|
t.Fatalf("BarometricPressurePa = nil, want non-nil")
|
||||||
|
}
|
||||||
|
if got, want := *obs.BarometricPressurePa, 101600.0; got != want {
|
||||||
|
t.Fatalf("BarometricPressurePa = %v, want %v", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBuildObservationPressureFallbackToSurface(t *testing.T) {
|
||||||
|
surfacePressure := 1008.0
|
||||||
|
|
||||||
|
parsed := omResponse{
|
||||||
|
Timezone: "UTC",
|
||||||
|
UTCOffsetSeconds: 0,
|
||||||
|
Current: omCurrent{
|
||||||
|
Time: "2026-03-16T19:00",
|
||||||
|
SurfacePressure: &surfacePressure,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
obs, _, err := buildObservation(parsed)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("buildObservation() error = %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if obs.BarometricPressurePa == nil {
|
||||||
|
t.Fatalf("BarometricPressurePa = nil, want non-nil")
|
||||||
|
}
|
||||||
|
if got, want := *obs.BarometricPressurePa, 100800.0; got != want {
|
||||||
|
t.Fatalf("BarometricPressurePa = %v, want %v", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -2,17 +2,17 @@
|
|||||||
package openmeteo
|
package openmeteo
|
||||||
|
|
||||||
import (
|
import (
|
||||||
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
|
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Register registers Open-Meteo normalizers into the provided registry.
|
// Register appends Open-Meteo normalizers in stable order.
|
||||||
func Register(reg *fknormalize.Registry) {
|
func Register(in []fknormalize.Normalizer) []fknormalize.Normalizer {
|
||||||
if reg == nil {
|
out := in
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Observations
|
// Observations
|
||||||
reg.Register(ObservationNormalizer{})
|
out = append(out, ObservationNormalizer{})
|
||||||
// Forecasts
|
// Forecasts
|
||||||
reg.Register(ForecastNormalizer{})
|
out = append(out, ForecastNormalizer{})
|
||||||
|
|
||||||
|
return out
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -53,15 +53,6 @@ func inferIsDay(icon string, dt, sunrise, sunset int64) *bool {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// openWeatherIconURL builds the standard OpenWeather icon URL for the given icon code.
|
|
||||||
func openWeatherIconURL(icon string) string {
|
|
||||||
icon = strings.TrimSpace(icon)
|
|
||||||
if icon == "" {
|
|
||||||
return ""
|
|
||||||
}
|
|
||||||
return fmt.Sprintf("https://openweathermap.org/img/wn/%s@2x.png", icon)
|
|
||||||
}
|
|
||||||
|
|
||||||
// openWeatherStationID returns a stable station identifier for the given response.
|
// openWeatherStationID returns a stable station identifier for the given response.
|
||||||
// Prefer the OpenWeather city ID when present; otherwise, fall back to coordinates.
|
// Prefer the OpenWeather city ID when present; otherwise, fall back to coordinates.
|
||||||
func openWeatherStationID(parsed owmResponse) string {
|
func openWeatherStationID(parsed owmResponse) string {
|
||||||
|
|||||||
@@ -75,10 +75,10 @@ func buildObservation(parsed owmResponse) (model.WeatherObservation, time.Time,
|
|||||||
}
|
}
|
||||||
|
|
||||||
surfacePa := normcommon.PressurePaFromHPa(parsed.Main.Pressure)
|
surfacePa := normcommon.PressurePaFromHPa(parsed.Main.Pressure)
|
||||||
var seaLevelPa *float64
|
barometricPa := &surfacePa
|
||||||
if parsed.Main.SeaLevel != nil {
|
if parsed.Main.SeaLevel != nil {
|
||||||
v := normcommon.PressurePaFromHPa(*parsed.Main.SeaLevel)
|
v := normcommon.PressurePaFromHPa(*parsed.Main.SeaLevel)
|
||||||
seaLevelPa = &v
|
barometricPa = &v
|
||||||
}
|
}
|
||||||
|
|
||||||
wsKmh := normcommon.SpeedKmhFromMps(parsed.Wind.Speed)
|
wsKmh := normcommon.SpeedKmhFromMps(parsed.Wind.Speed)
|
||||||
@@ -96,9 +96,6 @@ func buildObservation(parsed owmResponse) (model.WeatherObservation, time.Time,
|
|||||||
|
|
||||||
// Condition mapping: OpenWeather condition IDs -> canonical WMO code vocabulary.
|
// Condition mapping: OpenWeather condition IDs -> canonical WMO code vocabulary.
|
||||||
wmo := mapOpenWeatherToWMO(owmID)
|
wmo := mapOpenWeatherToWMO(owmID)
|
||||||
canonicalText := standards.WMOText(wmo, isDay)
|
|
||||||
|
|
||||||
iconURL := openWeatherIconURL(icon)
|
|
||||||
|
|
||||||
stationID := openWeatherStationID(parsed)
|
stationID := openWeatherStationID(parsed)
|
||||||
stationName := strings.TrimSpace(parsed.Name)
|
stationName := strings.TrimSpace(parsed.Name)
|
||||||
@@ -111,15 +108,9 @@ func buildObservation(parsed owmResponse) (model.WeatherObservation, time.Time,
|
|||||||
StationName: stationName,
|
StationName: stationName,
|
||||||
Timestamp: ts,
|
Timestamp: ts,
|
||||||
|
|
||||||
ConditionCode: wmo,
|
ConditionCode: wmo,
|
||||||
ConditionText: canonicalText,
|
IsDay: isDay,
|
||||||
IsDay: isDay,
|
TextDescription: rawDesc,
|
||||||
|
|
||||||
ProviderRawDescription: rawDesc,
|
|
||||||
|
|
||||||
// Human-facing legacy fields: populate with canonical text for consistency.
|
|
||||||
TextDescription: canonicalText,
|
|
||||||
IconURL: iconURL,
|
|
||||||
|
|
||||||
TemperatureC: &tempC,
|
TemperatureC: &tempC,
|
||||||
ApparentTemperatureC: apparentC,
|
ApparentTemperatureC: apparentC,
|
||||||
@@ -128,8 +119,7 @@ func buildObservation(parsed owmResponse) (model.WeatherObservation, time.Time,
|
|||||||
WindSpeedKmh: &wsKmh,
|
WindSpeedKmh: &wsKmh,
|
||||||
WindGustKmh: wgKmh,
|
WindGustKmh: wgKmh,
|
||||||
|
|
||||||
BarometricPressurePa: &surfacePa,
|
BarometricPressurePa: barometricPa,
|
||||||
SeaLevelPressurePa: seaLevelPa,
|
|
||||||
VisibilityMeters: visM,
|
VisibilityMeters: visM,
|
||||||
|
|
||||||
RelativeHumidityPercent: &rh,
|
RelativeHumidityPercent: &rh,
|
||||||
|
|||||||
58
internal/normalizers/openweather/observation_test.go
Normal file
58
internal/normalizers/openweather/observation_test.go
Normal file
@@ -0,0 +1,58 @@
|
|||||||
|
package openweather
|
||||||
|
|
||||||
|
import "testing"
|
||||||
|
|
||||||
|
func TestBuildObservationTextDescriptionAndPressurePrecedence(t *testing.T) {
|
||||||
|
seaLevel := 1018.0
|
||||||
|
|
||||||
|
parsed := owmResponse{}
|
||||||
|
parsed.Dt = 1710000000
|
||||||
|
parsed.Main.Temp = 20.0
|
||||||
|
parsed.Main.Humidity = 45.0
|
||||||
|
parsed.Main.Pressure = 1000.0
|
||||||
|
parsed.Main.SeaLevel = &seaLevel
|
||||||
|
parsed.Wind.Speed = 3.0
|
||||||
|
parsed.Weather = []owmWeather{
|
||||||
|
{ID: 801, Main: "Clouds", Description: "few clouds", Icon: "02d"},
|
||||||
|
}
|
||||||
|
|
||||||
|
obs, _, err := buildObservation(parsed)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("buildObservation() error = %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if obs.TextDescription != "few clouds" {
|
||||||
|
t.Fatalf("TextDescription = %q, want %q", obs.TextDescription, "few clouds")
|
||||||
|
}
|
||||||
|
|
||||||
|
if obs.BarometricPressurePa == nil {
|
||||||
|
t.Fatalf("BarometricPressurePa = nil, want non-nil")
|
||||||
|
}
|
||||||
|
if got, want := *obs.BarometricPressurePa, 101800.0; got != want {
|
||||||
|
t.Fatalf("BarometricPressurePa = %v, want %v", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBuildObservationPressureFallbackToSurface(t *testing.T) {
|
||||||
|
parsed := owmResponse{}
|
||||||
|
parsed.Dt = 1710000000
|
||||||
|
parsed.Main.Temp = 20.0
|
||||||
|
parsed.Main.Humidity = 45.0
|
||||||
|
parsed.Main.Pressure = 1001.0
|
||||||
|
parsed.Wind.Speed = 3.0
|
||||||
|
parsed.Weather = []owmWeather{
|
||||||
|
{ID: 800, Description: "clear sky", Icon: "01d"},
|
||||||
|
}
|
||||||
|
|
||||||
|
obs, _, err := buildObservation(parsed)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("buildObservation() error = %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if obs.BarometricPressurePa == nil {
|
||||||
|
t.Fatalf("BarometricPressurePa = nil, want non-nil")
|
||||||
|
}
|
||||||
|
if got, want := *obs.BarometricPressurePa, 100100.0; got != want {
|
||||||
|
t.Fatalf("BarometricPressurePa = %v, want %v", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -2,15 +2,15 @@
|
|||||||
package openweather
|
package openweather
|
||||||
|
|
||||||
import (
|
import (
|
||||||
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
|
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Register registers OpenWeather normalizers into the provided registry.
|
// Register appends OpenWeather normalizers in stable order.
|
||||||
func Register(reg *fknormalize.Registry) {
|
func Register(in []fknormalize.Normalizer) []fknormalize.Normalizer {
|
||||||
if reg == nil {
|
out := in
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Observations
|
// Observations
|
||||||
reg.Register(ObservationNormalizer{})
|
out = append(out, ObservationNormalizer{})
|
||||||
|
|
||||||
|
return out
|
||||||
}
|
}
|
||||||
|
|||||||
192
internal/sinks/postgres/doc.go
Normal file
192
internal/sinks/postgres/doc.go
Normal file
@@ -0,0 +1,192 @@
|
|||||||
|
// Package postgres documents weatherfeeder's PostgreSQL sink contract for
|
||||||
|
// downstream SQL consumers.
|
||||||
|
//
|
||||||
|
// This package wires weatherfeeder canonical events into normalized relational
|
||||||
|
// tables. Downstream consumers can reconstruct the same canonical JSON objects
|
||||||
|
// that were written by joining parent/child tables as described below.
|
||||||
|
//
|
||||||
|
// Canonical input schemas:
|
||||||
|
// - weather.observation.v1 -> model.WeatherObservation
|
||||||
|
// - weather.forecast.v1 -> model.WeatherForecastRun
|
||||||
|
// - weather.alert.v1 -> model.WeatherAlertRun
|
||||||
|
//
|
||||||
|
// Parent/child relationships:
|
||||||
|
// - observations.event_id -> observation_present_weather.event_id
|
||||||
|
// - forecasts.event_id -> forecast_periods.run_event_id
|
||||||
|
// - alert_runs.event_id -> alerts.run_event_id
|
||||||
|
// - alerts.(run_event_id, alert_index) -> alert_references.(run_event_id, alert_index)
|
||||||
|
//
|
||||||
|
// Dedupe and retention behavior:
|
||||||
|
// - Parent primary keys (event_id): observations, forecasts, alert_runs.
|
||||||
|
// - Child primary keys use positional indexes to preserve payload order.
|
||||||
|
// - Prune columns:
|
||||||
|
// - observations.observed_at
|
||||||
|
// - observation_present_weather.observed_at
|
||||||
|
// - forecasts.issued_at
|
||||||
|
// - forecast_periods.issued_at
|
||||||
|
// - alert_runs.as_of
|
||||||
|
// - alerts.as_of
|
||||||
|
// - alert_references.as_of
|
||||||
|
//
|
||||||
|
// Envelope field mapping (shared parent columns)
|
||||||
|
//
|
||||||
|
// These columns exist on observations, forecasts, and alert_runs:
|
||||||
|
// - event_id TEXT -> event.id
|
||||||
|
// - event_kind TEXT -> event.kind
|
||||||
|
// - event_source TEXT -> event.source
|
||||||
|
// - event_schema TEXT -> event.schema
|
||||||
|
// - event_emitted_at TIMESTAMPTZ -> event.emitted_at
|
||||||
|
// - event_effective_at TIMESTAMPTZ NULL -> event.effective_at
|
||||||
|
//
|
||||||
|
// Table contract
|
||||||
|
//
|
||||||
|
// 1. observations (PK: event_id)
|
||||||
|
//
|
||||||
|
// - event_id TEXT -> event.id
|
||||||
|
// - event_kind TEXT -> event.kind
|
||||||
|
// - event_source TEXT -> event.source
|
||||||
|
// - event_schema TEXT -> event.schema
|
||||||
|
// - event_emitted_at TIMESTAMPTZ -> event.emitted_at
|
||||||
|
// - event_effective_at TIMESTAMPTZ NULL -> event.effective_at
|
||||||
|
// - station_id TEXT NULL -> payload.stationId
|
||||||
|
// - station_name TEXT NULL -> payload.stationName
|
||||||
|
// - observed_at TIMESTAMPTZ -> payload.timestamp
|
||||||
|
// - condition_code INTEGER -> payload.conditionCode
|
||||||
|
// - is_day BOOLEAN NULL -> payload.isDay
|
||||||
|
// - text_description TEXT NULL -> payload.textDescription
|
||||||
|
// - temperature_c DOUBLE PRECISION NULL -> payload.temperatureC
|
||||||
|
// - dewpoint_c DOUBLE PRECISION NULL -> payload.dewpointC
|
||||||
|
// - wind_direction_degrees DOUBLE PRECISION NULL -> payload.windDirectionDegrees
|
||||||
|
// - wind_speed_kmh DOUBLE PRECISION NULL -> payload.windSpeedKmh
|
||||||
|
// - wind_gust_kmh DOUBLE PRECISION NULL -> payload.windGustKmh
|
||||||
|
// - barometric_pressure_pa DOUBLE PRECISION NULL -> payload.barometricPressurePa
|
||||||
|
// - visibility_meters DOUBLE PRECISION NULL -> payload.visibilityMeters
|
||||||
|
// - relative_humidity_percent DOUBLE PRECISION NULL -> payload.relativeHumidityPercent
|
||||||
|
// - apparent_temperature_c DOUBLE PRECISION NULL -> payload.apparentTemperatureC
|
||||||
|
//
|
||||||
|
// 2. observation_present_weather (PK: event_id, weather_index)
|
||||||
|
//
|
||||||
|
// - event_id TEXT -> observations.event_id / payload.presentWeather[i]
|
||||||
|
// - weather_index INTEGER -> i (array position in payload.presentWeather)
|
||||||
|
// - observed_at TIMESTAMPTZ -> payload.timestamp
|
||||||
|
// - raw_text TEXT NULL -> JSON-encoded payload.presentWeather[i].raw
|
||||||
|
//
|
||||||
|
// Note: raw_text stores compact JSON text. Consumers that need the original
|
||||||
|
// object should parse raw_text as JSON.
|
||||||
|
//
|
||||||
|
// 3. forecasts (PK: event_id)
|
||||||
|
//
|
||||||
|
// - event_id TEXT -> event.id
|
||||||
|
// - event_kind TEXT -> event.kind
|
||||||
|
// - event_source TEXT -> event.source
|
||||||
|
// - event_schema TEXT -> event.schema
|
||||||
|
// - event_emitted_at TIMESTAMPTZ -> event.emitted_at
|
||||||
|
// - event_effective_at TIMESTAMPTZ NULL -> event.effective_at
|
||||||
|
// - location_id TEXT NULL -> payload.locationId
|
||||||
|
// - location_name TEXT NULL -> payload.locationName
|
||||||
|
// - issued_at TIMESTAMPTZ -> payload.issuedAt
|
||||||
|
// - updated_at TIMESTAMPTZ NULL -> payload.updatedAt
|
||||||
|
// - product TEXT -> payload.product
|
||||||
|
// - latitude DOUBLE PRECISION NULL -> payload.latitude
|
||||||
|
// - longitude DOUBLE PRECISION NULL -> payload.longitude
|
||||||
|
// - elevation_meters DOUBLE PRECISION NULL -> payload.elevationMeters
|
||||||
|
// - period_count INTEGER -> len(payload.periods)
|
||||||
|
//
|
||||||
|
// 4. forecast_periods (PK: run_event_id, period_index)
|
||||||
|
//
|
||||||
|
// - run_event_id TEXT -> forecasts.event_id / payload.periods[i]
|
||||||
|
// - period_index INTEGER -> i (array position in payload.periods)
|
||||||
|
// - issued_at TIMESTAMPTZ -> payload.issuedAt (copied from parent)
|
||||||
|
// - start_time TIMESTAMPTZ -> payload.periods[i].startTime
|
||||||
|
// - end_time TIMESTAMPTZ -> payload.periods[i].endTime
|
||||||
|
// - name TEXT NULL -> payload.periods[i].name
|
||||||
|
// - is_day BOOLEAN NULL -> payload.periods[i].isDay
|
||||||
|
// - condition_code INTEGER -> payload.periods[i].conditionCode
|
||||||
|
// - condition_text TEXT NULL -> payload.periods[i].conditionText
|
||||||
|
// - provider_raw_description TEXT NULL -> payload.periods[i].providerRawDescription
|
||||||
|
// - text_description TEXT NULL -> payload.periods[i].textDescription
|
||||||
|
// - detailed_text TEXT NULL -> payload.periods[i].detailedText
|
||||||
|
// - icon_url TEXT NULL -> payload.periods[i].iconUrl
|
||||||
|
// - temperature_c DOUBLE PRECISION NULL -> payload.periods[i].temperatureC
|
||||||
|
// - temperature_c_min DOUBLE PRECISION NULL -> payload.periods[i].temperatureCMin
|
||||||
|
// - temperature_c_max DOUBLE PRECISION NULL -> payload.periods[i].temperatureCMax
|
||||||
|
// - dewpoint_c DOUBLE PRECISION NULL -> payload.periods[i].dewpointC
|
||||||
|
// - relative_humidity_percent DOUBLE PRECISION NULL -> payload.periods[i].relativeHumidityPercent
|
||||||
|
// - wind_direction_degrees DOUBLE PRECISION NULL -> payload.periods[i].windDirectionDegrees
|
||||||
|
// - wind_speed_kmh DOUBLE PRECISION NULL -> payload.periods[i].windSpeedKmh
|
||||||
|
// - wind_gust_kmh DOUBLE PRECISION NULL -> payload.periods[i].windGustKmh
|
||||||
|
// - barometric_pressure_pa DOUBLE PRECISION NULL -> payload.periods[i].barometricPressurePa
|
||||||
|
// - visibility_meters DOUBLE PRECISION NULL -> payload.periods[i].visibilityMeters
|
||||||
|
// - apparent_temperature_c DOUBLE PRECISION NULL -> payload.periods[i].apparentTemperatureC
|
||||||
|
// - cloud_cover_percent DOUBLE PRECISION NULL -> payload.periods[i].cloudCoverPercent
|
||||||
|
// - probability_of_precipitation_percent DOUBLE PRECISION NULL -> payload.periods[i].probabilityOfPrecipitationPercent
|
||||||
|
// - precipitation_amount_mm DOUBLE PRECISION NULL -> payload.periods[i].precipitationAmountMm
|
||||||
|
// - snowfall_depth_mm DOUBLE PRECISION NULL -> payload.periods[i].snowfallDepthMm
|
||||||
|
// - uv_index DOUBLE PRECISION NULL -> payload.periods[i].uvIndex
|
||||||
|
//
|
||||||
|
// 5. alert_runs (PK: event_id)
|
||||||
|
//
|
||||||
|
// - event_id TEXT -> event.id
|
||||||
|
// - event_kind TEXT -> event.kind
|
||||||
|
// - event_source TEXT -> event.source
|
||||||
|
// - event_schema TEXT -> event.schema
|
||||||
|
// - event_emitted_at TIMESTAMPTZ -> event.emitted_at
|
||||||
|
// - event_effective_at TIMESTAMPTZ NULL -> event.effective_at
|
||||||
|
// - location_id TEXT NULL -> payload.locationId
|
||||||
|
// - location_name TEXT NULL -> payload.locationName
|
||||||
|
// - as_of TIMESTAMPTZ -> payload.asOf
|
||||||
|
// - latitude DOUBLE PRECISION NULL -> payload.latitude
|
||||||
|
// - longitude DOUBLE PRECISION NULL -> payload.longitude
|
||||||
|
// - alert_count INTEGER -> len(payload.alerts)
|
||||||
|
//
|
||||||
|
// 6. alerts (PK: run_event_id, alert_index)
|
||||||
|
//
|
||||||
|
// - run_event_id TEXT -> alert_runs.event_id / payload.alerts[i]
|
||||||
|
// - alert_index INTEGER -> i (array position in payload.alerts)
|
||||||
|
// - as_of TIMESTAMPTZ -> payload.asOf (copied from parent)
|
||||||
|
// - alert_id TEXT -> payload.alerts[i].id
|
||||||
|
// - event TEXT NULL -> payload.alerts[i].event
|
||||||
|
// - headline TEXT NULL -> payload.alerts[i].headline
|
||||||
|
// - severity TEXT NULL -> payload.alerts[i].severity
|
||||||
|
// - urgency TEXT NULL -> payload.alerts[i].urgency
|
||||||
|
// - certainty TEXT NULL -> payload.alerts[i].certainty
|
||||||
|
// - status TEXT NULL -> payload.alerts[i].status
|
||||||
|
// - message_type TEXT NULL -> payload.alerts[i].messageType
|
||||||
|
// - category TEXT NULL -> payload.alerts[i].category
|
||||||
|
// - response TEXT NULL -> payload.alerts[i].response
|
||||||
|
// - description TEXT NULL -> payload.alerts[i].description
|
||||||
|
// - instruction TEXT NULL -> payload.alerts[i].instruction
|
||||||
|
// - sent TIMESTAMPTZ NULL -> payload.alerts[i].sent
|
||||||
|
// - effective TIMESTAMPTZ NULL -> payload.alerts[i].effective
|
||||||
|
// - onset TIMESTAMPTZ NULL -> payload.alerts[i].onset
|
||||||
|
// - expires TIMESTAMPTZ NULL -> payload.alerts[i].expires
|
||||||
|
// - area_description TEXT NULL -> payload.alerts[i].areaDescription
|
||||||
|
// - sender_name TEXT NULL -> payload.alerts[i].senderName
|
||||||
|
// - reference_count INTEGER -> len(payload.alerts[i].references)
|
||||||
|
//
|
||||||
|
// 7. alert_references (PK: run_event_id, alert_index, reference_index)
|
||||||
|
//
|
||||||
|
// - run_event_id TEXT -> alert_runs.event_id / payload.alerts[i].references[j]
|
||||||
|
// - alert_index INTEGER -> i (array position in payload.alerts)
|
||||||
|
// - reference_index INTEGER -> j (array position in payload.alerts[i].references)
|
||||||
|
// - as_of TIMESTAMPTZ -> payload.asOf (copied from parent)
|
||||||
|
// - id TEXT NULL -> payload.alerts[i].references[j].id
|
||||||
|
// - identifier TEXT NULL -> payload.alerts[i].references[j].identifier
|
||||||
|
// - sender TEXT NULL -> payload.alerts[i].references[j].sender
|
||||||
|
// - sent TIMESTAMPTZ NULL -> payload.alerts[i].references[j].sent
|
||||||
|
//
|
||||||
|
// Reconstructing canonical JSON payloads
|
||||||
|
//
|
||||||
|
// - WeatherObservation:
|
||||||
|
// read one row from observations, then join child rows by event_id ordered by
|
||||||
|
// weather_index to rebuild presentWeather arrays.
|
||||||
|
//
|
||||||
|
// - WeatherForecastRun:
|
||||||
|
// read one row from forecasts, then join forecast_periods by run_event_id
|
||||||
|
// ordered by period_index to rebuild periods.
|
||||||
|
//
|
||||||
|
// - WeatherAlertRun:
|
||||||
|
// read one row from alert_runs, join alerts by run_event_id ordered by
|
||||||
|
// alert_index, then join alert_references by (run_event_id, alert_index)
|
||||||
|
// ordered by reference_index to rebuild references per alert.
|
||||||
|
package postgres
|
||||||
324
internal/sinks/postgres/map.go
Normal file
324
internal/sinks/postgres/map.go
Normal file
@@ -0,0 +1,324 @@
|
|||||||
|
package postgres
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
fkevent "gitea.maximumdirect.net/ejr/feedkit/event"
|
||||||
|
fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks"
|
||||||
|
"gitea.maximumdirect.net/ejr/weatherfeeder/model"
|
||||||
|
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
||||||
|
)
|
||||||
|
|
||||||
|
func mapPostgresEvent(_ context.Context, e fkevent.Event) ([]fksinks.PostgresWrite, error) {
|
||||||
|
schema := strings.TrimSpace(e.Schema)
|
||||||
|
switch schema {
|
||||||
|
case standards.SchemaWeatherObservationV1:
|
||||||
|
return mapObservationEvent(e)
|
||||||
|
case standards.SchemaWeatherForecastV1:
|
||||||
|
return mapForecastEvent(e)
|
||||||
|
case standards.SchemaWeatherAlertV1:
|
||||||
|
return mapAlertEvent(e)
|
||||||
|
default:
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func mapObservationEvent(e fkevent.Event) ([]fksinks.PostgresWrite, error) {
|
||||||
|
obs, err := decodePayload[model.WeatherObservation](e.Payload)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("decode observation payload: %w", err)
|
||||||
|
}
|
||||||
|
if obs.Timestamp.IsZero() {
|
||||||
|
return nil, fmt.Errorf("decode observation payload: timestamp is required")
|
||||||
|
}
|
||||||
|
|
||||||
|
observedAt := obs.Timestamp.UTC()
|
||||||
|
writes := make([]fksinks.PostgresWrite, 0, 1+len(obs.PresentWeather))
|
||||||
|
|
||||||
|
writes = append(writes, fksinks.PostgresWrite{
|
||||||
|
Table: tableObservations,
|
||||||
|
Values: map[string]any{
|
||||||
|
"event_id": e.ID,
|
||||||
|
"event_kind": string(e.Kind),
|
||||||
|
"event_source": e.Source,
|
||||||
|
"event_schema": e.Schema,
|
||||||
|
"event_emitted_at": e.EmittedAt.UTC(),
|
||||||
|
"event_effective_at": nullableTime(e.EffectiveAt),
|
||||||
|
"station_id": nullableString(obs.StationID),
|
||||||
|
"station_name": nullableString(obs.StationName),
|
||||||
|
"observed_at": observedAt,
|
||||||
|
"condition_code": int(obs.ConditionCode),
|
||||||
|
"is_day": nullableBool(obs.IsDay),
|
||||||
|
"text_description": nullableString(obs.TextDescription),
|
||||||
|
"temperature_c": nullableFloat64(obs.TemperatureC),
|
||||||
|
"dewpoint_c": nullableFloat64(obs.DewpointC),
|
||||||
|
"wind_direction_degrees": nullableFloat64(obs.WindDirectionDegrees),
|
||||||
|
"wind_speed_kmh": nullableFloat64(obs.WindSpeedKmh),
|
||||||
|
"wind_gust_kmh": nullableFloat64(obs.WindGustKmh),
|
||||||
|
"barometric_pressure_pa": nullableFloat64(obs.BarometricPressurePa),
|
||||||
|
"visibility_meters": nullableFloat64(obs.VisibilityMeters),
|
||||||
|
"relative_humidity_percent": nullableFloat64(obs.RelativeHumidityPercent),
|
||||||
|
"apparent_temperature_c": nullableFloat64(obs.ApparentTemperatureC),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
for i, pw := range obs.PresentWeather {
|
||||||
|
rawText, err := compactJSONText(pw.Raw)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("observation presentWeather[%d].raw: %w", i, err)
|
||||||
|
}
|
||||||
|
writes = append(writes, fksinks.PostgresWrite{
|
||||||
|
Table: tableObservationPresentWeather,
|
||||||
|
Values: map[string]any{
|
||||||
|
"event_id": e.ID,
|
||||||
|
"weather_index": i,
|
||||||
|
"observed_at": observedAt,
|
||||||
|
"raw_text": rawText,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return writes, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func mapForecastEvent(e fkevent.Event) ([]fksinks.PostgresWrite, error) {
|
||||||
|
run, err := decodePayload[model.WeatherForecastRun](e.Payload)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("decode forecast payload: %w", err)
|
||||||
|
}
|
||||||
|
if run.IssuedAt.IsZero() {
|
||||||
|
return nil, fmt.Errorf("decode forecast payload: issuedAt is required")
|
||||||
|
}
|
||||||
|
if strings.TrimSpace(string(run.Product)) == "" {
|
||||||
|
return nil, fmt.Errorf("decode forecast payload: product is required")
|
||||||
|
}
|
||||||
|
|
||||||
|
issuedAt := run.IssuedAt.UTC()
|
||||||
|
writes := make([]fksinks.PostgresWrite, 0, 1+len(run.Periods))
|
||||||
|
|
||||||
|
writes = append(writes, fksinks.PostgresWrite{
|
||||||
|
Table: tableForecasts,
|
||||||
|
Values: map[string]any{
|
||||||
|
"event_id": e.ID,
|
||||||
|
"event_kind": string(e.Kind),
|
||||||
|
"event_source": e.Source,
|
||||||
|
"event_schema": e.Schema,
|
||||||
|
"event_emitted_at": e.EmittedAt.UTC(),
|
||||||
|
"event_effective_at": nullableTime(e.EffectiveAt),
|
||||||
|
"location_id": nullableString(run.LocationID),
|
||||||
|
"location_name": nullableString(run.LocationName),
|
||||||
|
"issued_at": issuedAt,
|
||||||
|
"updated_at": nullableTime(run.UpdatedAt),
|
||||||
|
"product": string(run.Product),
|
||||||
|
"latitude": nullableFloat64(run.Latitude),
|
||||||
|
"longitude": nullableFloat64(run.Longitude),
|
||||||
|
"elevation_meters": nullableFloat64(run.ElevationMeters),
|
||||||
|
"period_count": len(run.Periods),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
for i, p := range run.Periods {
|
||||||
|
if p.StartTime.IsZero() || p.EndTime.IsZero() {
|
||||||
|
return nil, fmt.Errorf("decode forecast payload: periods[%d] startTime/endTime are required", i)
|
||||||
|
}
|
||||||
|
writes = append(writes, fksinks.PostgresWrite{
|
||||||
|
Table: tableForecastPeriods,
|
||||||
|
Values: map[string]any{
|
||||||
|
"run_event_id": e.ID,
|
||||||
|
"period_index": i,
|
||||||
|
"issued_at": issuedAt,
|
||||||
|
"start_time": p.StartTime.UTC(),
|
||||||
|
"end_time": p.EndTime.UTC(),
|
||||||
|
"name": nullableString(p.Name),
|
||||||
|
"is_day": nullableBool(p.IsDay),
|
||||||
|
"condition_code": int(p.ConditionCode),
|
||||||
|
"condition_text": nullableString(p.ConditionText),
|
||||||
|
"provider_raw_description": nullableString(p.ProviderRawDescription),
|
||||||
|
"text_description": nullableString(p.TextDescription),
|
||||||
|
"detailed_text": nullableString(p.DetailedText),
|
||||||
|
"icon_url": nullableString(p.IconURL),
|
||||||
|
"temperature_c": nullableFloat64(p.TemperatureC),
|
||||||
|
"temperature_c_min": nullableFloat64(p.TemperatureCMin),
|
||||||
|
"temperature_c_max": nullableFloat64(p.TemperatureCMax),
|
||||||
|
"dewpoint_c": nullableFloat64(p.DewpointC),
|
||||||
|
"relative_humidity_percent": nullableFloat64(p.RelativeHumidityPercent),
|
||||||
|
"wind_direction_degrees": nullableFloat64(p.WindDirectionDegrees),
|
||||||
|
"wind_speed_kmh": nullableFloat64(p.WindSpeedKmh),
|
||||||
|
"wind_gust_kmh": nullableFloat64(p.WindGustKmh),
|
||||||
|
"barometric_pressure_pa": nullableFloat64(p.BarometricPressurePa),
|
||||||
|
"visibility_meters": nullableFloat64(p.VisibilityMeters),
|
||||||
|
"apparent_temperature_c": nullableFloat64(p.ApparentTemperatureC),
|
||||||
|
"cloud_cover_percent": nullableFloat64(p.CloudCoverPercent),
|
||||||
|
"probability_of_precipitation_percent": nullableFloat64(p.ProbabilityOfPrecipitationPercent),
|
||||||
|
"precipitation_amount_mm": nullableFloat64(p.PrecipitationAmountMm),
|
||||||
|
"snowfall_depth_mm": nullableFloat64(p.SnowfallDepthMM),
|
||||||
|
"uv_index": nullableFloat64(p.UVIndex),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return writes, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func mapAlertEvent(e fkevent.Event) ([]fksinks.PostgresWrite, error) {
|
||||||
|
run, err := decodePayload[model.WeatherAlertRun](e.Payload)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("decode alert payload: %w", err)
|
||||||
|
}
|
||||||
|
if run.AsOf.IsZero() {
|
||||||
|
return nil, fmt.Errorf("decode alert payload: asOf is required")
|
||||||
|
}
|
||||||
|
|
||||||
|
asOf := run.AsOf.UTC()
|
||||||
|
writes := make([]fksinks.PostgresWrite, 0, 1+len(run.Alerts)+countAlertReferences(run.Alerts))
|
||||||
|
|
||||||
|
writes = append(writes, fksinks.PostgresWrite{
|
||||||
|
Table: tableAlertRuns,
|
||||||
|
Values: map[string]any{
|
||||||
|
"event_id": e.ID,
|
||||||
|
"event_kind": string(e.Kind),
|
||||||
|
"event_source": e.Source,
|
||||||
|
"event_schema": e.Schema,
|
||||||
|
"event_emitted_at": e.EmittedAt.UTC(),
|
||||||
|
"event_effective_at": nullableTime(e.EffectiveAt),
|
||||||
|
"location_id": nullableString(run.LocationID),
|
||||||
|
"location_name": nullableString(run.LocationName),
|
||||||
|
"as_of": asOf,
|
||||||
|
"latitude": nullableFloat64(run.Latitude),
|
||||||
|
"longitude": nullableFloat64(run.Longitude),
|
||||||
|
"alert_count": len(run.Alerts),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
for i, a := range run.Alerts {
|
||||||
|
if strings.TrimSpace(a.ID) == "" {
|
||||||
|
return nil, fmt.Errorf("decode alert payload: alerts[%d].id is required", i)
|
||||||
|
}
|
||||||
|
|
||||||
|
writes = append(writes, fksinks.PostgresWrite{
|
||||||
|
Table: tableAlerts,
|
||||||
|
Values: map[string]any{
|
||||||
|
"run_event_id": e.ID,
|
||||||
|
"alert_index": i,
|
||||||
|
"as_of": asOf,
|
||||||
|
"alert_id": a.ID,
|
||||||
|
"event": nullableString(a.Event),
|
||||||
|
"headline": nullableString(a.Headline),
|
||||||
|
"severity": nullableString(a.Severity),
|
||||||
|
"urgency": nullableString(a.Urgency),
|
||||||
|
"certainty": nullableString(a.Certainty),
|
||||||
|
"status": nullableString(a.Status),
|
||||||
|
"message_type": nullableString(a.MessageType),
|
||||||
|
"category": nullableString(a.Category),
|
||||||
|
"response": nullableString(a.Response),
|
||||||
|
"description": nullableString(a.Description),
|
||||||
|
"instruction": nullableString(a.Instruction),
|
||||||
|
"sent": nullableTime(a.Sent),
|
||||||
|
"effective": nullableTime(a.Effective),
|
||||||
|
"onset": nullableTime(a.Onset),
|
||||||
|
"expires": nullableTime(a.Expires),
|
||||||
|
"area_description": nullableString(a.AreaDescription),
|
||||||
|
"sender_name": nullableString(a.SenderName),
|
||||||
|
"reference_count": len(a.References),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
for j, ref := range a.References {
|
||||||
|
writes = append(writes, fksinks.PostgresWrite{
|
||||||
|
Table: tableAlertReferences,
|
||||||
|
Values: map[string]any{
|
||||||
|
"run_event_id": e.ID,
|
||||||
|
"alert_index": i,
|
||||||
|
"reference_index": j,
|
||||||
|
"as_of": asOf,
|
||||||
|
"id": nullableString(ref.ID),
|
||||||
|
"identifier": nullableString(ref.Identifier),
|
||||||
|
"sender": nullableString(ref.Sender),
|
||||||
|
"sent": nullableTime(ref.Sent),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return writes, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func decodePayload[T any](payload any) (T, error) {
|
||||||
|
var out T
|
||||||
|
if payload == nil {
|
||||||
|
return out, fmt.Errorf("payload is nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
if typed, ok := payload.(T); ok {
|
||||||
|
return typed, nil
|
||||||
|
}
|
||||||
|
if ptr, ok := payload.(*T); ok {
|
||||||
|
if ptr == nil {
|
||||||
|
return out, fmt.Errorf("payload pointer is nil")
|
||||||
|
}
|
||||||
|
return *ptr, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
b, err := json.Marshal(payload)
|
||||||
|
if err != nil {
|
||||||
|
return out, fmt.Errorf("marshal payload: %w", err)
|
||||||
|
}
|
||||||
|
if err := json.Unmarshal(b, &out); err != nil {
|
||||||
|
return out, fmt.Errorf("unmarshal payload: %w", err)
|
||||||
|
}
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func countAlertReferences(alerts []model.WeatherAlert) int {
|
||||||
|
total := 0
|
||||||
|
for _, a := range alerts {
|
||||||
|
total += len(a.References)
|
||||||
|
}
|
||||||
|
return total
|
||||||
|
}
|
||||||
|
|
||||||
|
func nullableString(s string) any {
|
||||||
|
if strings.TrimSpace(s) == "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
func nullableFloat64(v *float64) any {
|
||||||
|
if v == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return *v
|
||||||
|
}
|
||||||
|
|
||||||
|
func nullableBool(v *bool) any {
|
||||||
|
if v == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return *v
|
||||||
|
}
|
||||||
|
|
||||||
|
func nullableTime(v *time.Time) any {
|
||||||
|
if v == nil || v.IsZero() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return v.UTC()
|
||||||
|
}
|
||||||
|
|
||||||
|
func compactJSONText(v any) (any, error) {
|
||||||
|
if v == nil {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
b, err := json.Marshal(v)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if string(b) == "null" {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
return string(b), nil
|
||||||
|
}
|
||||||
248
internal/sinks/postgres/map_test.go
Normal file
248
internal/sinks/postgres/map_test.go
Normal file
@@ -0,0 +1,248 @@
|
|||||||
|
package postgres
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
fkevent "gitea.maximumdirect.net/ejr/feedkit/event"
|
||||||
|
fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks"
|
||||||
|
"gitea.maximumdirect.net/ejr/weatherfeeder/model"
|
||||||
|
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestMapPostgresEventObservationStructPayload(t *testing.T) {
|
||||||
|
isDay := true
|
||||||
|
temp := 21.5
|
||||||
|
obs := model.WeatherObservation{
|
||||||
|
StationID: "KSTL",
|
||||||
|
StationName: "St. Louis",
|
||||||
|
Timestamp: time.Date(2026, 3, 16, 19, 0, 0, 0, time.UTC),
|
||||||
|
ConditionCode: model.WMOCode(1),
|
||||||
|
IsDay: &isDay,
|
||||||
|
TextDescription: "few clouds",
|
||||||
|
TemperatureC: &temp,
|
||||||
|
PresentWeather: []model.PresentWeather{{Raw: map[string]any{"a": 1, "b": "x"}}},
|
||||||
|
}
|
||||||
|
|
||||||
|
writes, err := mapPostgresEvent(context.Background(), testEvent(standards.SchemaWeatherObservationV1, "observation", obs))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("mapPostgresEvent() error = %v", err)
|
||||||
|
}
|
||||||
|
if len(writes) != 2 {
|
||||||
|
t.Fatalf("mapPostgresEvent() writes len = %d, want 2", len(writes))
|
||||||
|
}
|
||||||
|
if writes[0].Table != tableObservations {
|
||||||
|
t.Fatalf("writes[0].Table = %q, want %q", writes[0].Table, tableObservations)
|
||||||
|
}
|
||||||
|
if got := writes[0].Values["station_id"]; got != "KSTL" {
|
||||||
|
t.Fatalf("observations station_id = %#v, want KSTL", got)
|
||||||
|
}
|
||||||
|
if writes[1].Table != tableObservationPresentWeather {
|
||||||
|
t.Fatalf("writes[1].Table = %q, want %q", writes[1].Table, tableObservationPresentWeather)
|
||||||
|
}
|
||||||
|
if got := writes[1].Values["raw_text"]; got != `{"a":1,"b":"x"}` {
|
||||||
|
t.Fatalf("present_weather raw_text = %#v, want compact JSON", got)
|
||||||
|
}
|
||||||
|
|
||||||
|
assertAllWritesIncludeAllColumns(t, writes)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMapPostgresEventForecastStructPayload(t *testing.T) {
|
||||||
|
isDay := true
|
||||||
|
temp := 10.5
|
||||||
|
run := model.WeatherForecastRun{
|
||||||
|
LocationID: "LOC-1",
|
||||||
|
LocationName: "St. Louis",
|
||||||
|
IssuedAt: time.Date(2026, 3, 16, 18, 0, 0, 0, time.UTC),
|
||||||
|
Product: model.ForecastProductHourly,
|
||||||
|
Periods: []model.WeatherForecastPeriod{
|
||||||
|
{
|
||||||
|
StartTime: time.Date(2026, 3, 16, 19, 0, 0, 0, time.UTC),
|
||||||
|
EndTime: time.Date(2026, 3, 16, 20, 0, 0, 0, time.UTC),
|
||||||
|
IsDay: &isDay,
|
||||||
|
ConditionCode: model.WMOCode(2),
|
||||||
|
ConditionText: "Partly Cloudy",
|
||||||
|
TemperatureC: &temp,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
StartTime: time.Date(2026, 3, 16, 20, 0, 0, 0, time.UTC),
|
||||||
|
EndTime: time.Date(2026, 3, 16, 21, 0, 0, 0, time.UTC),
|
||||||
|
ConditionCode: model.WMOCode(3),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
writes, err := mapPostgresEvent(context.Background(), testEvent(standards.SchemaWeatherForecastV1, "forecast", run))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("mapPostgresEvent() error = %v", err)
|
||||||
|
}
|
||||||
|
if len(writes) != 3 {
|
||||||
|
t.Fatalf("mapPostgresEvent() writes len = %d, want 3", len(writes))
|
||||||
|
}
|
||||||
|
|
||||||
|
if writes[0].Table != tableForecasts {
|
||||||
|
t.Fatalf("writes[0].Table = %q, want %q", writes[0].Table, tableForecasts)
|
||||||
|
}
|
||||||
|
if got := writes[0].Values["period_count"]; got != 2 {
|
||||||
|
t.Fatalf("forecasts period_count = %#v, want 2", got)
|
||||||
|
}
|
||||||
|
if writes[1].Table != tableForecastPeriods || writes[2].Table != tableForecastPeriods {
|
||||||
|
t.Fatalf("forecast period writes not in expected order")
|
||||||
|
}
|
||||||
|
if got := writes[1].Values["period_index"]; got != 0 {
|
||||||
|
t.Fatalf("first period index = %#v, want 0", got)
|
||||||
|
}
|
||||||
|
|
||||||
|
assertAllWritesIncludeAllColumns(t, writes)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMapPostgresEventAlertStructPayload(t *testing.T) {
|
||||||
|
sent := time.Date(2026, 3, 16, 17, 0, 0, 0, time.UTC)
|
||||||
|
run := model.WeatherAlertRun{
|
||||||
|
AsOf: time.Date(2026, 3, 16, 18, 0, 0, 0, time.UTC),
|
||||||
|
Alerts: []model.WeatherAlert{
|
||||||
|
{
|
||||||
|
ID: "urn:alert:1",
|
||||||
|
Headline: "Winter Weather Advisory",
|
||||||
|
Severity: "Moderate",
|
||||||
|
References: []model.AlertReference{
|
||||||
|
{ID: "urn:ref:1", Sent: &sent},
|
||||||
|
{Identifier: "ref-two"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
ID: "urn:alert:2",
|
||||||
|
Headline: "Second alert",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
writes, err := mapPostgresEvent(context.Background(), testEvent(standards.SchemaWeatherAlertV1, "alert", run))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("mapPostgresEvent() error = %v", err)
|
||||||
|
}
|
||||||
|
if len(writes) != 5 {
|
||||||
|
t.Fatalf("mapPostgresEvent() writes len = %d, want 5", len(writes))
|
||||||
|
}
|
||||||
|
|
||||||
|
counts := map[string]int{}
|
||||||
|
for _, w := range writes {
|
||||||
|
counts[w.Table]++
|
||||||
|
}
|
||||||
|
if counts[tableAlertRuns] != 1 || counts[tableAlerts] != 2 || counts[tableAlertReferences] != 2 {
|
||||||
|
t.Fatalf("unexpected table write counts: %#v", counts)
|
||||||
|
}
|
||||||
|
|
||||||
|
firstAlert, ok := firstWriteForTable(writes, tableAlerts)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("missing alerts write")
|
||||||
|
}
|
||||||
|
if got := firstAlert.Values["reference_count"]; got != 2 {
|
||||||
|
t.Fatalf("alerts reference_count = %#v, want 2", got)
|
||||||
|
}
|
||||||
|
|
||||||
|
assertAllWritesIncludeAllColumns(t, writes)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMapPostgresEventMapPayload(t *testing.T) {
|
||||||
|
run := model.WeatherForecastRun{
|
||||||
|
IssuedAt: time.Date(2026, 3, 16, 18, 0, 0, 0, time.UTC),
|
||||||
|
Product: model.ForecastProductHourly,
|
||||||
|
Periods: []model.WeatherForecastPeriod{
|
||||||
|
{
|
||||||
|
StartTime: time.Date(2026, 3, 16, 19, 0, 0, 0, time.UTC),
|
||||||
|
EndTime: time.Date(2026, 3, 16, 20, 0, 0, 0, time.UTC),
|
||||||
|
ConditionCode: model.WMOCode(2),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
b, err := json.Marshal(run)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("json.Marshal() error = %v", err)
|
||||||
|
}
|
||||||
|
var payload map[string]any
|
||||||
|
if err := json.Unmarshal(b, &payload); err != nil {
|
||||||
|
t.Fatalf("json.Unmarshal() error = %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
writes, err := mapPostgresEvent(context.Background(), testEvent(standards.SchemaWeatherForecastV1, "forecast", payload))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("mapPostgresEvent() error = %v", err)
|
||||||
|
}
|
||||||
|
if len(writes) != 2 {
|
||||||
|
t.Fatalf("mapPostgresEvent() writes len = %d, want 2", len(writes))
|
||||||
|
}
|
||||||
|
if writes[0].Table != tableForecasts {
|
||||||
|
t.Fatalf("writes[0].Table = %q, want %q", writes[0].Table, tableForecasts)
|
||||||
|
}
|
||||||
|
|
||||||
|
assertAllWritesIncludeAllColumns(t, writes)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMapPostgresEventUnknownSchemaNoOp(t *testing.T) {
|
||||||
|
writes, err := mapPostgresEvent(context.Background(), testEvent("weather.unknown.v1", "observation", map[string]any{"x": 1}))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("mapPostgresEvent() error = %v", err)
|
||||||
|
}
|
||||||
|
if len(writes) != 0 {
|
||||||
|
t.Fatalf("mapPostgresEvent() writes len = %d, want 0", len(writes))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMapPostgresEventMalformedPayload(t *testing.T) {
|
||||||
|
_, err := mapPostgresEvent(context.Background(), testEvent(standards.SchemaWeatherForecastV1, "forecast", "bad"))
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("mapPostgresEvent() expected error for malformed payload")
|
||||||
|
}
|
||||||
|
if !strings.Contains(err.Error(), "decode forecast payload") {
|
||||||
|
t.Fatalf("error = %q, want decode forecast payload context", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testEvent(schema string, kind fkevent.Kind, payload any) fkevent.Event {
|
||||||
|
effectiveAt := time.Date(2026, 3, 16, 18, 30, 0, 0, time.UTC)
|
||||||
|
return fkevent.Event{
|
||||||
|
ID: "evt-1",
|
||||||
|
Kind: kind,
|
||||||
|
Source: "test-source",
|
||||||
|
Schema: schema,
|
||||||
|
EmittedAt: time.Date(2026, 3, 16, 18, 31, 0, 0, time.UTC),
|
||||||
|
EffectiveAt: &effectiveAt,
|
||||||
|
Payload: payload,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func firstWriteForTable(writes []fksinks.PostgresWrite, table string) (fksinks.PostgresWrite, bool) {
|
||||||
|
for _, w := range writes {
|
||||||
|
if w.Table == table {
|
||||||
|
return w, true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return fksinks.PostgresWrite{}, false
|
||||||
|
}
|
||||||
|
|
||||||
|
func assertAllWritesIncludeAllColumns(t *testing.T, writes []fksinks.PostgresWrite) {
|
||||||
|
t.Helper()
|
||||||
|
colCounts := tableColumnCounts()
|
||||||
|
for i, w := range writes {
|
||||||
|
expectedCount, ok := colCounts[w.Table]
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("writes[%d] references unknown table %q", i, w.Table)
|
||||||
|
}
|
||||||
|
if len(w.Values) != expectedCount {
|
||||||
|
t.Fatalf("writes[%d] table=%q has %d values, want %d", i, w.Table, len(w.Values), expectedCount)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func tableColumnCounts() map[string]int {
|
||||||
|
s := weatherPostgresSchema()
|
||||||
|
m := make(map[string]int, len(s.Tables))
|
||||||
|
for _, tbl := range s.Tables {
|
||||||
|
m[tbl.Name] = len(tbl.Columns)
|
||||||
|
}
|
||||||
|
return m
|
||||||
|
}
|
||||||
242
internal/sinks/postgres/schema.go
Normal file
242
internal/sinks/postgres/schema.go
Normal file
@@ -0,0 +1,242 @@
|
|||||||
|
package postgres
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||||
|
fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
tableObservations = "observations"
|
||||||
|
tableObservationPresentWeather = "observation_present_weather"
|
||||||
|
tableForecasts = "forecasts"
|
||||||
|
tableForecastPeriods = "forecast_periods"
|
||||||
|
tableAlertRuns = "alert_runs"
|
||||||
|
tableAlerts = "alerts"
|
||||||
|
tableAlertReferences = "alert_references"
|
||||||
|
)
|
||||||
|
|
||||||
|
// RegisterPostgresSchemas registers weatherfeeder's Postgres schema for each
|
||||||
|
// configured sink using driver=postgres.
|
||||||
|
func RegisterPostgresSchemas(cfg *config.Config) error {
|
||||||
|
if cfg == nil {
|
||||||
|
return fmt.Errorf("register postgres schemas: config is nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
schema := weatherPostgresSchema()
|
||||||
|
for i, sk := range cfg.Sinks {
|
||||||
|
if !isPostgresDriver(sk.Driver) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err := fksinks.RegisterPostgresSchema(sk.Name, schema); err != nil {
|
||||||
|
return fmt.Errorf("register postgres schema for sinks[%d] name=%q: %w", i, sk.Name, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func isPostgresDriver(driver string) bool {
|
||||||
|
return strings.EqualFold(strings.TrimSpace(driver), "postgres")
|
||||||
|
}
|
||||||
|
|
||||||
|
func weatherPostgresSchema() fksinks.PostgresSchema {
|
||||||
|
return fksinks.PostgresSchema{
|
||||||
|
Tables: []fksinks.PostgresTable{
|
||||||
|
{
|
||||||
|
Name: tableObservations,
|
||||||
|
Columns: []fksinks.PostgresColumn{
|
||||||
|
{Name: "event_id", Type: "TEXT", Nullable: false},
|
||||||
|
{Name: "event_kind", Type: "TEXT", Nullable: false},
|
||||||
|
{Name: "event_source", Type: "TEXT", Nullable: false},
|
||||||
|
{Name: "event_schema", Type: "TEXT", Nullable: false},
|
||||||
|
{Name: "event_emitted_at", Type: "TIMESTAMPTZ", Nullable: false},
|
||||||
|
{Name: "event_effective_at", Type: "TIMESTAMPTZ", Nullable: true},
|
||||||
|
{Name: "station_id", Type: "TEXT", Nullable: true},
|
||||||
|
{Name: "station_name", Type: "TEXT", Nullable: true},
|
||||||
|
{Name: "observed_at", Type: "TIMESTAMPTZ", Nullable: false},
|
||||||
|
{Name: "condition_code", Type: "INTEGER", Nullable: false},
|
||||||
|
{Name: "is_day", Type: "BOOLEAN", Nullable: true},
|
||||||
|
{Name: "text_description", Type: "TEXT", Nullable: true},
|
||||||
|
{Name: "temperature_c", Type: "DOUBLE PRECISION", Nullable: true},
|
||||||
|
{Name: "dewpoint_c", Type: "DOUBLE PRECISION", Nullable: true},
|
||||||
|
{Name: "wind_direction_degrees", Type: "DOUBLE PRECISION", Nullable: true},
|
||||||
|
{Name: "wind_speed_kmh", Type: "DOUBLE PRECISION", Nullable: true},
|
||||||
|
{Name: "wind_gust_kmh", Type: "DOUBLE PRECISION", Nullable: true},
|
||||||
|
{Name: "barometric_pressure_pa", Type: "DOUBLE PRECISION", Nullable: true},
|
||||||
|
{Name: "visibility_meters", Type: "DOUBLE PRECISION", Nullable: true},
|
||||||
|
{Name: "relative_humidity_percent", Type: "DOUBLE PRECISION", Nullable: true},
|
||||||
|
{Name: "apparent_temperature_c", Type: "DOUBLE PRECISION", Nullable: true},
|
||||||
|
},
|
||||||
|
PrimaryKey: []string{"event_id"},
|
||||||
|
PruneColumn: "observed_at",
|
||||||
|
Indexes: []fksinks.PostgresIndex{
|
||||||
|
{Name: "idx_wf_obs_station_observed_at", Columns: []string{"station_id", "observed_at"}},
|
||||||
|
{Name: "idx_wf_obs_observed_at", Columns: []string{"observed_at"}},
|
||||||
|
{Name: "idx_wf_obs_condition_code", Columns: []string{"condition_code"}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: tableObservationPresentWeather,
|
||||||
|
Columns: []fksinks.PostgresColumn{
|
||||||
|
{Name: "event_id", Type: "TEXT REFERENCES observations(event_id) ON DELETE CASCADE", Nullable: false},
|
||||||
|
{Name: "weather_index", Type: "INTEGER", Nullable: false},
|
||||||
|
{Name: "observed_at", Type: "TIMESTAMPTZ", Nullable: false},
|
||||||
|
{Name: "raw_text", Type: "TEXT", Nullable: true},
|
||||||
|
},
|
||||||
|
PrimaryKey: []string{"event_id", "weather_index"},
|
||||||
|
PruneColumn: "observed_at",
|
||||||
|
Indexes: []fksinks.PostgresIndex{
|
||||||
|
{Name: "idx_wf_obs_present_observed_at", Columns: []string{"observed_at"}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: tableForecasts,
|
||||||
|
Columns: []fksinks.PostgresColumn{
|
||||||
|
{Name: "event_id", Type: "TEXT", Nullable: false},
|
||||||
|
{Name: "event_kind", Type: "TEXT", Nullable: false},
|
||||||
|
{Name: "event_source", Type: "TEXT", Nullable: false},
|
||||||
|
{Name: "event_schema", Type: "TEXT", Nullable: false},
|
||||||
|
{Name: "event_emitted_at", Type: "TIMESTAMPTZ", Nullable: false},
|
||||||
|
{Name: "event_effective_at", Type: "TIMESTAMPTZ", Nullable: true},
|
||||||
|
{Name: "location_id", Type: "TEXT", Nullable: true},
|
||||||
|
{Name: "location_name", Type: "TEXT", Nullable: true},
|
||||||
|
{Name: "issued_at", Type: "TIMESTAMPTZ", Nullable: false},
|
||||||
|
{Name: "updated_at", Type: "TIMESTAMPTZ", Nullable: true},
|
||||||
|
{Name: "product", Type: "TEXT", Nullable: false},
|
||||||
|
{Name: "latitude", Type: "DOUBLE PRECISION", Nullable: true},
|
||||||
|
{Name: "longitude", Type: "DOUBLE PRECISION", Nullable: true},
|
||||||
|
{Name: "elevation_meters", Type: "DOUBLE PRECISION", Nullable: true},
|
||||||
|
{Name: "period_count", Type: "INTEGER", Nullable: false},
|
||||||
|
},
|
||||||
|
PrimaryKey: []string{"event_id"},
|
||||||
|
PruneColumn: "issued_at",
|
||||||
|
Indexes: []fksinks.PostgresIndex{
|
||||||
|
{Name: "idx_wf_fc_location_product_issued_at", Columns: []string{"location_id", "product", "issued_at"}},
|
||||||
|
{Name: "idx_wf_fc_issued_at", Columns: []string{"issued_at"}},
|
||||||
|
{Name: "idx_wf_fc_product_issued_at", Columns: []string{"product", "issued_at"}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: tableForecastPeriods,
|
||||||
|
Columns: []fksinks.PostgresColumn{
|
||||||
|
{Name: "run_event_id", Type: "TEXT REFERENCES forecasts(event_id) ON DELETE CASCADE", Nullable: false},
|
||||||
|
{Name: "period_index", Type: "INTEGER", Nullable: false},
|
||||||
|
{Name: "issued_at", Type: "TIMESTAMPTZ", Nullable: false},
|
||||||
|
{Name: "start_time", Type: "TIMESTAMPTZ", Nullable: false},
|
||||||
|
{Name: "end_time", Type: "TIMESTAMPTZ", Nullable: false},
|
||||||
|
{Name: "name", Type: "TEXT", Nullable: true},
|
||||||
|
{Name: "is_day", Type: "BOOLEAN", Nullable: true},
|
||||||
|
{Name: "condition_code", Type: "INTEGER", Nullable: false},
|
||||||
|
{Name: "condition_text", Type: "TEXT", Nullable: true},
|
||||||
|
{Name: "provider_raw_description", Type: "TEXT", Nullable: true},
|
||||||
|
{Name: "text_description", Type: "TEXT", Nullable: true},
|
||||||
|
{Name: "detailed_text", Type: "TEXT", Nullable: true},
|
||||||
|
{Name: "icon_url", Type: "TEXT", Nullable: true},
|
||||||
|
{Name: "temperature_c", Type: "DOUBLE PRECISION", Nullable: true},
|
||||||
|
{Name: "temperature_c_min", Type: "DOUBLE PRECISION", Nullable: true},
|
||||||
|
{Name: "temperature_c_max", Type: "DOUBLE PRECISION", Nullable: true},
|
||||||
|
{Name: "dewpoint_c", Type: "DOUBLE PRECISION", Nullable: true},
|
||||||
|
{Name: "relative_humidity_percent", Type: "DOUBLE PRECISION", Nullable: true},
|
||||||
|
{Name: "wind_direction_degrees", Type: "DOUBLE PRECISION", Nullable: true},
|
||||||
|
{Name: "wind_speed_kmh", Type: "DOUBLE PRECISION", Nullable: true},
|
||||||
|
{Name: "wind_gust_kmh", Type: "DOUBLE PRECISION", Nullable: true},
|
||||||
|
{Name: "barometric_pressure_pa", Type: "DOUBLE PRECISION", Nullable: true},
|
||||||
|
{Name: "visibility_meters", Type: "DOUBLE PRECISION", Nullable: true},
|
||||||
|
{Name: "apparent_temperature_c", Type: "DOUBLE PRECISION", Nullable: true},
|
||||||
|
{Name: "cloud_cover_percent", Type: "DOUBLE PRECISION", Nullable: true},
|
||||||
|
{Name: "probability_of_precipitation_percent", Type: "DOUBLE PRECISION", Nullable: true},
|
||||||
|
{Name: "precipitation_amount_mm", Type: "DOUBLE PRECISION", Nullable: true},
|
||||||
|
{Name: "snowfall_depth_mm", Type: "DOUBLE PRECISION", Nullable: true},
|
||||||
|
{Name: "uv_index", Type: "DOUBLE PRECISION", Nullable: true},
|
||||||
|
},
|
||||||
|
PrimaryKey: []string{"run_event_id", "period_index"},
|
||||||
|
PruneColumn: "issued_at",
|
||||||
|
Indexes: []fksinks.PostgresIndex{
|
||||||
|
{Name: "idx_wf_fc_period_start_time", Columns: []string{"start_time"}},
|
||||||
|
{Name: "idx_wf_fc_period_end_time", Columns: []string{"end_time"}},
|
||||||
|
{Name: "idx_wf_fc_period_run_start", Columns: []string{"run_event_id", "start_time"}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: tableAlertRuns,
|
||||||
|
Columns: []fksinks.PostgresColumn{
|
||||||
|
{Name: "event_id", Type: "TEXT", Nullable: false},
|
||||||
|
{Name: "event_kind", Type: "TEXT", Nullable: false},
|
||||||
|
{Name: "event_source", Type: "TEXT", Nullable: false},
|
||||||
|
{Name: "event_schema", Type: "TEXT", Nullable: false},
|
||||||
|
{Name: "event_emitted_at", Type: "TIMESTAMPTZ", Nullable: false},
|
||||||
|
{Name: "event_effective_at", Type: "TIMESTAMPTZ", Nullable: true},
|
||||||
|
{Name: "location_id", Type: "TEXT", Nullable: true},
|
||||||
|
{Name: "location_name", Type: "TEXT", Nullable: true},
|
||||||
|
{Name: "as_of", Type: "TIMESTAMPTZ", Nullable: false},
|
||||||
|
{Name: "latitude", Type: "DOUBLE PRECISION", Nullable: true},
|
||||||
|
{Name: "longitude", Type: "DOUBLE PRECISION", Nullable: true},
|
||||||
|
{Name: "alert_count", Type: "INTEGER", Nullable: false},
|
||||||
|
},
|
||||||
|
PrimaryKey: []string{"event_id"},
|
||||||
|
PruneColumn: "as_of",
|
||||||
|
Indexes: []fksinks.PostgresIndex{
|
||||||
|
{Name: "idx_wf_alert_run_location_as_of", Columns: []string{"location_id", "as_of"}},
|
||||||
|
{Name: "idx_wf_alert_run_as_of", Columns: []string{"as_of"}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: tableAlerts,
|
||||||
|
Columns: []fksinks.PostgresColumn{
|
||||||
|
{Name: "run_event_id", Type: "TEXT REFERENCES alert_runs(event_id) ON DELETE CASCADE", Nullable: false},
|
||||||
|
{Name: "alert_index", Type: "INTEGER", Nullable: false},
|
||||||
|
{Name: "as_of", Type: "TIMESTAMPTZ", Nullable: false},
|
||||||
|
{Name: "alert_id", Type: "TEXT", Nullable: false},
|
||||||
|
{Name: "event", Type: "TEXT", Nullable: true},
|
||||||
|
{Name: "headline", Type: "TEXT", Nullable: true},
|
||||||
|
{Name: "severity", Type: "TEXT", Nullable: true},
|
||||||
|
{Name: "urgency", Type: "TEXT", Nullable: true},
|
||||||
|
{Name: "certainty", Type: "TEXT", Nullable: true},
|
||||||
|
{Name: "status", Type: "TEXT", Nullable: true},
|
||||||
|
{Name: "message_type", Type: "TEXT", Nullable: true},
|
||||||
|
{Name: "category", Type: "TEXT", Nullable: true},
|
||||||
|
{Name: "response", Type: "TEXT", Nullable: true},
|
||||||
|
{Name: "description", Type: "TEXT", Nullable: true},
|
||||||
|
{Name: "instruction", Type: "TEXT", Nullable: true},
|
||||||
|
{Name: "sent", Type: "TIMESTAMPTZ", Nullable: true},
|
||||||
|
{Name: "effective", Type: "TIMESTAMPTZ", Nullable: true},
|
||||||
|
{Name: "onset", Type: "TIMESTAMPTZ", Nullable: true},
|
||||||
|
{Name: "expires", Type: "TIMESTAMPTZ", Nullable: true},
|
||||||
|
{Name: "area_description", Type: "TEXT", Nullable: true},
|
||||||
|
{Name: "sender_name", Type: "TEXT", Nullable: true},
|
||||||
|
{Name: "reference_count", Type: "INTEGER", Nullable: false},
|
||||||
|
},
|
||||||
|
PrimaryKey: []string{"run_event_id", "alert_index"},
|
||||||
|
PruneColumn: "as_of",
|
||||||
|
Indexes: []fksinks.PostgresIndex{
|
||||||
|
{Name: "idx_wf_alerts_alert_id", Columns: []string{"alert_id"}},
|
||||||
|
{Name: "idx_wf_alerts_severity_expires", Columns: []string{"severity", "expires"}},
|
||||||
|
{Name: "idx_wf_alerts_as_of", Columns: []string{"as_of"}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: tableAlertReferences,
|
||||||
|
Columns: []fksinks.PostgresColumn{
|
||||||
|
{Name: "run_event_id", Type: "TEXT REFERENCES alert_runs(event_id) ON DELETE CASCADE", Nullable: false},
|
||||||
|
{Name: "alert_index", Type: "INTEGER", Nullable: false},
|
||||||
|
{Name: "reference_index", Type: "INTEGER", Nullable: false},
|
||||||
|
{Name: "as_of", Type: "TIMESTAMPTZ", Nullable: false},
|
||||||
|
{Name: "id", Type: "TEXT", Nullable: true},
|
||||||
|
{Name: "identifier", Type: "TEXT", Nullable: true},
|
||||||
|
{Name: "sender", Type: "TEXT", Nullable: true},
|
||||||
|
{Name: "sent", Type: "TIMESTAMPTZ", Nullable: true},
|
||||||
|
},
|
||||||
|
PrimaryKey: []string{"run_event_id", "alert_index", "reference_index"},
|
||||||
|
PruneColumn: "as_of",
|
||||||
|
Indexes: []fksinks.PostgresIndex{
|
||||||
|
{Name: "idx_wf_alert_refs_as_of", Columns: []string{"as_of"}},
|
||||||
|
{Name: "idx_wf_alert_refs_sent", Columns: []string{"sent"}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
MapEvent: mapPostgresEvent,
|
||||||
|
}
|
||||||
|
}
|
||||||
95
internal/sinks/postgres/schema_test.go
Normal file
95
internal/sinks/postgres/schema_test.go
Normal file
@@ -0,0 +1,95 @@
|
|||||||
|
package postgres
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestRegisterPostgresSchemasNilConfig(t *testing.T) {
|
||||||
|
err := RegisterPostgresSchemas(nil)
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("RegisterPostgresSchemas(nil) expected error")
|
||||||
|
}
|
||||||
|
if !strings.Contains(err.Error(), "config is nil") {
|
||||||
|
t.Fatalf("error = %q, want config is nil", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRegisterPostgresSchemasNonPostgresNoOp(t *testing.T) {
|
||||||
|
cfg := &config.Config{
|
||||||
|
Sinks: []config.SinkConfig{
|
||||||
|
{Name: "stdout_only", Driver: "stdout"},
|
||||||
|
{Name: "nats_only", Driver: "nats"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := RegisterPostgresSchemas(cfg); err != nil {
|
||||||
|
t.Fatalf("RegisterPostgresSchemas(non-postgres) error = %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRegisterPostgresSchemasDuplicateRegistrationFails(t *testing.T) {
|
||||||
|
sinkName := uniqueSinkName("pg_test")
|
||||||
|
cfg := &config.Config{
|
||||||
|
Sinks: []config.SinkConfig{
|
||||||
|
{Name: sinkName, Driver: "postgres"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := RegisterPostgresSchemas(cfg); err != nil {
|
||||||
|
t.Fatalf("first RegisterPostgresSchemas() error = %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err := RegisterPostgresSchemas(cfg)
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("second RegisterPostgresSchemas() expected duplicate error")
|
||||||
|
}
|
||||||
|
if !strings.Contains(err.Error(), "already registered") {
|
||||||
|
t.Fatalf("error = %q, want already registered", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWeatherPostgresSchemaShape(t *testing.T) {
|
||||||
|
s := weatherPostgresSchema()
|
||||||
|
if s.MapEvent == nil {
|
||||||
|
t.Fatalf("weatherPostgresSchema().MapEvent is nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
wantTables := map[string]bool{
|
||||||
|
tableObservations: true,
|
||||||
|
tableObservationPresentWeather: true,
|
||||||
|
tableForecasts: true,
|
||||||
|
tableForecastPeriods: true,
|
||||||
|
tableAlertRuns: true,
|
||||||
|
tableAlerts: true,
|
||||||
|
tableAlertReferences: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(s.Tables) != len(wantTables) {
|
||||||
|
t.Fatalf("weatherPostgresSchema().Tables len = %d, want %d", len(s.Tables), len(wantTables))
|
||||||
|
}
|
||||||
|
|
||||||
|
seenIndexes := map[string]bool{}
|
||||||
|
for _, tbl := range s.Tables {
|
||||||
|
if !wantTables[tbl.Name] {
|
||||||
|
t.Fatalf("unexpected table %q in schema", tbl.Name)
|
||||||
|
}
|
||||||
|
if tbl.PruneColumn == "" {
|
||||||
|
t.Fatalf("table %q missing prune column", tbl.Name)
|
||||||
|
}
|
||||||
|
for _, idx := range tbl.Indexes {
|
||||||
|
if seenIndexes[idx.Name] {
|
||||||
|
t.Fatalf("duplicate index name %q", idx.Name)
|
||||||
|
}
|
||||||
|
seenIndexes[idx.Name] = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func uniqueSinkName(prefix string) string {
|
||||||
|
return fmt.Sprintf("%s_%d", prefix, time.Now().UnixNano())
|
||||||
|
}
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
// FILE: internal/model/alert.go
|
// FILE: model/alert.go
|
||||||
package model
|
package model
|
||||||
|
|
||||||
import "time"
|
import "time"
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
// FILE: internal/model/doc.go
|
// FILE: model/doc.go
|
||||||
// Package model defines weatherfeeder's canonical domain payload types.
|
// Package model defines weatherfeeder's canonical domain payload types.
|
||||||
//
|
//
|
||||||
// These structs are emitted as the Payload of canonical events (schemas "weather.*.vN").
|
// These structs are emitted as the Payload of canonical events (schemas "weather.*.vN").
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
// FILE: internal/model/forecast.go
|
// FILE: model/forecast.go
|
||||||
package model
|
package model
|
||||||
|
|
||||||
import "time"
|
import "time"
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
// FILE: internal/model/observation.go
|
// FILE: model/observation.go
|
||||||
package model
|
package model
|
||||||
|
|
||||||
import "time"
|
import "time"
|
||||||
@@ -10,18 +10,9 @@ type WeatherObservation struct {
|
|||||||
Timestamp time.Time `json:"timestamp"`
|
Timestamp time.Time `json:"timestamp"`
|
||||||
|
|
||||||
// Canonical internal representation (provider-independent).
|
// Canonical internal representation (provider-independent).
|
||||||
ConditionCode WMOCode `json:"conditionCode"`
|
ConditionCode WMOCode `json:"conditionCode"`
|
||||||
ConditionText string `json:"conditionText,omitempty"`
|
IsDay *bool `json:"isDay,omitempty"`
|
||||||
IsDay *bool `json:"isDay,omitempty"`
|
TextDescription string `json:"textDescription,omitempty"`
|
||||||
|
|
||||||
// Provider-specific “evidence” for troubleshooting mapping and drift.
|
|
||||||
ProviderRawDescription string `json:"providerRawDescription,omitempty"`
|
|
||||||
|
|
||||||
// Human-facing (legacy / transitional)
|
|
||||||
TextDescription string `json:"textDescription,omitempty"`
|
|
||||||
|
|
||||||
// Provider-specific (legacy / transitional)
|
|
||||||
IconURL string `json:"iconUrl,omitempty"`
|
|
||||||
|
|
||||||
// Core measurements (nullable)
|
// Core measurements (nullable)
|
||||||
TemperatureC *float64 `json:"temperatureC,omitempty"`
|
TemperatureC *float64 `json:"temperatureC,omitempty"`
|
||||||
@@ -32,22 +23,12 @@ type WeatherObservation struct {
|
|||||||
WindGustKmh *float64 `json:"windGustKmh,omitempty"`
|
WindGustKmh *float64 `json:"windGustKmh,omitempty"`
|
||||||
|
|
||||||
BarometricPressurePa *float64 `json:"barometricPressurePa,omitempty"`
|
BarometricPressurePa *float64 `json:"barometricPressurePa,omitempty"`
|
||||||
SeaLevelPressurePa *float64 `json:"seaLevelPressurePa,omitempty"`
|
|
||||||
VisibilityMeters *float64 `json:"visibilityMeters,omitempty"`
|
VisibilityMeters *float64 `json:"visibilityMeters,omitempty"`
|
||||||
|
|
||||||
RelativeHumidityPercent *float64 `json:"relativeHumidityPercent,omitempty"`
|
RelativeHumidityPercent *float64 `json:"relativeHumidityPercent,omitempty"`
|
||||||
ApparentTemperatureC *float64 `json:"apparentTemperatureC,omitempty"`
|
ApparentTemperatureC *float64 `json:"apparentTemperatureC,omitempty"`
|
||||||
|
|
||||||
ElevationMeters *float64 `json:"elevationMeters,omitempty"`
|
|
||||||
RawMessage string `json:"rawMessage,omitempty"`
|
|
||||||
|
|
||||||
PresentWeather []PresentWeather `json:"presentWeather,omitempty"`
|
PresentWeather []PresentWeather `json:"presentWeather,omitempty"`
|
||||||
CloudLayers []CloudLayer `json:"cloudLayers,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type CloudLayer struct {
|
|
||||||
BaseMeters *float64 `json:"baseMeters,omitempty"`
|
|
||||||
Amount string `json:"amount,omitempty"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type PresentWeather struct {
|
type PresentWeather struct {
|
||||||
|
|||||||
@@ -21,7 +21,7 @@ type WMODescription struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// WMODescriptions is the canonical internal mapping of WMO code -> day/night text.
|
// WMODescriptions is the canonical internal mapping of WMO code -> day/night text.
|
||||||
// These are used to populate model.WeatherObservation.ConditionText.
|
// These are used to populate canonical text fields derived from WMO codes.
|
||||||
var WMODescriptions = map[model.WMOCode]WMODescription{
|
var WMODescriptions = map[model.WMOCode]WMODescription{
|
||||||
0: {Day: "Sunny", Night: "Clear"},
|
0: {Day: "Sunny", Night: "Clear"},
|
||||||
1: {Day: "Mainly Sunny", Night: "Mainly Clear"},
|
1: {Day: "Mainly Sunny", Night: "Mainly Clear"},
|
||||||
@@ -56,7 +56,8 @@ var WMODescriptions = map[model.WMOCode]WMODescription{
|
|||||||
// WMOText returns the canonical text description for a WMO code.
|
// WMOText returns the canonical text description for a WMO code.
|
||||||
// If isDay is nil, it prefers the Day description (if present).
|
// If isDay is nil, it prefers the Day description (if present).
|
||||||
//
|
//
|
||||||
// This is intended to be used by drivers after they set ConditionCode.
|
// This is intended to be used by drivers after they set ConditionCode when they
|
||||||
|
// need a human-readable description.
|
||||||
func WMOText(code model.WMOCode, isDay *bool) string {
|
func WMOText(code model.WMOCode, isDay *bool) string {
|
||||||
if code == model.WMOUnknown {
|
if code == model.WMOUnknown {
|
||||||
return "Unknown"
|
return "Unknown"
|
||||||
|
|||||||
Reference in New Issue
Block a user