Compare commits
5 Commits
129cebd94d
...
v0.8.0
| Author | SHA1 | Date | |
|---|---|---|---|
| eb27486466 | |||
| de5add59fd | |||
| 356c3be648 | |||
| dbaebbbd7a | |||
| 88d5727a84 |
6
API.md
6
API.md
@@ -143,11 +143,7 @@ A `WeatherForecastPeriod` is valid for `[startTime, endTime)`.
|
|||||||
| `name` | string | no | Human label (often empty for hourly) |
|
| `name` | string | no | Human label (often empty for hourly) |
|
||||||
| `isDay` | bool | no | Day/night hint |
|
| `isDay` | bool | no | Day/night hint |
|
||||||
| `conditionCode` | int | yes | WMO code (`-1` for unknown) |
|
| `conditionCode` | int | yes | WMO code (`-1` for unknown) |
|
||||||
| `conditionText` | string | no | Canonical short text |
|
|
||||||
| `providerRawDescription` | string | no | Provider-specific “evidence” text |
|
|
||||||
| `textDescription` | string | no | Human-facing short phrase |
|
| `textDescription` | string | no | Human-facing short phrase |
|
||||||
| `detailedText` | string | no | Longer narrative |
|
|
||||||
| `iconUrl` | string | no | Legacy/transitional |
|
|
||||||
| `temperatureC` | number | no | °C |
|
| `temperatureC` | number | no | °C |
|
||||||
| `temperatureCMin` | number | no | °C (aggregated products) |
|
| `temperatureCMin` | number | no | °C (aggregated products) |
|
||||||
| `temperatureCMax` | number | no | °C (aggregated products) |
|
| `temperatureCMax` | number | no | °C (aggregated products) |
|
||||||
@@ -269,7 +265,7 @@ A run may contain zero, one, or many alerts.
|
|||||||
"startTime": "2026-01-17T14:00:00Z",
|
"startTime": "2026-01-17T14:00:00Z",
|
||||||
"endTime": "2026-01-17T15:00:00Z",
|
"endTime": "2026-01-17T15:00:00Z",
|
||||||
"conditionCode": 2,
|
"conditionCode": 2,
|
||||||
"conditionText": "Partly Cloudy",
|
"textDescription": "Partly Cloudy",
|
||||||
"temperatureC": 3.5,
|
"temperatureC": 3.5,
|
||||||
"probabilityOfPrecipitationPercent": 10
|
"probabilityOfPrecipitationPercent": 10
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -22,7 +22,7 @@ For the complete wire contract (event envelope + payload schemas, fields, units,
|
|||||||
|
|
||||||
## Upstream providers (current MVP)
|
## Upstream providers (current MVP)
|
||||||
|
|
||||||
- NWS: observations, hourly forecasts, alerts
|
- NWS: observations, hourly forecasts, narrative forecasts, alerts
|
||||||
- Open-Meteo: observations, hourly forecasts
|
- Open-Meteo: observations, hourly forecasts
|
||||||
- OpenWeather: observations
|
- OpenWeather: observations
|
||||||
|
|
||||||
|
|||||||
@@ -48,12 +48,21 @@ sources:
|
|||||||
- name: NWSHourlyForecastSTL
|
- name: NWSHourlyForecastSTL
|
||||||
mode: poll
|
mode: poll
|
||||||
kinds: ["forecast"]
|
kinds: ["forecast"]
|
||||||
driver: nws_forecast
|
driver: nws_forecast_hourly
|
||||||
every: 45m
|
every: 45m
|
||||||
params:
|
params:
|
||||||
url: "https://api.weather.gov/gridpoints/LSX/90,74/forecast/hourly"
|
url: "https://api.weather.gov/gridpoints/LSX/90,74/forecast/hourly"
|
||||||
user_agent: "HomeOps (eric@maximumdirect.net)"
|
user_agent: "HomeOps (eric@maximumdirect.net)"
|
||||||
|
|
||||||
|
- name: NWSNarrativeForecastSTL
|
||||||
|
mode: poll
|
||||||
|
kinds: ["forecast"]
|
||||||
|
driver: nws_forecast_narrative
|
||||||
|
every: 45m
|
||||||
|
params:
|
||||||
|
url: "https://api.weather.gov/gridpoints/LSX/90,74/forecast?units=us"
|
||||||
|
user_agent: "HomeOps (eric@maximumdirect.net)"
|
||||||
|
|
||||||
- name: OpenMeteoHourlyForecastSTL
|
- name: OpenMeteoHourlyForecastSTL
|
||||||
mode: poll
|
mode: poll
|
||||||
kinds: ["forecast"]
|
kinds: ["forecast"]
|
||||||
@@ -83,6 +92,15 @@ sinks:
|
|||||||
url: nats://nats:4222
|
url: nats://nats:4222
|
||||||
exchange: weatherfeeder
|
exchange: weatherfeeder
|
||||||
|
|
||||||
|
# - name: pg_weatherfeeder
|
||||||
|
# driver: postgres
|
||||||
|
# params:
|
||||||
|
# uri: postgres://weatherdb:5432/weatherdb?sslmode=disable
|
||||||
|
# username: weatherdb
|
||||||
|
# password: weatherdb
|
||||||
|
# prune: 3d
|
||||||
|
# # Prunes rows older than now-3d on each write transaction.
|
||||||
|
|
||||||
# - name: logfile
|
# - name: logfile
|
||||||
# driver: file
|
# driver: file
|
||||||
# params:
|
# params:
|
||||||
@@ -95,5 +113,8 @@ routes:
|
|||||||
- sink: nats_weatherfeeder
|
- sink: nats_weatherfeeder
|
||||||
kinds: ["observation", "forecast", "alert"]
|
kinds: ["observation", "forecast", "alert"]
|
||||||
|
|
||||||
|
# - sink: pg_weatherfeeder
|
||||||
|
# kinds: ["observation", "forecast", "alert"]
|
||||||
|
|
||||||
# - sink: logfile
|
# - sink: logfile
|
||||||
# kinds: ["observation", "alert", "forecast"]
|
# kinds: ["observation", "alert", "forecast"]
|
||||||
|
|||||||
2
go.mod
2
go.mod
@@ -2,7 +2,7 @@ module gitea.maximumdirect.net/ejr/weatherfeeder
|
|||||||
|
|
||||||
go 1.25
|
go 1.25
|
||||||
|
|
||||||
require gitea.maximumdirect.net/ejr/feedkit v0.7.2
|
require gitea.maximumdirect.net/ejr/feedkit v0.8.0
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/klauspost/compress v1.17.2 // indirect
|
github.com/klauspost/compress v1.17.2 // indirect
|
||||||
|
|||||||
4
go.sum
4
go.sum
@@ -1,5 +1,5 @@
|
|||||||
gitea.maximumdirect.net/ejr/feedkit v0.7.2 h1:hTg302SgSi7tw11lNzuc+3g7MvHT6jQQziuo2NoARt8=
|
gitea.maximumdirect.net/ejr/feedkit v0.8.0 h1:JdEEy6T3AQ97alLNYcQ3crN3tOEZPLMBD0Qr/MH5/dw=
|
||||||
gitea.maximumdirect.net/ejr/feedkit v0.7.2/go.mod h1:U6xC9xZLN3cL4yi7YBVyzGoHYRLJXusFCAKlj2kdYYQ=
|
gitea.maximumdirect.net/ejr/feedkit v0.8.0/go.mod h1:U6xC9xZLN3cL4yi7YBVyzGoHYRLJXusFCAKlj2kdYYQ=
|
||||||
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
|
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
|
||||||
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
|
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
|
||||||
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
|
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
|
||||||
|
|||||||
@@ -17,9 +17,10 @@ import (
|
|||||||
// ForecastNormalizer converts:
|
// ForecastNormalizer converts:
|
||||||
//
|
//
|
||||||
// standards.SchemaRawNWSHourlyForecastV1 -> standards.SchemaWeatherForecastV1
|
// standards.SchemaRawNWSHourlyForecastV1 -> standards.SchemaWeatherForecastV1
|
||||||
|
// standards.SchemaRawNWSNarrativeForecastV1 -> standards.SchemaWeatherForecastV1
|
||||||
//
|
//
|
||||||
// It interprets NWS GeoJSON gridpoint *hourly* forecast responses and maps them into
|
// It keeps one NWS forecast normalization entrypoint and dispatches to product-specific
|
||||||
// the canonical model.WeatherForecastRun representation.
|
// builders by raw schema.
|
||||||
//
|
//
|
||||||
// Caveats / policy:
|
// Caveats / policy:
|
||||||
// 1. NWS forecast periods do not include METAR presentWeather phenomena, so ConditionCode
|
// 1. NWS forecast periods do not include METAR presentWeather phenomena, so ConditionCode
|
||||||
@@ -29,81 +30,186 @@ import (
|
|||||||
type ForecastNormalizer struct{}
|
type ForecastNormalizer struct{}
|
||||||
|
|
||||||
func (ForecastNormalizer) Match(e event.Event) bool {
|
func (ForecastNormalizer) Match(e event.Event) bool {
|
||||||
s := strings.TrimSpace(e.Schema)
|
switch strings.TrimSpace(e.Schema) {
|
||||||
return s == standards.SchemaRawNWSHourlyForecastV1
|
case standards.SchemaRawNWSHourlyForecastV1:
|
||||||
|
return true
|
||||||
|
case standards.SchemaRawNWSNarrativeForecastV1:
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ForecastNormalizer) Normalize(ctx context.Context, in event.Event) (*event.Event, error) {
|
func (ForecastNormalizer) Normalize(ctx context.Context, in event.Event) (*event.Event, error) {
|
||||||
_ = ctx // normalization is pure/CPU; keep ctx for future expensive steps
|
_ = ctx // normalization is pure/CPU; keep ctx for future expensive steps
|
||||||
|
|
||||||
|
return normalizeForecastEventBySchema(in)
|
||||||
|
}
|
||||||
|
|
||||||
|
func normalizeForecastEventBySchema(in event.Event) (*event.Event, error) {
|
||||||
|
switch strings.TrimSpace(in.Schema) {
|
||||||
|
case standards.SchemaRawNWSHourlyForecastV1:
|
||||||
|
return normalizeHourlyForecastEvent(in)
|
||||||
|
case standards.SchemaRawNWSNarrativeForecastV1:
|
||||||
|
return normalizeNarrativeForecastEvent(in)
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("unsupported nws forecast schema %q", strings.TrimSpace(in.Schema))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func normalizeHourlyForecastEvent(in event.Event) (*event.Event, error) {
|
||||||
return normcommon.NormalizeJSON(
|
return normcommon.NormalizeJSON(
|
||||||
in,
|
in,
|
||||||
"nws hourly forecast",
|
"nws hourly forecast",
|
||||||
standards.SchemaWeatherForecastV1,
|
standards.SchemaWeatherForecastV1,
|
||||||
buildForecast,
|
buildHourlyForecast,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// buildForecast contains the domain mapping logic (provider -> canonical model).
|
func normalizeNarrativeForecastEvent(in event.Event) (*event.Event, error) {
|
||||||
func buildForecast(parsed nwsForecastResponse) (model.WeatherForecastRun, time.Time, error) {
|
return normcommon.NormalizeJSON(
|
||||||
// IssuedAt is required by the canonical model.
|
in,
|
||||||
issuedStr := strings.TrimSpace(parsed.Properties.GeneratedAt)
|
"nws narrative forecast",
|
||||||
|
standards.SchemaWeatherForecastV1,
|
||||||
|
buildNarrativeForecast,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
// buildHourlyForecast contains hourly forecast mapping logic (provider -> canonical model).
|
||||||
|
func buildHourlyForecast(parsed nwsHourlyForecastResponse) (model.WeatherForecastRun, time.Time, error) {
|
||||||
|
issuedAt, updatedAt, err := parseForecastRunTimes(parsed.Properties.GeneratedAt, parsed.Properties.UpdateTime)
|
||||||
|
if err != nil {
|
||||||
|
return model.WeatherForecastRun{}, time.Time{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Best-effort location centroid from the GeoJSON polygon (optional).
|
||||||
|
lat, lon := centroidLatLon(parsed.Geometry.Coordinates)
|
||||||
|
|
||||||
|
run := newForecastRunBase(
|
||||||
|
issuedAt,
|
||||||
|
updatedAt,
|
||||||
|
model.ForecastProductHourly,
|
||||||
|
lat,
|
||||||
|
lon,
|
||||||
|
parsed.Properties.Elevation.Value,
|
||||||
|
)
|
||||||
|
|
||||||
|
periods := make([]model.WeatherForecastPeriod, 0, len(parsed.Properties.Periods))
|
||||||
|
for i, p := range parsed.Properties.Periods {
|
||||||
|
period, err := mapHourlyForecastPeriod(i, p)
|
||||||
|
if err != nil {
|
||||||
|
return model.WeatherForecastRun{}, time.Time{}, err
|
||||||
|
}
|
||||||
|
periods = append(periods, period)
|
||||||
|
}
|
||||||
|
|
||||||
|
run.Periods = periods
|
||||||
|
|
||||||
|
// EffectiveAt policy for forecasts: treat IssuedAt as the effective time (dedupe-friendly).
|
||||||
|
return run, issuedAt, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// buildNarrativeForecast contains narrative forecast mapping logic (provider -> canonical model).
|
||||||
|
func buildNarrativeForecast(parsed nwsNarrativeForecastResponse) (model.WeatherForecastRun, time.Time, error) {
|
||||||
|
issuedAt, updatedAt, err := parseForecastRunTimes(parsed.Properties.GeneratedAt, parsed.Properties.UpdateTime)
|
||||||
|
if err != nil {
|
||||||
|
return model.WeatherForecastRun{}, time.Time{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Best-effort location centroid from the GeoJSON polygon (optional).
|
||||||
|
lat, lon := centroidLatLon(parsed.Geometry.Coordinates)
|
||||||
|
|
||||||
|
run := newForecastRunBase(
|
||||||
|
issuedAt,
|
||||||
|
updatedAt,
|
||||||
|
model.ForecastProductNarrative,
|
||||||
|
lat,
|
||||||
|
lon,
|
||||||
|
parsed.Properties.Elevation.Value,
|
||||||
|
)
|
||||||
|
|
||||||
|
periods := make([]model.WeatherForecastPeriod, 0, len(parsed.Properties.Periods))
|
||||||
|
for i, p := range parsed.Properties.Periods {
|
||||||
|
period, err := mapNarrativeForecastPeriod(i, p)
|
||||||
|
if err != nil {
|
||||||
|
return model.WeatherForecastRun{}, time.Time{}, err
|
||||||
|
}
|
||||||
|
periods = append(periods, period)
|
||||||
|
}
|
||||||
|
|
||||||
|
run.Periods = periods
|
||||||
|
|
||||||
|
// EffectiveAt policy for forecasts: treat IssuedAt as the effective time (dedupe-friendly).
|
||||||
|
return run, issuedAt, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseForecastRunTimes(generatedAt, updateTime string) (time.Time, *time.Time, error) {
|
||||||
|
issuedStr := strings.TrimSpace(generatedAt)
|
||||||
if issuedStr == "" {
|
if issuedStr == "" {
|
||||||
return model.WeatherForecastRun{}, time.Time{}, fmt.Errorf("missing properties.generatedAt")
|
return time.Time{}, nil, fmt.Errorf("missing properties.generatedAt")
|
||||||
}
|
}
|
||||||
issuedAt, err := nwscommon.ParseTime(issuedStr)
|
issuedAt, err := nwscommon.ParseTime(issuedStr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return model.WeatherForecastRun{}, time.Time{}, fmt.Errorf("invalid properties.generatedAt %q: %w", issuedStr, err)
|
return time.Time{}, nil, fmt.Errorf("invalid properties.generatedAt %q: %w", issuedStr, err)
|
||||||
}
|
}
|
||||||
issuedAt = issuedAt.UTC()
|
issuedAt = issuedAt.UTC()
|
||||||
|
|
||||||
// UpdatedAt is optional.
|
|
||||||
var updatedAt *time.Time
|
var updatedAt *time.Time
|
||||||
if s := strings.TrimSpace(parsed.Properties.UpdateTime); s != "" {
|
if s := strings.TrimSpace(updateTime); s != "" {
|
||||||
if t, err := nwscommon.ParseTime(s); err == nil {
|
if t, err := nwscommon.ParseTime(s); err == nil {
|
||||||
tt := t.UTC()
|
tt := t.UTC()
|
||||||
updatedAt = &tt
|
updatedAt = &tt
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Best-effort location centroid from the GeoJSON polygon (optional).
|
return issuedAt, updatedAt, nil
|
||||||
lat, lon := centroidLatLon(parsed.Geometry.Coordinates)
|
|
||||||
|
|
||||||
// Schema is explicitly hourly, so product is not a heuristic.
|
|
||||||
run := model.WeatherForecastRun{
|
|
||||||
LocationID: "",
|
|
||||||
LocationName: "",
|
|
||||||
|
|
||||||
IssuedAt: issuedAt,
|
|
||||||
UpdatedAt: updatedAt,
|
|
||||||
Product: model.ForecastProductHourly,
|
|
||||||
|
|
||||||
Latitude: lat,
|
|
||||||
Longitude: lon,
|
|
||||||
ElevationMeters: parsed.Properties.Elevation.Value,
|
|
||||||
|
|
||||||
Periods: nil,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
periods := make([]model.WeatherForecastPeriod, 0, len(parsed.Properties.Periods))
|
func newForecastRunBase(
|
||||||
for i, p := range parsed.Properties.Periods {
|
issuedAt time.Time,
|
||||||
startStr := strings.TrimSpace(p.StartTime)
|
updatedAt *time.Time,
|
||||||
endStr := strings.TrimSpace(p.EndTime)
|
product model.ForecastProduct,
|
||||||
|
lat, lon, elevation *float64,
|
||||||
|
) model.WeatherForecastRun {
|
||||||
|
return model.WeatherForecastRun{
|
||||||
|
LocationID: "",
|
||||||
|
LocationName: "",
|
||||||
|
IssuedAt: issuedAt,
|
||||||
|
UpdatedAt: updatedAt,
|
||||||
|
Product: product,
|
||||||
|
Latitude: lat,
|
||||||
|
Longitude: lon,
|
||||||
|
|
||||||
|
ElevationMeters: elevation,
|
||||||
|
Periods: nil,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseForecastPeriodWindow(startStr, endStr string, idx int) (time.Time, time.Time, error) {
|
||||||
|
startStr = strings.TrimSpace(startStr)
|
||||||
|
endStr = strings.TrimSpace(endStr)
|
||||||
|
|
||||||
if startStr == "" || endStr == "" {
|
if startStr == "" || endStr == "" {
|
||||||
return model.WeatherForecastRun{}, time.Time{}, fmt.Errorf("periods[%d]: missing startTime/endTime", i)
|
return time.Time{}, time.Time{}, fmt.Errorf("periods[%d]: missing startTime/endTime", idx)
|
||||||
}
|
}
|
||||||
|
|
||||||
start, err := nwscommon.ParseTime(startStr)
|
start, err := nwscommon.ParseTime(startStr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return model.WeatherForecastRun{}, time.Time{}, fmt.Errorf("periods[%d].startTime invalid %q: %w", i, startStr, err)
|
return time.Time{}, time.Time{}, fmt.Errorf("periods[%d].startTime invalid %q: %w", idx, startStr, err)
|
||||||
}
|
}
|
||||||
end, err := nwscommon.ParseTime(endStr)
|
end, err := nwscommon.ParseTime(endStr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return model.WeatherForecastRun{}, time.Time{}, fmt.Errorf("periods[%d].endTime invalid %q: %w", i, endStr, err)
|
return time.Time{}, time.Time{}, fmt.Errorf("periods[%d].endTime invalid %q: %w", idx, endStr, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return start.UTC(), end.UTC(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func mapHourlyForecastPeriod(idx int, p nwsHourlyForecastPeriod) (model.WeatherForecastPeriod, error) {
|
||||||
|
start, end, err := parseForecastPeriodWindow(p.StartTime, p.EndTime, idx)
|
||||||
|
if err != nil {
|
||||||
|
return model.WeatherForecastPeriod{}, err
|
||||||
}
|
}
|
||||||
start = start.UTC()
|
|
||||||
end = end.UTC()
|
|
||||||
|
|
||||||
// NWS hourly supplies isDaytime; make it a pointer to match the canonical model.
|
// NWS hourly supplies isDaytime; make it a pointer to match the canonical model.
|
||||||
var isDay *bool
|
var isDay *bool
|
||||||
@@ -118,9 +224,7 @@ func buildForecast(parsed nwsForecastResponse) (model.WeatherForecastRun, time.T
|
|||||||
providerDesc := strings.TrimSpace(p.ShortForecast)
|
providerDesc := strings.TrimSpace(p.ShortForecast)
|
||||||
wmo := wmoFromNWSForecast(providerDesc, p.Icon, tempC)
|
wmo := wmoFromNWSForecast(providerDesc, p.Icon, tempC)
|
||||||
|
|
||||||
canonicalText := standards.WMOText(wmo, isDay)
|
return model.WeatherForecastPeriod{
|
||||||
|
|
||||||
period := model.WeatherForecastPeriod{
|
|
||||||
StartTime: start,
|
StartTime: start,
|
||||||
EndTime: end,
|
EndTime: end,
|
||||||
|
|
||||||
@@ -128,15 +232,9 @@ func buildForecast(parsed nwsForecastResponse) (model.WeatherForecastRun, time.T
|
|||||||
IsDay: isDay,
|
IsDay: isDay,
|
||||||
|
|
||||||
ConditionCode: wmo,
|
ConditionCode: wmo,
|
||||||
ConditionText: canonicalText,
|
|
||||||
|
|
||||||
ProviderRawDescription: providerDesc,
|
// For forecasts, keep provider short forecast text as the human-facing description.
|
||||||
|
TextDescription: providerDesc,
|
||||||
// For forecasts, keep provider text as the human-facing description.
|
|
||||||
TextDescription: strings.TrimSpace(p.ShortForecast),
|
|
||||||
DetailedText: strings.TrimSpace(p.DetailedForecast),
|
|
||||||
|
|
||||||
IconURL: strings.TrimSpace(p.Icon),
|
|
||||||
|
|
||||||
TemperatureC: tempC,
|
TemperatureC: tempC,
|
||||||
|
|
||||||
@@ -147,13 +245,49 @@ func buildForecast(parsed nwsForecastResponse) (model.WeatherForecastRun, time.T
|
|||||||
WindSpeedKmh: parseNWSWindSpeedKmh(p.WindSpeed),
|
WindSpeedKmh: parseNWSWindSpeedKmh(p.WindSpeed),
|
||||||
|
|
||||||
ProbabilityOfPrecipitationPercent: p.ProbabilityOfPrecipitation.Value,
|
ProbabilityOfPrecipitationPercent: p.ProbabilityOfPrecipitation.Value,
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
periods = append(periods, period)
|
func mapNarrativeForecastPeriod(idx int, p nwsNarrativeForecastPeriod) (model.WeatherForecastPeriod, error) {
|
||||||
|
start, end, err := parseForecastPeriodWindow(p.StartTime, p.EndTime, idx)
|
||||||
|
if err != nil {
|
||||||
|
return model.WeatherForecastPeriod{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
run.Periods = periods
|
// NWS narrative supplies isDaytime; make it a pointer to match the canonical model.
|
||||||
|
var isDay *bool
|
||||||
// EffectiveAt policy for forecasts: treat IssuedAt as the effective time (dedupe-friendly).
|
if p.IsDaytime != nil {
|
||||||
return run, issuedAt, nil
|
b := *p.IsDaytime
|
||||||
|
isDay = &b
|
||||||
|
}
|
||||||
|
|
||||||
|
tempC := tempCFromNWS(p.Temperature, p.TemperatureUnit)
|
||||||
|
|
||||||
|
// Infer WMO from shortForecast (and fall back to icon token).
|
||||||
|
shortForecast := strings.TrimSpace(p.ShortForecast)
|
||||||
|
wmo := wmoFromNWSForecast(shortForecast, p.Icon, tempC)
|
||||||
|
|
||||||
|
textDescription := strings.TrimSpace(p.DetailedForecast)
|
||||||
|
if textDescription == "" {
|
||||||
|
textDescription = shortForecast
|
||||||
|
}
|
||||||
|
|
||||||
|
return model.WeatherForecastPeriod{
|
||||||
|
StartTime: start,
|
||||||
|
EndTime: end,
|
||||||
|
|
||||||
|
Name: strings.TrimSpace(p.Name),
|
||||||
|
IsDay: isDay,
|
||||||
|
|
||||||
|
ConditionCode: wmo,
|
||||||
|
|
||||||
|
TextDescription: textDescription,
|
||||||
|
|
||||||
|
TemperatureC: tempC,
|
||||||
|
|
||||||
|
WindDirectionDegrees: parseNWSWindDirectionDegrees(p.WindDirection),
|
||||||
|
WindSpeedKmh: parseNWSWindSpeedKmh(p.WindSpeed),
|
||||||
|
|
||||||
|
ProbabilityOfPrecipitationPercent: p.ProbabilityOfPrecipitation.Value,
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|||||||
211
internal/normalizers/nws/forecast_test.go
Normal file
211
internal/normalizers/nws/forecast_test.go
Normal file
@@ -0,0 +1,211 @@
|
|||||||
|
package nws
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"math"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||||
|
"gitea.maximumdirect.net/ejr/weatherfeeder/model"
|
||||||
|
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestBuildHourlyForecastUsesShortForecastAsTextDescription(t *testing.T) {
|
||||||
|
parsed := nwsHourlyForecastResponse{}
|
||||||
|
parsed.Properties.GeneratedAt = "2026-03-16T18:00:00Z"
|
||||||
|
parsed.Properties.Periods = []nwsHourlyForecastPeriod{
|
||||||
|
{
|
||||||
|
StartTime: "2026-03-16T19:00:00Z",
|
||||||
|
EndTime: "2026-03-16T20:00:00Z",
|
||||||
|
ShortForecast: " Mostly Cloudy ",
|
||||||
|
DetailedForecast: "Clouds increasing overnight.",
|
||||||
|
Icon: "https://example.invalid/icon",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
run, effectiveAt, err := buildHourlyForecast(parsed)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("buildHourlyForecast() error = %v", err)
|
||||||
|
}
|
||||||
|
if len(run.Periods) != 1 {
|
||||||
|
t.Fatalf("periods len = %d, want 1", len(run.Periods))
|
||||||
|
}
|
||||||
|
if got, want := run.Periods[0].TextDescription, "Mostly Cloudy"; got != want {
|
||||||
|
t.Fatalf("TextDescription = %q, want %q", got, want)
|
||||||
|
}
|
||||||
|
|
||||||
|
wantIssued := time.Date(2026, 3, 16, 18, 0, 0, 0, time.UTC)
|
||||||
|
if !run.IssuedAt.Equal(wantIssued) {
|
||||||
|
t.Fatalf("IssuedAt = %s, want %s", run.IssuedAt.Format(time.RFC3339), wantIssued.Format(time.RFC3339))
|
||||||
|
}
|
||||||
|
if !effectiveAt.Equal(wantIssued) {
|
||||||
|
t.Fatalf("effectiveAt = %s, want %s", effectiveAt.Format(time.RFC3339), wantIssued.Format(time.RFC3339))
|
||||||
|
}
|
||||||
|
|
||||||
|
assertNoLegacyForecastDescriptionKeys(t, run.Periods[0])
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNormalizeForecastEventBySchemaRejectsUnsupportedSchema(t *testing.T) {
|
||||||
|
_, err := normalizeForecastEventBySchema(event.Event{
|
||||||
|
Schema: "raw.nws.daily.forecast.v1",
|
||||||
|
})
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("normalizeForecastEventBySchema() expected unsupported schema error")
|
||||||
|
}
|
||||||
|
if !strings.Contains(err.Error(), "unsupported nws forecast schema") {
|
||||||
|
t.Fatalf("error = %q, want unsupported schema context", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNormalizeForecastEventBySchemaRoutesHourly(t *testing.T) {
|
||||||
|
_, err := normalizeForecastEventBySchema(event.Event{
|
||||||
|
Schema: standards.SchemaRawNWSHourlyForecastV1,
|
||||||
|
Payload: map[string]any{"properties": map[string]any{}},
|
||||||
|
})
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("normalizeForecastEventBySchema() expected build error for missing generatedAt")
|
||||||
|
}
|
||||||
|
if !strings.Contains(err.Error(), "missing properties.generatedAt") {
|
||||||
|
t.Fatalf("error = %q, want missing properties.generatedAt", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNormalizeForecastEventBySchemaRoutesNarrative(t *testing.T) {
|
||||||
|
_, err := normalizeForecastEventBySchema(event.Event{
|
||||||
|
Schema: standards.SchemaRawNWSNarrativeForecastV1,
|
||||||
|
Payload: map[string]any{"properties": map[string]any{}},
|
||||||
|
})
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("normalizeForecastEventBySchema() expected build error for missing generatedAt")
|
||||||
|
}
|
||||||
|
if !strings.Contains(err.Error(), "missing properties.generatedAt") {
|
||||||
|
t.Fatalf("error = %q, want missing properties.generatedAt", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBuildNarrativeForecastMapsExpectedFields(t *testing.T) {
|
||||||
|
parsed := nwsNarrativeForecastResponse{}
|
||||||
|
parsed.Properties.GeneratedAt = "2026-03-27T15:17:01Z"
|
||||||
|
isDay := true
|
||||||
|
tempF := 53.0
|
||||||
|
pop := 20.0
|
||||||
|
|
||||||
|
parsed.Properties.Periods = []nwsNarrativeForecastPeriod{
|
||||||
|
{
|
||||||
|
Name: "Today",
|
||||||
|
StartTime: "2026-03-27T10:00:00-05:00",
|
||||||
|
EndTime: "2026-03-27T18:00:00-05:00",
|
||||||
|
IsDaytime: &isDay,
|
||||||
|
Temperature: &tempF,
|
||||||
|
TemperatureUnit: "F",
|
||||||
|
WindSpeed: "10 to 14 mph",
|
||||||
|
WindDirection: "SW",
|
||||||
|
ShortForecast: "Partly Sunny",
|
||||||
|
DetailedForecast: " Partly sunny, with a high near 53. ",
|
||||||
|
ProbabilityOfPrecipitation: struct {
|
||||||
|
UnitCode string `json:"unitCode"`
|
||||||
|
Value *float64 `json:"value"`
|
||||||
|
}{
|
||||||
|
UnitCode: "wmoUnit:percent",
|
||||||
|
Value: &pop,
|
||||||
|
},
|
||||||
|
Icon: "https://api.weather.gov/icons/land/day/bkn?size=medium",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
run, effectiveAt, err := buildNarrativeForecast(parsed)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("buildNarrativeForecast() error = %v", err)
|
||||||
|
}
|
||||||
|
if got, want := run.Product, model.ForecastProductNarrative; got != want {
|
||||||
|
t.Fatalf("Product = %q, want %q", got, want)
|
||||||
|
}
|
||||||
|
if len(run.Periods) != 1 {
|
||||||
|
t.Fatalf("periods len = %d, want 1", len(run.Periods))
|
||||||
|
}
|
||||||
|
|
||||||
|
p := run.Periods[0]
|
||||||
|
if got, want := p.TextDescription, "Partly sunny, with a high near 53."; got != want {
|
||||||
|
t.Fatalf("TextDescription = %q, want %q", got, want)
|
||||||
|
}
|
||||||
|
if p.TemperatureC == nil {
|
||||||
|
t.Fatalf("TemperatureC is nil, want converted value")
|
||||||
|
}
|
||||||
|
if math.Abs(*p.TemperatureC-11.6666666667) > 0.0001 {
|
||||||
|
t.Fatalf("TemperatureC = %.6f, want ~11.6667", *p.TemperatureC)
|
||||||
|
}
|
||||||
|
if p.IsDay == nil || !*p.IsDay {
|
||||||
|
t.Fatalf("IsDay = %v, want true", p.IsDay)
|
||||||
|
}
|
||||||
|
if p.WindDirectionDegrees == nil || *p.WindDirectionDegrees != 225 {
|
||||||
|
t.Fatalf("WindDirectionDegrees = %v, want 225", p.WindDirectionDegrees)
|
||||||
|
}
|
||||||
|
if p.WindSpeedKmh == nil || math.Abs(*p.WindSpeedKmh-19.3128) > 0.001 {
|
||||||
|
t.Fatalf("WindSpeedKmh = %.6f, want ~19.3128", derefOrZero(p.WindSpeedKmh))
|
||||||
|
}
|
||||||
|
if p.ProbabilityOfPrecipitationPercent == nil || *p.ProbabilityOfPrecipitationPercent != 20 {
|
||||||
|
t.Fatalf("ProbabilityOfPrecipitationPercent = %v, want 20", p.ProbabilityOfPrecipitationPercent)
|
||||||
|
}
|
||||||
|
|
||||||
|
wantIssued := time.Date(2026, 3, 27, 15, 17, 1, 0, time.UTC)
|
||||||
|
if !run.IssuedAt.Equal(wantIssued) {
|
||||||
|
t.Fatalf("IssuedAt = %s, want %s", run.IssuedAt.Format(time.RFC3339), wantIssued.Format(time.RFC3339))
|
||||||
|
}
|
||||||
|
if !effectiveAt.Equal(wantIssued) {
|
||||||
|
t.Fatalf("effectiveAt = %s, want %s", effectiveAt.Format(time.RFC3339), wantIssued.Format(time.RFC3339))
|
||||||
|
}
|
||||||
|
|
||||||
|
assertNoLegacyForecastDescriptionKeys(t, p)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBuildNarrativeForecastFallsBackToShortForecastDescription(t *testing.T) {
|
||||||
|
parsed := nwsNarrativeForecastResponse{}
|
||||||
|
parsed.Properties.GeneratedAt = "2026-03-27T15:17:01Z"
|
||||||
|
parsed.Properties.Periods = []nwsNarrativeForecastPeriod{
|
||||||
|
{
|
||||||
|
StartTime: "2026-03-27T18:00:00-05:00",
|
||||||
|
EndTime: "2026-03-28T06:00:00-05:00",
|
||||||
|
ShortForecast: " Mostly Clear ",
|
||||||
|
DetailedForecast: " ",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
run, _, err := buildNarrativeForecast(parsed)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("buildNarrativeForecast() error = %v", err)
|
||||||
|
}
|
||||||
|
if len(run.Periods) != 1 {
|
||||||
|
t.Fatalf("periods len = %d, want 1", len(run.Periods))
|
||||||
|
}
|
||||||
|
if got, want := run.Periods[0].TextDescription, "Mostly Clear"; got != want {
|
||||||
|
t.Fatalf("TextDescription = %q, want %q", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func assertNoLegacyForecastDescriptionKeys(t *testing.T, period any) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
b, err := json.Marshal(period)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("json.Marshal(period) error = %v", err)
|
||||||
|
}
|
||||||
|
var got map[string]any
|
||||||
|
if err := json.Unmarshal(b, &got); err != nil {
|
||||||
|
t.Fatalf("json.Unmarshal(period) error = %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, key := range []string{"conditionText", "providerRawDescription", "detailedText", "iconUrl"} {
|
||||||
|
if _, ok := got[key]; ok {
|
||||||
|
t.Fatalf("unexpected legacy key %q in marshaled period: %#v", key, got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func derefOrZero(v *float64) float64 {
|
||||||
|
if v == nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
return *v
|
||||||
|
}
|
||||||
@@ -98,12 +98,9 @@ type nwsCloudLayer struct {
|
|||||||
Amount string `json:"amount"`
|
Amount string `json:"amount"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// nwsForecastResponse is a minimal-but-sufficient representation of the NWS
|
// nwsHourlyForecastResponse is a minimal-but-sufficient representation of the NWS
|
||||||
// gridpoint forecast GeoJSON payload needed for mapping into model.WeatherForecastRun.
|
// gridpoint hourly forecast GeoJSON payload needed for mapping into model.WeatherForecastRun.
|
||||||
//
|
type nwsHourlyForecastResponse struct {
|
||||||
// This is currently designed to support the hourly forecast endpoint; revisions may be needed
|
|
||||||
// to accommodate other forecast endpoints in the future.
|
|
||||||
type nwsForecastResponse struct {
|
|
||||||
Geometry struct {
|
Geometry struct {
|
||||||
Type string `json:"type"`
|
Type string `json:"type"`
|
||||||
Coordinates [][][]float64 `json:"coordinates"` // GeoJSON polygon: [ring][point][lon,lat]
|
Coordinates [][][]float64 `json:"coordinates"` // GeoJSON polygon: [ring][point][lon,lat]
|
||||||
@@ -122,11 +119,11 @@ type nwsForecastResponse struct {
|
|||||||
Value *float64 `json:"value"`
|
Value *float64 `json:"value"`
|
||||||
} `json:"elevation"`
|
} `json:"elevation"`
|
||||||
|
|
||||||
Periods []nwsForecastPeriod `json:"periods"`
|
Periods []nwsHourlyForecastPeriod `json:"periods"`
|
||||||
} `json:"properties"`
|
} `json:"properties"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type nwsForecastPeriod struct {
|
type nwsHourlyForecastPeriod struct {
|
||||||
Number int `json:"number"`
|
Number int `json:"number"`
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
StartTime string `json:"startTime"`
|
StartTime string `json:"startTime"`
|
||||||
@@ -161,6 +158,56 @@ type nwsForecastPeriod struct {
|
|||||||
DetailedForecast string `json:"detailedForecast"`
|
DetailedForecast string `json:"detailedForecast"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// nwsNarrativeForecastResponse is a minimal-but-sufficient representation of the NWS
|
||||||
|
// gridpoint narrative forecast GeoJSON payload needed for mapping into model.WeatherForecastRun.
|
||||||
|
type nwsNarrativeForecastResponse struct {
|
||||||
|
Geometry struct {
|
||||||
|
Type string `json:"type"`
|
||||||
|
Coordinates [][][]float64 `json:"coordinates"` // GeoJSON polygon: [ring][point][lon,lat]
|
||||||
|
} `json:"geometry"`
|
||||||
|
|
||||||
|
Properties struct {
|
||||||
|
Units string `json:"units"` // "us" or "si" (often "us" for narrative)
|
||||||
|
ForecastGenerator string `json:"forecastGenerator"` // e.g. "BaselineForecastGenerator"
|
||||||
|
|
||||||
|
GeneratedAt string `json:"generatedAt"` // RFC3339-ish
|
||||||
|
UpdateTime string `json:"updateTime"` // RFC3339-ish
|
||||||
|
ValidTimes string `json:"validTimes"`
|
||||||
|
|
||||||
|
Elevation struct {
|
||||||
|
UnitCode string `json:"unitCode"`
|
||||||
|
Value *float64 `json:"value"`
|
||||||
|
} `json:"elevation"`
|
||||||
|
|
||||||
|
Periods []nwsNarrativeForecastPeriod `json:"periods"`
|
||||||
|
} `json:"properties"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type nwsNarrativeForecastPeriod struct {
|
||||||
|
Number int `json:"number"`
|
||||||
|
Name string `json:"name"`
|
||||||
|
StartTime string `json:"startTime"`
|
||||||
|
EndTime string `json:"endTime"`
|
||||||
|
|
||||||
|
IsDaytime *bool `json:"isDaytime"`
|
||||||
|
|
||||||
|
Temperature *float64 `json:"temperature"`
|
||||||
|
TemperatureUnit string `json:"temperatureUnit"` // "F" or "C"
|
||||||
|
TemperatureTrend any `json:"temperatureTrend"`
|
||||||
|
|
||||||
|
ProbabilityOfPrecipitation struct {
|
||||||
|
UnitCode string `json:"unitCode"`
|
||||||
|
Value *float64 `json:"value"`
|
||||||
|
} `json:"probabilityOfPrecipitation"`
|
||||||
|
|
||||||
|
WindSpeed string `json:"windSpeed"` // e.g. "9 mph", "10 to 15 mph"
|
||||||
|
WindDirection string `json:"windDirection"` // e.g. "W", "NW"
|
||||||
|
|
||||||
|
Icon string `json:"icon"`
|
||||||
|
ShortForecast string `json:"shortForecast"`
|
||||||
|
DetailedForecast string `json:"detailedForecast"`
|
||||||
|
}
|
||||||
|
|
||||||
// nwsAlertsResponse is a minimal-but-sufficient representation of the NWS /alerts
|
// nwsAlertsResponse is a minimal-but-sufficient representation of the NWS /alerts
|
||||||
// FeatureCollection payload needed for mapping into model.WeatherAlertRun.
|
// FeatureCollection payload needed for mapping into model.WeatherAlertRun.
|
||||||
type nwsAlertsResponse struct {
|
type nwsAlertsResponse struct {
|
||||||
|
|||||||
@@ -108,14 +108,7 @@ func buildForecast(parsed omForecastResponse, fallbackIssued time.Time) (model.W
|
|||||||
IsDay: isDay,
|
IsDay: isDay,
|
||||||
|
|
||||||
ConditionCode: wmo,
|
ConditionCode: wmo,
|
||||||
ConditionText: canonicalText,
|
|
||||||
|
|
||||||
ProviderRawDescription: "",
|
|
||||||
|
|
||||||
TextDescription: canonicalText,
|
TextDescription: canonicalText,
|
||||||
DetailedText: "",
|
|
||||||
|
|
||||||
IconURL: "",
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if v := floatAt(parsed.Hourly.Temperature2m, i); v != nil {
|
if v := floatAt(parsed.Hourly.Temperature2m, i); v != nil {
|
||||||
|
|||||||
71
internal/normalizers/openmeteo/forecast_test.go
Normal file
71
internal/normalizers/openmeteo/forecast_test.go
Normal file
@@ -0,0 +1,71 @@
|
|||||||
|
package openmeteo
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"gitea.maximumdirect.net/ejr/weatherfeeder/model"
|
||||||
|
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestBuildForecastUsesCanonicalTextDescription(t *testing.T) {
|
||||||
|
weatherCode := 2
|
||||||
|
isDay := 1
|
||||||
|
|
||||||
|
parsed := omForecastResponse{
|
||||||
|
Timezone: "UTC",
|
||||||
|
UTCOffsetSeconds: 0,
|
||||||
|
Hourly: omForecastHourly{
|
||||||
|
Time: []string{"2026-03-16T19:00"},
|
||||||
|
WeatherCode: []*int{&weatherCode},
|
||||||
|
IsDay: []*int{&isDay},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
run, effectiveAt, err := buildForecast(parsed, time.Time{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("buildForecast() error = %v", err)
|
||||||
|
}
|
||||||
|
if len(run.Periods) != 1 {
|
||||||
|
t.Fatalf("periods len = %d, want 1", len(run.Periods))
|
||||||
|
}
|
||||||
|
|
||||||
|
expectedText := standards.WMOText(model.WMOCode(weatherCode), boolPtr(true))
|
||||||
|
if got := run.Periods[0].TextDescription; got != expectedText {
|
||||||
|
t.Fatalf("TextDescription = %q, want %q", got, expectedText)
|
||||||
|
}
|
||||||
|
|
||||||
|
wantIssued := time.Date(2026, 3, 16, 19, 0, 0, 0, time.UTC)
|
||||||
|
if !run.IssuedAt.Equal(wantIssued) {
|
||||||
|
t.Fatalf("IssuedAt = %s, want %s", run.IssuedAt.Format(time.RFC3339), wantIssued.Format(time.RFC3339))
|
||||||
|
}
|
||||||
|
if !effectiveAt.Equal(wantIssued) {
|
||||||
|
t.Fatalf("effectiveAt = %s, want %s", effectiveAt.Format(time.RFC3339), wantIssued.Format(time.RFC3339))
|
||||||
|
}
|
||||||
|
|
||||||
|
assertNoLegacyForecastDescriptionKeys(t, run.Periods[0])
|
||||||
|
}
|
||||||
|
|
||||||
|
func boolPtr(v bool) *bool {
|
||||||
|
return &v
|
||||||
|
}
|
||||||
|
|
||||||
|
func assertNoLegacyForecastDescriptionKeys(t *testing.T, period any) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
b, err := json.Marshal(period)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("json.Marshal(period) error = %v", err)
|
||||||
|
}
|
||||||
|
var got map[string]any
|
||||||
|
if err := json.Unmarshal(b, &got); err != nil {
|
||||||
|
t.Fatalf("json.Unmarshal(period) error = %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, key := range []string{"conditionText", "providerRawDescription", "detailedText", "iconUrl"} {
|
||||||
|
if _, ok := got[key]; ok {
|
||||||
|
t.Fatalf("unexpected legacy key %q in marshaled period: %#v", key, got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -102,11 +102,7 @@
|
|||||||
// - name TEXT NULL -> payload.periods[i].name
|
// - name TEXT NULL -> payload.periods[i].name
|
||||||
// - is_day BOOLEAN NULL -> payload.periods[i].isDay
|
// - is_day BOOLEAN NULL -> payload.periods[i].isDay
|
||||||
// - condition_code INTEGER -> payload.periods[i].conditionCode
|
// - condition_code INTEGER -> payload.periods[i].conditionCode
|
||||||
// - condition_text TEXT NULL -> payload.periods[i].conditionText
|
|
||||||
// - provider_raw_description TEXT NULL -> payload.periods[i].providerRawDescription
|
|
||||||
// - text_description TEXT NULL -> payload.periods[i].textDescription
|
// - text_description TEXT NULL -> payload.periods[i].textDescription
|
||||||
// - detailed_text TEXT NULL -> payload.periods[i].detailedText
|
|
||||||
// - icon_url TEXT NULL -> payload.periods[i].iconUrl
|
|
||||||
// - temperature_c DOUBLE PRECISION NULL -> payload.periods[i].temperatureC
|
// - temperature_c DOUBLE PRECISION NULL -> payload.periods[i].temperatureC
|
||||||
// - temperature_c_min DOUBLE PRECISION NULL -> payload.periods[i].temperatureCMin
|
// - temperature_c_min DOUBLE PRECISION NULL -> payload.periods[i].temperatureCMin
|
||||||
// - temperature_c_max DOUBLE PRECISION NULL -> payload.periods[i].temperatureCMax
|
// - temperature_c_max DOUBLE PRECISION NULL -> payload.periods[i].temperatureCMax
|
||||||
|
|||||||
@@ -136,11 +136,7 @@ func mapForecastEvent(e fkevent.Event) ([]fksinks.PostgresWrite, error) {
|
|||||||
"name": nullableString(p.Name),
|
"name": nullableString(p.Name),
|
||||||
"is_day": nullableBool(p.IsDay),
|
"is_day": nullableBool(p.IsDay),
|
||||||
"condition_code": int(p.ConditionCode),
|
"condition_code": int(p.ConditionCode),
|
||||||
"condition_text": nullableString(p.ConditionText),
|
|
||||||
"provider_raw_description": nullableString(p.ProviderRawDescription),
|
|
||||||
"text_description": nullableString(p.TextDescription),
|
"text_description": nullableString(p.TextDescription),
|
||||||
"detailed_text": nullableString(p.DetailedText),
|
|
||||||
"icon_url": nullableString(p.IconURL),
|
|
||||||
"temperature_c": nullableFloat64(p.TemperatureC),
|
"temperature_c": nullableFloat64(p.TemperatureC),
|
||||||
"temperature_c_min": nullableFloat64(p.TemperatureCMin),
|
"temperature_c_min": nullableFloat64(p.TemperatureCMin),
|
||||||
"temperature_c_max": nullableFloat64(p.TemperatureCMax),
|
"temperature_c_max": nullableFloat64(p.TemperatureCMax),
|
||||||
|
|||||||
@@ -64,7 +64,6 @@ func TestMapPostgresEventForecastStructPayload(t *testing.T) {
|
|||||||
EndTime: time.Date(2026, 3, 16, 20, 0, 0, 0, time.UTC),
|
EndTime: time.Date(2026, 3, 16, 20, 0, 0, 0, time.UTC),
|
||||||
IsDay: &isDay,
|
IsDay: &isDay,
|
||||||
ConditionCode: model.WMOCode(2),
|
ConditionCode: model.WMOCode(2),
|
||||||
ConditionText: "Partly Cloudy",
|
|
||||||
TemperatureC: &temp,
|
TemperatureC: &temp,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -130,11 +130,7 @@ func weatherPostgresSchema() fksinks.PostgresSchema {
|
|||||||
{Name: "name", Type: "TEXT", Nullable: true},
|
{Name: "name", Type: "TEXT", Nullable: true},
|
||||||
{Name: "is_day", Type: "BOOLEAN", Nullable: true},
|
{Name: "is_day", Type: "BOOLEAN", Nullable: true},
|
||||||
{Name: "condition_code", Type: "INTEGER", Nullable: false},
|
{Name: "condition_code", Type: "INTEGER", Nullable: false},
|
||||||
{Name: "condition_text", Type: "TEXT", Nullable: true},
|
|
||||||
{Name: "provider_raw_description", Type: "TEXT", Nullable: true},
|
|
||||||
{Name: "text_description", Type: "TEXT", Nullable: true},
|
{Name: "text_description", Type: "TEXT", Nullable: true},
|
||||||
{Name: "detailed_text", Type: "TEXT", Nullable: true},
|
|
||||||
{Name: "icon_url", Type: "TEXT", Nullable: true},
|
|
||||||
{Name: "temperature_c", Type: "DOUBLE PRECISION", Nullable: true},
|
{Name: "temperature_c", Type: "DOUBLE PRECISION", Nullable: true},
|
||||||
{Name: "temperature_c_min", Type: "DOUBLE PRECISION", Nullable: true},
|
{Name: "temperature_c_min", Type: "DOUBLE PRECISION", Nullable: true},
|
||||||
{Name: "temperature_c_max", Type: "DOUBLE PRECISION", Nullable: true},
|
{Name: "temperature_c_max", Type: "DOUBLE PRECISION", Nullable: true},
|
||||||
|
|||||||
@@ -19,8 +19,11 @@ func RegisterBuiltins(r *fksource.Registry) {
|
|||||||
r.RegisterPoll("nws_alerts", func(cfg config.SourceConfig) (fksource.PollSource, error) {
|
r.RegisterPoll("nws_alerts", func(cfg config.SourceConfig) (fksource.PollSource, error) {
|
||||||
return nws.NewAlertsSource(cfg)
|
return nws.NewAlertsSource(cfg)
|
||||||
})
|
})
|
||||||
r.RegisterPoll("nws_forecast", func(cfg config.SourceConfig) (fksource.PollSource, error) {
|
r.RegisterPoll("nws_forecast_hourly", func(cfg config.SourceConfig) (fksource.PollSource, error) {
|
||||||
return nws.NewForecastSource(cfg)
|
return nws.NewHourlyForecastSource(cfg)
|
||||||
|
})
|
||||||
|
r.RegisterPoll("nws_forecast_narrative", func(cfg config.SourceConfig) (fksource.PollSource, error) {
|
||||||
|
return nws.NewNarrativeForecastSource(cfg)
|
||||||
})
|
})
|
||||||
|
|
||||||
// Open-Meteo drivers
|
// Open-Meteo drivers
|
||||||
|
|||||||
60
internal/sources/builtins_test.go
Normal file
60
internal/sources/builtins_test.go
Normal file
@@ -0,0 +1,60 @@
|
|||||||
|
package sources
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||||
|
fksource "gitea.maximumdirect.net/ejr/feedkit/sources"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestRegisterBuiltinsRegistersNWSHourlyForecastDriver(t *testing.T) {
|
||||||
|
reg := fksource.NewRegistry()
|
||||||
|
RegisterBuiltins(reg)
|
||||||
|
|
||||||
|
in, err := reg.BuildInput(sourceConfigForDriver("nws_forecast_hourly"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("BuildInput(nws_forecast_hourly) error = %v", err)
|
||||||
|
}
|
||||||
|
if _, ok := in.(fksource.PollSource); !ok {
|
||||||
|
t.Fatalf("BuildInput(nws_forecast_hourly) type = %T, want PollSource", in)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRegisterBuiltinsRegistersNWSNarrativeForecastDriver(t *testing.T) {
|
||||||
|
reg := fksource.NewRegistry()
|
||||||
|
RegisterBuiltins(reg)
|
||||||
|
|
||||||
|
in, err := reg.BuildInput(sourceConfigForDriver("nws_forecast_narrative"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("BuildInput(nws_forecast_narrative) error = %v", err)
|
||||||
|
}
|
||||||
|
if _, ok := in.(fksource.PollSource); !ok {
|
||||||
|
t.Fatalf("BuildInput(nws_forecast_narrative) type = %T, want PollSource", in)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRegisterBuiltinsDoesNotRegisterLegacyNWSForecastDriver(t *testing.T) {
|
||||||
|
reg := fksource.NewRegistry()
|
||||||
|
RegisterBuiltins(reg)
|
||||||
|
|
||||||
|
_, err := reg.BuildInput(sourceConfigForDriver("nws_forecast"))
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("BuildInput(nws_forecast) expected unknown driver error")
|
||||||
|
}
|
||||||
|
if !strings.Contains(err.Error(), `unknown source driver: "nws_forecast"`) {
|
||||||
|
t.Fatalf("error = %q, want unknown source driver for nws_forecast", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func sourceConfigForDriver(driver string) config.SourceConfig {
|
||||||
|
return config.SourceConfig{
|
||||||
|
Name: "test-source",
|
||||||
|
Driver: driver,
|
||||||
|
Mode: config.SourceModePoll,
|
||||||
|
Params: map[string]any{
|
||||||
|
"url": "https://example.invalid",
|
||||||
|
"user_agent": "test-agent",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,104 +0,0 @@
|
|||||||
// FILE: ./internal/sources/common/config.go
|
|
||||||
package common
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"strings"
|
|
||||||
|
|
||||||
"gitea.maximumdirect.net/ejr/feedkit/config"
|
|
||||||
)
|
|
||||||
|
|
||||||
// This file centralizes small, boring config-validation patterns shared across
|
|
||||||
// weatherfeeder source drivers.
|
|
||||||
//
|
|
||||||
// Goal: keep driver constructors (New*Source) easy to read and consistent, while
|
|
||||||
// keeping driver-specific options in cfg.Params (feedkit remains domain-agnostic).
|
|
||||||
|
|
||||||
// HTTPSourceConfig is the standard "HTTP-polling source" config shape used across drivers.
|
|
||||||
type HTTPSourceConfig struct {
|
|
||||||
Name string
|
|
||||||
URL string
|
|
||||||
UserAgent string
|
|
||||||
}
|
|
||||||
|
|
||||||
// RequireHTTPSourceConfig enforces weatherfeeder's standard HTTP source config:
|
|
||||||
//
|
|
||||||
// - cfg.Name must be present
|
|
||||||
// - cfg.Params must be present
|
|
||||||
// - params.url must be present (accepts "url" or "URL")
|
|
||||||
// - params.user_agent must be present (accepts "user_agent" or "userAgent")
|
|
||||||
//
|
|
||||||
// We intentionally require a User-Agent for *all* sources, even when upstreams
|
|
||||||
// do not strictly require one. This keeps config uniform across providers.
|
|
||||||
func RequireHTTPSourceConfig(driver string, cfg config.SourceConfig) (HTTPSourceConfig, error) {
|
|
||||||
if strings.TrimSpace(cfg.Name) == "" {
|
|
||||||
return HTTPSourceConfig{}, fmt.Errorf("%s: name is required", driver)
|
|
||||||
}
|
|
||||||
if cfg.Params == nil {
|
|
||||||
return HTTPSourceConfig{}, fmt.Errorf("%s %q: params are required (need params.url and params.user_agent)", driver, cfg.Name)
|
|
||||||
}
|
|
||||||
|
|
||||||
url, ok := cfg.ParamString("url", "URL")
|
|
||||||
if !ok {
|
|
||||||
return HTTPSourceConfig{}, fmt.Errorf("%s %q: params.url is required", driver, cfg.Name)
|
|
||||||
}
|
|
||||||
|
|
||||||
ua, ok := cfg.ParamString("user_agent", "userAgent")
|
|
||||||
if !ok {
|
|
||||||
return HTTPSourceConfig{}, fmt.Errorf("%s %q: params.user_agent is required", driver, cfg.Name)
|
|
||||||
}
|
|
||||||
|
|
||||||
return HTTPSourceConfig{
|
|
||||||
Name: cfg.Name,
|
|
||||||
URL: url,
|
|
||||||
UserAgent: ua,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// --- The helpers below remain useful for future drivers; they are not required
|
|
||||||
// --- by the observation sources after adopting RequireHTTPSourceConfig.
|
|
||||||
|
|
||||||
// RequireName ensures cfg.Name is present and non-whitespace.
|
|
||||||
func RequireName(driver string, cfg config.SourceConfig) error {
|
|
||||||
if strings.TrimSpace(cfg.Name) == "" {
|
|
||||||
return fmt.Errorf("%s: name is required", driver)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// RequireParams ensures cfg.Params is non-nil. The "want" string should be a short
|
|
||||||
// description of required keys, e.g. "need params.url and params.user_agent".
|
|
||||||
func RequireParams(driver string, cfg config.SourceConfig, want string) error {
|
|
||||||
if cfg.Params == nil {
|
|
||||||
return fmt.Errorf("%s %q: params are required (%s)", driver, cfg.Name, want)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// RequireURL returns the configured URL for a source.
|
|
||||||
// Canonical key is "url"; we also accept "URL" as a convenience.
|
|
||||||
func RequireURL(driver string, cfg config.SourceConfig) (string, error) {
|
|
||||||
if cfg.Params == nil {
|
|
||||||
return "", fmt.Errorf("%s %q: params are required (need params.url)", driver, cfg.Name)
|
|
||||||
}
|
|
||||||
|
|
||||||
u, ok := cfg.ParamString("url", "URL")
|
|
||||||
if !ok {
|
|
||||||
return "", fmt.Errorf("%s %q: params.url is required", driver, cfg.Name)
|
|
||||||
}
|
|
||||||
return u, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// RequireUserAgent returns the configured User-Agent for a source.
|
|
||||||
// Canonical key is "user_agent"; we also accept "userAgent" as a convenience.
|
|
||||||
func RequireUserAgent(driver string, cfg config.SourceConfig) (string, error) {
|
|
||||||
if cfg.Params == nil {
|
|
||||||
return "", fmt.Errorf("%s %q: params are required (need params.user_agent)", driver, cfg.Name)
|
|
||||||
}
|
|
||||||
|
|
||||||
ua, ok := cfg.ParamString("user_agent", "userAgent")
|
|
||||||
if !ok {
|
|
||||||
return "", fmt.Errorf("%s %q: params.user_agent is required", driver, cfg.Name)
|
|
||||||
}
|
|
||||||
return ua, nil
|
|
||||||
}
|
|
||||||
@@ -1,76 +0,0 @@
|
|||||||
// FILE: ./internal/sources/common/http_source.go
|
|
||||||
package common
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
|
||||||
"net/http"
|
|
||||||
|
|
||||||
"gitea.maximumdirect.net/ejr/feedkit/config"
|
|
||||||
"gitea.maximumdirect.net/ejr/feedkit/transport"
|
|
||||||
)
|
|
||||||
|
|
||||||
// HTTPSource is a tiny, reusable "HTTP polling spine" for weatherfeeder sources.
|
|
||||||
//
|
|
||||||
// It centralizes the boring parts:
|
|
||||||
// - standard config shape (url + user_agent) via RequireHTTPSourceConfig
|
|
||||||
// - a default http.Client with timeout
|
|
||||||
// - FetchBody / headers / max-body safety limit
|
|
||||||
// - consistent error wrapping (driver + source name)
|
|
||||||
//
|
|
||||||
// Individual drivers remain responsible for:
|
|
||||||
// - decoding minimal metadata (for Event.ID / EffectiveAt)
|
|
||||||
// - constructing the event envelope (kind/schema/payload)
|
|
||||||
type HTTPSource struct {
|
|
||||||
Driver string
|
|
||||||
Name string
|
|
||||||
URL string
|
|
||||||
UserAgent string
|
|
||||||
Accept string
|
|
||||||
Client *http.Client
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewHTTPSource builds an HTTPSource using weatherfeeder's standard HTTP source
|
|
||||||
// config (params.url + params.user_agent) and a default HTTP client.
|
|
||||||
func NewHTTPSource(driver string, cfg config.SourceConfig, accept string) (*HTTPSource, error) {
|
|
||||||
c, err := RequireHTTPSourceConfig(driver, cfg)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return &HTTPSource{
|
|
||||||
Driver: driver,
|
|
||||||
Name: c.Name,
|
|
||||||
URL: c.URL,
|
|
||||||
UserAgent: c.UserAgent,
|
|
||||||
Accept: accept,
|
|
||||||
Client: transport.NewHTTPClient(transport.DefaultHTTPTimeout),
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// FetchBytes fetches the URL and returns the raw response body bytes.
|
|
||||||
func (s *HTTPSource) FetchBytes(ctx context.Context) ([]byte, error) {
|
|
||||||
client := s.Client
|
|
||||||
if client == nil {
|
|
||||||
// Defensive: allow tests or callers to nil out Client; keep behavior sane.
|
|
||||||
client = transport.NewHTTPClient(transport.DefaultHTTPTimeout)
|
|
||||||
}
|
|
||||||
|
|
||||||
b, err := transport.FetchBody(ctx, client, s.URL, s.UserAgent, s.Accept)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("%s %q: %w", s.Driver, s.Name, err)
|
|
||||||
}
|
|
||||||
return b, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// FetchJSON fetches the URL and returns the raw body as json.RawMessage.
|
|
||||||
// json.Unmarshal accepts json.RawMessage directly, so callers can decode minimal
|
|
||||||
// metadata without keeping both []byte and RawMessage in their own structs.
|
|
||||||
func (s *HTTPSource) FetchJSON(ctx context.Context) (json.RawMessage, error) {
|
|
||||||
b, err := s.FetchBytes(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return json.RawMessage(b), nil
|
|
||||||
}
|
|
||||||
@@ -9,6 +9,7 @@ import (
|
|||||||
|
|
||||||
"gitea.maximumdirect.net/ejr/feedkit/config"
|
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||||
"gitea.maximumdirect.net/ejr/feedkit/event"
|
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||||
|
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
|
||||||
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
|
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
|
||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
|
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
|
||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
||||||
@@ -22,14 +23,14 @@ import (
|
|||||||
// Output schema:
|
// Output schema:
|
||||||
// - standards.SchemaRawNWSAlertsV1
|
// - standards.SchemaRawNWSAlertsV1
|
||||||
type AlertsSource struct {
|
type AlertsSource struct {
|
||||||
http *common.HTTPSource
|
http *fksources.HTTPSource
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewAlertsSource(cfg config.SourceConfig) (*AlertsSource, error) {
|
func NewAlertsSource(cfg config.SourceConfig) (*AlertsSource, error) {
|
||||||
const driver = "nws_alerts"
|
const driver = "nws_alerts"
|
||||||
|
|
||||||
// NWS alerts responses are GeoJSON-ish; allow fallback to plain JSON as well.
|
// NWS alerts responses are GeoJSON-ish; allow fallback to plain JSON as well.
|
||||||
hs, err := common.NewHTTPSource(driver, cfg, "application/geo+json, application/json")
|
hs, err := fksources.NewHTTPSource(driver, cfg, "application/geo+json, application/json")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -43,10 +44,13 @@ func (s *AlertsSource) Name() string { return s.http.Name }
|
|||||||
func (s *AlertsSource) Kind() event.Kind { return event.Kind("alert") }
|
func (s *AlertsSource) Kind() event.Kind { return event.Kind("alert") }
|
||||||
|
|
||||||
func (s *AlertsSource) Poll(ctx context.Context) ([]event.Event, error) {
|
func (s *AlertsSource) Poll(ctx context.Context) ([]event.Event, error) {
|
||||||
raw, meta, err := s.fetchRaw(ctx)
|
raw, meta, changed, err := s.fetchRaw(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if !changed {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
// EffectiveAt policy for alerts:
|
// EffectiveAt policy for alerts:
|
||||||
// Prefer the collection-level "updated" timestamp (best dedupe signal).
|
// Prefer the collection-level "updated" timestamp (best dedupe signal).
|
||||||
@@ -97,16 +101,19 @@ type alertsMeta struct {
|
|||||||
ParsedLatestFeatureTime time.Time `json:"-"`
|
ParsedLatestFeatureTime time.Time `json:"-"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *AlertsSource) fetchRaw(ctx context.Context) (json.RawMessage, alertsMeta, error) {
|
func (s *AlertsSource) fetchRaw(ctx context.Context) (json.RawMessage, alertsMeta, bool, error) {
|
||||||
raw, err := s.http.FetchJSON(ctx)
|
raw, changed, err := s.http.FetchJSONIfChanged(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, alertsMeta{}, err
|
return nil, alertsMeta{}, false, err
|
||||||
|
}
|
||||||
|
if !changed {
|
||||||
|
return nil, alertsMeta{}, false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var meta alertsMeta
|
var meta alertsMeta
|
||||||
if err := json.Unmarshal(raw, &meta); err != nil {
|
if err := json.Unmarshal(raw, &meta); err != nil {
|
||||||
// If metadata decode fails, still return raw; Poll will fall back to Source:EmittedAt.
|
// If metadata decode fails, still return raw; Poll will fall back to Source:EmittedAt.
|
||||||
return raw, alertsMeta{}, nil
|
return raw, alertsMeta{}, true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Top-level updated (preferred).
|
// Top-level updated (preferred).
|
||||||
@@ -143,5 +150,5 @@ func (s *AlertsSource) fetchRaw(ctx context.Context) (json.RawMessage, alertsMet
|
|||||||
}
|
}
|
||||||
meta.ParsedLatestFeatureTime = latest
|
meta.ParsedLatestFeatureTime = latest
|
||||||
|
|
||||||
return raw, meta, nil
|
return raw, meta, true, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
// FILE: internal/sources/nws/forecast.go
|
// FILE: internal/sources/nws/forecast_hourly.go
|
||||||
package nws
|
package nws
|
||||||
|
|
||||||
import (
|
import (
|
||||||
@@ -9,44 +9,48 @@ import (
|
|||||||
|
|
||||||
"gitea.maximumdirect.net/ejr/feedkit/config"
|
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||||
"gitea.maximumdirect.net/ejr/feedkit/event"
|
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||||
|
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
|
||||||
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
|
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
|
||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
|
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
|
||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ForecastSource polls an NWS forecast endpoint (narrative or hourly) and emits a RAW forecast Event.
|
// HourlyForecastSource polls an NWS hourly forecast endpoint and emits a RAW forecast Event.
|
||||||
//
|
//
|
||||||
// It intentionally emits the *entire* upstream payload as json.RawMessage and only decodes
|
// It intentionally emits the *entire* upstream payload as json.RawMessage and only decodes
|
||||||
// minimal metadata for Event.EffectiveAt and Event.ID.
|
// minimal metadata for Event.EffectiveAt and Event.ID.
|
||||||
//
|
//
|
||||||
// Output schema (current implementation):
|
// Output schema (current implementation):
|
||||||
// - standards.SchemaRawNWSHourlyForecastV1
|
// - standards.SchemaRawNWSHourlyForecastV1
|
||||||
type ForecastSource struct {
|
type HourlyForecastSource struct {
|
||||||
http *common.HTTPSource
|
http *fksources.HTTPSource
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewForecastSource(cfg config.SourceConfig) (*ForecastSource, error) {
|
func NewHourlyForecastSource(cfg config.SourceConfig) (*HourlyForecastSource, error) {
|
||||||
const driver = "nws_forecast"
|
const driver = "nws_forecast_hourly"
|
||||||
|
|
||||||
// NWS forecast endpoints are GeoJSON (and sometimes also advertise json-ld/json).
|
// NWS forecast endpoints are GeoJSON (and sometimes also advertise json-ld/json).
|
||||||
hs, err := common.NewHTTPSource(driver, cfg, "application/geo+json, application/json")
|
hs, err := fksources.NewHTTPSource(driver, cfg, "application/geo+json, application/json")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return &ForecastSource{http: hs}, nil
|
return &HourlyForecastSource{http: hs}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ForecastSource) Name() string { return s.http.Name }
|
func (s *HourlyForecastSource) Name() string { return s.http.Name }
|
||||||
|
|
||||||
// Kind is used for routing/policy.
|
// Kind is used for routing/policy.
|
||||||
func (s *ForecastSource) Kind() event.Kind { return event.Kind("forecast") }
|
func (s *HourlyForecastSource) Kind() event.Kind { return event.Kind("forecast") }
|
||||||
|
|
||||||
func (s *ForecastSource) Poll(ctx context.Context) ([]event.Event, error) {
|
func (s *HourlyForecastSource) Poll(ctx context.Context) ([]event.Event, error) {
|
||||||
raw, meta, err := s.fetchRaw(ctx)
|
raw, meta, changed, err := s.fetchRaw(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if !changed {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
// EffectiveAt is optional; for forecasts it’s most naturally the run “issued” time.
|
// EffectiveAt is optional; for forecasts it’s most naturally the run “issued” time.
|
||||||
// NWS gridpoint forecasts expose generatedAt (preferred) and updateTime/updated.
|
// NWS gridpoint forecasts expose generatedAt (preferred) and updateTime/updated.
|
||||||
@@ -80,7 +84,7 @@ func (s *ForecastSource) Poll(ctx context.Context) ([]event.Event, error) {
|
|||||||
|
|
||||||
// ---- RAW fetch + minimal metadata decode ----
|
// ---- RAW fetch + minimal metadata decode ----
|
||||||
|
|
||||||
type forecastMeta struct {
|
type hourlyForecastMeta struct {
|
||||||
// Present for GeoJSON Feature responses, but often stable (endpoint URL).
|
// Present for GeoJSON Feature responses, but often stable (endpoint URL).
|
||||||
ID string `json:"id"`
|
ID string `json:"id"`
|
||||||
|
|
||||||
@@ -94,16 +98,19 @@ type forecastMeta struct {
|
|||||||
ParsedUpdateTime time.Time `json:"-"`
|
ParsedUpdateTime time.Time `json:"-"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ForecastSource) fetchRaw(ctx context.Context) (json.RawMessage, forecastMeta, error) {
|
func (s *HourlyForecastSource) fetchRaw(ctx context.Context) (json.RawMessage, hourlyForecastMeta, bool, error) {
|
||||||
raw, err := s.http.FetchJSON(ctx)
|
raw, changed, err := s.http.FetchJSONIfChanged(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, forecastMeta{}, err
|
return nil, hourlyForecastMeta{}, false, err
|
||||||
|
}
|
||||||
|
if !changed {
|
||||||
|
return nil, hourlyForecastMeta{}, false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var meta forecastMeta
|
var meta hourlyForecastMeta
|
||||||
if err := json.Unmarshal(raw, &meta); err != nil {
|
if err := json.Unmarshal(raw, &meta); err != nil {
|
||||||
// If metadata decode fails, still return raw; Poll will fall back to Source:EmittedAt.
|
// If metadata decode fails, still return raw; Poll will fall back to Source:EmittedAt.
|
||||||
return raw, forecastMeta{}, nil
|
return raw, hourlyForecastMeta{}, true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// generatedAt (preferred)
|
// generatedAt (preferred)
|
||||||
@@ -125,5 +132,5 @@ func (s *ForecastSource) fetchRaw(ctx context.Context) (json.RawMessage, forecas
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return raw, meta, nil
|
return raw, meta, true, nil
|
||||||
}
|
}
|
||||||
136
internal/sources/nws/forecast_narrative.go
Normal file
136
internal/sources/nws/forecast_narrative.go
Normal file
@@ -0,0 +1,136 @@
|
|||||||
|
// FILE: internal/sources/nws/forecast_narrative.go
|
||||||
|
package nws
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||||
|
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||||
|
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
|
||||||
|
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
|
||||||
|
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
|
||||||
|
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
||||||
|
)
|
||||||
|
|
||||||
|
// NarrativeForecastSource polls an NWS narrative forecast endpoint and emits a RAW forecast Event.
|
||||||
|
//
|
||||||
|
// It intentionally emits the *entire* upstream payload as json.RawMessage and only decodes
|
||||||
|
// minimal metadata for Event.EffectiveAt and Event.ID.
|
||||||
|
//
|
||||||
|
// Output schema:
|
||||||
|
// - standards.SchemaRawNWSNarrativeForecastV1
|
||||||
|
type NarrativeForecastSource struct {
|
||||||
|
http *fksources.HTTPSource
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewNarrativeForecastSource(cfg config.SourceConfig) (*NarrativeForecastSource, error) {
|
||||||
|
const driver = "nws_forecast_narrative"
|
||||||
|
|
||||||
|
// NWS forecast endpoints are GeoJSON (and sometimes also advertise json-ld/json).
|
||||||
|
hs, err := fksources.NewHTTPSource(driver, cfg, "application/geo+json, application/json")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &NarrativeForecastSource{http: hs}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *NarrativeForecastSource) Name() string { return s.http.Name }
|
||||||
|
|
||||||
|
// Kind is used for routing/policy.
|
||||||
|
func (s *NarrativeForecastSource) Kind() event.Kind { return event.Kind("forecast") }
|
||||||
|
|
||||||
|
func (s *NarrativeForecastSource) Poll(ctx context.Context) ([]event.Event, error) {
|
||||||
|
raw, meta, changed, err := s.fetchRaw(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if !changed {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// EffectiveAt is optional; for forecasts it’s most naturally the run “issued” time.
|
||||||
|
// NWS gridpoint forecasts expose generatedAt (preferred) and updateTime/updated.
|
||||||
|
var effectiveAt *time.Time
|
||||||
|
switch {
|
||||||
|
case !meta.ParsedGeneratedAt.IsZero():
|
||||||
|
t := meta.ParsedGeneratedAt.UTC()
|
||||||
|
effectiveAt = &t
|
||||||
|
case !meta.ParsedUpdateTime.IsZero():
|
||||||
|
t := meta.ParsedUpdateTime.UTC()
|
||||||
|
effectiveAt = &t
|
||||||
|
}
|
||||||
|
|
||||||
|
emittedAt := time.Now().UTC()
|
||||||
|
|
||||||
|
// NWS gridpoint forecast GeoJSON commonly has a stable "id" equal to the endpoint URL.
|
||||||
|
// That is *not* unique per issued run, so we intentionally do not use it for Event.ID.
|
||||||
|
// Instead we rely on Source:EffectiveAt (or Source:EmittedAt fallback).
|
||||||
|
eventID := common.ChooseEventID("", s.http.Name, effectiveAt, emittedAt)
|
||||||
|
|
||||||
|
return common.SingleRawEvent(
|
||||||
|
s.Kind(),
|
||||||
|
s.http.Name,
|
||||||
|
standards.SchemaRawNWSNarrativeForecastV1,
|
||||||
|
eventID,
|
||||||
|
emittedAt,
|
||||||
|
effectiveAt,
|
||||||
|
raw,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---- RAW fetch + minimal metadata decode ----
|
||||||
|
|
||||||
|
type narrativeForecastMeta struct {
|
||||||
|
// Present for GeoJSON Feature responses, but often stable (endpoint URL).
|
||||||
|
ID string `json:"id"`
|
||||||
|
|
||||||
|
Properties struct {
|
||||||
|
GeneratedAt string `json:"generatedAt"` // preferred “issued/run generated” time
|
||||||
|
UpdateTime string `json:"updateTime"` // last update time of underlying data
|
||||||
|
Updated string `json:"updated"` // deprecated alias for updateTime
|
||||||
|
} `json:"properties"`
|
||||||
|
|
||||||
|
ParsedGeneratedAt time.Time `json:"-"`
|
||||||
|
ParsedUpdateTime time.Time `json:"-"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *NarrativeForecastSource) fetchRaw(ctx context.Context) (json.RawMessage, narrativeForecastMeta, bool, error) {
|
||||||
|
raw, changed, err := s.http.FetchJSONIfChanged(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, narrativeForecastMeta{}, false, err
|
||||||
|
}
|
||||||
|
if !changed {
|
||||||
|
return nil, narrativeForecastMeta{}, false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var meta narrativeForecastMeta
|
||||||
|
if err := json.Unmarshal(raw, &meta); err != nil {
|
||||||
|
// If metadata decode fails, still return raw; Poll will fall back to Source:EmittedAt.
|
||||||
|
return raw, narrativeForecastMeta{}, true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// generatedAt (preferred)
|
||||||
|
genStr := strings.TrimSpace(meta.Properties.GeneratedAt)
|
||||||
|
if genStr != "" {
|
||||||
|
if t, err := nwscommon.ParseTime(genStr); err == nil {
|
||||||
|
meta.ParsedGeneratedAt = t.UTC()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// updateTime, with fallback to deprecated "updated"
|
||||||
|
updStr := strings.TrimSpace(meta.Properties.UpdateTime)
|
||||||
|
if updStr == "" {
|
||||||
|
updStr = strings.TrimSpace(meta.Properties.Updated)
|
||||||
|
}
|
||||||
|
if updStr != "" {
|
||||||
|
if t, err := nwscommon.ParseTime(updStr); err == nil {
|
||||||
|
meta.ParsedUpdateTime = t.UTC()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return raw, meta, true, nil
|
||||||
|
}
|
||||||
@@ -9,6 +9,7 @@ import (
|
|||||||
|
|
||||||
"gitea.maximumdirect.net/ejr/feedkit/config"
|
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||||
"gitea.maximumdirect.net/ejr/feedkit/event"
|
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||||
|
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
|
||||||
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
|
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
|
||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
|
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
|
||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
||||||
@@ -16,13 +17,13 @@ import (
|
|||||||
|
|
||||||
// ObservationSource polls an NWS station observation endpoint and emits a RAW observation Event.
|
// ObservationSource polls an NWS station observation endpoint and emits a RAW observation Event.
|
||||||
type ObservationSource struct {
|
type ObservationSource struct {
|
||||||
http *common.HTTPSource
|
http *fksources.HTTPSource
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) {
|
func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) {
|
||||||
const driver = "nws_observation"
|
const driver = "nws_observation"
|
||||||
|
|
||||||
hs, err := common.NewHTTPSource(driver, cfg, "application/geo+json, application/json")
|
hs, err := fksources.NewHTTPSource(driver, cfg, "application/geo+json, application/json")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -35,10 +36,13 @@ func (s *ObservationSource) Name() string { return s.http.Name }
|
|||||||
func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") }
|
func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") }
|
||||||
|
|
||||||
func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
|
func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
|
||||||
raw, meta, err := s.fetchRaw(ctx)
|
raw, meta, changed, err := s.fetchRaw(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if !changed {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
// EffectiveAt is optional; for observations it’s naturally the observation timestamp.
|
// EffectiveAt is optional; for observations it’s naturally the observation timestamp.
|
||||||
var effectiveAt *time.Time
|
var effectiveAt *time.Time
|
||||||
@@ -72,16 +76,19 @@ type observationMeta struct {
|
|||||||
ParsedTimestamp time.Time `json:"-"`
|
ParsedTimestamp time.Time `json:"-"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, observationMeta, error) {
|
func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, observationMeta, bool, error) {
|
||||||
raw, err := s.http.FetchJSON(ctx)
|
raw, changed, err := s.http.FetchJSONIfChanged(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, observationMeta{}, err
|
return nil, observationMeta{}, false, err
|
||||||
|
}
|
||||||
|
if !changed {
|
||||||
|
return nil, observationMeta{}, false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var meta observationMeta
|
var meta observationMeta
|
||||||
if err := json.Unmarshal(raw, &meta); err != nil {
|
if err := json.Unmarshal(raw, &meta); err != nil {
|
||||||
// If metadata decode fails, still return raw; envelope will fall back to Source:EffectiveAt.
|
// If metadata decode fails, still return raw; envelope will fall back to Source:EffectiveAt.
|
||||||
return raw, observationMeta{}, nil
|
return raw, observationMeta{}, true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
tsStr := strings.TrimSpace(meta.Properties.Timestamp)
|
tsStr := strings.TrimSpace(meta.Properties.Timestamp)
|
||||||
@@ -91,5 +98,5 @@ func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, obse
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return raw, meta, nil
|
return raw, meta, true, nil
|
||||||
}
|
}
|
||||||
|
|||||||
63
internal/sources/nws/observation_test.go
Normal file
63
internal/sources/nws/observation_test.go
Normal file
@@ -0,0 +1,63 @@
|
|||||||
|
package nws
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||||
|
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestObservationSourcePollReturnsNoEventsOn304(t *testing.T) {
|
||||||
|
var call int
|
||||||
|
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
call++
|
||||||
|
switch call {
|
||||||
|
case 1:
|
||||||
|
w.Header().Set("ETag", `"obs-v1"`)
|
||||||
|
_, _ = w.Write([]byte(`{"id":"obs-1","properties":{"timestamp":"2026-03-28T12:00:00Z"}}`))
|
||||||
|
case 2:
|
||||||
|
if got := r.Header.Get("If-None-Match"); got != `"obs-v1"` {
|
||||||
|
t.Fatalf("second request If-None-Match = %q", got)
|
||||||
|
}
|
||||||
|
w.WriteHeader(http.StatusNotModified)
|
||||||
|
default:
|
||||||
|
t.Fatalf("unexpected call count %d", call)
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
defer srv.Close()
|
||||||
|
|
||||||
|
src, err := NewObservationSource(config.SourceConfig{
|
||||||
|
Name: "NWSObservationTest",
|
||||||
|
Driver: "nws_observation",
|
||||||
|
Mode: config.SourceModePoll,
|
||||||
|
Params: map[string]any{
|
||||||
|
"url": srv.URL,
|
||||||
|
"user_agent": "test-agent",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("NewObservationSource() error = %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
first, err := src.Poll(context.Background())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("first Poll() error = %v", err)
|
||||||
|
}
|
||||||
|
if len(first) != 1 {
|
||||||
|
t.Fatalf("first Poll() len = %d, want 1", len(first))
|
||||||
|
}
|
||||||
|
if first[0].Kind != event.Kind("observation") {
|
||||||
|
t.Fatalf("first Poll() kind = %q", first[0].Kind)
|
||||||
|
}
|
||||||
|
|
||||||
|
second, err := src.Poll(context.Background())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("second Poll() error = %v", err)
|
||||||
|
}
|
||||||
|
if len(second) != 0 {
|
||||||
|
t.Fatalf("second Poll() len = %d, want 0", len(second))
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -8,6 +8,7 @@ import (
|
|||||||
|
|
||||||
"gitea.maximumdirect.net/ejr/feedkit/config"
|
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||||
"gitea.maximumdirect.net/ejr/feedkit/event"
|
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||||
|
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
|
||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openmeteo"
|
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openmeteo"
|
||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
|
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
|
||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
||||||
@@ -15,13 +16,13 @@ import (
|
|||||||
|
|
||||||
// ForecastSource polls an Open-Meteo hourly forecast endpoint and emits one RAW Forecast Event.
|
// ForecastSource polls an Open-Meteo hourly forecast endpoint and emits one RAW Forecast Event.
|
||||||
type ForecastSource struct {
|
type ForecastSource struct {
|
||||||
http *common.HTTPSource
|
http *fksources.HTTPSource
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewForecastSource(cfg config.SourceConfig) (*ForecastSource, error) {
|
func NewForecastSource(cfg config.SourceConfig) (*ForecastSource, error) {
|
||||||
const driver = "openmeteo_forecast"
|
const driver = "openmeteo_forecast"
|
||||||
|
|
||||||
hs, err := common.NewHTTPSource(driver, cfg, "application/json")
|
hs, err := fksources.NewHTTPSource(driver, cfg, "application/json")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -34,10 +35,13 @@ func (s *ForecastSource) Name() string { return s.http.Name }
|
|||||||
func (s *ForecastSource) Kind() event.Kind { return event.Kind("forecast") }
|
func (s *ForecastSource) Kind() event.Kind { return event.Kind("forecast") }
|
||||||
|
|
||||||
func (s *ForecastSource) Poll(ctx context.Context) ([]event.Event, error) {
|
func (s *ForecastSource) Poll(ctx context.Context) ([]event.Event, error) {
|
||||||
raw, meta, err := s.fetchRaw(ctx)
|
raw, meta, changed, err := s.fetchRaw(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if !changed {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Open-Meteo does not expose a true "issued at" timestamp for forecast runs.
|
// Open-Meteo does not expose a true "issued at" timestamp for forecast runs.
|
||||||
// We use current.time when present; otherwise we fall back to the first hourly time
|
// We use current.time when present; otherwise we fall back to the first hourly time
|
||||||
@@ -79,16 +83,19 @@ type forecastMeta struct {
|
|||||||
ParsedTimestamp time.Time `json:"-"`
|
ParsedTimestamp time.Time `json:"-"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ForecastSource) fetchRaw(ctx context.Context) (json.RawMessage, forecastMeta, error) {
|
func (s *ForecastSource) fetchRaw(ctx context.Context) (json.RawMessage, forecastMeta, bool, error) {
|
||||||
raw, err := s.http.FetchJSON(ctx)
|
raw, changed, err := s.http.FetchJSONIfChanged(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, forecastMeta{}, err
|
return nil, forecastMeta{}, false, err
|
||||||
|
}
|
||||||
|
if !changed {
|
||||||
|
return nil, forecastMeta{}, false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var meta forecastMeta
|
var meta forecastMeta
|
||||||
if err := json.Unmarshal(raw, &meta); err != nil {
|
if err := json.Unmarshal(raw, &meta); err != nil {
|
||||||
// If metadata decode fails, still return raw; Poll will fall back to Source:EmittedAt.
|
// If metadata decode fails, still return raw; Poll will fall back to Source:EmittedAt.
|
||||||
return raw, forecastMeta{}, nil
|
return raw, forecastMeta{}, true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
ts := strings.TrimSpace(meta.Current.Time)
|
ts := strings.TrimSpace(meta.Current.Time)
|
||||||
@@ -106,5 +113,5 @@ func (s *ForecastSource) fetchRaw(ctx context.Context) (json.RawMessage, forecas
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return raw, meta, nil
|
return raw, meta, true, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ import (
|
|||||||
|
|
||||||
"gitea.maximumdirect.net/ejr/feedkit/config"
|
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||||
"gitea.maximumdirect.net/ejr/feedkit/event"
|
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||||
|
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
|
||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openmeteo"
|
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openmeteo"
|
||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
|
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
|
||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
||||||
@@ -15,13 +16,13 @@ import (
|
|||||||
|
|
||||||
// ObservationSource polls an Open-Meteo endpoint and emits one RAW Observation Event.
|
// ObservationSource polls an Open-Meteo endpoint and emits one RAW Observation Event.
|
||||||
type ObservationSource struct {
|
type ObservationSource struct {
|
||||||
http *common.HTTPSource
|
http *fksources.HTTPSource
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) {
|
func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) {
|
||||||
const driver = "openmeteo_observation"
|
const driver = "openmeteo_observation"
|
||||||
|
|
||||||
hs, err := common.NewHTTPSource(driver, cfg, "application/json")
|
hs, err := fksources.NewHTTPSource(driver, cfg, "application/json")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -34,10 +35,13 @@ func (s *ObservationSource) Name() string { return s.http.Name }
|
|||||||
func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") }
|
func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") }
|
||||||
|
|
||||||
func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
|
func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
|
||||||
raw, meta, err := s.fetchRaw(ctx)
|
raw, meta, changed, err := s.fetchRaw(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if !changed {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
var effectiveAt *time.Time
|
var effectiveAt *time.Time
|
||||||
if !meta.ParsedTimestamp.IsZero() {
|
if !meta.ParsedTimestamp.IsZero() {
|
||||||
@@ -72,21 +76,24 @@ type openMeteoMeta struct {
|
|||||||
ParsedTimestamp time.Time `json:"-"`
|
ParsedTimestamp time.Time `json:"-"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, openMeteoMeta, error) {
|
func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, openMeteoMeta, bool, error) {
|
||||||
raw, err := s.http.FetchJSON(ctx)
|
raw, changed, err := s.http.FetchJSONIfChanged(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, openMeteoMeta{}, err
|
return nil, openMeteoMeta{}, false, err
|
||||||
|
}
|
||||||
|
if !changed {
|
||||||
|
return nil, openMeteoMeta{}, false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var meta openMeteoMeta
|
var meta openMeteoMeta
|
||||||
if err := json.Unmarshal(raw, &meta); err != nil {
|
if err := json.Unmarshal(raw, &meta); err != nil {
|
||||||
// If metadata decode fails, still return raw; envelope will omit EffectiveAt.
|
// If metadata decode fails, still return raw; envelope will omit EffectiveAt.
|
||||||
return raw, openMeteoMeta{}, nil
|
return raw, openMeteoMeta{}, true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if t, err := openmeteo.ParseTime(meta.Current.Time, meta.Timezone, meta.UTCOffsetSeconds); err == nil {
|
if t, err := openmeteo.ParseTime(meta.Current.Time, meta.Timezone, meta.UTCOffsetSeconds); err == nil {
|
||||||
meta.ParsedTimestamp = t.UTC()
|
meta.ParsedTimestamp = t.UTC()
|
||||||
}
|
}
|
||||||
|
|
||||||
return raw, meta, nil
|
return raw, meta, true, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -9,19 +9,20 @@ import (
|
|||||||
|
|
||||||
"gitea.maximumdirect.net/ejr/feedkit/config"
|
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||||
"gitea.maximumdirect.net/ejr/feedkit/event"
|
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||||
|
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
|
||||||
owcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openweather"
|
owcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openweather"
|
||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
|
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
|
||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
||||||
)
|
)
|
||||||
|
|
||||||
type ObservationSource struct {
|
type ObservationSource struct {
|
||||||
http *common.HTTPSource
|
http *fksources.HTTPSource
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) {
|
func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) {
|
||||||
const driver = "openweather_observation"
|
const driver = "openweather_observation"
|
||||||
|
|
||||||
hs, err := common.NewHTTPSource(driver, cfg, "application/json")
|
hs, err := fksources.NewHTTPSource(driver, cfg, "application/json")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -42,10 +43,13 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
|
|||||||
return nil, fmt.Errorf("%s %q: %w", s.http.Driver, s.http.Name, err)
|
return nil, fmt.Errorf("%s %q: %w", s.http.Driver, s.http.Name, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
raw, meta, err := s.fetchRaw(ctx)
|
raw, meta, changed, err := s.fetchRaw(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if !changed {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
var effectiveAt *time.Time
|
var effectiveAt *time.Time
|
||||||
if !meta.ParsedTimestamp.IsZero() {
|
if !meta.ParsedTimestamp.IsZero() {
|
||||||
@@ -75,20 +79,23 @@ type openWeatherMeta struct {
|
|||||||
ParsedTimestamp time.Time `json:"-"`
|
ParsedTimestamp time.Time `json:"-"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, openWeatherMeta, error) {
|
func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, openWeatherMeta, bool, error) {
|
||||||
raw, err := s.http.FetchJSON(ctx)
|
raw, changed, err := s.http.FetchJSONIfChanged(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, openWeatherMeta{}, err
|
return nil, openWeatherMeta{}, false, err
|
||||||
|
}
|
||||||
|
if !changed {
|
||||||
|
return nil, openWeatherMeta{}, false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var meta openWeatherMeta
|
var meta openWeatherMeta
|
||||||
if err := json.Unmarshal(raw, &meta); err != nil {
|
if err := json.Unmarshal(raw, &meta); err != nil {
|
||||||
return raw, openWeatherMeta{}, nil
|
return raw, openWeatherMeta{}, true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if meta.Dt > 0 {
|
if meta.Dt > 0 {
|
||||||
meta.ParsedTimestamp = time.Unix(meta.Dt, 0).UTC()
|
meta.ParsedTimestamp = time.Unix(meta.Dt, 0).UTC()
|
||||||
}
|
}
|
||||||
|
|
||||||
return raw, meta, nil
|
return raw, meta, true, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -75,18 +75,8 @@ type WeatherForecastPeriod struct {
|
|||||||
// Like WeatherObservation, this is required; use an “unknown” WMOCode if unmappable.
|
// Like WeatherObservation, this is required; use an “unknown” WMOCode if unmappable.
|
||||||
ConditionCode WMOCode `json:"conditionCode"`
|
ConditionCode WMOCode `json:"conditionCode"`
|
||||||
|
|
||||||
// Provider-independent short text describing the conditions (normalized, if possible).
|
// Human-facing narrative summary for this period.
|
||||||
ConditionText string `json:"conditionText,omitempty"`
|
TextDescription string `json:"textDescription,omitempty"`
|
||||||
|
|
||||||
// Provider-specific “evidence” for troubleshooting mapping and drift.
|
|
||||||
ProviderRawDescription string `json:"providerRawDescription,omitempty"`
|
|
||||||
|
|
||||||
// Human-facing narrative. Not all providers supply rich text (Open-Meteo often won’t).
|
|
||||||
TextDescription string `json:"textDescription,omitempty"` // short phrase / summary
|
|
||||||
DetailedText string `json:"detailedText,omitempty"` // longer narrative, if available
|
|
||||||
|
|
||||||
// Provider-specific (legacy / transitional)
|
|
||||||
IconURL string `json:"iconUrl,omitempty"`
|
|
||||||
|
|
||||||
// Core predicted measurements (nullable; units align with WeatherObservation)
|
// Core predicted measurements (nullable; units align with WeatherObservation)
|
||||||
TemperatureC *float64 `json:"temperatureC,omitempty"`
|
TemperatureC *float64 `json:"temperatureC,omitempty"`
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ const (
|
|||||||
SchemaRawOpenWeatherCurrentV1 = "raw.openweather.current.v1"
|
SchemaRawOpenWeatherCurrentV1 = "raw.openweather.current.v1"
|
||||||
|
|
||||||
SchemaRawNWSHourlyForecastV1 = "raw.nws.hourly.forecast.v1"
|
SchemaRawNWSHourlyForecastV1 = "raw.nws.hourly.forecast.v1"
|
||||||
|
SchemaRawNWSNarrativeForecastV1 = "raw.nws.narrative.forecast.v1"
|
||||||
SchemaRawOpenMeteoHourlyForecastV1 = "raw.openmeteo.hourly.forecast.v1"
|
SchemaRawOpenMeteoHourlyForecastV1 = "raw.openmeteo.hourly.forecast.v1"
|
||||||
SchemaRawOpenWeatherHourlyForecastV1 = "raw.openweather.hourly.forecast.v1"
|
SchemaRawOpenWeatherHourlyForecastV1 = "raw.openweather.hourly.forecast.v1"
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user