diff --git a/API.md b/API.md index 4927882..4ffd6f2 100644 --- a/API.md +++ b/API.md @@ -37,14 +37,24 @@ Examples: ## Canonical schemas -weatherfeeder emits three canonical domain schemas: +weatherfeeder emits four canonical domain schemas: - `weather.observation.v1` - `weather.forecast.v1` +- `weather.forecast_discussion.v1` - `weather.alert.v1` Each payload is described below using the JSON field names as the contract. +### Raw upstream schemas + +weatherfeeder sources also emit provider-specific raw schemas before normalization. +For this feature, the raw source schema is: + +- `raw.nws.forecast_discussion.v1` + - payload type: string + - payload contents: exact fetched HTML response body + --- ## Shared Conventions @@ -217,6 +227,36 @@ A run may contain zero, one, or many alerts. --- +## Schema: `weather.forecast_discussion.v1` + +Payload type: `WeatherForecastDiscussion` + +A `WeatherForecastDiscussion` is an issued narrative bulletin for an NWS office. +It is distinct from `weather.forecast.v1`, which is period-based. + +### Fields + +| Field | Type | Required | Notes | +|---|---:|:---:|---| +| `officeId` | string | no | NWS office identifier, e.g. `LSX` | +| `officeName` | string | no | Human office name | +| `product` | string | yes | Currently `afd` | +| `issuedAt` | string (timestamp) | yes | Bulletin issue time | +| `updatedAt` | string (timestamp) | no | Optional page/update timestamp | +| `keyMessages` | array | no | Ordered key-message bullet list | +| `shortTerm` | object | no | Short-term discussion section | +| `longTerm` | object | no | Long-term discussion section | + +### Nested: `shortTerm` / `longTerm` + +| Field | Type | Required | Notes | +|---|---:|:---:|---| +| `qualifier` | string | no | Header qualifier such as `(Through Late Sunday Night)` | +| `issuedAt` | string (timestamp) | no | Optional section-local issue time | +| `text` | string | no | Paragraph-preserved prose text | + +--- + ## Compatibility rules - Consumers **must** ignore unknown fields. diff --git a/README.md b/README.md index 32a35de..81ec803 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,7 @@ Canonical domain schemas emitted after normalization: - `weather.observation.v1` → `WeatherObservation` - `weather.forecast.v1` → `WeatherForecastRun` +- `weather.forecast_discussion.v1` → `WeatherForecastDiscussion` - `weather.alert.v1` → `WeatherAlertRun` For the complete wire contract (event envelope + payload schemas, fields, units, and compatibility rules), see: @@ -22,7 +23,7 @@ For the complete wire contract (event envelope + payload schemas, fields, units, ## Upstream providers (current MVP) -- NWS: observations, hourly forecasts, narrative forecasts, alerts +- NWS: observations, hourly forecasts, narrative forecasts, forecast discussions, alerts - Open-Meteo: observations, hourly forecasts - OpenWeather: observations diff --git a/cmd/weatherfeeder/config.yml b/cmd/weatherfeeder/config.yml index 2bc1e7f..c5501b0 100644 --- a/cmd/weatherfeeder/config.yml +++ b/cmd/weatherfeeder/config.yml @@ -15,7 +15,7 @@ sources: # driver: openmeteo_observation # every: 10m # params: -# url: "https://api.open-meteo.com/v1/forecast?latitude=38.6239&longitude=-90.3571¤t=temperature_2m,relative_humidity_2m,weather_code,wind_speed_10m,wind_direction_10m,precipitation,surface_pressure,rain,showers,snowfall,cloud_cover,apparent_temperature,is_day,wind_gusts_10m,pressure_msl&forecast_days=1" +# url: "https://api.open-meteo.com/v1/forecast?latitude=38.6239&longitude=-90.3571¤t=temperature_2m,relative_humidity_2m,weather_code,wind_speed_10m,wind_direction_10m,precipitation,surface_pressure,rain,showers,snowfall,cloud_cover,apparent_temperature,is_day,wind_gusts_10m,pressure_msl" # user_agent: "HomeOps (eric@maximumdirect.net)" # - name: OpenWeatherObservation @@ -63,6 +63,15 @@ sources: url: "https://api.weather.gov/gridpoints/LSX/90,74/forecast?units=us" user_agent: "HomeOps (eric@maximumdirect.net)" + - name: NWSForecastDiscussionSTL + mode: poll + kinds: ["forecast_discussion"] + driver: nws_forecast_discussion + every: 30m + params: + url: "https://forecast.weather.gov/product.php?site=LSX&issuedby=LSX&product=AFD&format=TXT&version=1&glossary=0" + user_agent: "HomeOps (eric@maximumdirect.net)" + - name: OpenMeteoHourlyForecastSTL mode: poll kinds: ["forecast"] @@ -108,13 +117,13 @@ sinks: routes: - sink: stdout - kinds: ["observation", "forecast", "alert"] + kinds: ["observation", "forecast", "forecast_discussion", "alert"] - sink: nats_weatherfeeder - kinds: ["observation", "forecast", "alert"] + kinds: ["observation", "forecast", "forecast_discussion", "alert"] # - sink: pg_weatherfeeder -# kinds: ["observation", "forecast", "alert"] +# kinds: ["observation", "forecast", "forecast_discussion", "alert"] # - sink: logfile -# kinds: ["observation", "alert", "forecast"] +# kinds: ["observation", "alert", "forecast", "forecast_discussion"] diff --git a/internal/normalizers/builtins_test.go b/internal/normalizers/builtins_test.go index e33507c..7c40ffb 100644 --- a/internal/normalizers/builtins_test.go +++ b/internal/normalizers/builtins_test.go @@ -19,6 +19,7 @@ func TestRegisterBuiltinsOrder(t *testing.T) { want := []fknormalize.Normalizer{ nws.ObservationNormalizer{}, nws.ForecastNormalizer{}, + nws.ForecastDiscussionNormalizer{}, nws.AlertsNormalizer{}, openmeteo.ObservationNormalizer{}, openmeteo.ForecastNormalizer{}, diff --git a/internal/normalizers/nws/forecast_discussion.go b/internal/normalizers/nws/forecast_discussion.go new file mode 100644 index 0000000..fb59005 --- /dev/null +++ b/internal/normalizers/nws/forecast_discussion.go @@ -0,0 +1,72 @@ +package nws + +import ( + "context" + "fmt" + "strings" + + "gitea.maximumdirect.net/ejr/feedkit/event" + normcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/common" + nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws" + "gitea.maximumdirect.net/ejr/weatherfeeder/model" + "gitea.maximumdirect.net/ejr/weatherfeeder/standards" +) + +type ForecastDiscussionNormalizer struct{} + +func (ForecastDiscussionNormalizer) Match(e event.Event) bool { + return strings.TrimSpace(e.Schema) == standards.SchemaRawNWSForecastDiscussionV1 +} + +func (ForecastDiscussionNormalizer) Normalize(ctx context.Context, in event.Event) (*event.Event, error) { + _ = ctx + + rawHTML, err := decodeStringPayload(in.Payload) + if err != nil { + return nil, fmt.Errorf("nws forecast discussion normalize: %w", err) + } + + parsed, err := nwscommon.ParseForecastDiscussionHTML(rawHTML) + if err != nil { + return nil, fmt.Errorf("nws forecast discussion normalize: build: %w", err) + } + + payload := model.WeatherForecastDiscussion{ + OfficeID: strings.TrimSpace(parsed.OfficeID), + OfficeName: strings.TrimSpace(parsed.OfficeName), + Product: model.ForecastDiscussionProduct(strings.TrimSpace(parsed.Product)), + IssuedAt: parsed.IssuedAt.UTC(), + UpdatedAt: parsed.UpdatedAt, + KeyMessages: append([]string(nil), parsed.KeyMessages...), + ShortTerm: mapForecastDiscussionSection(parsed.ShortTerm), + LongTerm: mapForecastDiscussionSection(parsed.LongTerm), + } + + out, err := normcommon.Finalize(in, standards.SchemaWeatherForecastDiscussionV1, payload, payload.IssuedAt) + if err != nil { + return nil, fmt.Errorf("nws forecast discussion normalize: %w", err) + } + return out, nil +} + +func mapForecastDiscussionSection(in *nwscommon.ForecastDiscussionSection) *model.WeatherForecastDiscussionSection { + if in == nil { + return nil + } + return &model.WeatherForecastDiscussionSection{ + Qualifier: strings.TrimSpace(in.Qualifier), + IssuedAt: in.IssuedAt, + Text: strings.TrimSpace(in.Text), + } +} + +func decodeStringPayload(payload any) (string, error) { + switch v := payload.(type) { + case string: + return v, nil + case []byte: + return string(v), nil + default: + return "", fmt.Errorf("extract payload: expected string payload, got %T", payload) + } +} diff --git a/internal/normalizers/nws/forecast_discussion_test.go b/internal/normalizers/nws/forecast_discussion_test.go new file mode 100644 index 0000000..3e7332a --- /dev/null +++ b/internal/normalizers/nws/forecast_discussion_test.go @@ -0,0 +1,130 @@ +package nws + +import ( + "encoding/json" + "os" + "path/filepath" + "strings" + "testing" + "time" + + "gitea.maximumdirect.net/ejr/feedkit/event" + "gitea.maximumdirect.net/ejr/weatherfeeder/model" + "gitea.maximumdirect.net/ejr/weatherfeeder/standards" +) + +func TestForecastDiscussionNormalizerProducesCanonicalSchema(t *testing.T) { + rawHTML := loadForecastDiscussionSampleHTML(t) + + out, err := (ForecastDiscussionNormalizer{}).Normalize(nil, event.Event{ + ID: "evt-discussion-1", + Kind: event.Kind("forecast_discussion"), + Source: "nws-discussion-test", + EmittedAt: time.Date(2026, 3, 28, 19, 25, 0, 0, time.UTC), + Schema: standards.SchemaRawNWSForecastDiscussionV1, + Payload: rawHTML, + }) + if err != nil { + t.Fatalf("Normalize() error = %v", err) + } + if out == nil { + t.Fatalf("Normalize() returned nil output") + } + if out.Schema != standards.SchemaWeatherForecastDiscussionV1 { + t.Fatalf("Schema = %q, want %q", out.Schema, standards.SchemaWeatherForecastDiscussionV1) + } + if out.Kind != event.Kind("forecast_discussion") { + t.Fatalf("Kind = %q, want forecast_discussion", out.Kind) + } + + payload, ok := out.Payload.(model.WeatherForecastDiscussion) + if !ok { + t.Fatalf("Payload type = %T, want model.WeatherForecastDiscussion", out.Payload) + } + if payload.OfficeID != "LSX" { + t.Fatalf("OfficeID = %q, want LSX", payload.OfficeID) + } + if payload.Product != model.ForecastDiscussionProductAFD { + t.Fatalf("Product = %q, want %q", payload.Product, model.ForecastDiscussionProductAFD) + } + if len(payload.KeyMessages) != 3 { + t.Fatalf("KeyMessages len = %d, want 3", len(payload.KeyMessages)) + } + if payload.ShortTerm == nil || payload.LongTerm == nil { + t.Fatalf("ShortTerm=%v LongTerm=%v, want both populated", payload.ShortTerm, payload.LongTerm) + } + if payload.ShortTerm.Qualifier != "(Through Late Sunday Night)" { + t.Fatalf("ShortTerm.Qualifier = %q", payload.ShortTerm.Qualifier) + } + if !strings.Contains(payload.ShortTerm.Text, "After a chilly morning") { + t.Fatalf("ShortTerm.Text = %q, want normalized prose", payload.ShortTerm.Text) + } + if strings.Contains(payload.ShortTerm.Text, "BRC") { + t.Fatalf("ShortTerm.Text contains signature: %q", payload.ShortTerm.Text) + } + if strings.Contains(payload.LongTerm.Text, "AVIATION") || strings.Contains(payload.LongTerm.Text, "WATCHES/WARNINGS/ADVISORIES") { + t.Fatalf("LongTerm.Text includes downstream sections: %q", payload.LongTerm.Text) + } + wantEffectiveAt := time.Date(2026, 3, 28, 19, 24, 0, 0, time.UTC) + if out.EffectiveAt == nil || !out.EffectiveAt.Equal(wantEffectiveAt) { + t.Fatalf("EffectiveAt = %v, want %s", out.EffectiveAt, wantEffectiveAt.Format(time.RFC3339)) + } +} + +func TestForecastDiscussionNormalizerRejectsMissingIssueTime(t *testing.T) { + _, err := (ForecastDiscussionNormalizer{}).Normalize(nil, event.Event{ + ID: "evt-discussion-bad", + Kind: event.Kind("forecast_discussion"), + Source: "nws-discussion-test", + EmittedAt: time.Date(2026, 3, 28, 19, 25, 0, 0, time.UTC), + Schema: standards.SchemaRawNWSForecastDiscussionV1, + Payload: "
National Weather Service Saint Louis MO
", + }) + if err == nil { + t.Fatalf("Normalize() error = nil, want error") + } + if !strings.Contains(err.Error(), "issue time") { + t.Fatalf("error = %q, want issue time context", err) + } +} + +func TestForecastDiscussionNormalizerWireShapeHasNoUnexpectedKeys(t *testing.T) { + rawHTML := loadForecastDiscussionSampleHTML(t) + + out, err := (ForecastDiscussionNormalizer{}).Normalize(nil, event.Event{ + ID: "evt-discussion-2", + Kind: event.Kind("forecast_discussion"), + Source: "nws-discussion-test", + EmittedAt: time.Date(2026, 3, 28, 19, 25, 0, 0, time.UTC), + Schema: standards.SchemaRawNWSForecastDiscussionV1, + Payload: rawHTML, + }) + if err != nil { + t.Fatalf("Normalize() error = %v", err) + } + + b, err := json.Marshal(out.Payload) + if err != nil { + t.Fatalf("json.Marshal(payload) error = %v", err) + } + var got map[string]any + if err := json.Unmarshal(b, &got); err != nil { + t.Fatalf("json.Unmarshal(payload) error = %v", err) + } + for _, key := range []string{"sections", "aviation"} { + if _, ok := got[key]; ok { + t.Fatalf("unexpected key %q in canonical payload", key) + } + } +} + +func loadForecastDiscussionSampleHTML(t *testing.T) string { + t.Helper() + + path := filepath.Join("..", "..", "providers", "nws", "testdata", "forecast_discussion_sample.html") + b, err := os.ReadFile(path) + if err != nil { + t.Fatalf("os.ReadFile(%q) error = %v", path, err) + } + return string(b) +} diff --git a/internal/normalizers/nws/register.go b/internal/normalizers/nws/register.go index c1ca0bf..2d01a0a 100644 --- a/internal/normalizers/nws/register.go +++ b/internal/normalizers/nws/register.go @@ -8,6 +8,7 @@ import ( var builtins = []fknormalize.Normalizer{ ObservationNormalizer{}, ForecastNormalizer{}, + ForecastDiscussionNormalizer{}, AlertsNormalizer{}, } diff --git a/internal/providers/nws/forecast_discussion.go b/internal/providers/nws/forecast_discussion.go new file mode 100644 index 0000000..164852e --- /dev/null +++ b/internal/providers/nws/forecast_discussion.go @@ -0,0 +1,552 @@ +package nws + +import ( + "fmt" + "html" + "regexp" + "strconv" + "strings" + "time" +) + +type ForecastDiscussion struct { + OfficeID string + OfficeName string + Product string + IssuedAt time.Time + UpdatedAt *time.Time + + KeyMessages []string + ShortTerm *ForecastDiscussionSection + LongTerm *ForecastDiscussionSection +} + +type ForecastDiscussionSection struct { + Qualifier string + IssuedAt *time.Time + Text string +} + +var ( + forecastDiscussionHeaderRE = regexp.MustCompile(`^\.(KEY MESSAGES|SHORT TERM|LONG TERM|AVIATION)\.\.\.(.*)$`) + forecastDiscussionAFDRE = regexp.MustCompile(`^AFD([A-Z]{3})$`) + forecastDiscussionWMORE = regexp.MustCompile(`\bK([A-Z]{3})\b`) + forecastDiscussionSigRE = regexp.MustCompile(`^[A-Z]{2,6}$`) +) + +func ParseForecastDiscussionHTML(raw string) (ForecastDiscussion, error) { + text, err := ExtractForecastDiscussionText(raw) + if err != nil { + return ForecastDiscussion{}, err + } + + parsed, err := ParseForecastDiscussionText(text) + if err != nil { + return ForecastDiscussion{}, err + } + + parsed.UpdatedAt = parseForecastDiscussionUpdatedAt(raw) + return parsed, nil +} + +func ExtractForecastDiscussionText(raw string) (string, error) { + lower := strings.ToLower(raw) + searchFrom := 0 + for { + openStart := strings.Index(lower[searchFrom:], " block") + } + openStart += searchFrom + + openEnd := strings.Index(lower[openStart:], ">") + if openEnd < 0 { + return "", fmt.Errorf("unterminated
 tag")
