3 Commits

Author SHA1 Message Date
40f17c9d86 Updates to track upstream feedkit v0.8.2
All checks were successful
ci/woodpecker/push/build-image Pipeline was successful
2026-03-28 13:53:54 -05:00
c76088c38c Code cleanup and deduplication pass through weatherfeeder
All checks were successful
ci/woodpecker/push/build-image Pipeline was successful
ci/woodpecker/manual/build-image Pipeline was successful
2026-03-28 12:01:07 -05:00
2c1278a70a Moved generic and broadly useful helper functions upstream into feedkit
All checks were successful
ci/woodpecker/push/build-image Pipeline was successful
2026-03-28 11:30:20 -05:00
34 changed files with 697 additions and 776 deletions

View File

@@ -90,7 +90,7 @@ sinks:
driver: nats driver: nats
params: params:
url: nats://nats:4222 url: nats://nats:4222
exchange: weatherfeeder subject: weatherfeeder
# - name: pg_weatherfeeder # - name: pg_weatherfeeder
# driver: postgres # driver: postgres

View File

@@ -3,14 +3,10 @@ package main
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"log" "log"
"os" "os"
"os/signal" "os/signal"
"sort"
"strings"
"syscall" "syscall"
"time"
"gitea.maximumdirect.net/ejr/feedkit/config" "gitea.maximumdirect.net/ejr/feedkit/config"
fkdispatch "gitea.maximumdirect.net/ejr/feedkit/dispatch" fkdispatch "gitea.maximumdirect.net/ejr/feedkit/dispatch"
@@ -41,9 +37,6 @@ func main() {
if err != nil { if err != nil {
log.Fatalf("config load failed: %v", err) log.Fatalf("config load failed: %v", err)
} }
if err := wfpgsink.RegisterPostgresSchemas(cfg); err != nil {
log.Fatalf("postgres schema registration failed: %v", err)
}
// --- Registries --- // --- Registries ---
srcReg := fksources.NewRegistry() srcReg := fksources.NewRegistry()
@@ -51,15 +44,8 @@ func main() {
// Compile stdout, Postgres, and NATS sinks for weatherfeeder. The former is useful for debugging and the latter are the main intended outputs. // Compile stdout, Postgres, and NATS sinks for weatherfeeder. The former is useful for debugging and the latter are the main intended outputs.
sinkReg := fksinks.NewRegistry() sinkReg := fksinks.NewRegistry()
sinkReg.Register("stdout", func(cfg config.SinkConfig) (fksinks.Sink, error) { fksinks.RegisterBuiltins(sinkReg)
return fksinks.NewStdoutSink(cfg.Name), nil sinkReg.Register("postgres", fksinks.PostgresFactory(wfpgsink.PostgresSchema()))
})
sinkReg.Register("postgres", func(cfg config.SinkConfig) (fksinks.Sink, error) {
return fksinks.NewPostgresSinkFromConfig(cfg)
})
sinkReg.Register("nats", func(cfg config.SinkConfig) (fksinks.Sink, error) {
return fksinks.NewNATSSinkFromConfig(cfg)
})
// --- Build sources into scheduler jobs --- // --- Build sources into scheduler jobs ---
var jobs []fkscheduler.Job var jobs []fkscheduler.Job
@@ -69,7 +55,7 @@ func main() {
log.Fatalf("build source failed (sources[%d] name=%q driver=%q): %v", i, sc.Name, sc.Driver, err) log.Fatalf("build source failed (sources[%d] name=%q driver=%q): %v", i, sc.Name, sc.Driver, err)
} }
if err := validateSourceExpectedKinds(sc, in); err != nil { if err := fksources.ValidateExpectedKinds(sc, in); err != nil {
log.Fatalf("source expected kinds validation failed (sources[%d] name=%q driver=%q): %v", i, sc.Name, sc.Driver, err) log.Fatalf("source expected kinds validation failed (sources[%d] name=%q driver=%q): %v", i, sc.Name, sc.Driver, err)
} }
@@ -99,7 +85,7 @@ func main() {
} }
// --- Compile routes --- // --- Compile routes ---
routes, err := compileRoutes(cfg, builtSinks) routes, err := fkdispatch.CompileRoutes(cfg)
if err != nil { if err != nil {
log.Fatalf("compile routes failed: %v", err) log.Fatalf("compile routes failed: %v", err)
} }
@@ -160,124 +146,6 @@ func main() {
log.Printf("shutdown complete") log.Printf("shutdown complete")
} }
func compileRoutes(cfg *config.Config, builtSinks map[string]fksinks.Sink) ([]fkdispatch.Route, error) {
if len(cfg.Routes) == 0 {
return defaultRoutes(builtSinks), nil
}
var routes []fkdispatch.Route
for i, r := range cfg.Routes {
if strings.TrimSpace(r.Sink) == "" {
return nil, fmt.Errorf("routes[%d].sink is empty", i)
}
if _, ok := builtSinks[r.Sink]; !ok {
return nil, fmt.Errorf("routes[%d].sink references unknown sink %q", i, r.Sink)
}
kinds := map[fkevent.Kind]bool{}
for j, k := range r.Kinds {
kind, err := fkevent.ParseKind(k)
if err != nil {
return nil, fmt.Errorf("routes[%d].kinds[%d]: %w", i, j, err)
}
kinds[kind] = true
}
routes = append(routes, fkdispatch.Route{
SinkName: r.Sink,
Kinds: kinds,
})
}
return routes, nil
}
func defaultRoutes(builtSinks map[string]fksinks.Sink) []fkdispatch.Route {
// nil Kinds means "match all kinds" by convention
var allKinds map[fkevent.Kind]bool = nil
routes := make([]fkdispatch.Route, 0, len(builtSinks))
for name := range builtSinks {
routes = append(routes, fkdispatch.Route{
SinkName: name,
Kinds: allKinds,
})
}
return routes
}
func isContextShutdown(err error) bool { func isContextShutdown(err error) bool {
return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)
} }
func validateSourceExpectedKinds(sc config.SourceConfig, in fksources.Input) error {
expectedKinds, err := parseExpectedKinds(sc.ExpectedKinds())
if err != nil {
return err
}
if len(expectedKinds) == 0 {
return nil
}
advertisedKinds := advertisedSourceKinds(in)
if len(advertisedKinds) == 0 {
return nil
}
for kind := range expectedKinds {
if !advertisedKinds[kind] {
return fmt.Errorf(
"configured expected kind %q not advertised by source (configured=%v advertised=%v)",
kind,
sortedKinds(expectedKinds),
sortedKinds(advertisedKinds),
)
}
}
return nil
}
func parseExpectedKinds(raw []string) (map[fkevent.Kind]bool, error) {
kinds := map[fkevent.Kind]bool{}
for i, k := range raw {
kind, err := fkevent.ParseKind(k)
if err != nil {
return nil, fmt.Errorf("invalid expected kind at index %d (%q): %w", i, k, err)
}
kinds[kind] = true
}
return kinds, nil
}
func advertisedSourceKinds(in fksources.Input) map[fkevent.Kind]bool {
if in == nil {
return nil
}
kinds := map[fkevent.Kind]bool{}
if ks, ok := in.(fksources.KindsSource); ok {
for _, kind := range ks.Kinds() {
kinds[kind] = true
}
return kinds
}
if ks, ok := in.(fksources.KindSource); ok {
kinds[ks.Kind()] = true
return kinds
}
return nil
}
func sortedKinds(kindSet map[fkevent.Kind]bool) []string {
out := make([]string, 0, len(kindSet))
for kind := range kindSet {
out = append(out, string(kind))
}
sort.Strings(out)
return out
}
// keep time imported (mirrors your previous main.go defensive trick)
var _ = time.Second

View File

