Added support for Area Forecast Discussions issued by the NWS
All checks were successful
ci/woodpecker/push/build-image Pipeline was successful
All checks were successful
ci/woodpecker/push/build-image Pipeline was successful
This commit is contained in:
@@ -20,6 +20,8 @@ func mapPostgresEvent(_ context.Context, e fkevent.Event) ([]fksinks.PostgresWri
|
||||
return mapObservationEvent(e)
|
||||
case standards.SchemaWeatherForecastV1:
|
||||
return mapForecastEvent(e)
|
||||
case standards.SchemaWeatherForecastDiscussionV1:
|
||||
return mapForecastDiscussionEvent(e)
|
||||
case standards.SchemaWeatherAlertV1:
|
||||
return mapAlertEvent(e)
|
||||
default:
|
||||
@@ -160,6 +162,62 @@ func mapForecastEvent(e fkevent.Event) ([]fksinks.PostgresWrite, error) {
|
||||
return writes, nil
|
||||
}
|
||||
|
||||
func mapForecastDiscussionEvent(e fkevent.Event) ([]fksinks.PostgresWrite, error) {
|
||||
run, err := decodePayload[model.WeatherForecastDiscussion](e.Payload)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("decode forecast discussion payload: %w", err)
|
||||
}
|
||||
if run.IssuedAt.IsZero() {
|
||||
return nil, fmt.Errorf("decode forecast discussion payload: issuedAt is required")
|
||||
}
|
||||
if strings.TrimSpace(string(run.Product)) == "" {
|
||||
return nil, fmt.Errorf("decode forecast discussion payload: product is required")
|
||||
}
|
||||
|
||||
issuedAt := run.IssuedAt.UTC()
|
||||
shortTermQualifier, shortTermIssuedAt, shortTermText := nullableDiscussionSection(run.ShortTerm)
|
||||
longTermQualifier, longTermIssuedAt, longTermText := nullableDiscussionSection(run.LongTerm)
|
||||
|
||||
writes := make([]fksinks.PostgresWrite, 0, 1+len(run.KeyMessages))
|
||||
writes = append(writes, fksinks.PostgresWrite{
|
||||
Table: tableForecastDiscussions,
|
||||
Values: map[string]any{
|
||||
"event_id": e.ID,
|
||||
"event_kind": string(e.Kind),
|
||||
"event_source": e.Source,
|
||||
"event_schema": e.Schema,
|
||||
"event_emitted_at": e.EmittedAt.UTC(),
|
||||
"event_effective_at": nullableTime(e.EffectiveAt),
|
||||
"office_id": nullableString(run.OfficeID),
|
||||
"office_name": nullableString(run.OfficeName),
|
||||
"issued_at": issuedAt,
|
||||
"updated_at": nullableTime(run.UpdatedAt),
|
||||
"product": string(run.Product),
|
||||
"short_term_qualifier": shortTermQualifier,
|
||||
"short_term_issued_at": shortTermIssuedAt,
|
||||
"short_term_text": shortTermText,
|
||||
"long_term_qualifier": longTermQualifier,
|
||||
"long_term_issued_at": longTermIssuedAt,
|
||||
"long_term_text": longTermText,
|
||||
"key_message_count": len(run.KeyMessages),
|
||||
},
|
||||
})
|
||||
|
||||
for i, msg := range run.KeyMessages {
|
||||
writes = append(writes, fksinks.PostgresWrite{
|
||||
Table: tableForecastDiscussionKeyMessages,
|
||||
Values: map[string]any{
|
||||
"run_event_id": e.ID,
|
||||
"message_index": i,
|
||||
"issued_at": issuedAt,
|
||||
"message_text": nullableString(msg),
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
return writes, nil
|
||||
}
|
||||
|
||||
func mapAlertEvent(e fkevent.Event) ([]fksinks.PostgresWrite, error) {
|
||||
run, err := decodePayload[model.WeatherAlertRun](e.Payload)
|
||||
if err != nil {
|
||||
@@ -269,6 +327,13 @@ func decodePayload[T any](payload any) (T, error) {
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func nullableDiscussionSection(section *model.WeatherForecastDiscussionSection) (any, any, any) {
|
||||
if section == nil {
|
||||
return nil, nil, nil
|
||||
}
|
||||
return nullableString(section.Qualifier), nullableTime(section.IssuedAt), nullableString(section.Text)
|
||||
}
|
||||
|
||||
func countAlertReferences(alerts []model.WeatherAlert) int {
|
||||
total := 0
|
||||
for _, a := range alerts {
|
||||
|
||||
@@ -146,6 +146,50 @@ func TestMapPostgresEventAlertStructPayload(t *testing.T) {
|
||||
assertAllWritesIncludeAllColumns(t, writes)
|
||||
}
|
||||
|
||||
func TestMapPostgresEventForecastDiscussionStructPayload(t *testing.T) {
|
||||
updatedAt := time.Date(2026, 3, 28, 20, 29, 47, 0, time.UTC)
|
||||
shortIssuedAt := time.Date(2026, 3, 28, 19, 19, 0, 0, time.UTC)
|
||||
run := model.WeatherForecastDiscussion{
|
||||
OfficeID: "LSX",
|
||||
OfficeName: "National Weather Service Saint Louis MO",
|
||||
Product: model.ForecastDiscussionProductAFD,
|
||||
IssuedAt: time.Date(2026, 3, 28, 19, 24, 0, 0, time.UTC),
|
||||
UpdatedAt: &updatedAt,
|
||||
KeyMessages: []string{"msg one", "msg two"},
|
||||
ShortTerm: &model.WeatherForecastDiscussionSection{Qualifier: "(Tonight)", IssuedAt: &shortIssuedAt, Text: "Short term text"},
|
||||
LongTerm: &model.WeatherForecastDiscussionSection{Text: "Long term text"},
|
||||
}
|
||||
|
||||
writes, err := mapPostgresEvent(context.Background(), testEvent(standards.SchemaWeatherForecastDiscussionV1, "forecast_discussion", run))
|
||||
if err != nil {
|
||||
t.Fatalf("mapPostgresEvent() error = %v", err)
|
||||
}
|
||||
if len(writes) != 3 {
|
||||
t.Fatalf("mapPostgresEvent() writes len = %d, want 3", len(writes))
|
||||
}
|
||||
|
||||
if writes[0].Table != tableForecastDiscussions {
|
||||
t.Fatalf("writes[0].Table = %q, want %q", writes[0].Table, tableForecastDiscussions)
|
||||
}
|
||||
if got := writes[0].Values["key_message_count"]; got != 2 {
|
||||
t.Fatalf("forecast_discussions key_message_count = %#v, want 2", got)
|
||||
}
|
||||
if got := writes[0].Values["short_term_qualifier"]; got != "(Tonight)" {
|
||||
t.Fatalf("forecast_discussions short_term_qualifier = %#v, want (Tonight)", got)
|
||||
}
|
||||
if got := writes[0].Values["long_term_issued_at"]; got != nil {
|
||||
t.Fatalf("forecast_discussions long_term_issued_at = %#v, want nil", got)
|
||||
}
|
||||
if writes[1].Table != tableForecastDiscussionKeyMessages || writes[2].Table != tableForecastDiscussionKeyMessages {
|
||||
t.Fatalf("forecast discussion key message writes not in expected order")
|
||||
}
|
||||
if got := writes[2].Values["message_index"]; got != 1 {
|
||||
t.Fatalf("second key message index = %#v, want 1", got)
|
||||
}
|
||||
|
||||
assertAllWritesIncludeAllColumns(t, writes)
|
||||
}
|
||||
|
||||
func TestMapPostgresEventMapPayload(t *testing.T) {
|
||||
run := model.WeatherForecastRun{
|
||||
IssuedAt: time.Date(2026, 3, 16, 18, 0, 0, 0, time.UTC),
|
||||
@@ -201,6 +245,16 @@ func TestMapPostgresEventMalformedPayload(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestMapPostgresEventForecastDiscussionMalformedPayload(t *testing.T) {
|
||||
_, err := mapPostgresEvent(context.Background(), testEvent(standards.SchemaWeatherForecastDiscussionV1, "forecast_discussion", "bad"))
|
||||
if err == nil {
|
||||
t.Fatalf("mapPostgresEvent() expected error for malformed payload")
|
||||
}
|
||||
if !strings.Contains(err.Error(), "decode forecast discussion payload") {
|
||||
t.Fatalf("error = %q, want decode forecast discussion payload context", err)
|
||||
}
|
||||
}
|
||||
|
||||
func testEvent(schema string, kind fkevent.Kind, payload any) fkevent.Event {
|
||||
effectiveAt := time.Date(2026, 3, 16, 18, 30, 0, 0, time.UTC)
|
||||
return fkevent.Event{
|
||||
|
||||
@@ -5,13 +5,15 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
tableObservations = "observations"
|
||||
tableObservationPresentWeather = "observation_present_weather"
|
||||
tableForecasts = "forecasts"
|
||||
tableForecastPeriods = "forecast_periods"
|
||||
tableAlertRuns = "alert_runs"
|
||||
tableAlerts = "alerts"
|
||||
tableAlertReferences = "alert_references"
|
||||
tableObservations = "observations"
|
||||
tableObservationPresentWeather = "observation_present_weather"
|
||||
tableForecasts = "forecasts"
|
||||
tableForecastPeriods = "forecast_periods"
|
||||
tableForecastDiscussions = "forecast_discussions"
|
||||
tableForecastDiscussionKeyMessages = "forecast_discussion_key_messages"
|
||||
tableAlertRuns = "alert_runs"
|
||||
tableAlerts = "alerts"
|
||||
tableAlertReferences = "alert_references"
|
||||
)
|
||||
|
||||
// PostgresSchema returns weatherfeeder's Postgres schema definition.
|
||||
@@ -129,6 +131,49 @@ func PostgresSchema() fksinks.PostgresSchema {
|
||||
{Name: "idx_wf_fc_period_run_start", Columns: []string{"run_event_id", "start_time"}},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: tableForecastDiscussions,
|
||||
Columns: []fksinks.PostgresColumn{
|
||||
{Name: "event_id", Type: "TEXT", Nullable: false},
|
||||
{Name: "event_kind", Type: "TEXT", Nullable: false},
|
||||
{Name: "event_source", Type: "TEXT", Nullable: false},
|
||||
{Name: "event_schema", Type: "TEXT", Nullable: false},
|
||||
{Name: "event_emitted_at", Type: "TIMESTAMPTZ", Nullable: false},
|
||||
{Name: "event_effective_at", Type: "TIMESTAMPTZ", Nullable: true},
|
||||
{Name: "office_id", Type: "TEXT", Nullable: true},
|
||||
{Name: "office_name", Type: "TEXT", Nullable: true},
|
||||
{Name: "issued_at", Type: "TIMESTAMPTZ", Nullable: false},
|
||||
{Name: "updated_at", Type: "TIMESTAMPTZ", Nullable: true},
|
||||
{Name: "product", Type: "TEXT", Nullable: false},
|
||||
{Name: "short_term_qualifier", Type: "TEXT", Nullable: true},
|
||||
{Name: "short_term_issued_at", Type: "TIMESTAMPTZ", Nullable: true},
|
||||
{Name: "short_term_text", Type: "TEXT", Nullable: true},
|
||||
{Name: "long_term_qualifier", Type: "TEXT", Nullable: true},
|
||||
{Name: "long_term_issued_at", Type: "TIMESTAMPTZ", Nullable: true},
|
||||
{Name: "long_term_text", Type: "TEXT", Nullable: true},
|
||||
{Name: "key_message_count", Type: "INTEGER", Nullable: false},
|
||||
},
|
||||
PrimaryKey: []string{"event_id"},
|
||||
PruneColumn: "issued_at",
|
||||
Indexes: []fksinks.PostgresIndex{
|
||||
{Name: "idx_wf_discussion_office_product_issued_at", Columns: []string{"office_id", "product", "issued_at"}},
|
||||
{Name: "idx_wf_discussion_issued_at", Columns: []string{"issued_at"}},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: tableForecastDiscussionKeyMessages,
|
||||
Columns: []fksinks.PostgresColumn{
|
||||
{Name: "run_event_id", Type: "TEXT REFERENCES forecast_discussions(event_id) ON DELETE CASCADE", Nullable: false},
|
||||
{Name: "message_index", Type: "INTEGER", Nullable: false},
|
||||
{Name: "issued_at", Type: "TIMESTAMPTZ", Nullable: false},
|
||||
{Name: "message_text", Type: "TEXT", Nullable: true},
|
||||
},
|
||||
PrimaryKey: []string{"run_event_id", "message_index"},
|
||||
PruneColumn: "issued_at",
|
||||
Indexes: []fksinks.PostgresIndex{
|
||||
{Name: "idx_wf_discussion_message_issued_at", Columns: []string{"issued_at"}},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: tableAlertRuns,
|
||||
Columns: []fksinks.PostgresColumn{
|
||||
|
||||
@@ -9,13 +9,15 @@ func TestWeatherPostgresSchemaShape(t *testing.T) {
|
||||
}
|
||||
|
||||
wantTables := map[string]bool{
|
||||
tableObservations: true,
|
||||
tableObservationPresentWeather: true,
|
||||
tableForecasts: true,
|
||||
tableForecastPeriods: true,
|
||||
tableAlertRuns: true,
|
||||
tableAlerts: true,
|
||||
tableAlertReferences: true,
|
||||
tableObservations: true,
|
||||
tableObservationPresentWeather: true,
|
||||
tableForecasts: true,
|
||||
tableForecastPeriods: true,
|
||||
tableForecastDiscussions: true,
|
||||
tableForecastDiscussionKeyMessages: true,
|
||||
tableAlertRuns: true,
|
||||
tableAlerts: true,
|
||||
tableAlertReferences: true,
|
||||
}
|
||||
|
||||
if len(s.Tables) != len(wantTables) {
|
||||
|
||||
Reference in New Issue
Block a user