6 Commits

Author SHA1 Message Date
2c1278a70a Moved generic and broadly useful helper functions upstream into feedkit
All checks were successful
ci/woodpecker/push/build-image Pipeline was successful
2026-03-28 11:30:20 -05:00
eb27486466 Moved HTTP polling helpers upstream into feedkit, and updated to feedkit v0.8.0
All checks were successful
ci/woodpecker/push/build-image Pipeline was successful
2026-03-28 10:02:50 -05:00
de5add59fd Updated default config.yml to include a commented postgres sink example with pruning enabled
All checks were successful
ci/woodpecker/push/build-image Pipeline was successful
2026-03-28 08:04:44 -05:00
356c3be648 Feature addition to support narrative forecast updates from the NWS
All checks were successful
ci/woodpecker/push/build-image Pipeline was successful
2026-03-27 16:07:12 -05:00
dbaebbbd7a Updates to the nws forecast source and normalizer to separate code specific to hourly forecasts and prepare for upcoming feature addition of daily and narrative forecasts
All checks were successful
ci/woodpecker/push/build-image Pipeline was successful
2026-03-27 12:58:23 -05:00
88d5727a84 Simplified the forecast schema
All checks were successful
ci/woodpecker/push/build-image Pipeline was successful
2026-03-26 21:35:08 -05:00
37 changed files with 1045 additions and 772 deletions

6
API.md
View File

@@ -143,11 +143,7 @@ A `WeatherForecastPeriod` is valid for `[startTime, endTime)`.
| `name` | string | no | Human label (often empty for hourly) |
| `isDay` | bool | no | Day/night hint |
| `conditionCode` | int | yes | WMO code (`-1` for unknown) |
| `conditionText` | string | no | Canonical short text |
| `providerRawDescription` | string | no | Provider-specific “evidence” text |
| `textDescription` | string | no | Human-facing short phrase |
| `detailedText` | string | no | Longer narrative |
| `iconUrl` | string | no | Legacy/transitional |
| `temperatureC` | number | no | °C |
| `temperatureCMin` | number | no | °C (aggregated products) |
| `temperatureCMax` | number | no | °C (aggregated products) |
@@ -269,7 +265,7 @@ A run may contain zero, one, or many alerts.
"startTime": "2026-01-17T14:00:00Z",
"endTime": "2026-01-17T15:00:00Z",
"conditionCode": 2,
"conditionText": "Partly Cloudy",
"textDescription": "Partly Cloudy",
"temperatureC": 3.5,
"probabilityOfPrecipitationPercent": 10
}

View File

