4 Commits

Author SHA1 Message Date
a0389ebce8 Added support for Area Forecast Discussions issued by the NWS
All checks were successful
ci/woodpecker/push/build-image Pipeline was successful
2026-03-28 16:17:03 -05:00
40f17c9d86 Updates to track upstream feedkit v0.8.2
All checks were successful
ci/woodpecker/push/build-image Pipeline was successful
2026-03-28 13:53:54 -05:00
c76088c38c Code cleanup and deduplication pass through weatherfeeder
All checks were successful
ci/woodpecker/push/build-image Pipeline was successful
ci/woodpecker/manual/build-image Pipeline was successful
2026-03-28 12:01:07 -05:00
2c1278a70a Moved generic and broadly useful helper functions upstream into feedkit
All checks were successful
ci/woodpecker/push/build-image Pipeline was successful
2026-03-28 11:30:20 -05:00
47 changed files with 2160 additions and 800 deletions

42
API.md
View File

@@ -37,14 +37,24 @@ Examples:
## Canonical schemas ## Canonical schemas
weatherfeeder emits three canonical domain schemas: weatherfeeder emits four canonical domain schemas:
- `weather.observation.v1` - `weather.observation.v1`
- `weather.forecast.v1` - `weather.forecast.v1`
- `weather.forecast_discussion.v1`
- `weather.alert.v1` - `weather.alert.v1`
Each payload is described below using the JSON field names as the contract. Each payload is described below using the JSON field names as the contract.
### Raw upstream schemas
weatherfeeder sources also emit provider-specific raw schemas before normalization.
For this feature, the raw source schema is:
- `raw.nws.forecast_discussion.v1`
- payload type: string
- payload contents: exact fetched HTML response body
--- ---
## Shared Conventions ## Shared Conventions
@@ -217,6 +227,36 @@ A run may contain zero, one, or many alerts.
--- ---
## Schema: `weather.forecast_discussion.v1`
Payload type: `WeatherForecastDiscussion`
A `WeatherForecastDiscussion` is an issued narrative bulletin for an NWS office.
It is distinct from `weather.forecast.v1`, which is period-based.
### Fields
| Field | Type | Required | Notes |
|---|---:|:---:|---|
| `officeId` | string | no | NWS office identifier, e.g. `LSX` |
| `officeName` | string | no | Human office name |
| `product` | string | yes | Currently `afd` |
| `issuedAt` | string (timestamp) | yes | Bulletin issue time |
| `updatedAt` | string (timestamp) | no | Optional page/update timestamp |
| `keyMessages` | array | no | Ordered key-message bullet list |
| `shortTerm` | object | no | Short-term discussion section |
| `longTerm` | object | no | Long-term discussion section |
### Nested: `shortTerm` / `longTerm`
| Field | Type | Required | Notes |
|---|---:|:---:|---|
| `qualifier` | string | no | Header qualifier such as `(Through Late Sunday Night)` |
| `issuedAt` | string (timestamp) | no | Optional section-local issue time |
| `text` | string | no | Paragraph-preserved prose text |
---
## Compatibility rules ## Compatibility rules
- Consumers **must** ignore unknown fields. - Consumers **must** ignore unknown fields.

View File

@@ -14,6 +14,7 @@ Canonical domain schemas emitted after normalization:
- `weather.observation.v1``WeatherObservation` - `weather.observation.v1``WeatherObservation`
- `weather.forecast.v1``WeatherForecastRun` - `weather.forecast.v1``WeatherForecastRun`
- `weather.forecast_discussion.v1``WeatherForecastDiscussion`
- `weather.alert.v1``WeatherAlertRun` - `weather.alert.v1``WeatherAlertRun`
For the complete wire contract (event envelope + payload schemas, fields, units, and compatibility rules), see: For the complete wire contract (event envelope + payload schemas, fields, units, and compatibility rules), see:
@@ -22,7 +23,7 @@ For the complete wire contract (event envelope + payload schemas, fields, units,
## Upstream providers (current MVP) ## Upstream providers (current MVP)
- NWS: observations, hourly forecasts, narrative forecasts, alerts - NWS: observations, hourly forecasts, narrative forecasts, forecast discussions, alerts
- Open-Meteo: observations, hourly forecasts - Open-Meteo: observations, hourly forecasts
- OpenWeather: observations - OpenWeather: observations

View File

@@ -15,7 +15,7 @@ sources:
# driver: openmeteo_observation # driver: openmeteo_observation
# every: 10m # every: 10m
# params: # params:
# url: "https://api.open-meteo.com/v1/forecast?latitude=38.6239&longitude=-90.3571&current=temperature_2m,relative_humidity_2m,weather_code,wind_speed_10m,wind_direction_10m,precipitation,surface_pressure,rain,showers,snowfall,cloud_cover,apparent_temperature,is_day,wind_gusts_10m,pressure_msl&forecast_days=1" # url: "https://api.open-meteo.com/v1/forecast?latitude=38.6239&longitude=-90.3571&current=temperature_2m,relative_humidity_2m,weather_code,wind_speed_10m,wind_direction_10m,precipitation,surface_pressure,rain,showers,snowfall,cloud_cover,apparent_temperature,is_day,wind_gusts_10m,pressure_msl"
# user_agent: "HomeOps (eric@maximumdirect.net)" # user_agent: "HomeOps (eric@maximumdirect.net)"
# - name: OpenWeatherObservation # - name: OpenWeatherObservation
@@ -63,6 +63,15 @@ sources:
url: "https://api.weather.gov/gridpoints/LSX/90,74/forecast?units=us" url: "https://api.weather.gov/gridpoints/LSX/90,74/forecast?units=us"
user_agent: "HomeOps (eric@maximumdirect.net)" user_agent: "HomeOps (eric@maximumdirect.net)"
- name: NWSForecastDiscussionSTL
mode: poll
kinds: ["forecast_discussion"]
driver: nws_forecast_discussion
every: 30m
params:
url: "https://forecast.weather.gov/product.php?site=LSX&issuedby=LSX&product=AFD&format=TXT&version=1&glossary=0"
user_agent: "HomeOps (eric@maximumdirect.net)"
- name: OpenMeteoHourlyForecastSTL - name: OpenMeteoHourlyForecastSTL
mode: poll mode: poll
kinds: ["forecast"] kinds: ["forecast"]
@@ -90,7 +99,7 @@ sinks:
driver: nats driver: nats
params: params:
url: nats://nats:4222 url: nats://nats:4222
exchange: weatherfeeder subject: weatherfeeder
# - name: pg_weatherfeeder # - name: pg_weatherfeeder
# driver: postgres # driver: postgres
@@ -108,13 +117,13 @@ sinks:
routes: routes:
- sink: stdout - sink: stdout
kinds: ["observation", "forecast", "alert"] kinds: ["observation", "forecast", "forecast_discussion", "alert"]
- sink: nats_weatherfeeder - sink: nats_weatherfeeder
kinds: ["observation", "forecast", "alert"] kinds: ["observation", "forecast", "forecast_discussion", "alert"]
# - sink: pg_weatherfeeder # - sink: pg_weatherfeeder
# kinds: ["observation", "forecast", "alert"] # kinds: ["observation", "forecast", "forecast_discussion", "alert"]
# - sink: logfile # - sink: logfile
# kinds: ["observation", "alert", "forecast"] # kinds: ["observation", "alert", "forecast", "forecast_discussion"]

View File

@@ -3,14 +3,10 @@ package main
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"log" "log"
"os" "os"
"os/signal" "os/signal"
"sort"
"strings"
"syscall" "syscall"
"time"
"gitea.maximumdirect.net/ejr/feedkit/config" "gitea.maximumdirect.net/ejr/feedkit/config"
fkdispatch "gitea.maximumdirect.net/ejr/feedkit/dispatch" fkdispatch "gitea.maximumdirect.net/ejr/feedkit/dispatch"
@@ -41,9 +37,6 @@ func main() {
if err != nil { if err != nil {
log.Fatalf("config load failed: %v", err) log.Fatalf("config load failed: %v", err)
} }
if err := wfpgsink.RegisterPostgresSchemas(cfg); err != nil {
log.Fatalf("postgres schema registration failed: %v", err)
}
// --- Registries --- // --- Registries ---
srcReg := fksources.NewRegistry() srcReg := fksources.NewRegistry()
@@ -51,15 +44,8 @@ func main() {
// Compile stdout, Postgres, and NATS sinks for weatherfeeder. The former is useful for debugging and the latter are the main intended outputs. // Compile stdout, Postgres, and NATS sinks for weatherfeeder. The former is useful for debugging and the latter are the main intended outputs.
sinkReg := fksinks.NewRegistry() sinkReg := fksinks.NewRegistry()
sinkReg.Register("stdout", func(cfg config.SinkConfig) (fksinks.Sink, error) { fksinks.RegisterBuiltins(sinkReg)
return fksinks.NewStdoutSink(cfg.Name), nil sinkReg.Register("postgres", fksinks.PostgresFactory(wfpgsink.PostgresSchema()))
})
sinkReg.Register("postgres", func(cfg config.SinkConfig) (fksinks.Sink, error) {
return fksinks.NewPostgresSinkFromConfig(cfg)
})
sinkReg.Register("nats", func(cfg config.SinkConfig) (fksinks.Sink, error) {
return fksinks.NewNATSSinkFromConfig(cfg)
})
// --- Build sources into scheduler jobs --- // --- Build sources into scheduler jobs ---
var jobs []fkscheduler.Job var jobs []fkscheduler.Job
@@ -69,7 +55,7 @@ func main() {
log.Fatalf("build source failed (sources[%d] name=%q driver=%q): %v", i, sc.Name, sc.Driver, err) log.Fatalf("build source failed (sources[%d] name=%q driver=%q): %v", i, sc.Name, sc.Driver, err)
} }
if err := validateSourceExpectedKinds(sc, in); err != nil { if err := fksources.ValidateExpectedKinds(sc, in); err != nil {
log.Fatalf("source expected kinds validation failed (sources[%d] name=%q driver=%q): %v", i, sc.Name, sc.Driver, err) log.Fatalf("source expected kinds validation failed (sources[%d] name=%q driver=%q): %v", i, sc.Name, sc.Driver, err)
} }
@@ -99,7 +85,7 @@ func main() {
} }
// --- Compile routes --- // --- Compile routes ---
routes, err := compileRoutes(cfg, builtSinks) routes, err := fkdispatch.CompileRoutes(cfg)
if err != nil { if err != nil {
log.Fatalf("compile routes failed: %v", err) log.Fatalf("compile routes failed: %v", err)
} }
@@ -160,124 +146,6 @@ func main() {
log.Printf("shutdown complete") log.Printf("shutdown complete")
} }
func compileRoutes(cfg *config.Config, builtSinks map[string]fksinks.Sink) ([]fkdispatch.Route, error) {
if len(cfg.Routes) == 0 {
return defaultRoutes(builtSinks), nil
}
var routes []fkdispatch.Route
for i, r := range cfg.Routes {
if strings.TrimSpace(r.Sink) == "" {
return nil, fmt.Errorf("routes[%d].sink is empty", i)
}
if _, ok := builtSinks[r.Sink]; !ok {
return nil, fmt.Errorf("routes[%d].sink references unknown sink %q", i, r.Sink)
}
kinds := map[fkevent.Kind]bool{}
for j, k := range r.Kinds {
kind, err := fkevent.ParseKind(k)
if err != nil {
return nil, fmt.Errorf("routes[%d].kinds[%d]: %w", i, j, err)
}
kinds[kind] = true
}
routes = append(routes, fkdispatch.Route{
SinkName: r.Sink,
Kinds: kinds,
})
}
return routes, nil
}
func defaultRoutes(builtSinks map[string]fksinks.Sink) []fkdispatch.Route {
// nil Kinds means "match all kinds" by convention
var allKinds map[fkevent.Kind]bool = nil
routes := make([]fkdispatch.Route, 0, len(builtSinks))
for name := range builtSinks {
routes = append(routes, fkdispatch.Route{
SinkName: name,
Kinds: allKinds,
})
}
return routes
}
func isContextShutdown(err error) bool { func isContextShutdown(err error) bool {
return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)
} }
func validateSourceExpectedKinds(sc config.SourceConfig, in fksources.Input) error {
expectedKinds, err := parseExpectedKinds(sc.ExpectedKinds())
if err != nil {
return err
}
if len(expectedKinds) == 0 {
return nil
}
advertisedKinds := advertisedSourceKinds(in)
if len(advertisedKinds) == 0 {
return nil
}
for kind := range expectedKinds {
if !advertisedKinds[kind] {
return fmt.Errorf(
"configured expected kind %q not advertised by source (configured=%v advertised=%v)",
kind,
sortedKinds(expectedKinds),
sortedKinds(advertisedKinds),
)
}
}
return nil
}
func parseExpectedKinds(raw []string) (map[fkevent.Kind]bool, error) {
kinds := map[fkevent.Kind]bool{}
for i, k := range raw {
kind, err := fkevent.ParseKind(k)
if err != nil {
return nil, fmt.Errorf("invalid expected kind at index %d (%q): %w", i, k, err)
}
kinds[kind] = true
}
return kinds, nil
}
func advertisedSourceKinds(in fksources.Input) map[fkevent.Kind]bool {
if in == nil {
return nil
}
kinds := map[fkevent.Kind]bool{}
if ks, ok := in.(fksources.KindsSource); ok {
for _, kind := range ks.Kinds() {
kinds[kind] = true
}
return kinds
}
if ks, ok := in.(fksources.KindSource); ok {
kinds[ks.Kind()] = true
return kinds
}
return nil
}
func sortedKinds(kindSet map[fkevent.Kind]bool) []string {
out := make([]string, 0, len(kindSet))
for kind := range kindSet {
out = append(out, string(kind))
}
sort.Strings(out)
return out
}
// keep time imported (mirrors your previous main.go defensive trick)
var _ = time.Second

View File

