package postgres import ( "context" "encoding/json" "fmt" "strings" "time" fkevent "gitea.maximumdirect.net/ejr/feedkit/event" fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks" "gitea.maximumdirect.net/ejr/weatherfeeder/model" "gitea.maximumdirect.net/ejr/weatherfeeder/standards" ) func mapPostgresEvent(_ context.Context, e fkevent.Event) ([]fksinks.PostgresWrite, error) { schema := strings.TrimSpace(e.Schema) switch schema { case standards.SchemaWeatherObservationV1: return mapObservationEvent(e) case standards.SchemaWeatherForecastV1: return mapForecastEvent(e) case standards.SchemaWeatherForecastDiscussionV1: return mapForecastDiscussionEvent(e) case standards.SchemaWeatherAlertV1: return mapAlertEvent(e) default: return nil, nil } } func mapObservationEvent(e fkevent.Event) ([]fksinks.PostgresWrite, error) { obs, err := decodePayload[model.WeatherObservation](e.Payload) if err != nil { return nil, fmt.Errorf("decode observation payload: %w", err) } if obs.Timestamp.IsZero() { return nil, fmt.Errorf("decode observation payload: timestamp is required") } observedAt := obs.Timestamp.UTC() writes := make([]fksinks.PostgresWrite, 0, 1+len(obs.PresentWeather)) writes = append(writes, fksinks.PostgresWrite{ Table: tableObservations, Values: map[string]any{ "event_id": e.ID, "event_kind": string(e.Kind), "event_source": e.Source, "event_schema": e.Schema, "event_emitted_at": e.EmittedAt.UTC(), "event_effective_at": nullableTime(e.EffectiveAt), "station_id": nullableString(obs.StationID), "station_name": nullableString(obs.StationName), "observed_at": observedAt, "condition_code": int(obs.ConditionCode), "is_day": nullableBool(obs.IsDay), "text_description": nullableString(obs.TextDescription), "temperature_c": nullableFloat64(obs.TemperatureC), "dewpoint_c": nullableFloat64(obs.DewpointC), "wind_direction_degrees": nullableFloat64(obs.WindDirectionDegrees), "wind_speed_kmh": nullableFloat64(obs.WindSpeedKmh), "wind_gust_kmh": nullableFloat64(obs.WindGustKmh), "barometric_pressure_pa": nullableFloat64(obs.BarometricPressurePa), "visibility_meters": nullableFloat64(obs.VisibilityMeters), "relative_humidity_percent": nullableFloat64(obs.RelativeHumidityPercent), "apparent_temperature_c": nullableFloat64(obs.ApparentTemperatureC), }, }) for i, pw := range obs.PresentWeather { rawText, err := compactJSONText(pw.Raw) if err != nil { return nil, fmt.Errorf("observation presentWeather[%d].raw: %w", i, err) } writes = append(writes, fksinks.PostgresWrite{ Table: tableObservationPresentWeather, Values: map[string]any{ "event_id": e.ID, "weather_index": i, "observed_at": observedAt, "raw_text": rawText, }, }) } return writes, nil } func mapForecastEvent(e fkevent.Event) ([]fksinks.PostgresWrite, error) { run, err := decodePayload[model.WeatherForecastRun](e.Payload) if err != nil { return nil, fmt.Errorf("decode forecast payload: %w", err) } if run.IssuedAt.IsZero() { return nil, fmt.Errorf("decode forecast payload: issuedAt is required") } if strings.TrimSpace(string(run.Product)) == "" { return nil, fmt.Errorf("decode forecast payload: product is required") } issuedAt := run.IssuedAt.UTC() writes := make([]fksinks.PostgresWrite, 0, 1+len(run.Periods)) writes = append(writes, fksinks.PostgresWrite{ Table: tableForecasts, Values: map[string]any{ "event_id": e.ID, "event_kind": string(e.Kind), "event_source": e.Source, "event_schema": e.Schema, "event_emitted_at": e.EmittedAt.UTC(), "event_effective_at": nullableTime(e.EffectiveAt), "location_id": nullableString(run.LocationID), "location_name": nullableString(run.LocationName), "issued_at": issuedAt, "updated_at": nullableTime(run.UpdatedAt), "product": string(run.Product), "latitude": nullableFloat64(run.Latitude), "longitude": nullableFloat64(run.Longitude), "elevation_meters": nullableFloat64(run.ElevationMeters), "period_count": len(run.Periods), }, }) for i, p := range run.Periods { if p.StartTime.IsZero() || p.EndTime.IsZero() { return nil, fmt.Errorf("decode forecast payload: periods[%d] startTime/endTime are required", i) } writes = append(writes, fksinks.PostgresWrite{ Table: tableForecastPeriods, Values: map[string]any{ "run_event_id": e.ID, "period_index": i, "issued_at": issuedAt, "start_time": p.StartTime.UTC(), "end_time": p.EndTime.UTC(), "name": nullableString(p.Name), "is_day": nullableBool(p.IsDay), "condition_code": int(p.ConditionCode), "text_description": nullableString(p.TextDescription), "temperature_c": nullableFloat64(p.TemperatureC), "temperature_c_min": nullableFloat64(p.TemperatureCMin), "temperature_c_max": nullableFloat64(p.TemperatureCMax), "dewpoint_c": nullableFloat64(p.DewpointC), "relative_humidity_percent": nullableFloat64(p.RelativeHumidityPercent), "wind_direction_degrees": nullableFloat64(p.WindDirectionDegrees), "wind_speed_kmh": nullableFloat64(p.WindSpeedKmh), "wind_gust_kmh": nullableFloat64(p.WindGustKmh), "barometric_pressure_pa": nullableFloat64(p.BarometricPressurePa), "visibility_meters": nullableFloat64(p.VisibilityMeters), "apparent_temperature_c": nullableFloat64(p.ApparentTemperatureC), "cloud_cover_percent": nullableFloat64(p.CloudCoverPercent), "probability_of_precipitation_percent": nullableFloat64(p.ProbabilityOfPrecipitationPercent), "precipitation_amount_mm": nullableFloat64(p.PrecipitationAmountMm), "snowfall_depth_mm": nullableFloat64(p.SnowfallDepthMM), "uv_index": nullableFloat64(p.UVIndex), }, }) } return writes, nil } func mapForecastDiscussionEvent(e fkevent.Event) ([]fksinks.PostgresWrite, error) { run, err := decodePayload[model.WeatherForecastDiscussion](e.Payload) if err != nil { return nil, fmt.Errorf("decode forecast discussion payload: %w", err) } if run.IssuedAt.IsZero() { return nil, fmt.Errorf("decode forecast discussion payload: issuedAt is required") } if strings.TrimSpace(string(run.Product)) == "" { return nil, fmt.Errorf("decode forecast discussion payload: product is required") } issuedAt := run.IssuedAt.UTC() shortTermQualifier, shortTermIssuedAt, shortTermText := nullableDiscussionSection(run.ShortTerm) longTermQualifier, longTermIssuedAt, longTermText := nullableDiscussionSection(run.LongTerm) writes := make([]fksinks.PostgresWrite, 0, 1+len(run.KeyMessages)) writes = append(writes, fksinks.PostgresWrite{ Table: tableForecastDiscussions, Values: map[string]any{ "event_id": e.ID, "event_kind": string(e.Kind), "event_source": e.Source, "event_schema": e.Schema, "event_emitted_at": e.EmittedAt.UTC(), "event_effective_at": nullableTime(e.EffectiveAt), "office_id": nullableString(run.OfficeID), "office_name": nullableString(run.OfficeName), "issued_at": issuedAt, "updated_at": nullableTime(run.UpdatedAt), "product": string(run.Product), "short_term_qualifier": shortTermQualifier, "short_term_issued_at": shortTermIssuedAt, "short_term_text": shortTermText, "long_term_qualifier": longTermQualifier, "long_term_issued_at": longTermIssuedAt, "long_term_text": longTermText, "key_message_count": len(run.KeyMessages), }, }) for i, msg := range run.KeyMessages { writes = append(writes, fksinks.PostgresWrite{ Table: tableForecastDiscussionKeyMessages, Values: map[string]any{ "run_event_id": e.ID, "message_index": i, "issued_at": issuedAt, "message_text": nullableString(msg), }, }) } return writes, nil } func mapAlertEvent(e fkevent.Event) ([]fksinks.PostgresWrite, error) { run, err := decodePayload[model.WeatherAlertRun](e.Payload) if err != nil { return nil, fmt.Errorf("decode alert payload: %w", err) } if run.AsOf.IsZero() { return nil, fmt.Errorf("decode alert payload: asOf is required") } asOf := run.AsOf.UTC() writes := make([]fksinks.PostgresWrite, 0, 1+len(run.Alerts)+countAlertReferences(run.Alerts)) writes = append(writes, fksinks.PostgresWrite{ Table: tableAlertRuns, Values: map[string]any{ "event_id": e.ID, "event_kind": string(e.Kind), "event_source": e.Source, "event_schema": e.Schema, "event_emitted_at": e.EmittedAt.UTC(), "event_effective_at": nullableTime(e.EffectiveAt), "location_id": nullableString(run.LocationID), "location_name": nullableString(run.LocationName), "as_of": asOf, "latitude": nullableFloat64(run.Latitude), "longitude": nullableFloat64(run.Longitude), "alert_count": len(run.Alerts), }, }) for i, a := range run.Alerts { if strings.TrimSpace(a.ID) == "" { return nil, fmt.Errorf("decode alert payload: alerts[%d].id is required", i) } writes = append(writes, fksinks.PostgresWrite{ Table: tableAlerts, Values: map[string]any{ "run_event_id": e.ID, "alert_index": i, "as_of": asOf, "alert_id": a.ID, "event": nullableString(a.Event), "headline": nullableString(a.Headline), "severity": nullableString(a.Severity), "urgency": nullableString(a.Urgency), "certainty": nullableString(a.Certainty), "status": nullableString(a.Status), "message_type": nullableString(a.MessageType), "category": nullableString(a.Category), "response": nullableString(a.Response), "description": nullableString(a.Description), "instruction": nullableString(a.Instruction), "sent": nullableTime(a.Sent), "effective": nullableTime(a.Effective), "onset": nullableTime(a.Onset), "expires": nullableTime(a.Expires), "area_description": nullableString(a.AreaDescription), "sender_name": nullableString(a.SenderName), "reference_count": len(a.References), }, }) for j, ref := range a.References { writes = append(writes, fksinks.PostgresWrite{ Table: tableAlertReferences, Values: map[string]any{ "run_event_id": e.ID, "alert_index": i, "reference_index": j, "as_of": asOf, "id": nullableString(ref.ID), "identifier": nullableString(ref.Identifier), "sender": nullableString(ref.Sender), "sent": nullableTime(ref.Sent), }, }) } } return writes, nil } func decodePayload[T any](payload any) (T, error) { var out T if payload == nil { return out, fmt.Errorf("payload is nil") } if typed, ok := payload.(T); ok { return typed, nil } if ptr, ok := payload.(*T); ok { if ptr == nil { return out, fmt.Errorf("payload pointer is nil") } return *ptr, nil } b, err := json.Marshal(payload) if err != nil { return out, fmt.Errorf("marshal payload: %w", err) } if err := json.Unmarshal(b, &out); err != nil { return out, fmt.Errorf("unmarshal payload: %w", err) } return out, nil } func nullableDiscussionSection(section *model.WeatherForecastDiscussionSection) (any, any, any) { if section == nil { return nil, nil, nil } return nullableString(section.Qualifier), nullableTime(section.IssuedAt), nullableString(section.Text) } func countAlertReferences(alerts []model.WeatherAlert) int { total := 0 for _, a := range alerts { total += len(a.References) } return total } func nullableString(s string) any { if strings.TrimSpace(s) == "" { return nil } return s } func nullableFloat64(v *float64) any { if v == nil { return nil } return *v } func nullableBool(v *bool) any { if v == nil { return nil } return *v } func nullableTime(v *time.Time) any { if v == nil || v.IsZero() { return nil } return v.UTC() } func compactJSONText(v any) (any, error) { if v == nil { return nil, nil } b, err := json.Marshal(v) if err != nil { return nil, err } if string(b) == "null" { return nil, nil } return string(b), nil }