3 Commits
v0.8.2 ... main

f457bab039  Updated dependencies to feedkit v0.9.1
  ci/woodpecker/push/build-image: pipeline successful
  2026-03-29 10:54:36 -05:00

6712c16167  Updated to feedkit v0.9.0
  ci/woodpecker/push/build-image: pipeline successful
  2026-03-29 08:35:56 -05:00

a0389ebce8  Added support for Area Forecast Discussions issued by the NWS
  ci/woodpecker/push/build-image: pipeline successful
  2026-03-28 16:17:03 -05:00

24 changed files with 1497 additions and 39 deletions

API.md

@@ -37,14 +37,24 @@ Examples:
 ## Canonical schemas
-weatherfeeder emits three canonical domain schemas:
+weatherfeeder emits four canonical domain schemas:
 - `weather.observation.v1`
 - `weather.forecast.v1`
+- `weather.forecast_discussion.v1`
 - `weather.alert.v1`
 Each payload is described below using the JSON field names as the contract.
+### Raw upstream schemas
+weatherfeeder sources also emit provider-specific raw schemas before normalization.
+For this feature, the raw source schema is:
+- `raw.nws.forecast_discussion.v1`
+  - payload type: string
+  - payload contents: exact fetched HTML response body
 ---
 ## Shared Conventions
@@ -217,6 +227,36 @@ A run may contain zero, one, or many alerts.
 ---
+## Schema: `weather.forecast_discussion.v1`
+Payload type: `WeatherForecastDiscussion`
+A `WeatherForecastDiscussion` is an issued narrative bulletin for an NWS office.
+It is distinct from `weather.forecast.v1`, which is period-based.
+### Fields
+| Field | Type | Required | Notes |
+|---|---:|:---:|---|
+| `officeId` | string | no | NWS office identifier, e.g. `LSX` |
+| `officeName` | string | no | Human office name |
+| `product` | string | yes | Currently `afd` |
+| `issuedAt` | string (timestamp) | yes | Bulletin issue time |
+| `updatedAt` | string (timestamp) | no | Optional page/update timestamp |
+| `keyMessages` | array | no | Ordered key-message bullet list |
+| `shortTerm` | object | no | Short-term discussion section |
+| `longTerm` | object | no | Long-term discussion section |
+### Nested: `shortTerm` / `longTerm`
+| Field | Type | Required | Notes |
+|---|---:|:---:|---|
+| `qualifier` | string | no | Header qualifier such as `(Through Late Sunday Night)` |
+| `issuedAt` | string (timestamp) | no | Optional section-local issue time |
+| `text` | string | no | Paragraph-preserved prose text |
+---
 ## Compatibility rules
 - Consumers **must** ignore unknown fields.
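The "ignore unknown fields" rule maps directly onto Go's default decoding behavior: `encoding/json` silently drops unknown keys, so a consumer compiled against an older schema keeps working when new fields appear. A minimal sketch (the consumer struct here is hypothetical):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// oldConsumer models a consumer built before keyMessages existed;
// it only knows the original fields.
type oldConsumer struct {
	Product  string `json:"product"`
	IssuedAt string `json:"issuedAt"`
}

// decodeLeniently decodes a payload, ignoring any fields the consumer
// does not know about — json.Unmarshal does this by default.
func decodeLeniently(wire string) (oldConsumer, error) {
	var c oldConsumer
	err := json.Unmarshal([]byte(wire), &c)
	return c, err
}

func main() {
	// keyMessages is unknown to oldConsumer; decoding still succeeds.
	wire := `{"product":"afd","issuedAt":"2026-03-28T19:24:00Z","keyMessages":["new field"]}`
	c, err := decodeLeniently(wire)
	fmt.Println(c.Product, err)
}
```

The inverse (strict rejection via `Decoder.DisallowUnknownFields`) would violate this contract, which is why lenient decoding is the right default for schema consumers.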


@@ -14,6 +14,7 @@ Canonical domain schemas emitted after normalization:
 - `weather.observation.v1` → `WeatherObservation`
 - `weather.forecast.v1` → `WeatherForecastRun`
+- `weather.forecast_discussion.v1` → `WeatherForecastDiscussion`
 - `weather.alert.v1` → `WeatherAlertRun`
 For the complete wire contract (event envelope + payload schemas, fields, units, and compatibility rules), see:
@@ -22,7 +23,7 @@ For the complete wire contract (event envelope + payload schemas, fields, units,
 ## Upstream providers (current MVP)
-- NWS: observations, hourly forecasts, narrative forecasts, alerts
+- NWS: observations, hourly forecasts, narrative forecasts, forecast discussions, alerts
 - Open-Meteo: observations, hourly forecasts
 - OpenWeather: observations


@@ -15,7 +15,7 @@ sources:
 # driver: openmeteo_observation
 # every: 10m
 # params:
-# url: "https://api.open-meteo.com/v1/forecast?latitude=38.6239&longitude=-90.3571&current=temperature_2m,relative_humidity_2m,weather_code,wind_speed_10m,wind_direction_10m,precipitation,surface_pressure,rain,showers,snowfall,cloud_cover,apparent_temperature,is_day,wind_gusts_10m,pressure_msl&forecast_days=1"
+# url: "https://api.open-meteo.com/v1/forecast?latitude=38.6239&longitude=-90.3571&current=temperature_2m,relative_humidity_2m,weather_code,wind_speed_10m,wind_direction_10m,precipitation,surface_pressure,rain,showers,snowfall,cloud_cover,apparent_temperature,is_day,wind_gusts_10m,pressure_msl"
 # user_agent: "HomeOps (eric@maximumdirect.net)"
 # - name: OpenWeatherObservation
@@ -63,6 +63,15 @@ sources:
 url: "https://api.weather.gov/gridpoints/LSX/90,74/forecast?units=us"
 user_agent: "HomeOps (eric@maximumdirect.net)"
+- name: NWSForecastDiscussionSTL
+  mode: poll
+  kinds: ["forecast_discussion"]
+  driver: nws_forecast_discussion
+  every: 30m
+  params:
+    url: "https://forecast.weather.gov/product.php?site=LSX&issuedby=LSX&product=AFD&format=TXT&version=1&glossary=0"
+    user_agent: "HomeOps (eric@maximumdirect.net)"
 - name: OpenMeteoHourlyForecastSTL
 mode: poll
 kinds: ["forecast"]
@@ -108,13 +117,13 @@ sinks:
 routes:
 - sink: stdout
-kinds: ["observation", "forecast", "alert"]
+kinds: ["observation", "forecast", "forecast_discussion", "alert"]
 - sink: nats_weatherfeeder
-kinds: ["observation", "forecast", "alert"]
+kinds: ["observation", "forecast", "forecast_discussion", "alert"]
 # - sink: pg_weatherfeeder
-# kinds: ["observation", "forecast", "alert"]
+# kinds: ["observation", "forecast", "forecast_discussion", "alert"]
 # - sink: logfile
-# kinds: ["observation", "alert", "forecast"]
+# kinds: ["observation", "alert", "forecast", "forecast_discussion"]


@@ -59,19 +59,11 @@ func main() {
 log.Fatalf("source expected kinds validation failed (sources[%d] name=%q driver=%q): %v", i, sc.Name, sc.Driver, err)
 }
-// If this is a polling source, every is required.
-if _, ok := in.(fksources.PollSource); ok && sc.Every.Duration <= 0 {
-log.Fatalf(
-"polling source missing/invalid interval (sources[%d] name=%q driver=%q): sources[].every must be > 0",
-i, sc.Name, sc.Driver,
-)
-}
-// For stream sources, Every is ignored; it is fine if omitted/zero.
-jobs = append(jobs, fkscheduler.Job{
-Source: in,
-Every: sc.Every.Duration,
-})
+job, err := fkscheduler.JobFromSourceConfig(in, sc)
+if err != nil {
+log.Fatalf("build scheduler job failed (sources[%d] name=%q driver=%q): %v", i, sc.Name, sc.Driver, err)
+}
+jobs = append(jobs, job)
 }
 // --- Build sinks ---


@@ -13,9 +13,11 @@ import (
 fkprocessors "gitea.maximumdirect.net/ejr/feedkit/processors"
 fkdedupe "gitea.maximumdirect.net/ejr/feedkit/processors/dedupe"
 fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
+fkscheduler "gitea.maximumdirect.net/ejr/feedkit/scheduler"
 fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
 wfnormalizers "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers"
+wfsources "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources"
 )
 type testInput struct {
@@ -74,6 +76,31 @@ func TestExampleConfigLoads(t *testing.T) {
 }
 }
+func TestExampleConfigSourcesBuildSchedulerJobs(t *testing.T) {
+cfg, err := config.Load("config.yml")
+if err != nil {
+t.Fatalf("config.Load(config.yml) unexpected error: %v", err)
+}
+reg := fksources.NewRegistry()
+wfsources.RegisterBuiltins(reg)
+for i, sc := range cfg.Sources {
+in, err := reg.BuildInput(sc)
+if err != nil {
+t.Fatalf("BuildInput(sources[%d]) error = %v", i, err)
+}
+job, err := fkscheduler.JobFromSourceConfig(in, sc)
+if err != nil {
+t.Fatalf("JobFromSourceConfig(sources[%d]) error = %v", i, err)
+}
+if job.Source == nil {
+t.Fatalf("JobFromSourceConfig(sources[%d]) returned nil source", i)
+}
+}
+}
 func TestProcessorRegistryBuildsNormalizeThenDedupeChain(t *testing.T) {
 chain, err := buildProcessorChainForTests()
 if err != nil {

go.mod

@@ -2,7 +2,7 @@ module gitea.maximumdirect.net/ejr/weatherfeeder
 go 1.25
-require gitea.maximumdirect.net/ejr/feedkit v0.8.2
+require gitea.maximumdirect.net/ejr/feedkit v0.9.1
 require (
 github.com/klauspost/compress v1.17.2 // indirect

go.sum

@@ -1,5 +1,5 @@
-gitea.maximumdirect.net/ejr/feedkit v0.8.2 h1:6AMxNacfqJ8SXQhFAUMW3LgiVxixs50tf+S8Q9Ivm+Y=
+gitea.maximumdirect.net/ejr/feedkit v0.9.1 h1:YghBQT1podqc+FJuPGuIZImV4A9dMr56Hikd5xuniig=
-gitea.maximumdirect.net/ejr/feedkit v0.8.2/go.mod h1:U6xC9xZLN3cL4yi7YBVyzGoHYRLJXusFCAKlj2kdYYQ=
+gitea.maximumdirect.net/ejr/feedkit v0.9.1/go.mod h1:U6xC9xZLN3cL4yi7YBVyzGoHYRLJXusFCAKlj2kdYYQ=
 github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
 github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
 github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=


@@ -19,6 +19,7 @@ func TestRegisterBuiltinsOrder(t *testing.T) {
 want := []fknormalize.Normalizer{
 nws.ObservationNormalizer{},
 nws.ForecastNormalizer{},
+nws.ForecastDiscussionNormalizer{},
 nws.AlertsNormalizer{},
 openmeteo.ObservationNormalizer{},
 openmeteo.ForecastNormalizer{},


@@ -0,0 +1,72 @@
package nws
import (
"context"
"fmt"
"strings"
"gitea.maximumdirect.net/ejr/feedkit/event"
normcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/common"
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
"gitea.maximumdirect.net/ejr/weatherfeeder/model"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
)
type ForecastDiscussionNormalizer struct{}
func (ForecastDiscussionNormalizer) Match(e event.Event) bool {
return strings.TrimSpace(e.Schema) == standards.SchemaRawNWSForecastDiscussionV1
}
func (ForecastDiscussionNormalizer) Normalize(ctx context.Context, in event.Event) (*event.Event, error) {
_ = ctx
rawHTML, err := decodeStringPayload(in.Payload)
if err != nil {
return nil, fmt.Errorf("nws forecast discussion normalize: %w", err)
}
parsed, err := nwscommon.ParseForecastDiscussionHTML(rawHTML)
if err != nil {
return nil, fmt.Errorf("nws forecast discussion normalize: build: %w", err)
}
payload := model.WeatherForecastDiscussion{
OfficeID: strings.TrimSpace(parsed.OfficeID),
OfficeName: strings.TrimSpace(parsed.OfficeName),
Product: model.ForecastDiscussionProduct(strings.TrimSpace(parsed.Product)),
IssuedAt: parsed.IssuedAt.UTC(),
UpdatedAt: parsed.UpdatedAt,
KeyMessages: append([]string(nil), parsed.KeyMessages...),
ShortTerm: mapForecastDiscussionSection(parsed.ShortTerm),
LongTerm: mapForecastDiscussionSection(parsed.LongTerm),
}
out, err := normcommon.Finalize(in, standards.SchemaWeatherForecastDiscussionV1, payload, payload.IssuedAt)
if err != nil {
return nil, fmt.Errorf("nws forecast discussion normalize: %w", err)
}
return out, nil
}
func mapForecastDiscussionSection(in *nwscommon.ForecastDiscussionSection) *model.WeatherForecastDiscussionSection {
if in == nil {
return nil
}
return &model.WeatherForecastDiscussionSection{
Qualifier: strings.TrimSpace(in.Qualifier),
IssuedAt: in.IssuedAt,
Text: strings.TrimSpace(in.Text),
}
}
func decodeStringPayload(payload any) (string, error) {
switch v := payload.(type) {
case string:
return v, nil
case []byte:
return string(v), nil
default:
return "", fmt.Errorf("extract payload: expected string payload, got %T", payload)
}
}


@@ -0,0 +1,130 @@
package nws
import (
"encoding/json"
"os"
"path/filepath"
"strings"
"testing"
"time"
"gitea.maximumdirect.net/ejr/feedkit/event"
"gitea.maximumdirect.net/ejr/weatherfeeder/model"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
)
func TestForecastDiscussionNormalizerProducesCanonicalSchema(t *testing.T) {
rawHTML := loadForecastDiscussionSampleHTML(t)
out, err := (ForecastDiscussionNormalizer{}).Normalize(nil, event.Event{
ID: "evt-discussion-1",
Kind: event.Kind("forecast_discussion"),
Source: "nws-discussion-test",
EmittedAt: time.Date(2026, 3, 28, 19, 25, 0, 0, time.UTC),
Schema: standards.SchemaRawNWSForecastDiscussionV1,
Payload: rawHTML,
})
if err != nil {
t.Fatalf("Normalize() error = %v", err)
}
if out == nil {
t.Fatalf("Normalize() returned nil output")
}
if out.Schema != standards.SchemaWeatherForecastDiscussionV1 {
t.Fatalf("Schema = %q, want %q", out.Schema, standards.SchemaWeatherForecastDiscussionV1)
}
if out.Kind != event.Kind("forecast_discussion") {
t.Fatalf("Kind = %q, want forecast_discussion", out.Kind)
}
payload, ok := out.Payload.(model.WeatherForecastDiscussion)
if !ok {
t.Fatalf("Payload type = %T, want model.WeatherForecastDiscussion", out.Payload)
}
if payload.OfficeID != "LSX" {
t.Fatalf("OfficeID = %q, want LSX", payload.OfficeID)
}
if payload.Product != model.ForecastDiscussionProductAFD {
t.Fatalf("Product = %q, want %q", payload.Product, model.ForecastDiscussionProductAFD)
}
if len(payload.KeyMessages) != 3 {
t.Fatalf("KeyMessages len = %d, want 3", len(payload.KeyMessages))
}
if payload.ShortTerm == nil || payload.LongTerm == nil {
t.Fatalf("ShortTerm=%v LongTerm=%v, want both populated", payload.ShortTerm, payload.LongTerm)
}
if payload.ShortTerm.Qualifier != "(Through Late Sunday Night)" {
t.Fatalf("ShortTerm.Qualifier = %q", payload.ShortTerm.Qualifier)
}
if !strings.Contains(payload.ShortTerm.Text, "After a chilly morning") {
t.Fatalf("ShortTerm.Text = %q, want normalized prose", payload.ShortTerm.Text)
}
if strings.Contains(payload.ShortTerm.Text, "BRC") {
t.Fatalf("ShortTerm.Text contains signature: %q", payload.ShortTerm.Text)
}
if strings.Contains(payload.LongTerm.Text, "AVIATION") || strings.Contains(payload.LongTerm.Text, "WATCHES/WARNINGS/ADVISORIES") {
t.Fatalf("LongTerm.Text includes downstream sections: %q", payload.LongTerm.Text)
}
wantEffectiveAt := time.Date(2026, 3, 28, 19, 24, 0, 0, time.UTC)
if out.EffectiveAt == nil || !out.EffectiveAt.Equal(wantEffectiveAt) {
t.Fatalf("EffectiveAt = %v, want %s", out.EffectiveAt, wantEffectiveAt.Format(time.RFC3339))
}
}
func TestForecastDiscussionNormalizerRejectsMissingIssueTime(t *testing.T) {
_, err := (ForecastDiscussionNormalizer{}).Normalize(nil, event.Event{
ID: "evt-discussion-bad",
Kind: event.Kind("forecast_discussion"),
Source: "nws-discussion-test",
EmittedAt: time.Date(2026, 3, 28, 19, 25, 0, 0, time.UTC),
Schema: standards.SchemaRawNWSForecastDiscussionV1,
Payload: "<html><body><pre class=\"glossaryProduct\">National Weather Service Saint Louis MO</pre></body></html>",
})
if err == nil {
t.Fatalf("Normalize() error = nil, want error")
}
if !strings.Contains(err.Error(), "issue time") {
t.Fatalf("error = %q, want issue time context", err)
}
}
func TestForecastDiscussionNormalizerWireShapeHasNoUnexpectedKeys(t *testing.T) {
rawHTML := loadForecastDiscussionSampleHTML(t)
out, err := (ForecastDiscussionNormalizer{}).Normalize(nil, event.Event{
ID: "evt-discussion-2",
Kind: event.Kind("forecast_discussion"),
Source: "nws-discussion-test",
EmittedAt: time.Date(2026, 3, 28, 19, 25, 0, 0, time.UTC),
Schema: standards.SchemaRawNWSForecastDiscussionV1,
Payload: rawHTML,
})
if err != nil {
t.Fatalf("Normalize() error = %v", err)
}
b, err := json.Marshal(out.Payload)
if err != nil {
t.Fatalf("json.Marshal(payload) error = %v", err)
}
var got map[string]any
if err := json.Unmarshal(b, &got); err != nil {
t.Fatalf("json.Unmarshal(payload) error = %v", err)
}
for _, key := range []string{"sections", "aviation"} {
if _, ok := got[key]; ok {
t.Fatalf("unexpected key %q in canonical payload", key)
}
}
}
func loadForecastDiscussionSampleHTML(t *testing.T) string {
t.Helper()
path := filepath.Join("..", "..", "providers", "nws", "testdata", "forecast_discussion_sample.html")
b, err := os.ReadFile(path)
if err != nil {
t.Fatalf("os.ReadFile(%q) error = %v", path, err)
}
return string(b)
}


@@ -8,6 +8,7 @@ import (
 var builtins = []fknormalize.Normalizer{
 ObservationNormalizer{},
 ForecastNormalizer{},
+ForecastDiscussionNormalizer{},
 AlertsNormalizer{},
 }


@@ -0,0 +1,552 @@
package nws
import (
"fmt"
"html"
"regexp"
"strconv"
"strings"
"time"
)
type ForecastDiscussion struct {
OfficeID string
OfficeName string
Product string
IssuedAt time.Time
UpdatedAt *time.Time
KeyMessages []string
ShortTerm *ForecastDiscussionSection
LongTerm *ForecastDiscussionSection
}
type ForecastDiscussionSection struct {
Qualifier string
IssuedAt *time.Time
Text string
}
var (
forecastDiscussionHeaderRE = regexp.MustCompile(`^\.(KEY MESSAGES|SHORT TERM|LONG TERM|AVIATION)\.\.\.(.*)$`)
forecastDiscussionAFDRE = regexp.MustCompile(`^AFD([A-Z]{3})$`)
forecastDiscussionWMORE = regexp.MustCompile(`\bK([A-Z]{3})\b`)
forecastDiscussionSigRE = regexp.MustCompile(`^[A-Z]{2,6}$`)
)
func ParseForecastDiscussionHTML(raw string) (ForecastDiscussion, error) {
text, err := ExtractForecastDiscussionText(raw)
if err != nil {
return ForecastDiscussion{}, err
}
parsed, err := ParseForecastDiscussionText(text)
if err != nil {
return ForecastDiscussion{}, err
}
parsed.UpdatedAt = parseForecastDiscussionUpdatedAt(raw)
return parsed, nil
}
func ExtractForecastDiscussionText(raw string) (string, error) {
lower := strings.ToLower(raw)
searchFrom := 0
for {
openStart := strings.Index(lower[searchFrom:], "<pre")
if openStart < 0 {
return "", fmt.Errorf("missing <pre class=\"glossaryProduct\"> block")
}
openStart += searchFrom
openEnd := strings.Index(lower[openStart:], ">")
if openEnd < 0 {
return "", fmt.Errorf("unterminated <pre> tag")
}
openEnd += openStart
tag := lower[openStart : openEnd+1]
if isGlossaryProductTag(tag) {
closeStart := strings.Index(lower[openEnd+1:], "</pre>")
if closeStart < 0 {
return "", fmt.Errorf("missing closing </pre> for glossaryProduct block")
}
closeStart += openEnd + 1
text := html.UnescapeString(raw[openEnd+1 : closeStart])
text = strings.ReplaceAll(text, "\r\n", "\n")
text = strings.ReplaceAll(text, "\r", "\n")
return text, nil
}
searchFrom = openEnd + 1
}
}
func ParseForecastDiscussionText(text string) (ForecastDiscussion, error) {
lines := splitLines(text)
officeID := parseForecastDiscussionOfficeID(lines)
officeName, issuedAt, err := parseForecastDiscussionHeader(lines)
if err != nil {
return ForecastDiscussion{}, err
}
out := ForecastDiscussion{
OfficeID: officeID,
OfficeName: officeName,
Product: "afd",
IssuedAt: issuedAt.UTC(),
}
if block, ok := extractForecastDiscussionSection(lines, "KEY MESSAGES"); ok {
out.KeyMessages = parseForecastDiscussionKeyMessages(block)
}
if block, ok := extractForecastDiscussionSection(lines, "SHORT TERM"); ok {
section, err := parseForecastDiscussionTextSection(block)
if err != nil {
return ForecastDiscussion{}, fmt.Errorf("parse SHORT TERM: %w", err)
}
out.ShortTerm = &section
}
if block, ok := extractForecastDiscussionSection(lines, "LONG TERM"); ok {
section, err := parseForecastDiscussionTextSection(block)
if err != nil {
return ForecastDiscussion{}, fmt.Errorf("parse LONG TERM: %w", err)
}
out.LongTerm = &section
}
return out, nil
}
func isGlossaryProductTag(tag string) bool {
tag = strings.ToLower(tag)
return strings.Contains(tag, `class="glossaryproduct"`) ||
strings.Contains(tag, `class='glossaryproduct'`) ||
strings.Contains(tag, `class="glossaryproduct `) ||
strings.Contains(tag, `class='glossaryproduct `)
}
func parseForecastDiscussionUpdatedAt(raw string) *time.Time {
lower := strings.ToLower(raw)
searchFrom := 0
for {
metaStart := strings.Index(lower[searchFrom:], "<meta")
if metaStart < 0 {
return nil
}
metaStart += searchFrom
metaEnd := strings.Index(lower[metaStart:], ">")
if metaEnd < 0 {
return nil
}
metaEnd += metaStart
tag := raw[metaStart : metaEnd+1]
if !strings.EqualFold(strings.TrimSpace(extractHTMLAttr(tag, "name")), "DC.date.created") {
searchFrom = metaEnd + 1
continue
}
content := strings.TrimSpace(extractHTMLAttr(tag, "content"))
if content == "" {
return nil
}
t, err := ParseTime(content)
if err != nil {
return nil
}
tt := t.UTC()
return &tt
}
}
func extractHTMLAttr(tag, attr string) string {
lower := strings.ToLower(tag)
attrLower := strings.ToLower(attr)
for i := 0; i < len(lower); i++ {
idx := strings.Index(lower[i:], attrLower)
if idx < 0 {
return ""
}
idx += i
if idx > 0 {
prev := lower[idx-1]
if isAttrNameChar(prev) {
i = idx + len(attrLower)
continue
}
}
j := idx + len(attrLower)
for j < len(lower) && isHTMLSpace(lower[j]) {
j++
}
if j >= len(lower) || lower[j] != '=' {
i = idx + len(attrLower)
continue
}
j++
for j < len(lower) && isHTMLSpace(lower[j]) {
j++
}
if j >= len(tag) {
return ""
}
quote := tag[j]
if quote != '"' && quote != '\'' {
return ""
}
j++
k := j
for k < len(tag) && tag[k] != quote {
k++
}
if k >= len(tag) {
return ""
}
return html.UnescapeString(tag[j:k])
}
return ""
}
func isHTMLSpace(b byte) bool {
switch b {
case ' ', '\n', '\r', '\t', '\f':
return true
default:
return false
}
}
func isAttrNameChar(b byte) bool {
switch {
case b >= 'a' && b <= 'z':
return true
case b >= 'A' && b <= 'Z':
return true
case b >= '0' && b <= '9':
return true
case b == '-' || b == '_' || b == ':':
return true
default:
return false
}
}
func splitLines(text string) []string {
text = strings.ReplaceAll(text, "\r\n", "\n")
text = strings.ReplaceAll(text, "\r", "\n")
return strings.Split(text, "\n")
}
func parseForecastDiscussionOfficeID(lines []string) string {
for _, raw := range lines {
line := strings.TrimSpace(raw)
if m := forecastDiscussionAFDRE.FindStringSubmatch(line); len(m) == 2 {
return m[1]
}
}
for _, raw := range lines {
line := strings.TrimSpace(raw)
if m := forecastDiscussionWMORE.FindStringSubmatch(line); len(m) == 2 {
return m[1]
}
}
return ""
}
func parseForecastDiscussionHeader(lines []string) (string, time.Time, error) {
for i, raw := range lines {
line := strings.TrimSpace(raw)
if !strings.HasPrefix(line, "National Weather Service ") {
continue
}
officeName := line
for j := i + 1; j < len(lines); j++ {
tsLine := strings.TrimSpace(lines[j])
if tsLine == "" {
continue
}
issuedAt, err := parseForecastDiscussionIssueTime(tsLine)
if err != nil {
return "", time.Time{}, fmt.Errorf("parse bulletin issuedAt %q: %w", tsLine, err)
}
return officeName, issuedAt.UTC(), nil
}
return "", time.Time{}, fmt.Errorf("missing bulletin issue time after office line")
}
return "", time.Time{}, fmt.Errorf("missing office header")
}
func parseForecastDiscussionIssueTime(line string) (time.Time, error) {
line = strings.TrimSpace(line)
line = strings.TrimPrefix(line, "Issued at ")
line = strings.TrimSpace(line)
parts := strings.Fields(line)
if len(parts) != 7 {
return time.Time{}, fmt.Errorf("unexpected issue time format")
}
loc, err := forecastDiscussionLocation(parts[2])
if err != nil {
return time.Time{}, err
}
datePart, err := time.Parse("Mon Jan 2 2006", strings.Join(parts[3:], " "))
if err != nil {
return time.Time{}, err
}
hour, minute, err := parseForecastDiscussionClock(parts[0], parts[1])
if err != nil {
return time.Time{}, err
}
return time.Date(
datePart.Year(),
datePart.Month(),
datePart.Day(),
hour,
minute,
0,
0,
loc,
), nil
}
func parseForecastDiscussionClock(rawClock, rawAMPM string) (int, int, error) {
clock := strings.TrimSpace(rawClock)
ampm := strings.ToUpper(strings.TrimSpace(rawAMPM))
if ampm != "AM" && ampm != "PM" {
return 0, 0, fmt.Errorf("unexpected meridiem %q", rawAMPM)
}
n, err := strconv.Atoi(clock)
if err != nil {
return 0, 0, fmt.Errorf("invalid clock %q", rawClock)
}
hour := n
minute := 0
if len(clock) >= 3 {
hour = n / 100
minute = n % 100
}
if hour < 1 || hour > 12 {
return 0, 0, fmt.Errorf("invalid hour %q", rawClock)
}
if minute < 0 || minute > 59 {
return 0, 0, fmt.Errorf("invalid minute %q", rawClock)
}
if ampm == "AM" {
if hour == 12 {
hour = 0
}
return hour, minute, nil
}
if hour != 12 {
hour += 12
}
return hour, minute, nil
}
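`parseForecastDiscussionClock` handles NWS colon-less clocks: three or more digits split into hour and minute (`224` → 2:24), then AM/PM converts to 24-hour time, with 12 AM mapping to midnight. The same arithmetic as a self-contained sketch (range validation omitted for brevity):

```go
package main

import (
	"fmt"
	"strconv"
)

// toClock24 converts an NWS-style clock string ("224", "1205", "7") plus
// a meridiem into 24-hour hour/minute, mirroring the rules above.
func toClock24(clock, ampm string) (hour, minute int, err error) {
	n, err := strconv.Atoi(clock)
	if err != nil {
		return 0, 0, err
	}
	hour, minute = n, 0
	if len(clock) >= 3 {
		hour, minute = n/100, n%100 // "224" -> hour 2, minute 24
	}
	switch {
	case ampm == "AM" && hour == 12:
		hour = 0 // 12 AM is midnight
	case ampm == "PM" && hour != 12:
		hour += 12 // 2 PM -> 14; 12 PM stays 12
	}
	return hour, minute, nil
}

func main() {
	h, m, _ := toClock24("224", "PM")
	fmt.Printf("%02d:%02d\n", h, m) // 14:24
}
```

A one- or two-digit clock like `7 AM` carries no minutes at all, which is why the split only happens at three or more digits.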
func forecastDiscussionLocation(abbrev string) (*time.Location, error) {
offsets := map[string]int{
"AST": -4 * 3600,
"ADT": -3 * 3600,
"EST": -5 * 3600,
"EDT": -4 * 3600,
"CST": -6 * 3600,
"CDT": -5 * 3600,
"MST": -7 * 3600,
"MDT": -6 * 3600,
"PST": -8 * 3600,
"PDT": -7 * 3600,
"AKST": -9 * 3600,
"AKDT": -8 * 3600,
"HST": -10 * 3600,
"UTC": 0,
"GMT": 0,
}
abbr := strings.ToUpper(strings.TrimSpace(abbrev))
offset, ok := offsets[abbr]
if !ok {
return nil, fmt.Errorf("unsupported time zone %q", abbrev)
}
return time.FixedZone(abbr, offset), nil
}
func extractForecastDiscussionSection(lines []string, section string) ([]string, bool) {
target := "." + section + "..."
for i, raw := range lines {
line := strings.TrimSpace(raw)
if !strings.HasPrefix(line, target) {
continue
}
out := []string{line}
for j := i + 1; j < len(lines); j++ {
next := strings.TrimSpace(lines[j])
if next == "&&" || next == "$$" || strings.Contains(next, "WATCHES/WARNINGS/ADVISORIES") {
break
}
if j > i+1 && isForecastDiscussionSectionHeader(next) {
break
}
out = append(out, lines[j])
}
return out, true
}
return nil, false
}
func isForecastDiscussionSectionHeader(line string) bool {
return forecastDiscussionHeaderRE.MatchString(strings.TrimSpace(line))
}
func parseForecastDiscussionKeyMessages(block []string) []string {
if len(block) <= 1 {
return nil
}
body := trimBlankLines(block[1:])
var messages []string
var current strings.Builder
flush := func() {
msg := strings.TrimSpace(current.String())
if msg != "" {
messages = append(messages, msg)
}
current.Reset()
}
for _, raw := range body {
line := strings.TrimSpace(raw)
if line == "" {
continue
}
if strings.HasPrefix(line, "-") {
flush()
line = strings.TrimSpace(strings.TrimPrefix(line, "-"))
current.WriteString(line)
continue
}
if current.Len() > 0 {
current.WriteByte(' ')
}
current.WriteString(line)
}
flush()
return messages
}
func parseForecastDiscussionTextSection(block []string) (ForecastDiscussionSection, error) {
if len(block) == 0 {
return ForecastDiscussionSection{}, fmt.Errorf("empty section")
}
section := ForecastDiscussionSection{
Qualifier: parseForecastDiscussionQualifier(strings.TrimSpace(block[0])),
}
body := trimBlankLines(block[1:])
if len(body) == 0 {
return section, nil
}
first := strings.TrimSpace(body[0])
if strings.HasPrefix(first, "Issued at ") {
issuedAt, err := parseForecastDiscussionIssueTime(first)
if err != nil {
return ForecastDiscussionSection{}, fmt.Errorf("parse section issuedAt %q: %w", first, err)
}
tt := issuedAt.UTC()
section.IssuedAt = &tt
body = trimBlankLines(body[1:])
}
body = trimForecastDiscussionSignatureLines(body)
section.Text = joinForecastDiscussionParagraphs(body)
return section, nil
}
func parseForecastDiscussionQualifier(header string) string {
m := forecastDiscussionHeaderRE.FindStringSubmatch(header)
if len(m) != 3 {
return ""
}
return strings.TrimSpace(m[2])
}
func trimBlankLines(lines []string) []string {
start := 0
for start < len(lines) && strings.TrimSpace(lines[start]) == "" {
start++
}
end := len(lines)
for end > start && strings.TrimSpace(lines[end-1]) == "" {
end--
}
return lines[start:end]
}
func trimForecastDiscussionSignatureLines(lines []string) []string {
lines = trimBlankLines(lines)
for len(lines) > 0 {
last := strings.TrimSpace(lines[len(lines)-1])
if last == "" {
lines = lines[:len(lines)-1]
continue
}
if forecastDiscussionSigRE.MatchString(last) {
lines = trimBlankLines(lines[:len(lines)-1])
continue
}
break
}
return lines
}
func joinForecastDiscussionParagraphs(lines []string) string {
lines = trimBlankLines(lines)
if len(lines) == 0 {
return ""
}
var paragraphs []string
current := make([]string, 0, len(lines))
flush := func() {
if len(current) == 0 {
return
}
paragraphs = append(paragraphs, strings.Join(current, " "))
current = current[:0]
}
for _, raw := range lines {
line := strings.TrimSpace(raw)
if line == "" {
flush()
continue
}
current = append(current, line)
}
flush()
return strings.Join(paragraphs, "\n\n")
}
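`joinForecastDiscussionParagraphs` at the bottom collapses hard-wrapped NWS prose: blank lines delimit paragraphs, and wrapped lines inside a paragraph are re-joined with single spaces. A self-contained sketch of the same behavior:

```go
package main

import (
	"fmt"
	"strings"
)

// joinParagraphs mirrors the function above: blank lines split
// paragraphs, wrapped lines within a paragraph are rejoined with
// spaces, and paragraphs are separated by a single blank line.
func joinParagraphs(lines []string) string {
	var paragraphs []string
	var current []string
	flush := func() {
		if len(current) > 0 {
			paragraphs = append(paragraphs, strings.Join(current, " "))
			current = current[:0]
		}
	}
	for _, raw := range lines {
		line := strings.TrimSpace(raw)
		if line == "" {
			flush() // paragraph boundary
			continue
		}
		current = append(current, line)
	}
	flush()
	return strings.Join(paragraphs, "\n\n")
}

func main() {
	fmt.Println(joinParagraphs([]string{
		"After a chilly",
		"morning...",
		"",
		"Winds increase.",
	}))
}
```

This is what makes `shortTerm.text` "paragraph-preserved" in the schema: the 66-column wrapping of the raw bulletin disappears, but the author's paragraph breaks survive as `\n\n`.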


@@ -0,0 +1,118 @@
package nws
import (
"os"
"path/filepath"
"strings"
"testing"
"time"
)
func TestParseForecastDiscussionHTMLParsesExpectedFields(t *testing.T) {
raw := loadForecastDiscussionSampleHTML(t)
got, err := ParseForecastDiscussionHTML(raw)
if err != nil {
t.Fatalf("ParseForecastDiscussionHTML() error = %v", err)
}
if got.OfficeID != "LSX" {
t.Fatalf("OfficeID = %q, want LSX", got.OfficeID)
}
if got.OfficeName != "National Weather Service Saint Louis MO" {
t.Fatalf("OfficeName = %q", got.OfficeName)
}
if got.Product != "afd" {
t.Fatalf("Product = %q, want afd", got.Product)
}
wantIssuedAt := time.Date(2026, 3, 28, 19, 24, 0, 0, time.UTC)
if !got.IssuedAt.Equal(wantIssuedAt) {
t.Fatalf("IssuedAt = %s, want %s", got.IssuedAt.Format(time.RFC3339), wantIssuedAt.Format(time.RFC3339))
}
wantUpdatedAt := time.Date(2026, 3, 28, 20, 29, 47, 0, time.UTC)
if got.UpdatedAt == nil || !got.UpdatedAt.Equal(wantUpdatedAt) {
t.Fatalf("UpdatedAt = %v, want %s", got.UpdatedAt, wantUpdatedAt.Format(time.RFC3339))
}
wantMessages := []string{
"Elevated fire danger conditions are expected across a broad area tomorrow afternoon due to breezy southwest winds and low humidity.",
"Very warm temperatures are expected once again Monday and Tuesday, with highs well into the 80s.",
"A cold front late Tuesday or early Wednesday brings our next chance of thunderstorms, followed by a cooldown and possibly more chances for rain later in the week.",
}
if len(got.KeyMessages) != len(wantMessages) {
t.Fatalf("KeyMessages len = %d, want %d", len(got.KeyMessages), len(wantMessages))
}
for i := range wantMessages {
if got.KeyMessages[i] != wantMessages[i] {
t.Fatalf("KeyMessages[%d] = %q, want %q", i, got.KeyMessages[i], wantMessages[i])
}
}
if got.ShortTerm == nil {
t.Fatalf("ShortTerm is nil")
}
if got.ShortTerm.Qualifier != "(Through Late Sunday Night)" {
t.Fatalf("ShortTerm.Qualifier = %q", got.ShortTerm.Qualifier)
}
if got.ShortTerm.IssuedAt == nil || !got.ShortTerm.IssuedAt.Equal(time.Date(2026, 3, 28, 19, 19, 0, 0, time.UTC)) {
t.Fatalf("ShortTerm.IssuedAt = %v", got.ShortTerm.IssuedAt)
}
if !strings.Contains(got.ShortTerm.Text, "After a chilly morning") {
t.Fatalf("ShortTerm.Text missing expected prose: %q", got.ShortTerm.Text)
}
if strings.Contains(got.ShortTerm.Text, "BRC") {
t.Fatalf("ShortTerm.Text should not include signature: %q", got.ShortTerm.Text)
}
if strings.Contains(got.ShortTerm.Text, "\n\n\n") {
t.Fatalf("ShortTerm.Text contains unexpected paragraph breaks: %q", got.ShortTerm.Text)
}
if got.LongTerm == nil {
t.Fatalf("LongTerm is nil")
}
if got.LongTerm.Qualifier != "(Monday through Next Saturday)" {
t.Fatalf("LongTerm.Qualifier = %q", got.LongTerm.Qualifier)
}
if got.LongTerm.IssuedAt == nil || !got.LongTerm.IssuedAt.Equal(time.Date(2026, 3, 28, 19, 19, 0, 0, time.UTC)) {
t.Fatalf("LongTerm.IssuedAt = %v", got.LongTerm.IssuedAt)
}
if !strings.Contains(got.LongTerm.Text, "The peak of the warmth arrives Monday and Tuesday") {
t.Fatalf("LongTerm.Text missing expected prose: %q", got.LongTerm.Text)
}
if strings.Contains(got.LongTerm.Text, "AVIATION") || strings.Contains(got.LongTerm.Text, "WATCHES/WARNINGS/ADVISORIES") {
t.Fatalf("LongTerm.Text includes content from other sections: %q", got.LongTerm.Text)
}
}
func TestParseForecastDiscussionHTMLMissingPreBlock(t *testing.T) {
_, err := ParseForecastDiscussionHTML("<html><body><div>no pre block</div></body></html>")
if err == nil {
t.Fatalf("ParseForecastDiscussionHTML() error = nil, want error")
}
if !strings.Contains(err.Error(), "glossaryProduct") {
t.Fatalf("error = %q, want glossaryProduct context", err)
}
}
func TestParseForecastDiscussionTextMissingIssueTime(t *testing.T) {
_, err := ParseForecastDiscussionText("National Weather Service Saint Louis MO\n\n.KEY MESSAGES...\n- Test")
if err == nil {
t.Fatalf("ParseForecastDiscussionText() error = nil, want error")
}
if !strings.Contains(err.Error(), "issue time") {
t.Fatalf("error = %q, want issue time context", err)
}
}
func loadForecastDiscussionSampleHTML(t *testing.T) string {
t.Helper()
path := filepath.Join("testdata", "forecast_discussion_sample.html")
b, err := os.ReadFile(path)
if err != nil {
t.Fatalf("os.ReadFile(%q) error = %v", path, err)
}
return string(b)
}

View File

@@ -0,0 +1,84 @@
<!DOCTYPE html>
<html class="no-js">
<head>
<meta name="DC.date.created" scheme="ISO8601" content="2026-03-28T20:29:47+00:00" />
<title>National Weather Service</title>
</head>
<body>
<pre class="glossaryProduct">
988
FXUS63 KLSX 281924
AFDLSX
Area Forecast Discussion
National Weather Service Saint Louis MO
224 PM CDT Sat Mar 28 2026
.KEY MESSAGES...
- Elevated fire danger conditions are expected across a broad area
tomorrow afternoon due to breezy southwest winds and low
humidity.
- Very warm temperatures are expected once again Monday and
Tuesday, with highs well into the 80s.
- A cold front late Tuesday or early Wednesday brings our next
chance of thunderstorms, followed by a cooldown and possibly
more chances for rain later in the week.
&&
.SHORT TERM... (Through Late Sunday Night)
Issued at 219 PM CDT Sat Mar 28 2026
After a chilly morning that saw widespread freezing temperatures,
another warmup is in store over the next several days as southerly
winds become re-established. We will also see the return of
shower/thunderstorm chances Tuesday onward as we enter a more
unsettled pattern.
In the near-term, the focus continues to be on some lingering fire
weather potential thanks to the presence of an exceptionally dry
airmass.
BRC
&&
.LONG TERM... (Monday through Next Saturday)
Issued at 219 PM CDT Sat Mar 28 2026
The peak of the warmth arrives Monday and Tuesday, as a broad, but
low-amplitude ridge nudges eastward and steady warm/moist advection
continues on both days.
Wednesday onward, the day-to-day details become much less clear, but
latest trends suggest that an active/wet pattern will likely
continue as another more substantial trough follows with additional
chances for showers/thunderstorms late in the week.
BRC
&&
.AVIATION... (For the 18z TAFs through 18z Sunday Afternoon)
Issued at 1133 AM CDT Sat Mar 28 2026
VFR conditions are expected throughout the 18Z TAF period.
BRC
&&
.LSX WATCHES/WARNINGS/ADVISORIES...
MO...None.
IL...None.
&&
$$
WFO LSX
</pre>
</body>
</html>

View File

@@ -20,6 +20,8 @@ func mapPostgresEvent(_ context.Context, e fkevent.Event) ([]fksinks.PostgresWri
return mapObservationEvent(e)
case standards.SchemaWeatherForecastV1:
return mapForecastEvent(e)
case standards.SchemaWeatherForecastDiscussionV1:
return mapForecastDiscussionEvent(e)
case standards.SchemaWeatherAlertV1:
return mapAlertEvent(e)
default:
@@ -160,6 +162,62 @@ func mapForecastEvent(e fkevent.Event) ([]fksinks.PostgresWrite, error) {
return writes, nil
}
func mapForecastDiscussionEvent(e fkevent.Event) ([]fksinks.PostgresWrite, error) {
run, err := decodePayload[model.WeatherForecastDiscussion](e.Payload)
if err != nil {
return nil, fmt.Errorf("decode forecast discussion payload: %w", err)
}
if run.IssuedAt.IsZero() {
return nil, fmt.Errorf("decode forecast discussion payload: issuedAt is required")
}
if strings.TrimSpace(string(run.Product)) == "" {
return nil, fmt.Errorf("decode forecast discussion payload: product is required")
}
issuedAt := run.IssuedAt.UTC()
shortTermQualifier, shortTermIssuedAt, shortTermText := nullableDiscussionSection(run.ShortTerm)
longTermQualifier, longTermIssuedAt, longTermText := nullableDiscussionSection(run.LongTerm)
writes := make([]fksinks.PostgresWrite, 0, 1+len(run.KeyMessages))
writes = append(writes, fksinks.PostgresWrite{
Table: tableForecastDiscussions,
Values: map[string]any{
"event_id": e.ID,
"event_kind": string(e.Kind),
"event_source": e.Source,
"event_schema": e.Schema,
"event_emitted_at": e.EmittedAt.UTC(),
"event_effective_at": nullableTime(e.EffectiveAt),
"office_id": nullableString(run.OfficeID),
"office_name": nullableString(run.OfficeName),
"issued_at": issuedAt,
"updated_at": nullableTime(run.UpdatedAt),
"product": string(run.Product),
"short_term_qualifier": shortTermQualifier,
"short_term_issued_at": shortTermIssuedAt,
"short_term_text": shortTermText,
"long_term_qualifier": longTermQualifier,
"long_term_issued_at": longTermIssuedAt,
"long_term_text": longTermText,
"key_message_count": len(run.KeyMessages),
},
})
for i, msg := range run.KeyMessages {
writes = append(writes, fksinks.PostgresWrite{
Table: tableForecastDiscussionKeyMessages,
Values: map[string]any{
"run_event_id": e.ID,
"message_index": i,
"issued_at": issuedAt,
"message_text": nullableString(msg),
},
})
}
return writes, nil
}
func mapAlertEvent(e fkevent.Event) ([]fksinks.PostgresWrite, error) {
run, err := decodePayload[model.WeatherAlertRun](e.Payload)
if err != nil {
@@ -269,6 +327,13 @@ func decodePayload[T any](payload any) (T, error) {
return out, nil
}
func nullableDiscussionSection(section *model.WeatherForecastDiscussionSection) (any, any, any) {
if section == nil {
return nil, nil, nil
}
return nullableString(section.Qualifier), nullableTime(section.IssuedAt), nullableString(section.Text)
}
func countAlertReferences(alerts []model.WeatherAlert) int {
total := 0
for _, a := range alerts {

View File

@@ -146,6 +146,50 @@ func TestMapPostgresEventAlertStructPayload(t *testing.T) {
assertAllWritesIncludeAllColumns(t, writes)
}
func TestMapPostgresEventForecastDiscussionStructPayload(t *testing.T) {
updatedAt := time.Date(2026, 3, 28, 20, 29, 47, 0, time.UTC)
shortIssuedAt := time.Date(2026, 3, 28, 19, 19, 0, 0, time.UTC)
run := model.WeatherForecastDiscussion{
OfficeID: "LSX",
OfficeName: "National Weather Service Saint Louis MO",
Product: model.ForecastDiscussionProductAFD,
IssuedAt: time.Date(2026, 3, 28, 19, 24, 0, 0, time.UTC),
UpdatedAt: &updatedAt,
KeyMessages: []string{"msg one", "msg two"},
ShortTerm: &model.WeatherForecastDiscussionSection{Qualifier: "(Tonight)", IssuedAt: &shortIssuedAt, Text: "Short term text"},
LongTerm: &model.WeatherForecastDiscussionSection{Text: "Long term text"},
}
writes, err := mapPostgresEvent(context.Background(), testEvent(standards.SchemaWeatherForecastDiscussionV1, "forecast_discussion", run))
if err != nil {
t.Fatalf("mapPostgresEvent() error = %v", err)
}
if len(writes) != 3 {
t.Fatalf("mapPostgresEvent() writes len = %d, want 3", len(writes))
}
if writes[0].Table != tableForecastDiscussions {
t.Fatalf("writes[0].Table = %q, want %q", writes[0].Table, tableForecastDiscussions)
}
if got := writes[0].Values["key_message_count"]; got != 2 {
t.Fatalf("forecast_discussions key_message_count = %#v, want 2", got)
}
if got := writes[0].Values["short_term_qualifier"]; got != "(Tonight)" {
t.Fatalf("forecast_discussions short_term_qualifier = %#v, want (Tonight)", got)
}
if got := writes[0].Values["long_term_issued_at"]; got != nil {
t.Fatalf("forecast_discussions long_term_issued_at = %#v, want nil", got)
}
if writes[1].Table != tableForecastDiscussionKeyMessages || writes[2].Table != tableForecastDiscussionKeyMessages {
t.Fatalf("forecast discussion key message writes not in expected order")
}
if got := writes[2].Values["message_index"]; got != 1 {
t.Fatalf("second key message index = %#v, want 1", got)
}
assertAllWritesIncludeAllColumns(t, writes)
}
func TestMapPostgresEventMapPayload(t *testing.T) {
run := model.WeatherForecastRun{
IssuedAt: time.Date(2026, 3, 16, 18, 0, 0, 0, time.UTC),
@@ -201,6 +245,16 @@ func TestMapPostgresEventMalformedPayload(t *testing.T) {
}
}
func TestMapPostgresEventForecastDiscussionMalformedPayload(t *testing.T) {
_, err := mapPostgresEvent(context.Background(), testEvent(standards.SchemaWeatherForecastDiscussionV1, "forecast_discussion", "bad"))
if err == nil {
t.Fatalf("mapPostgresEvent() expected error for malformed payload")
}
if !strings.Contains(err.Error(), "decode forecast discussion payload") {
t.Fatalf("error = %q, want decode forecast discussion payload context", err)
}
}
func testEvent(schema string, kind fkevent.Kind, payload any) fkevent.Event {
effectiveAt := time.Date(2026, 3, 16, 18, 30, 0, 0, time.UTC)
return fkevent.Event{

View File

@@ -5,13 +5,15 @@ import (
)
const (
tableObservations = "observations"
tableObservationPresentWeather = "observation_present_weather"
tableForecasts = "forecasts"
tableForecastPeriods = "forecast_periods"
tableForecastDiscussions = "forecast_discussions"
tableForecastDiscussionKeyMessages = "forecast_discussion_key_messages"
tableAlertRuns = "alert_runs"
tableAlerts = "alerts"
tableAlertReferences = "alert_references"
)
// PostgresSchema returns weatherfeeder's Postgres schema definition.
@@ -129,6 +131,49 @@ func PostgresSchema() fksinks.PostgresSchema {
{Name: "idx_wf_fc_period_run_start", Columns: []string{"run_event_id", "start_time"}},
},
},
{
Name: tableForecastDiscussions,
Columns: []fksinks.PostgresColumn{
{Name: "event_id", Type: "TEXT", Nullable: false},
{Name: "event_kind", Type: "TEXT", Nullable: false},
{Name: "event_source", Type: "TEXT", Nullable: false},
{Name: "event_schema", Type: "TEXT", Nullable: false},
{Name: "event_emitted_at", Type: "TIMESTAMPTZ", Nullable: false},
{Name: "event_effective_at", Type: "TIMESTAMPTZ", Nullable: true},
{Name: "office_id", Type: "TEXT", Nullable: true},
{Name: "office_name", Type: "TEXT", Nullable: true},
{Name: "issued_at", Type: "TIMESTAMPTZ", Nullable: false},
{Name: "updated_at", Type: "TIMESTAMPTZ", Nullable: true},
{Name: "product", Type: "TEXT", Nullable: false},
{Name: "short_term_qualifier", Type: "TEXT", Nullable: true},
{Name: "short_term_issued_at", Type: "TIMESTAMPTZ", Nullable: true},
{Name: "short_term_text", Type: "TEXT", Nullable: true},
{Name: "long_term_qualifier", Type: "TEXT", Nullable: true},
{Name: "long_term_issued_at", Type: "TIMESTAMPTZ", Nullable: true},
{Name: "long_term_text", Type: "TEXT", Nullable: true},
{Name: "key_message_count", Type: "INTEGER", Nullable: false},
},
PrimaryKey: []string{"event_id"},
PruneColumn: "issued_at",
Indexes: []fksinks.PostgresIndex{
{Name: "idx_wf_discussion_office_product_issued_at", Columns: []string{"office_id", "product", "issued_at"}},
{Name: "idx_wf_discussion_issued_at", Columns: []string{"issued_at"}},
},
},
{
Name: tableForecastDiscussionKeyMessages,
Columns: []fksinks.PostgresColumn{
{Name: "run_event_id", Type: "TEXT REFERENCES forecast_discussions(event_id) ON DELETE CASCADE", Nullable: false},
{Name: "message_index", Type: "INTEGER", Nullable: false},
{Name: "issued_at", Type: "TIMESTAMPTZ", Nullable: false},
{Name: "message_text", Type: "TEXT", Nullable: true},
},
PrimaryKey: []string{"run_event_id", "message_index"},
PruneColumn: "issued_at",
Indexes: []fksinks.PostgresIndex{
{Name: "idx_wf_discussion_message_issued_at", Columns: []string{"issued_at"}},
},
},
{
Name: tableAlertRuns,
Columns: []fksinks.PostgresColumn{

View File

@@ -9,13 +9,15 @@ func TestWeatherPostgresSchemaShape(t *testing.T) {
}
wantTables := map[string]bool{
tableObservations: true,
tableObservationPresentWeather: true,
tableForecasts: true,
tableForecastPeriods: true,
tableForecastDiscussions: true,
tableForecastDiscussionKeyMessages: true,
tableAlertRuns: true,
tableAlerts: true,
tableAlertReferences: true,
}
if len(s.Tables) != len(wantTables) {

View File

@@ -19,6 +19,9 @@ var pollDriverRegistrations = []pollDriverRegistration{
{driver: "nws_alerts", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) { return nws.NewAlertsSource(cfg) }},
{driver: "nws_forecast_hourly", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) { return nws.NewHourlyForecastSource(cfg) }},
{driver: "nws_forecast_narrative", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) { return nws.NewNarrativeForecastSource(cfg) }},
{driver: "nws_forecast_discussion", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) {
return nws.NewForecastDiscussionSource(cfg)
}},
{driver: "openmeteo_observation", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) { return openmeteo.NewObservationSource(cfg) }},
{driver: "openmeteo_forecast", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) { return openmeteo.NewForecastSource(cfg) }},
{driver: "openweather_observation", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) {

View File

@@ -34,6 +34,19 @@ func TestRegisterBuiltinsRegistersNWSNarrativeForecastDriver(t *testing.T) {
}
}
func TestRegisterBuiltinsRegistersNWSForecastDiscussionDriver(t *testing.T) {
reg := fksource.NewRegistry()
RegisterBuiltins(reg)
in, err := reg.BuildInput(sourceConfigForDriver("nws_forecast_discussion"))
if err != nil {
t.Fatalf("BuildInput(nws_forecast_discussion) error = %v", err)
}
if _, ok := in.(fksource.PollSource); !ok {
t.Fatalf("BuildInput(nws_forecast_discussion) type = %T, want PollSource", in)
}
}
func TestRegisterBuiltinsDoesNotRegisterLegacyNWSForecastDriver(t *testing.T) {
reg := fksource.NewRegistry()
RegisterBuiltins(reg)
@@ -56,6 +69,7 @@ func TestRegisterBuiltinsRegistersAllCurrentDrivers(t *testing.T) {
"nws_alerts",
"nws_forecast_hourly",
"nws_forecast_narrative",
"nws_forecast_discussion",
"openmeteo_observation",
"openmeteo_forecast",
"openweather_observation",

View File

@@ -0,0 +1,68 @@
package nws
import (
"context"
"time"
"gitea.maximumdirect.net/ejr/feedkit/config"
"gitea.maximumdirect.net/ejr/feedkit/event"
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
)
// ForecastDiscussionSource polls an NWS forecast discussion HTML page and emits a RAW discussion Event.
//
// Output schema:
// - standards.SchemaRawNWSForecastDiscussionV1
type ForecastDiscussionSource struct {
http *fksources.HTTPSource
}
func NewForecastDiscussionSource(cfg config.SourceConfig) (*ForecastDiscussionSource, error) {
const driver = "nws_forecast_discussion"
hs, err := fksources.NewHTTPSource(driver, cfg, "text/html, application/xhtml+xml")
if err != nil {
return nil, err
}
return &ForecastDiscussionSource{http: hs}, nil
}
func (s *ForecastDiscussionSource) Name() string { return s.http.Name }
func (s *ForecastDiscussionSource) Kinds() []event.Kind {
return []event.Kind{event.Kind("forecast_discussion")}
}
func (s *ForecastDiscussionSource) Poll(ctx context.Context) ([]event.Event, error) {
body, changed, err := s.http.FetchBytesIfChanged(ctx)
if err != nil {
return nil, err
}
if !changed {
return nil, nil
}
rawHTML := string(body)
parsed, err := nwscommon.ParseForecastDiscussionHTML(rawHTML)
if err != nil {
return nil, err
}
issuedAt := parsed.IssuedAt.UTC()
effectiveAt := &issuedAt
emittedAt := time.Now().UTC()
eventID := fksources.DefaultEventID("", s.http.Name, effectiveAt, emittedAt)
return fksources.SingleEvent(
event.Kind("forecast_discussion"),
s.http.Name,
standards.SchemaRawNWSForecastDiscussionV1,
eventID,
emittedAt,
effectiveAt,
rawHTML,
)
}

View File

@@ -0,0 +1,138 @@
package nws
import (
"context"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"strings"
"testing"
"time"
"gitea.maximumdirect.net/ejr/feedkit/config"
"gitea.maximumdirect.net/ejr/feedkit/event"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
)
func TestForecastDiscussionSourcePollEmitsExpectedEvent(t *testing.T) {
rawHTML := loadForecastDiscussionSampleHTML(t)
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/html; charset=utf-8")
_, _ = w.Write([]byte(rawHTML))
}))
defer srv.Close()
src, err := NewForecastDiscussionSource(forecastDiscussionSourceConfig(srv.URL))
if err != nil {
t.Fatalf("NewForecastDiscussionSource() error = %v", err)
}
if got := src.Kinds(); len(got) != 1 || got[0] != event.Kind("forecast_discussion") {
t.Fatalf("Kinds() = %#v, want [forecast_discussion]", got)
}
events, err := src.Poll(context.Background())
if err != nil {
t.Fatalf("Poll() error = %v", err)
}
if len(events) != 1 {
t.Fatalf("Poll() len = %d, want 1", len(events))
}
got := events[0]
if got.Kind != event.Kind("forecast_discussion") {
t.Fatalf("Kind = %q, want forecast_discussion", got.Kind)
}
if got.Schema != standards.SchemaRawNWSForecastDiscussionV1 {
t.Fatalf("Schema = %q, want %q", got.Schema, standards.SchemaRawNWSForecastDiscussionV1)
}
wantEffectiveAt := time.Date(2026, 3, 28, 19, 24, 0, 0, time.UTC)
if got.EffectiveAt == nil || !got.EffectiveAt.Equal(wantEffectiveAt) {
t.Fatalf("EffectiveAt = %v, want %s", got.EffectiveAt, wantEffectiveAt.Format(time.RFC3339))
}
payload, ok := got.Payload.(string)
if !ok {
t.Fatalf("Payload type = %T, want string", got.Payload)
}
if payload != rawHTML {
t.Fatalf("Payload did not preserve exact HTML")
}
}
func TestForecastDiscussionSourcePollReturnsNoEventsWhenUnchanged(t *testing.T) {
rawHTML := loadForecastDiscussionSampleHTML(t)
const etag = `"discussion-v1"`
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Header.Get("If-None-Match") == etag {
w.WriteHeader(http.StatusNotModified)
return
}
w.Header().Set("ETag", etag)
_, _ = w.Write([]byte(rawHTML))
}))
defer srv.Close()
src, err := NewForecastDiscussionSource(forecastDiscussionSourceConfig(srv.URL))
if err != nil {
t.Fatalf("NewForecastDiscussionSource() error = %v", err)
}
first, err := src.Poll(context.Background())
if err != nil {
t.Fatalf("first Poll() error = %v", err)
}
if len(first) != 1 {
t.Fatalf("first Poll() len = %d, want 1", len(first))
}
second, err := src.Poll(context.Background())
if err != nil {
t.Fatalf("second Poll() error = %v", err)
}
if len(second) != 0 {
t.Fatalf("second Poll() len = %d, want 0", len(second))
}
}
func TestForecastDiscussionSourcePollRejectsInvalidHTML(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte("<html><body><div>missing discussion block</div></body></html>"))
}))
defer srv.Close()
src, err := NewForecastDiscussionSource(forecastDiscussionSourceConfig(srv.URL))
if err != nil {
t.Fatalf("NewForecastDiscussionSource() error = %v", err)
}
_, err = src.Poll(context.Background())
if err == nil {
t.Fatalf("Poll() error = nil, want error")
}
if !strings.Contains(err.Error(), "glossaryProduct") {
t.Fatalf("error = %q, want glossaryProduct context", err)
}
}
func forecastDiscussionSourceConfig(url string) config.SourceConfig {
return config.SourceConfig{
Name: "test-forecast-discussion-source",
Driver: "nws_forecast_discussion",
Mode: config.SourceModePoll,
Params: map[string]any{
"url": url,
"user_agent": "test-agent",
},
}
}
func loadForecastDiscussionSampleHTML(t *testing.T) string {
t.Helper()
path := filepath.Join("..", "..", "providers", "nws", "testdata", "forecast_discussion_sample.html")
b, err := os.ReadFile(path)
if err != nil {
t.Fatalf("os.ReadFile(%q) error = %v", path, err)
}
return string(b)
}

View File

@@ -0,0 +1,40 @@
package model
import "time"
// ForecastDiscussionProduct distinguishes the discussion bulletin family.
//
// Today weatherfeeder only normalizes Area Forecast Discussion (AFD) products,
// but this remains a distinct type so additional discussion-like products can be
// added without changing the payload field type.
type ForecastDiscussionProduct string
const (
ForecastDiscussionProductAFD ForecastDiscussionProduct = "afd"
)
// WeatherForecastDiscussion is a canonical issued discussion bulletin for an NWS office.
//
// Unlike WeatherForecastRun, this is authored narrative text rather than a sequence
// of forecast periods.
type WeatherForecastDiscussion struct {
OfficeID string `json:"officeId,omitempty"`
OfficeName string `json:"officeName,omitempty"`
Product ForecastDiscussionProduct `json:"product"`
IssuedAt time.Time `json:"issuedAt"`
UpdatedAt *time.Time `json:"updatedAt,omitempty"`
KeyMessages []string `json:"keyMessages,omitempty"`
ShortTerm *WeatherForecastDiscussionSection `json:"shortTerm,omitempty"`
LongTerm *WeatherForecastDiscussionSection `json:"longTerm,omitempty"`
}
// WeatherForecastDiscussionSection is a fixed prose section within a discussion bulletin.
type WeatherForecastDiscussionSection struct {
Qualifier string `json:"qualifier,omitempty"`
IssuedAt *time.Time `json:"issuedAt,omitempty"`
Text string `json:"text,omitempty"`
}

View File

@@ -17,13 +17,15 @@ const (
SchemaRawNWSHourlyForecastV1 = "raw.nws.hourly.forecast.v1"
SchemaRawNWSNarrativeForecastV1 = "raw.nws.narrative.forecast.v1"
SchemaRawNWSForecastDiscussionV1 = "raw.nws.forecast_discussion.v1"
SchemaRawOpenMeteoHourlyForecastV1 = "raw.openmeteo.hourly.forecast.v1"
SchemaRawOpenWeatherHourlyForecastV1 = "raw.openweather.hourly.forecast.v1"
SchemaRawNWSAlertsV1 = "raw.nws.alerts.v1"
// Canonical domain schemas (emitted after normalization).
SchemaWeatherObservationV1 = "weather.observation.v1"
SchemaWeatherForecastV1 = "weather.forecast.v1"
SchemaWeatherForecastDiscussionV1 = "weather.forecast_discussion.v1"
SchemaWeatherAlertV1 = "weather.alert.v1"
)