@@ -13,6 +13,7 @@ import (
fkprocessors "gitea.maximumdirect.net/ejr/feedkit/processors" fkprocessors "gitea.maximumdirect.net/ejr/feedkit/processors"
fkdedupe "gitea.maximumdirect.net/ejr/feedkit/processors/dedupe" fkdedupe "gitea.maximumdirect.net/ejr/feedkit/processors/dedupe"
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize" fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
wfnormalizers "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers" wfnormalizers "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers"
) )
@@ -23,13 +24,6 @@ type testInput struct {
func (s testInput) Name() string { return s.name } func (s testInput) Name() string { return s.name }
type testKindSource struct {
testInput
kind fkevent.Kind
}
func (s testKindSource) Kind() fkevent.Kind { return s.kind }
type testKindsSource struct { type testKindsSource struct {
testInput testInput
kinds []fkevent.Kind kinds []fkevent.Kind
@@ -37,18 +31,6 @@ type testKindsSource struct {
func (s testKindsSource) Kinds() []fkevent.Kind { return s.kinds } func (s testKindsSource) Kinds() []fkevent.Kind { return s.kinds }
func TestValidateSourceExpectedKindsLegacyKindFallback(t *testing.T) {
sc := config.SourceConfig{Kind: "observation"}
in := testKindSource{
testInput: testInput{name: "test"},
kind: fkevent.Kind("observation"),
}
if err := validateSourceExpectedKinds(sc, in); err != nil {
t.Fatalf("validateSourceExpectedKinds() unexpected error: %v", err)
}
}
func TestValidateSourceExpectedKindsSubsetAllowed(t *testing.T) { func TestValidateSourceExpectedKindsSubsetAllowed(t *testing.T) {
sc := config.SourceConfig{Kinds: []string{"observation"}} sc := config.SourceConfig{Kinds: []string{"observation"}}
in := testKindsSource{ in := testKindsSource{
@@ -56,8 +38,8 @@ func TestValidateSourceExpectedKindsSubsetAllowed(t *testing.T) {
kinds: []fkevent.Kind{"observation", "forecast"}, kinds: []fkevent.Kind{"observation", "forecast"},
} }
if err := validateSourceExpectedKinds(sc, in); err != nil { if err := fksources.ValidateExpectedKinds(sc, in); err != nil {
t.Fatalf("validateSourceExpectedKinds() unexpected error: %v", err) t.Fatalf("ValidateExpectedKinds() unexpected error: %v", err)
} }
} }
@@ -68,12 +50,12 @@ func TestValidateSourceExpectedKindsMismatchFails(t *testing.T) {
kinds: []fkevent.Kind{"observation", "forecast"}, kinds: []fkevent.Kind{"observation", "forecast"},
} }
err := validateSourceExpectedKinds(sc, in) err := fksources.ValidateExpectedKinds(sc, in)
if err == nil { if err == nil {
t.Fatalf("validateSourceExpectedKinds() expected mismatch error, got nil") t.Fatalf("ValidateExpectedKinds() expected mismatch error, got nil")
} }
if !strings.Contains(err.Error(), "configured expected kind") { if !strings.Contains(err.Error(), "configured expected kind") {
t.Fatalf("validateSourceExpectedKinds() error %q does not include expected message", err) t.Fatalf("ValidateExpectedKinds() error %q does not include expected message", err)
} }
} }
@@ -81,14 +63,8 @@ func TestValidateSourceExpectedKindsNoMetadataSkipsCheck(t *testing.T) {
sc := config.SourceConfig{Kinds: []string{"alert"}} sc := config.SourceConfig{Kinds: []string{"alert"}}
in := testInput{name: "test"} in := testInput{name: "test"}
if err := validateSourceExpectedKinds(sc, in); err != nil { if err := fksources.ValidateExpectedKinds(sc, in); err != nil {
t.Fatalf("validateSourceExpectedKinds() unexpected error: %v", err) t.Fatalf("ValidateExpectedKinds() unexpected error: %v", err)
}
}
func TestParseExpectedKindsRejectsEmptyValues(t *testing.T) {
if _, err := parseExpectedKinds([]string{""}); err == nil {
t.Fatalf("parseExpectedKinds() expected error for empty kind")
} }
} }

2
go.mod
View File

@@ -2,7 +2,7 @@ module gitea.maximumdirect.net/ejr/weatherfeeder
go 1.25 go 1.25
require gitea.maximumdirect.net/ejr/feedkit v0.8.0 require gitea.maximumdirect.net/ejr/feedkit v0.8.2
require ( require (
github.com/klauspost/compress v1.17.2 // indirect github.com/klauspost/compress v1.17.2 // indirect

4
go.sum
View File

@@ -1,5 +1,5 @@
gitea.maximumdirect.net/ejr/feedkit v0.8.0 h1:JdEEy6T3AQ97alLNYcQ3crN3tOEZPLMBD0Qr/MH5/dw= gitea.maximumdirect.net/ejr/feedkit v0.8.2 h1:6AMxNacfqJ8SXQhFAUMW3LgiVxixs50tf+S8Q9Ivm+Y=
gitea.maximumdirect.net/ejr/feedkit v0.8.0/go.mod h1:U6xC9xZLN3cL4yi7YBVyzGoHYRLJXusFCAKlj2kdYYQ= gitea.maximumdirect.net/ejr/feedkit v0.8.2/go.mod h1:U6xC9xZLN3cL4yi7YBVyzGoHYRLJXusFCAKlj2kdYYQ=
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=

View File

@@ -9,6 +9,12 @@ import (
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/openweather" "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/openweather"
) )
var builtinRegistrations = []func([]fknormalize.Normalizer) []fknormalize.Normalizer{
nws.Register,
openmeteo.Register,
openweather.Register,
}
// RegisterBuiltins registers all normalizers shipped with this binary. // RegisterBuiltins registers all normalizers shipped with this binary.
// //
// This mirrors internal/sources.RegisterBuiltins, but note the selection model: // This mirrors internal/sources.RegisterBuiltins, but note the selection model:
@@ -27,9 +33,9 @@ func RegisterBuiltins(in []fknormalize.Normalizer) []fknormalize.Normalizer {
// //
// Order here should be stable across releases to reduce surprises when adding // Order here should be stable across releases to reduce surprises when adding
// new normalizers. // new normalizers.
out = nws.Register(out) for _, register := range builtinRegistrations {
out = openmeteo.Register(out) out = register(out)
out = openweather.Register(out) }
return out return out
} }

View File

@@ -5,6 +5,7 @@ import (
"time" "time"
"gitea.maximumdirect.net/ejr/feedkit/event" "gitea.maximumdirect.net/ejr/feedkit/event"
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
) )
// Finalize builds the output event envelope by copying the input and applying the // Finalize builds the output event envelope by copying the input and applying the
@@ -16,20 +17,7 @@ import (
// If effectiveAt is zero, any existing in.EffectiveAt is preserved. // If effectiveAt is zero, any existing in.EffectiveAt is preserved.
// - Payload floats are rounded to a stable wire-friendly precision (see round.go). // - Payload floats are rounded to a stable wire-friendly precision (see round.go).
func Finalize(in event.Event, outSchema string, outPayload any, effectiveAt time.Time) (*event.Event, error) { func Finalize(in event.Event, outSchema string, outPayload any, effectiveAt time.Time) (*event.Event, error) {
out := in // Enforce stable numeric presentation for weather payloads before delegating to feedkit's
out.Schema = outSchema // generic envelope finalizer.
return fknormalize.FinalizeEvent(in, outSchema, RoundFloats(outPayload, DefaultFloatPrecision), effectiveAt)
// Enforce stable numeric presentation for sinks: round floats in the canonical payload.
out.Payload = RoundFloats(outPayload, DefaultFloatPrecision)
if !effectiveAt.IsZero() {
t := effectiveAt.UTC()
out.EffectiveAt = &t
}
if err := out.Validate(); err != nil {
return nil, err
}
return &out, nil
} }

View File

@@ -0,0 +1,36 @@
package common
import (
"testing"
"time"
"gitea.maximumdirect.net/ejr/feedkit/event"
)
func TestFinalizeRoundsWeatherPayloadFloats(t *testing.T) {
type payload struct {
Value float64
}
in := event.Event{
ID: "evt-1",
Kind: event.Kind("observation"),
Source: "source-a",
EmittedAt: time.Date(2026, 3, 28, 12, 0, 0, 0, time.UTC),
Schema: "raw.example.v1",
Payload: map[string]any{"old": true},
}
out, err := Finalize(in, "weather.example.v1", payload{Value: 1.234567}, time.Time{})
if err != nil {
t.Fatalf("Finalize() unexpected error: %v", err)
}
got, ok := out.Payload.(payload)
if !ok {
t.Fatalf("Finalize() payload type = %T, want payload", out.Payload)
}
if got.Value != 1.2346 {
t.Fatalf("Finalize() rounded value = %v, want 1.2346", got.Value)
}
}

View File

@@ -2,11 +2,11 @@
package common package common
import ( import (
"encoding/json"
"fmt" "fmt"
"time" "time"
"gitea.maximumdirect.net/ejr/feedkit/event" "gitea.maximumdirect.net/ejr/feedkit/event"
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
) )
// DecodeJSONPayload extracts the event payload as bytes and unmarshals it into T. // DecodeJSONPayload extracts the event payload as bytes and unmarshals it into T.
@@ -18,19 +18,7 @@ import (
// Errors include a small amount of stage context ("extract payload", "decode raw payload"). // Errors include a small amount of stage context ("extract payload", "decode raw payload").
// Callers typically wrap these with a provider/kind label. // Callers typically wrap these with a provider/kind label.
func DecodeJSONPayload[T any](in event.Event) (T, error) { func DecodeJSONPayload[T any](in event.Event) (T, error) {
var zero T return fknormalize.DecodeJSONPayload[T](in)
b, err := PayloadBytes(in)
if err != nil {
return zero, fmt.Errorf("extract payload: %w", err)
}
var parsed T
if err := json.Unmarshal(b, &parsed); err != nil {
return zero, fmt.Errorf("decode raw payload: %w", err)
}
return parsed, nil
} }
// NormalizeJSON is a convenience wrapper for the common JSON-normalizer pattern: // NormalizeJSON is a convenience wrapper for the common JSON-normalizer pattern:

View File

@@ -1,53 +0,0 @@
package common
import (
"encoding/json"
"fmt"
"gitea.maximumdirect.net/ejr/feedkit/event"
)
// PayloadBytes extracts a JSON payload into bytes suitable for json.Unmarshal.
//
// Supported payload shapes (weatherfeeder convention):
// - json.RawMessage (recommended for raw events)
// - []byte
// - string (assumed to contain JSON)
// - map[string]any (re-marshaled to JSON)
//
// If you add other raw representations later, extend this function.
func PayloadBytes(e event.Event) ([]byte, error) {
if e.Payload == nil {
return nil, fmt.Errorf("payload is nil")
}
switch v := e.Payload.(type) {
case json.RawMessage:
if len(v) == 0 {
return nil, fmt.Errorf("payload is empty json.RawMessage")
}
return []byte(v), nil
case []byte:
if len(v) == 0 {
return nil, fmt.Errorf("payload is empty []byte")
}
return v, nil
case string:
if v == "" {
return nil, fmt.Errorf("payload is empty string")
}
return []byte(v), nil
case map[string]any:
b, err := json.Marshal(v)
if err != nil {
return nil, fmt.Errorf("marshal map payload: %w", err)
}
return b, nil
default:
return nil, fmt.Errorf("unsupported payload type %T", e.Payload)
}
}

View File

@@ -75,62 +75,63 @@ func normalizeNarrativeForecastEvent(in event.Event) (*event.Event, error) {
) )
} }
type forecastPeriodMapper[T any] func(idx int, period T) (model.WeatherForecastPeriod, error)
// buildHourlyForecast contains hourly forecast mapping logic (provider -> canonical model). // buildHourlyForecast contains hourly forecast mapping logic (provider -> canonical model).
func buildHourlyForecast(parsed nwsHourlyForecastResponse) (model.WeatherForecastRun, time.Time, error) { func buildHourlyForecast(parsed nwsHourlyForecastResponse) (model.WeatherForecastRun, time.Time, error) {
issuedAt, updatedAt, err := parseForecastRunTimes(parsed.Properties.GeneratedAt, parsed.Properties.UpdateTime) return buildForecastRun(
if err != nil { parsed.Properties.GeneratedAt,
return model.WeatherForecastRun{}, time.Time{}, err parsed.Properties.UpdateTime,
} parsed.Geometry.Coordinates,
// Best-effort location centroid from the GeoJSON polygon (optional).
lat, lon := centroidLatLon(parsed.Geometry.Coordinates)
run := newForecastRunBase(
issuedAt,
updatedAt,
model.ForecastProductHourly,
lat,
lon,
parsed.Properties.Elevation.Value, parsed.Properties.Elevation.Value,
model.ForecastProductHourly,
parsed.Properties.Periods,
mapHourlyForecastPeriod,
) )
periods := make([]model.WeatherForecastPeriod, 0, len(parsed.Properties.Periods))
for i, p := range parsed.Properties.Periods {
period, err := mapHourlyForecastPeriod(i, p)
if err != nil {
return model.WeatherForecastRun{}, time.Time{}, err
}
periods = append(periods, period)
}
run.Periods = periods
// EffectiveAt policy for forecasts: treat IssuedAt as the effective time (dedupe-friendly).
return run, issuedAt, nil
} }
// buildNarrativeForecast contains narrative forecast mapping logic (provider -> canonical model). // buildNarrativeForecast contains narrative forecast mapping logic (provider -> canonical model).
func buildNarrativeForecast(parsed nwsNarrativeForecastResponse) (model.WeatherForecastRun, time.Time, error) { func buildNarrativeForecast(parsed nwsNarrativeForecastResponse) (model.WeatherForecastRun, time.Time, error) {
issuedAt, updatedAt, err := parseForecastRunTimes(parsed.Properties.GeneratedAt, parsed.Properties.UpdateTime) return buildForecastRun(
parsed.Properties.GeneratedAt,
parsed.Properties.UpdateTime,
parsed.Geometry.Coordinates,
parsed.Properties.Elevation.Value,
model.ForecastProductNarrative,
parsed.Properties.Periods,
mapNarrativeForecastPeriod,
)
}
func buildForecastRun[T any](
generatedAt string,
updateTime string,
coordinates [][][]float64,
elevation *float64,
product model.ForecastProduct,
srcPeriods []T,
mapPeriod forecastPeriodMapper[T],
) (model.WeatherForecastRun, time.Time, error) {
issuedAt, updatedAt, err := parseForecastRunTimes(generatedAt, updateTime)
if err != nil { if err != nil {
return model.WeatherForecastRun{}, time.Time{}, err return model.WeatherForecastRun{}, time.Time{}, err
} }
// Best-effort location centroid from the GeoJSON polygon (optional). // Best-effort location centroid from the GeoJSON polygon (optional).
lat, lon := centroidLatLon(parsed.Geometry.Coordinates) lat, lon := centroidLatLon(coordinates)
run := newForecastRunBase( run := newForecastRunBase(
issuedAt, issuedAt,
updatedAt, updatedAt,
model.ForecastProductNarrative, product,
lat, lat,
lon, lon,
parsed.Properties.Elevation.Value, elevation,
) )
periods := make([]model.WeatherForecastPeriod, 0, len(parsed.Properties.Periods)) periods := make([]model.WeatherForecastPeriod, 0, len(srcPeriods))
for i, p := range parsed.Properties.Periods { for i, p := range srcPeriods {
period, err := mapNarrativeForecastPeriod(i, p) period, err := mapPeriod(i, p)
if err != nil { if err != nil {
return model.WeatherForecastRun{}, time.Time{}, err return model.WeatherForecastRun{}, time.Time{}, err
} }

View File

@@ -47,6 +47,55 @@ func TestBuildHourlyForecastUsesShortForecastAsTextDescription(t *testing.T) {
assertNoLegacyForecastDescriptionKeys(t, run.Periods[0]) assertNoLegacyForecastDescriptionKeys(t, run.Periods[0])
} }
func TestBuildHourlyForecastPreservesUpdatedAtCentroidAndElevation(t *testing.T) {
parsed := nwsHourlyForecastResponse{}
parsed.Properties.GeneratedAt = "2026-03-16T18:00:00Z"
parsed.Properties.UpdateTime = "2026-03-16T18:30:00Z"
elevation := 123.4
parsed.Properties.Elevation.Value = &elevation
parsed.Geometry.Coordinates = [][][]float64{
{
{-90.0, 38.0},
{-89.0, 38.0},
{-89.0, 39.0},
{-90.0, 39.0},
},
}
parsed.Properties.Periods = []nwsHourlyForecastPeriod{
{
StartTime: "2026-03-16T19:00:00Z",
EndTime: "2026-03-16T20:00:00Z",
ShortForecast: "Cloudy",
},
}
run, effectiveAt, err := buildHourlyForecast(parsed)
if err != nil {
t.Fatalf("buildHourlyForecast() error = %v", err)
}
wantIssued := time.Date(2026, 3, 16, 18, 0, 0, 0, time.UTC)
wantUpdated := time.Date(2026, 3, 16, 18, 30, 0, 0, time.UTC)
if !run.IssuedAt.Equal(wantIssued) {
t.Fatalf("IssuedAt = %s, want %s", run.IssuedAt.Format(time.RFC3339), wantIssued.Format(time.RFC3339))
}
if run.UpdatedAt == nil || !run.UpdatedAt.Equal(wantUpdated) {
t.Fatalf("UpdatedAt = %v, want %s", run.UpdatedAt, wantUpdated.Format(time.RFC3339))
}
if run.Latitude == nil || math.Abs(*run.Latitude-38.5) > 0.0001 {
t.Fatalf("Latitude = %v, want 38.5", run.Latitude)
}
if run.Longitude == nil || math.Abs(*run.Longitude+89.5) > 0.0001 {
t.Fatalf("Longitude = %v, want -89.5", run.Longitude)
}
if run.ElevationMeters == nil || math.Abs(*run.ElevationMeters-elevation) > 0.0001 {
t.Fatalf("ElevationMeters = %v, want %.1f", run.ElevationMeters, elevation)
}
if !effectiveAt.Equal(wantIssued) {
t.Fatalf("effectiveAt = %s, want %s", effectiveAt.Format(time.RFC3339), wantIssued.Format(time.RFC3339))
}
}
func TestNormalizeForecastEventBySchemaRejectsUnsupportedSchema(t *testing.T) { func TestNormalizeForecastEventBySchemaRejectsUnsupportedSchema(t *testing.T) {
_, err := normalizeForecastEventBySchema(event.Event{ _, err := normalizeForecastEventBySchema(event.Event{
Schema: "raw.nws.daily.forecast.v1", Schema: "raw.nws.daily.forecast.v1",
@@ -85,6 +134,70 @@ func TestNormalizeForecastEventBySchemaRoutesNarrative(t *testing.T) {
} }
} }
func TestNormalizeForecastEventBySchemaProducesCanonicalWeatherForecastSchema(t *testing.T) {
tests := []struct {
name string
schema string
payload map[string]any
}{
{
name: "hourly",
schema: standards.SchemaRawNWSHourlyForecastV1,
payload: map[string]any{
"properties": map[string]any{
"generatedAt": "2026-03-16T18:00:00Z",
"periods": []map[string]any{
{
"startTime": "2026-03-16T19:00:00Z",
"endTime": "2026-03-16T20:00:00Z",
"shortForecast": "Cloudy",
},
},
},
},
},
{
name: "narrative",
schema: standards.SchemaRawNWSNarrativeForecastV1,
payload: map[string]any{
"properties": map[string]any{
"generatedAt": "2026-03-16T18:00:00Z",
"periods": []map[string]any{
{
"startTime": "2026-03-16T19:00:00Z",
"endTime": "2026-03-16T20:00:00Z",
"shortForecast": "Cloudy",
"detailedForecast": "Cloudy",
},
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
out, err := normalizeForecastEventBySchema(event.Event{
ID: "evt-1",
Kind: event.Kind("forecast"),
Source: "nws-test",
EmittedAt: time.Date(2026, 3, 16, 18, 0, 0, 0, time.UTC),
Schema: tt.schema,
Payload: tt.payload,
})
if err != nil {
t.Fatalf("normalizeForecastEventBySchema() error = %v", err)
}
if out == nil {
t.Fatalf("normalizeForecastEventBySchema() returned nil output")
}
if out.Schema != standards.SchemaWeatherForecastV1 {
t.Fatalf("Schema = %q, want %q", out.Schema, standards.SchemaWeatherForecastV1)
}
})
}
}
func TestBuildNarrativeForecastMapsExpectedFields(t *testing.T) { func TestBuildNarrativeForecastMapsExpectedFields(t *testing.T) {
parsed := nwsNarrativeForecastResponse{} parsed := nwsNarrativeForecastResponse{}
parsed.Properties.GeneratedAt = "2026-03-27T15:17:01Z" parsed.Properties.GeneratedAt = "2026-03-27T15:17:01Z"

View File

@@ -5,18 +5,13 @@ import (
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize" fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
) )
var builtins = []fknormalize.Normalizer{
ObservationNormalizer{},
ForecastNormalizer{},
AlertsNormalizer{},
}
// Register appends NWS normalizers in stable order. // Register appends NWS normalizers in stable order.
func Register(in []fknormalize.Normalizer) []fknormalize.Normalizer { func Register(in []fknormalize.Normalizer) []fknormalize.Normalizer {
out := in return append(in, builtins...)
// Observations
out = append(out, ObservationNormalizer{})
// Forecasts
out = append(out, ForecastNormalizer{})
// Alerts
out = append(out, AlertsNormalizer{})
return out
} }

View File

@@ -5,14 +5,12 @@ import (
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize" fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
) )
var builtins = []fknormalize.Normalizer{
ObservationNormalizer{},
ForecastNormalizer{},
}
// Register appends Open-Meteo normalizers in stable order. // Register appends Open-Meteo normalizers in stable order.
func Register(in []fknormalize.Normalizer) []fknormalize.Normalizer { func Register(in []fknormalize.Normalizer) []fknormalize.Normalizer {
out := in return append(in, builtins...)
// Observations
out = append(out, ObservationNormalizer{})
// Forecasts
out = append(out, ForecastNormalizer{})
return out
} }

View File

@@ -5,12 +5,11 @@ import (
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize" fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
) )
var builtins = []fknormalize.Normalizer{
ObservationNormalizer{},
}
// Register appends OpenWeather normalizers in stable order. // Register appends OpenWeather normalizers in stable order.
func Register(in []fknormalize.Normalizer) []fknormalize.Normalizer { func Register(in []fknormalize.Normalizer) []fknormalize.Normalizer {
out := in return append(in, builtins...)
// Observations
out = append(out, ObservationNormalizer{})
return out
} }

View File

@@ -238,7 +238,7 @@ func assertAllWritesIncludeAllColumns(t *testing.T, writes []fksinks.PostgresWri
} }
func tableColumnCounts() map[string]int { func tableColumnCounts() map[string]int {
s := weatherPostgresSchema() s := PostgresSchema()
m := make(map[string]int, len(s.Tables)) m := make(map[string]int, len(s.Tables))
for _, tbl := range s.Tables { for _, tbl := range s.Tables {
m[tbl.Name] = len(tbl.Columns) m[tbl.Name] = len(tbl.Columns)

View File

@@ -1,10 +1,6 @@
package postgres package postgres
import ( import (
"fmt"
"strings"
"gitea.maximumdirect.net/ejr/feedkit/config"
fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks" fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks"
) )
@@ -18,31 +14,8 @@ const (
tableAlertReferences = "alert_references" tableAlertReferences = "alert_references"
) )
// RegisterPostgresSchemas registers weatherfeeder's Postgres schema for each // PostgresSchema returns weatherfeeder's Postgres schema definition.
// configured sink using driver=postgres. func PostgresSchema() fksinks.PostgresSchema {
func RegisterPostgresSchemas(cfg *config.Config) error {
if cfg == nil {
return fmt.Errorf("register postgres schemas: config is nil")
}
schema := weatherPostgresSchema()
for i, sk := range cfg.Sinks {
if !isPostgresDriver(sk.Driver) {
continue
}
if err := fksinks.RegisterPostgresSchema(sk.Name, schema); err != nil {
return fmt.Errorf("register postgres schema for sinks[%d] name=%q: %w", i, sk.Name, err)
}
}
return nil
}
func isPostgresDriver(driver string) bool {
return strings.EqualFold(strings.TrimSpace(driver), "postgres")
}
func weatherPostgresSchema() fksinks.PostgresSchema {
return fksinks.PostgresSchema{ return fksinks.PostgresSchema{
Tables: []fksinks.PostgresTable{ Tables: []fksinks.PostgresTable{
{ {

View File

@@ -1,62 +1,11 @@
package postgres package postgres
import ( import "testing"
"fmt"
"strings"
"testing"
"time"
"gitea.maximumdirect.net/ejr/feedkit/config"
)
func TestRegisterPostgresSchemasNilConfig(t *testing.T) {
err := RegisterPostgresSchemas(nil)
if err == nil {
t.Fatalf("RegisterPostgresSchemas(nil) expected error")
}
if !strings.Contains(err.Error(), "config is nil") {
t.Fatalf("error = %q, want config is nil", err)
}
}
func TestRegisterPostgresSchemasNonPostgresNoOp(t *testing.T) {
cfg := &config.Config{
Sinks: []config.SinkConfig{
{Name: "stdout_only", Driver: "stdout"},
{Name: "nats_only", Driver: "nats"},
},
}
if err := RegisterPostgresSchemas(cfg); err != nil {
t.Fatalf("RegisterPostgresSchemas(non-postgres) error = %v", err)
}
}
func TestRegisterPostgresSchemasDuplicateRegistrationFails(t *testing.T) {
sinkName := uniqueSinkName("pg_test")
cfg := &config.Config{
Sinks: []config.SinkConfig{
{Name: sinkName, Driver: "postgres"},
},
}
if err := RegisterPostgresSchemas(cfg); err != nil {
t.Fatalf("first RegisterPostgresSchemas() error = %v", err)
}
err := RegisterPostgresSchemas(cfg)
if err == nil {
t.Fatalf("second RegisterPostgresSchemas() expected duplicate error")
}
if !strings.Contains(err.Error(), "already registered") {
t.Fatalf("error = %q, want already registered", err)
}
}
func TestWeatherPostgresSchemaShape(t *testing.T) { func TestWeatherPostgresSchemaShape(t *testing.T) {
s := weatherPostgresSchema() s := PostgresSchema()
if s.MapEvent == nil { if s.MapEvent == nil {
t.Fatalf("weatherPostgresSchema().MapEvent is nil") t.Fatalf("PostgresSchema().MapEvent is nil")
} }
wantTables := map[string]bool{ wantTables := map[string]bool{
@@ -70,7 +19,7 @@ func TestWeatherPostgresSchemaShape(t *testing.T) {
} }
if len(s.Tables) != len(wantTables) { if len(s.Tables) != len(wantTables) {
t.Fatalf("weatherPostgresSchema().Tables len = %d, want %d", len(s.Tables), len(wantTables)) t.Fatalf("PostgresSchema().Tables len = %d, want %d", len(s.Tables), len(wantTables))
} }
seenIndexes := map[string]bool{} seenIndexes := map[string]bool{}
@@ -89,7 +38,3 @@ func TestWeatherPostgresSchemaShape(t *testing.T) {
} }
} }
} }
func uniqueSinkName(prefix string) string {
return fmt.Sprintf("%s_%d", prefix, time.Now().UnixNano())
}

View File

@@ -9,33 +9,30 @@ import (
fksource "gitea.maximumdirect.net/ejr/feedkit/sources" fksource "gitea.maximumdirect.net/ejr/feedkit/sources"
) )
type pollDriverRegistration struct {
driver string
factory func(config.SourceConfig) (fksource.PollSource, error)
}
var pollDriverRegistrations = []pollDriverRegistration{
{driver: "nws_observation", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) { return nws.NewObservationSource(cfg) }},
{driver: "nws_alerts", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) { return nws.NewAlertsSource(cfg) }},
{driver: "nws_forecast_hourly", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) { return nws.NewHourlyForecastSource(cfg) }},
{driver: "nws_forecast_narrative", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) { return nws.NewNarrativeForecastSource(cfg) }},
{driver: "openmeteo_observation", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) { return openmeteo.NewObservationSource(cfg) }},
{driver: "openmeteo_forecast", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) { return openmeteo.NewForecastSource(cfg) }},
{driver: "openweather_observation", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) {
return openweather.NewObservationSource(cfg)
}},
}
// RegisterBuiltins registers the source drivers that ship with this binary. // RegisterBuiltins registers the source drivers that ship with this binary.
// Keeping this in one place makes main.go very readable. // Keeping this in one place makes main.go very readable.
func RegisterBuiltins(r *fksource.Registry) { func RegisterBuiltins(r *fksource.Registry) {
// NWS drivers for _, reg := range pollDriverRegistrations {
r.RegisterPoll("nws_observation", func(cfg config.SourceConfig) (fksource.PollSource, error) { reg := reg
return nws.NewObservationSource(cfg) r.RegisterPoll(reg.driver, func(cfg config.SourceConfig) (fksource.PollSource, error) {
}) return reg.factory(cfg)
r.RegisterPoll("nws_alerts", func(cfg config.SourceConfig) (fksource.PollSource, error) { })
return nws.NewAlertsSource(cfg) }
})
r.RegisterPoll("nws_forecast_hourly", func(cfg config.SourceConfig) (fksource.PollSource, error) {
return nws.NewHourlyForecastSource(cfg)
})
r.RegisterPoll("nws_forecast_narrative", func(cfg config.SourceConfig) (fksource.PollSource, error) {
return nws.NewNarrativeForecastSource(cfg)
})
// Open-Meteo drivers
r.RegisterPoll("openmeteo_observation", func(cfg config.SourceConfig) (fksource.PollSource, error) {
return openmeteo.NewObservationSource(cfg)
})
r.RegisterPoll("openmeteo_forecast", func(cfg config.SourceConfig) (fksource.PollSource, error) {
return openmeteo.NewForecastSource(cfg)
})
// OpenWeatherMap drivers
r.RegisterPoll("openweather_observation", func(cfg config.SourceConfig) (fksource.PollSource, error) {
return openweather.NewObservationSource(cfg)
})
} }

View File

@@ -47,13 +47,42 @@ func TestRegisterBuiltinsDoesNotRegisterLegacyNWSForecastDriver(t *testing.T) {
} }
} }
func TestRegisterBuiltinsRegistersAllCurrentDrivers(t *testing.T) {
reg := fksource.NewRegistry()
RegisterBuiltins(reg)
drivers := []string{
"nws_observation",
"nws_alerts",
"nws_forecast_hourly",
"nws_forecast_narrative",
"openmeteo_observation",
"openmeteo_forecast",
"openweather_observation",
}
for _, driver := range drivers {
in, err := reg.BuildInput(sourceConfigForDriver(driver))
if err != nil {
t.Fatalf("BuildInput(%s) error = %v", driver, err)
}
if _, ok := in.(fksource.PollSource); !ok {
t.Fatalf("BuildInput(%s) type = %T, want PollSource", driver, in)
}
}
}
func sourceConfigForDriver(driver string) config.SourceConfig { func sourceConfigForDriver(driver string) config.SourceConfig {
url := "https://example.invalid"
if driver == "openweather_observation" {
url = "https://example.invalid?units=metric"
}
return config.SourceConfig{ return config.SourceConfig{
Name: "test-source", Name: "test-source",
Driver: driver, Driver: driver,
Mode: config.SourceModePoll, Mode: config.SourceModePoll,
Params: map[string]any{ Params: map[string]any{
"url": "https://example.invalid", "url": url,
"user_agent": "test-agent", "user_agent": "test-agent",
}, },
} }

View File

@@ -1,54 +0,0 @@
// FILE: ./internal/sources/common/event.go
package common
import (
"time"
"gitea.maximumdirect.net/ejr/feedkit/event"
)
// SingleRawEvent constructs, validates, and returns a slice containing exactly one event.
//
// This removes repetitive "event envelope ceremony" from individual sources.
// Sources remain responsible for:
// - fetching bytes (raw payload)
// - choosing Schema (raw schema identifier)
// - computing Event.ID and (optional) EffectiveAt
//
// emittedAt is explicit so callers can compute IDs using the same timestamp (or
// so tests can provide a stable value).
func SingleRawEvent(
kind event.Kind,
sourceName string,
schema string,
id string,
emittedAt time.Time,
effectiveAt *time.Time,
payload any,
) ([]event.Event, error) {
if emittedAt.IsZero() {
emittedAt = time.Now().UTC()
} else {
emittedAt = emittedAt.UTC()
}
e := event.Event{
ID: id,
Kind: kind,
Source: sourceName,
EmittedAt: emittedAt,
EffectiveAt: effectiveAt,
// RAW schema (normalizer matches on this).
Schema: schema,
// Raw payload (usually json.RawMessage). Normalizer will decode and map to canonical model.
Payload: payload,
}
if err := e.Validate(); err != nil {
return nil, err
}
return []event.Event{e}, nil
}

View File

@@ -1,39 +0,0 @@
// FILE: ./internal/sources/common/id.go
package common
import (
"fmt"
"strings"
"time"
)
// ChooseEventID applies weatherfeeder's opinionated Event.ID policy:
//
// - If upstream provides an ID, use it (trimmed).
// - Otherwise, ID is "<Source>:<EffectiveAt>" when available.
// - If EffectiveAt is unavailable, fall back to "<Source>:<EmittedAt>".
//
// Timestamps are encoded as RFC3339Nano in UTC.
func ChooseEventID(upstreamID, sourceName string, effectiveAt *time.Time, emittedAt time.Time) string {
if id := strings.TrimSpace(upstreamID); id != "" {
return id
}
src := strings.TrimSpace(sourceName)
if src == "" {
src = "UNKNOWN_SOURCE"
}
// Prefer EffectiveAt for dedupe friendliness.
if effectiveAt != nil && !effectiveAt.IsZero() {
return fmt.Sprintf("%s:%s", src, effectiveAt.UTC().Format(time.RFC3339Nano))
}
// Fall back to EmittedAt (still stable within a poll invocation).
t := emittedAt.UTC()
if t.IsZero() {
t = time.Now().UTC()
}
return fmt.Sprintf("%s:%s", src, t.Format(time.RFC3339Nano))
}

View File

@@ -11,7 +11,6 @@ import (
"gitea.maximumdirect.net/ejr/feedkit/event" "gitea.maximumdirect.net/ejr/feedkit/event"
fksources "gitea.maximumdirect.net/ejr/feedkit/sources" fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws" nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards" "gitea.maximumdirect.net/ejr/weatherfeeder/standards"
) )
@@ -40,8 +39,8 @@ func NewAlertsSource(cfg config.SourceConfig) (*AlertsSource, error) {
func (s *AlertsSource) Name() string { return s.http.Name } func (s *AlertsSource) Name() string { return s.http.Name }
// Kind is used for routing/policy. // Kinds is used for routing/policy.
func (s *AlertsSource) Kind() event.Kind { return event.Kind("alert") } func (s *AlertsSource) Kinds() []event.Kind { return []event.Kind{event.Kind("alert")} }
func (s *AlertsSource) Poll(ctx context.Context) ([]event.Event, error) { func (s *AlertsSource) Poll(ctx context.Context) ([]event.Event, error) {
raw, meta, changed, err := s.fetchRaw(ctx) raw, meta, changed, err := s.fetchRaw(ctx)
@@ -69,10 +68,10 @@ func (s *AlertsSource) Poll(ctx context.Context) ([]event.Event, error) {
// NWS alerts collections do not provide a stable per-snapshot ID. // NWS alerts collections do not provide a stable per-snapshot ID.
// Use Source:EffectiveAt (or Source:EmittedAt fallback) for dedupe friendliness. // Use Source:EffectiveAt (or Source:EmittedAt fallback) for dedupe friendliness.
eventID := common.ChooseEventID("", s.http.Name, effectiveAt, emittedAt) eventID := fksources.DefaultEventID("", s.http.Name, effectiveAt, emittedAt)
return common.SingleRawEvent( return fksources.SingleEvent(
s.Kind(), event.Kind("alert"),
s.http.Name, s.http.Name,
standards.SchemaRawNWSAlertsV1, standards.SchemaRawNWSAlertsV1,
eventID, eventID,

View File

@@ -0,0 +1,114 @@
package nws
import (
"context"
"encoding/json"
"strings"
"time"
"gitea.maximumdirect.net/ejr/feedkit/config"
"gitea.maximumdirect.net/ejr/feedkit/event"
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
)
const nwsForecastAccept = "application/geo+json, application/json"
type forecastSource struct {
http *fksources.HTTPSource
rawSchema string
}
type forecastMeta struct {
Properties struct {
GeneratedAt string `json:"generatedAt"`
UpdateTime string `json:"updateTime"`
Updated string `json:"updated"`
} `json:"properties"`
ParsedGeneratedAt time.Time `json:"-"`
ParsedUpdateTime time.Time `json:"-"`
}
func newForecastSource(cfg config.SourceConfig, driver, rawSchema string) (*forecastSource, error) {
hs, err := fksources.NewHTTPSource(driver, cfg, nwsForecastAccept)
if err != nil {
return nil, err
}
return &forecastSource{
http: hs,
rawSchema: rawSchema,
}, nil
}
func (s *forecastSource) Name() string { return s.http.Name }
func (s *forecastSource) Kinds() []event.Kind { return []event.Kind{event.Kind("forecast")} }
func (s *forecastSource) Poll(ctx context.Context) ([]event.Event, error) {
raw, meta, changed, err := s.fetchRaw(ctx)
if err != nil {
return nil, err
}
if !changed {
return nil, nil
}
var effectiveAt *time.Time
switch {
case !meta.ParsedGeneratedAt.IsZero():
t := meta.ParsedGeneratedAt.UTC()
effectiveAt = &t
case !meta.ParsedUpdateTime.IsZero():
t := meta.ParsedUpdateTime.UTC()
effectiveAt = &t
}
emittedAt := time.Now().UTC()
eventID := fksources.DefaultEventID("", s.http.Name, effectiveAt, emittedAt)
return fksources.SingleEvent(
event.Kind("forecast"),
s.http.Name,
s.rawSchema,
eventID,
emittedAt,
effectiveAt,
raw,
)
}
func (s *forecastSource) fetchRaw(ctx context.Context) (json.RawMessage, forecastMeta, bool, error) {
raw, changed, err := s.http.FetchJSONIfChanged(ctx)
if err != nil {
return nil, forecastMeta{}, false, err
}
if !changed {
return nil, forecastMeta{}, false, nil
}
var meta forecastMeta
if err := json.Unmarshal(raw, &meta); err != nil {
return raw, forecastMeta{}, true, nil
}
genStr := strings.TrimSpace(meta.Properties.GeneratedAt)
if genStr != "" {
if t, err := nwscommon.ParseTime(genStr); err == nil {
meta.ParsedGeneratedAt = t.UTC()
}
}
updStr := strings.TrimSpace(meta.Properties.UpdateTime)
if updStr == "" {
updStr = strings.TrimSpace(meta.Properties.Updated)
}
if updStr != "" {
if t, err := nwscommon.ParseTime(updStr); err == nil {
meta.ParsedUpdateTime = t.UTC()
}
}
return raw, meta, true, nil
}

View File

@@ -2,16 +2,7 @@
package nws package nws
import ( import (
"context"
"encoding/json"
"strings"
"time"
"gitea.maximumdirect.net/ejr/feedkit/config" "gitea.maximumdirect.net/ejr/feedkit/config"
"gitea.maximumdirect.net/ejr/feedkit/event"
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards" "gitea.maximumdirect.net/ejr/weatherfeeder/standards"
) )
@@ -23,114 +14,15 @@ import (
// Output schema (current implementation): // Output schema (current implementation):
// - standards.SchemaRawNWSHourlyForecastV1 // - standards.SchemaRawNWSHourlyForecastV1
type HourlyForecastSource struct { type HourlyForecastSource struct {
http *fksources.HTTPSource *forecastSource
} }
func NewHourlyForecastSource(cfg config.SourceConfig) (*HourlyForecastSource, error) { func NewHourlyForecastSource(cfg config.SourceConfig) (*HourlyForecastSource, error) {
const driver = "nws_forecast_hourly" const driver = "nws_forecast_hourly"
src, err := newForecastSource(cfg, driver, standards.SchemaRawNWSHourlyForecastV1)
// NWS forecast endpoints are GeoJSON (and sometimes also advertise json-ld/json).
hs, err := fksources.NewHTTPSource(driver, cfg, "application/geo+json, application/json")
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &HourlyForecastSource{http: hs}, nil return &HourlyForecastSource{forecastSource: src}, nil
}
func (s *HourlyForecastSource) Name() string { return s.http.Name }
// Kind is used for routing/policy.
func (s *HourlyForecastSource) Kind() event.Kind { return event.Kind("forecast") }
func (s *HourlyForecastSource) Poll(ctx context.Context) ([]event.Event, error) {
raw, meta, changed, err := s.fetchRaw(ctx)
if err != nil {
return nil, err
}
if !changed {
return nil, nil
}
// EffectiveAt is optional; for forecasts its most naturally the run “issued” time.
// NWS gridpoint forecasts expose generatedAt (preferred) and updateTime/updated.
var effectiveAt *time.Time
switch {
case !meta.ParsedGeneratedAt.IsZero():
t := meta.ParsedGeneratedAt.UTC()
effectiveAt = &t
case !meta.ParsedUpdateTime.IsZero():
t := meta.ParsedUpdateTime.UTC()
effectiveAt = &t
}
emittedAt := time.Now().UTC()
// NWS gridpoint forecast GeoJSON commonly has a stable "id" equal to the endpoint URL.
// That is *not* unique per issued run, so we intentionally do not use it for Event.ID.
// Instead we rely on Source:EffectiveAt (or Source:EmittedAt fallback).
eventID := common.ChooseEventID("", s.http.Name, effectiveAt, emittedAt)
return common.SingleRawEvent(
s.Kind(),
s.http.Name,
standards.SchemaRawNWSHourlyForecastV1,
eventID,
emittedAt,
effectiveAt,
raw,
)
}
// ---- RAW fetch + minimal metadata decode ----
type hourlyForecastMeta struct {
// Present for GeoJSON Feature responses, but often stable (endpoint URL).
ID string `json:"id"`
Properties struct {
GeneratedAt string `json:"generatedAt"` // preferred “issued/run generated” time
UpdateTime string `json:"updateTime"` // last update time of underlying data
Updated string `json:"updated"` // deprecated alias for updateTime
} `json:"properties"`
ParsedGeneratedAt time.Time `json:"-"`
ParsedUpdateTime time.Time `json:"-"`
}
func (s *HourlyForecastSource) fetchRaw(ctx context.Context) (json.RawMessage, hourlyForecastMeta, bool, error) {
raw, changed, err := s.http.FetchJSONIfChanged(ctx)
if err != nil {
return nil, hourlyForecastMeta{}, false, err
}
if !changed {
return nil, hourlyForecastMeta{}, false, nil
}
var meta hourlyForecastMeta
if err := json.Unmarshal(raw, &meta); err != nil {
// If metadata decode fails, still return raw; Poll will fall back to Source:EmittedAt.
return raw, hourlyForecastMeta{}, true, nil
}
// generatedAt (preferred)
genStr := strings.TrimSpace(meta.Properties.GeneratedAt)
if genStr != "" {
if t, err := nwscommon.ParseTime(genStr); err == nil {
meta.ParsedGeneratedAt = t.UTC()
}
}
// updateTime, with fallback to deprecated "updated"
updStr := strings.TrimSpace(meta.Properties.UpdateTime)
if updStr == "" {
updStr = strings.TrimSpace(meta.Properties.Updated)
}
if updStr != "" {
if t, err := nwscommon.ParseTime(updStr); err == nil {
meta.ParsedUpdateTime = t.UTC()
}
}
return raw, meta, true, nil
} }

View File

@@ -2,16 +2,7 @@
package nws package nws
import ( import (
"context"
"encoding/json"
"strings"
"time"
"gitea.maximumdirect.net/ejr/feedkit/config" "gitea.maximumdirect.net/ejr/feedkit/config"
"gitea.maximumdirect.net/ejr/feedkit/event"
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards" "gitea.maximumdirect.net/ejr/weatherfeeder/standards"
) )
@@ -23,114 +14,15 @@ import (
// Output schema: // Output schema:
// - standards.SchemaRawNWSNarrativeForecastV1 // - standards.SchemaRawNWSNarrativeForecastV1
type NarrativeForecastSource struct { type NarrativeForecastSource struct {
http *fksources.HTTPSource *forecastSource
} }
func NewNarrativeForecastSource(cfg config.SourceConfig) (*NarrativeForecastSource, error) { func NewNarrativeForecastSource(cfg config.SourceConfig) (*NarrativeForecastSource, error) {
const driver = "nws_forecast_narrative" const driver = "nws_forecast_narrative"
src, err := newForecastSource(cfg, driver, standards.SchemaRawNWSNarrativeForecastV1)
// NWS forecast endpoints are GeoJSON (and sometimes also advertise json-ld/json).
hs, err := fksources.NewHTTPSource(driver, cfg, "application/geo+json, application/json")
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &NarrativeForecastSource{http: hs}, nil return &NarrativeForecastSource{forecastSource: src}, nil
}
func (s *NarrativeForecastSource) Name() string { return s.http.Name }
// Kind is used for routing/policy.
func (s *NarrativeForecastSource) Kind() event.Kind { return event.Kind("forecast") }
func (s *NarrativeForecastSource) Poll(ctx context.Context) ([]event.Event, error) {
raw, meta, changed, err := s.fetchRaw(ctx)
if err != nil {
return nil, err
}
if !changed {
return nil, nil
}
// EffectiveAt is optional; for forecasts its most naturally the run “issued” time.
// NWS gridpoint forecasts expose generatedAt (preferred) and updateTime/updated.
var effectiveAt *time.Time
switch {
case !meta.ParsedGeneratedAt.IsZero():
t := meta.ParsedGeneratedAt.UTC()
effectiveAt = &t
case !meta.ParsedUpdateTime.IsZero():
t := meta.ParsedUpdateTime.UTC()
effectiveAt = &t
}
emittedAt := time.Now().UTC()
// NWS gridpoint forecast GeoJSON commonly has a stable "id" equal to the endpoint URL.
// That is *not* unique per issued run, so we intentionally do not use it for Event.ID.
// Instead we rely on Source:EffectiveAt (or Source:EmittedAt fallback).
eventID := common.ChooseEventID("", s.http.Name, effectiveAt, emittedAt)
return common.SingleRawEvent(
s.Kind(),
s.http.Name,
standards.SchemaRawNWSNarrativeForecastV1,
eventID,
emittedAt,
effectiveAt,
raw,
)
}
// ---- RAW fetch + minimal metadata decode ----
type narrativeForecastMeta struct {
// Present for GeoJSON Feature responses, but often stable (endpoint URL).
ID string `json:"id"`
Properties struct {
GeneratedAt string `json:"generatedAt"` // preferred “issued/run generated” time
UpdateTime string `json:"updateTime"` // last update time of underlying data
Updated string `json:"updated"` // deprecated alias for updateTime
} `json:"properties"`
ParsedGeneratedAt time.Time `json:"-"`
ParsedUpdateTime time.Time `json:"-"`
}
func (s *NarrativeForecastSource) fetchRaw(ctx context.Context) (json.RawMessage, narrativeForecastMeta, bool, error) {
raw, changed, err := s.http.FetchJSONIfChanged(ctx)
if err != nil {
return nil, narrativeForecastMeta{}, false, err
}
if !changed {
return nil, narrativeForecastMeta{}, false, nil
}
var meta narrativeForecastMeta
if err := json.Unmarshal(raw, &meta); err != nil {
// If metadata decode fails, still return raw; Poll will fall back to Source:EmittedAt.
return raw, narrativeForecastMeta{}, true, nil
}
// generatedAt (preferred)
genStr := strings.TrimSpace(meta.Properties.GeneratedAt)
if genStr != "" {
if t, err := nwscommon.ParseTime(genStr); err == nil {
meta.ParsedGeneratedAt = t.UTC()
}
}
// updateTime, with fallback to deprecated "updated"
updStr := strings.TrimSpace(meta.Properties.UpdateTime)
if updStr == "" {
updStr = strings.TrimSpace(meta.Properties.Updated)
}
if updStr != "" {
if t, err := nwscommon.ParseTime(updStr); err == nil {
meta.ParsedUpdateTime = t.UTC()
}
}
return raw, meta, true, nil
} }

View File

@@ -0,0 +1,189 @@
package nws
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"time"
"gitea.maximumdirect.net/ejr/feedkit/config"
"gitea.maximumdirect.net/ejr/feedkit/event"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
)
type forecastPoller interface {
Poll(ctx context.Context) ([]event.Event, error)
}
func TestForecastSourcesEmitExpectedSchemaAndPreferGeneratedAt(t *testing.T) {
tests := []struct {
name string
driver string
wantSchema string
newSource func(config.SourceConfig) (forecastPoller, error)
}{
{
name: "hourly",
driver: "nws_forecast_hourly",
wantSchema: standards.SchemaRawNWSHourlyForecastV1,
newSource: func(cfg config.SourceConfig) (forecastPoller, error) {
return NewHourlyForecastSource(cfg)
},
},
{
name: "narrative",
driver: "nws_forecast_narrative",
wantSchema: standards.SchemaRawNWSNarrativeForecastV1,
newSource: func(cfg config.SourceConfig) (forecastPoller, error) {
return NewNarrativeForecastSource(cfg)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(`{"properties":{"generatedAt":"2026-03-28T12:00:00Z","updateTime":"2026-03-28T11:00:00Z"}}`))
}))
defer srv.Close()
src, err := tt.newSource(forecastSourceConfig(tt.driver, srv.URL))
if err != nil {
t.Fatalf("newSource() error = %v", err)
}
if ks, ok := src.(interface{ Kinds() []event.Kind }); !ok {
t.Fatalf("source does not implement Kinds()")
} else if gotKinds := ks.Kinds(); len(gotKinds) != 1 || gotKinds[0] != event.Kind("forecast") {
t.Fatalf("Kinds() = %#v, want [forecast]", gotKinds)
}
got, err := src.Poll(context.Background())
if err != nil {
t.Fatalf("Poll() error = %v", err)
}
if len(got) != 1 {
t.Fatalf("Poll() len = %d, want 1", len(got))
}
if got[0].Schema != tt.wantSchema {
t.Fatalf("Poll() schema = %q, want %q", got[0].Schema, tt.wantSchema)
}
if got[0].Kind != event.Kind("forecast") {
t.Fatalf("Poll() kind = %q, want forecast", got[0].Kind)
}
wantEffectiveAt := time.Date(2026, 3, 28, 12, 0, 0, 0, time.UTC)
if got[0].EffectiveAt == nil || !got[0].EffectiveAt.Equal(wantEffectiveAt) {
t.Fatalf("Poll() effectiveAt = %v, want %s", got[0].EffectiveAt, wantEffectiveAt)
}
})
}
}
func TestForecastSourcePollEffectiveAtFallbackOrder(t *testing.T) {
tests := []struct {
name string
body string
wantEffectiveAt *time.Time
}{
{
name: "updateTime fallback",
body: `{"properties":{"updateTime":"2026-03-28T11:00:00Z"}}`,
wantEffectiveAt: func() *time.Time {
t := time.Date(2026, 3, 28, 11, 0, 0, 0, time.UTC)
return &t
}(),
},
{
name: "updated fallback",
body: `{"properties":{"updated":"2026-03-28T10:00:00Z"}}`,
wantEffectiveAt: func() *time.Time {
t := time.Date(2026, 3, 28, 10, 0, 0, 0, time.UTC)
return &t
}(),
},
{
name: "omitted when metadata lacks timestamps",
body: `{"properties":{}}`,
wantEffectiveAt: nil,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(tt.body))
}))
defer srv.Close()
src, err := NewHourlyForecastSource(forecastSourceConfig("nws_forecast_hourly", srv.URL))
if err != nil {
t.Fatalf("NewHourlyForecastSource() error = %v", err)
}
got, err := src.Poll(context.Background())
if err != nil {
t.Fatalf("Poll() error = %v", err)
}
if len(got) != 1 {
t.Fatalf("Poll() len = %d, want 1", len(got))
}
if tt.wantEffectiveAt == nil {
if got[0].EffectiveAt != nil {
t.Fatalf("Poll() effectiveAt = %v, want nil", got[0].EffectiveAt)
}
return
}
if got[0].EffectiveAt == nil || !got[0].EffectiveAt.Equal(*tt.wantEffectiveAt) {
t.Fatalf("Poll() effectiveAt = %v, want %s", got[0].EffectiveAt, *tt.wantEffectiveAt)
}
})
}
}
func TestForecastSourcePollMetadataDecodeFailureStillEmitsRawEvent(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(`not-json`))
}))
defer srv.Close()
src, err := NewNarrativeForecastSource(forecastSourceConfig("nws_forecast_narrative", srv.URL))
if err != nil {
t.Fatalf("NewNarrativeForecastSource() error = %v", err)
}
got, err := src.Poll(context.Background())
if err != nil {
t.Fatalf("Poll() error = %v", err)
}
if len(got) != 1 {
t.Fatalf("Poll() len = %d, want 1", len(got))
}
if got[0].EffectiveAt != nil {
t.Fatalf("Poll() effectiveAt = %v, want nil", got[0].EffectiveAt)
}
if got[0].Schema != standards.SchemaRawNWSNarrativeForecastV1 {
t.Fatalf("Poll() schema = %q, want %q", got[0].Schema, standards.SchemaRawNWSNarrativeForecastV1)
}
raw, ok := got[0].Payload.(json.RawMessage)
if !ok {
t.Fatalf("Poll() payload type = %T, want json.RawMessage", got[0].Payload)
}
if string(raw) != "not-json" {
t.Fatalf("Poll() payload = %q, want %q", string(raw), "not-json")
}
}
func forecastSourceConfig(driver, url string) config.SourceConfig {
return config.SourceConfig{
Name: "test-forecast-source",
Driver: driver,
Mode: config.SourceModePoll,
Params: map[string]any{
"url": url,
"user_agent": "test-agent",
},
}
}