@@ -13,6 +13,7 @@ import (
fkprocessors "gitea.maximumdirect.net/ejr/feedkit/processors" fkprocessors "gitea.maximumdirect.net/ejr/feedkit/processors"
fkdedupe "gitea.maximumdirect.net/ejr/feedkit/processors/dedupe" fkdedupe "gitea.maximumdirect.net/ejr/feedkit/processors/dedupe"
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize" fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
wfnormalizers "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers" wfnormalizers "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers"
) )
@@ -23,13 +24,6 @@ type testInput struct {
func (s testInput) Name() string { return s.name } func (s testInput) Name() string { return s.name }
type testKindSource struct {
testInput
kind fkevent.Kind
}
func (s testKindSource) Kind() fkevent.Kind { return s.kind }
type testKindsSource struct { type testKindsSource struct {
testInput testInput
kinds []fkevent.Kind kinds []fkevent.Kind
@@ -37,18 +31,6 @@ type testKindsSource struct {
func (s testKindsSource) Kinds() []fkevent.Kind { return s.kinds } func (s testKindsSource) Kinds() []fkevent.Kind { return s.kinds }
func TestValidateSourceExpectedKindsLegacyKindFallback(t *testing.T) {
sc := config.SourceConfig{Kind: "observation"}
in := testKindSource{
testInput: testInput{name: "test"},
kind: fkevent.Kind("observation"),
}
if err := validateSourceExpectedKinds(sc, in); err != nil {
t.Fatalf("validateSourceExpectedKinds() unexpected error: %v", err)
}
}
func TestValidateSourceExpectedKindsSubsetAllowed(t *testing.T) { func TestValidateSourceExpectedKindsSubsetAllowed(t *testing.T) {
sc := config.SourceConfig{Kinds: []string{"observation"}} sc := config.SourceConfig{Kinds: []string{"observation"}}
in := testKindsSource{ in := testKindsSource{
@@ -56,8 +38,8 @@ func TestValidateSourceExpectedKindsSubsetAllowed(t *testing.T) {
kinds: []fkevent.Kind{"observation", "forecast"}, kinds: []fkevent.Kind{"observation", "forecast"},
} }
if err := validateSourceExpectedKinds(sc, in); err != nil { if err := fksources.ValidateExpectedKinds(sc, in); err != nil {
t.Fatalf("validateSourceExpectedKinds() unexpected error: %v", err) t.Fatalf("ValidateExpectedKinds() unexpected error: %v", err)
} }
} }
@@ -68,12 +50,12 @@ func TestValidateSourceExpectedKindsMismatchFails(t *testing.T) {
kinds: []fkevent.Kind{"observation", "forecast"}, kinds: []fkevent.Kind{"observation", "forecast"},
} }
err := validateSourceExpectedKinds(sc, in) err := fksources.ValidateExpectedKinds(sc, in)
if err == nil { if err == nil {
t.Fatalf("validateSourceExpectedKinds() expected mismatch error, got nil") t.Fatalf("ValidateExpectedKinds() expected mismatch error, got nil")
} }
if !strings.Contains(err.Error(), "configured expected kind") { if !strings.Contains(err.Error(), "configured expected kind") {
t.Fatalf("validateSourceExpectedKinds() error %q does not include expected message", err) t.Fatalf("ValidateExpectedKinds() error %q does not include expected message", err)
} }
} }
@@ -81,14 +63,8 @@ func TestValidateSourceExpectedKindsNoMetadataSkipsCheck(t *testing.T) {
sc := config.SourceConfig{Kinds: []string{"alert"}} sc := config.SourceConfig{Kinds: []string{"alert"}}
in := testInput{name: "test"} in := testInput{name: "test"}
if err := validateSourceExpectedKinds(sc, in); err != nil { if err := fksources.ValidateExpectedKinds(sc, in); err != nil {
t.Fatalf("validateSourceExpectedKinds() unexpected error: %v", err) t.Fatalf("ValidateExpectedKinds() unexpected error: %v", err)
}
}
func TestParseExpectedKindsRejectsEmptyValues(t *testing.T) {
if _, err := parseExpectedKinds([]string{""}); err == nil {
t.Fatalf("parseExpectedKinds() expected error for empty kind")
} }
} }

2
go.mod
View File

@@ -2,7 +2,7 @@ module gitea.maximumdirect.net/ejr/weatherfeeder
go 1.25 go 1.25
require gitea.maximumdirect.net/ejr/feedkit v0.8.0 require gitea.maximumdirect.net/ejr/feedkit v0.8.2
require ( require (
github.com/klauspost/compress v1.17.2 // indirect github.com/klauspost/compress v1.17.2 // indirect

4
go.sum
View File

@@ -1,5 +1,5 @@
gitea.maximumdirect.net/ejr/feedkit v0.8.0 h1:JdEEy6T3AQ97alLNYcQ3crN3tOEZPLMBD0Qr/MH5/dw= gitea.maximumdirect.net/ejr/feedkit v0.8.2 h1:6AMxNacfqJ8SXQhFAUMW3LgiVxixs50tf+S8Q9Ivm+Y=
gitea.maximumdirect.net/ejr/feedkit v0.8.0/go.mod h1:U6xC9xZLN3cL4yi7YBVyzGoHYRLJXusFCAKlj2kdYYQ= gitea.maximumdirect.net/ejr/feedkit v0.8.2/go.mod h1:U6xC9xZLN3cL4yi7YBVyzGoHYRLJXusFCAKlj2kdYYQ=
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=

View File

@@ -9,6 +9,12 @@ import (
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/openweather" "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/openweather"
) )
var builtinRegistrations = []func([]fknormalize.Normalizer) []fknormalize.Normalizer{
nws.Register,
openmeteo.Register,
openweather.Register,
}
// RegisterBuiltins registers all normalizers shipped with this binary. // RegisterBuiltins registers all normalizers shipped with this binary.
// //
// This mirrors internal/sources.RegisterBuiltins, but note the selection model: // This mirrors internal/sources.RegisterBuiltins, but note the selection model:
@@ -27,9 +33,9 @@ func RegisterBuiltins(in []fknormalize.Normalizer) []fknormalize.Normalizer {
// //
// Order here should be stable across releases to reduce surprises when adding // Order here should be stable across releases to reduce surprises when adding
// new normalizers. // new normalizers.
out = nws.Register(out) for _, register := range builtinRegistrations {
out = openmeteo.Register(out) out = register(out)
out = openweather.Register(out) }
return out return out
} }

View File

@@ -19,6 +19,7 @@ func TestRegisterBuiltinsOrder(t *testing.T) {
want := []fknormalize.Normalizer{ want := []fknormalize.Normalizer{
nws.ObservationNormalizer{}, nws.ObservationNormalizer{},
nws.ForecastNormalizer{}, nws.ForecastNormalizer{},
nws.ForecastDiscussionNormalizer{},
nws.AlertsNormalizer{}, nws.AlertsNormalizer{},
openmeteo.ObservationNormalizer{}, openmeteo.ObservationNormalizer{},
openmeteo.ForecastNormalizer{}, openmeteo.ForecastNormalizer{},

View File

@@ -5,6 +5,7 @@ import (
"time" "time"
"gitea.maximumdirect.net/ejr/feedkit/event" "gitea.maximumdirect.net/ejr/feedkit/event"
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
) )
// Finalize builds the output event envelope by copying the input and applying the // Finalize builds the output event envelope by copying the input and applying the
@@ -16,20 +17,7 @@ import (
// If effectiveAt is zero, any existing in.EffectiveAt is preserved. // If effectiveAt is zero, any existing in.EffectiveAt is preserved.
// - Payload floats are rounded to a stable wire-friendly precision (see round.go). // - Payload floats are rounded to a stable wire-friendly precision (see round.go).
func Finalize(in event.Event, outSchema string, outPayload any, effectiveAt time.Time) (*event.Event, error) { func Finalize(in event.Event, outSchema string, outPayload any, effectiveAt time.Time) (*event.Event, error) {
out := in // Enforce stable numeric presentation for weather payloads before delegating to feedkit's
out.Schema = outSchema // generic envelope finalizer.
return fknormalize.FinalizeEvent(in, outSchema, RoundFloats(outPayload, DefaultFloatPrecision), effectiveAt)
// Enforce stable numeric presentation for sinks: round floats in the canonical payload.
out.Payload = RoundFloats(outPayload, DefaultFloatPrecision)
if !effectiveAt.IsZero() {
t := effectiveAt.UTC()
out.EffectiveAt = &t
}
if err := out.Validate(); err != nil {
return nil, err
}
return &out, nil
} }

View File

@@ -0,0 +1,36 @@
package common
import (
"testing"
"time"
"gitea.maximumdirect.net/ejr/feedkit/event"
)
func TestFinalizeRoundsWeatherPayloadFloats(t *testing.T) {
type payload struct {
Value float64
}
in := event.Event{
ID: "evt-1",
Kind: event.Kind("observation"),
Source: "source-a",
EmittedAt: time.Date(2026, 3, 28, 12, 0, 0, 0, time.UTC),
Schema: "raw.example.v1",
Payload: map[string]any{"old": true},
}
out, err := Finalize(in, "weather.example.v1", payload{Value: 1.234567}, time.Time{})
if err != nil {
t.Fatalf("Finalize() unexpected error: %v", err)
}
got, ok := out.Payload.(payload)
if !ok {
t.Fatalf("Finalize() payload type = %T, want payload", out.Payload)
}
if got.Value != 1.2346 {
t.Fatalf("Finalize() rounded value = %v, want 1.2346", got.Value)
}
}

View File

@@ -2,11 +2,11 @@
package common package common
import ( import (
"encoding/json"
"fmt" "fmt"
"time" "time"
"gitea.maximumdirect.net/ejr/feedkit/event" "gitea.maximumdirect.net/ejr/feedkit/event"
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
) )
// DecodeJSONPayload extracts the event payload as bytes and unmarshals it into T. // DecodeJSONPayload extracts the event payload as bytes and unmarshals it into T.
@@ -18,19 +18,7 @@ import (
// Errors include a small amount of stage context ("extract payload", "decode raw payload"). // Errors include a small amount of stage context ("extract payload", "decode raw payload").
// Callers typically wrap these with a provider/kind label. // Callers typically wrap these with a provider/kind label.
func DecodeJSONPayload[T any](in event.Event) (T, error) { func DecodeJSONPayload[T any](in event.Event) (T, error) {
var zero T return fknormalize.DecodeJSONPayload[T](in)
b, err := PayloadBytes(in)
if err != nil {
return zero, fmt.Errorf("extract payload: %w", err)
}
var parsed T
if err := json.Unmarshal(b, &parsed); err != nil {
return zero, fmt.Errorf("decode raw payload: %w", err)
}
return parsed, nil
} }
// NormalizeJSON is a convenience wrapper for the common JSON-normalizer pattern: // NormalizeJSON is a convenience wrapper for the common JSON-normalizer pattern:

View File

@@ -1,53 +0,0 @@
package common
import (
"encoding/json"
"fmt"
"gitea.maximumdirect.net/ejr/feedkit/event"
)
// PayloadBytes extracts a JSON payload into bytes suitable for json.Unmarshal.
//
// Supported payload shapes (weatherfeeder convention):
// - json.RawMessage (recommended for raw events)
// - []byte
// - string (assumed to contain JSON)
// - map[string]any (re-marshaled to JSON)
//
// If you add other raw representations later, extend this function.
func PayloadBytes(e event.Event) ([]byte, error) {
if e.Payload == nil {
return nil, fmt.Errorf("payload is nil")
}
switch v := e.Payload.(type) {
case json.RawMessage:
if len(v) == 0 {
return nil, fmt.Errorf("payload is empty json.RawMessage")
}
return []byte(v), nil
case []byte:
if len(v) == 0 {
return nil, fmt.Errorf("payload is empty []byte")
}
return v, nil
case string:
if v == "" {
return nil, fmt.Errorf("payload is empty string")
}
return []byte(v), nil
case map[string]any:
b, err := json.Marshal(v)
if err != nil {
return nil, fmt.Errorf("marshal map payload: %w", err)
}
return b, nil
default:
return nil, fmt.Errorf("unsupported payload type %T", e.Payload)
}
}

View File

@@ -75,62 +75,63 @@ func normalizeNarrativeForecastEvent(in event.Event) (*event.Event, error) {
) )
} }
type forecastPeriodMapper[T any] func(idx int, period T) (model.WeatherForecastPeriod, error)
// buildHourlyForecast contains hourly forecast mapping logic (provider -> canonical model). // buildHourlyForecast contains hourly forecast mapping logic (provider -> canonical model).
func buildHourlyForecast(parsed nwsHourlyForecastResponse) (model.WeatherForecastRun, time.Time, error) { func buildHourlyForecast(parsed nwsHourlyForecastResponse) (model.WeatherForecastRun, time.Time, error) {
issuedAt, updatedAt, err := parseForecastRunTimes(parsed.Properties.GeneratedAt, parsed.Properties.UpdateTime) return buildForecastRun(
if err != nil { parsed.Properties.GeneratedAt,
return model.WeatherForecastRun{}, time.Time{}, err parsed.Properties.UpdateTime,
} parsed.Geometry.Coordinates,
// Best-effort location centroid from the GeoJSON polygon (optional).
lat, lon := centroidLatLon(parsed.Geometry.Coordinates)
run := newForecastRunBase(
issuedAt,
updatedAt,
model.ForecastProductHourly,
lat,
lon,
parsed.Properties.Elevation.Value, parsed.Properties.Elevation.Value,
model.ForecastProductHourly,
parsed.Properties.Periods,
mapHourlyForecastPeriod,
) )
periods := make([]model.WeatherForecastPeriod, 0, len(parsed.Properties.Periods))
for i, p := range parsed.Properties.Periods {
period, err := mapHourlyForecastPeriod(i, p)
if err != nil {
return model.WeatherForecastRun{}, time.Time{}, err
}
periods = append(periods, period)
}
run.Periods = periods
// EffectiveAt policy for forecasts: treat IssuedAt as the effective time (dedupe-friendly).
return run, issuedAt, nil
} }
// buildNarrativeForecast contains narrative forecast mapping logic (provider -> canonical model). // buildNarrativeForecast contains narrative forecast mapping logic (provider -> canonical model).
func buildNarrativeForecast(parsed nwsNarrativeForecastResponse) (model.WeatherForecastRun, time.Time, error) { func buildNarrativeForecast(parsed nwsNarrativeForecastResponse) (model.WeatherForecastRun, time.Time, error) {
issuedAt, updatedAt, err := parseForecastRunTimes(parsed.Properties.GeneratedAt, parsed.Properties.UpdateTime) return buildForecastRun(
parsed.Properties.GeneratedAt,
parsed.Properties.UpdateTime,
parsed.Geometry.Coordinates,
parsed.Properties.Elevation.Value,
model.ForecastProductNarrative,
parsed.Properties.Periods,
mapNarrativeForecastPeriod,
)
}
func buildForecastRun[T any](
generatedAt string,
updateTime string,
coordinates [][][]float64,
elevation *float64,
product model.ForecastProduct,
srcPeriods []T,
mapPeriod forecastPeriodMapper[T],
) (model.WeatherForecastRun, time.Time, error) {
issuedAt, updatedAt, err := parseForecastRunTimes(generatedAt, updateTime)
if err != nil { if err != nil {
return model.WeatherForecastRun{}, time.Time{}, err return model.WeatherForecastRun{}, time.Time{}, err
} }
// Best-effort location centroid from the GeoJSON polygon (optional). // Best-effort location centroid from the GeoJSON polygon (optional).
lat, lon := centroidLatLon(parsed.Geometry.Coordinates) lat, lon := centroidLatLon(coordinates)
run := newForecastRunBase( run := newForecastRunBase(
issuedAt, issuedAt,
updatedAt, updatedAt,
model.ForecastProductNarrative, product,
lat, lat,
lon, lon,
parsed.Properties.Elevation.Value, elevation,
) )
periods := make([]model.WeatherForecastPeriod, 0, len(parsed.Properties.Periods)) periods := make([]model.WeatherForecastPeriod, 0, len(srcPeriods))
for i, p := range parsed.Properties.Periods { for i, p := range srcPeriods {
period, err := mapNarrativeForecastPeriod(i, p) period, err := mapPeriod(i, p)
if err != nil { if err != nil {
return model.WeatherForecastRun{}, time.Time{}, err return model.WeatherForecastRun{}, time.Time{}, err
} }

View File

@@ -0,0 +1,72 @@
package nws
import (
"context"
"fmt"
"strings"
"gitea.maximumdirect.net/ejr/feedkit/event"
normcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/common"
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
"gitea.maximumdirect.net/ejr/weatherfeeder/model"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
)
type ForecastDiscussionNormalizer struct{}
func (ForecastDiscussionNormalizer) Match(e event.Event) bool {
return strings.TrimSpace(e.Schema) == standards.SchemaRawNWSForecastDiscussionV1
}
func (ForecastDiscussionNormalizer) Normalize(ctx context.Context, in event.Event) (*event.Event, error) {
_ = ctx
rawHTML, err := decodeStringPayload(in.Payload)
if err != nil {
return nil, fmt.Errorf("nws forecast discussion normalize: %w", err)
}
parsed, err := nwscommon.ParseForecastDiscussionHTML(rawHTML)
if err != nil {
return nil, fmt.Errorf("nws forecast discussion normalize: build: %w", err)
}
payload := model.WeatherForecastDiscussion{
OfficeID: strings.TrimSpace(parsed.OfficeID),
OfficeName: strings.TrimSpace(parsed.OfficeName),
Product: model.ForecastDiscussionProduct(strings.TrimSpace(parsed.Product)),
IssuedAt: parsed.IssuedAt.UTC(),
UpdatedAt: parsed.UpdatedAt,
KeyMessages: append([]string(nil), parsed.KeyMessages...),
ShortTerm: mapForecastDiscussionSection(parsed.ShortTerm),
LongTerm: mapForecastDiscussionSection(parsed.LongTerm),
}
out, err := normcommon.Finalize(in, standards.SchemaWeatherForecastDiscussionV1, payload, payload.IssuedAt)
if err != nil {
return nil, fmt.Errorf("nws forecast discussion normalize: %w", err)
}
return out, nil
}
func mapForecastDiscussionSection(in *nwscommon.ForecastDiscussionSection) *model.WeatherForecastDiscussionSection {
if in == nil {
return nil
}
return &model.WeatherForecastDiscussionSection{
Qualifier: strings.TrimSpace(in.Qualifier),
IssuedAt: in.IssuedAt,
Text: strings.TrimSpace(in.Text),
}
}
func decodeStringPayload(payload any) (string, error) {
switch v := payload.(type) {
case string:
return v, nil
case []byte:
return string(v), nil
default:
return "", fmt.Errorf("extract payload: expected string payload, got %T", payload)
}
}

View File

@@ -0,0 +1,130 @@
package nws
import (
"encoding/json"
"os"
"path/filepath"
"strings"
"testing"
"time"
"gitea.maximumdirect.net/ejr/feedkit/event"
"gitea.maximumdirect.net/ejr/weatherfeeder/model"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
)
func TestForecastDiscussionNormalizerProducesCanonicalSchema(t *testing.T) {
rawHTML := loadForecastDiscussionSampleHTML(t)
out, err := (ForecastDiscussionNormalizer{}).Normalize(nil, event.Event{
ID: "evt-discussion-1",
Kind: event.Kind("forecast_discussion"),
Source: "nws-discussion-test",
EmittedAt: time.Date(2026, 3, 28, 19, 25, 0, 0, time.UTC),
Schema: standards.SchemaRawNWSForecastDiscussionV1,
Payload: rawHTML,
})
if err != nil {
t.Fatalf("Normalize() error = %v", err)
}
if out == nil {
t.Fatalf("Normalize() returned nil output")
}
if out.Schema != standards.SchemaWeatherForecastDiscussionV1 {
t.Fatalf("Schema = %q, want %q", out.Schema, standards.SchemaWeatherForecastDiscussionV1)
}
if out.Kind != event.Kind("forecast_discussion") {
t.Fatalf("Kind = %q, want forecast_discussion", out.Kind)
}
payload, ok := out.Payload.(model.WeatherForecastDiscussion)
if !ok {
t.Fatalf("Payload type = %T, want model.WeatherForecastDiscussion", out.Payload)
}
if payload.OfficeID != "LSX" {
t.Fatalf("OfficeID = %q, want LSX", payload.OfficeID)
}
if payload.Product != model.ForecastDiscussionProductAFD {
t.Fatalf("Product = %q, want %q", payload.Product, model.ForecastDiscussionProductAFD)
}
if len(payload.KeyMessages) != 3 {
t.Fatalf("KeyMessages len = %d, want 3", len(payload.KeyMessages))
}
if payload.ShortTerm == nil || payload.LongTerm == nil {
t.Fatalf("ShortTerm=%v LongTerm=%v, want both populated", payload.ShortTerm, payload.LongTerm)
}
if payload.ShortTerm.Qualifier != "(Through Late Sunday Night)" {
t.Fatalf("ShortTerm.Qualifier = %q", payload.ShortTerm.Qualifier)
}
if !strings.Contains(payload.ShortTerm.Text, "After a chilly morning") {
t.Fatalf("ShortTerm.Text = %q, want normalized prose", payload.ShortTerm.Text)
}
if strings.Contains(payload.ShortTerm.Text, "BRC") {
t.Fatalf("ShortTerm.Text contains signature: %q", payload.ShortTerm.Text)
}
if strings.Contains(payload.LongTerm.Text, "AVIATION") || strings.Contains(payload.LongTerm.Text, "WATCHES/WARNINGS/ADVISORIES") {
t.Fatalf("LongTerm.Text includes downstream sections: %q", payload.LongTerm.Text)
}
wantEffectiveAt := time.Date(2026, 3, 28, 19, 24, 0, 0, time.UTC)
if out.EffectiveAt == nil || !out.EffectiveAt.Equal(wantEffectiveAt) {
t.Fatalf("EffectiveAt = %v, want %s", out.EffectiveAt, wantEffectiveAt.Format(time.RFC3339))
}
}
func TestForecastDiscussionNormalizerRejectsMissingIssueTime(t *testing.T) {
_, err := (ForecastDiscussionNormalizer{}).Normalize(nil, event.Event{
ID: "evt-discussion-bad",
Kind: event.Kind("forecast_discussion"),
Source: "nws-discussion-test",
EmittedAt: time.Date(2026, 3, 28, 19, 25, 0, 0, time.UTC),
Schema: standards.SchemaRawNWSForecastDiscussionV1,
Payload: "<html><body><pre class=\"glossaryProduct\">National Weather Service Saint Louis MO</pre></body></html>",
})
if err == nil {
t.Fatalf("Normalize() error = nil, want error")
}
if !strings.Contains(err.Error(), "issue time") {
t.Fatalf("error = %q, want issue time context", err)
}
}
func TestForecastDiscussionNormalizerWireShapeHasNoUnexpectedKeys(t *testing.T) {
rawHTML := loadForecastDiscussionSampleHTML(t)
out, err := (ForecastDiscussionNormalizer{}).Normalize(nil, event.Event{
ID: "evt-discussion-2",
Kind: event.Kind("forecast_discussion"),
Source: "nws-discussion-test",
EmittedAt: time.Date(2026, 3, 28, 19, 25, 0, 0, time.UTC),
Schema: standards.SchemaRawNWSForecastDiscussionV1,
Payload: rawHTML,
})
if err != nil {
t.Fatalf("Normalize() error = %v", err)
}
b, err := json.Marshal(out.Payload)
if err != nil {
t.Fatalf("json.Marshal(payload) error = %v", err)
}
var got map[string]any
if err := json.Unmarshal(b, &got); err != nil {
t.Fatalf("json.Unmarshal(payload) error = %v", err)
}
for _, key := range []string{"sections", "aviation"} {
if _, ok := got[key]; ok {
t.Fatalf("unexpected key %q in canonical payload", key)
}
}
}
func loadForecastDiscussionSampleHTML(t *testing.T) string {
t.Helper()
path := filepath.Join("..", "..", "providers", "nws", "testdata", "forecast_discussion_sample.html")
b, err := os.ReadFile(path)
if err != nil {
t.Fatalf("os.ReadFile(%q) error = %v", path, err)
}
return string(b)
}

View File

@@ -47,6 +47,55 @@ func TestBuildHourlyForecastUsesShortForecastAsTextDescription(t *testing.T) {
assertNoLegacyForecastDescriptionKeys(t, run.Periods[0]) assertNoLegacyForecastDescriptionKeys(t, run.Periods[0])
} }
func TestBuildHourlyForecastPreservesUpdatedAtCentroidAndElevation(t *testing.T) {
parsed := nwsHourlyForecastResponse{}
parsed.Properties.GeneratedAt = "2026-03-16T18:00:00Z"
parsed.Properties.UpdateTime = "2026-03-16T18:30:00Z"
elevation := 123.4
parsed.Properties.Elevation.Value = &elevation
parsed.Geometry.Coordinates = [][][]float64{
{
{-90.0, 38.0},
{-89.0, 38.0},
{-89.0, 39.0},
{-90.0, 39.0},
},
}
parsed.Properties.Periods = []nwsHourlyForecastPeriod{
{
StartTime: "2026-03-16T19:00:00Z",
EndTime: "2026-03-16T20:00:00Z",
ShortForecast: "Cloudy",
},
}
run, effectiveAt, err := buildHourlyForecast(parsed)
if err != nil {
t.Fatalf("buildHourlyForecast() error = %v", err)
}
wantIssued := time.Date(2026, 3, 16, 18, 0, 0, 0, time.UTC)
wantUpdated := time.Date(2026, 3, 16, 18, 30, 0, 0, time.UTC)
if !run.IssuedAt.Equal(wantIssued) {
t.Fatalf("IssuedAt = %s, want %s", run.IssuedAt.Format(time.RFC3339), wantIssued.Format(time.RFC3339))
}
if run.UpdatedAt == nil || !run.UpdatedAt.Equal(wantUpdated) {
t.Fatalf("UpdatedAt = %v, want %s", run.UpdatedAt, wantUpdated.Format(time.RFC3339))
}
if run.Latitude == nil || math.Abs(*run.Latitude-38.5) > 0.0001 {
t.Fatalf("Latitude = %v, want 38.5", run.Latitude)
}
if run.Longitude == nil || math.Abs(*run.Longitude+89.5) > 0.0001 {
t.Fatalf("Longitude = %v, want -89.5", run.Longitude)
}
if run.ElevationMeters == nil || math.Abs(*run.ElevationMeters-elevation) > 0.0001 {
t.Fatalf("ElevationMeters = %v, want %.1f", run.ElevationMeters, elevation)
}
if !effectiveAt.Equal(wantIssued) {
t.Fatalf("effectiveAt = %s, want %s", effectiveAt.Format(time.RFC3339), wantIssued.Format(time.RFC3339))
}
}
func TestNormalizeForecastEventBySchemaRejectsUnsupportedSchema(t *testing.T) { func TestNormalizeForecastEventBySchemaRejectsUnsupportedSchema(t *testing.T) {
_, err := normalizeForecastEventBySchema(event.Event{ _, err := normalizeForecastEventBySchema(event.Event{
Schema: "raw.nws.daily.forecast.v1", Schema: "raw.nws.daily.forecast.v1",
@@ -85,6 +134,70 @@ func TestNormalizeForecastEventBySchemaRoutesNarrative(t *testing.T) {
} }
} }
func TestNormalizeForecastEventBySchemaProducesCanonicalWeatherForecastSchema(t *testing.T) {
tests := []struct {
name string
schema string
payload map[string]any
}{
{
name: "hourly",
schema: standards.SchemaRawNWSHourlyForecastV1,
payload: map[string]any{
"properties": map[string]any{
"generatedAt": "2026-03-16T18:00:00Z",
"periods": []map[string]any{
{
"startTime": "2026-03-16T19:00:00Z",
"endTime": "2026-03-16T20:00:00Z",
"shortForecast": "Cloudy",
},
},
},
},
},
{
name: "narrative",
schema: standards.SchemaRawNWSNarrativeForecastV1,
payload: map[string]any{
"properties": map[string]any{
"generatedAt": "2026-03-16T18:00:00Z",
"periods": []map[string]any{
{
"startTime": "2026-03-16T19:00:00Z",
"endTime": "2026-03-16T20:00:00Z",
"shortForecast": "Cloudy",
"detailedForecast": "Cloudy",
},
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
out, err := normalizeForecastEventBySchema(event.Event{
ID: "evt-1",
Kind: event.Kind("forecast"),
Source: "nws-test",
EmittedAt: time.Date(2026, 3, 16, 18, 0, 0, 0, time.UTC),
Schema: tt.schema,
Payload: tt.payload,
})
if err != nil {
t.Fatalf("normalizeForecastEventBySchema() error = %v", err)
}
if out == nil {
t.Fatalf("normalizeForecastEventBySchema() returned nil output")
}
if out.Schema != standards.SchemaWeatherForecastV1 {
t.Fatalf("Schema = %q, want %q", out.Schema, standards.SchemaWeatherForecastV1)
}
})
}
}
func TestBuildNarrativeForecastMapsExpectedFields(t *testing.T) { func TestBuildNarrativeForecastMapsExpectedFields(t *testing.T) {
parsed := nwsNarrativeForecastResponse{} parsed := nwsNarrativeForecastResponse{}
parsed.Properties.GeneratedAt = "2026-03-27T15:17:01Z" parsed.Properties.GeneratedAt = "2026-03-27T15:17:01Z"

View File

@@ -5,18 +5,14 @@ import (
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize" fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
) )
var builtins = []fknormalize.Normalizer{
ObservationNormalizer{},
ForecastNormalizer{},
ForecastDiscussionNormalizer{},
AlertsNormalizer{},
}
// Register appends NWS normalizers in stable order. // Register appends NWS normalizers in stable order.
func Register(in []fknormalize.Normalizer) []fknormalize.Normalizer { func Register(in []fknormalize.Normalizer) []fknormalize.Normalizer {
out := in return append(in, builtins...)
// Observations
out = append(out, ObservationNormalizer{})
// Forecasts
out = append(out, ForecastNormalizer{})
// Alerts
out = append(out, AlertsNormalizer{})
return out
} }

View File

@@ -5,14 +5,12 @@ import (
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize" fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
) )
var builtins = []fknormalize.Normalizer{
ObservationNormalizer{},
ForecastNormalizer{},
}
// Register appends Open-Meteo normalizers in stable order. // Register appends Open-Meteo normalizers in stable order.
func Register(in []fknormalize.Normalizer) []fknormalize.Normalizer { func Register(in []fknormalize.Normalizer) []fknormalize.Normalizer {
out := in return append(in, builtins...)
// Observations
out = append(out, ObservationNormalizer{})
// Forecasts
out = append(out, ForecastNormalizer{})
return out
} }

View File

@@ -5,12 +5,11 @@ import (
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize" fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
) )
var builtins = []fknormalize.Normalizer{
ObservationNormalizer{},
}
// Register appends OpenWeather normalizers in stable order. // Register appends OpenWeather normalizers in stable order.
func Register(in []fknormalize.Normalizer) []fknormalize.Normalizer { func Register(in []fknormalize.Normalizer) []fknormalize.Normalizer {
out := in return append(in, builtins...)
// Observations
out = append(out, ObservationNormalizer{})
return out
} }

View File

@@ -0,0 +1,552 @@
package nws
import (
"fmt"
"html"
"regexp"
"strconv"
"strings"
"time"
)
type ForecastDiscussion struct {
OfficeID string
OfficeName string
Product string
IssuedAt time.Time
UpdatedAt *time.Time
KeyMessages []string
ShortTerm *ForecastDiscussionSection
LongTerm *ForecastDiscussionSection
}
type ForecastDiscussionSection struct {
Qualifier string
IssuedAt *time.Time
Text string
}
var (
forecastDiscussionHeaderRE = regexp.MustCompile(`^\.(KEY MESSAGES|SHORT TERM|LONG TERM|AVIATION)\.\.\.(.*)$`)
forecastDiscussionAFDRE = regexp.MustCompile(`^AFD([A-Z]{3})$`)
forecastDiscussionWMORE = regexp.MustCompile(`\bK([A-Z]{3})\b`)
forecastDiscussionSigRE = regexp.MustCompile(`^[A-Z]{2,6}$`)
)
func ParseForecastDiscussionHTML(raw string) (ForecastDiscussion, error) {
text, err := ExtractForecastDiscussionText(raw)
if err != nil {
return ForecastDiscussion{}, err
}
parsed, err := ParseForecastDiscussionText(text)
if err != nil {
return ForecastDiscussion{}, err
}
parsed.UpdatedAt = parseForecastDiscussionUpdatedAt(raw)
return parsed, nil
}
func ExtractForecastDiscussionText(raw string) (string, error) {
lower := strings.ToLower(raw)
searchFrom := 0
for {
openStart := strings.Index(lower[searchFrom:], "<pre")
if openStart < 0 {
return "", fmt.Errorf("missing <pre class=\"glossaryProduct\"> block")
}
openStart += searchFrom
openEnd := strings.Index(lower[openStart:], ">")
if openEnd < 0 {
return "", fmt.Errorf("unterminated <pre> tag")
}
openEnd += openStart
tag := lower[openStart : openEnd+1]
if isGlossaryProductTag(tag) {
closeStart := strings.Index(lower[openEnd+1:], "</pre>")
if closeStart < 0 {
return "", fmt.Errorf("missing closing </pre> for glossaryProduct block")
}
closeStart += openEnd + 1
text := html.UnescapeString(raw[openEnd+1 : closeStart])
text = strings.ReplaceAll(text, "\r\n", "\n")
text = strings.ReplaceAll(text, "\r", "\n")
return text, nil
}
searchFrom = openEnd + 1
}
}
func ParseForecastDiscussionText(text string) (ForecastDiscussion, error) {
lines := splitLines(text)
officeID := parseForecastDiscussionOfficeID(lines)
officeName, issuedAt, err := parseForecastDiscussionHeader(lines)
if err != nil {
return ForecastDiscussion{}, err
}
out := ForecastDiscussion{
OfficeID: officeID,
OfficeName: officeName,
Product: "afd",
IssuedAt: issuedAt.UTC(),
}
if block, ok := extractForecastDiscussionSection(lines, "KEY MESSAGES"); ok {
out.KeyMessages = parseForecastDiscussionKeyMessages(block)
}
if block, ok := extractForecastDiscussionSection(lines, "SHORT TERM"); ok {
section, err := parseForecastDiscussionTextSection(block)
if err != nil {
return ForecastDiscussion{}, fmt.Errorf("parse SHORT TERM: %w", err)
}
out.ShortTerm = &section
}
if block, ok := extractForecastDiscussionSection(lines, "LONG TERM"); ok {
section, err := parseForecastDiscussionTextSection(block)
if err != nil {
return ForecastDiscussion{}, fmt.Errorf("parse LONG TERM: %w", err)
}
out.LongTerm = &section
}
return out, nil
}
func isGlossaryProductTag(tag string) bool {
tag = strings.ToLower(tag)
return strings.Contains(tag, `class="glossaryproduct"`) ||
strings.Contains(tag, `class='glossaryproduct'`) ||
strings.Contains(tag, `class="glossaryproduct `) ||
strings.Contains(tag, `class='glossaryproduct `)
}
func parseForecastDiscussionUpdatedAt(raw string) *time.Time {
lower := strings.ToLower(raw)
searchFrom := 0
for {
metaStart := strings.Index(lower[searchFrom:], "<meta")
if metaStart < 0 {
return nil
}
metaStart += searchFrom
metaEnd := strings.Index(lower[metaStart:], ">")
if metaEnd < 0 {
return nil
}
metaEnd += metaStart
tag := raw[metaStart : metaEnd+1]
if !strings.EqualFold(strings.TrimSpace(extractHTMLAttr(tag, "name")), "DC.date.created") {
searchFrom = metaEnd + 1
continue
}
content := strings.TrimSpace(extractHTMLAttr(tag, "content"))
if content == "" {
return nil
}
t, err := ParseTime(content)
if err != nil {
return nil
}
tt := t.UTC()
return &tt
}
}
func extractHTMLAttr(tag, attr string) string {
lower := strings.ToLower(tag)
attrLower := strings.ToLower(attr)
for i := 0; i < len(lower); i++ {
idx := strings.Index(lower[i:], attrLower)
if idx < 0 {
return ""
}
idx += i
if idx > 0 {
prev := lower[idx-1]
if isAttrNameChar(prev) {
i = idx + len(attrLower)
continue
}
}
j := idx + len(attrLower)
for j < len(lower) && isHTMLSpace(lower[j]) {
j++
}
if j >= len(lower) || lower[j] != '=' {
i = idx + len(attrLower)
continue
}
j++
for j < len(lower) && isHTMLSpace(lower[j]) {
j++
}
if j >= len(tag) {
return ""
}
quote := tag[j]
if quote != '"' && quote != '\'' {
return ""
}
j++
k := j
for k < len(tag) && tag[k] != quote {
k++
}
if k >= len(tag) {
return ""
}
return html.UnescapeString(tag[j:k])
}
return ""
}
func isHTMLSpace(b byte) bool {
switch b {
case ' ', '\n', '\r', '\t', '\f':
return true
default:
return false
}
}
func isAttrNameChar(b byte) bool {
switch {
case b >= 'a' && b <= 'z':
return true
case b >= 'A' && b <= 'Z':
return true
case b >= '0' && b <= '9':
return true
case b == '-' || b == '_' || b == ':':
return true
default:
return false
}
}
func splitLines(text string) []string {
text = strings.ReplaceAll(text, "\r\n", "\n")
text = strings.ReplaceAll(text, "\r", "\n")
return strings.Split(text, "\n")
}
func parseForecastDiscussionOfficeID(lines []string) string {
for _, raw := range lines {
line := strings.TrimSpace(raw)
if m := forecastDiscussionAFDRE.FindStringSubmatch(line); len(m) == 2 {
return m[1]
}
}
for _, raw := range lines {
line := strings.TrimSpace(raw)
if m := forecastDiscussionWMORE.FindStringSubmatch(line); len(m) == 2 {
return m[1]
}
}
return ""
}
func parseForecastDiscussionHeader(lines []string) (string, time.Time, error) {
for i, raw := range lines {
line := strings.TrimSpace(raw)
if !strings.HasPrefix(line, "National Weather Service ") {
continue
}
officeName := line
for j := i + 1; j < len(lines); j++ {
tsLine := strings.TrimSpace(lines[j])
if tsLine == "" {
continue
}
issuedAt, err := parseForecastDiscussionIssueTime(tsLine)
if err != nil {
return "", time.Time{}, fmt.Errorf("parse bulletin issuedAt %q: %w", tsLine, err)
}
return officeName, issuedAt.UTC(), nil
}
return "", time.Time{}, fmt.Errorf("missing bulletin issue time after office line")
}
return "", time.Time{}, fmt.Errorf("missing office header")
}
func parseForecastDiscussionIssueTime(line string) (time.Time, error) {
line = strings.TrimSpace(line)
line = strings.TrimPrefix(line, "Issued at ")
line = strings.TrimSpace(line)
parts := strings.Fields(line)
if len(parts) != 7 {
return time.Time{}, fmt.Errorf("unexpected issue time format")
}
loc, err := forecastDiscussionLocation(parts[2])
if err != nil {
return time.Time{}, err
}
datePart, err := time.Parse("Mon Jan 2 2006", strings.Join(parts[3:], " "))
if err != nil {
return time.Time{}, err
}
hour, minute, err := parseForecastDiscussionClock(parts[0], parts[1])
if err != nil {
return time.Time{}, err
}
return time.Date(
datePart.Year(),
datePart.Month(),
datePart.Day(),
hour,
minute,
0,
0,
loc,
), nil
}
func parseForecastDiscussionClock(rawClock, rawAMPM string) (int, int, error) {
clock := strings.TrimSpace(rawClock)
ampm := strings.ToUpper(strings.TrimSpace(rawAMPM))
if ampm != "AM" && ampm != "PM" {
return 0, 0, fmt.Errorf("unexpected meridiem %q", rawAMPM)
}
n, err := strconv.Atoi(clock)
if err != nil {
return 0, 0, fmt.Errorf("invalid clock %q", rawClock)
}
hour := n
minute := 0
if len(clock) >= 3 {
hour = n / 100
minute = n % 100
}
if hour < 1 || hour > 12 {
return 0, 0, fmt.Errorf("invalid hour %q", rawClock)
}
if minute < 0 || minute > 59 {
return 0, 0, fmt.Errorf("invalid minute %q", rawClock)
}
if ampm == "AM" {
if hour == 12 {
hour = 0
}
return hour, minute, nil
}
if hour != 12 {
hour += 12
}
return hour, minute, nil
}
func forecastDiscussionLocation(abbrev string) (*time.Location, error) {
offsets := map[string]int{
"AST": -4 * 3600,
"ADT": -3 * 3600,
"EST": -5 * 3600,
"EDT": -4 * 3600,
"CST": -6 * 3600,
"CDT": -5 * 3600,
"MST": -7 * 3600,
"MDT": -6 * 3600,
"PST": -8 * 3600,
"PDT": -7 * 3600,
"AKST": -9 * 3600,
"AKDT": -8 * 3600,
"HST": -10 * 3600,
"UTC": 0,
"GMT": 0,
}
abbr := strings.ToUpper(strings.TrimSpace(abbrev))
offset, ok := offsets[abbr]
if !ok {
return nil, fmt.Errorf("unsupported time zone %q", abbrev)
}
return time.FixedZone(abbr, offset), nil
}
func extractForecastDiscussionSection(lines []string, section string) ([]string, bool) {
target := "." + section + "..."
for i, raw := range lines {
line := strings.TrimSpace(raw)
if !strings.HasPrefix(line, target) {
continue
}
out := []string{line}
for j := i + 1; j < len(lines); j++ {
next := strings.TrimSpace(lines[j])
if next == "&&" || next == "$$" || strings.Contains(next, "WATCHES/WARNINGS/ADVISORIES") {
break
}
if j > i+1 && isForecastDiscussionSectionHeader(next) {
break
}
out = append(out, lines[j])
}
return out, true
}
return nil, false
}
func isForecastDiscussionSectionHeader(line string) bool {
return forecastDiscussionHeaderRE.MatchString(strings.TrimSpace(line))
}
func parseForecastDiscussionKeyMessages(block []string) []string {
if len(block) <= 1 {
return nil
}
body := trimBlankLines(block[1:])
var messages []string
var current strings.Builder
flush := func() {
msg := strings.TrimSpace(current.String())
if msg != "" {
messages = append(messages, msg)
}
current.Reset()
}
for _, raw := range body {
line := strings.TrimSpace(raw)
if line == "" {
continue
}
if strings.HasPrefix(line, "-") {
flush()
line = strings.TrimSpace(strings.TrimPrefix(line, "-"))
current.WriteString(line)
continue
}
if current.Len() > 0 {
current.WriteByte(' ')
}
current.WriteString(line)
}
flush()
return messages
}
func parseForecastDiscussionTextSection(block []string) (ForecastDiscussionSection, error) {
if len(block) == 0 {
return ForecastDiscussionSection{}, fmt.Errorf("empty section")
}
section := ForecastDiscussionSection{
Qualifier: parseForecastDiscussionQualifier(strings.TrimSpace(block[0])),
}
body := trimBlankLines(block[1:])
if len(body) == 0 {
return section, nil
}
first := strings.TrimSpace(body[0])
if strings.HasPrefix(first, "Issued at ") {
issuedAt, err := parseForecastDiscussionIssueTime(first)
if err != nil {
return ForecastDiscussionSection{}, fmt.Errorf("parse section issuedAt %q: %w", first, err)
}
tt := issuedAt.UTC()
section.IssuedAt = &tt
body = trimBlankLines(body[1:])
}
body = trimForecastDiscussionSignatureLines(body)
section.Text = joinForecastDiscussionParagraphs(body)
return section, nil
}
func parseForecastDiscussionQualifier(header string) string {
m := forecastDiscussionHeaderRE.FindStringSubmatch(header)
if len(m) != 3 {
return ""
}
return strings.TrimSpace(m[2])
}
func trimBlankLines(lines []string) []string {
start := 0
for start < len(lines) && strings.TrimSpace(lines[start]) == "" {
start++
}
end := len(lines)
for end > start && strings.TrimSpace(lines[end-1]) == "" {
end--
}
return lines[start:end]
}
func trimForecastDiscussionSignatureLines(lines []string) []string {
lines = trimBlankLines(lines)
for len(lines) > 0 {
last := strings.TrimSpace(lines[len(lines)-1])
if last == "" {
lines = lines[:len(lines)-1]
continue
}
if forecastDiscussionSigRE.MatchString(last) {
lines = trimBlankLines(lines[:len(lines)-1])
continue
}
break
}
return lines
}
func joinForecastDiscussionParagraphs(lines []string) string {
lines = trimBlankLines(lines)
if len(lines) == 0 {
return ""
}
var paragraphs []string
current := make([]string, 0, len(lines))
flush := func() {
if len(current) == 0 {
return
}
paragraphs = append(paragraphs, strings.Join(current, " "))
current = current[:0]
}
for _, raw := range lines {
line := strings.TrimSpace(raw)
if line == "" {
flush()
continue
}
current = append(current, line)
}
flush()
return strings.Join(paragraphs, "\n\n")
}

View File

@@ -0,0 +1,118 @@
package nws
import (
"os"
"path/filepath"
"strings"
"testing"
"time"
)
func TestParseForecastDiscussionHTMLParsesExpectedFields(t *testing.T) {
raw := loadForecastDiscussionSampleHTML(t)
got, err := ParseForecastDiscussionHTML(raw)
if err != nil {
t.Fatalf("ParseForecastDiscussionHTML() error = %v", err)
}
if got.OfficeID != "LSX" {
t.Fatalf("OfficeID = %q, want LSX", got.OfficeID)
}
if got.OfficeName != "National Weather Service Saint Louis MO" {
t.Fatalf("OfficeName = %q", got.OfficeName)
}
if got.Product != "afd" {
t.Fatalf("Product = %q, want afd", got.Product)
}
wantIssuedAt := time.Date(2026, 3, 28, 19, 24, 0, 0, time.UTC)
if !got.IssuedAt.Equal(wantIssuedAt) {
t.Fatalf("IssuedAt = %s, want %s", got.IssuedAt.Format(time.RFC3339), wantIssuedAt.Format(time.RFC3339))
}
wantUpdatedAt := time.Date(2026, 3, 28, 20, 29, 47, 0, time.UTC)
if got.UpdatedAt == nil || !got.UpdatedAt.Equal(wantUpdatedAt) {
t.Fatalf("UpdatedAt = %v, want %s", got.UpdatedAt, wantUpdatedAt.Format(time.RFC3339))
}
wantMessages := []string{
"Elevated fire danger conditions are expected across a broad area tomorrow afternoon due to breezy southwest winds and low humidity.",
"Very warm temperatures are expected once again Monday and Tuesday, with highs well into the 80s.",
"A cold front late Tuesday or early Wednesday brings our next chance of thunderstorms, followed by a cooldown and possibly more chances for rain later in the week.",
}
if len(got.KeyMessages) != len(wantMessages) {
t.Fatalf("KeyMessages len = %d, want %d", len(got.KeyMessages), len(wantMessages))
}
for i := range wantMessages {
if got.KeyMessages[i] != wantMessages[i] {
t.Fatalf("KeyMessages[%d] = %q, want %q", i, got.KeyMessages[i], wantMessages[i])
}
}
if got.ShortTerm == nil {
t.Fatalf("ShortTerm is nil")
}
if got.ShortTerm.Qualifier != "(Through Late Sunday Night)" {
t.Fatalf("ShortTerm.Qualifier = %q", got.ShortTerm.Qualifier)
}
if got.ShortTerm.IssuedAt == nil || !got.ShortTerm.IssuedAt.Equal(time.Date(2026, 3, 28, 19, 19, 0, 0, time.UTC)) {
t.Fatalf("ShortTerm.IssuedAt = %v", got.ShortTerm.IssuedAt)
}
if !strings.Contains(got.ShortTerm.Text, "After a chilly morning") {
t.Fatalf("ShortTerm.Text missing expected prose: %q", got.ShortTerm.Text)
}
if strings.Contains(got.ShortTerm.Text, "BRC") {
t.Fatalf("ShortTerm.Text should not include signature: %q", got.ShortTerm.Text)
}
if strings.Contains(got.ShortTerm.Text, "\n\n\n") {
t.Fatalf("ShortTerm.Text contains unexpected paragraph breaks: %q", got.ShortTerm.Text)
}
if got.LongTerm == nil {
t.Fatalf("LongTerm is nil")
}
if got.LongTerm.Qualifier != "(Monday through Next Saturday)" {
t.Fatalf("LongTerm.Qualifier = %q", got.LongTerm.Qualifier)
}
if got.LongTerm.IssuedAt == nil || !got.LongTerm.IssuedAt.Equal(time.Date(2026, 3, 28, 19, 19, 0, 0, time.UTC)) {
t.Fatalf("LongTerm.IssuedAt = %v", got.LongTerm.IssuedAt)
}
if !strings.Contains(got.LongTerm.Text, "The peak of the warmth arrives Monday and Tuesday") {
t.Fatalf("LongTerm.Text missing expected prose: %q", got.LongTerm.Text)
}
if strings.Contains(got.LongTerm.Text, "AVIATION") || strings.Contains(got.LongTerm.Text, "WATCHES/WARNINGS/ADVISORIES") {
t.Fatalf("LongTerm.Text includes content from other sections: %q", got.LongTerm.Text)
}
}
func TestParseForecastDiscussionHTMLMissingPreBlock(t *testing.T) {
_, err := ParseForecastDiscussionHTML("<html><body><div>no pre block</div></body></html>")
if err == nil {
t.Fatalf("ParseForecastDiscussionHTML() error = nil, want error")
}
if !strings.Contains(err.Error(), "glossaryProduct") {
t.Fatalf("error = %q, want glossaryProduct context", err)
}
}
func TestParseForecastDiscussionTextMissingIssueTime(t *testing.T) {
_, err := ParseForecastDiscussionText("National Weather Service Saint Louis MO\n\n.KEY MESSAGES...\n- Test")
if err == nil {
t.Fatalf("ParseForecastDiscussionText() error = nil, want error")
}
if !strings.Contains(err.Error(), "issue time") {
t.Fatalf("error = %q, want issue time context", err)
}
}
func loadForecastDiscussionSampleHTML(t *testing.T) string {
t.Helper()
path := filepath.Join("testdata", "forecast_discussion_sample.html")
b, err := os.ReadFile(path)
if err != nil {
t.Fatalf("os.ReadFile(%q) error = %v", path, err)
}
return string(b)
}

View File

@@ -0,0 +1,84 @@
<!DOCTYPE html>
<html class="no-js">
<head>
<meta name="DC.date.created" scheme="ISO8601" content="2026-03-28T20:29:47+00:00" />
<title>National Weather Service</title>
</head>
<body>
<pre class="glossaryProduct">
988
FXUS63 KLSX 281924
AFDLSX
Area Forecast Discussion
National Weather Service Saint Louis MO
224 PM CDT Sat Mar 28 2026
.KEY MESSAGES...
- Elevated fire danger conditions are expected across a broad area
tomorrow afternoon due to breezy southwest winds and low
humidity.
- Very warm temperatures are expected once again Monday and
Tuesday, with highs well into the 80s.
- A cold front late Tuesday or early Wednesday brings our next
chance of thunderstorms, followed by a cooldown and possibly
more chances for rain later in the week.
&&
.SHORT TERM... (Through Late Sunday Night)
Issued at 219 PM CDT Sat Mar 28 2026
After a chilly morning that saw widespread freezing temperatures,
another warmup is in store over the next several days as southerly
winds become re-established. We will also see the return of
shower/thunderstorm chances Tuesday onward as we enter a more
unsettled pattern.
In the near-term, the focus continues to be on some lingering fire
weather potential thanks to the presence of an exceptionally dry
airmass.
BRC
&&
.LONG TERM... (Monday through Next Saturday)
Issued at 219 PM CDT Sat Mar 28 2026
The peak of the warmth arrives Monday and Tuesday, as a broad, but
low-amplitude ridge nudges eastward and steady warm/moist advection
continues on both days.
Wednesday onward, the day-to-day details become much less clear, but
latest trends suggest that an active/wet pattern will likely
continue as another more substantial trough follows with additional
chances for showers/thunderstorms late in the week.
BRC
&&
.AVIATION... (For the 18z TAFs through 18z Sunday Afternoon)
Issued at 1133 AM CDT Sat Mar 28 2026
VFR conditions are expected throughout the 18Z TAF period.
BRC
&&
.LSX WATCHES/WARNINGS/ADVISORIES...
MO...None.
IL...None.
&&
$$
WFO LSX
</pre>
</body>
</html>

View File

@@ -20,6 +20,8 @@ func mapPostgresEvent(_ context.Context, e fkevent.Event) ([]fksinks.PostgresWri
return mapObservationEvent(e) return mapObservationEvent(e)
case standards.SchemaWeatherForecastV1: case standards.SchemaWeatherForecastV1:
return mapForecastEvent(e) return mapForecastEvent(e)
case standards.SchemaWeatherForecastDiscussionV1:
return mapForecastDiscussionEvent(e)
case standards.SchemaWeatherAlertV1: case standards.SchemaWeatherAlertV1:
return mapAlertEvent(e) return mapAlertEvent(e)
default: default:
@@ -160,6 +162,62 @@ func mapForecastEvent(e fkevent.Event) ([]fksinks.PostgresWrite, error) {
return writes, nil return writes, nil
} }
func mapForecastDiscussionEvent(e fkevent.Event) ([]fksinks.PostgresWrite, error) {
run, err := decodePayload[model.WeatherForecastDiscussion](e.Payload)
if err != nil {
return nil, fmt.Errorf("decode forecast discussion payload: %w", err)
}
if run.IssuedAt.IsZero() {
return nil, fmt.Errorf("decode forecast discussion payload: issuedAt is required")
}
if strings.TrimSpace(string(run.Product)) == "" {
return nil, fmt.Errorf("decode forecast discussion payload: product is required")
}
issuedAt := run.IssuedAt.UTC()
shortTermQualifier, shortTermIssuedAt, shortTermText := nullableDiscussionSection(run.ShortTerm)
longTermQualifier, longTermIssuedAt, longTermText := nullableDiscussionSection(run.LongTerm)
writes := make([]fksinks.PostgresWrite, 0, 1+len(run.KeyMessages))
writes = append(writes, fksinks.PostgresWrite{
Table: tableForecastDiscussions,
Values: map[string]any{
"event_id": e.ID,
"event_kind": string(e.Kind),
"event_source": e.Source,
"event_schema": e.Schema,
"event_emitted_at": e.EmittedAt.UTC(),
"event_effective_at": nullableTime(e.EffectiveAt),
"office_id": nullableString(run.OfficeID),
"office_name": nullableString(run.OfficeName),
"issued_at": issuedAt,
"updated_at": nullableTime(run.UpdatedAt),
"product": string(run.Product),
"short_term_qualifier": shortTermQualifier,
"short_term_issued_at": shortTermIssuedAt,
"short_term_text": shortTermText,
"long_term_qualifier": longTermQualifier,
"long_term_issued_at": longTermIssuedAt,
"long_term_text": longTermText,
"key_message_count": len(run.KeyMessages),
},
})
for i, msg := range run.KeyMessages {
writes = append(writes, fksinks.PostgresWrite{
Table: tableForecastDiscussionKeyMessages,
Values: map[string]any{
"run_event_id": e.ID,
"message_index": i,
"issued_at": issuedAt,
"message_text": nullableString(msg),
},
})
}
return writes, nil
}
func mapAlertEvent(e fkevent.Event) ([]fksinks.PostgresWrite, error) { func mapAlertEvent(e fkevent.Event) ([]fksinks.PostgresWrite, error) {
run, err := decodePayload[model.WeatherAlertRun](e.Payload) run, err := decodePayload[model.WeatherAlertRun](e.Payload)
if err != nil { if err != nil {
@@ -269,6 +327,13 @@ func decodePayload[T any](payload any) (T, error) {
return out, nil return out, nil
} }
func nullableDiscussionSection(section *model.WeatherForecastDiscussionSection) (any, any, any) {
if section == nil {
return nil, nil, nil
}
return nullableString(section.Qualifier), nullableTime(section.IssuedAt), nullableString(section.Text)
}
func countAlertReferences(alerts []model.WeatherAlert) int { func countAlertReferences(alerts []model.WeatherAlert) int {
total := 0 total := 0
for _, a := range alerts { for _, a := range alerts {

View File

@@ -146,6 +146,50 @@ func TestMapPostgresEventAlertStructPayload(t *testing.T) {
assertAllWritesIncludeAllColumns(t, writes) assertAllWritesIncludeAllColumns(t, writes)
} }
func TestMapPostgresEventForecastDiscussionStructPayload(t *testing.T) {
updatedAt := time.Date(2026, 3, 28, 20, 29, 47, 0, time.UTC)
shortIssuedAt := time.Date(2026, 3, 28, 19, 19, 0, 0, time.UTC)
run := model.WeatherForecastDiscussion{
OfficeID: "LSX",
OfficeName: "National Weather Service Saint Louis MO",
Product: model.ForecastDiscussionProductAFD,
IssuedAt: time.Date(2026, 3, 28, 19, 24, 0, 0, time.UTC),
UpdatedAt: &updatedAt,
KeyMessages: []string{"msg one", "msg two"},
ShortTerm: &model.WeatherForecastDiscussionSection{Qualifier: "(Tonight)", IssuedAt: &shortIssuedAt, Text: "Short term text"},
LongTerm: &model.WeatherForecastDiscussionSection{Text: "Long term text"},
}
writes, err := mapPostgresEvent(context.Background(), testEvent(standards.SchemaWeatherForecastDiscussionV1, "forecast_discussion", run))
if err != nil {
t.Fatalf("mapPostgresEvent() error = %v", err)
}
if len(writes) != 3 {
t.Fatalf("mapPostgresEvent() writes len = %d, want 3", len(writes))
}
if writes[0].Table != tableForecastDiscussions {
t.Fatalf("writes[0].Table = %q, want %q", writes[0].Table, tableForecastDiscussions)
}
if got := writes[0].Values["key_message_count"]; got != 2 {
t.Fatalf("forecast_discussions key_message_count = %#v, want 2", got)
}
if got := writes[0].Values["short_term_qualifier"]; got != "(Tonight)" {
t.Fatalf("forecast_discussions short_term_qualifier = %#v, want (Tonight)", got)
}
if got := writes[0].Values["long_term_issued_at"]; got != nil {
t.Fatalf("forecast_discussions long_term_issued_at = %#v, want nil", got)
}
if writes[1].Table != tableForecastDiscussionKeyMessages || writes[2].Table != tableForecastDiscussionKeyMessages {
t.Fatalf("forecast discussion key message writes not in expected order")
}
if got := writes[2].Values["message_index"]; got != 1 {
t.Fatalf("second key message index = %#v, want 1", got)
}
assertAllWritesIncludeAllColumns(t, writes)
}
func TestMapPostgresEventMapPayload(t *testing.T) { func TestMapPostgresEventMapPayload(t *testing.T) {
run := model.WeatherForecastRun{ run := model.WeatherForecastRun{
IssuedAt: time.Date(2026, 3, 16, 18, 0, 0, 0, time.UTC), IssuedAt: time.Date(2026, 3, 16, 18, 0, 0, 0, time.UTC),
@@ -201,6 +245,16 @@ func TestMapPostgresEventMalformedPayload(t *testing.T) {
} }
} }
func TestMapPostgresEventForecastDiscussionMalformedPayload(t *testing.T) {
_, err := mapPostgresEvent(context.Background(), testEvent(standards.SchemaWeatherForecastDiscussionV1, "forecast_discussion", "bad"))
if err == nil {
t.Fatalf("mapPostgresEvent() expected error for malformed payload")
}
if !strings.Contains(err.Error(), "decode forecast discussion payload") {
t.Fatalf("error = %q, want decode forecast discussion payload context", err)
}
}
func testEvent(schema string, kind fkevent.Kind, payload any) fkevent.Event { func testEvent(schema string, kind fkevent.Kind, payload any) fkevent.Event {
effectiveAt := time.Date(2026, 3, 16, 18, 30, 0, 0, time.UTC) effectiveAt := time.Date(2026, 3, 16, 18, 30, 0, 0, time.UTC)
return fkevent.Event{ return fkevent.Event{
@@ -238,7 +292,7 @@ func assertAllWritesIncludeAllColumns(t *testing.T, writes []fksinks.PostgresWri
} }
func tableColumnCounts() map[string]int { func tableColumnCounts() map[string]int {
s := weatherPostgresSchema() s := PostgresSchema()
m := make(map[string]int, len(s.Tables)) m := make(map[string]int, len(s.Tables))
for _, tbl := range s.Tables { for _, tbl := range s.Tables {
m[tbl.Name] = len(tbl.Columns) m[tbl.Name] = len(tbl.Columns)

View File

@@ -1,10 +1,6 @@
package postgres package postgres
import ( import (
"fmt"
"strings"
"gitea.maximumdirect.net/ejr/feedkit/config"
fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks" fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks"
) )
@@ -13,36 +9,15 @@ const (
tableObservationPresentWeather = "observation_present_weather" tableObservationPresentWeather = "observation_present_weather"
tableForecasts = "forecasts" tableForecasts = "forecasts"
tableForecastPeriods = "forecast_periods" tableForecastPeriods = "forecast_periods"
tableForecastDiscussions = "forecast_discussions"
tableForecastDiscussionKeyMessages = "forecast_discussion_key_messages"
tableAlertRuns = "alert_runs" tableAlertRuns = "alert_runs"
tableAlerts = "alerts" tableAlerts = "alerts"
tableAlertReferences = "alert_references" tableAlertReferences = "alert_references"
) )
// RegisterPostgresSchemas registers weatherfeeder's Postgres schema for each // PostgresSchema returns weatherfeeder's Postgres schema definition.
// configured sink using driver=postgres. func PostgresSchema() fksinks.PostgresSchema {
func RegisterPostgresSchemas(cfg *config.Config) error {
if cfg == nil {
return fmt.Errorf("register postgres schemas: config is nil")
}
schema := weatherPostgresSchema()
for i, sk := range cfg.Sinks {
if !isPostgresDriver(sk.Driver) {
continue
}
if err := fksinks.RegisterPostgresSchema(sk.Name, schema); err != nil {
return fmt.Errorf("register postgres schema for sinks[%d] name=%q: %w", i, sk.Name, err)
}
}
return nil
}
func isPostgresDriver(driver string) bool {
return strings.EqualFold(strings.TrimSpace(driver), "postgres")
}
func weatherPostgresSchema() fksinks.PostgresSchema {
return fksinks.PostgresSchema{ return fksinks.PostgresSchema{
Tables: []fksinks.PostgresTable{ Tables: []fksinks.PostgresTable{
{ {
@@ -156,6 +131,49 @@ func weatherPostgresSchema() fksinks.PostgresSchema {
{Name: "idx_wf_fc_period_run_start", Columns: []string{"run_event_id", "start_time"}}, {Name: "idx_wf_fc_period_run_start", Columns: []string{"run_event_id", "start_time"}},
}, },
}, },
{
Name: tableForecastDiscussions,
Columns: []fksinks.PostgresColumn{
{Name: "event_id", Type: "TEXT", Nullable: false},
{Name: "event_kind", Type: "TEXT", Nullable: false},
{Name: "event_source", Type: "TEXT", Nullable: false},
{Name: "event_schema", Type: "TEXT", Nullable: false},
{Name: "event_emitted_at", Type: "TIMESTAMPTZ", Nullable: false},
{Name: "event_effective_at", Type: "TIMESTAMPTZ", Nullable: true},
{Name: "office_id", Type: "TEXT", Nullable: true},
{Name: "office_name", Type: "TEXT", Nullable: true},
{Name: "issued_at", Type: "TIMESTAMPTZ", Nullable: false},
{Name: "updated_at", Type: "TIMESTAMPTZ", Nullable: true},
{Name: "product", Type: "TEXT", Nullable: false},
{Name: "short_term_qualifier", Type: "TEXT", Nullable: true},
{Name: "short_term_issued_at", Type: "TIMESTAMPTZ", Nullable: true},
{Name: "short_term_text", Type: "TEXT", Nullable: true},
{Name: "long_term_qualifier", Type: "TEXT", Nullable: true},
{Name: "long_term_issued_at", Type: "TIMESTAMPTZ", Nullable: true},
{Name: "long_term_text", Type: "TEXT", Nullable: true},
{Name: "key_message_count", Type: "INTEGER", Nullable: false},
},
PrimaryKey: []string{"event_id"},
PruneColumn: "issued_at",
Indexes: []fksinks.PostgresIndex{
{Name: "idx_wf_discussion_office_product_issued_at", Columns: []string{"office_id", "product", "issued_at"}},
{Name: "idx_wf_discussion_issued_at", Columns: []string{"issued_at"}},
},
},
{
Name: tableForecastDiscussionKeyMessages,
Columns: []fksinks.PostgresColumn{
{Name: "run_event_id", Type: "TEXT REFERENCES forecast_discussions(event_id) ON DELETE CASCADE", Nullable: false},
{Name: "message_index", Type: "INTEGER", Nullable: false},
{Name: "issued_at", Type: "TIMESTAMPTZ", Nullable: false},
{Name: "message_text", Type: "TEXT", Nullable: true},
},
PrimaryKey: []string{"run_event_id", "message_index"},
PruneColumn: "issued_at",
Indexes: []fksinks.PostgresIndex{
{Name: "idx_wf_discussion_message_issued_at", Columns: []string{"issued_at"}},
},
},
{ {
Name: tableAlertRuns, Name: tableAlertRuns,
Columns: []fksinks.PostgresColumn{ Columns: []fksinks.PostgresColumn{

View File

@@ -1,62 +1,11 @@
package postgres package postgres
import ( import "testing"
"fmt"
"strings"
"testing"
"time"
"gitea.maximumdirect.net/ejr/feedkit/config"
)
func TestRegisterPostgresSchemasNilConfig(t *testing.T) {
err := RegisterPostgresSchemas(nil)
if err == nil {
t.Fatalf("RegisterPostgresSchemas(nil) expected error")
}
if !strings.Contains(err.Error(), "config is nil") {
t.Fatalf("error = %q, want config is nil", err)
}
}
func TestRegisterPostgresSchemasNonPostgresNoOp(t *testing.T) {
cfg := &config.Config{
Sinks: []config.SinkConfig{
{Name: "stdout_only", Driver: "stdout"},
{Name: "nats_only", Driver: "nats"},
},
}
if err := RegisterPostgresSchemas(cfg); err != nil {
t.Fatalf("RegisterPostgresSchemas(non-postgres) error = %v", err)
}
}
func TestRegisterPostgresSchemasDuplicateRegistrationFails(t *testing.T) {
sinkName := uniqueSinkName("pg_test")
cfg := &config.Config{
Sinks: []config.SinkConfig{
{Name: sinkName, Driver: "postgres"},
},
}
if err := RegisterPostgresSchemas(cfg); err != nil {
t.Fatalf("first RegisterPostgresSchemas() error = %v", err)
}
err := RegisterPostgresSchemas(cfg)
if err == nil {
t.Fatalf("second RegisterPostgresSchemas() expected duplicate error")
}
if !strings.Contains(err.Error(), "already registered") {
t.Fatalf("error = %q, want already registered", err)
}
}
func TestWeatherPostgresSchemaShape(t *testing.T) { func TestWeatherPostgresSchemaShape(t *testing.T) {
s := weatherPostgresSchema() s := PostgresSchema()
if s.MapEvent == nil { if s.MapEvent == nil {
t.Fatalf("weatherPostgresSchema().MapEvent is nil") t.Fatalf("PostgresSchema().MapEvent is nil")
} }
wantTables := map[string]bool{ wantTables := map[string]bool{
@@ -64,13 +13,15 @@ func TestWeatherPostgresSchemaShape(t *testing.T) {
tableObservationPresentWeather: true, tableObservationPresentWeather: true,
tableForecasts: true, tableForecasts: true,
tableForecastPeriods: true, tableForecastPeriods: true,
tableForecastDiscussions: true,
tableForecastDiscussionKeyMessages: true,
tableAlertRuns: true, tableAlertRuns: true,
tableAlerts: true, tableAlerts: true,
tableAlertReferences: true, tableAlertReferences: true,
} }
if len(s.Tables) != len(wantTables) { if len(s.Tables) != len(wantTables) {
t.Fatalf("weatherPostgresSchema().Tables len = %d, want %d", len(s.Tables), len(wantTables)) t.Fatalf("PostgresSchema().Tables len = %d, want %d", len(s.Tables), len(wantTables))
} }
seenIndexes := map[string]bool{} seenIndexes := map[string]bool{}
@@ -89,7 +40,3 @@ func TestWeatherPostgresSchemaShape(t *testing.T) {
} }
} }
} }
func uniqueSinkName(prefix string) string {
return fmt.Sprintf("%s_%d", prefix, time.Now().UnixNano())
}

View File

@@ -9,33 +9,33 @@ import (
fksource "gitea.maximumdirect.net/ejr/feedkit/sources" fksource "gitea.maximumdirect.net/ejr/feedkit/sources"
) )
type pollDriverRegistration struct {
driver string
factory func(config.SourceConfig) (fksource.PollSource, error)
}
var pollDriverRegistrations = []pollDriverRegistration{
{driver: "nws_observation", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) { return nws.NewObservationSource(cfg) }},
{driver: "nws_alerts", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) { return nws.NewAlertsSource(cfg) }},
{driver: "nws_forecast_hourly", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) { return nws.NewHourlyForecastSource(cfg) }},
{driver: "nws_forecast_narrative", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) { return nws.NewNarrativeForecastSource(cfg) }},
{driver: "nws_forecast_discussion", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) {
return nws.NewForecastDiscussionSource(cfg)
}},
{driver: "openmeteo_observation", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) { return openmeteo.NewObservationSource(cfg) }},
{driver: "openmeteo_forecast", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) { return openmeteo.NewForecastSource(cfg) }},
{driver: "openweather_observation", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) {
return openweather.NewObservationSource(cfg)
}},
}
// RegisterBuiltins registers the source drivers that ship with this binary. // RegisterBuiltins registers the source drivers that ship with this binary.
// Keeping this in one place makes main.go very readable. // Keeping this in one place makes main.go very readable.
func RegisterBuiltins(r *fksource.Registry) { func RegisterBuiltins(r *fksource.Registry) {
// NWS drivers for _, reg := range pollDriverRegistrations {
r.RegisterPoll("nws_observation", func(cfg config.SourceConfig) (fksource.PollSource, error) { reg := reg
return nws.NewObservationSource(cfg) r.RegisterPoll(reg.driver, func(cfg config.SourceConfig) (fksource.PollSource, error) {
}) return reg.factory(cfg)
r.RegisterPoll("nws_alerts", func(cfg config.SourceConfig) (fksource.PollSource, error) {
return nws.NewAlertsSource(cfg)
})
r.RegisterPoll("nws_forecast_hourly", func(cfg config.SourceConfig) (fksource.PollSource, error) {
return nws.NewHourlyForecastSource(cfg)
})
r.RegisterPoll("nws_forecast_narrative", func(cfg config.SourceConfig) (fksource.PollSource, error) {
return nws.NewNarrativeForecastSource(cfg)
})
// Open-Meteo drivers
r.RegisterPoll("openmeteo_observation", func(cfg config.SourceConfig) (fksource.PollSource, error) {
return openmeteo.NewObservationSource(cfg)
})
r.RegisterPoll("openmeteo_forecast", func(cfg config.SourceConfig) (fksource.PollSource, error) {
return openmeteo.NewForecastSource(cfg)
})
// OpenWeatherMap drivers
r.RegisterPoll("openweather_observation", func(cfg config.SourceConfig) (fksource.PollSource, error) {
return openweather.NewObservationSource(cfg)
}) })
}
} }

View File

@@ -34,6 +34,19 @@ func TestRegisterBuiltinsRegistersNWSNarrativeForecastDriver(t *testing.T) {
} }
} }
func TestRegisterBuiltinsRegistersNWSForecastDiscussionDriver(t *testing.T) {
reg := fksource.NewRegistry()
RegisterBuiltins(reg)
in, err := reg.BuildInput(sourceConfigForDriver("nws_forecast_discussion"))
if err != nil {
t.Fatalf("BuildInput(nws_forecast_discussion) error = %v", err)
}
if _, ok := in.(fksource.PollSource); !ok {
t.Fatalf("BuildInput(nws_forecast_discussion) type = %T, want PollSource", in)
}
}
func TestRegisterBuiltinsDoesNotRegisterLegacyNWSForecastDriver(t *testing.T) { func TestRegisterBuiltinsDoesNotRegisterLegacyNWSForecastDriver(t *testing.T) {
reg := fksource.NewRegistry() reg := fksource.NewRegistry()
RegisterBuiltins(reg) RegisterBuiltins(reg)
@@ -47,13 +60,43 @@ func TestRegisterBuiltinsDoesNotRegisterLegacyNWSForecastDriver(t *testing.T) {
} }
} }
func TestRegisterBuiltinsRegistersAllCurrentDrivers(t *testing.T) {
reg := fksource.NewRegistry()
RegisterBuiltins(reg)
drivers := []string{
"nws_observation",
"nws_alerts",
"nws_forecast_hourly",
"nws_forecast_narrative",
"nws_forecast_discussion",
"openmeteo_observation",
"openmeteo_forecast",
"openweather_observation",
}
for _, driver := range drivers {
in, err := reg.BuildInput(sourceConfigForDriver(driver))
if err != nil {
t.Fatalf("BuildInput(%s) error = %v", driver, err)
}
if _, ok := in.(fksource.PollSource); !ok {
t.Fatalf("BuildInput(%s) type = %T, want PollSource", driver, in)
}
}
}
func sourceConfigForDriver(driver string) config.SourceConfig { func sourceConfigForDriver(driver string) config.SourceConfig {
url := "https://example.invalid"
if driver == "openweather_observation" {
url = "https://example.invalid?units=metric"
}
return config.SourceConfig{ return config.SourceConfig{
Name: "test-source", Name: "test-source",
Driver: driver, Driver: driver,
Mode: config.SourceModePoll, Mode: config.SourceModePoll,
Params: map[string]any{ Params: map[string]any{
"url": "https://example.invalid", "url": url,
"user_agent": "test-agent", "user_agent": "test-agent",
}, },
} }

View File

@@ -1,54 +0,0 @@
// FILE: ./internal/sources/common/event.go
package common
import (
"time"
"gitea.maximumdirect.net/ejr/feedkit/event"
)
// SingleRawEvent constructs, validates, and returns a slice containing exactly one event.
//
// This removes repetitive "event envelope ceremony" from individual sources.
// Sources remain responsible for:
// - fetching bytes (raw payload)
// - choosing Schema (raw schema identifier)
// - computing Event.ID and (optional) EffectiveAt
//
// emittedAt is explicit so callers can compute IDs using the same timestamp (or
// so tests can provide a stable value).
func SingleRawEvent(
kind event.Kind,
sourceName string,
schema string,
id string,
emittedAt time.Time,
effectiveAt *time.Time,
payload any,
) ([]event.Event, error) {
if emittedAt.IsZero() {
emittedAt = time.Now().UTC()
} else {
emittedAt = emittedAt.UTC()
}
e := event.Event{
ID: id,
Kind: kind,
Source: sourceName,
EmittedAt: emittedAt,
EffectiveAt: effectiveAt,
// RAW schema (normalizer matches on this).
Schema: schema,
// Raw payload (usually json.RawMessage). Normalizer will decode and map to canonical model.
Payload: payload,
}
if err := e.Validate(); err != nil {
return nil, err
}
return []event.Event{e}, nil
}

View File

@@ -1,39 +0,0 @@
// FILE: ./internal/sources/common/id.go
package common
import (
"fmt"
"strings"
"time"
)
// ChooseEventID applies weatherfeeder's opinionated Event.ID policy:
//
// - If upstream provides an ID, use it (trimmed).
// - Otherwise, ID is "<Source>:<EffectiveAt>" when available.
// - If EffectiveAt is unavailable, fall back to "<Source>:<EmittedAt>".
//
// Timestamps are encoded as RFC3339Nano in UTC.
func ChooseEventID(upstreamID, sourceName string, effectiveAt *time.Time, emittedAt time.Time) string {
if id := strings.TrimSpace(upstreamID); id != "" {
return id
}
src := strings.TrimSpace(sourceName)
if src == "" {
src = "UNKNOWN_SOURCE"
}
// Prefer EffectiveAt for dedupe friendliness.
if effectiveAt != nil && !effectiveAt.IsZero() {
return fmt.Sprintf("%s:%s", src, effectiveAt.UTC().Format(time.RFC3339Nano))
}
// Fall back to EmittedAt (still stable within a poll invocation).
t := emittedAt.UTC()
if t.IsZero() {
t = time.Now().UTC()
}
return fmt.Sprintf("%s:%s", src, t.Format(time.RFC3339Nano))
}

View File

@@ -11,7 +11,6 @@ import (
"gitea.maximumdirect.net/ejr/feedkit/event" "gitea.maximumdirect.net/ejr/feedkit/event"
fksources "gitea.maximumdirect.net/ejr/feedkit/sources" fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws" nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards" "gitea.maximumdirect.net/ejr/weatherfeeder/standards"
) )
@@ -40,8 +39,8 @@ func NewAlertsSource(cfg config.SourceConfig) (*AlertsSource, error) {
func (s *AlertsSource) Name() string { return s.http.Name } func (s *AlertsSource) Name() string { return s.http.Name }
// Kind is used for routing/policy. // Kinds is used for routing/policy.
func (s *AlertsSource) Kind() event.Kind { return event.Kind("alert") } func (s *AlertsSource) Kinds() []event.Kind { return []event.Kind{event.Kind("alert")} }
func (s *AlertsSource) Poll(ctx context.Context) ([]event.Event, error) { func (s *AlertsSource) Poll(ctx context.Context) ([]event.Event, error) {
raw, meta, changed, err := s.fetchRaw(ctx) raw, meta, changed, err := s.fetchRaw(ctx)
@@ -69,10 +68,10 @@ func (s *AlertsSource) Poll(ctx context.Context) ([]event.Event, error) {
// NWS alerts collections do not provide a stable per-snapshot ID. // NWS alerts collections do not provide a stable per-snapshot ID.
// Use Source:EffectiveAt (or Source:EmittedAt fallback) for dedupe friendliness. // Use Source:EffectiveAt (or Source:EmittedAt fallback) for dedupe friendliness.
eventID := common.ChooseEventID("", s.http.Name, effectiveAt, emittedAt) eventID := fksources.DefaultEventID("", s.http.Name, effectiveAt, emittedAt)
return common.SingleRawEvent( return fksources.SingleEvent(
s.Kind(), event.Kind("alert"),
s.http.Name, s.http.Name,
standards.SchemaRawNWSAlertsV1, standards.SchemaRawNWSAlertsV1,
eventID, eventID,

View File

@@ -0,0 +1,114 @@
package nws
import (
"context"
"encoding/json"
"strings"
"time"
"gitea.maximumdirect.net/ejr/feedkit/config"
"gitea.maximumdirect.net/ejr/feedkit/event"
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
)
const nwsForecastAccept = "application/geo+json, application/json"
type forecastSource struct {
http *fksources.HTTPSource
rawSchema string
}
type forecastMeta struct {
Properties struct {
GeneratedAt string `json:"generatedAt"`
UpdateTime string `json:"updateTime"`
Updated string `json:"updated"`
} `json:"properties"`
ParsedGeneratedAt time.Time `json:"-"`
ParsedUpdateTime time.Time `json:"-"`
}
func newForecastSource(cfg config.SourceConfig, driver, rawSchema string) (*forecastSource, error) {
hs, err := fksources.NewHTTPSource(driver, cfg, nwsForecastAccept)
if err != nil {
return nil, err
}
return &forecastSource{
http: hs,
rawSchema: rawSchema,
}, nil
}
func (s *forecastSource) Name() string { return s.http.Name }
func (s *forecastSource) Kinds() []event.Kind { return []event.Kind{event.Kind("forecast")} }
func (s *forecastSource) Poll(ctx context.Context) ([]event.Event, error) {
raw, meta, changed, err := s.fetchRaw(ctx)
if err != nil {
return nil, err
}
if !changed {
return nil, nil
}
var effectiveAt *time.Time
switch {
case !meta.ParsedGeneratedAt.IsZero():
t := meta.ParsedGeneratedAt.UTC()
effectiveAt = &t
case !meta.ParsedUpdateTime.IsZero():
t := meta.ParsedUpdateTime.UTC()
effectiveAt = &t
}
emittedAt := time.Now().UTC()
eventID := fksources.DefaultEventID("", s.http.Name, effectiveAt, emittedAt)
return fksources.SingleEvent(
event.Kind("forecast"),
s.http.Name,
s.rawSchema,
eventID,
emittedAt,
effectiveAt,
raw,
)
}
func (s *forecastSource) fetchRaw(ctx context.Context) (json.RawMessage, forecastMeta, bool, error) {
raw, changed, err := s.http.FetchJSONIfChanged(ctx)
if err != nil {
return nil, forecastMeta{}, false, err
}
if !changed {
return nil, forecastMeta{}, false, nil
}
var meta forecastMeta
if err := json.Unmarshal(raw, &meta); err != nil {
return raw, forecastMeta{}, true, nil
}
genStr := strings.TrimSpace(meta.Properties.GeneratedAt)
if genStr != "" {
if t, err := nwscommon.ParseTime(genStr); err == nil {
meta.ParsedGeneratedAt = t.UTC()
}
}
updStr := strings.TrimSpace(meta.Properties.UpdateTime)
if updStr == "" {
updStr = strings.TrimSpace(meta.Properties.Updated)
}
if updStr != "" {
if t, err := nwscommon.ParseTime(updStr); err == nil {
meta.ParsedUpdateTime = t.UTC()
}
}
return raw, meta, true, nil
}

View File

@@ -0,0 +1,68 @@
package nws
import (
"context"
"time"
"gitea.maximumdirect.net/ejr/feedkit/config"
"gitea.maximumdirect.net/ejr/feedkit/event"
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
)
// ForecastDiscussionSource polls an NWS forecast discussion HTML page and emits a RAW discussion Event.
//
// Output schema:
// - standards.SchemaRawNWSForecastDiscussionV1
type ForecastDiscussionSource struct {
http *fksources.HTTPSource
}
func NewForecastDiscussionSource(cfg config.SourceConfig) (*ForecastDiscussionSource, error) {
const driver = "nws_forecast_discussion"
hs, err := fksources.NewHTTPSource(driver, cfg, "text/html, application/xhtml+xml")
if err != nil {
return nil, err
}
return &ForecastDiscussionSource{http: hs}, nil
}
func (s *ForecastDiscussionSource) Name() string { return s.http.Name }
func (s *ForecastDiscussionSource) Kinds() []event.Kind {
return []event.Kind{event.Kind("forecast_discussion")}
}
func (s *ForecastDiscussionSource) Poll(ctx context.Context) ([]event.Event, error) {
body, changed, err := s.http.FetchBytesIfChanged(ctx)
if err != nil {
return nil, err
}
if !changed {
return nil, nil
}
rawHTML := string(body)
parsed, err := nwscommon.ParseForecastDiscussionHTML(rawHTML)
if err != nil {
return nil, err
}
issuedAt := parsed.IssuedAt.UTC()
effectiveAt := &issuedAt
emittedAt := time.Now().UTC()
eventID := fksources.DefaultEventID("", s.http.Name, effectiveAt, emittedAt)
return fksources.SingleEvent(
event.Kind("forecast_discussion"),
s.http.Name,
standards.SchemaRawNWSForecastDiscussionV1,
eventID,
emittedAt,
effectiveAt,
rawHTML,
)
}

View File

@@ -0,0 +1,138 @@
package nws
import (
"context"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"strings"
"testing"
"time"
"gitea.maximumdirect.net/ejr/feedkit/config"
"gitea.maximumdirect.net/ejr/feedkit/event"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
)
func TestForecastDiscussionSourcePollEmitsExpectedEvent(t *testing.T) {
rawHTML := loadForecastDiscussionSampleHTML(t)
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/html; charset=utf-8")
_, _ = w.Write([]byte(rawHTML))
}))
defer srv.Close()
src, err := NewForecastDiscussionSource(forecastDiscussionSourceConfig(srv.URL))
if err != nil {
t.Fatalf("NewForecastDiscussionSource() error = %v", err)
}
if got := src.Kinds(); len(got) != 1 || got[0] != event.Kind("forecast_discussion") {
t.Fatalf("Kinds() = %#v, want [forecast_discussion]", got)
}
events, err := src.Poll(context.Background())
if err != nil {
t.Fatalf("Poll() error = %v", err)
}
if len(events) != 1 {
t.Fatalf("Poll() len = %d, want 1", len(events))
}
got := events[0]
if got.Kind != event.Kind("forecast_discussion") {
t.Fatalf("Kind = %q, want forecast_discussion", got.Kind)
}
if got.Schema != standards.SchemaRawNWSForecastDiscussionV1 {
t.Fatalf("Schema = %q, want %q", got.Schema, standards.SchemaRawNWSForecastDiscussionV1)
}
wantEffectiveAt := time.Date(2026, 3, 28, 19, 24, 0, 0, time.UTC)
if got.EffectiveAt == nil || !got.EffectiveAt.Equal(wantEffectiveAt) {
t.Fatalf("EffectiveAt = %v, want %s", got.EffectiveAt, wantEffectiveAt.Format(time.RFC3339))
}
payload, ok := got.Payload.(string)
if !ok {
t.Fatalf("Payload type = %T, want string", got.Payload)
}
if payload != rawHTML {
t.Fatalf("Payload did not preserve exact HTML")
}
}
func TestForecastDiscussionSourcePollReturnsNoEventsWhenUnchanged(t *testing.T) {
rawHTML := loadForecastDiscussionSampleHTML(t)
const etag = `"discussion-v1"`
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Header.Get("If-None-Match") == etag {
w.WriteHeader(http.StatusNotModified)
return
}
w.Header().Set("ETag", etag)
_, _ = w.Write([]byte(rawHTML))
}))
defer srv.Close()
src, err := NewForecastDiscussionSource(forecastDiscussionSourceConfig(srv.URL))
if err != nil {
t.Fatalf("NewForecastDiscussionSource() error = %v", err)
}
first, err := src.Poll(context.Background())
if err != nil {
t.Fatalf("first Poll() error = %v", err)
}
if len(first) != 1 {
t.Fatalf("first Poll() len = %d, want 1", len(first))
}
second, err := src.Poll(context.Background())
if err != nil {
t.Fatalf("second Poll() error = %v", err)
}
if len(second) != 0 {
t.Fatalf("second Poll() len = %d, want 0", len(second))
}
}
func TestForecastDiscussionSourcePollRejectsInvalidHTML(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte("<html><body><div>missing discussion block</div></body></html>"))
}))
defer srv.Close()
src, err := NewForecastDiscussionSource(forecastDiscussionSourceConfig(srv.URL))
if err != nil {
t.Fatalf("NewForecastDiscussionSource() error = %v", err)
}
_, err = src.Poll(context.Background())
if err == nil {
t.Fatalf("Poll() error = nil, want error")
}
if !strings.Contains(err.Error(), "glossaryProduct") {
t.Fatalf("error = %q, want glossaryProduct context", err)
}
}
func forecastDiscussionSourceConfig(url string) config.SourceConfig {
return config.SourceConfig{
Name: "test-forecast-discussion-source",
Driver: "nws_forecast_discussion",
Mode: config.SourceModePoll,
Params: map[string]any{
"url": url,
"user_agent": "test-agent",
},
}
}
func loadForecastDiscussionSampleHTML(t *testing.T) string {
t.Helper()
path := filepath.Join("..", "..", "providers", "nws", "testdata", "forecast_discussion_sample.html")
b, err := os.ReadFile(path)
if err != nil {
t.Fatalf("os.ReadFile(%q) error = %v", path, err)
}
return string(b)
}

View File

@@ -2,16 +2,7 @@
package nws package nws
import ( import (
"context"
"encoding/json"
"strings"
"time"
"gitea.maximumdirect.net/ejr/feedkit/config" "gitea.maximumdirect.net/ejr/feedkit/config"
"gitea.maximumdirect.net/ejr/feedkit/event"
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards" "gitea.maximumdirect.net/ejr/weatherfeeder/standards"
) )
@@ -23,114 +14,15 @@ import (
// Output schema (current implementation): // Output schema (current implementation):
// - standards.SchemaRawNWSHourlyForecastV1 // - standards.SchemaRawNWSHourlyForecastV1
type HourlyForecastSource struct { type HourlyForecastSource struct {
http *fksources.HTTPSource *forecastSource
} }
func NewHourlyForecastSource(cfg config.SourceConfig) (*HourlyForecastSource, error) { func NewHourlyForecastSource(cfg config.SourceConfig) (*HourlyForecastSource, error) {
const driver = "nws_forecast_hourly" const driver = "nws_forecast_hourly"
src, err := newForecastSource(cfg, driver, standards.SchemaRawNWSHourlyForecastV1)
// NWS forecast endpoints are GeoJSON (and sometimes also advertise json-ld/json).
hs, err := fksources.NewHTTPSource(driver, cfg, "application/geo+json, application/json")
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &HourlyForecastSource{http: hs}, nil return &HourlyForecastSource{forecastSource: src}, nil
}
func (s *HourlyForecastSource) Name() string { return s.http.Name }
// Kind is used for routing/policy.
func (s *HourlyForecastSource) Kind() event.Kind { return event.Kind("forecast") }
func (s *HourlyForecastSource) Poll(ctx context.Context) ([]event.Event, error) {
raw, meta, changed, err := s.fetchRaw(ctx)
if err != nil {
return nil, err
}
if !changed {
return nil, nil
}
// EffectiveAt is optional; for forecasts its most naturally the run “issued” time.
// NWS gridpoint forecasts expose generatedAt (preferred) and updateTime/updated.
var effectiveAt *time.Time
switch {
case !meta.ParsedGeneratedAt.IsZero():
t := meta.ParsedGeneratedAt.UTC()
effectiveAt = &t
case !meta.ParsedUpdateTime.IsZero():
t := meta.ParsedUpdateTime.UTC()
effectiveAt = &t
}
emittedAt := time.Now().UTC()
// NWS gridpoint forecast GeoJSON commonly has a stable "id" equal to the endpoint URL.
// That is *not* unique per issued run, so we intentionally do not use it for Event.ID.
// Instead we rely on Source:EffectiveAt (or Source:EmittedAt fallback).
eventID := common.ChooseEventID("", s.http.Name, effectiveAt, emittedAt)
return common.SingleRawEvent(
s.Kind(),
s.http.Name,
standards.SchemaRawNWSHourlyForecastV1,
eventID,
emittedAt,
effectiveAt,
raw,
)
}
// ---- RAW fetch + minimal metadata decode ----
type hourlyForecastMeta struct {
// Present for GeoJSON Feature responses, but often stable (endpoint URL).
ID string `json:"id"`
Properties struct {
GeneratedAt string `json:"generatedAt"` // preferred “issued/run generated” time
UpdateTime string `json:"updateTime"` // last update time of underlying data
Updated string `json:"updated"` // deprecated alias for updateTime
} `json:"properties"`
ParsedGeneratedAt time.Time `json:"-"`
ParsedUpdateTime time.Time `json:"-"`
}
func (s *HourlyForecastSource) fetchRaw(ctx context.Context) (json.RawMessage, hourlyForecastMeta, bool, error) {
raw, changed, err := s.http.FetchJSONIfChanged(ctx)
if err != nil {
return nil, hourlyForecastMeta{}, false, err
}
if !changed {
return nil, hourlyForecastMeta{}, false, nil
}
var meta hourlyForecastMeta
if err := json.Unmarshal(raw, &meta); err != nil {
// If metadata decode fails, still return raw; Poll will fall back to Source:EmittedAt.
return raw, hourlyForecastMeta{}, true, nil
}
// generatedAt (preferred)
genStr := strings.TrimSpace(meta.Properties.GeneratedAt)
if genStr != "" {
if t, err := nwscommon.ParseTime(genStr); err == nil {
meta.ParsedGeneratedAt = t.UTC()
}
}
// updateTime, with fallback to deprecated "updated"
updStr := strings.TrimSpace(meta.Properties.UpdateTime)
if updStr == "" {
updStr = strings.TrimSpace(meta.Properties.Updated)
}
if updStr != "" {
if t, err := nwscommon.ParseTime(updStr); err == nil {
meta.ParsedUpdateTime = t.UTC()
}
}
return raw, meta, true, nil
} }

View File

@@ -2,16 +2,7 @@
package nws package nws
import ( import (
"context"
"encoding/json"
"strings"
"time"
"gitea.maximumdirect.net/ejr/feedkit/config" "gitea.maximumdirect.net/ejr/feedkit/config"
"gitea.maximumdirect.net/ejr/feedkit/event"
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards" "gitea.maximumdirect.net/ejr/weatherfeeder/standards"
) )
@@ -23,114 +14,15 @@ import (
// Output schema: // Output schema:
// - standards.SchemaRawNWSNarrativeForecastV1 // - standards.SchemaRawNWSNarrativeForecastV1
type NarrativeForecastSource struct { type NarrativeForecastSource struct {
http *fksources.HTTPSource *forecastSource
} }
func NewNarrativeForecastSource(cfg config.SourceConfig) (*NarrativeForecastSource, error) { func NewNarrativeForecastSource(cfg config.SourceConfig) (*NarrativeForecastSource, error) {
const driver = "nws_forecast_narrative" const driver = "nws_forecast_narrative"
src, err := newForecastSource(cfg, driver, standards.SchemaRawNWSNarrativeForecastV1)
// NWS forecast endpoints are GeoJSON (and sometimes also advertise json-ld/json).
hs, err := fksources.NewHTTPSource(driver, cfg, "application/geo+json, application/json")
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &NarrativeForecastSource{http: hs}, nil return &NarrativeForecastSource{forecastSource: src}, nil
}
func (s *NarrativeForecastSource) Name() string { return s.http.Name }
// Kind is used for routing/policy.
func (s *NarrativeForecastSource) Kind() event.Kind { return event.Kind("forecast") }
func (s *NarrativeForecastSource) Poll(ctx context.Context) ([]event.Event, error) {
raw, meta, changed, err := s.fetchRaw(ctx)
if err != nil {
return nil, err
}
if !changed {
return nil, nil
}
// EffectiveAt is optional; for forecasts its most naturally the run “issued” time.
// NWS gridpoint forecasts expose generatedAt (preferred) and updateTime/updated.
var effectiveAt *time.Time
switch {
case !meta.ParsedGeneratedAt.IsZero():
t := meta.ParsedGeneratedAt.UTC()
effectiveAt = &t
case !meta.ParsedUpdateTime.IsZero():
t := meta.ParsedUpdateTime.UTC()
effectiveAt = &t
}
emittedAt := time.Now().UTC()
// NWS gridpoint forecast GeoJSON commonly has a stable "id" equal to the endpoint URL.
// That is *not* unique per issued run, so we intentionally do not use it for Event.ID.
// Instead we rely on Source:EffectiveAt (or Source:EmittedAt fallback).
eventID := common.ChooseEventID("", s.http.Name, effectiveAt, emittedAt)
return common.SingleRawEvent(
s.Kind(),
s.http.Name,
standards.SchemaRawNWSNarrativeForecastV1,
eventID,
emittedAt,
effectiveAt,
raw,
)
}
// ---- RAW fetch + minimal metadata decode ----
type narrativeForecastMeta struct {
// Present for GeoJSON Feature responses, but often stable (endpoint URL).
ID string `json:"id"`
Properties struct {
GeneratedAt string `json:"generatedAt"` // preferred “issued/run generated” time
UpdateTime string `json:"updateTime"` // last update time of underlying data
Updated string `json:"updated"` // deprecated alias for updateTime
} `json:"properties"`
ParsedGeneratedAt time.Time `json:"-"`
ParsedUpdateTime time.Time `json:"-"`
}
func (s *NarrativeForecastSource) fetchRaw(ctx context.Context) (json.RawMessage, narrativeForecastMeta, bool, error) {
raw, changed, err := s.http.FetchJSONIfChanged(ctx)
if err != nil {
return nil, narrativeForecastMeta{}, false, err
}
if !changed {
return nil, narrativeForecastMeta{}, false, nil
}
var meta narrativeForecastMeta
if err := json.Unmarshal(raw, &meta); err != nil {
// If metadata decode fails, still return raw; Poll will fall back to Source:EmittedAt.
return raw, narrativeForecastMeta{}, true, nil
}
// generatedAt (preferred)
genStr := strings.TrimSpace(meta.Properties.GeneratedAt)
if genStr != "" {
if t, err := nwscommon.ParseTime(genStr); err == nil {
meta.ParsedGeneratedAt = t.UTC()
}
}
// updateTime, with fallback to deprecated "updated"
updStr := strings.TrimSpace(meta.Properties.UpdateTime)
if updStr == "" {
updStr = strings.TrimSpace(meta.Properties.Updated)
}
if updStr != "" {
if t, err := nwscommon.ParseTime(updStr); err == nil {
meta.ParsedUpdateTime = t.UTC()
}
}
return raw, meta, true, nil
} }

View File

@@ -0,0 +1,189 @@
package nws
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"time"
"gitea.maximumdirect.net/ejr/feedkit/config"
"gitea.maximumdirect.net/ejr/feedkit/event"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
)
type forecastPoller interface {
Poll(ctx context.Context) ([]event.Event, error)
}
func TestForecastSourcesEmitExpectedSchemaAndPreferGeneratedAt(t *testing.T) {
tests := []struct {
name string
driver string
wantSchema string
newSource func(config.SourceConfig) (forecastPoller, error)
}{
{
name: "hourly",
driver: "nws_forecast_hourly",
wantSchema: standards.SchemaRawNWSHourlyForecastV1,
newSource: func(cfg config.SourceConfig) (forecastPoller, error) {
return NewHourlyForecastSource(cfg)
},
},
{
name: "narrative",
driver: "nws_forecast_narrative",
wantSchema: standards.SchemaRawNWSNarrativeForecastV1,
newSource: func(cfg config.SourceConfig) (forecastPoller, error) {
return NewNarrativeForecastSource(cfg)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(`{"properties":{"generatedAt":"2026-03-28T12:00:00Z","updateTime":"2026-03-28T11:00:00Z"}}`))
}))
defer srv.Close()
src, err := tt.newSource(forecastSourceConfig(tt.driver, srv.URL))
if err != nil {
t.Fatalf("newSource() error = %v", err)
}
if ks, ok := src.(interface{ Kinds() []event.Kind }); !ok {
t.Fatalf("source does not implement Kinds()")
} else if gotKinds := ks.Kinds(); len(gotKinds) != 1 || gotKinds[0] != event.Kind("forecast") {
t.Fatalf("Kinds() = %#v, want [forecast]", gotKinds)
}
got, err := src.Poll(context.Background())
if err != nil {
t.Fatalf("Poll() error = %v", err)
}
if len(got) != 1 {
t.Fatalf("Poll() len = %d, want 1", len(got))
}
if got[0].Schema != tt.wantSchema {
t.Fatalf("Poll() schema = %q, want %q", got[0].Schema, tt.wantSchema)
}
if got[0].Kind != event.Kind("forecast") {
t.Fatalf("Poll() kind = %q, want forecast", got[0].Kind)
}
wantEffectiveAt := time.Date(2026, 3, 28, 12, 0, 0, 0, time.UTC)
if got[0].EffectiveAt == nil || !got[0].EffectiveAt.Equal(wantEffectiveAt) {
t.Fatalf("Poll() effectiveAt = %v, want %s", got[0].EffectiveAt, wantEffectiveAt)
}
})
}
}
func TestForecastSourcePollEffectiveAtFallbackOrder(t *testing.T) {
tests := []struct {
name string
body string
wantEffectiveAt *time.Time
}{
{
name: "updateTime fallback",
body: `{"properties":{"updateTime":"2026-03-28T11:00:00Z"}}`,
wantEffectiveAt: func() *time.Time {
t := time.Date(2026, 3, 28, 11, 0, 0, 0, time.UTC)
return &t
}(),
},
{
name: "updated fallback",
body: `{"properties":{"updated":"2026-03-28T10:00:00Z"}}`,
wantEffectiveAt: func() *time.Time {
t := time.Date(2026, 3, 28, 10, 0, 0, 0, time.UTC)
return &t
}(),
},
{
name: "omitted when metadata lacks timestamps",
body: `{"properties":{}}`,
wantEffectiveAt: nil,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(tt.body))
}))
defer srv.Close()
src, err := NewHourlyForecastSource(forecastSourceConfig("nws_forecast_hourly", srv.URL))
if err != nil {
t.Fatalf("NewHourlyForecastSource() error = %v", err)
}
got, err := src.Poll(context.Background())
if err != nil {
t.Fatalf("Poll() error = %v", err)
}
if len(got) != 1 {
t.Fatalf("Poll() len = %d, want 1", len(got))
}
if tt.wantEffectiveAt == nil {
if got[0].EffectiveAt != nil {
t.Fatalf("Poll() effectiveAt = %v, want nil", got[0].EffectiveAt)
}
return
}
if got[0].EffectiveAt == nil || !got[0].EffectiveAt.Equal(*tt.wantEffectiveAt) {
t.Fatalf("Poll() effectiveAt = %v, want %s", got[0].EffectiveAt, *tt.wantEffectiveAt)
}
})
}
}
func TestForecastSourcePollMetadataDecodeFailureStillEmitsRawEvent(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(`not-json`))
}))
defer srv.Close()
src, err := NewNarrativeForecastSource(forecastSourceConfig("nws_forecast_narrative", srv.URL))
if err != nil {
t.Fatalf("NewNarrativeForecastSource() error = %v", err)
}
got, err := src.Poll(context.Background())
if err != nil {
t.Fatalf("Poll() error = %v", err)
}
if len(got) != 1 {
t.Fatalf("Poll() len = %d, want 1", len(got))
}
if got[0].EffectiveAt != nil {
t.Fatalf("Poll() effectiveAt = %v, want nil", got[0].EffectiveAt)
}
if got[0].Schema != standards.SchemaRawNWSNarrativeForecastV1 {
t.Fatalf("Poll() schema = %q, want %q", got[0].Schema, standards.SchemaRawNWSNarrativeForecastV1)
}
raw, ok := got[0].Payload.(json.RawMessage)
if !ok {
t.Fatalf("Poll() payload type = %T, want json.RawMessage", got[0].Payload)
}
if string(raw) != "not-json" {
t.Fatalf("Poll() payload = %q, want %q", string(raw), "not-json")
}
}
func forecastSourceConfig(driver, url string) config.SourceConfig {
return config.SourceConfig{
Name: "test-forecast-source",
Driver: driver,
Mode: config.SourceModePoll,
Params: map[string]any{
"url": url,
"user_agent": "test-agent",
},
}
}

