Added a postgres sink implementation.
Some checks failed
ci/woodpecker/push/build-image Pipeline failed

This commit is contained in:
2026-03-16 15:32:46 -05:00
parent 859ee9dd5c
commit 84da2bb689
5 changed files with 963 additions and 0 deletions

View File

@@ -0,0 +1,343 @@
package postgres
import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
fkevent "gitea.maximumdirect.net/ejr/feedkit/event"
fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks"
"gitea.maximumdirect.net/ejr/weatherfeeder/model"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
)
func mapPostgresEvent(_ context.Context, e fkevent.Event) ([]fksinks.PostgresWrite, error) {
schema := strings.TrimSpace(e.Schema)
switch schema {
case standards.SchemaWeatherObservationV1:
return mapObservationEvent(e)
case standards.SchemaWeatherForecastV1:
return mapForecastEvent(e)
case standards.SchemaWeatherAlertV1:
return mapAlertEvent(e)
default:
return nil, nil
}
}
func mapObservationEvent(e fkevent.Event) ([]fksinks.PostgresWrite, error) {
obs, err := decodePayload[model.WeatherObservation](e.Payload)
if err != nil {
return nil, fmt.Errorf("decode observation payload: %w", err)
}
if obs.Timestamp.IsZero() {
return nil, fmt.Errorf("decode observation payload: timestamp is required")
}
observedAt := obs.Timestamp.UTC()
writes := make([]fksinks.PostgresWrite, 0, 1+len(obs.CloudLayers)+len(obs.PresentWeather))
writes = append(writes, fksinks.PostgresWrite{
Table: tableObservations,
Values: map[string]any{
"event_id": e.ID,
"event_kind": string(e.Kind),
"event_source": e.Source,
"event_schema": e.Schema,
"event_emitted_at": e.EmittedAt.UTC(),
"event_effective_at": nullableTime(e.EffectiveAt),
"station_id": nullableString(obs.StationID),
"station_name": nullableString(obs.StationName),
"observed_at": observedAt,
"condition_code": int(obs.ConditionCode),
"condition_text": nullableString(obs.ConditionText),
"is_day": nullableBool(obs.IsDay),
"provider_raw_description": nullableString(obs.ProviderRawDescription),
"text_description": nullableString(obs.TextDescription),
"icon_url": nullableString(obs.IconURL),
"temperature_c": nullableFloat64(obs.TemperatureC),
"dewpoint_c": nullableFloat64(obs.DewpointC),
"wind_direction_degrees": nullableFloat64(obs.WindDirectionDegrees),
"wind_speed_kmh": nullableFloat64(obs.WindSpeedKmh),
"wind_gust_kmh": nullableFloat64(obs.WindGustKmh),
"barometric_pressure_pa": nullableFloat64(obs.BarometricPressurePa),
"sea_level_pressure_pa": nullableFloat64(obs.SeaLevelPressurePa),
"visibility_meters": nullableFloat64(obs.VisibilityMeters),
"relative_humidity_percent": nullableFloat64(obs.RelativeHumidityPercent),
"apparent_temperature_c": nullableFloat64(obs.ApparentTemperatureC),
"elevation_meters": nullableFloat64(obs.ElevationMeters),
"raw_message": nullableString(obs.RawMessage),
},
})
for i, cl := range obs.CloudLayers {
writes = append(writes, fksinks.PostgresWrite{
Table: tableObservationCloudLayers,
Values: map[string]any{
"event_id": e.ID,
"layer_index": i,
"observed_at": observedAt,
"base_meters": nullableFloat64(cl.BaseMeters),
"amount": nullableString(cl.Amount),
},
})
}
for i, pw := range obs.PresentWeather {
rawText, err := compactJSONText(pw.Raw)
if err != nil {
return nil, fmt.Errorf("observation presentWeather[%d].raw: %w", i, err)
}
writes = append(writes, fksinks.PostgresWrite{
Table: tableObservationPresentWeather,
Values: map[string]any{
"event_id": e.ID,
"weather_index": i,
"observed_at": observedAt,
"raw_text": rawText,
},
})
}
return writes, nil
}
func mapForecastEvent(e fkevent.Event) ([]fksinks.PostgresWrite, error) {
run, err := decodePayload[model.WeatherForecastRun](e.Payload)
if err != nil {
return nil, fmt.Errorf("decode forecast payload: %w", err)
}
if run.IssuedAt.IsZero() {
return nil, fmt.Errorf("decode forecast payload: issuedAt is required")
}
if strings.TrimSpace(string(run.Product)) == "" {
return nil, fmt.Errorf("decode forecast payload: product is required")
}
issuedAt := run.IssuedAt.UTC()
writes := make([]fksinks.PostgresWrite, 0, 1+len(run.Periods))
writes = append(writes, fksinks.PostgresWrite{
Table: tableForecasts,
Values: map[string]any{
"event_id": e.ID,
"event_kind": string(e.Kind),
"event_source": e.Source,
"event_schema": e.Schema,
"event_emitted_at": e.EmittedAt.UTC(),
"event_effective_at": nullableTime(e.EffectiveAt),
"location_id": nullableString(run.LocationID),
"location_name": nullableString(run.LocationName),
"issued_at": issuedAt,
"updated_at": nullableTime(run.UpdatedAt),
"product": string(run.Product),
"latitude": nullableFloat64(run.Latitude),
"longitude": nullableFloat64(run.Longitude),
"elevation_meters": nullableFloat64(run.ElevationMeters),
"period_count": len(run.Periods),
},
})
for i, p := range run.Periods {
if p.StartTime.IsZero() || p.EndTime.IsZero() {
return nil, fmt.Errorf("decode forecast payload: periods[%d] startTime/endTime are required", i)
}
writes = append(writes, fksinks.PostgresWrite{
Table: tableForecastPeriods,
Values: map[string]any{
"run_event_id": e.ID,
"period_index": i,
"issued_at": issuedAt,
"start_time": p.StartTime.UTC(),
"end_time": p.EndTime.UTC(),
"name": nullableString(p.Name),
"is_day": nullableBool(p.IsDay),
"condition_code": int(p.ConditionCode),
"condition_text": nullableString(p.ConditionText),
"provider_raw_description": nullableString(p.ProviderRawDescription),
"text_description": nullableString(p.TextDescription),
"detailed_text": nullableString(p.DetailedText),
"icon_url": nullableString(p.IconURL),
"temperature_c": nullableFloat64(p.TemperatureC),
"temperature_c_min": nullableFloat64(p.TemperatureCMin),
"temperature_c_max": nullableFloat64(p.TemperatureCMax),
"dewpoint_c": nullableFloat64(p.DewpointC),
"relative_humidity_percent": nullableFloat64(p.RelativeHumidityPercent),
"wind_direction_degrees": nullableFloat64(p.WindDirectionDegrees),
"wind_speed_kmh": nullableFloat64(p.WindSpeedKmh),
"wind_gust_kmh": nullableFloat64(p.WindGustKmh),
"barometric_pressure_pa": nullableFloat64(p.BarometricPressurePa),
"visibility_meters": nullableFloat64(p.VisibilityMeters),
"apparent_temperature_c": nullableFloat64(p.ApparentTemperatureC),
"cloud_cover_percent": nullableFloat64(p.CloudCoverPercent),
"probability_of_precipitation_percent": nullableFloat64(p.ProbabilityOfPrecipitationPercent),
"precipitation_amount_mm": nullableFloat64(p.PrecipitationAmountMm),
"snowfall_depth_mm": nullableFloat64(p.SnowfallDepthMM),
"uv_index": nullableFloat64(p.UVIndex),
},
})
}
return writes, nil
}
func mapAlertEvent(e fkevent.Event) ([]fksinks.PostgresWrite, error) {
run, err := decodePayload[model.WeatherAlertRun](e.Payload)
if err != nil {
return nil, fmt.Errorf("decode alert payload: %w", err)
}
if run.AsOf.IsZero() {
return nil, fmt.Errorf("decode alert payload: asOf is required")
}
asOf := run.AsOf.UTC()
writes := make([]fksinks.PostgresWrite, 0, 1+len(run.Alerts)+countAlertReferences(run.Alerts))
writes = append(writes, fksinks.PostgresWrite{
Table: tableAlertRuns,
Values: map[string]any{
"event_id": e.ID,
"event_kind": string(e.Kind),
"event_source": e.Source,
"event_schema": e.Schema,
"event_emitted_at": e.EmittedAt.UTC(),
"event_effective_at": nullableTime(e.EffectiveAt),
"location_id": nullableString(run.LocationID),
"location_name": nullableString(run.LocationName),
"as_of": asOf,
"latitude": nullableFloat64(run.Latitude),
"longitude": nullableFloat64(run.Longitude),
"alert_count": len(run.Alerts),
},
})
for i, a := range run.Alerts {
if strings.TrimSpace(a.ID) == "" {
return nil, fmt.Errorf("decode alert payload: alerts[%d].id is required", i)
}
writes = append(writes, fksinks.PostgresWrite{
Table: tableAlerts,
Values: map[string]any{
"run_event_id": e.ID,
"alert_index": i,
"as_of": asOf,
"alert_id": a.ID,
"event": nullableString(a.Event),
"headline": nullableString(a.Headline),
"severity": nullableString(a.Severity),
"urgency": nullableString(a.Urgency),
"certainty": nullableString(a.Certainty),
"status": nullableString(a.Status),
"message_type": nullableString(a.MessageType),
"category": nullableString(a.Category),
"response": nullableString(a.Response),
"description": nullableString(a.Description),
"instruction": nullableString(a.Instruction),
"sent": nullableTime(a.Sent),
"effective": nullableTime(a.Effective),
"onset": nullableTime(a.Onset),
"expires": nullableTime(a.Expires),
"area_description": nullableString(a.AreaDescription),
"sender_name": nullableString(a.SenderName),
"reference_count": len(a.References),
},
})
for j, ref := range a.References {
writes = append(writes, fksinks.PostgresWrite{
Table: tableAlertReferences,
Values: map[string]any{
"run_event_id": e.ID,
"alert_index": i,
"reference_index": j,
"as_of": asOf,
"id": nullableString(ref.ID),
"identifier": nullableString(ref.Identifier),
"sender": nullableString(ref.Sender),
"sent": nullableTime(ref.Sent),
},
})
}
}
return writes, nil
}
func decodePayload[T any](payload any) (T, error) {
var out T
if payload == nil {
return out, fmt.Errorf("payload is nil")
}
if typed, ok := payload.(T); ok {
return typed, nil
}
if ptr, ok := payload.(*T); ok {
if ptr == nil {
return out, fmt.Errorf("payload pointer is nil")
}
return *ptr, nil
}
b, err := json.Marshal(payload)
if err != nil {
return out, fmt.Errorf("marshal payload: %w", err)
}
if err := json.Unmarshal(b, &out); err != nil {
return out, fmt.Errorf("unmarshal payload: %w", err)
}
return out, nil
}
func countAlertReferences(alerts []model.WeatherAlert) int {
total := 0
for _, a := range alerts {
total += len(a.References)
}
return total
}
func nullableString(s string) any {
if strings.TrimSpace(s) == "" {
return nil
}
return s
}
func nullableFloat64(v *float64) any {
if v == nil {
return nil
}
return *v
}
func nullableBool(v *bool) any {
if v == nil {
return nil
}
return *v
}
func nullableTime(v *time.Time) any {
if v == nil || v.IsZero() {
return nil
}
return v.UTC()
}
func compactJSONText(v any) (any, error) {
if v == nil {
return nil, nil
}
b, err := json.Marshal(v)
if err != nil {
return nil, err
}
if string(b) == "null" {
return nil, nil
}
return string(b), nil
}