Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 40f17c9d86 | |||
| c76088c38c |
@@ -90,7 +90,7 @@ sinks:
|
|||||||
driver: nats
|
driver: nats
|
||||||
params:
|
params:
|
||||||
url: nats://nats:4222
|
url: nats://nats:4222
|
||||||
exchange: weatherfeeder
|
subject: weatherfeeder
|
||||||
|
|
||||||
# - name: pg_weatherfeeder
|
# - name: pg_weatherfeeder
|
||||||
# driver: postgres
|
# driver: postgres
|
||||||
|
|||||||
@@ -37,9 +37,6 @@ func main() {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("config load failed: %v", err)
|
log.Fatalf("config load failed: %v", err)
|
||||||
}
|
}
|
||||||
if err := fksinks.RegisterPostgresSchemaForConfiguredSinks(cfg, wfpgsink.PostgresSchema()); err != nil {
|
|
||||||
log.Fatalf("postgres schema registration failed: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// --- Registries ---
|
// --- Registries ---
|
||||||
srcReg := fksources.NewRegistry()
|
srcReg := fksources.NewRegistry()
|
||||||
@@ -48,6 +45,7 @@ func main() {
|
|||||||
// Compile stdout, Postgres, and NATS sinks for weatherfeeder. The former is useful for debugging and the latter are the main intended outputs.
|
// Compile stdout, Postgres, and NATS sinks for weatherfeeder. The former is useful for debugging and the latter are the main intended outputs.
|
||||||
sinkReg := fksinks.NewRegistry()
|
sinkReg := fksinks.NewRegistry()
|
||||||
fksinks.RegisterBuiltins(sinkReg)
|
fksinks.RegisterBuiltins(sinkReg)
|
||||||
|
sinkReg.Register("postgres", fksinks.PostgresFactory(wfpgsink.PostgresSchema()))
|
||||||
|
|
||||||
// --- Build sources into scheduler jobs ---
|
// --- Build sources into scheduler jobs ---
|
||||||
var jobs []fkscheduler.Job
|
var jobs []fkscheduler.Job
|
||||||
|
|||||||
@@ -24,13 +24,6 @@ type testInput struct {
|
|||||||
|
|
||||||
func (s testInput) Name() string { return s.name }
|
func (s testInput) Name() string { return s.name }
|
||||||
|
|
||||||
type testKindSource struct {
|
|
||||||
testInput
|
|
||||||
kind fkevent.Kind
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s testKindSource) Kind() fkevent.Kind { return s.kind }
|
|
||||||
|
|
||||||
type testKindsSource struct {
|
type testKindsSource struct {
|
||||||
testInput
|
testInput
|
||||||
kinds []fkevent.Kind
|
kinds []fkevent.Kind
|
||||||
@@ -38,18 +31,6 @@ type testKindsSource struct {
|
|||||||
|
|
||||||
func (s testKindsSource) Kinds() []fkevent.Kind { return s.kinds }
|
func (s testKindsSource) Kinds() []fkevent.Kind { return s.kinds }
|
||||||
|
|
||||||
func TestValidateSourceExpectedKindsLegacyKindFallback(t *testing.T) {
|
|
||||||
sc := config.SourceConfig{Kind: "observation"}
|
|
||||||
in := testKindSource{
|
|
||||||
testInput: testInput{name: "test"},
|
|
||||||
kind: fkevent.Kind("observation"),
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := fksources.ValidateExpectedKinds(sc, in); err != nil {
|
|
||||||
t.Fatalf("ValidateExpectedKinds() unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestValidateSourceExpectedKindsSubsetAllowed(t *testing.T) {
|
func TestValidateSourceExpectedKindsSubsetAllowed(t *testing.T) {
|
||||||
sc := config.SourceConfig{Kinds: []string{"observation"}}
|
sc := config.SourceConfig{Kinds: []string{"observation"}}
|
||||||
in := testKindsSource{
|
in := testKindsSource{
|
||||||
|
|||||||
2
go.mod
2
go.mod
@@ -2,7 +2,7 @@ module gitea.maximumdirect.net/ejr/weatherfeeder
|
|||||||
|
|
||||||
go 1.25
|
go 1.25
|
||||||
|
|
||||||
require gitea.maximumdirect.net/ejr/feedkit v0.8.1
|
require gitea.maximumdirect.net/ejr/feedkit v0.8.2
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/klauspost/compress v1.17.2 // indirect
|
github.com/klauspost/compress v1.17.2 // indirect
|
||||||
|
|||||||
4
go.sum
4
go.sum
@@ -1,5 +1,5 @@
|
|||||||
gitea.maximumdirect.net/ejr/feedkit v0.8.1 h1:gSZ5J/mYHfNZ6dp27TS0V9RQ0JuP5ZAHXhSeCJaBu60=
|
gitea.maximumdirect.net/ejr/feedkit v0.8.2 h1:6AMxNacfqJ8SXQhFAUMW3LgiVxixs50tf+S8Q9Ivm+Y=
|
||||||
gitea.maximumdirect.net/ejr/feedkit v0.8.1/go.mod h1:U6xC9xZLN3cL4yi7YBVyzGoHYRLJXusFCAKlj2kdYYQ=
|
gitea.maximumdirect.net/ejr/feedkit v0.8.2/go.mod h1:U6xC9xZLN3cL4yi7YBVyzGoHYRLJXusFCAKlj2kdYYQ=
|
||||||
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
|
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
|
||||||
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
|
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
|
||||||
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
|
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
|
||||||
|
|||||||
@@ -9,6 +9,12 @@ import (
|
|||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/openweather"
|
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/openweather"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var builtinRegistrations = []func([]fknormalize.Normalizer) []fknormalize.Normalizer{
|
||||||
|
nws.Register,
|
||||||
|
openmeteo.Register,
|
||||||
|
openweather.Register,
|
||||||
|
}
|
||||||
|
|
||||||
// RegisterBuiltins registers all normalizers shipped with this binary.
|
// RegisterBuiltins registers all normalizers shipped with this binary.
|
||||||
//
|
//
|
||||||
// This mirrors internal/sources.RegisterBuiltins, but note the selection model:
|
// This mirrors internal/sources.RegisterBuiltins, but note the selection model:
|
||||||
@@ -27,9 +33,9 @@ func RegisterBuiltins(in []fknormalize.Normalizer) []fknormalize.Normalizer {
|
|||||||
//
|
//
|
||||||
// Order here should be stable across releases to reduce surprises when adding
|
// Order here should be stable across releases to reduce surprises when adding
|
||||||
// new normalizers.
|
// new normalizers.
|
||||||
out = nws.Register(out)
|
for _, register := range builtinRegistrations {
|
||||||
out = openmeteo.Register(out)
|
out = register(out)
|
||||||
out = openweather.Register(out)
|
}
|
||||||
|
|
||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -75,62 +75,63 @@ func normalizeNarrativeForecastEvent(in event.Event) (*event.Event, error) {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type forecastPeriodMapper[T any] func(idx int, period T) (model.WeatherForecastPeriod, error)
|
||||||
|
|
||||||
// buildHourlyForecast contains hourly forecast mapping logic (provider -> canonical model).
|
// buildHourlyForecast contains hourly forecast mapping logic (provider -> canonical model).
|
||||||
func buildHourlyForecast(parsed nwsHourlyForecastResponse) (model.WeatherForecastRun, time.Time, error) {
|
func buildHourlyForecast(parsed nwsHourlyForecastResponse) (model.WeatherForecastRun, time.Time, error) {
|
||||||
issuedAt, updatedAt, err := parseForecastRunTimes(parsed.Properties.GeneratedAt, parsed.Properties.UpdateTime)
|
return buildForecastRun(
|
||||||
if err != nil {
|
parsed.Properties.GeneratedAt,
|
||||||
return model.WeatherForecastRun{}, time.Time{}, err
|
parsed.Properties.UpdateTime,
|
||||||
}
|
parsed.Geometry.Coordinates,
|
||||||
|
|
||||||
// Best-effort location centroid from the GeoJSON polygon (optional).
|
|
||||||
lat, lon := centroidLatLon(parsed.Geometry.Coordinates)
|
|
||||||
|
|
||||||
run := newForecastRunBase(
|
|
||||||
issuedAt,
|
|
||||||
updatedAt,
|
|
||||||
model.ForecastProductHourly,
|
|
||||||
lat,
|
|
||||||
lon,
|
|
||||||
parsed.Properties.Elevation.Value,
|
parsed.Properties.Elevation.Value,
|
||||||
|
model.ForecastProductHourly,
|
||||||
|
parsed.Properties.Periods,
|
||||||
|
mapHourlyForecastPeriod,
|
||||||
)
|
)
|
||||||
|
|
||||||
periods := make([]model.WeatherForecastPeriod, 0, len(parsed.Properties.Periods))
|
|
||||||
for i, p := range parsed.Properties.Periods {
|
|
||||||
period, err := mapHourlyForecastPeriod(i, p)
|
|
||||||
if err != nil {
|
|
||||||
return model.WeatherForecastRun{}, time.Time{}, err
|
|
||||||
}
|
|
||||||
periods = append(periods, period)
|
|
||||||
}
|
|
||||||
|
|
||||||
run.Periods = periods
|
|
||||||
|
|
||||||
// EffectiveAt policy for forecasts: treat IssuedAt as the effective time (dedupe-friendly).
|
|
||||||
return run, issuedAt, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// buildNarrativeForecast contains narrative forecast mapping logic (provider -> canonical model).
|
// buildNarrativeForecast contains narrative forecast mapping logic (provider -> canonical model).
|
||||||
func buildNarrativeForecast(parsed nwsNarrativeForecastResponse) (model.WeatherForecastRun, time.Time, error) {
|
func buildNarrativeForecast(parsed nwsNarrativeForecastResponse) (model.WeatherForecastRun, time.Time, error) {
|
||||||
issuedAt, updatedAt, err := parseForecastRunTimes(parsed.Properties.GeneratedAt, parsed.Properties.UpdateTime)
|
return buildForecastRun(
|
||||||
|
parsed.Properties.GeneratedAt,
|
||||||
|
parsed.Properties.UpdateTime,
|
||||||
|
parsed.Geometry.Coordinates,
|
||||||
|
parsed.Properties.Elevation.Value,
|
||||||
|
model.ForecastProductNarrative,
|
||||||
|
parsed.Properties.Periods,
|
||||||
|
mapNarrativeForecastPeriod,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildForecastRun[T any](
|
||||||
|
generatedAt string,
|
||||||
|
updateTime string,
|
||||||
|
coordinates [][][]float64,
|
||||||
|
elevation *float64,
|
||||||
|
product model.ForecastProduct,
|
||||||
|
srcPeriods []T,
|
||||||
|
mapPeriod forecastPeriodMapper[T],
|
||||||
|
) (model.WeatherForecastRun, time.Time, error) {
|
||||||
|
issuedAt, updatedAt, err := parseForecastRunTimes(generatedAt, updateTime)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return model.WeatherForecastRun{}, time.Time{}, err
|
return model.WeatherForecastRun{}, time.Time{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Best-effort location centroid from the GeoJSON polygon (optional).
|
// Best-effort location centroid from the GeoJSON polygon (optional).
|
||||||
lat, lon := centroidLatLon(parsed.Geometry.Coordinates)
|
lat, lon := centroidLatLon(coordinates)
|
||||||
|
|
||||||
run := newForecastRunBase(
|
run := newForecastRunBase(
|
||||||
issuedAt,
|
issuedAt,
|
||||||
updatedAt,
|
updatedAt,
|
||||||
model.ForecastProductNarrative,
|
product,
|
||||||
lat,
|
lat,
|
||||||
lon,
|
lon,
|
||||||
parsed.Properties.Elevation.Value,
|
elevation,
|
||||||
)
|
)
|
||||||
|
|
||||||
periods := make([]model.WeatherForecastPeriod, 0, len(parsed.Properties.Periods))
|
periods := make([]model.WeatherForecastPeriod, 0, len(srcPeriods))
|
||||||
for i, p := range parsed.Properties.Periods {
|
for i, p := range srcPeriods {
|
||||||
period, err := mapNarrativeForecastPeriod(i, p)
|
period, err := mapPeriod(i, p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return model.WeatherForecastRun{}, time.Time{}, err
|
return model.WeatherForecastRun{}, time.Time{}, err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -47,6 +47,55 @@ func TestBuildHourlyForecastUsesShortForecastAsTextDescription(t *testing.T) {
|
|||||||
assertNoLegacyForecastDescriptionKeys(t, run.Periods[0])
|
assertNoLegacyForecastDescriptionKeys(t, run.Periods[0])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestBuildHourlyForecastPreservesUpdatedAtCentroidAndElevation(t *testing.T) {
|
||||||
|
parsed := nwsHourlyForecastResponse{}
|
||||||
|
parsed.Properties.GeneratedAt = "2026-03-16T18:00:00Z"
|
||||||
|
parsed.Properties.UpdateTime = "2026-03-16T18:30:00Z"
|
||||||
|
elevation := 123.4
|
||||||
|
parsed.Properties.Elevation.Value = &elevation
|
||||||
|
parsed.Geometry.Coordinates = [][][]float64{
|
||||||
|
{
|
||||||
|
{-90.0, 38.0},
|
||||||
|
{-89.0, 38.0},
|
||||||
|
{-89.0, 39.0},
|
||||||
|
{-90.0, 39.0},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
parsed.Properties.Periods = []nwsHourlyForecastPeriod{
|
||||||
|
{
|
||||||
|
StartTime: "2026-03-16T19:00:00Z",
|
||||||
|
EndTime: "2026-03-16T20:00:00Z",
|
||||||
|
ShortForecast: "Cloudy",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
run, effectiveAt, err := buildHourlyForecast(parsed)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("buildHourlyForecast() error = %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
wantIssued := time.Date(2026, 3, 16, 18, 0, 0, 0, time.UTC)
|
||||||
|
wantUpdated := time.Date(2026, 3, 16, 18, 30, 0, 0, time.UTC)
|
||||||
|
if !run.IssuedAt.Equal(wantIssued) {
|
||||||
|
t.Fatalf("IssuedAt = %s, want %s", run.IssuedAt.Format(time.RFC3339), wantIssued.Format(time.RFC3339))
|
||||||
|
}
|
||||||
|
if run.UpdatedAt == nil || !run.UpdatedAt.Equal(wantUpdated) {
|
||||||
|
t.Fatalf("UpdatedAt = %v, want %s", run.UpdatedAt, wantUpdated.Format(time.RFC3339))
|
||||||
|
}
|
||||||
|
if run.Latitude == nil || math.Abs(*run.Latitude-38.5) > 0.0001 {
|
||||||
|
t.Fatalf("Latitude = %v, want 38.5", run.Latitude)
|
||||||
|
}
|
||||||
|
if run.Longitude == nil || math.Abs(*run.Longitude+89.5) > 0.0001 {
|
||||||
|
t.Fatalf("Longitude = %v, want -89.5", run.Longitude)
|
||||||
|
}
|
||||||
|
if run.ElevationMeters == nil || math.Abs(*run.ElevationMeters-elevation) > 0.0001 {
|
||||||
|
t.Fatalf("ElevationMeters = %v, want %.1f", run.ElevationMeters, elevation)
|
||||||
|
}
|
||||||
|
if !effectiveAt.Equal(wantIssued) {
|
||||||
|
t.Fatalf("effectiveAt = %s, want %s", effectiveAt.Format(time.RFC3339), wantIssued.Format(time.RFC3339))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestNormalizeForecastEventBySchemaRejectsUnsupportedSchema(t *testing.T) {
|
func TestNormalizeForecastEventBySchemaRejectsUnsupportedSchema(t *testing.T) {
|
||||||
_, err := normalizeForecastEventBySchema(event.Event{
|
_, err := normalizeForecastEventBySchema(event.Event{
|
||||||
Schema: "raw.nws.daily.forecast.v1",
|
Schema: "raw.nws.daily.forecast.v1",
|
||||||
@@ -85,6 +134,70 @@ func TestNormalizeForecastEventBySchemaRoutesNarrative(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestNormalizeForecastEventBySchemaProducesCanonicalWeatherForecastSchema(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
schema string
|
||||||
|
payload map[string]any
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "hourly",
|
||||||
|
schema: standards.SchemaRawNWSHourlyForecastV1,
|
||||||
|
payload: map[string]any{
|
||||||
|
"properties": map[string]any{
|
||||||
|
"generatedAt": "2026-03-16T18:00:00Z",
|
||||||
|
"periods": []map[string]any{
|
||||||
|
{
|
||||||
|
"startTime": "2026-03-16T19:00:00Z",
|
||||||
|
"endTime": "2026-03-16T20:00:00Z",
|
||||||
|
"shortForecast": "Cloudy",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "narrative",
|
||||||
|
schema: standards.SchemaRawNWSNarrativeForecastV1,
|
||||||
|
payload: map[string]any{
|
||||||
|
"properties": map[string]any{
|
||||||
|
"generatedAt": "2026-03-16T18:00:00Z",
|
||||||
|
"periods": []map[string]any{
|
||||||
|
{
|
||||||
|
"startTime": "2026-03-16T19:00:00Z",
|
||||||
|
"endTime": "2026-03-16T20:00:00Z",
|
||||||
|
"shortForecast": "Cloudy",
|
||||||
|
"detailedForecast": "Cloudy",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
out, err := normalizeForecastEventBySchema(event.Event{
|
||||||
|
ID: "evt-1",
|
||||||
|
Kind: event.Kind("forecast"),
|
||||||
|
Source: "nws-test",
|
||||||
|
EmittedAt: time.Date(2026, 3, 16, 18, 0, 0, 0, time.UTC),
|
||||||
|
Schema: tt.schema,
|
||||||
|
Payload: tt.payload,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("normalizeForecastEventBySchema() error = %v", err)
|
||||||
|
}
|
||||||
|
if out == nil {
|
||||||
|
t.Fatalf("normalizeForecastEventBySchema() returned nil output")
|
||||||
|
}
|
||||||
|
if out.Schema != standards.SchemaWeatherForecastV1 {
|
||||||
|
t.Fatalf("Schema = %q, want %q", out.Schema, standards.SchemaWeatherForecastV1)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestBuildNarrativeForecastMapsExpectedFields(t *testing.T) {
|
func TestBuildNarrativeForecastMapsExpectedFields(t *testing.T) {
|
||||||
parsed := nwsNarrativeForecastResponse{}
|
parsed := nwsNarrativeForecastResponse{}
|
||||||
parsed.Properties.GeneratedAt = "2026-03-27T15:17:01Z"
|
parsed.Properties.GeneratedAt = "2026-03-27T15:17:01Z"
|
||||||
|
|||||||
@@ -5,18 +5,13 @@ import (
|
|||||||
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
|
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var builtins = []fknormalize.Normalizer{
|
||||||
|
ObservationNormalizer{},
|
||||||
|
ForecastNormalizer{},
|
||||||
|
AlertsNormalizer{},
|
||||||
|
}
|
||||||
|
|
||||||
// Register appends NWS normalizers in stable order.
|
// Register appends NWS normalizers in stable order.
|
||||||
func Register(in []fknormalize.Normalizer) []fknormalize.Normalizer {
|
func Register(in []fknormalize.Normalizer) []fknormalize.Normalizer {
|
||||||
out := in
|
return append(in, builtins...)
|
||||||
|
|
||||||
// Observations
|
|
||||||
out = append(out, ObservationNormalizer{})
|
|
||||||
|
|
||||||
// Forecasts
|
|
||||||
out = append(out, ForecastNormalizer{})
|
|
||||||
|
|
||||||
// Alerts
|
|
||||||
out = append(out, AlertsNormalizer{})
|
|
||||||
|
|
||||||
return out
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,14 +5,12 @@ import (
|
|||||||
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
|
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var builtins = []fknormalize.Normalizer{
|
||||||
|
ObservationNormalizer{},
|
||||||
|
ForecastNormalizer{},
|
||||||
|
}
|
||||||
|
|
||||||
// Register appends Open-Meteo normalizers in stable order.
|
// Register appends Open-Meteo normalizers in stable order.
|
||||||
func Register(in []fknormalize.Normalizer) []fknormalize.Normalizer {
|
func Register(in []fknormalize.Normalizer) []fknormalize.Normalizer {
|
||||||
out := in
|
return append(in, builtins...)
|
||||||
|
|
||||||
// Observations
|
|
||||||
out = append(out, ObservationNormalizer{})
|
|
||||||
// Forecasts
|
|
||||||
out = append(out, ForecastNormalizer{})
|
|
||||||
|
|
||||||
return out
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,12 +5,11 @@ import (
|
|||||||
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
|
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var builtins = []fknormalize.Normalizer{
|
||||||
|
ObservationNormalizer{},
|
||||||
|
}
|
||||||
|
|
||||||
// Register appends OpenWeather normalizers in stable order.
|
// Register appends OpenWeather normalizers in stable order.
|
||||||
func Register(in []fknormalize.Normalizer) []fknormalize.Normalizer {
|
func Register(in []fknormalize.Normalizer) []fknormalize.Normalizer {
|
||||||
out := in
|
return append(in, builtins...)
|
||||||
|
|
||||||
// Observations
|
|
||||||
out = append(out, ObservationNormalizer{})
|
|
||||||
|
|
||||||
return out
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,58 +1,6 @@
|
|||||||
package postgres
|
package postgres
|
||||||
|
|
||||||
import (
|
import "testing"
|
||||||
"fmt"
|
|
||||||
"strings"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"gitea.maximumdirect.net/ejr/feedkit/config"
|
|
||||||
fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestRegisterPostgresSchemasNilConfig(t *testing.T) {
|
|
||||||
err := fksinks.RegisterPostgresSchemaForConfiguredSinks(nil, PostgresSchema())
|
|
||||||
if err == nil {
|
|
||||||
t.Fatalf("RegisterPostgresSchemas(nil) expected error")
|
|
||||||
}
|
|
||||||
if !strings.Contains(err.Error(), "config is nil") {
|
|
||||||
t.Fatalf("error = %q, want config is nil", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestRegisterPostgresSchemasNonPostgresNoOp(t *testing.T) {
|
|
||||||
cfg := &config.Config{
|
|
||||||
Sinks: []config.SinkConfig{
|
|
||||||
{Name: "stdout_only", Driver: "stdout"},
|
|
||||||
{Name: "nats_only", Driver: "nats"},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := fksinks.RegisterPostgresSchemaForConfiguredSinks(cfg, PostgresSchema()); err != nil {
|
|
||||||
t.Fatalf("RegisterPostgresSchemas(non-postgres) error = %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestRegisterPostgresSchemasDuplicateRegistrationFails(t *testing.T) {
|
|
||||||
sinkName := uniqueSinkName("pg_test")
|
|
||||||
cfg := &config.Config{
|
|
||||||
Sinks: []config.SinkConfig{
|
|
||||||
{Name: sinkName, Driver: "postgres"},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := fksinks.RegisterPostgresSchemaForConfiguredSinks(cfg, PostgresSchema()); err != nil {
|
|
||||||
t.Fatalf("first RegisterPostgresSchemas() error = %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err := fksinks.RegisterPostgresSchemaForConfiguredSinks(cfg, PostgresSchema())
|
|
||||||
if err == nil {
|
|
||||||
t.Fatalf("second RegisterPostgresSchemas() expected duplicate error")
|
|
||||||
}
|
|
||||||
if !strings.Contains(err.Error(), "already registered") {
|
|
||||||
t.Fatalf("error = %q, want already registered", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestWeatherPostgresSchemaShape(t *testing.T) {
|
func TestWeatherPostgresSchemaShape(t *testing.T) {
|
||||||
s := PostgresSchema()
|
s := PostgresSchema()
|
||||||
@@ -90,7 +38,3 @@ func TestWeatherPostgresSchemaShape(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func uniqueSinkName(prefix string) string {
|
|
||||||
return fmt.Sprintf("%s_%d", prefix, time.Now().UnixNano())
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -9,33 +9,30 @@ import (
|
|||||||
fksource "gitea.maximumdirect.net/ejr/feedkit/sources"
|
fksource "gitea.maximumdirect.net/ejr/feedkit/sources"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type pollDriverRegistration struct {
|
||||||
|
driver string
|
||||||
|
factory func(config.SourceConfig) (fksource.PollSource, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
var pollDriverRegistrations = []pollDriverRegistration{
|
||||||
|
{driver: "nws_observation", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) { return nws.NewObservationSource(cfg) }},
|
||||||
|
{driver: "nws_alerts", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) { return nws.NewAlertsSource(cfg) }},
|
||||||
|
{driver: "nws_forecast_hourly", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) { return nws.NewHourlyForecastSource(cfg) }},
|
||||||
|
{driver: "nws_forecast_narrative", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) { return nws.NewNarrativeForecastSource(cfg) }},
|
||||||
|
{driver: "openmeteo_observation", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) { return openmeteo.NewObservationSource(cfg) }},
|
||||||
|
{driver: "openmeteo_forecast", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) { return openmeteo.NewForecastSource(cfg) }},
|
||||||
|
{driver: "openweather_observation", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) {
|
||||||
|
return openweather.NewObservationSource(cfg)
|
||||||
|
}},
|
||||||
|
}
|
||||||
|
|
||||||
// RegisterBuiltins registers the source drivers that ship with this binary.
|
// RegisterBuiltins registers the source drivers that ship with this binary.
|
||||||
// Keeping this in one place makes main.go very readable.
|
// Keeping this in one place makes main.go very readable.
|
||||||
func RegisterBuiltins(r *fksource.Registry) {
|
func RegisterBuiltins(r *fksource.Registry) {
|
||||||
// NWS drivers
|
for _, reg := range pollDriverRegistrations {
|
||||||
r.RegisterPoll("nws_observation", func(cfg config.SourceConfig) (fksource.PollSource, error) {
|
reg := reg
|
||||||
return nws.NewObservationSource(cfg)
|
r.RegisterPoll(reg.driver, func(cfg config.SourceConfig) (fksource.PollSource, error) {
|
||||||
})
|
return reg.factory(cfg)
|
||||||
r.RegisterPoll("nws_alerts", func(cfg config.SourceConfig) (fksource.PollSource, error) {
|
})
|
||||||
return nws.NewAlertsSource(cfg)
|
}
|
||||||
})
|
|
||||||
r.RegisterPoll("nws_forecast_hourly", func(cfg config.SourceConfig) (fksource.PollSource, error) {
|
|
||||||
return nws.NewHourlyForecastSource(cfg)
|
|
||||||
})
|
|
||||||
r.RegisterPoll("nws_forecast_narrative", func(cfg config.SourceConfig) (fksource.PollSource, error) {
|
|
||||||
return nws.NewNarrativeForecastSource(cfg)
|
|
||||||
})
|
|
||||||
|
|
||||||
// Open-Meteo drivers
|
|
||||||
r.RegisterPoll("openmeteo_observation", func(cfg config.SourceConfig) (fksource.PollSource, error) {
|
|
||||||
return openmeteo.NewObservationSource(cfg)
|
|
||||||
})
|
|
||||||
r.RegisterPoll("openmeteo_forecast", func(cfg config.SourceConfig) (fksource.PollSource, error) {
|
|
||||||
return openmeteo.NewForecastSource(cfg)
|
|
||||||
})
|
|
||||||
|
|
||||||
// OpenWeatherMap drivers
|
|
||||||
r.RegisterPoll("openweather_observation", func(cfg config.SourceConfig) (fksource.PollSource, error) {
|
|
||||||
return openweather.NewObservationSource(cfg)
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -47,13 +47,42 @@ func TestRegisterBuiltinsDoesNotRegisterLegacyNWSForecastDriver(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRegisterBuiltinsRegistersAllCurrentDrivers(t *testing.T) {
|
||||||
|
reg := fksource.NewRegistry()
|
||||||
|
RegisterBuiltins(reg)
|
||||||
|
|
||||||
|
drivers := []string{
|
||||||
|
"nws_observation",
|
||||||
|
"nws_alerts",
|
||||||
|
"nws_forecast_hourly",
|
||||||
|
"nws_forecast_narrative",
|
||||||
|
"openmeteo_observation",
|
||||||
|
"openmeteo_forecast",
|
||||||
|
"openweather_observation",
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, driver := range drivers {
|
||||||
|
in, err := reg.BuildInput(sourceConfigForDriver(driver))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("BuildInput(%s) error = %v", driver, err)
|
||||||
|
}
|
||||||
|
if _, ok := in.(fksource.PollSource); !ok {
|
||||||
|
t.Fatalf("BuildInput(%s) type = %T, want PollSource", driver, in)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func sourceConfigForDriver(driver string) config.SourceConfig {
|
func sourceConfigForDriver(driver string) config.SourceConfig {
|
||||||
|
url := "https://example.invalid"
|
||||||
|
if driver == "openweather_observation" {
|
||||||
|
url = "https://example.invalid?units=metric"
|
||||||
|
}
|
||||||
return config.SourceConfig{
|
return config.SourceConfig{
|
||||||
Name: "test-source",
|
Name: "test-source",
|
||||||
Driver: driver,
|
Driver: driver,
|
||||||
Mode: config.SourceModePoll,
|
Mode: config.SourceModePoll,
|
||||||
Params: map[string]any{
|
Params: map[string]any{
|
||||||
"url": "https://example.invalid",
|
"url": url,
|
||||||
"user_agent": "test-agent",
|
"user_agent": "test-agent",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -39,8 +39,8 @@ func NewAlertsSource(cfg config.SourceConfig) (*AlertsSource, error) {
|
|||||||
|
|
||||||
func (s *AlertsSource) Name() string { return s.http.Name }
|
func (s *AlertsSource) Name() string { return s.http.Name }
|
||||||
|
|
||||||
// Kind is used for routing/policy.
|
// Kinds is used for routing/policy.
|
||||||
func (s *AlertsSource) Kind() event.Kind { return event.Kind("alert") }
|
func (s *AlertsSource) Kinds() []event.Kind { return []event.Kind{event.Kind("alert")} }
|
||||||
|
|
||||||
func (s *AlertsSource) Poll(ctx context.Context) ([]event.Event, error) {
|
func (s *AlertsSource) Poll(ctx context.Context) ([]event.Event, error) {
|
||||||
raw, meta, changed, err := s.fetchRaw(ctx)
|
raw, meta, changed, err := s.fetchRaw(ctx)
|
||||||
@@ -71,7 +71,7 @@ func (s *AlertsSource) Poll(ctx context.Context) ([]event.Event, error) {
|
|||||||
eventID := fksources.DefaultEventID("", s.http.Name, effectiveAt, emittedAt)
|
eventID := fksources.DefaultEventID("", s.http.Name, effectiveAt, emittedAt)
|
||||||
|
|
||||||
return fksources.SingleEvent(
|
return fksources.SingleEvent(
|
||||||
s.Kind(),
|
event.Kind("alert"),
|
||||||
s.http.Name,
|
s.http.Name,
|
||||||
standards.SchemaRawNWSAlertsV1,
|
standards.SchemaRawNWSAlertsV1,
|
||||||
eventID,
|
eventID,
|
||||||
|
|||||||
114
internal/sources/nws/forecast_common.go
Normal file
114
internal/sources/nws/forecast_common.go
Normal file
@@ -0,0 +1,114 @@
|
|||||||
|
package nws
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||||
|
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||||
|
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
|
||||||
|
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
|
||||||
|
)
|
||||||
|
|
||||||
|
const nwsForecastAccept = "application/geo+json, application/json"
|
||||||
|
|
||||||
|
type forecastSource struct {
|
||||||
|
http *fksources.HTTPSource
|
||||||
|
rawSchema string
|
||||||
|
}
|
||||||
|
|
||||||
|
type forecastMeta struct {
|
||||||
|
Properties struct {
|
||||||
|
GeneratedAt string `json:"generatedAt"`
|
||||||
|
UpdateTime string `json:"updateTime"`
|
||||||
|
Updated string `json:"updated"`
|
||||||
|
} `json:"properties"`
|
||||||
|
|
||||||
|
ParsedGeneratedAt time.Time `json:"-"`
|
||||||
|
ParsedUpdateTime time.Time `json:"-"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func newForecastSource(cfg config.SourceConfig, driver, rawSchema string) (*forecastSource, error) {
|
||||||
|
hs, err := fksources.NewHTTPSource(driver, cfg, nwsForecastAccept)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &forecastSource{
|
||||||
|
http: hs,
|
||||||
|
rawSchema: rawSchema,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *forecastSource) Name() string { return s.http.Name }
|
||||||
|
|
||||||
|
func (s *forecastSource) Kinds() []event.Kind { return []event.Kind{event.Kind("forecast")} }
|
||||||
|
|
||||||
|
func (s *forecastSource) Poll(ctx context.Context) ([]event.Event, error) {
|
||||||
|
raw, meta, changed, err := s.fetchRaw(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if !changed {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var effectiveAt *time.Time
|
||||||
|
switch {
|
||||||
|
case !meta.ParsedGeneratedAt.IsZero():
|
||||||
|
t := meta.ParsedGeneratedAt.UTC()
|
||||||
|
effectiveAt = &t
|
||||||
|
case !meta.ParsedUpdateTime.IsZero():
|
||||||
|
t := meta.ParsedUpdateTime.UTC()
|
||||||
|
effectiveAt = &t
|
||||||
|
}
|
||||||
|
|
||||||
|
emittedAt := time.Now().UTC()
|
||||||
|
eventID := fksources.DefaultEventID("", s.http.Name, effectiveAt, emittedAt)
|
||||||
|
|
||||||
|
return fksources.SingleEvent(
|
||||||
|
event.Kind("forecast"),
|
||||||
|
s.http.Name,
|
||||||
|
s.rawSchema,
|
||||||
|
eventID,
|
||||||
|
emittedAt,
|
||||||
|
effectiveAt,
|
||||||
|
raw,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *forecastSource) fetchRaw(ctx context.Context) (json.RawMessage, forecastMeta, bool, error) {
|
||||||
|
raw, changed, err := s.http.FetchJSONIfChanged(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, forecastMeta{}, false, err
|
||||||
|
}
|
||||||
|
if !changed {
|
||||||
|
return nil, forecastMeta{}, false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var meta forecastMeta
|
||||||
|
if err := json.Unmarshal(raw, &meta); err != nil {
|
||||||
|
return raw, forecastMeta{}, true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
genStr := strings.TrimSpace(meta.Properties.GeneratedAt)
|
||||||
|
if genStr != "" {
|
||||||
|
if t, err := nwscommon.ParseTime(genStr); err == nil {
|
||||||
|
meta.ParsedGeneratedAt = t.UTC()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
updStr := strings.TrimSpace(meta.Properties.UpdateTime)
|
||||||
|
if updStr == "" {
|
||||||
|
updStr = strings.TrimSpace(meta.Properties.Updated)
|
||||||
|
}
|
||||||
|
if updStr != "" {
|
||||||
|
if t, err := nwscommon.ParseTime(updStr); err == nil {
|
||||||
|
meta.ParsedUpdateTime = t.UTC()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return raw, meta, true, nil
|
||||||
|
}
|
||||||
@@ -2,15 +2,7 @@
|
|||||||
package nws
|
package nws
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"encoding/json"
|
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"gitea.maximumdirect.net/ejr/feedkit/config"
|
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||||
"gitea.maximumdirect.net/ejr/feedkit/event"
|
|
||||||
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
|
|
||||||
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
|
|
||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -22,114 +14,15 @@ import (
|
|||||||
// Output schema (current implementation):
|
// Output schema (current implementation):
|
||||||
// - standards.SchemaRawNWSHourlyForecastV1
|
// - standards.SchemaRawNWSHourlyForecastV1
|
||||||
type HourlyForecastSource struct {
|
type HourlyForecastSource struct {
|
||||||
http *fksources.HTTPSource
|
*forecastSource
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewHourlyForecastSource(cfg config.SourceConfig) (*HourlyForecastSource, error) {
|
func NewHourlyForecastSource(cfg config.SourceConfig) (*HourlyForecastSource, error) {
|
||||||
const driver = "nws_forecast_hourly"
|
const driver = "nws_forecast_hourly"
|
||||||
|
src, err := newForecastSource(cfg, driver, standards.SchemaRawNWSHourlyForecastV1)
|
||||||
// NWS forecast endpoints are GeoJSON (and sometimes also advertise json-ld/json).
|
|
||||||
hs, err := fksources.NewHTTPSource(driver, cfg, "application/geo+json, application/json")
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return &HourlyForecastSource{http: hs}, nil
|
return &HourlyForecastSource{forecastSource: src}, nil
|
||||||
}
|
|
||||||
|
|
||||||
func (s *HourlyForecastSource) Name() string { return s.http.Name }
|
|
||||||
|
|
||||||
// Kind is used for routing/policy.
|
|
||||||
func (s *HourlyForecastSource) Kind() event.Kind { return event.Kind("forecast") }
|
|
||||||
|
|
||||||
func (s *HourlyForecastSource) Poll(ctx context.Context) ([]event.Event, error) {
|
|
||||||
raw, meta, changed, err := s.fetchRaw(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if !changed {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// EffectiveAt is optional; for forecasts it’s most naturally the run “issued” time.
|
|
||||||
// NWS gridpoint forecasts expose generatedAt (preferred) and updateTime/updated.
|
|
||||||
var effectiveAt *time.Time
|
|
||||||
switch {
|
|
||||||
case !meta.ParsedGeneratedAt.IsZero():
|
|
||||||
t := meta.ParsedGeneratedAt.UTC()
|
|
||||||
effectiveAt = &t
|
|
||||||
case !meta.ParsedUpdateTime.IsZero():
|
|
||||||
t := meta.ParsedUpdateTime.UTC()
|
|
||||||
effectiveAt = &t
|
|
||||||
}
|
|
||||||
|
|
||||||
emittedAt := time.Now().UTC()
|
|
||||||
|
|
||||||
// NWS gridpoint forecast GeoJSON commonly has a stable "id" equal to the endpoint URL.
|
|
||||||
// That is *not* unique per issued run, so we intentionally do not use it for Event.ID.
|
|
||||||
// Instead we rely on Source:EffectiveAt (or Source:EmittedAt fallback).
|
|
||||||
eventID := fksources.DefaultEventID("", s.http.Name, effectiveAt, emittedAt)
|
|
||||||
|
|
||||||
return fksources.SingleEvent(
|
|
||||||
s.Kind(),
|
|
||||||
s.http.Name,
|
|
||||||
standards.SchemaRawNWSHourlyForecastV1,
|
|
||||||
eventID,
|
|
||||||
emittedAt,
|
|
||||||
effectiveAt,
|
|
||||||
raw,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ---- RAW fetch + minimal metadata decode ----
|
|
||||||
|
|
||||||
type hourlyForecastMeta struct {
|
|
||||||
// Present for GeoJSON Feature responses, but often stable (endpoint URL).
|
|
||||||
ID string `json:"id"`
|
|
||||||
|
|
||||||
Properties struct {
|
|
||||||
GeneratedAt string `json:"generatedAt"` // preferred “issued/run generated” time
|
|
||||||
UpdateTime string `json:"updateTime"` // last update time of underlying data
|
|
||||||
Updated string `json:"updated"` // deprecated alias for updateTime
|
|
||||||
} `json:"properties"`
|
|
||||||
|
|
||||||
ParsedGeneratedAt time.Time `json:"-"`
|
|
||||||
ParsedUpdateTime time.Time `json:"-"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *HourlyForecastSource) fetchRaw(ctx context.Context) (json.RawMessage, hourlyForecastMeta, bool, error) {
|
|
||||||
raw, changed, err := s.http.FetchJSONIfChanged(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return nil, hourlyForecastMeta{}, false, err
|
|
||||||
}
|
|
||||||
if !changed {
|
|
||||||
return nil, hourlyForecastMeta{}, false, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
var meta hourlyForecastMeta
|
|
||||||
if err := json.Unmarshal(raw, &meta); err != nil {
|
|
||||||
// If metadata decode fails, still return raw; Poll will fall back to Source:EmittedAt.
|
|
||||||
return raw, hourlyForecastMeta{}, true, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// generatedAt (preferred)
|
|
||||||
genStr := strings.TrimSpace(meta.Properties.GeneratedAt)
|
|
||||||
if genStr != "" {
|
|
||||||
if t, err := nwscommon.ParseTime(genStr); err == nil {
|
|
||||||
meta.ParsedGeneratedAt = t.UTC()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// updateTime, with fallback to deprecated "updated"
|
|
||||||
updStr := strings.TrimSpace(meta.Properties.UpdateTime)
|
|
||||||
if updStr == "" {
|
|
||||||
updStr = strings.TrimSpace(meta.Properties.Updated)
|
|
||||||
}
|
|
||||||
if updStr != "" {
|
|
||||||
if t, err := nwscommon.ParseTime(updStr); err == nil {
|
|
||||||
meta.ParsedUpdateTime = t.UTC()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return raw, meta, true, nil
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,15 +2,7 @@
|
|||||||
package nws
|
package nws
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"encoding/json"
|
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"gitea.maximumdirect.net/ejr/feedkit/config"
|
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||||
"gitea.maximumdirect.net/ejr/feedkit/event"
|
|
||||||
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
|
|
||||||
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
|
|
||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -22,114 +14,15 @@ import (
|
|||||||
// Output schema:
|
// Output schema:
|
||||||
// - standards.SchemaRawNWSNarrativeForecastV1
|
// - standards.SchemaRawNWSNarrativeForecastV1
|
||||||
type NarrativeForecastSource struct {
|
type NarrativeForecastSource struct {
|
||||||
http *fksources.HTTPSource
|
*forecastSource
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewNarrativeForecastSource(cfg config.SourceConfig) (*NarrativeForecastSource, error) {
|
func NewNarrativeForecastSource(cfg config.SourceConfig) (*NarrativeForecastSource, error) {
|
||||||
const driver = "nws_forecast_narrative"
|
const driver = "nws_forecast_narrative"
|
||||||
|
src, err := newForecastSource(cfg, driver, standards.SchemaRawNWSNarrativeForecastV1)
|
||||||
// NWS forecast endpoints are GeoJSON (and sometimes also advertise json-ld/json).
|
|
||||||
hs, err := fksources.NewHTTPSource(driver, cfg, "application/geo+json, application/json")
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return &NarrativeForecastSource{http: hs}, nil
|
return &NarrativeForecastSource{forecastSource: src}, nil
|
||||||
}
|
|
||||||
|
|
||||||
func (s *NarrativeForecastSource) Name() string { return s.http.Name }
|
|
||||||
|
|
||||||
// Kind is used for routing/policy.
|
|
||||||
func (s *NarrativeForecastSource) Kind() event.Kind { return event.Kind("forecast") }
|
|
||||||
|
|
||||||
func (s *NarrativeForecastSource) Poll(ctx context.Context) ([]event.Event, error) {
|
|
||||||
raw, meta, changed, err := s.fetchRaw(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if !changed {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// EffectiveAt is optional; for forecasts it’s most naturally the run “issued” time.
|
|
||||||
// NWS gridpoint forecasts expose generatedAt (preferred) and updateTime/updated.
|
|
||||||
var effectiveAt *time.Time
|
|
||||||
switch {
|
|
||||||
case !meta.ParsedGeneratedAt.IsZero():
|
|
||||||
t := meta.ParsedGeneratedAt.UTC()
|
|
||||||
effectiveAt = &t
|
|
||||||
case !meta.ParsedUpdateTime.IsZero():
|
|
||||||
t := meta.ParsedUpdateTime.UTC()
|
|
||||||
effectiveAt = &t
|
|
||||||
}
|
|
||||||
|
|
||||||
emittedAt := time.Now().UTC()
|
|
||||||
|
|
||||||
// NWS gridpoint forecast GeoJSON commonly has a stable "id" equal to the endpoint URL.
|
|
||||||
// That is *not* unique per issued run, so we intentionally do not use it for Event.ID.
|
|
||||||
// Instead we rely on Source:EffectiveAt (or Source:EmittedAt fallback).
|
|
||||||
eventID := fksources.DefaultEventID("", s.http.Name, effectiveAt, emittedAt)
|
|
||||||
|
|
||||||
return fksources.SingleEvent(
|
|
||||||
s.Kind(),
|
|
||||||
s.http.Name,
|
|
||||||
standards.SchemaRawNWSNarrativeForecastV1,
|
|
||||||
eventID,
|
|
||||||
emittedAt,
|
|
||||||
effectiveAt,
|
|
||||||
raw,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ---- RAW fetch + minimal metadata decode ----
|
|
||||||
|
|
||||||
type narrativeForecastMeta struct {
|
|
||||||
// Present for GeoJSON Feature responses, but often stable (endpoint URL).
|
|
||||||
ID string `json:"id"`
|
|
||||||
|
|
||||||
Properties struct {
|
|
||||||
GeneratedAt string `json:"generatedAt"` // preferred “issued/run generated” time
|
|
||||||
UpdateTime string `json:"updateTime"` // last update time of underlying data
|
|
||||||
Updated string `json:"updated"` // deprecated alias for updateTime
|
|
||||||
} `json:"properties"`
|
|
||||||
|
|
||||||
ParsedGeneratedAt time.Time `json:"-"`
|
|
||||||
ParsedUpdateTime time.Time `json:"-"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *NarrativeForecastSource) fetchRaw(ctx context.Context) (json.RawMessage, narrativeForecastMeta, bool, error) {
|
|
||||||
raw, changed, err := s.http.FetchJSONIfChanged(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return nil, narrativeForecastMeta{}, false, err
|
|
||||||
}
|
|
||||||
if !changed {
|
|
||||||
return nil, narrativeForecastMeta{}, false, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
var meta narrativeForecastMeta
|
|
||||||
if err := json.Unmarshal(raw, &meta); err != nil {
|
|
||||||
// If metadata decode fails, still return raw; Poll will fall back to Source:EmittedAt.
|
|
||||||
return raw, narrativeForecastMeta{}, true, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// generatedAt (preferred)
|
|
||||||
genStr := strings.TrimSpace(meta.Properties.GeneratedAt)
|
|
||||||
if genStr != "" {
|
|
||||||
if t, err := nwscommon.ParseTime(genStr); err == nil {
|
|
||||||
meta.ParsedGeneratedAt = t.UTC()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// updateTime, with fallback to deprecated "updated"
|
|
||||||
updStr := strings.TrimSpace(meta.Properties.UpdateTime)
|
|
||||||
if updStr == "" {
|
|
||||||
updStr = strings.TrimSpace(meta.Properties.Updated)
|
|
||||||
}
|
|
||||||
if updStr != "" {
|
|
||||||
if t, err := nwscommon.ParseTime(updStr); err == nil {
|
|
||||||
meta.ParsedUpdateTime = t.UTC()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return raw, meta, true, nil
|
|
||||||
}
|
}
|
||||||
|
|||||||
189
internal/sources/nws/forecast_test.go
Normal file
189
internal/sources/nws/forecast_test.go
Normal file
@@ -0,0 +1,189 @@
|
|||||||
|
package nws
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||||
|
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||||
|
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
||||||
|
)
|
||||||
|
|
||||||
|
type forecastPoller interface {
|
||||||
|
Poll(ctx context.Context) ([]event.Event, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestForecastSourcesEmitExpectedSchemaAndPreferGeneratedAt(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
driver string
|
||||||
|
wantSchema string
|
||||||
|
newSource func(config.SourceConfig) (forecastPoller, error)
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "hourly",
|
||||||
|
driver: "nws_forecast_hourly",
|
||||||
|
wantSchema: standards.SchemaRawNWSHourlyForecastV1,
|
||||||
|
newSource: func(cfg config.SourceConfig) (forecastPoller, error) {
|
||||||
|
return NewHourlyForecastSource(cfg)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "narrative",
|
||||||
|
driver: "nws_forecast_narrative",
|
||||||
|
wantSchema: standards.SchemaRawNWSNarrativeForecastV1,
|
||||||
|
newSource: func(cfg config.SourceConfig) (forecastPoller, error) {
|
||||||
|
return NewNarrativeForecastSource(cfg)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
_, _ = w.Write([]byte(`{"properties":{"generatedAt":"2026-03-28T12:00:00Z","updateTime":"2026-03-28T11:00:00Z"}}`))
|
||||||
|
}))
|
||||||
|
defer srv.Close()
|
||||||
|
|
||||||
|
src, err := tt.newSource(forecastSourceConfig(tt.driver, srv.URL))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("newSource() error = %v", err)
|
||||||
|
}
|
||||||
|
if ks, ok := src.(interface{ Kinds() []event.Kind }); !ok {
|
||||||
|
t.Fatalf("source does not implement Kinds()")
|
||||||
|
} else if gotKinds := ks.Kinds(); len(gotKinds) != 1 || gotKinds[0] != event.Kind("forecast") {
|
||||||
|
t.Fatalf("Kinds() = %#v, want [forecast]", gotKinds)
|
||||||
|
}
|
||||||
|
|
||||||
|
got, err := src.Poll(context.Background())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Poll() error = %v", err)
|
||||||
|
}
|
||||||
|
if len(got) != 1 {
|
||||||
|
t.Fatalf("Poll() len = %d, want 1", len(got))
|
||||||
|
}
|
||||||
|
if got[0].Schema != tt.wantSchema {
|
||||||
|
t.Fatalf("Poll() schema = %q, want %q", got[0].Schema, tt.wantSchema)
|
||||||
|
}
|
||||||
|
if got[0].Kind != event.Kind("forecast") {
|
||||||
|
t.Fatalf("Poll() kind = %q, want forecast", got[0].Kind)
|
||||||
|
}
|
||||||
|
|
||||||
|
wantEffectiveAt := time.Date(2026, 3, 28, 12, 0, 0, 0, time.UTC)
|
||||||
|
if got[0].EffectiveAt == nil || !got[0].EffectiveAt.Equal(wantEffectiveAt) {
|
||||||
|
t.Fatalf("Poll() effectiveAt = %v, want %s", got[0].EffectiveAt, wantEffectiveAt)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestForecastSourcePollEffectiveAtFallbackOrder(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
body string
|
||||||
|
wantEffectiveAt *time.Time
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "updateTime fallback",
|
||||||
|
body: `{"properties":{"updateTime":"2026-03-28T11:00:00Z"}}`,
|
||||||
|
wantEffectiveAt: func() *time.Time {
|
||||||
|
t := time.Date(2026, 3, 28, 11, 0, 0, 0, time.UTC)
|
||||||
|
return &t
|
||||||
|
}(),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "updated fallback",
|
||||||
|
body: `{"properties":{"updated":"2026-03-28T10:00:00Z"}}`,
|
||||||
|
wantEffectiveAt: func() *time.Time {
|
||||||
|
t := time.Date(2026, 3, 28, 10, 0, 0, 0, time.UTC)
|
||||||
|
return &t
|
||||||
|
}(),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "omitted when metadata lacks timestamps",
|
||||||
|
body: `{"properties":{}}`,
|
||||||
|
wantEffectiveAt: nil,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
_, _ = w.Write([]byte(tt.body))
|
||||||
|
}))
|
||||||
|
defer srv.Close()
|
||||||
|
|
||||||
|
src, err := NewHourlyForecastSource(forecastSourceConfig("nws_forecast_hourly", srv.URL))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("NewHourlyForecastSource() error = %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
got, err := src.Poll(context.Background())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Poll() error = %v", err)
|
||||||
|
}
|
||||||
|
if len(got) != 1 {
|
||||||
|
t.Fatalf("Poll() len = %d, want 1", len(got))
|
||||||
|
}
|
||||||
|
if tt.wantEffectiveAt == nil {
|
||||||
|
if got[0].EffectiveAt != nil {
|
||||||
|
t.Fatalf("Poll() effectiveAt = %v, want nil", got[0].EffectiveAt)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if got[0].EffectiveAt == nil || !got[0].EffectiveAt.Equal(*tt.wantEffectiveAt) {
|
||||||
|
t.Fatalf("Poll() effectiveAt = %v, want %s", got[0].EffectiveAt, *tt.wantEffectiveAt)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestForecastSourcePollMetadataDecodeFailureStillEmitsRawEvent(t *testing.T) {
|
||||||
|
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
_, _ = w.Write([]byte(`not-json`))
|
||||||
|
}))
|
||||||
|
defer srv.Close()
|
||||||
|
|
||||||
|
src, err := NewNarrativeForecastSource(forecastSourceConfig("nws_forecast_narrative", srv.URL))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("NewNarrativeForecastSource() error = %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
got, err := src.Poll(context.Background())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Poll() error = %v", err)
|
||||||
|
}
|
||||||
|
if len(got) != 1 {
|
||||||
|
t.Fatalf("Poll() len = %d, want 1", len(got))
|
||||||
|
}
|
||||||
|
if got[0].EffectiveAt != nil {
|
||||||
|
t.Fatalf("Poll() effectiveAt = %v, want nil", got[0].EffectiveAt)
|
||||||
|
}
|
||||||
|
if got[0].Schema != standards.SchemaRawNWSNarrativeForecastV1 {
|
||||||
|
t.Fatalf("Poll() schema = %q, want %q", got[0].Schema, standards.SchemaRawNWSNarrativeForecastV1)
|
||||||
|
}
|
||||||
|
|
||||||
|
raw, ok := got[0].Payload.(json.RawMessage)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("Poll() payload type = %T, want json.RawMessage", got[0].Payload)
|
||||||
|
}
|
||||||
|
if string(raw) != "not-json" {
|
||||||
|
t.Fatalf("Poll() payload = %q, want %q", string(raw), "not-json")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func forecastSourceConfig(driver, url string) config.SourceConfig {
|
||||||
|
return config.SourceConfig{
|
||||||
|
Name: "test-forecast-source",
|
||||||
|
Driver: driver,
|
||||||
|
Mode: config.SourceModePoll,
|
||||||
|
Params: map[string]any{
|
||||||
|
"url": url,
|
||||||
|
"user_agent": "test-agent",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -32,7 +32,7 @@ func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) {
|
|||||||
|
|
||||||
func (s *ObservationSource) Name() string { return s.http.Name }
|
func (s *ObservationSource) Name() string { return s.http.Name }
|
||||||
|
|
||||||
func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") }
|
func (s *ObservationSource) Kinds() []event.Kind { return []event.Kind{event.Kind("observation")} }
|
||||||
|
|
||||||
func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
|
func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
|
||||||
raw, meta, changed, err := s.fetchRaw(ctx)
|
raw, meta, changed, err := s.fetchRaw(ctx)
|
||||||
@@ -54,7 +54,7 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
|
|||||||
eventID := fksources.DefaultEventID(meta.ID, s.http.Name, effectiveAt, emittedAt)
|
eventID := fksources.DefaultEventID(meta.ID, s.http.Name, effectiveAt, emittedAt)
|
||||||
|
|
||||||
return fksources.SingleEvent(
|
return fksources.SingleEvent(
|
||||||
s.Kind(),
|
event.Kind("observation"),
|
||||||
s.http.Name,
|
s.http.Name,
|
||||||
standards.SchemaRawNWSObservationV1,
|
standards.SchemaRawNWSObservationV1,
|
||||||
eventID,
|
eventID,
|
||||||
|
|||||||
@@ -41,6 +41,9 @@ func TestObservationSourcePollReturnsNoEventsOn304(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("NewObservationSource() error = %v", err)
|
t.Fatalf("NewObservationSource() error = %v", err)
|
||||||
}
|
}
|
||||||
|
if got := src.Kinds(); len(got) != 1 || got[0] != event.Kind("observation") {
|
||||||
|
t.Fatalf("Kinds() = %#v, want [observation]", got)
|
||||||
|
}
|
||||||
|
|
||||||
first, err := src.Poll(context.Background())
|
first, err := src.Poll(context.Background())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -31,7 +31,7 @@ func NewForecastSource(cfg config.SourceConfig) (*ForecastSource, error) {
|
|||||||
|
|
||||||
func (s *ForecastSource) Name() string { return s.http.Name }
|
func (s *ForecastSource) Name() string { return s.http.Name }
|
||||||
|
|
||||||
func (s *ForecastSource) Kind() event.Kind { return event.Kind("forecast") }
|
func (s *ForecastSource) Kinds() []event.Kind { return []event.Kind{event.Kind("forecast")} }
|
||||||
|
|
||||||
func (s *ForecastSource) Poll(ctx context.Context) ([]event.Event, error) {
|
func (s *ForecastSource) Poll(ctx context.Context) ([]event.Event, error) {
|
||||||
raw, meta, changed, err := s.fetchRaw(ctx)
|
raw, meta, changed, err := s.fetchRaw(ctx)
|
||||||
@@ -55,7 +55,7 @@ func (s *ForecastSource) Poll(ctx context.Context) ([]event.Event, error) {
|
|||||||
eventID := fksources.DefaultEventID("", s.http.Name, effectiveAt, emittedAt)
|
eventID := fksources.DefaultEventID("", s.http.Name, effectiveAt, emittedAt)
|
||||||
|
|
||||||
return fksources.SingleEvent(
|
return fksources.SingleEvent(
|
||||||
s.Kind(),
|
event.Kind("forecast"),
|
||||||
s.http.Name,
|
s.http.Name,
|
||||||
standards.SchemaRawOpenMeteoHourlyForecastV1,
|
standards.SchemaRawOpenMeteoHourlyForecastV1,
|
||||||
eventID,
|
eventID,
|
||||||
|
|||||||
@@ -31,7 +31,7 @@ func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) {
|
|||||||
|
|
||||||
func (s *ObservationSource) Name() string { return s.http.Name }
|
func (s *ObservationSource) Name() string { return s.http.Name }
|
||||||
|
|
||||||
func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") }
|
func (s *ObservationSource) Kinds() []event.Kind { return []event.Kind{event.Kind("observation")} }
|
||||||
|
|
||||||
func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
|
func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
|
||||||
raw, meta, changed, err := s.fetchRaw(ctx)
|
raw, meta, changed, err := s.fetchRaw(ctx)
|
||||||
@@ -52,7 +52,7 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
|
|||||||
eventID := fksources.DefaultEventID("", s.http.Name, effectiveAt, emittedAt)
|
eventID := fksources.DefaultEventID("", s.http.Name, effectiveAt, emittedAt)
|
||||||
|
|
||||||
return fksources.SingleEvent(
|
return fksources.SingleEvent(
|
||||||
s.Kind(),
|
event.Kind("observation"),
|
||||||
s.http.Name,
|
s.http.Name,
|
||||||
standards.SchemaRawOpenMeteoCurrentV1,
|
standards.SchemaRawOpenMeteoCurrentV1,
|
||||||
eventID,
|
eventID,
|
||||||
|
|||||||
44
internal/sources/openmeteo/source_test.go
Normal file
44
internal/sources/openmeteo/source_test.go
Normal file
@@ -0,0 +1,44 @@
|
|||||||
|
package openmeteo
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||||
|
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestObservationSourceAdvertisesKinds(t *testing.T) {
|
||||||
|
src, err := NewObservationSource(config.SourceConfig{
|
||||||
|
Name: "openmeteo-observation-test",
|
||||||
|
Driver: "openmeteo_observation",
|
||||||
|
Mode: config.SourceModePoll,
|
||||||
|
Params: map[string]any{
|
||||||
|
"url": "https://example.invalid",
|
||||||
|
"user_agent": "test-agent",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("NewObservationSource() error = %v", err)
|
||||||
|
}
|
||||||
|
if got := src.Kinds(); len(got) != 1 || got[0] != event.Kind("observation") {
|
||||||
|
t.Fatalf("Kinds() = %#v, want [observation]", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestForecastSourceAdvertisesKinds(t *testing.T) {
|
||||||
|
src, err := NewForecastSource(config.SourceConfig{
|
||||||
|
Name: "openmeteo-forecast-test",
|
||||||
|
Driver: "openmeteo_forecast",
|
||||||
|
Mode: config.SourceModePoll,
|
||||||
|
Params: map[string]any{
|
||||||
|
"url": "https://example.invalid",
|
||||||
|
"user_agent": "test-agent",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("NewForecastSource() error = %v", err)
|
||||||
|
}
|
||||||
|
if got := src.Kinds(); len(got) != 1 || got[0] != event.Kind("forecast") {
|
||||||
|
t.Fatalf("Kinds() = %#v, want [forecast]", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -35,7 +35,7 @@ func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) {
|
|||||||
|
|
||||||
func (s *ObservationSource) Name() string { return s.http.Name }
|
func (s *ObservationSource) Name() string { return s.http.Name }
|
||||||
|
|
||||||
func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") }
|
func (s *ObservationSource) Kinds() []event.Kind { return []event.Kind{event.Kind("observation")} }
|
||||||
|
|
||||||
func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
|
func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
|
||||||
if err := owcommon.RequireMetricUnits(s.http.URL); err != nil {
|
if err := owcommon.RequireMetricUnits(s.http.URL); err != nil {
|
||||||
@@ -60,7 +60,7 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
|
|||||||
eventID := fksources.DefaultEventID("", s.http.Name, effectiveAt, emittedAt)
|
eventID := fksources.DefaultEventID("", s.http.Name, effectiveAt, emittedAt)
|
||||||
|
|
||||||
return fksources.SingleEvent(
|
return fksources.SingleEvent(
|
||||||
s.Kind(),
|
event.Kind("observation"),
|
||||||
s.http.Name,
|
s.http.Name,
|
||||||
standards.SchemaRawOpenWeatherCurrentV1,
|
standards.SchemaRawOpenWeatherCurrentV1,
|
||||||
eventID,
|
eventID,
|
||||||
|
|||||||
26
internal/sources/openweather/source_test.go
Normal file
26
internal/sources/openweather/source_test.go
Normal file
@@ -0,0 +1,26 @@
|
|||||||
|
package openweather
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||||
|
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestObservationSourceAdvertisesKinds(t *testing.T) {
|
||||||
|
src, err := NewObservationSource(config.SourceConfig{
|
||||||
|
Name: "openweather-observation-test",
|
||||||
|
Driver: "openweather_observation",
|
||||||
|
Mode: config.SourceModePoll,
|
||||||
|
Params: map[string]any{
|
||||||
|
"url": "https://example.invalid?units=metric",
|
||||||
|
"user_agent": "test-agent",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("NewObservationSource() error = %v", err)
|
||||||
|
}
|
||||||
|
if got := src.Kinds(); len(got) != 1 || got[0] != event.Kind("observation") {
|
||||||
|
t.Fatalf("Kinds() = %#v, want [observation]", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user