+		}
+		openEnd += openStart
+
+		tag := lower[openStart : openEnd+1]
+		if isGlossaryProductTag(tag) {
+			closeStart := strings.Index(lower[openEnd+1:], "
") + if closeStart < 0 { + return "", fmt.Errorf("missing closing for glossaryProduct block") + } + closeStart += openEnd + 1 + + text := html.UnescapeString(raw[openEnd+1 : closeStart]) + text = strings.ReplaceAll(text, "\r\n", "\n") + text = strings.ReplaceAll(text, "\r", "\n") + return text, nil + } + + searchFrom = openEnd + 1 + } +} + +func ParseForecastDiscussionText(text string) (ForecastDiscussion, error) { + lines := splitLines(text) + + officeID := parseForecastDiscussionOfficeID(lines) + officeName, issuedAt, err := parseForecastDiscussionHeader(lines) + if err != nil { + return ForecastDiscussion{}, err + } + + out := ForecastDiscussion{ + OfficeID: officeID, + OfficeName: officeName, + Product: "afd", + IssuedAt: issuedAt.UTC(), + } + + if block, ok := extractForecastDiscussionSection(lines, "KEY MESSAGES"); ok { + out.KeyMessages = parseForecastDiscussionKeyMessages(block) + } + if block, ok := extractForecastDiscussionSection(lines, "SHORT TERM"); ok { + section, err := parseForecastDiscussionTextSection(block) + if err != nil { + return ForecastDiscussion{}, fmt.Errorf("parse SHORT TERM: %w", err) + } + out.ShortTerm = §ion + } + if block, ok := extractForecastDiscussionSection(lines, "LONG TERM"); ok { + section, err := parseForecastDiscussionTextSection(block) + if err != nil { + return ForecastDiscussion{}, fmt.Errorf("parse LONG TERM: %w", err) + } + out.LongTerm = §ion + } + + return out, nil +} + +func isGlossaryProductTag(tag string) bool { + tag = strings.ToLower(tag) + return strings.Contains(tag, `class="glossaryproduct"`) || + strings.Contains(tag, `class='glossaryproduct'`) || + strings.Contains(tag, `class="glossaryproduct `) || + strings.Contains(tag, `class='glossaryproduct `) +} + +func parseForecastDiscussionUpdatedAt(raw string) *time.Time { + lower := strings.ToLower(raw) + searchFrom := 0 + for { + metaStart := strings.Index(lower[searchFrom:], "") + if metaEnd < 0 { + return nil + } + metaEnd += metaStart + + tag := raw[metaStart : metaEnd+1] + if !strings.EqualFold(strings.TrimSpace(extractHTMLAttr(tag, "name")), "DC.date.created") { + searchFrom = metaEnd + 1 + continue + } + + content := strings.TrimSpace(extractHTMLAttr(tag, "content")) + if content == "" { + return nil + } + t, err := ParseTime(content) + if err != nil { + return nil + } + tt := t.UTC() + return &tt + } +} + +func extractHTMLAttr(tag, attr string) string { + lower := strings.ToLower(tag) + attrLower := strings.ToLower(attr) + for i := 0; i < len(lower); i++ { + idx := strings.Index(lower[i:], attrLower) + if idx < 0 { + return "" + } + idx += i + if idx > 0 { + prev := lower[idx-1] + if isAttrNameChar(prev) { + i = idx + len(attrLower) + continue + } + } + j := idx + len(attrLower) + for j < len(lower) && isHTMLSpace(lower[j]) { + j++ + } + if j >= len(lower) || lower[j] != '=' { + i = idx + len(attrLower) + continue + } + j++ + for j < len(lower) && isHTMLSpace(lower[j]) { + j++ + } + if j >= len(tag) { + return "" + } + quote := tag[j] + if quote != '"' && quote != '\'' { + return "" + } + j++ + k := j + for k < len(tag) && tag[k] != quote { + k++ + } + if k >= len(tag) { + return "" + } + return html.UnescapeString(tag[j:k]) + } + return "" +} + +func isHTMLSpace(b byte) bool { + switch b { + case ' ', '\n', '\r', '\t', '\f': + return true + default: + return false + } +} + +func isAttrNameChar(b byte) bool { + switch { + case b >= 'a' && b <= 'z': + return true + case b >= 'A' && b <= 'Z': + return true + case b >= '0' && b <= '9': + return true + case b == '-' || b == '_' || b == ':': + return true + default: + return false + } +} + +func splitLines(text string) []string { + text = strings.ReplaceAll(text, "\r\n", "\n") + text = strings.ReplaceAll(text, "\r", "\n") + return strings.Split(text, "\n") +} + +func parseForecastDiscussionOfficeID(lines []string) string { + for _, raw := range lines { + line := strings.TrimSpace(raw) + if m := forecastDiscussionAFDRE.FindStringSubmatch(line); len(m) == 2 { + return m[1] + } + } + for _, raw := range lines { + line := strings.TrimSpace(raw) + if m := forecastDiscussionWMORE.FindStringSubmatch(line); len(m) == 2 { + return m[1] + } + } + return "" +} + +func parseForecastDiscussionHeader(lines []string) (string, time.Time, error) { + for i, raw := range lines { + line := strings.TrimSpace(raw) + if !strings.HasPrefix(line, "National Weather Service ") { + continue + } + + officeName := line + for j := i + 1; j < len(lines); j++ { + tsLine := strings.TrimSpace(lines[j]) + if tsLine == "" { + continue + } + issuedAt, err := parseForecastDiscussionIssueTime(tsLine) + if err != nil { + return "", time.Time{}, fmt.Errorf("parse bulletin issuedAt %q: %w", tsLine, err) + } + return officeName, issuedAt.UTC(), nil + } + + return "", time.Time{}, fmt.Errorf("missing bulletin issue time after office line") + } + + return "", time.Time{}, fmt.Errorf("missing office header") +} + +func parseForecastDiscussionIssueTime(line string) (time.Time, error) { + line = strings.TrimSpace(line) + line = strings.TrimPrefix(line, "Issued at ") + line = strings.TrimSpace(line) + + parts := strings.Fields(line) + if len(parts) != 7 { + return time.Time{}, fmt.Errorf("unexpected issue time format") + } + + loc, err := forecastDiscussionLocation(parts[2]) + if err != nil { + return time.Time{}, err + } + + datePart, err := time.Parse("Mon Jan 2 2006", strings.Join(parts[3:], " ")) + if err != nil { + return time.Time{}, err + } + + hour, minute, err := parseForecastDiscussionClock(parts[0], parts[1]) + if err != nil { + return time.Time{}, err + } + + return time.Date( + datePart.Year(), + datePart.Month(), + datePart.Day(), + hour, + minute, + 0, + 0, + loc, + ), nil +} + +func parseForecastDiscussionClock(rawClock, rawAMPM string) (int, int, error) { + clock := strings.TrimSpace(rawClock) + ampm := strings.ToUpper(strings.TrimSpace(rawAMPM)) + if ampm != "AM" && ampm != "PM" { + return 0, 0, fmt.Errorf("unexpected meridiem %q", rawAMPM) + } + + n, err := strconv.Atoi(clock) + if err != nil { + return 0, 0, fmt.Errorf("invalid clock %q", rawClock) + } + + hour := n + minute := 0 + if len(clock) >= 3 { + hour = n / 100 + minute = n % 100 + } + + if hour < 1 || hour > 12 { + return 0, 0, fmt.Errorf("invalid hour %q", rawClock) + } + if minute < 0 || minute > 59 { + return 0, 0, fmt.Errorf("invalid minute %q", rawClock) + } + + if ampm == "AM" { + if hour == 12 { + hour = 0 + } + return hour, minute, nil + } + + if hour != 12 { + hour += 12 + } + return hour, minute, nil +} + +func forecastDiscussionLocation(abbrev string) (*time.Location, error) { + offsets := map[string]int{ + "AST": -4 * 3600, + "ADT": -3 * 3600, + "EST": -5 * 3600, + "EDT": -4 * 3600, + "CST": -6 * 3600, + "CDT": -5 * 3600, + "MST": -7 * 3600, + "MDT": -6 * 3600, + "PST": -8 * 3600, + "PDT": -7 * 3600, + "AKST": -9 * 3600, + "AKDT": -8 * 3600, + "HST": -10 * 3600, + "UTC": 0, + "GMT": 0, + } + + abbr := strings.ToUpper(strings.TrimSpace(abbrev)) + offset, ok := offsets[abbr] + if !ok { + return nil, fmt.Errorf("unsupported time zone %q", abbrev) + } + return time.FixedZone(abbr, offset), nil +} + +func extractForecastDiscussionSection(lines []string, section string) ([]string, bool) { + target := "." + section + "..." + for i, raw := range lines { + line := strings.TrimSpace(raw) + if !strings.HasPrefix(line, target) { + continue + } + + out := []string{line} + for j := i + 1; j < len(lines); j++ { + next := strings.TrimSpace(lines[j]) + if next == "&&" || next == "$$" || strings.Contains(next, "WATCHES/WARNINGS/ADVISORIES") { + break + } + if j > i+1 && isForecastDiscussionSectionHeader(next) { + break + } + out = append(out, lines[j]) + } + return out, true + } + return nil, false +} + +func isForecastDiscussionSectionHeader(line string) bool { + return forecastDiscussionHeaderRE.MatchString(strings.TrimSpace(line)) +} + +func parseForecastDiscussionKeyMessages(block []string) []string { + if len(block) <= 1 { + return nil + } + + body := trimBlankLines(block[1:]) + var messages []string + var current strings.Builder + + flush := func() { + msg := strings.TrimSpace(current.String()) + if msg != "" { + messages = append(messages, msg) + } + current.Reset() + } + + for _, raw := range body { + line := strings.TrimSpace(raw) + if line == "" { + continue + } + if strings.HasPrefix(line, "-") { + flush() + line = strings.TrimSpace(strings.TrimPrefix(line, "-")) + current.WriteString(line) + continue + } + if current.Len() > 0 { + current.WriteByte(' ') + } + current.WriteString(line) + } + flush() + + return messages +} + +func parseForecastDiscussionTextSection(block []string) (ForecastDiscussionSection, error) { + if len(block) == 0 { + return ForecastDiscussionSection{}, fmt.Errorf("empty section") + } + + section := ForecastDiscussionSection{ + Qualifier: parseForecastDiscussionQualifier(strings.TrimSpace(block[0])), + } + + body := trimBlankLines(block[1:]) + if len(body) == 0 { + return section, nil + } + + first := strings.TrimSpace(body[0]) + if strings.HasPrefix(first, "Issued at ") { + issuedAt, err := parseForecastDiscussionIssueTime(first) + if err != nil { + return ForecastDiscussionSection{}, fmt.Errorf("parse section issuedAt %q: %w", first, err) + } + tt := issuedAt.UTC() + section.IssuedAt = &tt + body = trimBlankLines(body[1:]) + } + + body = trimForecastDiscussionSignatureLines(body) + section.Text = joinForecastDiscussionParagraphs(body) + return section, nil +} + +func parseForecastDiscussionQualifier(header string) string { + m := forecastDiscussionHeaderRE.FindStringSubmatch(header) + if len(m) != 3 { + return "" + } + return strings.TrimSpace(m[2]) +} + +func trimBlankLines(lines []string) []string { + start := 0 + for start < len(lines) && strings.TrimSpace(lines[start]) == "" { + start++ + } + + end := len(lines) + for end > start && strings.TrimSpace(lines[end-1]) == "" { + end-- + } + + return lines[start:end] +} + +func trimForecastDiscussionSignatureLines(lines []string) []string { + lines = trimBlankLines(lines) + for len(lines) > 0 { + last := strings.TrimSpace(lines[len(lines)-1]) + if last == "" { + lines = lines[:len(lines)-1] + continue + } + if forecastDiscussionSigRE.MatchString(last) { + lines = trimBlankLines(lines[:len(lines)-1]) + continue + } + break + } + return lines +} + +func joinForecastDiscussionParagraphs(lines []string) string { + lines = trimBlankLines(lines) + if len(lines) == 0 { + return "" + } + + var paragraphs []string + current := make([]string, 0, len(lines)) + + flush := func() { + if len(current) == 0 { + return + } + paragraphs = append(paragraphs, strings.Join(current, " ")) + current = current[:0] + } + + for _, raw := range lines { + line := strings.TrimSpace(raw) + if line == "" { + flush() + continue + } + current = append(current, line) + } + flush() + + return strings.Join(paragraphs, "\n\n") +} diff --git a/internal/providers/nws/forecast_discussion_test.go b/internal/providers/nws/forecast_discussion_test.go new file mode 100644 index 0000000..ae3f6af --- /dev/null +++ b/internal/providers/nws/forecast_discussion_test.go @@ -0,0 +1,118 @@ +package nws + +import ( + "os" + "path/filepath" + "strings" + "testing" + "time" +) + +func TestParseForecastDiscussionHTMLParsesExpectedFields(t *testing.T) { + raw := loadForecastDiscussionSampleHTML(t) + + got, err := ParseForecastDiscussionHTML(raw) + if err != nil { + t.Fatalf("ParseForecastDiscussionHTML() error = %v", err) + } + + if got.OfficeID != "LSX" { + t.Fatalf("OfficeID = %q, want LSX", got.OfficeID) + } + if got.OfficeName != "National Weather Service Saint Louis MO" { + t.Fatalf("OfficeName = %q", got.OfficeName) + } + if got.Product != "afd" { + t.Fatalf("Product = %q, want afd", got.Product) + } + + wantIssuedAt := time.Date(2026, 3, 28, 19, 24, 0, 0, time.UTC) + if !got.IssuedAt.Equal(wantIssuedAt) { + t.Fatalf("IssuedAt = %s, want %s", got.IssuedAt.Format(time.RFC3339), wantIssuedAt.Format(time.RFC3339)) + } + + wantUpdatedAt := time.Date(2026, 3, 28, 20, 29, 47, 0, time.UTC) + if got.UpdatedAt == nil || !got.UpdatedAt.Equal(wantUpdatedAt) { + t.Fatalf("UpdatedAt = %v, want %s", got.UpdatedAt, wantUpdatedAt.Format(time.RFC3339)) + } + + wantMessages := []string{ + "Elevated fire danger conditions are expected across a broad area tomorrow afternoon due to breezy southwest winds and low humidity.", + "Very warm temperatures are expected once again Monday and Tuesday, with highs well into the 80s.", + "A cold front late Tuesday or early Wednesday brings our next chance of thunderstorms, followed by a cooldown and possibly more chances for rain later in the week.", + } + if len(got.KeyMessages) != len(wantMessages) { + t.Fatalf("KeyMessages len = %d, want %d", len(got.KeyMessages), len(wantMessages)) + } + for i := range wantMessages { + if got.KeyMessages[i] != wantMessages[i] { + t.Fatalf("KeyMessages[%d] = %q, want %q", i, got.KeyMessages[i], wantMessages[i]) + } + } + + if got.ShortTerm == nil { + t.Fatalf("ShortTerm is nil") + } + if got.ShortTerm.Qualifier != "(Through Late Sunday Night)" { + t.Fatalf("ShortTerm.Qualifier = %q", got.ShortTerm.Qualifier) + } + if got.ShortTerm.IssuedAt == nil || !got.ShortTerm.IssuedAt.Equal(time.Date(2026, 3, 28, 19, 19, 0, 0, time.UTC)) { + t.Fatalf("ShortTerm.IssuedAt = %v", got.ShortTerm.IssuedAt) + } + if !strings.Contains(got.ShortTerm.Text, "After a chilly morning") { + t.Fatalf("ShortTerm.Text missing expected prose: %q", got.ShortTerm.Text) + } + if strings.Contains(got.ShortTerm.Text, "BRC") { + t.Fatalf("ShortTerm.Text should not include signature: %q", got.ShortTerm.Text) + } + if strings.Contains(got.ShortTerm.Text, "\n\n\n") { + t.Fatalf("ShortTerm.Text contains unexpected paragraph breaks: %q", got.ShortTerm.Text) + } + + if got.LongTerm == nil { + t.Fatalf("LongTerm is nil") + } + if got.LongTerm.Qualifier != "(Monday through Next Saturday)" { + t.Fatalf("LongTerm.Qualifier = %q", got.LongTerm.Qualifier) + } + if got.LongTerm.IssuedAt == nil || !got.LongTerm.IssuedAt.Equal(time.Date(2026, 3, 28, 19, 19, 0, 0, time.UTC)) { + t.Fatalf("LongTerm.IssuedAt = %v", got.LongTerm.IssuedAt) + } + if !strings.Contains(got.LongTerm.Text, "The peak of the warmth arrives Monday and Tuesday") { + t.Fatalf("LongTerm.Text missing expected prose: %q", got.LongTerm.Text) + } + if strings.Contains(got.LongTerm.Text, "AVIATION") || strings.Contains(got.LongTerm.Text, "WATCHES/WARNINGS/ADVISORIES") { + t.Fatalf("LongTerm.Text includes content from other sections: %q", got.LongTerm.Text) + } +} + +func TestParseForecastDiscussionHTMLMissingPreBlock(t *testing.T) { + _, err := ParseForecastDiscussionHTML("
no pre block
") + if err == nil { + t.Fatalf("ParseForecastDiscussionHTML() error = nil, want error") + } + if !strings.Contains(err.Error(), "glossaryProduct") { + t.Fatalf("error = %q, want glossaryProduct context", err) + } +} + +func TestParseForecastDiscussionTextMissingIssueTime(t *testing.T) { + _, err := ParseForecastDiscussionText("National Weather Service Saint Louis MO\n\n.KEY MESSAGES...\n- Test") + if err == nil { + t.Fatalf("ParseForecastDiscussionText() error = nil, want error") + } + if !strings.Contains(err.Error(), "issue time") { + t.Fatalf("error = %q, want issue time context", err) + } +} + +func loadForecastDiscussionSampleHTML(t *testing.T) string { + t.Helper() + + path := filepath.Join("testdata", "forecast_discussion_sample.html") + b, err := os.ReadFile(path) + if err != nil { + t.Fatalf("os.ReadFile(%q) error = %v", path, err) + } + return string(b) +} diff --git a/internal/providers/nws/testdata/forecast_discussion_sample.html b/internal/providers/nws/testdata/forecast_discussion_sample.html new file mode 100644 index 0000000..90f8169 --- /dev/null +++ b/internal/providers/nws/testdata/forecast_discussion_sample.html @@ -0,0 +1,84 @@ + + + + + National Weather Service + + +
+988
+FXUS63 KLSX 281924
+AFDLSX
+
+Area Forecast Discussion
+National Weather Service Saint Louis MO
+224 PM CDT Sat Mar 28 2026
+
+.KEY MESSAGES...
+
+- Elevated fire danger conditions are expected across a broad area
+  tomorrow afternoon due to breezy southwest winds and low
+  humidity.
+
+- Very warm temperatures are expected once again Monday and
+  Tuesday, with highs well into the 80s.
+
+- A cold front late Tuesday or early Wednesday brings our next
+  chance of thunderstorms, followed by a cooldown and possibly
+  more chances for rain later in the week.
+
+&&
+
+.SHORT TERM...  (Through Late Sunday Night)
+Issued at 219 PM CDT Sat Mar 28 2026
+
+After a chilly morning that saw widespread freezing temperatures,
+another warmup is in store over the next several days as southerly
+winds become re-established. We will also see the return of
+shower/thunderstorm chances Tuesday onward as we enter a more
+unsettled pattern.
+
+In the near-term, the focus continues to be on some lingering fire
+weather potential thanks to the presence of an exceptionally dry
+airmass.
+
+BRC
+
+&&
+
+.LONG TERM...  (Monday through Next Saturday)
+Issued at 219 PM CDT Sat Mar 28 2026
+
+The peak of the warmth arrives Monday and Tuesday, as a broad, but
+low-amplitude ridge nudges eastward and steady warm/moist advection
+continues on both days.
+
+Wednesday onward, the day-to-day details become much less clear, but
+latest trends suggest that an active/wet pattern will likely
+continue as another more substantial trough follows with additional
+chances for showers/thunderstorms late in the week.
+
+BRC
+
+&&
+
+.AVIATION...  (For the 18z TAFs through 18z Sunday Afternoon)
+Issued at 1133 AM CDT Sat Mar 28 2026
+
+VFR conditions are expected throughout the 18Z TAF period.
+
+BRC
+
+&&
+
+.LSX WATCHES/WARNINGS/ADVISORIES...
+MO...None.
+IL...None.
+&&
+
+$$
+
+WFO LSX
+    
+ + diff --git a/internal/sinks/postgres/map.go b/internal/sinks/postgres/map.go index 4934b3c..c891b79 100644 --- a/internal/sinks/postgres/map.go +++ b/internal/sinks/postgres/map.go @@ -20,6 +20,8 @@ func mapPostgresEvent(_ context.Context, e fkevent.Event) ([]fksinks.PostgresWri return mapObservationEvent(e) case standards.SchemaWeatherForecastV1: return mapForecastEvent(e) + case standards.SchemaWeatherForecastDiscussionV1: + return mapForecastDiscussionEvent(e) case standards.SchemaWeatherAlertV1: return mapAlertEvent(e) default: @@ -160,6 +162,62 @@ func mapForecastEvent(e fkevent.Event) ([]fksinks.PostgresWrite, error) { return writes, nil } +func mapForecastDiscussionEvent(e fkevent.Event) ([]fksinks.PostgresWrite, error) { + run, err := decodePayload[model.WeatherForecastDiscussion](e.Payload) + if err != nil { + return nil, fmt.Errorf("decode forecast discussion payload: %w", err) + } + if run.IssuedAt.IsZero() { + return nil, fmt.Errorf("decode forecast discussion payload: issuedAt is required") + } + if strings.TrimSpace(string(run.Product)) == "" { + return nil, fmt.Errorf("decode forecast discussion payload: product is required") + } + + issuedAt := run.IssuedAt.UTC() + shortTermQualifier, shortTermIssuedAt, shortTermText := nullableDiscussionSection(run.ShortTerm) + longTermQualifier, longTermIssuedAt, longTermText := nullableDiscussionSection(run.LongTerm) + + writes := make([]fksinks.PostgresWrite, 0, 1+len(run.KeyMessages)) + writes = append(writes, fksinks.PostgresWrite{ + Table: tableForecastDiscussions, + Values: map[string]any{ + "event_id": e.ID, + "event_kind": string(e.Kind), + "event_source": e.Source, + "event_schema": e.Schema, + "event_emitted_at": e.EmittedAt.UTC(), + "event_effective_at": nullableTime(e.EffectiveAt), + "office_id": nullableString(run.OfficeID), + "office_name": nullableString(run.OfficeName), + "issued_at": issuedAt, + "updated_at": nullableTime(run.UpdatedAt), + "product": string(run.Product), + "short_term_qualifier": shortTermQualifier, + "short_term_issued_at": shortTermIssuedAt, + "short_term_text": shortTermText, + "long_term_qualifier": longTermQualifier, + "long_term_issued_at": longTermIssuedAt, + "long_term_text": longTermText, + "key_message_count": len(run.KeyMessages), + }, + }) + + for i, msg := range run.KeyMessages { + writes = append(writes, fksinks.PostgresWrite{ + Table: tableForecastDiscussionKeyMessages, + Values: map[string]any{ + "run_event_id": e.ID, + "message_index": i, + "issued_at": issuedAt, + "message_text": nullableString(msg), + }, + }) + } + + return writes, nil +} + func mapAlertEvent(e fkevent.Event) ([]fksinks.PostgresWrite, error) { run, err := decodePayload[model.WeatherAlertRun](e.Payload) if err != nil { @@ -269,6 +327,13 @@ func decodePayload[T any](payload any) (T, error) { return out, nil } +func nullableDiscussionSection(section *model.WeatherForecastDiscussionSection) (any, any, any) { + if section == nil { + return nil, nil, nil + } + return nullableString(section.Qualifier), nullableTime(section.IssuedAt), nullableString(section.Text) +} + func countAlertReferences(alerts []model.WeatherAlert) int { total := 0 for _, a := range alerts { diff --git a/internal/sinks/postgres/map_test.go b/internal/sinks/postgres/map_test.go index 1e91fcf..651f965 100644 --- a/internal/sinks/postgres/map_test.go +++ b/internal/sinks/postgres/map_test.go @@ -146,6 +146,50 @@ func TestMapPostgresEventAlertStructPayload(t *testing.T) { assertAllWritesIncludeAllColumns(t, writes) } +func TestMapPostgresEventForecastDiscussionStructPayload(t *testing.T) { + updatedAt := time.Date(2026, 3, 28, 20, 29, 47, 0, time.UTC) + shortIssuedAt := time.Date(2026, 3, 28, 19, 19, 0, 0, time.UTC) + run := model.WeatherForecastDiscussion{ + OfficeID: "LSX", + OfficeName: "National Weather Service Saint Louis MO", + Product: model.ForecastDiscussionProductAFD, + IssuedAt: time.Date(2026, 3, 28, 19, 24, 0, 0, time.UTC), + UpdatedAt: &updatedAt, + KeyMessages: []string{"msg one", "msg two"}, + ShortTerm: &model.WeatherForecastDiscussionSection{Qualifier: "(Tonight)", IssuedAt: &shortIssuedAt, Text: "Short term text"}, + LongTerm: &model.WeatherForecastDiscussionSection{Text: "Long term text"}, + } + + writes, err := mapPostgresEvent(context.Background(), testEvent(standards.SchemaWeatherForecastDiscussionV1, "forecast_discussion", run)) + if err != nil { + t.Fatalf("mapPostgresEvent() error = %v", err) + } + if len(writes) != 3 { + t.Fatalf("mapPostgresEvent() writes len = %d, want 3", len(writes)) + } + + if writes[0].Table != tableForecastDiscussions { + t.Fatalf("writes[0].Table = %q, want %q", writes[0].Table, tableForecastDiscussions) + } + if got := writes[0].Values["key_message_count"]; got != 2 { + t.Fatalf("forecast_discussions key_message_count = %#v, want 2", got) + } + if got := writes[0].Values["short_term_qualifier"]; got != "(Tonight)" { + t.Fatalf("forecast_discussions short_term_qualifier = %#v, want (Tonight)", got) + } + if got := writes[0].Values["long_term_issued_at"]; got != nil { + t.Fatalf("forecast_discussions long_term_issued_at = %#v, want nil", got) + } + if writes[1].Table != tableForecastDiscussionKeyMessages || writes[2].Table != tableForecastDiscussionKeyMessages { + t.Fatalf("forecast discussion key message writes not in expected order") + } + if got := writes[2].Values["message_index"]; got != 1 { + t.Fatalf("second key message index = %#v, want 1", got) + } + + assertAllWritesIncludeAllColumns(t, writes) +} + func TestMapPostgresEventMapPayload(t *testing.T) { run := model.WeatherForecastRun{ IssuedAt: time.Date(2026, 3, 16, 18, 0, 0, 0, time.UTC), @@ -201,6 +245,16 @@ func TestMapPostgresEventMalformedPayload(t *testing.T) { } } +func TestMapPostgresEventForecastDiscussionMalformedPayload(t *testing.T) { + _, err := mapPostgresEvent(context.Background(), testEvent(standards.SchemaWeatherForecastDiscussionV1, "forecast_discussion", "bad")) + if err == nil { + t.Fatalf("mapPostgresEvent() expected error for malformed payload") + } + if !strings.Contains(err.Error(), "decode forecast discussion payload") { + t.Fatalf("error = %q, want decode forecast discussion payload context", err) + } +} + func testEvent(schema string, kind fkevent.Kind, payload any) fkevent.Event { effectiveAt := time.Date(2026, 3, 16, 18, 30, 0, 0, time.UTC) return fkevent.Event{ diff --git a/internal/sinks/postgres/schema.go b/internal/sinks/postgres/schema.go index 38e51ba..1f2d31f 100644 --- a/internal/sinks/postgres/schema.go +++ b/internal/sinks/postgres/schema.go @@ -5,13 +5,15 @@ import ( ) const ( - tableObservations = "observations" - tableObservationPresentWeather = "observation_present_weather" - tableForecasts = "forecasts" - tableForecastPeriods = "forecast_periods" - tableAlertRuns = "alert_runs" - tableAlerts = "alerts" - tableAlertReferences = "alert_references" + tableObservations = "observations" + tableObservationPresentWeather = "observation_present_weather" + tableForecasts = "forecasts" + tableForecastPeriods = "forecast_periods" + tableForecastDiscussions = "forecast_discussions" + tableForecastDiscussionKeyMessages = "forecast_discussion_key_messages" + tableAlertRuns = "alert_runs" + tableAlerts = "alerts" + tableAlertReferences = "alert_references" ) // PostgresSchema returns weatherfeeder's Postgres schema definition. @@ -129,6 +131,49 @@ func PostgresSchema() fksinks.PostgresSchema { {Name: "idx_wf_fc_period_run_start", Columns: []string{"run_event_id", "start_time"}}, }, }, + { + Name: tableForecastDiscussions, + Columns: []fksinks.PostgresColumn{ + {Name: "event_id", Type: "TEXT", Nullable: false}, + {Name: "event_kind", Type: "TEXT", Nullable: false}, + {Name: "event_source", Type: "TEXT", Nullable: false}, + {Name: "event_schema", Type: "TEXT", Nullable: false}, + {Name: "event_emitted_at", Type: "TIMESTAMPTZ", Nullable: false}, + {Name: "event_effective_at", Type: "TIMESTAMPTZ", Nullable: true}, + {Name: "office_id", Type: "TEXT", Nullable: true}, + {Name: "office_name", Type: "TEXT", Nullable: true}, + {Name: "issued_at", Type: "TIMESTAMPTZ", Nullable: false}, + {Name: "updated_at", Type: "TIMESTAMPTZ", Nullable: true}, + {Name: "product", Type: "TEXT", Nullable: false}, + {Name: "short_term_qualifier", Type: "TEXT", Nullable: true}, + {Name: "short_term_issued_at", Type: "TIMESTAMPTZ", Nullable: true}, + {Name: "short_term_text", Type: "TEXT", Nullable: true}, + {Name: "long_term_qualifier", Type: "TEXT", Nullable: true}, + {Name: "long_term_issued_at", Type: "TIMESTAMPTZ", Nullable: true}, + {Name: "long_term_text", Type: "TEXT", Nullable: true}, + {Name: "key_message_count", Type: "INTEGER", Nullable: false}, + }, + PrimaryKey: []string{"event_id"}, + PruneColumn: "issued_at", + Indexes: []fksinks.PostgresIndex{ + {Name: "idx_wf_discussion_office_product_issued_at", Columns: []string{"office_id", "product", "issued_at"}}, + {Name: "idx_wf_discussion_issued_at", Columns: []string{"issued_at"}}, + }, + }, + { + Name: tableForecastDiscussionKeyMessages, + Columns: []fksinks.PostgresColumn{ + {Name: "run_event_id", Type: "TEXT REFERENCES forecast_discussions(event_id) ON DELETE CASCADE", Nullable: false}, + {Name: "message_index", Type: "INTEGER", Nullable: false}, + {Name: "issued_at", Type: "TIMESTAMPTZ", Nullable: false}, + {Name: "message_text", Type: "TEXT", Nullable: true}, + }, + PrimaryKey: []string{"run_event_id", "message_index"}, + PruneColumn: "issued_at", + Indexes: []fksinks.PostgresIndex{ + {Name: "idx_wf_discussion_message_issued_at", Columns: []string{"issued_at"}}, + }, + }, { Name: tableAlertRuns, Columns: []fksinks.PostgresColumn{ diff --git a/internal/sinks/postgres/schema_test.go b/internal/sinks/postgres/schema_test.go index 65779e4..cdeb919 100644 --- a/internal/sinks/postgres/schema_test.go +++ b/internal/sinks/postgres/schema_test.go @@ -9,13 +9,15 @@ func TestWeatherPostgresSchemaShape(t *testing.T) { } wantTables := map[string]bool{ - tableObservations: true, - tableObservationPresentWeather: true, - tableForecasts: true, - tableForecastPeriods: true, - tableAlertRuns: true, - tableAlerts: true, - tableAlertReferences: true, + tableObservations: true, + tableObservationPresentWeather: true, + tableForecasts: true, + tableForecastPeriods: true, + tableForecastDiscussions: true, + tableForecastDiscussionKeyMessages: true, + tableAlertRuns: true, + tableAlerts: true, + tableAlertReferences: true, } if len(s.Tables) != len(wantTables) { diff --git a/internal/sources/builtins.go b/internal/sources/builtins.go index 35fe9ad..5e7afdb 100644 --- a/internal/sources/builtins.go +++ b/internal/sources/builtins.go @@ -19,6 +19,9 @@ var pollDriverRegistrations = []pollDriverRegistration{ {driver: "nws_alerts", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) { return nws.NewAlertsSource(cfg) }}, {driver: "nws_forecast_hourly", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) { return nws.NewHourlyForecastSource(cfg) }}, {driver: "nws_forecast_narrative", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) { return nws.NewNarrativeForecastSource(cfg) }}, + {driver: "nws_forecast_discussion", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) { + return nws.NewForecastDiscussionSource(cfg) + }}, {driver: "openmeteo_observation", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) { return openmeteo.NewObservationSource(cfg) }}, {driver: "openmeteo_forecast", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) { return openmeteo.NewForecastSource(cfg) }}, {driver: "openweather_observation", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) { diff --git a/internal/sources/builtins_test.go b/internal/sources/builtins_test.go index 0319417..fc56020 100644 --- a/internal/sources/builtins_test.go +++ b/internal/sources/builtins_test.go @@ -34,6 +34,19 @@ func TestRegisterBuiltinsRegistersNWSNarrativeForecastDriver(t *testing.T) { } } +func TestRegisterBuiltinsRegistersNWSForecastDiscussionDriver(t *testing.T) { + reg := fksource.NewRegistry() + RegisterBuiltins(reg) + + in, err := reg.BuildInput(sourceConfigForDriver("nws_forecast_discussion")) + if err != nil { + t.Fatalf("BuildInput(nws_forecast_discussion) error = %v", err) + } + if _, ok := in.(fksource.PollSource); !ok { + t.Fatalf("BuildInput(nws_forecast_discussion) type = %T, want PollSource", in) + } +} + func TestRegisterBuiltinsDoesNotRegisterLegacyNWSForecastDriver(t *testing.T) { reg := fksource.NewRegistry() RegisterBuiltins(reg) @@ -56,6 +69,7 @@ func TestRegisterBuiltinsRegistersAllCurrentDrivers(t *testing.T) { "nws_alerts", "nws_forecast_hourly", "nws_forecast_narrative", + "nws_forecast_discussion", "openmeteo_observation", "openmeteo_forecast", "openweather_observation", diff --git a/internal/sources/nws/forecast_discussion.go b/internal/sources/nws/forecast_discussion.go new file mode 100644 index 0000000..ce50b8b --- /dev/null +++ b/internal/sources/nws/forecast_discussion.go @@ -0,0 +1,68 @@ +package nws + +import ( + "context" + "time" + + "gitea.maximumdirect.net/ejr/feedkit/config" + "gitea.maximumdirect.net/ejr/feedkit/event" + fksources "gitea.maximumdirect.net/ejr/feedkit/sources" + nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws" + "gitea.maximumdirect.net/ejr/weatherfeeder/standards" +) + +// ForecastDiscussionSource polls an NWS forecast discussion HTML page and emits a RAW discussion Event. +// +// Output schema: +// - standards.SchemaRawNWSForecastDiscussionV1 +type ForecastDiscussionSource struct { + http *fksources.HTTPSource +} + +func NewForecastDiscussionSource(cfg config.SourceConfig) (*ForecastDiscussionSource, error) { + const driver = "nws_forecast_discussion" + + hs, err := fksources.NewHTTPSource(driver, cfg, "text/html, application/xhtml+xml") + if err != nil { + return nil, err + } + + return &ForecastDiscussionSource{http: hs}, nil +} + +func (s *ForecastDiscussionSource) Name() string { return s.http.Name } + +func (s *ForecastDiscussionSource) Kinds() []event.Kind { + return []event.Kind{event.Kind("forecast_discussion")} +} + +func (s *ForecastDiscussionSource) Poll(ctx context.Context) ([]event.Event, error) { + body, changed, err := s.http.FetchBytesIfChanged(ctx) + if err != nil { + return nil, err + } + if !changed { + return nil, nil + } + + rawHTML := string(body) + parsed, err := nwscommon.ParseForecastDiscussionHTML(rawHTML) + if err != nil { + return nil, err + } + + issuedAt := parsed.IssuedAt.UTC() + effectiveAt := &issuedAt + emittedAt := time.Now().UTC() + eventID := fksources.DefaultEventID("", s.http.Name, effectiveAt, emittedAt) + + return fksources.SingleEvent( + event.Kind("forecast_discussion"), + s.http.Name, + standards.SchemaRawNWSForecastDiscussionV1, + eventID, + emittedAt, + effectiveAt, + rawHTML, + ) +} diff --git a/internal/sources/nws/forecast_discussion_test.go b/internal/sources/nws/forecast_discussion_test.go new file mode 100644 index 0000000..271f339 --- /dev/null +++ b/internal/sources/nws/forecast_discussion_test.go @@ -0,0 +1,138 @@ +package nws + +import ( + "context" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "strings" + "testing" + "time" + + "gitea.maximumdirect.net/ejr/feedkit/config" + "gitea.maximumdirect.net/ejr/feedkit/event" + "gitea.maximumdirect.net/ejr/weatherfeeder/standards" +) + +func TestForecastDiscussionSourcePollEmitsExpectedEvent(t *testing.T) { + rawHTML := loadForecastDiscussionSampleHTML(t) + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/html; charset=utf-8") + _, _ = w.Write([]byte(rawHTML)) + })) + defer srv.Close() + + src, err := NewForecastDiscussionSource(forecastDiscussionSourceConfig(srv.URL)) + if err != nil { + t.Fatalf("NewForecastDiscussionSource() error = %v", err) + } + if got := src.Kinds(); len(got) != 1 || got[0] != event.Kind("forecast_discussion") { + t.Fatalf("Kinds() = %#v, want [forecast_discussion]", got) + } + + events, err := src.Poll(context.Background()) + if err != nil { + t.Fatalf("Poll() error = %v", err) + } + if len(events) != 1 { + t.Fatalf("Poll() len = %d, want 1", len(events)) + } + + got := events[0] + if got.Kind != event.Kind("forecast_discussion") { + t.Fatalf("Kind = %q, want forecast_discussion", got.Kind) + } + if got.Schema != standards.SchemaRawNWSForecastDiscussionV1 { + t.Fatalf("Schema = %q, want %q", got.Schema, standards.SchemaRawNWSForecastDiscussionV1) + } + wantEffectiveAt := time.Date(2026, 3, 28, 19, 24, 0, 0, time.UTC) + if got.EffectiveAt == nil || !got.EffectiveAt.Equal(wantEffectiveAt) { + t.Fatalf("EffectiveAt = %v, want %s", got.EffectiveAt, wantEffectiveAt.Format(time.RFC3339)) + } + payload, ok := got.Payload.(string) + if !ok { + t.Fatalf("Payload type = %T, want string", got.Payload) + } + if payload != rawHTML { + t.Fatalf("Payload did not preserve exact HTML") + } +} + +func TestForecastDiscussionSourcePollReturnsNoEventsWhenUnchanged(t *testing.T) { + rawHTML := loadForecastDiscussionSampleHTML(t) + const etag = `"discussion-v1"` + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Header.Get("If-None-Match") == etag { + w.WriteHeader(http.StatusNotModified) + return + } + w.Header().Set("ETag", etag) + _, _ = w.Write([]byte(rawHTML)) + })) + defer srv.Close() + + src, err := NewForecastDiscussionSource(forecastDiscussionSourceConfig(srv.URL)) + if err != nil { + t.Fatalf("NewForecastDiscussionSource() error = %v", err) + } + + first, err := src.Poll(context.Background()) + if err != nil { + t.Fatalf("first Poll() error = %v", err) + } + if len(first) != 1 { + t.Fatalf("first Poll() len = %d, want 1", len(first)) + } + + second, err := src.Poll(context.Background()) + if err != nil { + t.Fatalf("second Poll() error = %v", err) + } + if len(second) != 0 { + t.Fatalf("second Poll() len = %d, want 0", len(second)) + } +} + +func TestForecastDiscussionSourcePollRejectsInvalidHTML(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, _ = w.Write([]byte("
missing discussion block
")) + })) + defer srv.Close() + + src, err := NewForecastDiscussionSource(forecastDiscussionSourceConfig(srv.URL)) + if err != nil { + t.Fatalf("NewForecastDiscussionSource() error = %v", err) + } + + _, err = src.Poll(context.Background()) + if err == nil { + t.Fatalf("Poll() error = nil, want error") + } + if !strings.Contains(err.Error(), "glossaryProduct") { + t.Fatalf("error = %q, want glossaryProduct context", err) + } +} + +func forecastDiscussionSourceConfig(url string) config.SourceConfig { + return config.SourceConfig{ + Name: "test-forecast-discussion-source", + Driver: "nws_forecast_discussion", + Mode: config.SourceModePoll, + Params: map[string]any{ + "url": url, + "user_agent": "test-agent", + }, + } +} + +func loadForecastDiscussionSampleHTML(t *testing.T) string { + t.Helper() + + path := filepath.Join("..", "..", "providers", "nws", "testdata", "forecast_discussion_sample.html") + b, err := os.ReadFile(path) + if err != nil { + t.Fatalf("os.ReadFile(%q) error = %v", path, err) + } + return string(b) +} diff --git a/model/forecast_discussion.go b/model/forecast_discussion.go new file mode 100644 index 0000000..446062c --- /dev/null +++ b/model/forecast_discussion.go @@ -0,0 +1,40 @@ +package model + +import "time" + +// ForecastDiscussionProduct distinguishes the discussion bulletin family. +// +// Today weatherfeeder only normalizes Area Forecast Discussion (AFD) products, +// but this remains a distinct type so additional discussion-like products can be +// added without changing the payload field type. +type ForecastDiscussionProduct string + +const ( + ForecastDiscussionProductAFD ForecastDiscussionProduct = "afd" +) + +// WeatherForecastDiscussion is a canonical issued discussion bulletin for an NWS office. +// +// Unlike WeatherForecastRun, this is authored narrative text rather than a sequence +// of forecast periods. +type WeatherForecastDiscussion struct { + OfficeID string `json:"officeId,omitempty"` + OfficeName string `json:"officeName,omitempty"` + + Product ForecastDiscussionProduct `json:"product"` + + IssuedAt time.Time `json:"issuedAt"` + UpdatedAt *time.Time `json:"updatedAt,omitempty"` + + KeyMessages []string `json:"keyMessages,omitempty"` + + ShortTerm *WeatherForecastDiscussionSection `json:"shortTerm,omitempty"` + LongTerm *WeatherForecastDiscussionSection `json:"longTerm,omitempty"` +} + +// WeatherForecastDiscussionSection is a fixed prose section within a discussion bulletin. +type WeatherForecastDiscussionSection struct { + Qualifier string `json:"qualifier,omitempty"` + IssuedAt *time.Time `json:"issuedAt,omitempty"` + Text string `json:"text,omitempty"` +} diff --git a/standards/schema.go b/standards/schema.go index eca717d..16ed4aa 100644 --- a/standards/schema.go +++ b/standards/schema.go @@ -17,13 +17,15 @@ const ( SchemaRawNWSHourlyForecastV1 = "raw.nws.hourly.forecast.v1" SchemaRawNWSNarrativeForecastV1 = "raw.nws.narrative.forecast.v1" + SchemaRawNWSForecastDiscussionV1 = "raw.nws.forecast_discussion.v1" SchemaRawOpenMeteoHourlyForecastV1 = "raw.openmeteo.hourly.forecast.v1" SchemaRawOpenWeatherHourlyForecastV1 = "raw.openweather.hourly.forecast.v1" SchemaRawNWSAlertsV1 = "raw.nws.alerts.v1" // Canonical domain schemas (emitted after normalization). - SchemaWeatherObservationV1 = "weather.observation.v1" - SchemaWeatherForecastV1 = "weather.forecast.v1" - SchemaWeatherAlertV1 = "weather.alert.v1" + SchemaWeatherObservationV1 = "weather.observation.v1" + SchemaWeatherForecastV1 = "weather.forecast.v1" + SchemaWeatherForecastDiscussionV1 = "weather.forecast_discussion.v1" + SchemaWeatherAlertV1 = "weather.alert.v1" )