@@ -22,7 +22,7 @@ For the complete wire contract (event envelope + payload schemas, fields, units,
## Upstream providers (current MVP)
- NWS: observations, hourly forecasts, alerts
- NWS: observations, hourly forecasts, narrative forecasts, alerts
- Open-Meteo: observations, hourly forecasts
- OpenWeather: observations

View File

@@ -48,12 +48,21 @@ sources:
- name: NWSHourlyForecastSTL
mode: poll
kinds: ["forecast"]
driver: nws_forecast
driver: nws_forecast_hourly
every: 45m
params:
url: "https://api.weather.gov/gridpoints/LSX/90,74/forecast/hourly"
user_agent: "HomeOps (eric@maximumdirect.net)"
- name: NWSNarrativeForecastSTL
mode: poll
kinds: ["forecast"]
driver: nws_forecast_narrative
every: 45m
params:
url: "https://api.weather.gov/gridpoints/LSX/90,74/forecast?units=us"
user_agent: "HomeOps (eric@maximumdirect.net)"
- name: OpenMeteoHourlyForecastSTL
mode: poll
kinds: ["forecast"]
@@ -83,6 +92,15 @@ sinks:
url: nats://nats:4222
exchange: weatherfeeder
# - name: pg_weatherfeeder
# driver: postgres
# params:
# uri: postgres://weatherdb:5432/weatherdb?sslmode=disable
# username: weatherdb
# password: weatherdb
# prune: 3d
# # Prunes rows older than now-3d on each write transaction.
# - name: logfile
# driver: file
# params:
@@ -95,5 +113,8 @@ routes:
- sink: nats_weatherfeeder
kinds: ["observation", "forecast", "alert"]
# - sink: pg_weatherfeeder
# kinds: ["observation", "forecast", "alert"]
# - sink: logfile
# kinds: ["observation", "alert", "forecast"]

View File

@@ -3,14 +3,10 @@ package main
import (
"context"
"errors"
"fmt"
"log"
"os"
"os/signal"
"sort"
"strings"
"syscall"
"time"
"gitea.maximumdirect.net/ejr/feedkit/config"
fkdispatch "gitea.maximumdirect.net/ejr/feedkit/dispatch"
@@ -41,7 +37,7 @@ func main() {
if err != nil {
log.Fatalf("config load failed: %v", err)
}
if err := wfpgsink.RegisterPostgresSchemas(cfg); err != nil {
if err := fksinks.RegisterPostgresSchemaForConfiguredSinks(cfg, wfpgsink.PostgresSchema()); err != nil {
log.Fatalf("postgres schema registration failed: %v", err)
}
@@ -51,15 +47,7 @@ func main() {
// Compile stdout, Postgres, and NATS sinks for weatherfeeder. The former is useful for debugging and the latter are the main intended outputs.
sinkReg := fksinks.NewRegistry()
sinkReg.Register("stdout", func(cfg config.SinkConfig) (fksinks.Sink, error) {
return fksinks.NewStdoutSink(cfg.Name), nil
})
sinkReg.Register("postgres", func(cfg config.SinkConfig) (fksinks.Sink, error) {
return fksinks.NewPostgresSinkFromConfig(cfg)
})
sinkReg.Register("nats", func(cfg config.SinkConfig) (fksinks.Sink, error) {
return fksinks.NewNATSSinkFromConfig(cfg)
})
fksinks.RegisterBuiltins(sinkReg)
// --- Build sources into scheduler jobs ---
var jobs []fkscheduler.Job
@@ -69,7 +57,7 @@ func main() {
log.Fatalf("build source failed (sources[%d] name=%q driver=%q): %v", i, sc.Name, sc.Driver, err)
}
if err := validateSourceExpectedKinds(sc, in); err != nil {
if err := fksources.ValidateExpectedKinds(sc, in); err != nil {
log.Fatalf("source expected kinds validation failed (sources[%d] name=%q driver=%q): %v", i, sc.Name, sc.Driver, err)
}
@@ -99,7 +87,7 @@ func main() {
}
// --- Compile routes ---
routes, err := compileRoutes(cfg, builtSinks)
routes, err := fkdispatch.CompileRoutes(cfg)
if err != nil {
log.Fatalf("compile routes failed: %v", err)
}
@@ -160,124 +148,6 @@ func main() {
log.Printf("shutdown complete")
}
func compileRoutes(cfg *config.Config, builtSinks map[string]fksinks.Sink) ([]fkdispatch.Route, error) {
if len(cfg.Routes) == 0 {
return defaultRoutes(builtSinks), nil
}
var routes []fkdispatch.Route
for i, r := range cfg.Routes {
if strings.TrimSpace(r.Sink) == "" {
return nil, fmt.Errorf("routes[%d].sink is empty", i)
}
if _, ok := builtSinks[r.Sink]; !ok {
return nil, fmt.Errorf("routes[%d].sink references unknown sink %q", i, r.Sink)
}
kinds := map[fkevent.Kind]bool{}
for j, k := range r.Kinds {
kind, err := fkevent.ParseKind(k)
if err != nil {
return nil, fmt.Errorf("routes[%d].kinds[%d]: %w", i, j, err)
}
kinds[kind] = true
}
routes = append(routes, fkdispatch.Route{
SinkName: r.Sink,
Kinds: kinds,
})
}
return routes, nil
}
func defaultRoutes(builtSinks map[string]fksinks.Sink) []fkdispatch.Route {
// nil Kinds means "match all kinds" by convention
var allKinds map[fkevent.Kind]bool = nil
routes := make([]fkdispatch.Route, 0, len(builtSinks))
for name := range builtSinks {
routes = append(routes, fkdispatch.Route{
SinkName: name,
Kinds: allKinds,
})
}
return routes
}
func isContextShutdown(err error) bool {
return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)
}
func validateSourceExpectedKinds(sc config.SourceConfig, in fksources.Input) error {
expectedKinds, err := parseExpectedKinds(sc.ExpectedKinds())
if err != nil {
return err
}
if len(expectedKinds) == 0 {
return nil
}
advertisedKinds := advertisedSourceKinds(in)
if len(advertisedKinds) == 0 {
return nil
}
for kind := range expectedKinds {
if !advertisedKinds[kind] {
return fmt.Errorf(
"configured expected kind %q not advertised by source (configured=%v advertised=%v)",
kind,
sortedKinds(expectedKinds),
sortedKinds(advertisedKinds),
)
}
}
return nil
}
func parseExpectedKinds(raw []string) (map[fkevent.Kind]bool, error) {
kinds := map[fkevent.Kind]bool{}
for i, k := range raw {
kind, err := fkevent.ParseKind(k)
if err != nil {
return nil, fmt.Errorf("invalid expected kind at index %d (%q): %w", i, k, err)
}
kinds[kind] = true
}
return kinds, nil
}
func advertisedSourceKinds(in fksources.Input) map[fkevent.Kind]bool {
if in == nil {
return nil
}
kinds := map[fkevent.Kind]bool{}
if ks, ok := in.(fksources.KindsSource); ok {
for _, kind := range ks.Kinds() {
kinds[kind] = true
}
return kinds
}
if ks, ok := in.(fksources.KindSource); ok {
kinds[ks.Kind()] = true
return kinds
}
return nil
}
func sortedKinds(kindSet map[fkevent.Kind]bool) []string {
out := make([]string, 0, len(kindSet))
for kind := range kindSet {
out = append(out, string(kind))
}
sort.Strings(out)
return out
}
// keep time imported (mirrors your previous main.go defensive trick)
var _ = time.Second

View File

@@ -13,6 +13,7 @@ import (
fkprocessors "gitea.maximumdirect.net/ejr/feedkit/processors"
fkdedupe "gitea.maximumdirect.net/ejr/feedkit/processors/dedupe"
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
wfnormalizers "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers"
)
@@ -44,8 +45,8 @@ func TestValidateSourceExpectedKindsLegacyKindFallback(t *testing.T) {
kind: fkevent.Kind("observation"),
}
if err := validateSourceExpectedKinds(sc, in); err != nil {
t.Fatalf("validateSourceExpectedKinds() unexpected error: %v", err)
if err := fksources.ValidateExpectedKinds(sc, in); err != nil {
t.Fatalf("ValidateExpectedKinds() unexpected error: %v", err)
}
}
@@ -56,8 +57,8 @@ func TestValidateSourceExpectedKindsSubsetAllowed(t *testing.T) {
kinds: []fkevent.Kind{"observation", "forecast"},
}
if err := validateSourceExpectedKinds(sc, in); err != nil {
t.Fatalf("validateSourceExpectedKinds() unexpected error: %v", err)
if err := fksources.ValidateExpectedKinds(sc, in); err != nil {
t.Fatalf("ValidateExpectedKinds() unexpected error: %v", err)
}
}
@@ -68,12 +69,12 @@ func TestValidateSourceExpectedKindsMismatchFails(t *testing.T) {
kinds: []fkevent.Kind{"observation", "forecast"},
}
err := validateSourceExpectedKinds(sc, in)
err := fksources.ValidateExpectedKinds(sc, in)
if err == nil {
t.Fatalf("validateSourceExpectedKinds() expected mismatch error, got nil")
t.Fatalf("ValidateExpectedKinds() expected mismatch error, got nil")
}
if !strings.Contains(err.Error(), "configured expected kind") {
t.Fatalf("validateSourceExpectedKinds() error %q does not include expected message", err)
t.Fatalf("ValidateExpectedKinds() error %q does not include expected message", err)
}
}
@@ -81,14 +82,8 @@ func TestValidateSourceExpectedKindsNoMetadataSkipsCheck(t *testing.T) {
sc := config.SourceConfig{Kinds: []string{"alert"}}
in := testInput{name: "test"}
if err := validateSourceExpectedKinds(sc, in); err != nil {
t.Fatalf("validateSourceExpectedKinds() unexpected error: %v", err)
}
}
func TestParseExpectedKindsRejectsEmptyValues(t *testing.T) {
if _, err := parseExpectedKinds([]string{""}); err == nil {
t.Fatalf("parseExpectedKinds() expected error for empty kind")
if err := fksources.ValidateExpectedKinds(sc, in); err != nil {
t.Fatalf("ValidateExpectedKinds() unexpected error: %v", err)
}
}

2
go.mod
View File

@@ -2,7 +2,7 @@ module gitea.maximumdirect.net/ejr/weatherfeeder
go 1.25
require gitea.maximumdirect.net/ejr/feedkit v0.7.2
require gitea.maximumdirect.net/ejr/feedkit v0.8.1
require (
github.com/klauspost/compress v1.17.2 // indirect

4
go.sum
View File

@@ -1,5 +1,5 @@
gitea.maximumdirect.net/ejr/feedkit v0.7.2 h1:hTg302SgSi7tw11lNzuc+3g7MvHT6jQQziuo2NoARt8=
gitea.maximumdirect.net/ejr/feedkit v0.7.2/go.mod h1:U6xC9xZLN3cL4yi7YBVyzGoHYRLJXusFCAKlj2kdYYQ=
gitea.maximumdirect.net/ejr/feedkit v0.8.1 h1:gSZ5J/mYHfNZ6dp27TS0V9RQ0JuP5ZAHXhSeCJaBu60=
gitea.maximumdirect.net/ejr/feedkit v0.8.1/go.mod h1:U6xC9xZLN3cL4yi7YBVyzGoHYRLJXusFCAKlj2kdYYQ=
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=

View File

@@ -5,6 +5,7 @@ import (
"time"
"gitea.maximumdirect.net/ejr/feedkit/event"
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
)
// Finalize builds the output event envelope by copying the input and applying the
@@ -16,20 +17,7 @@ import (
// If effectiveAt is zero, any existing in.EffectiveAt is preserved.
// - Payload floats are rounded to a stable wire-friendly precision (see round.go).
func Finalize(in event.Event, outSchema string, outPayload any, effectiveAt time.Time) (*event.Event, error) {
out := in
out.Schema = outSchema
// Enforce stable numeric presentation for sinks: round floats in the canonical payload.
out.Payload = RoundFloats(outPayload, DefaultFloatPrecision)
if !effectiveAt.IsZero() {
t := effectiveAt.UTC()
out.EffectiveAt = &t
}
if err := out.Validate(); err != nil {
return nil, err
}
return &out, nil
// Enforce stable numeric presentation for weather payloads before delegating to feedkit's
// generic envelope finalizer.
return fknormalize.FinalizeEvent(in, outSchema, RoundFloats(outPayload, DefaultFloatPrecision), effectiveAt)
}

View File

@@ -0,0 +1,36 @@
package common
import (
"testing"
"time"
"gitea.maximumdirect.net/ejr/feedkit/event"
)
func TestFinalizeRoundsWeatherPayloadFloats(t *testing.T) {
type payload struct {
Value float64
}
in := event.Event{
ID: "evt-1",
Kind: event.Kind("observation"),
Source: "source-a",
EmittedAt: time.Date(2026, 3, 28, 12, 0, 0, 0, time.UTC),
Schema: "raw.example.v1",
Payload: map[string]any{"old": true},
}
out, err := Finalize(in, "weather.example.v1", payload{Value: 1.234567}, time.Time{})
if err != nil {
t.Fatalf("Finalize() unexpected error: %v", err)
}
got, ok := out.Payload.(payload)
if !ok {
t.Fatalf("Finalize() payload type = %T, want payload", out.Payload)
}
if got.Value != 1.2346 {
t.Fatalf("Finalize() rounded value = %v, want 1.2346", got.Value)
}
}

View File

@@ -2,11 +2,11 @@
package common
import (
"encoding/json"
"fmt"
"time"
"gitea.maximumdirect.net/ejr/feedkit/event"
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
)
// DecodeJSONPayload extracts the event payload as bytes and unmarshals it into T.
@@ -18,19 +18,7 @@ import (
// Errors include a small amount of stage context ("extract payload", "decode raw payload").
// Callers typically wrap these with a provider/kind label.
func DecodeJSONPayload[T any](in event.Event) (T, error) {
var zero T
b, err := PayloadBytes(in)
if err != nil {
return zero, fmt.Errorf("extract payload: %w", err)
}
var parsed T
if err := json.Unmarshal(b, &parsed); err != nil {
return zero, fmt.Errorf("decode raw payload: %w", err)
}
return parsed, nil
return fknormalize.DecodeJSONPayload[T](in)
}
// NormalizeJSON is a convenience wrapper for the common JSON-normalizer pattern:

View File

@@ -1,53 +0,0 @@
package common
import (
"encoding/json"
"fmt"
"gitea.maximumdirect.net/ejr/feedkit/event"
)
// PayloadBytes extracts a JSON payload into bytes suitable for json.Unmarshal.
//
// Supported payload shapes (weatherfeeder convention):
// - json.RawMessage (recommended for raw events)
// - []byte
// - string (assumed to contain JSON)
// - map[string]any (re-marshaled to JSON)
//
// If you add other raw representations later, extend this function.
func PayloadBytes(e event.Event) ([]byte, error) {
if e.Payload == nil {
return nil, fmt.Errorf("payload is nil")
}
switch v := e.Payload.(type) {
case json.RawMessage:
if len(v) == 0 {
return nil, fmt.Errorf("payload is empty json.RawMessage")
}
return []byte(v), nil
case []byte:
if len(v) == 0 {
return nil, fmt.Errorf("payload is empty []byte")
}
return v, nil
case string:
if v == "" {
return nil, fmt.Errorf("payload is empty string")
}
return []byte(v), nil
case map[string]any:
b, err := json.Marshal(v)
if err != nil {
return nil, fmt.Errorf("marshal map payload: %w", err)
}
return b, nil
default:
return nil, fmt.Errorf("unsupported payload type %T", e.Payload)
}
}

View File

@@ -17,9 +17,10 @@ import (
// ForecastNormalizer converts:
//
// standards.SchemaRawNWSHourlyForecastV1 -> standards.SchemaWeatherForecastV1
// standards.SchemaRawNWSNarrativeForecastV1 -> standards.SchemaWeatherForecastV1
//
// It interprets NWS GeoJSON gridpoint *hourly* forecast responses and maps them into
// the canonical model.WeatherForecastRun representation.
// It keeps one NWS forecast normalization entrypoint and dispatches to product-specific
// builders by raw schema.
//
// Caveats / policy:
// 1. NWS forecast periods do not include METAR presentWeather phenomena, so ConditionCode
@@ -29,81 +30,186 @@ import (
type ForecastNormalizer struct{}
func (ForecastNormalizer) Match(e event.Event) bool {
s := strings.TrimSpace(e.Schema)
return s == standards.SchemaRawNWSHourlyForecastV1
switch strings.TrimSpace(e.Schema) {
case standards.SchemaRawNWSHourlyForecastV1:
return true
case standards.SchemaRawNWSNarrativeForecastV1:
return true
default:
return false
}
}
func (ForecastNormalizer) Normalize(ctx context.Context, in event.Event) (*event.Event, error) {
_ = ctx // normalization is pure/CPU; keep ctx for future expensive steps
return normalizeForecastEventBySchema(in)
}
func normalizeForecastEventBySchema(in event.Event) (*event.Event, error) {
switch strings.TrimSpace(in.Schema) {
case standards.SchemaRawNWSHourlyForecastV1:
return normalizeHourlyForecastEvent(in)
case standards.SchemaRawNWSNarrativeForecastV1:
return normalizeNarrativeForecastEvent(in)
default:
return nil, fmt.Errorf("unsupported nws forecast schema %q", strings.TrimSpace(in.Schema))
}
}
func normalizeHourlyForecastEvent(in event.Event) (*event.Event, error) {
return normcommon.NormalizeJSON(
in,
"nws hourly forecast",
standards.SchemaWeatherForecastV1,
buildForecast,
buildHourlyForecast,
)
}
// buildForecast contains the domain mapping logic (provider -> canonical model).
func buildForecast(parsed nwsForecastResponse) (model.WeatherForecastRun, time.Time, error) {
// IssuedAt is required by the canonical model.
issuedStr := strings.TrimSpace(parsed.Properties.GeneratedAt)
func normalizeNarrativeForecastEvent(in event.Event) (*event.Event, error) {
return normcommon.NormalizeJSON(
in,
"nws narrative forecast",
standards.SchemaWeatherForecastV1,
buildNarrativeForecast,
)
}
// buildHourlyForecast contains hourly forecast mapping logic (provider -> canonical model).
func buildHourlyForecast(parsed nwsHourlyForecastResponse) (model.WeatherForecastRun, time.Time, error) {
issuedAt, updatedAt, err := parseForecastRunTimes(parsed.Properties.GeneratedAt, parsed.Properties.UpdateTime)
if err != nil {
return model.WeatherForecastRun{}, time.Time{}, err
}
// Best-effort location centroid from the GeoJSON polygon (optional).
lat, lon := centroidLatLon(parsed.Geometry.Coordinates)
run := newForecastRunBase(
issuedAt,
updatedAt,
model.ForecastProductHourly,
lat,
lon,
parsed.Properties.Elevation.Value,
)
periods := make([]model.WeatherForecastPeriod, 0, len(parsed.Properties.Periods))
for i, p := range parsed.Properties.Periods {
period, err := mapHourlyForecastPeriod(i, p)
if err != nil {
return model.WeatherForecastRun{}, time.Time{}, err
}
periods = append(periods, period)
}
run.Periods = periods
// EffectiveAt policy for forecasts: treat IssuedAt as the effective time (dedupe-friendly).
return run, issuedAt, nil
}
// buildNarrativeForecast contains narrative forecast mapping logic (provider -> canonical model).
func buildNarrativeForecast(parsed nwsNarrativeForecastResponse) (model.WeatherForecastRun, time.Time, error) {
issuedAt, updatedAt, err := parseForecastRunTimes(parsed.Properties.GeneratedAt, parsed.Properties.UpdateTime)
if err != nil {
return model.WeatherForecastRun{}, time.Time{}, err
}
// Best-effort location centroid from the GeoJSON polygon (optional).
lat, lon := centroidLatLon(parsed.Geometry.Coordinates)
run := newForecastRunBase(
issuedAt,
updatedAt,
model.ForecastProductNarrative,
lat,
lon,
parsed.Properties.Elevation.Value,
)
periods := make([]model.WeatherForecastPeriod, 0, len(parsed.Properties.Periods))
for i, p := range parsed.Properties.Periods {
period, err := mapNarrativeForecastPeriod(i, p)
if err != nil {
return model.WeatherForecastRun{}, time.Time{}, err
}
periods = append(periods, period)
}
run.Periods = periods
// EffectiveAt policy for forecasts: treat IssuedAt as the effective time (dedupe-friendly).
return run, issuedAt, nil
}
func parseForecastRunTimes(generatedAt, updateTime string) (time.Time, *time.Time, error) {
issuedStr := strings.TrimSpace(generatedAt)
if issuedStr == "" {
return model.WeatherForecastRun{}, time.Time{}, fmt.Errorf("missing properties.generatedAt")
return time.Time{}, nil, fmt.Errorf("missing properties.generatedAt")
}
issuedAt, err := nwscommon.ParseTime(issuedStr)
if err != nil {
return model.WeatherForecastRun{}, time.Time{}, fmt.Errorf("invalid properties.generatedAt %q: %w", issuedStr, err)
return time.Time{}, nil, fmt.Errorf("invalid properties.generatedAt %q: %w", issuedStr, err)
}
issuedAt = issuedAt.UTC()
// UpdatedAt is optional.
var updatedAt *time.Time
if s := strings.TrimSpace(parsed.Properties.UpdateTime); s != "" {
if s := strings.TrimSpace(updateTime); s != "" {
if t, err := nwscommon.ParseTime(s); err == nil {
tt := t.UTC()
updatedAt = &tt
}
}
// Best-effort location centroid from the GeoJSON polygon (optional).
lat, lon := centroidLatLon(parsed.Geometry.Coordinates)
// Schema is explicitly hourly, so product is not a heuristic.
run := model.WeatherForecastRun{
LocationID: "",
LocationName: "",
IssuedAt: issuedAt,
UpdatedAt: updatedAt,
Product: model.ForecastProductHourly,
Latitude: lat,
Longitude: lon,
ElevationMeters: parsed.Properties.Elevation.Value,
Periods: nil,
return issuedAt, updatedAt, nil
}
periods := make([]model.WeatherForecastPeriod, 0, len(parsed.Properties.Periods))
for i, p := range parsed.Properties.Periods {
startStr := strings.TrimSpace(p.StartTime)
endStr := strings.TrimSpace(p.EndTime)
func newForecastRunBase(
issuedAt time.Time,
updatedAt *time.Time,
product model.ForecastProduct,
lat, lon, elevation *float64,
) model.WeatherForecastRun {
return model.WeatherForecastRun{
LocationID: "",
LocationName: "",
IssuedAt: issuedAt,
UpdatedAt: updatedAt,
Product: product,
Latitude: lat,
Longitude: lon,
ElevationMeters: elevation,
Periods: nil,
}
}
func parseForecastPeriodWindow(startStr, endStr string, idx int) (time.Time, time.Time, error) {
startStr = strings.TrimSpace(startStr)
endStr = strings.TrimSpace(endStr)
if startStr == "" || endStr == "" {
return model.WeatherForecastRun{}, time.Time{}, fmt.Errorf("periods[%d]: missing startTime/endTime", i)
return time.Time{}, time.Time{}, fmt.Errorf("periods[%d]: missing startTime/endTime", idx)
}
start, err := nwscommon.ParseTime(startStr)
if err != nil {
return model.WeatherForecastRun{}, time.Time{}, fmt.Errorf("periods[%d].startTime invalid %q: %w", i, startStr, err)
return time.Time{}, time.Time{}, fmt.Errorf("periods[%d].startTime invalid %q: %w", idx, startStr, err)
}
end, err := nwscommon.ParseTime(endStr)
if err != nil {
return model.WeatherForecastRun{}, time.Time{}, fmt.Errorf("periods[%d].endTime invalid %q: %w", i, endStr, err)
return time.Time{}, time.Time{}, fmt.Errorf("periods[%d].endTime invalid %q: %w", idx, endStr, err)
}
return start.UTC(), end.UTC(), nil
}
func mapHourlyForecastPeriod(idx int, p nwsHourlyForecastPeriod) (model.WeatherForecastPeriod, error) {
start, end, err := parseForecastPeriodWindow(p.StartTime, p.EndTime, idx)
if err != nil {
return model.WeatherForecastPeriod{}, err
}
start = start.UTC()
end = end.UTC()
// NWS hourly supplies isDaytime; make it a pointer to match the canonical model.
var isDay *bool
@@ -118,9 +224,7 @@ func buildForecast(parsed nwsForecastResponse) (model.WeatherForecastRun, time.T
providerDesc := strings.TrimSpace(p.ShortForecast)
wmo := wmoFromNWSForecast(providerDesc, p.Icon, tempC)
canonicalText := standards.WMOText(wmo, isDay)
period := model.WeatherForecastPeriod{
return model.WeatherForecastPeriod{
StartTime: start,
EndTime: end,
@@ -128,15 +232,9 @@ func buildForecast(parsed nwsForecastResponse) (model.WeatherForecastRun, time.T
IsDay: isDay,
ConditionCode: wmo,
ConditionText: canonicalText,
ProviderRawDescription: providerDesc,
// For forecasts, keep provider text as the human-facing description.
TextDescription: strings.TrimSpace(p.ShortForecast),
DetailedText: strings.TrimSpace(p.DetailedForecast),
IconURL: strings.TrimSpace(p.Icon),
// For forecasts, keep provider short forecast text as the human-facing description.
TextDescription: providerDesc,
TemperatureC: tempC,
@@ -147,13 +245,49 @@ func buildForecast(parsed nwsForecastResponse) (model.WeatherForecastRun, time.T
WindSpeedKmh: parseNWSWindSpeedKmh(p.WindSpeed),
ProbabilityOfPrecipitationPercent: p.ProbabilityOfPrecipitation.Value,
}, nil
}
periods = append(periods, period)
func mapNarrativeForecastPeriod(idx int, p nwsNarrativeForecastPeriod) (model.WeatherForecastPeriod, error) {
start, end, err := parseForecastPeriodWindow(p.StartTime, p.EndTime, idx)
if err != nil {
return model.WeatherForecastPeriod{}, err
}
run.Periods = periods
// EffectiveAt policy for forecasts: treat IssuedAt as the effective time (dedupe-friendly).
return run, issuedAt, nil
// NWS narrative supplies isDaytime; make it a pointer to match the canonical model.
var isDay *bool
if p.IsDaytime != nil {
b := *p.IsDaytime
isDay = &b
}
tempC := tempCFromNWS(p.Temperature, p.TemperatureUnit)
// Infer WMO from shortForecast (and fall back to icon token).
shortForecast := strings.TrimSpace(p.ShortForecast)
wmo := wmoFromNWSForecast(shortForecast, p.Icon, tempC)
textDescription := strings.TrimSpace(p.DetailedForecast)
if textDescription == "" {
textDescription = shortForecast
}
return model.WeatherForecastPeriod{
StartTime: start,
EndTime: end,
Name: strings.TrimSpace(p.Name),
IsDay: isDay,
ConditionCode: wmo,
TextDescription: textDescription,
TemperatureC: tempC,
WindDirectionDegrees: parseNWSWindDirectionDegrees(p.WindDirection),
WindSpeedKmh: parseNWSWindSpeedKmh(p.WindSpeed),
ProbabilityOfPrecipitationPercent: p.ProbabilityOfPrecipitation.Value,
}, nil
}

View File

@@ -0,0 +1,211 @@
package nws
import (
"encoding/json"
"math"
"strings"
"testing"
"time"
"gitea.maximumdirect.net/ejr/feedkit/event"
"gitea.maximumdirect.net/ejr/weatherfeeder/model"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
)
func TestBuildHourlyForecastUsesShortForecastAsTextDescription(t *testing.T) {
parsed := nwsHourlyForecastResponse{}
parsed.Properties.GeneratedAt = "2026-03-16T18:00:00Z"
parsed.Properties.Periods = []nwsHourlyForecastPeriod{
{
StartTime: "2026-03-16T19:00:00Z",
EndTime: "2026-03-16T20:00:00Z",
ShortForecast: " Mostly Cloudy ",
DetailedForecast: "Clouds increasing overnight.",
Icon: "https://example.invalid/icon",
},
}
run, effectiveAt, err := buildHourlyForecast(parsed)
if err != nil {
t.Fatalf("buildHourlyForecast() error = %v", err)
}
if len(run.Periods) != 1 {
t.Fatalf("periods len = %d, want 1", len(run.Periods))
}
if got, want := run.Periods[0].TextDescription, "Mostly Cloudy"; got != want {
t.Fatalf("TextDescription = %q, want %q", got, want)
}
wantIssued := time.Date(2026, 3, 16, 18, 0, 0, 0, time.UTC)
if !run.IssuedAt.Equal(wantIssued) {
t.Fatalf("IssuedAt = %s, want %s", run.IssuedAt.Format(time.RFC3339), wantIssued.Format(time.RFC3339))
}
if !effectiveAt.Equal(wantIssued) {
t.Fatalf("effectiveAt = %s, want %s", effectiveAt.Format(time.RFC3339), wantIssued.Format(time.RFC3339))
}
assertNoLegacyForecastDescriptionKeys(t, run.Periods[0])
}
func TestNormalizeForecastEventBySchemaRejectsUnsupportedSchema(t *testing.T) {
_, err := normalizeForecastEventBySchema(event.Event{
Schema: "raw.nws.daily.forecast.v1",
})
if err == nil {
t.Fatalf("normalizeForecastEventBySchema() expected unsupported schema error")
}
if !strings.Contains(err.Error(), "unsupported nws forecast schema") {
t.Fatalf("error = %q, want unsupported schema context", err)
}
}
func TestNormalizeForecastEventBySchemaRoutesHourly(t *testing.T) {
_, err := normalizeForecastEventBySchema(event.Event{
Schema: standards.SchemaRawNWSHourlyForecastV1,
Payload: map[string]any{"properties": map[string]any{}},
})
if err == nil {
t.Fatalf("normalizeForecastEventBySchema() expected build error for missing generatedAt")
}
if !strings.Contains(err.Error(), "missing properties.generatedAt") {
t.Fatalf("error = %q, want missing properties.generatedAt", err)
}
}
func TestNormalizeForecastEventBySchemaRoutesNarrative(t *testing.T) {
_, err := normalizeForecastEventBySchema(event.Event{
Schema: standards.SchemaRawNWSNarrativeForecastV1,
Payload: map[string]any{"properties": map[string]any{}},
})
if err == nil {
t.Fatalf("normalizeForecastEventBySchema() expected build error for missing generatedAt")
}
if !strings.Contains(err.Error(), "missing properties.generatedAt") {
t.Fatalf("error = %q, want missing properties.generatedAt", err)
}
}
func TestBuildNarrativeForecastMapsExpectedFields(t *testing.T) {
parsed := nwsNarrativeForecastResponse{}
parsed.Properties.GeneratedAt = "2026-03-27T15:17:01Z"
isDay := true
tempF := 53.0
pop := 20.0
parsed.Properties.Periods = []nwsNarrativeForecastPeriod{
{
Name: "Today",
StartTime: "2026-03-27T10:00:00-05:00",
EndTime: "2026-03-27T18:00:00-05:00",
IsDaytime: &isDay,
Temperature: &tempF,
TemperatureUnit: "F",
WindSpeed: "10 to 14 mph",
WindDirection: "SW",
ShortForecast: "Partly Sunny",
DetailedForecast: " Partly sunny, with a high near 53. ",
ProbabilityOfPrecipitation: struct {
UnitCode string `json:"unitCode"`
Value *float64 `json:"value"`
}{
UnitCode: "wmoUnit:percent",
Value: &pop,
},
Icon: "https://api.weather.gov/icons/land/day/bkn?size=medium",
},
}
run, effectiveAt, err := buildNarrativeForecast(parsed)
if err != nil {
t.Fatalf("buildNarrativeForecast() error = %v", err)
}
if got, want := run.Product, model.ForecastProductNarrative; got != want {
t.Fatalf("Product = %q, want %q", got, want)
}
if len(run.Periods) != 1 {
t.Fatalf("periods len = %d, want 1", len(run.Periods))
}
p := run.Periods[0]
if got, want := p.TextDescription, "Partly sunny, with a high near 53."; got != want {
t.Fatalf("TextDescription = %q, want %q", got, want)
}
if p.TemperatureC == nil {
t.Fatalf("TemperatureC is nil, want converted value")
}
if math.Abs(*p.TemperatureC-11.6666666667) > 0.0001 {
t.Fatalf("TemperatureC = %.6f, want ~11.6667", *p.TemperatureC)
}
if p.IsDay == nil || !*p.IsDay {
t.Fatalf("IsDay = %v, want true", p.IsDay)
}
if p.WindDirectionDegrees == nil || *p.WindDirectionDegrees != 225 {
t.Fatalf("WindDirectionDegrees = %v, want 225", p.WindDirectionDegrees)
}
if p.WindSpeedKmh == nil || math.Abs(*p.WindSpeedKmh-19.3128) > 0.001 {
t.Fatalf("WindSpeedKmh = %.6f, want ~19.3128", derefOrZero(p.WindSpeedKmh))
}
if p.ProbabilityOfPrecipitationPercent == nil || *p.ProbabilityOfPrecipitationPercent != 20 {
t.Fatalf("ProbabilityOfPrecipitationPercent = %v, want 20", p.ProbabilityOfPrecipitationPercent)
}
wantIssued := time.Date(2026, 3, 27, 15, 17, 1, 0, time.UTC)
if !run.IssuedAt.Equal(wantIssued) {
t.Fatalf("IssuedAt = %s, want %s", run.IssuedAt.Format(time.RFC3339), wantIssued.Format(time.RFC3339))
}
if !effectiveAt.Equal(wantIssued) {
t.Fatalf("effectiveAt = %s, want %s", effectiveAt.Format(time.RFC3339), wantIssued.Format(time.RFC3339))
}
assertNoLegacyForecastDescriptionKeys(t, p)
}
func TestBuildNarrativeForecastFallsBackToShortForecastDescription(t *testing.T) {
parsed := nwsNarrativeForecastResponse{}
parsed.Properties.GeneratedAt = "2026-03-27T15:17:01Z"
parsed.Properties.Periods = []nwsNarrativeForecastPeriod{
{
StartTime: "2026-03-27T18:00:00-05:00",
EndTime: "2026-03-28T06:00:00-05:00",
ShortForecast: " Mostly Clear ",
DetailedForecast: " ",
},
}
run, _, err := buildNarrativeForecast(parsed)
if err != nil {
t.Fatalf("buildNarrativeForecast() error = %v", err)
}
if len(run.Periods) != 1 {
t.Fatalf("periods len = %d, want 1", len(run.Periods))
}
if got, want := run.Periods[0].TextDescription, "Mostly Clear"; got != want {
t.Fatalf("TextDescription = %q, want %q", got, want)
}
}
func assertNoLegacyForecastDescriptionKeys(t *testing.T, period any) {
t.Helper()
b, err := json.Marshal(period)
if err != nil {
t.Fatalf("json.Marshal(period) error = %v", err)
}
var got map[string]any
if err := json.Unmarshal(b, &got); err != nil {
t.Fatalf("json.Unmarshal(period) error = %v", err)
}
for _, key := range []string{"conditionText", "providerRawDescription", "detailedText", "iconUrl"} {
if _, ok := got[key]; ok {
t.Fatalf("unexpected legacy key %q in marshaled period: %#v", key, got)
}
}
}
func derefOrZero(v *float64) float64 {
if v == nil {
return 0
}
return *v
}

View File

@@ -98,12 +98,9 @@ type nwsCloudLayer struct {
Amount string `json:"amount"`
}
// nwsForecastResponse is a minimal-but-sufficient representation of the NWS
// gridpoint forecast GeoJSON payload needed for mapping into model.WeatherForecastRun.
//
// This is currently designed to support the hourly forecast endpoint; revisions may be needed
// to accommodate other forecast endpoints in the future.
type nwsForecastResponse struct {
// nwsHourlyForecastResponse is a minimal-but-sufficient representation of the NWS
// gridpoint hourly forecast GeoJSON payload needed for mapping into model.WeatherForecastRun.
type nwsHourlyForecastResponse struct {
Geometry struct {
Type string `json:"type"`
Coordinates [][][]float64 `json:"coordinates"` // GeoJSON polygon: [ring][point][lon,lat]
@@ -122,11 +119,11 @@ type nwsForecastResponse struct {
Value *float64 `json:"value"`
} `json:"elevation"`
Periods []nwsForecastPeriod `json:"periods"`
Periods []nwsHourlyForecastPeriod `json:"periods"`
} `json:"properties"`
}
type nwsForecastPeriod struct {
type nwsHourlyForecastPeriod struct {
Number int `json:"number"`
Name string `json:"name"`
StartTime string `json:"startTime"`
@@ -161,6 +158,56 @@ type nwsForecastPeriod struct {
DetailedForecast string `json:"detailedForecast"`
}
// nwsNarrativeForecastResponse is a minimal-but-sufficient representation of the NWS
// gridpoint narrative forecast GeoJSON payload needed for mapping into model.WeatherForecastRun.
type nwsNarrativeForecastResponse struct {
Geometry struct {
Type string `json:"type"`
Coordinates [][][]float64 `json:"coordinates"` // GeoJSON polygon: [ring][point][lon,lat]
} `json:"geometry"`
Properties struct {
Units string `json:"units"` // "us" or "si" (often "us" for narrative)
ForecastGenerator string `json:"forecastGenerator"` // e.g. "BaselineForecastGenerator"
GeneratedAt string `json:"generatedAt"` // RFC3339-ish
UpdateTime string `json:"updateTime"` // RFC3339-ish
ValidTimes string `json:"validTimes"`
Elevation struct {
UnitCode string `json:"unitCode"`
Value *float64 `json:"value"`
} `json:"elevation"`
Periods []nwsNarrativeForecastPeriod `json:"periods"`
} `json:"properties"`
}
type nwsNarrativeForecastPeriod struct {
Number int `json:"number"`
Name string `json:"name"`
StartTime string `json:"startTime"`
EndTime string `json:"endTime"`
IsDaytime *bool `json:"isDaytime"`
Temperature *float64 `json:"temperature"`
TemperatureUnit string `json:"temperatureUnit"` // "F" or "C"
TemperatureTrend any `json:"temperatureTrend"`
ProbabilityOfPrecipitation struct {
UnitCode string `json:"unitCode"`
Value *float64 `json:"value"`
} `json:"probabilityOfPrecipitation"`
WindSpeed string `json:"windSpeed"` // e.g. "9 mph", "10 to 15 mph"
WindDirection string `json:"windDirection"` // e.g. "W", "NW"
Icon string `json:"icon"`
ShortForecast string `json:"shortForecast"`
DetailedForecast string `json:"detailedForecast"`
}
// nwsAlertsResponse is a minimal-but-sufficient representation of the NWS /alerts
// FeatureCollection payload needed for mapping into model.WeatherAlertRun.
type nwsAlertsResponse struct {

View File

@@ -108,14 +108,7 @@ func buildForecast(parsed omForecastResponse, fallbackIssued time.Time) (model.W
IsDay: isDay,
ConditionCode: wmo,
ConditionText: canonicalText,
ProviderRawDescription: "",
TextDescription: canonicalText,
DetailedText: "",
IconURL: "",
}
if v := floatAt(parsed.Hourly.Temperature2m, i); v != nil {

View File

@@ -0,0 +1,71 @@
package openmeteo
import (
"encoding/json"
"testing"
"time"
"gitea.maximumdirect.net/ejr/weatherfeeder/model"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
)
func TestBuildForecastUsesCanonicalTextDescription(t *testing.T) {
weatherCode := 2
isDay := 1
parsed := omForecastResponse{
Timezone: "UTC",
UTCOffsetSeconds: 0,
Hourly: omForecastHourly{
Time: []string{"2026-03-16T19:00"},
WeatherCode: []*int{&weatherCode},
IsDay: []*int{&isDay},
},
}
run, effectiveAt, err := buildForecast(parsed, time.Time{})
if err != nil {
t.Fatalf("buildForecast() error = %v", err)
}
if len(run.Periods) != 1 {
t.Fatalf("periods len = %d, want 1", len(run.Periods))
}
expectedText := standards.WMOText(model.WMOCode(weatherCode), boolPtr(true))
if got := run.Periods[0].TextDescription; got != expectedText {
t.Fatalf("TextDescription = %q, want %q", got, expectedText)
}
wantIssued := time.Date(2026, 3, 16, 19, 0, 0, 0, time.UTC)
if !run.IssuedAt.Equal(wantIssued) {
t.Fatalf("IssuedAt = %s, want %s", run.IssuedAt.Format(time.RFC3339), wantIssued.Format(time.RFC3339))
}
if !effectiveAt.Equal(wantIssued) {
t.Fatalf("effectiveAt = %s, want %s", effectiveAt.Format(time.RFC3339), wantIssued.Format(time.RFC3339))
}
assertNoLegacyForecastDescriptionKeys(t, run.Periods[0])
}
func boolPtr(v bool) *bool {
return &v
}
func assertNoLegacyForecastDescriptionKeys(t *testing.T, period any) {
t.Helper()
b, err := json.Marshal(period)
if err != nil {
t.Fatalf("json.Marshal(period) error = %v", err)
}
var got map[string]any
if err := json.Unmarshal(b, &got); err != nil {
t.Fatalf("json.Unmarshal(period) error = %v", err)
}
for _, key := range []string{"conditionText", "providerRawDescription", "detailedText", "iconUrl"} {
if _, ok := got[key]; ok {
t.Fatalf("unexpected legacy key %q in marshaled period: %#v", key, got)
}
}
}

View File

@@ -102,11 +102,7 @@
// - name TEXT NULL -> payload.periods[i].name
// - is_day BOOLEAN NULL -> payload.periods[i].isDay
// - condition_code INTEGER -> payload.periods[i].conditionCode
// - condition_text TEXT NULL -> payload.periods[i].conditionText
// - provider_raw_description TEXT NULL -> payload.periods[i].providerRawDescription
// - text_description TEXT NULL -> payload.periods[i].textDescription
// - detailed_text TEXT NULL -> payload.periods[i].detailedText
// - icon_url TEXT NULL -> payload.periods[i].iconUrl
// - temperature_c DOUBLE PRECISION NULL -> payload.periods[i].temperatureC
// - temperature_c_min DOUBLE PRECISION NULL -> payload.periods[i].temperatureCMin
// - temperature_c_max DOUBLE PRECISION NULL -> payload.periods[i].temperatureCMax

View File

@@ -136,11 +136,7 @@ func mapForecastEvent(e fkevent.Event) ([]fksinks.PostgresWrite, error) {
"name": nullableString(p.Name),
"is_day": nullableBool(p.IsDay),
"condition_code": int(p.ConditionCode),
"condition_text": nullableString(p.ConditionText),
"provider_raw_description": nullableString(p.ProviderRawDescription),
"text_description": nullableString(p.TextDescription),
"detailed_text": nullableString(p.DetailedText),
"icon_url": nullableString(p.IconURL),
"temperature_c": nullableFloat64(p.TemperatureC),
"temperature_c_min": nullableFloat64(p.TemperatureCMin),
"temperature_c_max": nullableFloat64(p.TemperatureCMax),

View File

@@ -64,7 +64,6 @@ func TestMapPostgresEventForecastStructPayload(t *testing.T) {
EndTime: time.Date(2026, 3, 16, 20, 0, 0, 0, time.UTC),
IsDay: &isDay,
ConditionCode: model.WMOCode(2),
ConditionText: "Partly Cloudy",
TemperatureC: &temp,
},
{
@@ -239,7 +238,7 @@ func assertAllWritesIncludeAllColumns(t *testing.T, writes []fksinks.PostgresWri
}
func tableColumnCounts() map[string]int {
s := weatherPostgresSchema()
s := PostgresSchema()
m := make(map[string]int, len(s.Tables))
for _, tbl := range s.Tables {
m[tbl.Name] = len(tbl.Columns)

View File

@@ -1,10 +1,6 @@
package postgres
import (
"fmt"
"strings"
"gitea.maximumdirect.net/ejr/feedkit/config"
fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks"
)
@@ -18,31 +14,8 @@ const (
tableAlertReferences = "alert_references"
)
// RegisterPostgresSchemas registers weatherfeeder's Postgres schema for each
// configured sink using driver=postgres.
func RegisterPostgresSchemas(cfg *config.Config) error {
if cfg == nil {
return fmt.Errorf("register postgres schemas: config is nil")
}
schema := weatherPostgresSchema()
for i, sk := range cfg.Sinks {
if !isPostgresDriver(sk.Driver) {
continue
}
if err := fksinks.RegisterPostgresSchema(sk.Name, schema); err != nil {
return fmt.Errorf("register postgres schema for sinks[%d] name=%q: %w", i, sk.Name, err)
}
}
return nil
}
func isPostgresDriver(driver string) bool {
return strings.EqualFold(strings.TrimSpace(driver), "postgres")
}
func weatherPostgresSchema() fksinks.PostgresSchema {
// PostgresSchema returns weatherfeeder's Postgres schema definition.
func PostgresSchema() fksinks.PostgresSchema {
return fksinks.PostgresSchema{
Tables: []fksinks.PostgresTable{
{
@@ -130,11 +103,7 @@ func weatherPostgresSchema() fksinks.PostgresSchema {
{Name: "name", Type: "TEXT", Nullable: true},
{Name: "is_day", Type: "BOOLEAN", Nullable: true},
{Name: "condition_code", Type: "INTEGER", Nullable: false},
{Name: "condition_text", Type: "TEXT", Nullable: true},
{Name: "provider_raw_description", Type: "TEXT", Nullable: true},
{Name: "text_description", Type: "TEXT", Nullable: true},
{Name: "detailed_text", Type: "TEXT", Nullable: true},
{Name: "icon_url", Type: "TEXT", Nullable: true},
{Name: "temperature_c", Type: "DOUBLE PRECISION", Nullable: true},
{Name: "temperature_c_min", Type: "DOUBLE PRECISION", Nullable: true},
{Name: "temperature_c_max", Type: "DOUBLE PRECISION", Nullable: true},

View File

@@ -7,10 +7,11 @@ import (
"time"
"gitea.maximumdirect.net/ejr/feedkit/config"
fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks"
)
func TestRegisterPostgresSchemasNilConfig(t *testing.T) {
err := RegisterPostgresSchemas(nil)
err := fksinks.RegisterPostgresSchemaForConfiguredSinks(nil, PostgresSchema())
if err == nil {
t.Fatalf("RegisterPostgresSchemas(nil) expected error")
}
@@ -27,7 +28,7 @@ func TestRegisterPostgresSchemasNonPostgresNoOp(t *testing.T) {
},
}
if err := RegisterPostgresSchemas(cfg); err != nil {
if err := fksinks.RegisterPostgresSchemaForConfiguredSinks(cfg, PostgresSchema()); err != nil {
t.Fatalf("RegisterPostgresSchemas(non-postgres) error = %v", err)
}
}
@@ -40,11 +41,11 @@ func TestRegisterPostgresSchemasDuplicateRegistrationFails(t *testing.T) {
},
}
if err := RegisterPostgresSchemas(cfg); err != nil {
if err := fksinks.RegisterPostgresSchemaForConfiguredSinks(cfg, PostgresSchema()); err != nil {
t.Fatalf("first RegisterPostgresSchemas() error = %v", err)
}
err := RegisterPostgresSchemas(cfg)
err := fksinks.RegisterPostgresSchemaForConfiguredSinks(cfg, PostgresSchema())
if err == nil {
t.Fatalf("second RegisterPostgresSchemas() expected duplicate error")
}
@@ -54,9 +55,9 @@ func TestRegisterPostgresSchemasDuplicateRegistrationFails(t *testing.T) {
}
func TestWeatherPostgresSchemaShape(t *testing.T) {
s := weatherPostgresSchema()
s := PostgresSchema()
if s.MapEvent == nil {
t.Fatalf("weatherPostgresSchema().MapEvent is nil")
t.Fatalf("PostgresSchema().MapEvent is nil")
}
wantTables := map[string]bool{
@@ -70,7 +71,7 @@ func TestWeatherPostgresSchemaShape(t *testing.T) {
}
if len(s.Tables) != len(wantTables) {
t.Fatalf("weatherPostgresSchema().Tables len = %d, want %d", len(s.Tables), len(wantTables))
t.Fatalf("PostgresSchema().Tables len = %d, want %d", len(s.Tables), len(wantTables))
}
seenIndexes := map[string]bool{}

View File

@@ -19,8 +19,11 @@ func RegisterBuiltins(r *fksource.Registry) {
r.RegisterPoll("nws_alerts", func(cfg config.SourceConfig) (fksource.PollSource, error) {
return nws.NewAlertsSource(cfg)
})
r.RegisterPoll("nws_forecast", func(cfg config.SourceConfig) (fksource.PollSource, error) {
return nws.NewForecastSource(cfg)
r.RegisterPoll("nws_forecast_hourly", func(cfg config.SourceConfig) (fksource.PollSource, error) {
return nws.NewHourlyForecastSource(cfg)
})
r.RegisterPoll("nws_forecast_narrative", func(cfg config.SourceConfig) (fksource.PollSource, error) {
return nws.NewNarrativeForecastSource(cfg)
})
// Open-Meteo drivers

View File

@@ -0,0 +1,60 @@
package sources
import (
"strings"
"testing"
"gitea.maximumdirect.net/ejr/feedkit/config"
fksource "gitea.maximumdirect.net/ejr/feedkit/sources"
)
func TestRegisterBuiltinsRegistersNWSHourlyForecastDriver(t *testing.T) {
reg := fksource.NewRegistry()
RegisterBuiltins(reg)
in, err := reg.BuildInput(sourceConfigForDriver("nws_forecast_hourly"))
if err != nil {
t.Fatalf("BuildInput(nws_forecast_hourly) error = %v", err)
}
if _, ok := in.(fksource.PollSource); !ok {
t.Fatalf("BuildInput(nws_forecast_hourly) type = %T, want PollSource", in)
}
}
func TestRegisterBuiltinsRegistersNWSNarrativeForecastDriver(t *testing.T) {
reg := fksource.NewRegistry()
RegisterBuiltins(reg)
in, err := reg.BuildInput(sourceConfigForDriver("nws_forecast_narrative"))
if err != nil {
t.Fatalf("BuildInput(nws_forecast_narrative) error = %v", err)
}
if _, ok := in.(fksource.PollSource); !ok {
t.Fatalf("BuildInput(nws_forecast_narrative) type = %T, want PollSource", in)
}
}
func TestRegisterBuiltinsDoesNotRegisterLegacyNWSForecastDriver(t *testing.T) {
reg := fksource.NewRegistry()
RegisterBuiltins(reg)
_, err := reg.BuildInput(sourceConfigForDriver("nws_forecast"))
if err == nil {
t.Fatalf("BuildInput(nws_forecast) expected unknown driver error")
}
if !strings.Contains(err.Error(), `unknown source driver: "nws_forecast"`) {
t.Fatalf("error = %q, want unknown source driver for nws_forecast", err)
}
}
func sourceConfigForDriver(driver string) config.SourceConfig {
return config.SourceConfig{
Name: "test-source",
Driver: driver,
Mode: config.SourceModePoll,
Params: map[string]any{
"url": "https://example.invalid",
"user_agent": "test-agent",
},
}
}

View File

@@ -1,104 +0,0 @@
// FILE: ./internal/sources/common/config.go
package common
import (
"fmt"
"strings"
"gitea.maximumdirect.net/ejr/feedkit/config"
)
// This file centralizes small, boring config-validation patterns shared across
// weatherfeeder source drivers.
//
// Goal: keep driver constructors (New*Source) easy to read and consistent, while
// keeping driver-specific options in cfg.Params (feedkit remains domain-agnostic).
// HTTPSourceConfig is the standard "HTTP-polling source" config shape used across drivers.
type HTTPSourceConfig struct {
Name string
URL string
UserAgent string
}
// RequireHTTPSourceConfig enforces weatherfeeder's standard HTTP source config:
//
// - cfg.Name must be present
// - cfg.Params must be present
// - params.url must be present (accepts "url" or "URL")
// - params.user_agent must be present (accepts "user_agent" or "userAgent")
//
// We intentionally require a User-Agent for *all* sources, even when upstreams
// do not strictly require one. This keeps config uniform across providers.
func RequireHTTPSourceConfig(driver string, cfg config.SourceConfig) (HTTPSourceConfig, error) {
if strings.TrimSpace(cfg.Name) == "" {
return HTTPSourceConfig{}, fmt.Errorf("%s: name is required", driver)
}
if cfg.Params == nil {
return HTTPSourceConfig{}, fmt.Errorf("%s %q: params are required (need params.url and params.user_agent)", driver, cfg.Name)
}
url, ok := cfg.ParamString("url", "URL")
if !ok {
return HTTPSourceConfig{}, fmt.Errorf("%s %q: params.url is required", driver, cfg.Name)
}
ua, ok := cfg.ParamString("user_agent", "userAgent")
if !ok {
return HTTPSourceConfig{}, fmt.Errorf("%s %q: params.user_agent is required", driver, cfg.Name)
}
return HTTPSourceConfig{
Name: cfg.Name,
URL: url,
UserAgent: ua,
}, nil
}
// --- The helpers below remain useful for future drivers; they are not required
// --- by the observation sources after adopting RequireHTTPSourceConfig.
// RequireName ensures cfg.Name is present and non-whitespace.
func RequireName(driver string, cfg config.SourceConfig) error {
if strings.TrimSpace(cfg.Name) == "" {
return fmt.Errorf("%s: name is required", driver)
}
return nil
}
// RequireParams ensures cfg.Params is non-nil. The "want" string should be a short
// description of required keys, e.g. "need params.url and params.user_agent".
func RequireParams(driver string, cfg config.SourceConfig, want string) error {
if cfg.Params == nil {
return fmt.Errorf("%s %q: params are required (%s)", driver, cfg.Name, want)
}
return nil
}
// RequireURL returns the configured URL for a source.
// Canonical key is "url"; we also accept "URL" as a convenience.
func RequireURL(driver string, cfg config.SourceConfig) (string, error) {
if cfg.Params == nil {
return "", fmt.Errorf("%s %q: params are required (need params.url)", driver, cfg.Name)
}
u, ok := cfg.ParamString("url", "URL")
if !ok {
return "", fmt.Errorf("%s %q: params.url is required", driver, cfg.Name)
}
return u, nil
}
// RequireUserAgent returns the configured User-Agent for a source.
// Canonical key is "user_agent"; we also accept "userAgent" as a convenience.
func RequireUserAgent(driver string, cfg config.SourceConfig) (string, error) {
if cfg.Params == nil {
return "", fmt.Errorf("%s %q: params are required (need params.user_agent)", driver, cfg.Name)
}
ua, ok := cfg.ParamString("user_agent", "userAgent")
if !ok {
return "", fmt.Errorf("%s %q: params.user_agent is required", driver, cfg.Name)
}
return ua, nil
}

View File

@@ -1,54 +0,0 @@
// FILE: ./internal/sources/common/event.go
package common
import (
"time"
"gitea.maximumdirect.net/ejr/feedkit/event"
)
// SingleRawEvent constructs, validates, and returns a slice containing exactly one event.
//
// This removes repetitive "event envelope ceremony" from individual sources.
// Sources remain responsible for:
// - fetching bytes (raw payload)
// - choosing Schema (raw schema identifier)
// - computing Event.ID and (optional) EffectiveAt
//
// emittedAt is explicit so callers can compute IDs using the same timestamp (or
// so tests can provide a stable value).
func SingleRawEvent(
kind event.Kind,
sourceName string,
schema string,
id string,
emittedAt time.Time,
effectiveAt *time.Time,
payload any,
) ([]event.Event, error) {
if emittedAt.IsZero() {
emittedAt = time.Now().UTC()
} else {
emittedAt = emittedAt.UTC()
}
e := event.Event{
ID: id,
Kind: kind,
Source: sourceName,
EmittedAt: emittedAt,
EffectiveAt: effectiveAt,
// RAW schema (normalizer matches on this).
Schema: schema,
// Raw payload (usually json.RawMessage). Normalizer will decode and map to canonical model.
Payload: payload,
}
if err := e.Validate(); err != nil {
return nil, err
}
return []event.Event{e}, nil
}

View File

@@ -1,76 +0,0 @@
// FILE: ./internal/sources/common/http_source.go
package common
import (
"context"
"encoding/json"
"fmt"
"net/http"
"gitea.maximumdirect.net/ejr/feedkit/config"
"gitea.maximumdirect.net/ejr/feedkit/transport"
)
// HTTPSource is a tiny, reusable "HTTP polling spine" for weatherfeeder sources.
//
// It centralizes the boring parts:
// - standard config shape (url + user_agent) via RequireHTTPSourceConfig
// - a default http.Client with timeout
// - FetchBody / headers / max-body safety limit
// - consistent error wrapping (driver + source name)
//
// Individual drivers remain responsible for:
// - decoding minimal metadata (for Event.ID / EffectiveAt)
// - constructing the event envelope (kind/schema/payload)
type HTTPSource struct {
Driver string
Name string
URL string
UserAgent string
Accept string
Client *http.Client
}
// NewHTTPSource builds an HTTPSource using weatherfeeder's standard HTTP source
// config (params.url + params.user_agent) and a default HTTP client.
func NewHTTPSource(driver string, cfg config.SourceConfig, accept string) (*HTTPSource, error) {
c, err := RequireHTTPSourceConfig(driver, cfg)
if err != nil {
return nil, err
}
return &HTTPSource{
Driver: driver,
Name: c.Name,
URL: c.URL,
UserAgent: c.UserAgent,
Accept: accept,
Client: transport.NewHTTPClient(transport.DefaultHTTPTimeout),
}, nil
}
// FetchBytes fetches the URL and returns the raw response body bytes.
func (s *HTTPSource) FetchBytes(ctx context.Context) ([]byte, error) {
client := s.Client
if client == nil {
// Defensive: allow tests or callers to nil out Client; keep behavior sane.
client = transport.NewHTTPClient(transport.DefaultHTTPTimeout)
}
b, err := transport.FetchBody(ctx, client, s.URL, s.UserAgent, s.Accept)
if err != nil {
return nil, fmt.Errorf("%s %q: %w", s.Driver, s.Name, err)
}
return b, nil
}
// FetchJSON fetches the URL and returns the raw body as json.RawMessage.
// json.Unmarshal accepts json.RawMessage directly, so callers can decode minimal
// metadata without keeping both []byte and RawMessage in their own structs.
func (s *HTTPSource) FetchJSON(ctx context.Context) (json.RawMessage, error) {
b, err := s.FetchBytes(ctx)
if err != nil {
return nil, err
}
return json.RawMessage(b), nil
}

View File

@@ -1,39 +0,0 @@
// FILE: ./internal/sources/common/id.go
package common
import (
"fmt"
"strings"
"time"
)
// ChooseEventID applies weatherfeeder's opinionated Event.ID policy:
//
// - If upstream provides an ID, use it (trimmed).
// - Otherwise, ID is "<Source>:<EffectiveAt>" when available.
// - If EffectiveAt is unavailable, fall back to "<Source>:<EmittedAt>".
//
// Timestamps are encoded as RFC3339Nano in UTC.
func ChooseEventID(upstreamID, sourceName string, effectiveAt *time.Time, emittedAt time.Time) string {
if id := strings.TrimSpace(upstreamID); id != "" {
return id
}
src := strings.TrimSpace(sourceName)
if src == "" {
src = "UNKNOWN_SOURCE"
}
// Prefer EffectiveAt for dedupe friendliness.
if effectiveAt != nil && !effectiveAt.IsZero() {
return fmt.Sprintf("%s:%s", src, effectiveAt.UTC().Format(time.RFC3339Nano))
}
// Fall back to EmittedAt (still stable within a poll invocation).
t := emittedAt.UTC()
if t.IsZero() {
t = time.Now().UTC()
}
return fmt.Sprintf("%s:%s", src, t.Format(time.RFC3339Nano))
}

View File

@@ -9,8 +9,8 @@ import (
"gitea.maximumdirect.net/ejr/feedkit/config"
"gitea.maximumdirect.net/ejr/feedkit/event"
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
)
@@ -22,14 +22,14 @@ import (
// Output schema:
// - standards.SchemaRawNWSAlertsV1
type AlertsSource struct {
http *common.HTTPSource
http *fksources.HTTPSource
}
func NewAlertsSource(cfg config.SourceConfig) (*AlertsSource, error) {
const driver = "nws_alerts"
// NWS alerts responses are GeoJSON-ish; allow fallback to plain JSON as well.
hs, err := common.NewHTTPSource(driver, cfg, "application/geo+json, application/json")
hs, err := fksources.NewHTTPSource(driver, cfg, "application/geo+json, application/json")
if err != nil {
return nil, err
}
@@ -43,10 +43,13 @@ func (s *AlertsSource) Name() string { return s.http.Name }
func (s *AlertsSource) Kind() event.Kind { return event.Kind("alert") }
func (s *AlertsSource) Poll(ctx context.Context) ([]event.Event, error) {
raw, meta, err := s.fetchRaw(ctx)
raw, meta, changed, err := s.fetchRaw(ctx)
if err != nil {
return nil, err
}
if !changed {
return nil, nil
}
// EffectiveAt policy for alerts:
// Prefer the collection-level "updated" timestamp (best dedupe signal).
@@ -65,9 +68,9 @@ func (s *AlertsSource) Poll(ctx context.Context) ([]event.Event, error) {
// NWS alerts collections do not provide a stable per-snapshot ID.
// Use Source:EffectiveAt (or Source:EmittedAt fallback) for dedupe friendliness.
eventID := common.ChooseEventID("", s.http.Name, effectiveAt, emittedAt)
eventID := fksources.DefaultEventID("", s.http.Name, effectiveAt, emittedAt)
return common.SingleRawEvent(
return fksources.SingleEvent(
s.Kind(),
s.http.Name,
standards.SchemaRawNWSAlertsV1,
@@ -97,16 +100,19 @@ type alertsMeta struct {
ParsedLatestFeatureTime time.Time `json:"-"`
}
func (s *AlertsSource) fetchRaw(ctx context.Context) (json.RawMessage, alertsMeta, error) {
raw, err := s.http.FetchJSON(ctx)
func (s *AlertsSource) fetchRaw(ctx context.Context) (json.RawMessage, alertsMeta, bool, error) {
raw, changed, err := s.http.FetchJSONIfChanged(ctx)
if err != nil {
return nil, alertsMeta{}, err
return nil, alertsMeta{}, false, err
}
if !changed {
return nil, alertsMeta{}, false, nil
}
var meta alertsMeta
if err := json.Unmarshal(raw, &meta); err != nil {
// If metadata decode fails, still return raw; Poll will fall back to Source:EmittedAt.
return raw, alertsMeta{}, nil
return raw, alertsMeta{}, true, nil
}
// Top-level updated (preferred).
@@ -143,5 +149,5 @@ func (s *AlertsSource) fetchRaw(ctx context.Context) (json.RawMessage, alertsMet
}
meta.ParsedLatestFeatureTime = latest
return raw, meta, nil
return raw, meta, true, nil
}

View File

@@ -1,4 +1,4 @@
// FILE: internal/sources/nws/forecast.go
// FILE: internal/sources/nws/forecast_hourly.go
package nws
import (
@@ -9,44 +9,47 @@ import (
"gitea.maximumdirect.net/ejr/feedkit/config"
"gitea.maximumdirect.net/ejr/feedkit/event"
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
)
// ForecastSource polls an NWS forecast endpoint (narrative or hourly) and emits a RAW forecast Event.
// HourlyForecastSource polls an NWS hourly forecast endpoint and emits a RAW forecast Event.
//
// It intentionally emits the *entire* upstream payload as json.RawMessage and only decodes
// minimal metadata for Event.EffectiveAt and Event.ID.
//
// Output schema (current implementation):
// - standards.SchemaRawNWSHourlyForecastV1
type ForecastSource struct {
http *common.HTTPSource
type HourlyForecastSource struct {
http *fksources.HTTPSource
}
func NewForecastSource(cfg config.SourceConfig) (*ForecastSource, error) {
const driver = "nws_forecast"
func NewHourlyForecastSource(cfg config.SourceConfig) (*HourlyForecastSource, error) {
const driver = "nws_forecast_hourly"
// NWS forecast endpoints are GeoJSON (and sometimes also advertise json-ld/json).
hs, err := common.NewHTTPSource(driver, cfg, "application/geo+json, application/json")
hs, err := fksources.NewHTTPSource(driver, cfg, "application/geo+json, application/json")
if err != nil {
return nil, err
}
return &ForecastSource{http: hs}, nil
return &HourlyForecastSource{http: hs}, nil
}
func (s *ForecastSource) Name() string { return s.http.Name }
func (s *HourlyForecastSource) Name() string { return s.http.Name }
// Kind is used for routing/policy.
func (s *ForecastSource) Kind() event.Kind { return event.Kind("forecast") }
func (s *HourlyForecastSource) Kind() event.Kind { return event.Kind("forecast") }
func (s *ForecastSource) Poll(ctx context.Context) ([]event.Event, error) {
raw, meta, err := s.fetchRaw(ctx)
func (s *HourlyForecastSource) Poll(ctx context.Context) ([]event.Event, error) {
raw, meta, changed, err := s.fetchRaw(ctx)
if err != nil {
return nil, err
}
if !changed {
return nil, nil
}
// EffectiveAt is optional; for forecasts its most naturally the run “issued” time.
// NWS gridpoint forecasts expose generatedAt (preferred) and updateTime/updated.
@@ -65,9 +68,9 @@ func (s *ForecastSource) Poll(ctx context.Context) ([]event.Event, error) {
// NWS gridpoint forecast GeoJSON commonly has a stable "id" equal to the endpoint URL.
// That is *not* unique per issued run, so we intentionally do not use it for Event.ID.
// Instead we rely on Source:EffectiveAt (or Source:EmittedAt fallback).
eventID := common.ChooseEventID("", s.http.Name, effectiveAt, emittedAt)
eventID := fksources.DefaultEventID("", s.http.Name, effectiveAt, emittedAt)
return common.SingleRawEvent(
return fksources.SingleEvent(
s.Kind(),
s.http.Name,
standards.SchemaRawNWSHourlyForecastV1,
@@ -80,7 +83,7 @@ func (s *ForecastSource) Poll(ctx context.Context) ([]event.Event, error) {
// ---- RAW fetch + minimal metadata decode ----
type forecastMeta struct {
type hourlyForecastMeta struct {
// Present for GeoJSON Feature responses, but often stable (endpoint URL).
ID string `json:"id"`
@@ -94,16 +97,19 @@ type forecastMeta struct {
ParsedUpdateTime time.Time `json:"-"`
}
func (s *ForecastSource) fetchRaw(ctx context.Context) (json.RawMessage, forecastMeta, error) {
raw, err := s.http.FetchJSON(ctx)
func (s *HourlyForecastSource) fetchRaw(ctx context.Context) (json.RawMessage, hourlyForecastMeta, bool, error) {
raw, changed, err := s.http.FetchJSONIfChanged(ctx)
if err != nil {
return nil, forecastMeta{}, err
return nil, hourlyForecastMeta{}, false, err
}
if !changed {
return nil, hourlyForecastMeta{}, false, nil
}
var meta forecastMeta
var meta hourlyForecastMeta
if err := json.Unmarshal(raw, &meta); err != nil {
// If metadata decode fails, still return raw; Poll will fall back to Source:EmittedAt.
return raw, forecastMeta{}, nil
return raw, hourlyForecastMeta{}, true, nil
}
// generatedAt (preferred)
@@ -125,5 +131,5 @@ func (s *ForecastSource) fetchRaw(ctx context.Context) (json.RawMessage, forecas
}
}
return raw, meta, nil
return raw, meta, true, nil
}

View File

@@ -0,0 +1,135 @@
// FILE: internal/sources/nws/forecast_narrative.go
package nws
import (
"context"
"encoding/json"
"strings"
"time"
"gitea.maximumdirect.net/ejr/feedkit/config"
"gitea.maximumdirect.net/ejr/feedkit/event"
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
)
// NarrativeForecastSource polls an NWS narrative forecast endpoint and emits a RAW forecast Event.
//
// It intentionally emits the *entire* upstream payload as json.RawMessage and only decodes
// minimal metadata for Event.EffectiveAt and Event.ID.
//
// Output schema:
// - standards.SchemaRawNWSNarrativeForecastV1
type NarrativeForecastSource struct {
http *fksources.HTTPSource
}
func NewNarrativeForecastSource(cfg config.SourceConfig) (*NarrativeForecastSource, error) {
const driver = "nws_forecast_narrative"
// NWS forecast endpoints are GeoJSON (and sometimes also advertise json-ld/json).
hs, err := fksources.NewHTTPSource(driver, cfg, "application/geo+json, application/json")
if err != nil {
return nil, err
}
return &NarrativeForecastSource{http: hs}, nil
}
func (s *NarrativeForecastSource) Name() string { return s.http.Name }
// Kind is used for routing/policy.
func (s *NarrativeForecastSource) Kind() event.Kind { return event.Kind("forecast") }
func (s *NarrativeForecastSource) Poll(ctx context.Context) ([]event.Event, error) {
raw, meta, changed, err := s.fetchRaw(ctx)
if err != nil {
return nil, err
}
if !changed {
return nil, nil
}
// EffectiveAt is optional; for forecasts its most naturally the run “issued” time.
// NWS gridpoint forecasts expose generatedAt (preferred) and updateTime/updated.
var effectiveAt *time.Time
switch {
case !meta.ParsedGeneratedAt.IsZero():
t := meta.ParsedGeneratedAt.UTC()
effectiveAt = &t
case !meta.ParsedUpdateTime.IsZero():
t := meta.ParsedUpdateTime.UTC()
effectiveAt = &t
}
emittedAt := time.Now().UTC()
// NWS gridpoint forecast GeoJSON commonly has a stable "id" equal to the endpoint URL.
// That is *not* unique per issued run, so we intentionally do not use it for Event.ID.
// Instead we rely on Source:EffectiveAt (or Source:EmittedAt fallback).
eventID := fksources.DefaultEventID("", s.http.Name, effectiveAt, emittedAt)
return fksources.SingleEvent(
s.Kind(),
s.http.Name,
standards.SchemaRawNWSNarrativeForecastV1,
eventID,
emittedAt,
effectiveAt,
raw,
)
}
// ---- RAW fetch + minimal metadata decode ----
type narrativeForecastMeta struct {
// Present for GeoJSON Feature responses, but often stable (endpoint URL).
ID string `json:"id"`
Properties struct {
GeneratedAt string `json:"generatedAt"` // preferred “issued/run generated” time
UpdateTime string `json:"updateTime"` // last update time of underlying data
Updated string `json:"updated"` // deprecated alias for updateTime
} `json:"properties"`
ParsedGeneratedAt time.Time `json:"-"`
ParsedUpdateTime time.Time `json:"-"`
}
func (s *NarrativeForecastSource) fetchRaw(ctx context.Context) (json.RawMessage, narrativeForecastMeta, bool, error) {
raw, changed, err := s.http.FetchJSONIfChanged(ctx)
if err != nil {
return nil, narrativeForecastMeta{}, false, err
}
if !changed {
return nil, narrativeForecastMeta{}, false, nil
}
var meta narrativeForecastMeta
if err := json.Unmarshal(raw, &meta); err != nil {
// If metadata decode fails, still return raw; Poll will fall back to Source:EmittedAt.
return raw, narrativeForecastMeta{}, true, nil
}
// generatedAt (preferred)
genStr := strings.TrimSpace(meta.Properties.GeneratedAt)
if genStr != "" {
if t, err := nwscommon.ParseTime(genStr); err == nil {
meta.ParsedGeneratedAt = t.UTC()
}
}
// updateTime, with fallback to deprecated "updated"
updStr := strings.TrimSpace(meta.Properties.UpdateTime)
if updStr == "" {
updStr = strings.TrimSpace(meta.Properties.Updated)
}
if updStr != "" {
if t, err := nwscommon.ParseTime(updStr); err == nil {
meta.ParsedUpdateTime = t.UTC()
}
}
return raw, meta, true, nil
}

View File

@@ -9,20 +9,20 @@ import (
"gitea.maximumdirect.net/ejr/feedkit/config"
"gitea.maximumdirect.net/ejr/feedkit/event"
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
)
// ObservationSource polls an NWS station observation endpoint and emits a RAW observation Event.
type ObservationSource struct {
http *common.HTTPSource
http *fksources.HTTPSource
}
func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) {
const driver = "nws_observation"
hs, err := common.NewHTTPSource(driver, cfg, "application/geo+json, application/json")
hs, err := fksources.NewHTTPSource(driver, cfg, "application/geo+json, application/json")
if err != nil {
return nil, err
}
@@ -35,10 +35,13 @@ func (s *ObservationSource) Name() string { return s.http.Name }
func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") }
func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
raw, meta, err := s.fetchRaw(ctx)
raw, meta, changed, err := s.fetchRaw(ctx)
if err != nil {
return nil, err
}
if !changed {
return nil, nil
}
// EffectiveAt is optional; for observations its naturally the observation timestamp.
var effectiveAt *time.Time
@@ -48,9 +51,9 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
}
emittedAt := time.Now().UTC()
eventID := common.ChooseEventID(meta.ID, s.http.Name, effectiveAt, emittedAt)
eventID := fksources.DefaultEventID(meta.ID, s.http.Name, effectiveAt, emittedAt)
return common.SingleRawEvent(
return fksources.SingleEvent(
s.Kind(),
s.http.Name,
standards.SchemaRawNWSObservationV1,
@@ -72,16 +75,19 @@ type observationMeta struct {
ParsedTimestamp time.Time `json:"-"`
}
func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, observationMeta, error) {
raw, err := s.http.FetchJSON(ctx)
func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, observationMeta, bool, error) {
raw, changed, err := s.http.FetchJSONIfChanged(ctx)
if err != nil {
return nil, observationMeta{}, err
return nil, observationMeta{}, false, err
}
if !changed {
return nil, observationMeta{}, false, nil
}
var meta observationMeta
if err := json.Unmarshal(raw, &meta); err != nil {
// If metadata decode fails, still return raw; envelope will fall back to Source:EffectiveAt.
return raw, observationMeta{}, nil
return raw, observationMeta{}, true, nil
}
tsStr := strings.TrimSpace(meta.Properties.Timestamp)
@@ -91,5 +97,5 @@ func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, obse
}
}
return raw, meta, nil
return raw, meta, true, nil
}

View File

@@ -0,0 +1,63 @@
package nws
import (
"context"
"net/http"
"net/http/httptest"
"testing"
"gitea.maximumdirect.net/ejr/feedkit/config"
"gitea.maximumdirect.net/ejr/feedkit/event"
)
func TestObservationSourcePollReturnsNoEventsOn304(t *testing.T) {
var call int
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
call++
switch call {
case 1:
w.Header().Set("ETag", `"obs-v1"`)
_, _ = w.Write([]byte(`{"id":"obs-1","properties":{"timestamp":"2026-03-28T12:00:00Z"}}`))
case 2:
if got := r.Header.Get("If-None-Match"); got != `"obs-v1"` {
t.Fatalf("second request If-None-Match = %q", got)
}
w.WriteHeader(http.StatusNotModified)
default:
t.Fatalf("unexpected call count %d", call)
}
}))
defer srv.Close()
src, err := NewObservationSource(config.SourceConfig{
Name: "NWSObservationTest",
Driver: "nws_observation",
Mode: config.SourceModePoll,
Params: map[string]any{
"url": srv.URL,
"user_agent": "test-agent",
},
})
if err != nil {
t.Fatalf("NewObservationSource() error = %v", err)
}
first, err := src.Poll(context.Background())
if err != nil {
t.Fatalf("first Poll() error = %v", err)
}
if len(first) != 1 {
t.Fatalf("first Poll() len = %d, want 1", len(first))
}
if first[0].Kind != event.Kind("observation") {
t.Fatalf("first Poll() kind = %q", first[0].Kind)
}
second, err := src.Poll(context.Background())
if err != nil {
t.Fatalf("second Poll() error = %v", err)
}
if len(second) != 0 {
t.Fatalf("second Poll() len = %d, want 0", len(second))
}
}

View File

@@ -8,20 +8,20 @@ import (
"gitea.maximumdirect.net/ejr/feedkit/config"
"gitea.maximumdirect.net/ejr/feedkit/event"
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openmeteo"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
)
// ForecastSource polls an Open-Meteo hourly forecast endpoint and emits one RAW Forecast Event.
type ForecastSource struct {
http *common.HTTPSource
http *fksources.HTTPSource
}
func NewForecastSource(cfg config.SourceConfig) (*ForecastSource, error) {
const driver = "openmeteo_forecast"
hs, err := common.NewHTTPSource(driver, cfg, "application/json")
hs, err := fksources.NewHTTPSource(driver, cfg, "application/json")
if err != nil {
return nil, err
}
@@ -34,10 +34,13 @@ func (s *ForecastSource) Name() string { return s.http.Name }
func (s *ForecastSource) Kind() event.Kind { return event.Kind("forecast") }
func (s *ForecastSource) Poll(ctx context.Context) ([]event.Event, error) {
raw, meta, err := s.fetchRaw(ctx)
raw, meta, changed, err := s.fetchRaw(ctx)
if err != nil {
return nil, err
}
if !changed {
return nil, nil
}
// Open-Meteo does not expose a true "issued at" timestamp for forecast runs.
// We use current.time when present; otherwise we fall back to the first hourly time
@@ -49,9 +52,9 @@ func (s *ForecastSource) Poll(ctx context.Context) ([]event.Event, error) {
}
emittedAt := time.Now().UTC()
eventID := common.ChooseEventID("", s.http.Name, effectiveAt, emittedAt)
eventID := fksources.DefaultEventID("", s.http.Name, effectiveAt, emittedAt)
return common.SingleRawEvent(
return fksources.SingleEvent(
s.Kind(),
s.http.Name,
standards.SchemaRawOpenMeteoHourlyForecastV1,
@@ -79,16 +82,19 @@ type forecastMeta struct {
ParsedTimestamp time.Time `json:"-"`
}
func (s *ForecastSource) fetchRaw(ctx context.Context) (json.RawMessage, forecastMeta, error) {
raw, err := s.http.FetchJSON(ctx)
func (s *ForecastSource) fetchRaw(ctx context.Context) (json.RawMessage, forecastMeta, bool, error) {
raw, changed, err := s.http.FetchJSONIfChanged(ctx)
if err != nil {
return nil, forecastMeta{}, err
return nil, forecastMeta{}, false, err
}
if !changed {
return nil, forecastMeta{}, false, nil
}
var meta forecastMeta
if err := json.Unmarshal(raw, &meta); err != nil {
// If metadata decode fails, still return raw; Poll will fall back to Source:EmittedAt.
return raw, forecastMeta{}, nil
return raw, forecastMeta{}, true, nil
}
ts := strings.TrimSpace(meta.Current.Time)
@@ -106,5 +112,5 @@ func (s *ForecastSource) fetchRaw(ctx context.Context) (json.RawMessage, forecas
}
}
return raw, meta, nil
return raw, meta, true, nil
}

View File

@@ -8,20 +8,20 @@ import (
"gitea.maximumdirect.net/ejr/feedkit/config"
"gitea.maximumdirect.net/ejr/feedkit/event"
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openmeteo"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
)
// ObservationSource polls an Open-Meteo endpoint and emits one RAW Observation Event.
type ObservationSource struct {
http *common.HTTPSource
http *fksources.HTTPSource
}
func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) {
const driver = "openmeteo_observation"
hs, err := common.NewHTTPSource(driver, cfg, "application/json")
hs, err := fksources.NewHTTPSource(driver, cfg, "application/json")
if err != nil {
return nil, err
}
@@ -34,10 +34,13 @@ func (s *ObservationSource) Name() string { return s.http.Name }
func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") }
func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
raw, meta, err := s.fetchRaw(ctx)
raw, meta, changed, err := s.fetchRaw(ctx)
if err != nil {
return nil, err
}
if !changed {
return nil, nil
}
var effectiveAt *time.Time
if !meta.ParsedTimestamp.IsZero() {
@@ -46,9 +49,9 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
}
emittedAt := time.Now().UTC()
eventID := common.ChooseEventID("", s.http.Name, effectiveAt, emittedAt)
eventID := fksources.DefaultEventID("", s.http.Name, effectiveAt, emittedAt)
return common.SingleRawEvent(
return fksources.SingleEvent(
s.Kind(),
s.http.Name,
standards.SchemaRawOpenMeteoCurrentV1,
@@ -72,21 +75,24 @@ type openMeteoMeta struct {
ParsedTimestamp time.Time `json:"-"`
}
func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, openMeteoMeta, error) {
raw, err := s.http.FetchJSON(ctx)
func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, openMeteoMeta, bool, error) {
raw, changed, err := s.http.FetchJSONIfChanged(ctx)
if err != nil {
return nil, openMeteoMeta{}, err
return nil, openMeteoMeta{}, false, err
}
if !changed {
return nil, openMeteoMeta{}, false, nil
}
var meta openMeteoMeta
if err := json.Unmarshal(raw, &meta); err != nil {
// If metadata decode fails, still return raw; envelope will omit EffectiveAt.
return raw, openMeteoMeta{}, nil
return raw, openMeteoMeta{}, true, nil
}
if t, err := openmeteo.ParseTime(meta.Current.Time, meta.Timezone, meta.UTCOffsetSeconds); err == nil {
meta.ParsedTimestamp = t.UTC()
}
return raw, meta, nil
return raw, meta, true, nil
}

View File

@@ -9,19 +9,19 @@ import (
"gitea.maximumdirect.net/ejr/feedkit/config"
"gitea.maximumdirect.net/ejr/feedkit/event"
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
owcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openweather"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
)
type ObservationSource struct {
http *common.HTTPSource
http *fksources.HTTPSource
}
func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) {
const driver = "openweather_observation"
hs, err := common.NewHTTPSource(driver, cfg, "application/json")
hs, err := fksources.NewHTTPSource(driver, cfg, "application/json")
if err != nil {
return nil, err
}
@@ -42,10 +42,13 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
return nil, fmt.Errorf("%s %q: %w", s.http.Driver, s.http.Name, err)
}
raw, meta, err := s.fetchRaw(ctx)
raw, meta, changed, err := s.fetchRaw(ctx)
if err != nil {
return nil, err
}
if !changed {
return nil, nil
}
var effectiveAt *time.Time
if !meta.ParsedTimestamp.IsZero() {
@@ -54,9 +57,9 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
}
emittedAt := time.Now().UTC()
eventID := common.ChooseEventID("", s.http.Name, effectiveAt, emittedAt)
eventID := fksources.DefaultEventID("", s.http.Name, effectiveAt, emittedAt)
return common.SingleRawEvent(
return fksources.SingleEvent(
s.Kind(),
s.http.Name,
standards.SchemaRawOpenWeatherCurrentV1,
@@ -75,20 +78,23 @@ type openWeatherMeta struct {
ParsedTimestamp time.Time `json:"-"`
}
func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, openWeatherMeta, error) {
raw, err := s.http.FetchJSON(ctx)
func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, openWeatherMeta, bool, error) {
raw, changed, err := s.http.FetchJSONIfChanged(ctx)
if err != nil {
return nil, openWeatherMeta{}, err
return nil, openWeatherMeta{}, false, err
}
if !changed {
return nil, openWeatherMeta{}, false, nil
}
var meta openWeatherMeta
if err := json.Unmarshal(raw, &meta); err != nil {
return raw, openWeatherMeta{}, nil
return raw, openWeatherMeta{}, true, nil
}
if meta.Dt > 0 {
meta.ParsedTimestamp = time.Unix(meta.Dt, 0).UTC()
}
return raw, meta, nil
return raw, meta, true, nil
}

View File

@@ -75,18 +75,8 @@ type WeatherForecastPeriod struct {
// Like WeatherObservation, this is required; use an “unknown” WMOCode if unmappable.
ConditionCode WMOCode `json:"conditionCode"`
// Provider-independent short text describing the conditions (normalized, if possible).
ConditionText string `json:"conditionText,omitempty"`
// Provider-specific “evidence” for troubleshooting mapping and drift.
ProviderRawDescription string `json:"providerRawDescription,omitempty"`
// Human-facing narrative. Not all providers supply rich text (Open-Meteo often wont).
TextDescription string `json:"textDescription,omitempty"` // short phrase / summary
DetailedText string `json:"detailedText,omitempty"` // longer narrative, if available
// Provider-specific (legacy / transitional)
IconURL string `json:"iconUrl,omitempty"`
// Human-facing narrative summary for this period.
TextDescription string `json:"textDescription,omitempty"`
// Core predicted measurements (nullable; units align with WeatherObservation)
TemperatureC *float64 `json:"temperatureC,omitempty"`

View File

@@ -16,6 +16,7 @@ const (
SchemaRawOpenWeatherCurrentV1 = "raw.openweather.current.v1"
SchemaRawNWSHourlyForecastV1 = "raw.nws.hourly.forecast.v1"
SchemaRawNWSNarrativeForecastV1 = "raw.nws.narrative.forecast.v1"
SchemaRawOpenMeteoHourlyForecastV1 = "raw.openmeteo.hourly.forecast.v1"
SchemaRawOpenWeatherHourlyForecastV1 = "raw.openweather.hourly.forecast.v1"