View File

@@ -11,7 +11,6 @@ import (
"gitea.maximumdirect.net/ejr/feedkit/event" "gitea.maximumdirect.net/ejr/feedkit/event"
fksources "gitea.maximumdirect.net/ejr/feedkit/sources" fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws" nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards" "gitea.maximumdirect.net/ejr/weatherfeeder/standards"
) )
@@ -33,7 +32,7 @@ func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) {
func (s *ObservationSource) Name() string { return s.http.Name } func (s *ObservationSource) Name() string { return s.http.Name }
func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") } func (s *ObservationSource) Kinds() []event.Kind { return []event.Kind{event.Kind("observation")} }
func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
raw, meta, changed, err := s.fetchRaw(ctx) raw, meta, changed, err := s.fetchRaw(ctx)
@@ -52,10 +51,10 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
} }
emittedAt := time.Now().UTC() emittedAt := time.Now().UTC()
eventID := common.ChooseEventID(meta.ID, s.http.Name, effectiveAt, emittedAt) eventID := fksources.DefaultEventID(meta.ID, s.http.Name, effectiveAt, emittedAt)
return common.SingleRawEvent( return fksources.SingleEvent(
s.Kind(), event.Kind("observation"),
s.http.Name, s.http.Name,
standards.SchemaRawNWSObservationV1, standards.SchemaRawNWSObservationV1,
eventID, eventID,

View File

@@ -41,6 +41,9 @@ func TestObservationSourcePollReturnsNoEventsOn304(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("NewObservationSource() error = %v", err) t.Fatalf("NewObservationSource() error = %v", err)
} }
if got := src.Kinds(); len(got) != 1 || got[0] != event.Kind("observation") {
t.Fatalf("Kinds() = %#v, want [observation]", got)
}
first, err := src.Poll(context.Background()) first, err := src.Poll(context.Background())
if err != nil { if err != nil {

View File

@@ -10,7 +10,6 @@ import (
"gitea.maximumdirect.net/ejr/feedkit/event" "gitea.maximumdirect.net/ejr/feedkit/event"
fksources "gitea.maximumdirect.net/ejr/feedkit/sources" fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openmeteo" "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openmeteo"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards" "gitea.maximumdirect.net/ejr/weatherfeeder/standards"
) )
@@ -32,7 +31,7 @@ func NewForecastSource(cfg config.SourceConfig) (*ForecastSource, error) {
func (s *ForecastSource) Name() string { return s.http.Name } func (s *ForecastSource) Name() string { return s.http.Name }
func (s *ForecastSource) Kind() event.Kind { return event.Kind("forecast") } func (s *ForecastSource) Kinds() []event.Kind { return []event.Kind{event.Kind("forecast")} }
func (s *ForecastSource) Poll(ctx context.Context) ([]event.Event, error) { func (s *ForecastSource) Poll(ctx context.Context) ([]event.Event, error) {
raw, meta, changed, err := s.fetchRaw(ctx) raw, meta, changed, err := s.fetchRaw(ctx)
@@ -53,10 +52,10 @@ func (s *ForecastSource) Poll(ctx context.Context) ([]event.Event, error) {
} }
emittedAt := time.Now().UTC() emittedAt := time.Now().UTC()
eventID := common.ChooseEventID("", s.http.Name, effectiveAt, emittedAt) eventID := fksources.DefaultEventID("", s.http.Name, effectiveAt, emittedAt)
return common.SingleRawEvent( return fksources.SingleEvent(
s.Kind(), event.Kind("forecast"),
s.http.Name, s.http.Name,
standards.SchemaRawOpenMeteoHourlyForecastV1, standards.SchemaRawOpenMeteoHourlyForecastV1,
eventID, eventID,

View File

@@ -10,7 +10,6 @@ import (
"gitea.maximumdirect.net/ejr/feedkit/event" "gitea.maximumdirect.net/ejr/feedkit/event"
fksources "gitea.maximumdirect.net/ejr/feedkit/sources" fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openmeteo" "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openmeteo"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards" "gitea.maximumdirect.net/ejr/weatherfeeder/standards"
) )
@@ -32,7 +31,7 @@ func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) {
func (s *ObservationSource) Name() string { return s.http.Name } func (s *ObservationSource) Name() string { return s.http.Name }
func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") } func (s *ObservationSource) Kinds() []event.Kind { return []event.Kind{event.Kind("observation")} }
func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
raw, meta, changed, err := s.fetchRaw(ctx) raw, meta, changed, err := s.fetchRaw(ctx)
@@ -50,10 +49,10 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
} }
emittedAt := time.Now().UTC() emittedAt := time.Now().UTC()
eventID := common.ChooseEventID("", s.http.Name, effectiveAt, emittedAt) eventID := fksources.DefaultEventID("", s.http.Name, effectiveAt, emittedAt)
return common.SingleRawEvent( return fksources.SingleEvent(
s.Kind(), event.Kind("observation"),
s.http.Name, s.http.Name,
standards.SchemaRawOpenMeteoCurrentV1, standards.SchemaRawOpenMeteoCurrentV1,
eventID, eventID,

View File

@@ -0,0 +1,44 @@
package openmeteo
import (
"testing"
"gitea.maximumdirect.net/ejr/feedkit/config"
"gitea.maximumdirect.net/ejr/feedkit/event"
)
func TestObservationSourceAdvertisesKinds(t *testing.T) {
src, err := NewObservationSource(config.SourceConfig{
Name: "openmeteo-observation-test",
Driver: "openmeteo_observation",
Mode: config.SourceModePoll,
Params: map[string]any{
"url": "https://example.invalid",
"user_agent": "test-agent",
},
})
if err != nil {
t.Fatalf("NewObservationSource() error = %v", err)
}
if got := src.Kinds(); len(got) != 1 || got[0] != event.Kind("observation") {
t.Fatalf("Kinds() = %#v, want [observation]", got)
}
}
func TestForecastSourceAdvertisesKinds(t *testing.T) {
src, err := NewForecastSource(config.SourceConfig{
Name: "openmeteo-forecast-test",
Driver: "openmeteo_forecast",
Mode: config.SourceModePoll,
Params: map[string]any{
"url": "https://example.invalid",
"user_agent": "test-agent",
},
})
if err != nil {
t.Fatalf("NewForecastSource() error = %v", err)
}
if got := src.Kinds(); len(got) != 1 || got[0] != event.Kind("forecast") {
t.Fatalf("Kinds() = %#v, want [forecast]", got)
}
}

View File

@@ -11,7 +11,6 @@ import (
"gitea.maximumdirect.net/ejr/feedkit/event" "gitea.maximumdirect.net/ejr/feedkit/event"
fksources "gitea.maximumdirect.net/ejr/feedkit/sources" fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
owcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openweather" owcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openweather"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards" "gitea.maximumdirect.net/ejr/weatherfeeder/standards"
) )
@@ -36,7 +35,7 @@ func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) {
func (s *ObservationSource) Name() string { return s.http.Name } func (s *ObservationSource) Name() string { return s.http.Name }
func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") } func (s *ObservationSource) Kinds() []event.Kind { return []event.Kind{event.Kind("observation")} }
func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
if err := owcommon.RequireMetricUnits(s.http.URL); err != nil { if err := owcommon.RequireMetricUnits(s.http.URL); err != nil {
@@ -58,10 +57,10 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
} }
emittedAt := time.Now().UTC() emittedAt := time.Now().UTC()
eventID := common.ChooseEventID("", s.http.Name, effectiveAt, emittedAt) eventID := fksources.DefaultEventID("", s.http.Name, effectiveAt, emittedAt)
return common.SingleRawEvent( return fksources.SingleEvent(
s.Kind(), event.Kind("observation"),
s.http.Name, s.http.Name,
standards.SchemaRawOpenWeatherCurrentV1, standards.SchemaRawOpenWeatherCurrentV1,
eventID, eventID,

View File

@@ -0,0 +1,26 @@
package openweather
import (
"testing"
"gitea.maximumdirect.net/ejr/feedkit/config"
"gitea.maximumdirect.net/ejr/feedkit/event"
)
func TestObservationSourceAdvertisesKinds(t *testing.T) {
src, err := NewObservationSource(config.SourceConfig{
Name: "openweather-observation-test",
Driver: "openweather_observation",
Mode: config.SourceModePoll,
Params: map[string]any{
"url": "https://example.invalid?units=metric",
"user_agent": "test-agent",
},
})
if err != nil {
t.Fatalf("NewObservationSource() error = %v", err)
}
if got := src.Kinds(); len(got) != 1 || got[0] != event.Kind("observation") {
t.Fatalf("Kinds() = %#v, want [observation]", got)
}
}