View File

@@ -11,7 +11,6 @@ import (
"gitea.maximumdirect.net/ejr/feedkit/event" "gitea.maximumdirect.net/ejr/feedkit/event"
fksources "gitea.maximumdirect.net/ejr/feedkit/sources" fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws" nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards" "gitea.maximumdirect.net/ejr/weatherfeeder/standards"
) )
@@ -33,7 +32,7 @@ func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) {
func (s *ObservationSource) Name() string { return s.http.Name } func (s *ObservationSource) Name() string { return s.http.Name }
func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") } func (s *ObservationSource) Kinds() []event.Kind { return []event.Kind{event.Kind("observation")} }
func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
raw, meta, changed, err := s.fetchRaw(ctx) raw, meta, changed, err := s.fetchRaw(ctx)
@@ -52,10 +51,10 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
} }
emittedAt := time.Now().UTC() emittedAt := time.Now().UTC()
eventID := common.ChooseEventID(meta.ID, s.http.Name, effectiveAt, emittedAt) eventID := fksources.DefaultEventID(meta.ID, s.http.Name, effectiveAt, emittedAt)
return common.SingleRawEvent( return fksources.SingleEvent(
s.Kind(), event.Kind("observation"),
s.http.Name, s.http.Name,
standards.SchemaRawNWSObservationV1, standards.SchemaRawNWSObservationV1,
eventID, eventID,

View File

@@ -41,6 +41,9 @@ func TestObservationSourcePollReturnsNoEventsOn304(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("NewObservationSource() error = %v", err) t.Fatalf("NewObservationSource() error = %v", err)
} }
if got := src.Kinds(); len(got) != 1 || got[0] != event.Kind("observation") {
t.Fatalf("Kinds() = %#v, want [observation]", got)
}
first, err := src.Poll(context.Background()) first, err := src.Poll(context.Background())
if err != nil { if err != nil {

View File

@@ -10,7 +10,6 @@ import (
"gitea.maximumdirect.net/ejr/feedkit/event" "gitea.maximumdirect.net/ejr/feedkit/event"
fksources "gitea.maximumdirect.net/ejr/feedkit/sources" fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openmeteo" "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openmeteo"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards" "gitea.maximumdirect.net/ejr/weatherfeeder/standards"
) )
@@ -32,7 +31,7 @@ func NewForecastSource(cfg config.SourceConfig) (*ForecastSource, error) {
func (s *ForecastSource) Name() string { return s.http.Name } func (s *ForecastSource) Name() string { return s.http.Name }
func (s *ForecastSource) Kind() event.Kind { return event.Kind("forecast") } func (s *ForecastSource) Kinds() []event.Kind { return []event.Kind{event.Kind("forecast")} }
func (s *ForecastSource) Poll(ctx context.Context) ([]event.Event, error) { func (s *ForecastSource) Poll(ctx context.Context) ([]event.Event, error) {
raw, meta, changed, err := s.fetchRaw(ctx) raw, meta, changed, err := s.fetchRaw(ctx)
@@ -53,10 +52,10 @@ func (s *ForecastSource) Poll(ctx context.Context) ([]event.Event, error) {
} }
emittedAt := time.Now().UTC() emittedAt := time.Now().UTC()
eventID := common.ChooseEventID("", s.http.Name, effectiveAt, emittedAt) eventID := fksources.DefaultEventID("", s.http.Name, effectiveAt, emittedAt)
return common.SingleRawEvent( return fksources.SingleEvent(
s.Kind(), event.Kind("forecast"),
s.http.Name, s.http.Name,
standards.SchemaRawOpenMeteoHourlyForecastV1, standards.SchemaRawOpenMeteoHourlyForecastV1,
eventID, eventID,

View File

@@ -10,7 +10,6 @@ import (
"gitea.maximumdirect.net/ejr/feedkit/event" "gitea.maximumdirect.net/ejr/feedkit/event"
fksources "gitea.maximumdirect.net/ejr/feedkit/sources" fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openmeteo" "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openmeteo"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards" "gitea.maximumdirect.net/ejr/weatherfeeder/standards"
) )
@@ -32,7 +31,7 @@ func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) {
func (s *ObservationSource) Name() string { return s.http.Name } func (s *ObservationSource) Name() string { return s.http.Name }
func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") } func (s *ObservationSource) Kinds() []event.Kind { return []event.Kind{event.Kind("observation")} }
func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
raw, meta, changed, err := s.fetchRaw(ctx) raw, meta, changed, err := s.fetchRaw(ctx)
@@ -50,10 +49,10 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
} }
emittedAt := time.Now().UTC() emittedAt := time.Now().UTC()
eventID := common.ChooseEventID("", s.http.Name, effectiveAt, emittedAt) eventID := fksources.DefaultEventID("", s.http.Name, effectiveAt, emittedAt)
return common.SingleRawEvent( return fksources.SingleEvent(
s.Kind(), event.Kind("observation"),
s.http.Name, s.http.Name,
standards.SchemaRawOpenMeteoCurrentV1, standards.SchemaRawOpenMeteoCurrentV1,
eventID, eventID,

View File

@@ -0,0 +1,44 @@
package openmeteo
import (
"testing"
"gitea.maximumdirect.net/ejr/feedkit/config"
"gitea.maximumdirect.net/ejr/feedkit/event"
)
func TestObservationSourceAdvertisesKinds(t *testing.T) {
src, err := NewObservationSource(config.SourceConfig{
Name: "openmeteo-observation-test",
Driver: "openmeteo_observation",
Mode: config.SourceModePoll,
Params: map[string]any{
"url": "https://example.invalid",
"user_agent": "test-agent",
},
})
if err != nil {
t.Fatalf("NewObservationSource() error = %v", err)
}
if got := src.Kinds(); len(got) != 1 || got[0] != event.Kind("observation") {
t.Fatalf("Kinds() = %#v, want [observation]", got)
}
}
func TestForecastSourceAdvertisesKinds(t *testing.T) {
src, err := NewForecastSource(config.SourceConfig{
Name: "openmeteo-forecast-test",
Driver: "openmeteo_forecast",
Mode: config.SourceModePoll,
Params: map[string]any{
"url": "https://example.invalid",
"user_agent": "test-agent",
},
})
if err != nil {
t.Fatalf("NewForecastSource() error = %v", err)
}
if got := src.Kinds(); len(got) != 1 || got[0] != event.Kind("forecast") {
t.Fatalf("Kinds() = %#v, want [forecast]", got)
}
}

View File

@@ -11,7 +11,6 @@ import (
"gitea.maximumdirect.net/ejr/feedkit/event" "gitea.maximumdirect.net/ejr/feedkit/event"
fksources "gitea.maximumdirect.net/ejr/feedkit/sources" fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
owcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openweather" owcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openweather"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards" "gitea.maximumdirect.net/ejr/weatherfeeder/standards"
) )
@@ -36,7 +35,7 @@ func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) {
func (s *ObservationSource) Name() string { return s.http.Name } func (s *ObservationSource) Name() string { return s.http.Name }
func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") } func (s *ObservationSource) Kinds() []event.Kind { return []event.Kind{event.Kind("observation")} }
func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
if err := owcommon.RequireMetricUnits(s.http.URL); err != nil { if err := owcommon.RequireMetricUnits(s.http.URL); err != nil {
@@ -58,10 +57,10 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
} }
emittedAt := time.Now().UTC() emittedAt := time.Now().UTC()
eventID := common.ChooseEventID("", s.http.Name, effectiveAt, emittedAt) eventID := fksources.DefaultEventID("", s.http.Name, effectiveAt, emittedAt)
return common.SingleRawEvent( return fksources.SingleEvent(
s.Kind(), event.Kind("observation"),
s.http.Name, s.http.Name,
standards.SchemaRawOpenWeatherCurrentV1, standards.SchemaRawOpenWeatherCurrentV1,
eventID, eventID,

View File

@@ -0,0 +1,26 @@
package openweather
import (
"testing"
"gitea.maximumdirect.net/ejr/feedkit/config"
"gitea.maximumdirect.net/ejr/feedkit/event"
)
func TestObservationSourceAdvertisesKinds(t *testing.T) {
src, err := NewObservationSource(config.SourceConfig{
Name: "openweather-observation-test",
Driver: "openweather_observation",
Mode: config.SourceModePoll,
Params: map[string]any{
"url": "https://example.invalid?units=metric",
"user_agent": "test-agent",
},
})
if err != nil {
t.Fatalf("NewObservationSource() error = %v", err)
}
if got := src.Kinds(); len(got) != 1 || got[0] != event.Kind("observation") {
t.Fatalf("Kinds() = %#v, want [observation]", got)
}
}

View File

@@ -0,0 +1,40 @@
package model
import "time"
// ForecastDiscussionProduct distinguishes the discussion bulletin family.
//
// Today weatherfeeder only normalizes Area Forecast Discussion (AFD) products,
// but this remains a distinct type so additional discussion-like products can be
// added without changing the payload field type.
type ForecastDiscussionProduct string
const (
ForecastDiscussionProductAFD ForecastDiscussionProduct = "afd"
)
// WeatherForecastDiscussion is a canonical issued discussion bulletin for an NWS office.
//
// Unlike WeatherForecastRun, this is authored narrative text rather than a sequence
// of forecast periods.
type WeatherForecastDiscussion struct {
OfficeID string `json:"officeId,omitempty"`
OfficeName string `json:"officeName,omitempty"`
Product ForecastDiscussionProduct `json:"product"`
IssuedAt time.Time `json:"issuedAt"`
UpdatedAt *time.Time `json:"updatedAt,omitempty"`
KeyMessages []string `json:"keyMessages,omitempty"`
ShortTerm *WeatherForecastDiscussionSection `json:"shortTerm,omitempty"`
LongTerm *WeatherForecastDiscussionSection `json:"longTerm,omitempty"`
}
// WeatherForecastDiscussionSection is a fixed prose section within a discussion bulletin.
type WeatherForecastDiscussionSection struct {
Qualifier string `json:"qualifier,omitempty"`
IssuedAt *time.Time `json:"issuedAt,omitempty"`
Text string `json:"text,omitempty"`
}

View File

@@ -17,6 +17,7 @@ const (
SchemaRawNWSHourlyForecastV1 = "raw.nws.hourly.forecast.v1" SchemaRawNWSHourlyForecastV1 = "raw.nws.hourly.forecast.v1"
SchemaRawNWSNarrativeForecastV1 = "raw.nws.narrative.forecast.v1" SchemaRawNWSNarrativeForecastV1 = "raw.nws.narrative.forecast.v1"
SchemaRawNWSForecastDiscussionV1 = "raw.nws.forecast_discussion.v1"
SchemaRawOpenMeteoHourlyForecastV1 = "raw.openmeteo.hourly.forecast.v1" SchemaRawOpenMeteoHourlyForecastV1 = "raw.openmeteo.hourly.forecast.v1"
SchemaRawOpenWeatherHourlyForecastV1 = "raw.openweather.hourly.forecast.v1" SchemaRawOpenWeatherHourlyForecastV1 = "raw.openweather.hourly.forecast.v1"
@@ -25,5 +26,6 @@ const (
// Canonical domain schemas (emitted after normalization). // Canonical domain schemas (emitted after normalization).
SchemaWeatherObservationV1 = "weather.observation.v1" SchemaWeatherObservationV1 = "weather.observation.v1"
SchemaWeatherForecastV1 = "weather.forecast.v1" SchemaWeatherForecastV1 = "weather.forecast.v1"
SchemaWeatherForecastDiscussionV1 = "weather.forecast_discussion.v1"
SchemaWeatherAlertV1 = "weather.alert.v1" SchemaWeatherAlertV1 = "weather.alert.v1"
) )