Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| d0b58a4734 | |||
| 6cd823f528 | |||
| 84da2bb689 | |||
| 859ee9dd5c |
@@ -15,17 +15,21 @@ import (
|
||||
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||
fkdispatch "gitea.maximumdirect.net/ejr/feedkit/dispatch"
|
||||
fkevent "gitea.maximumdirect.net/ejr/feedkit/event"
|
||||
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
|
||||
fkpipeline "gitea.maximumdirect.net/ejr/feedkit/pipeline"
|
||||
fkprocessors "gitea.maximumdirect.net/ejr/feedkit/processors"
|
||||
fkdedupe "gitea.maximumdirect.net/ejr/feedkit/processors/dedupe"
|
||||
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
|
||||
fkscheduler "gitea.maximumdirect.net/ejr/feedkit/scheduler"
|
||||
fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks"
|
||||
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
|
||||
|
||||
wfnormalizers "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers"
|
||||
wfpgsink "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sinks/postgres"
|
||||
wfsources "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources"
|
||||
)
|
||||
|
||||
const dedupeMaxEntries = 2048
|
||||
|
||||
func main() {
|
||||
log.SetFlags(log.LstdFlags | log.Lmicroseconds)
|
||||
|
||||
@@ -37,6 +41,9 @@ func main() {
|
||||
if err != nil {
|
||||
log.Fatalf("config load failed: %v", err)
|
||||
}
|
||||
if err := wfpgsink.RegisterPostgresSchemas(cfg); err != nil {
|
||||
log.Fatalf("postgres schema registration failed: %v", err)
|
||||
}
|
||||
|
||||
// --- Registries ---
|
||||
srcReg := fksources.NewRegistry()
|
||||
@@ -99,9 +106,9 @@ func main() {
|
||||
|
||||
events := make(chan fkevent.Event, 256)
|
||||
|
||||
// --- Normalization (optional) ---
|
||||
// --- Processors ---
|
||||
//
|
||||
// We install feedkit's normalize.Processor even before any normalizers exist.
|
||||
// We install feedkit's processors/normalize.Processor even before any normalizers exist.
|
||||
// With an empty normalizer list and RequireMatch=false, this is a no-op passthrough.
|
||||
// It will begin transforming events as soon as:
|
||||
// 1) sources emit raw schemas (raw.*), and
|
||||
@@ -112,8 +119,9 @@ func main() {
|
||||
procReg.Register("normalize", func() (fkprocessors.Processor, error) {
|
||||
return fknormalize.NewProcessor(normalizers, false), nil
|
||||
})
|
||||
procReg.Register("dedupe", fkdedupe.Factory(dedupeMaxEntries))
|
||||
|
||||
chain, err := procReg.BuildChain([]string{"normalize"})
|
||||
chain, err := procReg.BuildChain([]string{"normalize", "dedupe"})
|
||||
if err != nil {
|
||||
log.Fatalf("build processor chain failed: %v", err)
|
||||
}
|
||||
|
||||
@@ -9,9 +9,10 @@ import (
|
||||
|
||||
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||
fkevent "gitea.maximumdirect.net/ejr/feedkit/event"
|
||||
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
|
||||
fkpipeline "gitea.maximumdirect.net/ejr/feedkit/pipeline"
|
||||
fkprocessors "gitea.maximumdirect.net/ejr/feedkit/processors"
|
||||
fkdedupe "gitea.maximumdirect.net/ejr/feedkit/processors/dedupe"
|
||||
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
|
||||
|
||||
wfnormalizers "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers"
|
||||
)
|
||||
@@ -97,39 +98,23 @@ func TestExampleConfigLoads(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestProcessorRegistryBuildsNormalizeChain(t *testing.T) {
|
||||
normalizers := wfnormalizers.RegisterBuiltins(nil)
|
||||
if len(normalizers) == 0 {
|
||||
t.Fatalf("RegisterBuiltins() returned no normalizers")
|
||||
}
|
||||
|
||||
procReg := fkprocessors.NewRegistry()
|
||||
procReg.Register("normalize", func() (fkprocessors.Processor, error) {
|
||||
return fknormalize.NewProcessor(normalizers, false), nil
|
||||
})
|
||||
|
||||
chain, err := procReg.BuildChain([]string{"normalize"})
|
||||
func TestProcessorRegistryBuildsNormalizeThenDedupeChain(t *testing.T) {
|
||||
chain, err := buildProcessorChainForTests()
|
||||
if err != nil {
|
||||
t.Fatalf("BuildChain() unexpected error: %v", err)
|
||||
}
|
||||
if len(chain) != 1 {
|
||||
t.Fatalf("BuildChain() expected 1 processor, got %d", len(chain))
|
||||
if len(chain) != 2 {
|
||||
t.Fatalf("BuildChain() expected 2 processors, got %d", len(chain))
|
||||
}
|
||||
|
||||
pl := &fkpipeline.Pipeline{Processors: chain}
|
||||
if len(pl.Processors) != 1 {
|
||||
t.Fatalf("pipeline expected 1 processor, got %d", len(pl.Processors))
|
||||
if len(pl.Processors) != 2 {
|
||||
t.Fatalf("pipeline expected 2 processors, got %d", len(pl.Processors))
|
||||
}
|
||||
}
|
||||
|
||||
func TestNormalizeNoMatchPassThrough(t *testing.T) {
|
||||
normalizers := wfnormalizers.RegisterBuiltins(nil)
|
||||
procReg := fkprocessors.NewRegistry()
|
||||
procReg.Register("normalize", func() (fkprocessors.Processor, error) {
|
||||
return fknormalize.NewProcessor(normalizers, false), nil
|
||||
})
|
||||
|
||||
chain, err := procReg.BuildChain([]string{"normalize"})
|
||||
chain, err := buildProcessorChainForTests()
|
||||
if err != nil {
|
||||
t.Fatalf("BuildChain() unexpected error: %v", err)
|
||||
}
|
||||
@@ -157,3 +142,50 @@ func TestNormalizeNoMatchPassThrough(t *testing.T) {
|
||||
t.Fatalf("Pipeline.Process() expected passthrough output, got %#v", *out)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDedupeDropsSecondEventWithSameID(t *testing.T) {
|
||||
chain, err := buildProcessorChainForTests()
|
||||
if err != nil {
|
||||
t.Fatalf("BuildChain() unexpected error: %v", err)
|
||||
}
|
||||
|
||||
pl := &fkpipeline.Pipeline{Processors: chain}
|
||||
in := fkevent.Event{
|
||||
ID: "evt-dedupe-1",
|
||||
Kind: fkevent.Kind("observation"),
|
||||
Source: "test",
|
||||
EmittedAt: time.Now().UTC(),
|
||||
Schema: "raw.weatherfeeder.unknown.v1",
|
||||
Payload: map[string]any{
|
||||
"ok": true,
|
||||
},
|
||||
}
|
||||
|
||||
first, err := pl.Process(context.Background(), in)
|
||||
if err != nil {
|
||||
t.Fatalf("first Pipeline.Process() unexpected error: %v", err)
|
||||
}
|
||||
if first == nil {
|
||||
t.Fatalf("first Pipeline.Process() unexpectedly dropped event")
|
||||
}
|
||||
|
||||
second, err := pl.Process(context.Background(), in)
|
||||
if err != nil {
|
||||
t.Fatalf("second Pipeline.Process() unexpected error: %v", err)
|
||||
}
|
||||
if second != nil {
|
||||
t.Fatalf("second Pipeline.Process() expected dedupe drop, got %#v", *second)
|
||||
}
|
||||
}
|
||||
|
||||
func buildProcessorChainForTests() ([]fkprocessors.Processor, error) {
|
||||
normalizers := wfnormalizers.RegisterBuiltins(nil)
|
||||
|
||||
procReg := fkprocessors.NewRegistry()
|
||||
procReg.Register("normalize", func() (fkprocessors.Processor, error) {
|
||||
return fknormalize.NewProcessor(normalizers, false), nil
|
||||
})
|
||||
procReg.Register("dedupe", fkdedupe.Factory(dedupeMaxEntries))
|
||||
|
||||
return procReg.BuildChain([]string{"normalize", "dedupe"})
|
||||
}
|
||||
|
||||
3
go.mod
3
go.mod
@@ -2,10 +2,11 @@ module gitea.maximumdirect.net/ejr/weatherfeeder
|
||||
|
||||
go 1.25
|
||||
|
||||
require gitea.maximumdirect.net/ejr/feedkit v0.6.0
|
||||
require gitea.maximumdirect.net/ejr/feedkit v0.7.2
|
||||
|
||||
require (
|
||||
github.com/klauspost/compress v1.17.2 // indirect
|
||||
github.com/lib/pq v1.10.9 // indirect
|
||||
github.com/nats-io/nats.go v1.34.0 // indirect
|
||||
github.com/nats-io/nkeys v0.4.7 // indirect
|
||||
github.com/nats-io/nuid v1.0.1 // indirect
|
||||
|
||||
6
go.sum
6
go.sum
@@ -1,7 +1,9 @@
|
||||
gitea.maximumdirect.net/ejr/feedkit v0.6.0 h1:GXwyNKvPp1sWN8TS5E5NDGFgimpyHlzerO5E+/qoTXg=
|
||||
gitea.maximumdirect.net/ejr/feedkit v0.6.0/go.mod h1:wYtA10GouvSe7L/8e1UEC+tqcp32HJofExIo1k+Wjls=
|
||||
gitea.maximumdirect.net/ejr/feedkit v0.7.2 h1:hTg302SgSi7tw11lNzuc+3g7MvHT6jQQziuo2NoARt8=
|
||||
gitea.maximumdirect.net/ejr/feedkit v0.7.2/go.mod h1:U6xC9xZLN3cL4yi7YBVyzGoHYRLJXusFCAKlj2kdYYQ=
|
||||
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
|
||||
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
|
||||
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
|
||||
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
|
||||
github.com/nats-io/nats.go v1.34.0 h1:fnxnPCNiwIG5w08rlMcEKTUw4AV/nKyGCOJE8TdhSPk=
|
||||
github.com/nats-io/nats.go v1.34.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
|
||||
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
package normalizers
|
||||
|
||||
import (
|
||||
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
|
||||
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
|
||||
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/nws"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/openmeteo"
|
||||
|
||||
@@ -4,7 +4,7 @@ import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
|
||||
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/nws"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/openmeteo"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/openweather"
|
||||
|
||||
@@ -29,7 +29,7 @@
|
||||
//
|
||||
// 1. One normalizer per file.
|
||||
// Each file contains exactly one Normalizer implementation (one type that
|
||||
// satisfies feedkit/normalize.Normalizer).
|
||||
// satisfies feedkit/processors/normalize.Normalizer).
|
||||
// Helper files are encouraged (types.go, common.go, mapping.go, etc.) as long
|
||||
// as they do not define additional Normalizer types.
|
||||
//
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
package nws
|
||||
|
||||
import (
|
||||
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
|
||||
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
|
||||
)
|
||||
|
||||
// Register appends NWS normalizers in stable order.
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
package openmeteo
|
||||
|
||||
import (
|
||||
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
|
||||
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
|
||||
)
|
||||
|
||||
// Register appends Open-Meteo normalizers in stable order.
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
package openweather
|
||||
|
||||
import (
|
||||
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
|
||||
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
|
||||
)
|
||||
|
||||
// Register appends OpenWeather normalizers in stable order.
|
||||
|
||||
343
internal/sinks/postgres/map.go
Normal file
343
internal/sinks/postgres/map.go
Normal file
@@ -0,0 +1,343 @@
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
fkevent "gitea.maximumdirect.net/ejr/feedkit/event"
|
||||
fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/model"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
||||
)
|
||||
|
||||
func mapPostgresEvent(_ context.Context, e fkevent.Event) ([]fksinks.PostgresWrite, error) {
|
||||
schema := strings.TrimSpace(e.Schema)
|
||||
switch schema {
|
||||
case standards.SchemaWeatherObservationV1:
|
||||
return mapObservationEvent(e)
|
||||
case standards.SchemaWeatherForecastV1:
|
||||
return mapForecastEvent(e)
|
||||
case standards.SchemaWeatherAlertV1:
|
||||
return mapAlertEvent(e)
|
||||
default:
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
|
||||
func mapObservationEvent(e fkevent.Event) ([]fksinks.PostgresWrite, error) {
|
||||
obs, err := decodePayload[model.WeatherObservation](e.Payload)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("decode observation payload: %w", err)
|
||||
}
|
||||
if obs.Timestamp.IsZero() {
|
||||
return nil, fmt.Errorf("decode observation payload: timestamp is required")
|
||||
}
|
||||
|
||||
observedAt := obs.Timestamp.UTC()
|
||||
writes := make([]fksinks.PostgresWrite, 0, 1+len(obs.CloudLayers)+len(obs.PresentWeather))
|
||||
|
||||
writes = append(writes, fksinks.PostgresWrite{
|
||||
Table: tableObservations,
|
||||
Values: map[string]any{
|
||||
"event_id": e.ID,
|
||||
"event_kind": string(e.Kind),
|
||||
"event_source": e.Source,
|
||||
"event_schema": e.Schema,
|
||||
"event_emitted_at": e.EmittedAt.UTC(),
|
||||
"event_effective_at": nullableTime(e.EffectiveAt),
|
||||
"station_id": nullableString(obs.StationID),
|
||||
"station_name": nullableString(obs.StationName),
|
||||
"observed_at": observedAt,
|
||||
"condition_code": int(obs.ConditionCode),
|
||||
"condition_text": nullableString(obs.ConditionText),
|
||||
"is_day": nullableBool(obs.IsDay),
|
||||
"provider_raw_description": nullableString(obs.ProviderRawDescription),
|
||||
"text_description": nullableString(obs.TextDescription),
|
||||
"icon_url": nullableString(obs.IconURL),
|
||||
"temperature_c": nullableFloat64(obs.TemperatureC),
|
||||
"dewpoint_c": nullableFloat64(obs.DewpointC),
|
||||
"wind_direction_degrees": nullableFloat64(obs.WindDirectionDegrees),
|
||||
"wind_speed_kmh": nullableFloat64(obs.WindSpeedKmh),
|
||||
"wind_gust_kmh": nullableFloat64(obs.WindGustKmh),
|
||||
"barometric_pressure_pa": nullableFloat64(obs.BarometricPressurePa),
|
||||
"sea_level_pressure_pa": nullableFloat64(obs.SeaLevelPressurePa),
|
||||
"visibility_meters": nullableFloat64(obs.VisibilityMeters),
|
||||
"relative_humidity_percent": nullableFloat64(obs.RelativeHumidityPercent),
|
||||
"apparent_temperature_c": nullableFloat64(obs.ApparentTemperatureC),
|
||||
"elevation_meters": nullableFloat64(obs.ElevationMeters),
|
||||
"raw_message": nullableString(obs.RawMessage),
|
||||
},
|
||||
})
|
||||
|
||||
for i, cl := range obs.CloudLayers {
|
||||
writes = append(writes, fksinks.PostgresWrite{
|
||||
Table: tableObservationCloudLayers,
|
||||
Values: map[string]any{
|
||||
"event_id": e.ID,
|
||||
"layer_index": i,
|
||||
"observed_at": observedAt,
|
||||
"base_meters": nullableFloat64(cl.BaseMeters),
|
||||
"amount": nullableString(cl.Amount),
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
for i, pw := range obs.PresentWeather {
|
||||
rawText, err := compactJSONText(pw.Raw)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("observation presentWeather[%d].raw: %w", i, err)
|
||||
}
|
||||
writes = append(writes, fksinks.PostgresWrite{
|
||||
Table: tableObservationPresentWeather,
|
||||
Values: map[string]any{
|
||||
"event_id": e.ID,
|
||||
"weather_index": i,
|
||||
"observed_at": observedAt,
|
||||
"raw_text": rawText,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
return writes, nil
|
||||
}
|
||||
|
||||
func mapForecastEvent(e fkevent.Event) ([]fksinks.PostgresWrite, error) {
|
||||
run, err := decodePayload[model.WeatherForecastRun](e.Payload)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("decode forecast payload: %w", err)
|
||||
}
|
||||
if run.IssuedAt.IsZero() {
|
||||
return nil, fmt.Errorf("decode forecast payload: issuedAt is required")
|
||||
}
|
||||
if strings.TrimSpace(string(run.Product)) == "" {
|
||||
return nil, fmt.Errorf("decode forecast payload: product is required")
|
||||
}
|
||||
|
||||
issuedAt := run.IssuedAt.UTC()
|
||||
writes := make([]fksinks.PostgresWrite, 0, 1+len(run.Periods))
|
||||
|
||||
writes = append(writes, fksinks.PostgresWrite{
|
||||
Table: tableForecasts,
|
||||
Values: map[string]any{
|
||||
"event_id": e.ID,
|
||||
"event_kind": string(e.Kind),
|
||||
"event_source": e.Source,
|
||||
"event_schema": e.Schema,
|
||||
"event_emitted_at": e.EmittedAt.UTC(),
|
||||
"event_effective_at": nullableTime(e.EffectiveAt),
|
||||
"location_id": nullableString(run.LocationID),
|
||||
"location_name": nullableString(run.LocationName),
|
||||
"issued_at": issuedAt,
|
||||
"updated_at": nullableTime(run.UpdatedAt),
|
||||
"product": string(run.Product),
|
||||
"latitude": nullableFloat64(run.Latitude),
|
||||
"longitude": nullableFloat64(run.Longitude),
|
||||
"elevation_meters": nullableFloat64(run.ElevationMeters),
|
||||
"period_count": len(run.Periods),
|
||||
},
|
||||
})
|
||||
|
||||
for i, p := range run.Periods {
|
||||
if p.StartTime.IsZero() || p.EndTime.IsZero() {
|
||||
return nil, fmt.Errorf("decode forecast payload: periods[%d] startTime/endTime are required", i)
|
||||
}
|
||||
writes = append(writes, fksinks.PostgresWrite{
|
||||
Table: tableForecastPeriods,
|
||||
Values: map[string]any{
|
||||
"run_event_id": e.ID,
|
||||
"period_index": i,
|
||||
"issued_at": issuedAt,
|
||||
"start_time": p.StartTime.UTC(),
|
||||
"end_time": p.EndTime.UTC(),
|
||||
"name": nullableString(p.Name),
|
||||
"is_day": nullableBool(p.IsDay),
|
||||
"condition_code": int(p.ConditionCode),
|
||||
"condition_text": nullableString(p.ConditionText),
|
||||
"provider_raw_description": nullableString(p.ProviderRawDescription),
|
||||
"text_description": nullableString(p.TextDescription),
|
||||
"detailed_text": nullableString(p.DetailedText),
|
||||
"icon_url": nullableString(p.IconURL),
|
||||
"temperature_c": nullableFloat64(p.TemperatureC),
|
||||
"temperature_c_min": nullableFloat64(p.TemperatureCMin),
|
||||
"temperature_c_max": nullableFloat64(p.TemperatureCMax),
|
||||
"dewpoint_c": nullableFloat64(p.DewpointC),
|
||||
"relative_humidity_percent": nullableFloat64(p.RelativeHumidityPercent),
|
||||
"wind_direction_degrees": nullableFloat64(p.WindDirectionDegrees),
|
||||
"wind_speed_kmh": nullableFloat64(p.WindSpeedKmh),
|
||||
"wind_gust_kmh": nullableFloat64(p.WindGustKmh),
|
||||
"barometric_pressure_pa": nullableFloat64(p.BarometricPressurePa),
|
||||
"visibility_meters": nullableFloat64(p.VisibilityMeters),
|
||||
"apparent_temperature_c": nullableFloat64(p.ApparentTemperatureC),
|
||||
"cloud_cover_percent": nullableFloat64(p.CloudCoverPercent),
|
||||
"probability_of_precipitation_percent": nullableFloat64(p.ProbabilityOfPrecipitationPercent),
|
||||
"precipitation_amount_mm": nullableFloat64(p.PrecipitationAmountMm),
|
||||
"snowfall_depth_mm": nullableFloat64(p.SnowfallDepthMM),
|
||||
"uv_index": nullableFloat64(p.UVIndex),
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
return writes, nil
|
||||
}
|
||||
|
||||
func mapAlertEvent(e fkevent.Event) ([]fksinks.PostgresWrite, error) {
|
||||
run, err := decodePayload[model.WeatherAlertRun](e.Payload)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("decode alert payload: %w", err)
|
||||
}
|
||||
if run.AsOf.IsZero() {
|
||||
return nil, fmt.Errorf("decode alert payload: asOf is required")
|
||||
}
|
||||
|
||||
asOf := run.AsOf.UTC()
|
||||
writes := make([]fksinks.PostgresWrite, 0, 1+len(run.Alerts)+countAlertReferences(run.Alerts))
|
||||
|
||||
writes = append(writes, fksinks.PostgresWrite{
|
||||
Table: tableAlertRuns,
|
||||
Values: map[string]any{
|
||||
"event_id": e.ID,
|
||||
"event_kind": string(e.Kind),
|
||||
"event_source": e.Source,
|
||||
"event_schema": e.Schema,
|
||||
"event_emitted_at": e.EmittedAt.UTC(),
|
||||
"event_effective_at": nullableTime(e.EffectiveAt),
|
||||
"location_id": nullableString(run.LocationID),
|
||||
"location_name": nullableString(run.LocationName),
|
||||
"as_of": asOf,
|
||||
"latitude": nullableFloat64(run.Latitude),
|
||||
"longitude": nullableFloat64(run.Longitude),
|
||||
"alert_count": len(run.Alerts),
|
||||
},
|
||||
})
|
||||
|
||||
for i, a := range run.Alerts {
|
||||
if strings.TrimSpace(a.ID) == "" {
|
||||
return nil, fmt.Errorf("decode alert payload: alerts[%d].id is required", i)
|
||||
}
|
||||
|
||||
writes = append(writes, fksinks.PostgresWrite{
|
||||
Table: tableAlerts,
|
||||
Values: map[string]any{
|
||||
"run_event_id": e.ID,
|
||||
"alert_index": i,
|
||||
"as_of": asOf,
|
||||
"alert_id": a.ID,
|
||||
"event": nullableString(a.Event),
|
||||
"headline": nullableString(a.Headline),
|
||||
"severity": nullableString(a.Severity),
|
||||
"urgency": nullableString(a.Urgency),
|
||||
"certainty": nullableString(a.Certainty),
|
||||
"status": nullableString(a.Status),
|
||||
"message_type": nullableString(a.MessageType),
|
||||
"category": nullableString(a.Category),
|
||||
"response": nullableString(a.Response),
|
||||
"description": nullableString(a.Description),
|
||||
"instruction": nullableString(a.Instruction),
|
||||
"sent": nullableTime(a.Sent),
|
||||
"effective": nullableTime(a.Effective),
|
||||
"onset": nullableTime(a.Onset),
|
||||
"expires": nullableTime(a.Expires),
|
||||
"area_description": nullableString(a.AreaDescription),
|
||||
"sender_name": nullableString(a.SenderName),
|
||||
"reference_count": len(a.References),
|
||||
},
|
||||
})
|
||||
|
||||
for j, ref := range a.References {
|
||||
writes = append(writes, fksinks.PostgresWrite{
|
||||
Table: tableAlertReferences,
|
||||
Values: map[string]any{
|
||||
"run_event_id": e.ID,
|
||||
"alert_index": i,
|
||||
"reference_index": j,
|
||||
"as_of": asOf,
|
||||
"id": nullableString(ref.ID),
|
||||
"identifier": nullableString(ref.Identifier),
|
||||
"sender": nullableString(ref.Sender),
|
||||
"sent": nullableTime(ref.Sent),
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return writes, nil
|
||||
}
|
||||
|
||||
func decodePayload[T any](payload any) (T, error) {
|
||||
var out T
|
||||
if payload == nil {
|
||||
return out, fmt.Errorf("payload is nil")
|
||||
}
|
||||
|
||||
if typed, ok := payload.(T); ok {
|
||||
return typed, nil
|
||||
}
|
||||
if ptr, ok := payload.(*T); ok {
|
||||
if ptr == nil {
|
||||
return out, fmt.Errorf("payload pointer is nil")
|
||||
}
|
||||
return *ptr, nil
|
||||
}
|
||||
|
||||
b, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return out, fmt.Errorf("marshal payload: %w", err)
|
||||
}
|
||||
if err := json.Unmarshal(b, &out); err != nil {
|
||||
return out, fmt.Errorf("unmarshal payload: %w", err)
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func countAlertReferences(alerts []model.WeatherAlert) int {
|
||||
total := 0
|
||||
for _, a := range alerts {
|
||||
total += len(a.References)
|
||||
}
|
||||
return total
|
||||
}
|
||||
|
||||
func nullableString(s string) any {
|
||||
if strings.TrimSpace(s) == "" {
|
||||
return nil
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
func nullableFloat64(v *float64) any {
|
||||
if v == nil {
|
||||
return nil
|
||||
}
|
||||
return *v
|
||||
}
|
||||
|
||||
func nullableBool(v *bool) any {
|
||||
if v == nil {
|
||||
return nil
|
||||
}
|
||||
return *v
|
||||
}
|
||||
|
||||
func nullableTime(v *time.Time) any {
|
||||
if v == nil || v.IsZero() {
|
||||
return nil
|
||||
}
|
||||
return v.UTC()
|
||||
}
|
||||
|
||||
func compactJSONText(v any) (any, error) {
|
||||
if v == nil {
|
||||
return nil, nil
|
||||
}
|
||||
b, err := json.Marshal(v)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if string(b) == "null" {
|
||||
return nil, nil
|
||||
}
|
||||
return string(b), nil
|
||||
}
|
||||
256
internal/sinks/postgres/map_test.go
Normal file
256
internal/sinks/postgres/map_test.go
Normal file
@@ -0,0 +1,256 @@
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
fkevent "gitea.maximumdirect.net/ejr/feedkit/event"
|
||||
fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/model"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
||||
)
|
||||
|
||||
func TestMapPostgresEventObservationStructPayload(t *testing.T) {
|
||||
isDay := true
|
||||
temp := 21.5
|
||||
base := 1200.0
|
||||
obs := model.WeatherObservation{
|
||||
StationID: "KSTL",
|
||||
StationName: "St. Louis",
|
||||
Timestamp: time.Date(2026, 3, 16, 19, 0, 0, 0, time.UTC),
|
||||
ConditionCode: model.WMOCode(1),
|
||||
ConditionText: "Mainly Sunny",
|
||||
IsDay: &isDay,
|
||||
ProviderRawDescription: "few clouds",
|
||||
TextDescription: "Mainly Sunny",
|
||||
IconURL: "https://example/icon.png",
|
||||
TemperatureC: &temp,
|
||||
CloudLayers: []model.CloudLayer{{BaseMeters: &base, Amount: "FEW"}},
|
||||
PresentWeather: []model.PresentWeather{{Raw: map[string]any{"a": 1, "b": "x"}}},
|
||||
}
|
||||
|
||||
writes, err := mapPostgresEvent(context.Background(), testEvent(standards.SchemaWeatherObservationV1, "observation", obs))
|
||||
if err != nil {
|
||||
t.Fatalf("mapPostgresEvent() error = %v", err)
|
||||
}
|
||||
if len(writes) != 3 {
|
||||
t.Fatalf("mapPostgresEvent() writes len = %d, want 3", len(writes))
|
||||
}
|
||||
if writes[0].Table != tableObservations {
|
||||
t.Fatalf("writes[0].Table = %q, want %q", writes[0].Table, tableObservations)
|
||||
}
|
||||
if got := writes[0].Values["station_id"]; got != "KSTL" {
|
||||
t.Fatalf("observations station_id = %#v, want KSTL", got)
|
||||
}
|
||||
if writes[1].Table != tableObservationCloudLayers {
|
||||
t.Fatalf("writes[1].Table = %q, want %q", writes[1].Table, tableObservationCloudLayers)
|
||||
}
|
||||
if writes[2].Table != tableObservationPresentWeather {
|
||||
t.Fatalf("writes[2].Table = %q, want %q", writes[2].Table, tableObservationPresentWeather)
|
||||
}
|
||||
if got := writes[2].Values["raw_text"]; got != `{"a":1,"b":"x"}` {
|
||||
t.Fatalf("present_weather raw_text = %#v, want compact JSON", got)
|
||||
}
|
||||
|
||||
assertAllWritesIncludeAllColumns(t, writes)
|
||||
}
|
||||
|
||||
func TestMapPostgresEventForecastStructPayload(t *testing.T) {
|
||||
isDay := true
|
||||
temp := 10.5
|
||||
run := model.WeatherForecastRun{
|
||||
LocationID: "LOC-1",
|
||||
LocationName: "St. Louis",
|
||||
IssuedAt: time.Date(2026, 3, 16, 18, 0, 0, 0, time.UTC),
|
||||
Product: model.ForecastProductHourly,
|
||||
Periods: []model.WeatherForecastPeriod{
|
||||
{
|
||||
StartTime: time.Date(2026, 3, 16, 19, 0, 0, 0, time.UTC),
|
||||
EndTime: time.Date(2026, 3, 16, 20, 0, 0, 0, time.UTC),
|
||||
IsDay: &isDay,
|
||||
ConditionCode: model.WMOCode(2),
|
||||
ConditionText: "Partly Cloudy",
|
||||
TemperatureC: &temp,
|
||||
},
|
||||
{
|
||||
StartTime: time.Date(2026, 3, 16, 20, 0, 0, 0, time.UTC),
|
||||
EndTime: time.Date(2026, 3, 16, 21, 0, 0, 0, time.UTC),
|
||||
ConditionCode: model.WMOCode(3),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
writes, err := mapPostgresEvent(context.Background(), testEvent(standards.SchemaWeatherForecastV1, "forecast", run))
|
||||
if err != nil {
|
||||
t.Fatalf("mapPostgresEvent() error = %v", err)
|
||||
}
|
||||
if len(writes) != 3 {
|
||||
t.Fatalf("mapPostgresEvent() writes len = %d, want 3", len(writes))
|
||||
}
|
||||
|
||||
if writes[0].Table != tableForecasts {
|
||||
t.Fatalf("writes[0].Table = %q, want %q", writes[0].Table, tableForecasts)
|
||||
}
|
||||
if got := writes[0].Values["period_count"]; got != 2 {
|
||||
t.Fatalf("forecasts period_count = %#v, want 2", got)
|
||||
}
|
||||
if writes[1].Table != tableForecastPeriods || writes[2].Table != tableForecastPeriods {
|
||||
t.Fatalf("forecast period writes not in expected order")
|
||||
}
|
||||
if got := writes[1].Values["period_index"]; got != 0 {
|
||||
t.Fatalf("first period index = %#v, want 0", got)
|
||||
}
|
||||
|
||||
assertAllWritesIncludeAllColumns(t, writes)
|
||||
}
|
||||
|
||||
func TestMapPostgresEventAlertStructPayload(t *testing.T) {
|
||||
sent := time.Date(2026, 3, 16, 17, 0, 0, 0, time.UTC)
|
||||
run := model.WeatherAlertRun{
|
||||
AsOf: time.Date(2026, 3, 16, 18, 0, 0, 0, time.UTC),
|
||||
Alerts: []model.WeatherAlert{
|
||||
{
|
||||
ID: "urn:alert:1",
|
||||
Headline: "Winter Weather Advisory",
|
||||
Severity: "Moderate",
|
||||
References: []model.AlertReference{
|
||||
{ID: "urn:ref:1", Sent: &sent},
|
||||
{Identifier: "ref-two"},
|
||||
},
|
||||
},
|
||||
{
|
||||
ID: "urn:alert:2",
|
||||
Headline: "Second alert",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
writes, err := mapPostgresEvent(context.Background(), testEvent(standards.SchemaWeatherAlertV1, "alert", run))
|
||||
if err != nil {
|
||||
t.Fatalf("mapPostgresEvent() error = %v", err)
|
||||
}
|
||||
if len(writes) != 5 {
|
||||
t.Fatalf("mapPostgresEvent() writes len = %d, want 5", len(writes))
|
||||
}
|
||||
|
||||
counts := map[string]int{}
|
||||
for _, w := range writes {
|
||||
counts[w.Table]++
|
||||
}
|
||||
if counts[tableAlertRuns] != 1 || counts[tableAlerts] != 2 || counts[tableAlertReferences] != 2 {
|
||||
t.Fatalf("unexpected table write counts: %#v", counts)
|
||||
}
|
||||
|
||||
firstAlert, ok := firstWriteForTable(writes, tableAlerts)
|
||||
if !ok {
|
||||
t.Fatalf("missing alerts write")
|
||||
}
|
||||
if got := firstAlert.Values["reference_count"]; got != 2 {
|
||||
t.Fatalf("alerts reference_count = %#v, want 2", got)
|
||||
}
|
||||
|
||||
assertAllWritesIncludeAllColumns(t, writes)
|
||||
}
|
||||
|
||||
func TestMapPostgresEventMapPayload(t *testing.T) {
|
||||
run := model.WeatherForecastRun{
|
||||
IssuedAt: time.Date(2026, 3, 16, 18, 0, 0, 0, time.UTC),
|
||||
Product: model.ForecastProductHourly,
|
||||
Periods: []model.WeatherForecastPeriod{
|
||||
{
|
||||
StartTime: time.Date(2026, 3, 16, 19, 0, 0, 0, time.UTC),
|
||||
EndTime: time.Date(2026, 3, 16, 20, 0, 0, 0, time.UTC),
|
||||
ConditionCode: model.WMOCode(2),
|
||||
},
|
||||
},
|
||||
}
|
||||
b, err := json.Marshal(run)
|
||||
if err != nil {
|
||||
t.Fatalf("json.Marshal() error = %v", err)
|
||||
}
|
||||
var payload map[string]any
|
||||
if err := json.Unmarshal(b, &payload); err != nil {
|
||||
t.Fatalf("json.Unmarshal() error = %v", err)
|
||||
}
|
||||
|
||||
writes, err := mapPostgresEvent(context.Background(), testEvent(standards.SchemaWeatherForecastV1, "forecast", payload))
|
||||
if err != nil {
|
||||
t.Fatalf("mapPostgresEvent() error = %v", err)
|
||||
}
|
||||
if len(writes) != 2 {
|
||||
t.Fatalf("mapPostgresEvent() writes len = %d, want 2", len(writes))
|
||||
}
|
||||
if writes[0].Table != tableForecasts {
|
||||
t.Fatalf("writes[0].Table = %q, want %q", writes[0].Table, tableForecasts)
|
||||
}
|
||||
|
||||
assertAllWritesIncludeAllColumns(t, writes)
|
||||
}
|
||||
|
||||
func TestMapPostgresEventUnknownSchemaNoOp(t *testing.T) {
|
||||
writes, err := mapPostgresEvent(context.Background(), testEvent("weather.unknown.v1", "observation", map[string]any{"x": 1}))
|
||||
if err != nil {
|
||||
t.Fatalf("mapPostgresEvent() error = %v", err)
|
||||
}
|
||||
if len(writes) != 0 {
|
||||
t.Fatalf("mapPostgresEvent() writes len = %d, want 0", len(writes))
|
||||
}
|
||||
}
|
||||
|
||||
func TestMapPostgresEventMalformedPayload(t *testing.T) {
|
||||
_, err := mapPostgresEvent(context.Background(), testEvent(standards.SchemaWeatherForecastV1, "forecast", "bad"))
|
||||
if err == nil {
|
||||
t.Fatalf("mapPostgresEvent() expected error for malformed payload")
|
||||
}
|
||||
if !strings.Contains(err.Error(), "decode forecast payload") {
|
||||
t.Fatalf("error = %q, want decode forecast payload context", err)
|
||||
}
|
||||
}
|
||||
|
||||
func testEvent(schema string, kind fkevent.Kind, payload any) fkevent.Event {
|
||||
effectiveAt := time.Date(2026, 3, 16, 18, 30, 0, 0, time.UTC)
|
||||
return fkevent.Event{
|
||||
ID: "evt-1",
|
||||
Kind: kind,
|
||||
Source: "test-source",
|
||||
Schema: schema,
|
||||
EmittedAt: time.Date(2026, 3, 16, 18, 31, 0, 0, time.UTC),
|
||||
EffectiveAt: &effectiveAt,
|
||||
Payload: payload,
|
||||
}
|
||||
}
|
||||
|
||||
func firstWriteForTable(writes []fksinks.PostgresWrite, table string) (fksinks.PostgresWrite, bool) {
|
||||
for _, w := range writes {
|
||||
if w.Table == table {
|
||||
return w, true
|
||||
}
|
||||
}
|
||||
return fksinks.PostgresWrite{}, false
|
||||
}
|
||||
|
||||
func assertAllWritesIncludeAllColumns(t *testing.T, writes []fksinks.PostgresWrite) {
|
||||
t.Helper()
|
||||
colCounts := tableColumnCounts()
|
||||
for i, w := range writes {
|
||||
expectedCount, ok := colCounts[w.Table]
|
||||
if !ok {
|
||||
t.Fatalf("writes[%d] references unknown table %q", i, w.Table)
|
||||
}
|
||||
if len(w.Values) != expectedCount {
|
||||
t.Fatalf("writes[%d] table=%q has %d values, want %d", i, w.Table, len(w.Values), expectedCount)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func tableColumnCounts() map[string]int {
|
||||
s := weatherPostgresSchema()
|
||||
m := make(map[string]int, len(s.Tables))
|
||||
for _, tbl := range s.Tables {
|
||||
m[tbl.Name] = len(tbl.Columns)
|
||||
}
|
||||
return m
|
||||
}
|
||||
264
internal/sinks/postgres/schema.go
Normal file
264
internal/sinks/postgres/schema.go
Normal file
@@ -0,0 +1,264 @@
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||
fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks"
|
||||
)
|
||||
|
||||
const (
|
||||
tableObservations = "observations"
|
||||
tableObservationCloudLayers = "observation_cloud_layers"
|
||||
tableObservationPresentWeather = "observation_present_weather"
|
||||
tableForecasts = "forecasts"
|
||||
tableForecastPeriods = "forecast_periods"
|
||||
tableAlertRuns = "alert_runs"
|
||||
tableAlerts = "alerts"
|
||||
tableAlertReferences = "alert_references"
|
||||
)
|
||||
|
||||
// RegisterPostgresSchemas registers weatherfeeder's Postgres schema for each
|
||||
// configured sink using driver=postgres.
|
||||
func RegisterPostgresSchemas(cfg *config.Config) error {
|
||||
if cfg == nil {
|
||||
return fmt.Errorf("register postgres schemas: config is nil")
|
||||
}
|
||||
|
||||
schema := weatherPostgresSchema()
|
||||
for i, sk := range cfg.Sinks {
|
||||
if !isPostgresDriver(sk.Driver) {
|
||||
continue
|
||||
}
|
||||
if err := fksinks.RegisterPostgresSchema(sk.Name, schema); err != nil {
|
||||
return fmt.Errorf("register postgres schema for sinks[%d] name=%q: %w", i, sk.Name, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func isPostgresDriver(driver string) bool {
|
||||
return strings.EqualFold(strings.TrimSpace(driver), "postgres")
|
||||
}
|
||||
|
||||
func weatherPostgresSchema() fksinks.PostgresSchema {
|
||||
return fksinks.PostgresSchema{
|
||||
Tables: []fksinks.PostgresTable{
|
||||
{
|
||||
Name: tableObservations,
|
||||
Columns: []fksinks.PostgresColumn{
|
||||
{Name: "event_id", Type: "TEXT", Nullable: false},
|
||||
{Name: "event_kind", Type: "TEXT", Nullable: false},
|
||||
{Name: "event_source", Type: "TEXT", Nullable: false},
|
||||
{Name: "event_schema", Type: "TEXT", Nullable: false},
|
||||
{Name: "event_emitted_at", Type: "TIMESTAMPTZ", Nullable: false},
|
||||
{Name: "event_effective_at", Type: "TIMESTAMPTZ", Nullable: true},
|
||||
{Name: "station_id", Type: "TEXT", Nullable: true},
|
||||
{Name: "station_name", Type: "TEXT", Nullable: true},
|
||||
{Name: "observed_at", Type: "TIMESTAMPTZ", Nullable: false},
|
||||
{Name: "condition_code", Type: "INTEGER", Nullable: false},
|
||||
{Name: "condition_text", Type: "TEXT", Nullable: true},
|
||||
{Name: "is_day", Type: "BOOLEAN", Nullable: true},
|
||||
{Name: "provider_raw_description", Type: "TEXT", Nullable: true},
|
||||
{Name: "text_description", Type: "TEXT", Nullable: true},
|
||||
{Name: "icon_url", Type: "TEXT", Nullable: true},
|
||||
{Name: "temperature_c", Type: "DOUBLE PRECISION", Nullable: true},
|
||||
{Name: "dewpoint_c", Type: "DOUBLE PRECISION", Nullable: true},
|
||||
{Name: "wind_direction_degrees", Type: "DOUBLE PRECISION", Nullable: true},
|
||||
{Name: "wind_speed_kmh", Type: "DOUBLE PRECISION", Nullable: true},
|
||||
{Name: "wind_gust_kmh", Type: "DOUBLE PRECISION", Nullable: true},
|
||||
{Name: "barometric_pressure_pa", Type: "DOUBLE PRECISION", Nullable: true},
|
||||
{Name: "sea_level_pressure_pa", Type: "DOUBLE PRECISION", Nullable: true},
|
||||
{Name: "visibility_meters", Type: "DOUBLE PRECISION", Nullable: true},
|
||||
{Name: "relative_humidity_percent", Type: "DOUBLE PRECISION", Nullable: true},
|
||||
{Name: "apparent_temperature_c", Type: "DOUBLE PRECISION", Nullable: true},
|
||||
{Name: "elevation_meters", Type: "DOUBLE PRECISION", Nullable: true},
|
||||
{Name: "raw_message", Type: "TEXT", Nullable: true},
|
||||
},
|
||||
PrimaryKey: []string{"event_id"},
|
||||
PruneColumn: "observed_at",
|
||||
Indexes: []fksinks.PostgresIndex{
|
||||
{Name: "idx_wf_obs_station_observed_at", Columns: []string{"station_id", "observed_at"}},
|
||||
{Name: "idx_wf_obs_observed_at", Columns: []string{"observed_at"}},
|
||||
{Name: "idx_wf_obs_condition_code", Columns: []string{"condition_code"}},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: tableObservationCloudLayers,
|
||||
Columns: []fksinks.PostgresColumn{
|
||||
{Name: "event_id", Type: "TEXT REFERENCES observations(event_id) ON DELETE CASCADE", Nullable: false},
|
||||
{Name: "layer_index", Type: "INTEGER", Nullable: false},
|
||||
{Name: "observed_at", Type: "TIMESTAMPTZ", Nullable: false},
|
||||
{Name: "base_meters", Type: "DOUBLE PRECISION", Nullable: true},
|
||||
{Name: "amount", Type: "TEXT", Nullable: true},
|
||||
},
|
||||
PrimaryKey: []string{"event_id", "layer_index"},
|
||||
PruneColumn: "observed_at",
|
||||
Indexes: []fksinks.PostgresIndex{
|
||||
{Name: "idx_wf_obs_cloud_observed_at", Columns: []string{"observed_at"}},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: tableObservationPresentWeather,
|
||||
Columns: []fksinks.PostgresColumn{
|
||||
{Name: "event_id", Type: "TEXT REFERENCES observations(event_id) ON DELETE CASCADE", Nullable: false},
|
||||
{Name: "weather_index", Type: "INTEGER", Nullable: false},
|
||||
{Name: "observed_at", Type: "TIMESTAMPTZ", Nullable: false},
|
||||
{Name: "raw_text", Type: "TEXT", Nullable: true},
|
||||
},
|
||||
PrimaryKey: []string{"event_id", "weather_index"},
|
||||
PruneColumn: "observed_at",
|
||||
Indexes: []fksinks.PostgresIndex{
|
||||
{Name: "idx_wf_obs_present_observed_at", Columns: []string{"observed_at"}},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: tableForecasts,
|
||||
Columns: []fksinks.PostgresColumn{
|
||||
{Name: "event_id", Type: "TEXT", Nullable: false},
|
||||
{Name: "event_kind", Type: "TEXT", Nullable: false},
|
||||
{Name: "event_source", Type: "TEXT", Nullable: false},
|
||||
{Name: "event_schema", Type: "TEXT", Nullable: false},
|
||||
{Name: "event_emitted_at", Type: "TIMESTAMPTZ", Nullable: false},
|
||||
{Name: "event_effective_at", Type: "TIMESTAMPTZ", Nullable: true},
|
||||
{Name: "location_id", Type: "TEXT", Nullable: true},
|
||||
{Name: "location_name", Type: "TEXT", Nullable: true},
|
||||
{Name: "issued_at", Type: "TIMESTAMPTZ", Nullable: false},
|
||||
{Name: "updated_at", Type: "TIMESTAMPTZ", Nullable: true},
|
||||
{Name: "product", Type: "TEXT", Nullable: false},
|
||||
{Name: "latitude", Type: "DOUBLE PRECISION", Nullable: true},
|
||||
{Name: "longitude", Type: "DOUBLE PRECISION", Nullable: true},
|
||||
{Name: "elevation_meters", Type: "DOUBLE PRECISION", Nullable: true},
|
||||
{Name: "period_count", Type: "INTEGER", Nullable: false},
|
||||
},
|
||||
PrimaryKey: []string{"event_id"},
|
||||
PruneColumn: "issued_at",
|
||||
Indexes: []fksinks.PostgresIndex{
|
||||
{Name: "idx_wf_fc_location_product_issued_at", Columns: []string{"location_id", "product", "issued_at"}},
|
||||
{Name: "idx_wf_fc_issued_at", Columns: []string{"issued_at"}},
|
||||
{Name: "idx_wf_fc_product_issued_at", Columns: []string{"product", "issued_at"}},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: tableForecastPeriods,
|
||||
Columns: []fksinks.PostgresColumn{
|
||||
{Name: "run_event_id", Type: "TEXT REFERENCES forecasts(event_id) ON DELETE CASCADE", Nullable: false},
|
||||
{Name: "period_index", Type: "INTEGER", Nullable: false},
|
||||
{Name: "issued_at", Type: "TIMESTAMPTZ", Nullable: false},
|
||||
{Name: "start_time", Type: "TIMESTAMPTZ", Nullable: false},
|
||||
{Name: "end_time", Type: "TIMESTAMPTZ", Nullable: false},
|
||||
{Name: "name", Type: "TEXT", Nullable: true},
|
||||
{Name: "is_day", Type: "BOOLEAN", Nullable: true},
|
||||
{Name: "condition_code", Type: "INTEGER", Nullable: false},
|
||||
{Name: "condition_text", Type: "TEXT", Nullable: true},
|
||||
{Name: "provider_raw_description", Type: "TEXT", Nullable: true},
|
||||
{Name: "text_description", Type: "TEXT", Nullable: true},
|
||||
{Name: "detailed_text", Type: "TEXT", Nullable: true},
|
||||
{Name: "icon_url", Type: "TEXT", Nullable: true},
|
||||
{Name: "temperature_c", Type: "DOUBLE PRECISION", Nullable: true},
|
||||
{Name: "temperature_c_min", Type: "DOUBLE PRECISION", Nullable: true},
|
||||
{Name: "temperature_c_max", Type: "DOUBLE PRECISION", Nullable: true},
|
||||
{Name: "dewpoint_c", Type: "DOUBLE PRECISION", Nullable: true},
|
||||
{Name: "relative_humidity_percent", Type: "DOUBLE PRECISION", Nullable: true},
|
||||
{Name: "wind_direction_degrees", Type: "DOUBLE PRECISION", Nullable: true},
|
||||
{Name: "wind_speed_kmh", Type: "DOUBLE PRECISION", Nullable: true},
|
||||
{Name: "wind_gust_kmh", Type: "DOUBLE PRECISION", Nullable: true},
|
||||
{Name: "barometric_pressure_pa", Type: "DOUBLE PRECISION", Nullable: true},
|
||||
{Name: "visibility_meters", Type: "DOUBLE PRECISION", Nullable: true},
|
||||
{Name: "apparent_temperature_c", Type: "DOUBLE PRECISION", Nullable: true},
|
||||
{Name: "cloud_cover_percent", Type: "DOUBLE PRECISION", Nullable: true},
|
||||
{Name: "probability_of_precipitation_percent", Type: "DOUBLE PRECISION", Nullable: true},
|
||||
{Name: "precipitation_amount_mm", Type: "DOUBLE PRECISION", Nullable: true},
|
||||
{Name: "snowfall_depth_mm", Type: "DOUBLE PRECISION", Nullable: true},
|
||||
{Name: "uv_index", Type: "DOUBLE PRECISION", Nullable: true},
|
||||
},
|
||||
PrimaryKey: []string{"run_event_id", "period_index"},
|
||||
PruneColumn: "issued_at",
|
||||
Indexes: []fksinks.PostgresIndex{
|
||||
{Name: "idx_wf_fc_period_start_time", Columns: []string{"start_time"}},
|
||||
{Name: "idx_wf_fc_period_end_time", Columns: []string{"end_time"}},
|
||||
{Name: "idx_wf_fc_period_run_start", Columns: []string{"run_event_id", "start_time"}},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: tableAlertRuns,
|
||||
Columns: []fksinks.PostgresColumn{
|
||||
{Name: "event_id", Type: "TEXT", Nullable: false},
|
||||
{Name: "event_kind", Type: "TEXT", Nullable: false},
|
||||
{Name: "event_source", Type: "TEXT", Nullable: false},
|
||||
{Name: "event_schema", Type: "TEXT", Nullable: false},
|
||||
{Name: "event_emitted_at", Type: "TIMESTAMPTZ", Nullable: false},
|
||||
{Name: "event_effective_at", Type: "TIMESTAMPTZ", Nullable: true},
|
||||
{Name: "location_id", Type: "TEXT", Nullable: true},
|
||||
{Name: "location_name", Type: "TEXT", Nullable: true},
|
||||
{Name: "as_of", Type: "TIMESTAMPTZ", Nullable: false},
|
||||
{Name: "latitude", Type: "DOUBLE PRECISION", Nullable: true},
|
||||
{Name: "longitude", Type: "DOUBLE PRECISION", Nullable: true},
|
||||
{Name: "alert_count", Type: "INTEGER", Nullable: false},
|
||||
},
|
||||
PrimaryKey: []string{"event_id"},
|
||||
PruneColumn: "as_of",
|
||||
Indexes: []fksinks.PostgresIndex{
|
||||
{Name: "idx_wf_alert_run_location_as_of", Columns: []string{"location_id", "as_of"}},
|
||||
{Name: "idx_wf_alert_run_as_of", Columns: []string{"as_of"}},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: tableAlerts,
|
||||
Columns: []fksinks.PostgresColumn{
|
||||
{Name: "run_event_id", Type: "TEXT REFERENCES alert_runs(event_id) ON DELETE CASCADE", Nullable: false},
|
||||
{Name: "alert_index", Type: "INTEGER", Nullable: false},
|
||||
{Name: "as_of", Type: "TIMESTAMPTZ", Nullable: false},
|
||||
{Name: "alert_id", Type: "TEXT", Nullable: false},
|
||||
{Name: "event", Type: "TEXT", Nullable: true},
|
||||
{Name: "headline", Type: "TEXT", Nullable: true},
|
||||
{Name: "severity", Type: "TEXT", Nullable: true},
|
||||
{Name: "urgency", Type: "TEXT", Nullable: true},
|
||||
{Name: "certainty", Type: "TEXT", Nullable: true},
|
||||
{Name: "status", Type: "TEXT", Nullable: true},
|
||||
{Name: "message_type", Type: "TEXT", Nullable: true},
|
||||
{Name: "category", Type: "TEXT", Nullable: true},
|
||||
{Name: "response", Type: "TEXT", Nullable: true},
|
||||
{Name: "description", Type: "TEXT", Nullable: true},
|
||||
{Name: "instruction", Type: "TEXT", Nullable: true},
|
||||
{Name: "sent", Type: "TIMESTAMPTZ", Nullable: true},
|
||||
{Name: "effective", Type: "TIMESTAMPTZ", Nullable: true},
|
||||
{Name: "onset", Type: "TIMESTAMPTZ", Nullable: true},
|
||||
{Name: "expires", Type: "TIMESTAMPTZ", Nullable: true},
|
||||
{Name: "area_description", Type: "TEXT", Nullable: true},
|
||||
{Name: "sender_name", Type: "TEXT", Nullable: true},
|
||||
{Name: "reference_count", Type: "INTEGER", Nullable: false},
|
||||
},
|
||||
PrimaryKey: []string{"run_event_id", "alert_index"},
|
||||
PruneColumn: "as_of",
|
||||
Indexes: []fksinks.PostgresIndex{
|
||||
{Name: "idx_wf_alerts_alert_id", Columns: []string{"alert_id"}},
|
||||
{Name: "idx_wf_alerts_severity_expires", Columns: []string{"severity", "expires"}},
|
||||
{Name: "idx_wf_alerts_as_of", Columns: []string{"as_of"}},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: tableAlertReferences,
|
||||
Columns: []fksinks.PostgresColumn{
|
||||
{Name: "run_event_id", Type: "TEXT REFERENCES alert_runs(event_id) ON DELETE CASCADE", Nullable: false},
|
||||
{Name: "alert_index", Type: "INTEGER", Nullable: false},
|
||||
{Name: "reference_index", Type: "INTEGER", Nullable: false},
|
||||
{Name: "as_of", Type: "TIMESTAMPTZ", Nullable: false},
|
||||
{Name: "id", Type: "TEXT", Nullable: true},
|
||||
{Name: "identifier", Type: "TEXT", Nullable: true},
|
||||
{Name: "sender", Type: "TEXT", Nullable: true},
|
||||
{Name: "sent", Type: "TIMESTAMPTZ", Nullable: true},
|
||||
},
|
||||
PrimaryKey: []string{"run_event_id", "alert_index", "reference_index"},
|
||||
PruneColumn: "as_of",
|
||||
Indexes: []fksinks.PostgresIndex{
|
||||
{Name: "idx_wf_alert_refs_as_of", Columns: []string{"as_of"}},
|
||||
{Name: "idx_wf_alert_refs_sent", Columns: []string{"sent"}},
|
||||
},
|
||||
},
|
||||
},
|
||||
MapEvent: mapPostgresEvent,
|
||||
}
|
||||
}
|
||||
96
internal/sinks/postgres/schema_test.go
Normal file
96
internal/sinks/postgres/schema_test.go
Normal file
@@ -0,0 +1,96 @@
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||
)
|
||||
|
||||
func TestRegisterPostgresSchemasNilConfig(t *testing.T) {
|
||||
err := RegisterPostgresSchemas(nil)
|
||||
if err == nil {
|
||||
t.Fatalf("RegisterPostgresSchemas(nil) expected error")
|
||||
}
|
||||
if !strings.Contains(err.Error(), "config is nil") {
|
||||
t.Fatalf("error = %q, want config is nil", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRegisterPostgresSchemasNonPostgresNoOp(t *testing.T) {
|
||||
cfg := &config.Config{
|
||||
Sinks: []config.SinkConfig{
|
||||
{Name: "stdout_only", Driver: "stdout"},
|
||||
{Name: "nats_only", Driver: "nats"},
|
||||
},
|
||||
}
|
||||
|
||||
if err := RegisterPostgresSchemas(cfg); err != nil {
|
||||
t.Fatalf("RegisterPostgresSchemas(non-postgres) error = %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRegisterPostgresSchemasDuplicateRegistrationFails(t *testing.T) {
|
||||
sinkName := uniqueSinkName("pg_test")
|
||||
cfg := &config.Config{
|
||||
Sinks: []config.SinkConfig{
|
||||
{Name: sinkName, Driver: "postgres"},
|
||||
},
|
||||
}
|
||||
|
||||
if err := RegisterPostgresSchemas(cfg); err != nil {
|
||||
t.Fatalf("first RegisterPostgresSchemas() error = %v", err)
|
||||
}
|
||||
|
||||
err := RegisterPostgresSchemas(cfg)
|
||||
if err == nil {
|
||||
t.Fatalf("second RegisterPostgresSchemas() expected duplicate error")
|
||||
}
|
||||
if !strings.Contains(err.Error(), "already registered") {
|
||||
t.Fatalf("error = %q, want already registered", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestWeatherPostgresSchemaShape(t *testing.T) {
|
||||
s := weatherPostgresSchema()
|
||||
if s.MapEvent == nil {
|
||||
t.Fatalf("weatherPostgresSchema().MapEvent is nil")
|
||||
}
|
||||
|
||||
wantTables := map[string]bool{
|
||||
tableObservations: true,
|
||||
tableObservationCloudLayers: true,
|
||||
tableObservationPresentWeather: true,
|
||||
tableForecasts: true,
|
||||
tableForecastPeriods: true,
|
||||
tableAlertRuns: true,
|
||||
tableAlerts: true,
|
||||
tableAlertReferences: true,
|
||||
}
|
||||
|
||||
if len(s.Tables) != len(wantTables) {
|
||||
t.Fatalf("weatherPostgresSchema().Tables len = %d, want %d", len(s.Tables), len(wantTables))
|
||||
}
|
||||
|
||||
seenIndexes := map[string]bool{}
|
||||
for _, tbl := range s.Tables {
|
||||
if !wantTables[tbl.Name] {
|
||||
t.Fatalf("unexpected table %q in schema", tbl.Name)
|
||||
}
|
||||
if tbl.PruneColumn == "" {
|
||||
t.Fatalf("table %q missing prune column", tbl.Name)
|
||||
}
|
||||
for _, idx := range tbl.Indexes {
|
||||
if seenIndexes[idx.Name] {
|
||||
t.Fatalf("duplicate index name %q", idx.Name)
|
||||
}
|
||||
seenIndexes[idx.Name] = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func uniqueSinkName(prefix string) string {
|
||||
return fmt.Sprintf("%s_%d", prefix, time.Now().UnixNano())
|
||||
}
|
||||
Reference in New Issue
Block a user