diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4b30d93 --- /dev/null +++ b/.gitignore @@ -0,0 +1,29 @@ +# ---> Go +# If you prefer the allow list template instead of the deny list, see community template: +# https://github.com/github/gitignore/blob/main/community/Golang/Go.AllowList.gitignore +# +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary, built with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Dependency directories (remove the comment below to include it) +# vendor/ + +# Go workspace file +go.work +go.work.sum + +# env file +.env + +# compiled binary for this application +cmd/weatherfeeder/weatherfeeder diff --git a/README.md b/README.md index 3c99e53..7f27002 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,3 @@ # weatherfeeder -A small daemon to poll weather observations, alerts, and forecasts from a variety of sources. \ No newline at end of file +A small daemon to poll weather observations, alerts, and forecasts from a variety of sources. \ No newline at end of file diff --git a/cmd/weatherfeeder/config.yml b/cmd/weatherfeeder/config.yml new file mode 100644 index 0000000..a7bc90d --- /dev/null +++ b/cmd/weatherfeeder/config.yml @@ -0,0 +1,66 @@ +--- +sources: + - name: NWSObservationKSTL + kind: observation + driver: nws_observation + every: 12m + params: + url: "https://api.weather.gov/stations/KSTL/observations/latest" + user_agent: "HomeOps (eric@maximumdirect.net)" + + - name: OpenMeteoObservation + kind: observation + driver: openmeteo_observation + every: 12m + params: + url: "https://api.open-meteo.com/v1/forecast?latitude=38.6239&longitude=-90.3571¤t=temperature_2m,relative_humidity_2m,weather_code,wind_speed_10m,wind_direction_10m,precipitation,surface_pressure,rain,showers,snowfall,cloud_cover,apparent_temperature,is_day,wind_gusts_10m,pressure_msl&forecast_days=1" + user_agent: "HomeOps (eric@maximumdirect.net)" + + - name: OpenWeatherObservation + kind: observation + driver: openweather_observation + every: 12m + params: + url: "https://api.openweathermap.org/data/2.5/weather?lat=38.6239&lon=-90.3571&appid=c954f2566cb7ccb56b43737b52e88fc6&units=metric" + user_agent: "HomeOps (eric@maximumdirect.net)" + +# - name: NWSObservationKSUS +# kind: observation +# driver: nws_observation +# every: 18s +# params: +# url: "https://api.weather.gov/stations/KSUS/observations/latest" +# user_agent: "HomeOps (eric@maximumdirect.net)" + +# - name: NWSObservationKCPS +# kind: observation +# driver: nws_observation +# every: 12m +# params: +# url: "https://api.weather.gov/stations/KCPS/observations/latest" +# user_agent: "HomeOps (eric@maximumdirect.net)" + +# - name: NWSAlertsSTL +# kind: alert +# driver: nws_alerts +# every: 1m +# params: +# url: "https://api.weather.gov/alerts?point=38.6239,-90.3571&limit=500" +# user_agent: "HomeOps (eric@maximumdirect.net)" + +sinks: + - name: stdout + driver: stdout + params: {} + +# - name: logfile +# driver: file +# params: +# path: "/Users/eric/weatherd.log" + +routes: + - sink: stdout + kinds: ["observation"] + +# - sink: logfile +# kinds: ["observation", "alert", "forecast"] diff --git a/cmd/weatherfeeder/main.go b/cmd/weatherfeeder/main.go new file mode 100644 index 0000000..74044a7 --- /dev/null +++ b/cmd/weatherfeeder/main.go @@ -0,0 +1,178 @@ +package main + +import ( + "context" + "errors" + "fmt" + "log" + "os" + "os/signal" + "strings" + "syscall" + "time" + + "gitea.maximumdirect.net/ejr/feedkit/config" + fkdispatch "gitea.maximumdirect.net/ejr/feedkit/dispatch" + fkevent "gitea.maximumdirect.net/ejr/feedkit/event" + fkpipeline "gitea.maximumdirect.net/ejr/feedkit/pipeline" + fkscheduler "gitea.maximumdirect.net/ejr/feedkit/scheduler" + fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks" + fksources "gitea.maximumdirect.net/ejr/feedkit/sources" + + wfsources "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources" +) + +func main() { + log.SetFlags(log.LstdFlags | log.Lmicroseconds) + + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer cancel() + + cfgPath := "config.yml" + cfg, err := config.Load(cfgPath) + if err != nil { + log.Fatalf("config load failed: %v", err) + } + + // --- Registries --- + srcReg := fksources.NewRegistry() + wfsources.RegisterBuiltins(srcReg) + + // Minimal sink set to compile: stdout only. + sinkReg := fksinks.NewRegistry() + sinkReg.Register("stdout", func(cfg config.SinkConfig) (fksinks.Sink, error) { + return fksinks.NewStdoutSink(cfg.Name), nil + }) + + // --- Build sources into scheduler jobs --- + var jobs []fkscheduler.Job + for i, sc := range cfg.Sources { + src, err := srcReg.Build(sc) + if err != nil { + log.Fatalf("build source failed (sources[%d] name=%q driver=%q): %v", i, sc.Name, sc.Driver, err) + } + + // Optional safety: if config.kind is set, ensure it matches the source.Kind(). + if strings.TrimSpace(sc.Kind) != "" { + expectedKind, err := fkevent.ParseKind(sc.Kind) + if err != nil { + log.Fatalf("invalid kind in config (sources[%d] name=%q kind=%q): %v", i, sc.Name, sc.Kind, err) + } + if src.Kind() != expectedKind { + log.Fatalf( + "source kind mismatch (sources[%d] name=%q driver=%q): config kind=%q but driver emits kind=%q", + i, sc.Name, sc.Driver, expectedKind, src.Kind(), + ) + } + } + + jobs = append(jobs, fkscheduler.Job{ + Source: src, + Every: sc.Every.Duration, + }) + } + + // --- Build sinks --- + builtSinks := map[string]fksinks.Sink{} + for i, sk := range cfg.Sinks { + s, err := sinkReg.Build(sk) + if err != nil { + log.Fatalf("build sink failed (sinks[%d] name=%q driver=%q): %v", i, sk.Name, sk.Driver, err) + } + builtSinks[sk.Name] = s + } + + // --- Compile routes --- + routes, err := compileRoutes(cfg, builtSinks) + if err != nil { + log.Fatalf("compile routes failed: %v", err) + } + + events := make(chan fkevent.Event, 256) + + pl := &fkpipeline.Pipeline{ + Processors: nil, + } + + s := &fkscheduler.Scheduler{ + Jobs: jobs, + Out: events, + Logf: log.Printf, + } + + d := &fkdispatch.Dispatcher{ + In: events, + Pipeline: pl, + Sinks: builtSinks, + Routes: routes, + } + + errCh := make(chan error, 2) + + go func() { errCh <- s.Run(ctx) }() + go func() { errCh <- d.Run(ctx, log.Printf) }() + + for i := 0; i < 2; i++ { + err := <-errCh + if err == nil || isContextShutdown(err) { + continue + } + log.Printf("fatal error: %v", err) + cancel() + } + + log.Printf("shutdown complete") +} + +func compileRoutes(cfg *config.Config, builtSinks map[string]fksinks.Sink) ([]fkdispatch.Route, error) { + if len(cfg.Routes) == 0 { + return defaultRoutes(builtSinks), nil + } + + var routes []fkdispatch.Route + for i, r := range cfg.Routes { + if strings.TrimSpace(r.Sink) == "" { + return nil, fmt.Errorf("routes[%d].sink is empty", i) + } + if _, ok := builtSinks[r.Sink]; !ok { + return nil, fmt.Errorf("routes[%d].sink references unknown sink %q", i, r.Sink) + } + + kinds := map[fkevent.Kind]bool{} + for j, k := range r.Kinds { + kind, err := fkevent.ParseKind(k) + if err != nil { + return nil, fmt.Errorf("routes[%d].kinds[%d]: %w", i, j, err) + } + kinds[kind] = true + } + + routes = append(routes, fkdispatch.Route{ + SinkName: r.Sink, + Kinds: kinds, + }) + } + + return routes, nil +} + +func defaultRoutes(builtSinks map[string]fksinks.Sink) []fkdispatch.Route { + // nil Kinds means "match all kinds" by convention + var allKinds map[fkevent.Kind]bool = nil + + routes := make([]fkdispatch.Route, 0, len(builtSinks)) + for name := range builtSinks { + routes = append(routes, fkdispatch.Route{ + SinkName: name, + Kinds: allKinds, + }) + } + return routes +} + +func isContextShutdown(err error) bool { + return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) +} + +// keep time imported (mirrors your previous main.go defensive trick) +var _ = time.Second diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..25da68f --- /dev/null +++ b/go.mod @@ -0,0 +1,9 @@ +module gitea.maximumdirect.net/ejr/weatherfeeder + +go 1.22 + +require gitea.maximumdirect.net/ejr/feedkit v0.0.0 + +require gopkg.in/yaml.v3 v3.0.1 // indirect + +replace gitea.maximumdirect.net/ejr/feedkit => ../feedkit diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..a62c313 --- /dev/null +++ b/go.sum @@ -0,0 +1,4 @@ +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/model/alert.go b/internal/model/alert.go new file mode 100644 index 0000000..54fcdfc --- /dev/null +++ b/internal/model/alert.go @@ -0,0 +1,23 @@ +package model + +import "time" + +// Placeholder for NWS alerts (GeoJSON feature properties are rich). +type WeatherAlert struct { + ID string + + Event string + Headline string + Description string + Instruction string + + Severity string + Urgency string + Certainty string + + Sent *time.Time + Effective *time.Time + Expires *time.Time + + Areas []string +} diff --git a/internal/model/event.go b/internal/model/event.go new file mode 100644 index 0000000..0531348 --- /dev/null +++ b/internal/model/event.go @@ -0,0 +1,212 @@ +package model + +import ( + "errors" + "fmt" + "strings" + "time" +) + +// ErrInvalidEvent is a sentinel error used for errors.Is checks. +var ErrInvalidEvent = errors.New("invalid event") + +// EventValidationError reports one or more problems with an Event. +// +// We keep this structured because it makes debugging faster than a single +// "invalid event" string; you get all issues in one pass. +type EventValidationError struct { + Problems []string +} + +func (e *EventValidationError) Error() string { + if e == nil || len(e.Problems) == 0 { + return "invalid event" + } + var b strings.Builder + b.WriteString("invalid event:\n") + for _, p := range e.Problems { + b.WriteString(" - ") + b.WriteString(p) + b.WriteString("\n") + } + return strings.TrimRight(b.String(), "\n") +} + +// Is lets errors.Is(err, ErrInvalidEvent) work. +func (e *EventValidationError) Is(target error) bool { + return target == ErrInvalidEvent +} + +// Event is the normalized unit your pipeline moves around. +// It wraps exactly one of Observation/Forecast/Alert plus metadata. +type Event struct { + ID string // stable dedupe/storage key (source-defined or computed) + Kind Kind + Source string // configured source name (e.g. "NWSObservationKSTL") + EmittedAt time.Time // when *your* system emitted this event + EffectiveAt *time.Time // optional: “time the event applies” + + // Union payload: EXACTLY ONE must be non-nil. + Observation *WeatherObservation + Forecast *WeatherForecast + Alert *WeatherAlert +} + +// Validate enforces Event invariants. +// +// This is intentionally strict. If an event is invalid, we want to find out +// immediately rather than letting it drift into sinks or storage. +// +// Invariants enforced: +// - ID is non-empty +// - Kind is known +// - Source is non-empty +// - EmittedAt is non-zero +// - Exactly one payload pointer is non-nil +// - Kind matches the non-nil payload +func (e Event) Validate() error { + var problems []string + + if strings.TrimSpace(e.ID) == "" { + problems = append(problems, "ID is required") + } + if !e.Kind.IsKnown() { + problems = append(problems, fmt.Sprintf("Kind %q is not recognized", string(e.Kind))) + } + if strings.TrimSpace(e.Source) == "" { + problems = append(problems, "Source is required") + } + if e.EmittedAt.IsZero() { + problems = append(problems, "EmittedAt must be set (non-zero)") + } + + // Count payloads and ensure Kind matches. + payloadCount := 0 + if e.Observation != nil { + payloadCount++ + if e.Kind != KindObservation { + problems = append(problems, fmt.Sprintf("Observation payload present but Kind=%q", string(e.Kind))) + } + } + if e.Forecast != nil { + payloadCount++ + if e.Kind != KindForecast { + problems = append(problems, fmt.Sprintf("Forecast payload present but Kind=%q", string(e.Kind))) + } + } + if e.Alert != nil { + payloadCount++ + if e.Kind != KindAlert { + problems = append(problems, fmt.Sprintf("Alert payload present but Kind=%q", string(e.Kind))) + } + } + + if payloadCount == 0 { + problems = append(problems, "exactly one payload must be set; all payloads are nil") + } else if payloadCount > 1 { + problems = append(problems, "exactly one payload must be set; multiple payloads are non-nil") + } + + if len(problems) > 0 { + return &EventValidationError{Problems: problems} + } + return nil +} + +// NewObservationEvent constructs a valid observation Event. +// +// If emittedAt is zero, it defaults to time.Now().UTC(). +// effectiveAt is optional (nil allowed). +// +// The returned Event is guaranteed valid (or you get an error). +func NewObservationEvent( + id string, + source string, + emittedAt time.Time, + effectiveAt *time.Time, + obs *WeatherObservation, +) (Event, error) { + if obs == nil { + return Event{}, fmt.Errorf("%w: observation payload is nil", ErrInvalidEvent) + } + + if emittedAt.IsZero() { + emittedAt = time.Now().UTC() + } + + e := Event{ + ID: strings.TrimSpace(id), + Kind: KindObservation, + Source: strings.TrimSpace(source), + EmittedAt: emittedAt, + EffectiveAt: effectiveAt, + Observation: obs, + } + + if err := e.Validate(); err != nil { + return Event{}, err + } + return e, nil +} + +// NewForecastEvent constructs a valid forecast Event. +func NewForecastEvent( + id string, + source string, + emittedAt time.Time, + effectiveAt *time.Time, + fc *WeatherForecast, +) (Event, error) { + if fc == nil { + return Event{}, fmt.Errorf("%w: forecast payload is nil", ErrInvalidEvent) + } + + if emittedAt.IsZero() { + emittedAt = time.Now().UTC() + } + + e := Event{ + ID: strings.TrimSpace(id), + Kind: KindForecast, + Source: strings.TrimSpace(source), + EmittedAt: emittedAt, + EffectiveAt: effectiveAt, + Forecast: fc, + } + + if err := e.Validate(); err != nil { + return Event{}, err + } + return e, nil +} + +// NewAlertEvent constructs a valid alert Event. +func NewAlertEvent( + id string, + source string, + emittedAt time.Time, + effectiveAt *time.Time, + a *WeatherAlert, +) (Event, error) { + if a == nil { + return Event{}, fmt.Errorf("%w: alert payload is nil", ErrInvalidEvent) + } + + if emittedAt.IsZero() { + emittedAt = time.Now().UTC() + } + + e := Event{ + ID: strings.TrimSpace(id), + Kind: KindAlert, + Source: strings.TrimSpace(source), + EmittedAt: emittedAt, + EffectiveAt: effectiveAt, + Alert: a, + } + + if err := e.Validate(); err != nil { + return Event{}, err + } + return e, nil +} diff --git a/internal/model/forecast.go b/internal/model/forecast.go new file mode 100644 index 0000000..ce2f251 --- /dev/null +++ b/internal/model/forecast.go @@ -0,0 +1,17 @@ +package model + +import "time" + +// WeatherForecast identity fields (as you described). +type WeatherForecast struct { + IssuedBy string // e.g. "NWS" + IssuedAt time.Time // when forecast product was issued + ForecastType string // e.g. "hourly", "daily" + ForecastStart time.Time // start of the applicable forecast period + + // TODO: You’ll likely want ForecastEnd too. + + // TODO: Add meteorological fields you care about. + // Temperature, precip probability, wind, etc. + // Decide if you want a single "period" model or an array of periods. +} diff --git a/internal/model/kind.go b/internal/model/kind.go new file mode 100644 index 0000000..a11d65d --- /dev/null +++ b/internal/model/kind.go @@ -0,0 +1,23 @@ +package model + +// Kind identifies which payload an Event carries. +type Kind string + +const ( + KindObservation Kind = "observation" + KindForecast Kind = "forecast" + KindAlert Kind = "alert" +) + +// IsKnown returns true if k is one of the kinds supported by this binary. +// +// This is intentionally strict: if you add new kinds later, update this list. +// That keeps validation useful (it catches partially-constructed events). +func (k Kind) IsKnown() bool { + switch k { + case KindObservation, KindForecast, KindAlert: + return true + default: + return false + } +} diff --git a/internal/model/observation.go b/internal/model/observation.go new file mode 100644 index 0000000..071f536 --- /dev/null +++ b/internal/model/observation.go @@ -0,0 +1,72 @@ +package model + +import "time" + +type WeatherObservation struct { + // Identity / metadata + StationID string + StationName string + Timestamp time.Time + + // Canonical internal representation (provider-independent). + // + // ConditionCode should be populated by all sources. ConditionText should be the + // canonical human-readable string derived from the WMO code (not the provider's + // original wording). + // + // IsDay is optional; some providers supply a day/night flag (e.g., Open-Meteo), + // while others may not (e.g., NWS observations). When unknown, it can be nil. + ConditionCode WMOCode + ConditionText string + IsDay *bool + + // Provider-specific “evidence” for troubleshooting mapping and drift. + // + // This is intentionally limited: it is not intended to be used downstream for + // business logic. Downstream logic should rely on ConditionCode / ConditionText. + ProviderRawDescription string + + // Human-facing (legacy / transitional) + // + // TextDescription currently carries provider text in existing drivers. + // As we transition to WMO-based normalization, downstream presentation should + // switch to using ConditionText. After migration, this may be removed or repurposed. + TextDescription string + + // Provider-specific (legacy / transitional) + // + // IconURL is not part of the canonical internal vocabulary. It's retained only + // because current sources populate it; it is not required for downstream systems. + IconURL string + + // Core measurements (nullable) + TemperatureC *float64 + DewpointC *float64 + + WindDirectionDegrees *float64 + WindSpeedKmh *float64 + WindGustKmh *float64 + + BarometricPressurePa *float64 + SeaLevelPressurePa *float64 + VisibilityMeters *float64 + + RelativeHumidityPercent *float64 + WindChillC *float64 + HeatIndexC *float64 + + ElevationMeters *float64 + RawMessage string + + PresentWeather []PresentWeather + CloudLayers []CloudLayer +} + +type CloudLayer struct { + BaseMeters *float64 + Amount string +} + +type PresentWeather struct { + Raw map[string]any +} diff --git a/internal/model/wmo.go b/internal/model/wmo.go new file mode 100644 index 0000000..3f69d1c --- /dev/null +++ b/internal/model/wmo.go @@ -0,0 +1,15 @@ +package model + +// WMOCode is the canonical internal “current conditions” vocabulary. +// +// We standardize on the WMO weather interpretation codes used by providers like +// Open-Meteo, and we map other providers (e.g., NWS) into these codes. +// +// Reference codes include: 0,1,2,3,45,48,51,53,...,99. +type WMOCode int + +const ( + // WMOUnknown is used when we cannot confidently map an upstream condition + // into a known WMO code. + WMOUnknown WMOCode = -1 +) diff --git a/internal/sources/builtins.go b/internal/sources/builtins.go new file mode 100644 index 0000000..b87f80b --- /dev/null +++ b/internal/sources/builtins.go @@ -0,0 +1,56 @@ +package sources + +import ( + "fmt" + + "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/nws" + "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/openmeteo" + "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/openweather" + + "gitea.maximumdirect.net/ejr/feedkit/config" + fksource "gitea.maximumdirect.net/ejr/feedkit/sources" +) + +// RegisterBuiltins registers the source drivers that ship with this binary. +// Keeping this in one place makes main.go very readable. +func RegisterBuiltins(r *fksource.Registry) { + // NWS drivers + r.Register("nws_observation", func(cfg config.SourceConfig) (fksource.Source, error) { + return nws.NewObservationSource(cfg) + }) + r.Register("nws_alerts", func(cfg config.SourceConfig) (fksource.Source, error) { + return nws.NewAlertsSource(cfg) + }) + r.Register("nws_forecast", func(cfg config.SourceConfig) (fksource.Source, error) { + return nws.NewForecastSource(cfg) + }) + + // Open-Meteo drivers + r.Register("openmeteo_observation", func(cfg config.SourceConfig) (fksource.Source, error) { + return openmeteo.NewObservationSource(cfg) + }) + + // OpenWeatherMap drivers + r.Register("openweather_observation", func(cfg config.SourceConfig) (fksource.Source, error) { + return openweather.NewObservationSource(cfg) + }) +} + +// Optional: centralize some common config checks used by multiple drivers. +// +// NOTE: feedkit/config.SourceConfig intentionally keeps driver-specific options +// inside cfg.Params, so drivers can evolve independently without feedkit +// importing domain config packages. +func RequireURL(cfg config.SourceConfig) error { + if cfg.Params == nil { + return fmt.Errorf("source %q: params.url is required", cfg.Name) + } + + // Canonical key is "url". We also accept "URL" as a convenience. + url, ok := cfg.ParamString("url", "URL") + if !ok { + return fmt.Errorf("source %q: params.url is required", cfg.Name) + } + _ = url // (optional) return it if you want this helper to provide the value + return nil +} diff --git a/internal/sources/nws/alerts.go b/internal/sources/nws/alerts.go new file mode 100644 index 0000000..9dc44a9 --- /dev/null +++ b/internal/sources/nws/alerts.go @@ -0,0 +1,54 @@ +package nws + +import ( + "context" + "fmt" + "strings" + + "gitea.maximumdirect.net/ejr/feedkit/config" + "gitea.maximumdirect.net/ejr/feedkit/event" +) + +type AlertsSource struct { + name string + url string + userAgent string +} + +func NewAlertsSource(cfg config.SourceConfig) (*AlertsSource, error) { + if strings.TrimSpace(cfg.Name) == "" { + return nil, fmt.Errorf("nws_alerts: name is required") + } + if cfg.Params == nil { + return nil, fmt.Errorf("nws_alerts %q: params are required (need params.url and params.user_agent)", cfg.Name) + } + + // Driver-specific options live in cfg.Params to keep feedkit domain-agnostic. + // Use the typed accessor so callers can’t accidentally pass non-strings to TrimSpace. + url, ok := cfg.ParamString("url", "URL") + if !ok { + return nil, fmt.Errorf("nws_alerts %q: params.url is required", cfg.Name) + } + + ua, ok := cfg.ParamString("user_agent", "userAgent") + if !ok { + return nil, fmt.Errorf("nws_alerts %q: params.user_agent is required", cfg.Name) + } + + return &AlertsSource{ + name: cfg.Name, + url: url, + userAgent: ua, + }, nil +} + +func (s *AlertsSource) Name() string { return s.name } + +// Kind is used for routing/policy. +// The envelope type is event.Event; payload will eventually be something like model.WeatherAlert. +func (s *AlertsSource) Kind() event.Kind { return event.Kind("alert") } + +func (s *AlertsSource) Poll(ctx context.Context) ([]event.Event, error) { + _ = ctx + return nil, fmt.Errorf("nws.AlertsSource.Poll: TODO implement (url=%s)", s.url) +} diff --git a/internal/sources/nws/forecast.go b/internal/sources/nws/forecast.go new file mode 100644 index 0000000..d183067 --- /dev/null +++ b/internal/sources/nws/forecast.go @@ -0,0 +1,51 @@ +package nws + +import ( + "context" + "fmt" + "strings" + + "gitea.maximumdirect.net/ejr/feedkit/config" + "gitea.maximumdirect.net/ejr/feedkit/event" +) + +type ForecastSource struct { + name string + url string + userAgent string +} + +func NewForecastSource(cfg config.SourceConfig) (*ForecastSource, error) { + if strings.TrimSpace(cfg.Name) == "" { + return nil, fmt.Errorf("nws_forecast: name is required") + } + if cfg.Params == nil { + return nil, fmt.Errorf("nws_forecast %q: params are required (need params.url and params.user_agent)", cfg.Name) + } + + url, ok := cfg.ParamString("url", "URL") + if !ok { + return nil, fmt.Errorf("nws_forecast %q: params.url is required", cfg.Name) + } + + ua, ok := cfg.ParamString("user_agent", "userAgent") + if !ok { + return nil, fmt.Errorf("nws_forecast %q: params.user_agent is required", cfg.Name) + } + + return &ForecastSource{ + name: cfg.Name, + url: url, + userAgent: ua, + }, nil +} + +func (s *ForecastSource) Name() string { return s.name } + +// Kind is used for routing/policy. +func (s *ForecastSource) Kind() event.Kind { return event.Kind("forecast") } + +func (s *ForecastSource) Poll(ctx context.Context) ([]event.Event, error) { + _ = ctx + return nil, fmt.Errorf("nws.ForecastSource.Poll: TODO implement (url=%s)", s.url) +} diff --git a/internal/sources/nws/observation.go b/internal/sources/nws/observation.go new file mode 100644 index 0000000..db47778 --- /dev/null +++ b/internal/sources/nws/observation.go @@ -0,0 +1,709 @@ +package nws + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "strings" + "time" + + "gitea.maximumdirect.net/ejr/feedkit/config" + "gitea.maximumdirect.net/ejr/feedkit/event" + "gitea.maximumdirect.net/ejr/weatherfeeder/internal/model" + "gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards" +) + +// ObservationSource polls an NWS station observation endpoint and emits a single Observation Event. +// +// This corresponds to URLs like: +// +// https://api.weather.gov/stations/KSTL/observations/latest +type ObservationSource struct { + name string + url string + userAgent string + client *http.Client +} + +func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) { + if strings.TrimSpace(cfg.Name) == "" { + return nil, fmt.Errorf("nws_observation: name is required") + } + if cfg.Params == nil { + return nil, fmt.Errorf("nws_observation %q: params are required (need params.url and params.user_agent)", cfg.Name) + } + + // feedkit keeps config domain-agnostic by storing driver-specific settings in Params. + // Use ParamString so we don't have to type-assert cfg.Params["url"] everywhere. + url, ok := cfg.ParamString("url", "URL") + if !ok { + return nil, fmt.Errorf("nws_observation %q: params.url is required", cfg.Name) + } + + ua, ok := cfg.ParamString("user_agent", "userAgent") + if !ok { + return nil, fmt.Errorf("nws_observation %q: params.user_agent is required", cfg.Name) + } + + // A small timeout is good hygiene for daemons: you want polls to fail fast, + // not hang forever and block subsequent ticks. + client := &http.Client{ + Timeout: 10 * time.Second, + } + + return &ObservationSource{ + name: cfg.Name, + url: url, + userAgent: ua, + client: client, + }, nil +} + +func (s *ObservationSource) Name() string { return s.name } + +// Kind is used for routing/policy. +func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") } + +// Poll fetches "current conditions" and emits exactly one Event (under normal conditions). +func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { + obs, eventID, err := s.fetchAndParse(ctx) + if err != nil { + return nil, err + } + + // EffectiveAt is optional. + // For observations, the natural effective time is the observation timestamp. + var effectiveAt *time.Time + if !obs.Timestamp.IsZero() { + t := obs.Timestamp + effectiveAt = &t + } + + e := event.Event{ + ID: eventID, + Kind: s.Kind(), + Source: s.name, + EmittedAt: time.Now().UTC(), + EffectiveAt: effectiveAt, + + // Optional: makes downstream decoding/inspection easier. + Schema: "weather.observation.v1", + + // Payload remains domain-specific for now. + Payload: obs, + } + + if err := e.Validate(); err != nil { + return nil, err + } + + return []event.Event{e}, nil +} + +// --- JSON parsing (minimal model of NWS observation payload) --- + +type nwsObservationResponse struct { + ID string `json:"id"` // a stable unique identifier URL in the payload you pasted + Properties struct { + StationID string `json:"stationId"` + StationName string `json:"stationName"` + Timestamp string `json:"timestamp"` + TextDescription string `json:"textDescription"` + Icon string `json:"icon"` + + RawMessage string `json:"rawMessage"` + + Elevation struct { + UnitCode string `json:"unitCode"` + Value *float64 `json:"value"` + } `json:"elevation"` + + Temperature struct { + UnitCode string `json:"unitCode"` + Value *float64 `json:"value"` + } `json:"temperature"` + + Dewpoint struct { + UnitCode string `json:"unitCode"` + Value *float64 `json:"value"` + } `json:"dewpoint"` + + WindDirection struct { + UnitCode string `json:"unitCode"` + Value *float64 `json:"value"` + } `json:"windDirection"` + + WindSpeed struct { + UnitCode string `json:"unitCode"` + Value *float64 `json:"value"` + } `json:"windSpeed"` + + WindGust struct { + UnitCode string `json:"unitCode"` + Value *float64 `json:"value"` + } `json:"windGust"` + + BarometricPressure struct { + UnitCode string `json:"unitCode"` + Value *float64 `json:"value"` + } `json:"barometricPressure"` + + SeaLevelPressure struct { + UnitCode string `json:"unitCode"` + Value *float64 `json:"value"` + } `json:"seaLevelPressure"` + + Visibility struct { + UnitCode string `json:"unitCode"` + Value *float64 `json:"value"` + } `json:"visibility"` + + RelativeHumidity struct { + UnitCode string `json:"unitCode"` + Value *float64 `json:"value"` + } `json:"relativeHumidity"` + + WindChill struct { + UnitCode string `json:"unitCode"` + Value *float64 `json:"value"` + } `json:"windChill"` + + HeatIndex struct { + UnitCode string `json:"unitCode"` + Value *float64 `json:"value"` + } `json:"heatIndex"` + + // NWS returns "presentWeather" as decoded METAR phenomena objects. + // We decode these initially as generic maps so we can: + // 1) preserve the raw objects in model.PresentWeather{Raw: ...} + // 2) also decode them into a typed struct for our WMO mapping logic. + PresentWeather []map[string]any `json:"presentWeather"` + + CloudLayers []struct { + Base struct { + UnitCode string `json:"unitCode"` + Value *float64 `json:"value"` + } `json:"base"` + Amount string `json:"amount"` + } `json:"cloudLayers"` + } `json:"properties"` +} + +// metarPhenomenon is a typed view of NWS presentWeather objects. +// You provided the schema for these values (intensity/modifier/weather/rawString). +type metarPhenomenon struct { + Intensity *string `json:"intensity"` // "light", "heavy", or null + Modifier *string `json:"modifier"` // "freezing", "showers", etc., or null + Weather string `json:"weather"` // e.g., "rain", "snow", "fog_mist", ... + RawString string `json:"rawString"` + // InVicinity exists in the schema; we ignore it for now because WMO codes + // don't directly represent "in vicinity" semantics. + InVicinity *bool `json:"inVicinity"` +} + +func (s *ObservationSource) fetchAndParse(ctx context.Context) (model.WeatherObservation, string, error) { + req, err := http.NewRequestWithContext(ctx, "GET", s.url, nil) + if err != nil { + return model.WeatherObservation{}, "", err + } + + // NWS requests: a real User-Agent with contact info is strongly recommended. + req.Header.Set("User-Agent", s.userAgent) + req.Header.Set("Accept", "application/geo+json, application/json") + + res, err := s.client.Do(req) + if err != nil { + return model.WeatherObservation{}, "", err + } + defer res.Body.Close() + + if res.StatusCode < 200 || res.StatusCode >= 300 { + return model.WeatherObservation{}, "", fmt.Errorf("nws_observation %q: HTTP %s", s.name, res.Status) + } + + var parsed nwsObservationResponse + if err := json.NewDecoder(res.Body).Decode(&parsed); err != nil { + return model.WeatherObservation{}, "", err + } + + // Parse timestamp (RFC3339) + var ts time.Time + if strings.TrimSpace(parsed.Properties.Timestamp) != "" { + t, err := time.Parse(time.RFC3339, parsed.Properties.Timestamp) + if err != nil { + return model.WeatherObservation{}, "", fmt.Errorf("nws_observation %q: invalid timestamp %q: %w", + s.name, parsed.Properties.Timestamp, err) + } + ts = t + } + + cloudLayers := make([]model.CloudLayer, 0, len(parsed.Properties.CloudLayers)) + for _, cl := range parsed.Properties.CloudLayers { + cloudLayers = append(cloudLayers, model.CloudLayer{ + BaseMeters: cl.Base.Value, + Amount: cl.Amount, + }) + } + + // Preserve the raw presentWeather objects (as before) in the domain model. + present := make([]model.PresentWeather, 0, len(parsed.Properties.PresentWeather)) + for _, pw := range parsed.Properties.PresentWeather { + present = append(present, model.PresentWeather{Raw: pw}) + } + + // Decode presentWeather into a typed slice for improved mapping. + phenomena := decodeMetarPhenomena(parsed.Properties.PresentWeather) + + // Provider description (NWS vocabulary). We store this for troubleshooting only. + providerDesc := strings.TrimSpace(parsed.Properties.TextDescription) + + // Map NWS -> canonical WMO code using best-effort heuristics: + // 1) presentWeather (METAR phenomena) if present + // 2) provider textDescription keywords + // 3) cloud layers fallback + wmo := mapNWSToWMO(providerDesc, cloudLayers, phenomena) + + // Canonical text comes from our shared WMO table. + // NWS does not give us an explicit day/night flag here, so we leave it nil. + canonicalText := standards.WMOText(wmo, nil) + + obs := model.WeatherObservation{ + StationID: parsed.Properties.StationID, + StationName: parsed.Properties.StationName, + Timestamp: ts, + + // Canonical conditions + ConditionCode: wmo, + ConditionText: canonicalText, + IsDay: nil, + + // Provider evidence (for troubleshooting mapping) + ProviderRawDescription: providerDesc, + + // Human-facing fields: + // Populate TextDescription with canonical text so downstream output stays consistent. + TextDescription: canonicalText, + IconURL: parsed.Properties.Icon, + + TemperatureC: parsed.Properties.Temperature.Value, + DewpointC: parsed.Properties.Dewpoint.Value, + + WindDirectionDegrees: parsed.Properties.WindDirection.Value, + WindSpeedKmh: parsed.Properties.WindSpeed.Value, + WindGustKmh: parsed.Properties.WindGust.Value, + + BarometricPressurePa: parsed.Properties.BarometricPressure.Value, + SeaLevelPressurePa: parsed.Properties.SeaLevelPressure.Value, + VisibilityMeters: parsed.Properties.Visibility.Value, + + RelativeHumidityPercent: parsed.Properties.RelativeHumidity.Value, + WindChillC: parsed.Properties.WindChill.Value, + HeatIndexC: parsed.Properties.HeatIndex.Value, + + ElevationMeters: parsed.Properties.Elevation.Value, + RawMessage: parsed.Properties.RawMessage, + + PresentWeather: present, + CloudLayers: cloudLayers, + } + + // Event ID: prefer the NWS-provided "id" (stable unique URL), else fall back to computed. + eventID := strings.TrimSpace(parsed.ID) + if eventID == "" { + eventID = fmt.Sprintf("observation:%s:%s:%s", + s.name, + obs.StationID, + obs.Timestamp.UTC().Format(time.RFC3339Nano), + ) + } + + return obs, eventID, nil +} + +func decodeMetarPhenomena(raw []map[string]any) []metarPhenomenon { + if len(raw) == 0 { + return nil + } + + out := make([]metarPhenomenon, 0, len(raw)) + for _, m := range raw { + // Encode/decode is slightly inefficient, but it's simple and very readable. + // presentWeather payloads are small; this is fine for a polling daemon. + b, err := json.Marshal(m) + if err != nil { + continue + } + + var p metarPhenomenon + if err := json.Unmarshal(b, &p); err != nil { + continue + } + + p.Weather = strings.ToLower(strings.TrimSpace(p.Weather)) + p.RawString = strings.TrimSpace(p.RawString) + out = append(out, p) + } + return out +} + +// mapNWSToWMO maps NWS signals into a canonical WMO code. +// +// Precedence: +// 1. METAR phenomena (presentWeather) — most reliable for precip/hazards +// 2. textDescription keywords — weaker, but still useful +// 3. cloud layers fallback — only for sky-only conditions +func mapNWSToWMO(providerDesc string, cloudLayers []model.CloudLayer, phenomena []metarPhenomenon) model.WMOCode { + // 1) Prefer METAR phenomena if present. + if code := wmoFromPhenomena(phenomena); code != model.WMOUnknown { + return code + } + + // 2) Fall back to provider textDescription keywords. + if code := wmoFromTextDescription(providerDesc); code != model.WMOUnknown { + return code + } + + // 3) Fall back to cloud layers. + if code := wmoFromCloudLayers(cloudLayers); code != model.WMOUnknown { + return code + } + + return model.WMOUnknown +} + +func wmoFromPhenomena(phenomena []metarPhenomenon) model.WMOCode { + if len(phenomena) == 0 { + return model.WMOUnknown + } + + // Helper accessors (avoid repeating nil checks everywhere). + intensityOf := func(p metarPhenomenon) string { + if p.Intensity == nil { + return "" + } + return strings.ToLower(strings.TrimSpace(*p.Intensity)) + } + modifierOf := func(p metarPhenomenon) string { + if p.Modifier == nil { + return "" + } + return strings.ToLower(strings.TrimSpace(*p.Modifier)) + } + + // Pass 1: thunder + hail overrides everything (hazard). + // + // WMO provides: + // 95 = thunderstorm + // 96 = light thunderstorms with hail + // 99 = thunderstorms with hail + hasThunder := false + hailIntensity := "" + for _, p := range phenomena { + switch p.Weather { + case "thunderstorms": + hasThunder = true + case "hail": + if hailIntensity == "" { + hailIntensity = intensityOf(p) + } + } + } + if hasThunder { + if hailIntensity != "" || containsWeather(phenomena, "hail") { + if hailIntensity == "heavy" { + return 99 + } + // Default to "light" hail when unknown + return 96 + } + return 95 + } + + // Pass 2: freezing hazards. + // + // Modifier includes "freezing". + for _, p := range phenomena { + if modifierOf(p) != "freezing" { + continue + } + + switch p.Weather { + case "rain": + if intensityOf(p) == "light" { + return 66 + } + // Default to freezing rain when unknown/heavy. + return 67 + + case "drizzle": + if intensityOf(p) == "light" { + return 56 + } + return 57 + + case "fog", "fog_mist": + // "Freezing fog" isn't a perfect match for "Rime Fog", + // but within our current WMO subset, 48 is the closest. + return 48 + } + } + + // Pass 3: fog / obscuration. + for _, p := range phenomena { + switch p.Weather { + case "fog", "fog_mist": + return 45 + case "haze", "smoke", "dust", "sand", "spray", "volcanic_ash": + // Our current WMO table subset doesn't include haze/smoke/dust codes. + // "Foggy" (45) is a reasonable umbrella for "visibility obscured". + return 45 + } + } + + // Pass 4: precip families. + for _, p := range phenomena { + inten := intensityOf(p) + mod := modifierOf(p) + + // Handle "showers" modifier explicitly (rain vs snow showers). + if mod == "showers" { + switch p.Weather { + case "rain": + if inten == "light" { + return 80 + } + if inten == "heavy" { + return 82 + } + return 81 + + case "snow": + if inten == "light" { + return 85 + } + return 86 + } + } + + switch p.Weather { + // Drizzle + case "drizzle": + if inten == "heavy" { + return 55 + } + if inten == "light" { + return 51 + } + return 53 + + // Rain + case "rain": + if inten == "heavy" { + return 65 + } + if inten == "light" { + return 61 + } + return 63 + + // Snow + case "snow": + if inten == "heavy" { + return 75 + } + if inten == "light" { + return 71 + } + return 73 + + // Snow grains + case "snow_grains": + return 77 + + // We don’t currently have sleet/ice pellet codes in our shared WMO subset. + // We make conservative choices within the available codes. + case "ice_pellets", "snow_pellets": + // Closest within our subset is "Snow" (73). If you later expand the WMO table + // to include sleet/ice pellet codes, update this mapping. + return 73 + } + } + + return model.WMOUnknown +} + +func containsWeather(phenomena []metarPhenomenon, weather string) bool { + weather = strings.ToLower(strings.TrimSpace(weather)) + for _, p := range phenomena { + if p.Weather == weather { + return true + } + } + return false +} + +func wmoFromTextDescription(providerDesc string) model.WMOCode { + s := strings.ToLower(strings.TrimSpace(providerDesc)) + if s == "" { + return model.WMOUnknown + } + + // Thunder / hail + if strings.Contains(s, "thunder") { + if strings.Contains(s, "hail") { + return 99 + } + return 95 + } + + // Freezing hazards + if strings.Contains(s, "freezing rain") { + if strings.Contains(s, "light") { + return 66 + } + return 67 + } + if strings.Contains(s, "freezing drizzle") { + if strings.Contains(s, "light") { + return 56 + } + return 57 + } + + // Drizzle + if strings.Contains(s, "drizzle") { + if strings.Contains(s, "heavy") || strings.Contains(s, "dense") { + return 55 + } + if strings.Contains(s, "light") { + return 51 + } + return 53 + } + + // Showers + if strings.Contains(s, "showers") { + if strings.Contains(s, "heavy") { + return 82 + } + if strings.Contains(s, "light") { + return 80 + } + return 81 + } + + // Rain + if strings.Contains(s, "rain") { + if strings.Contains(s, "heavy") { + return 65 + } + if strings.Contains(s, "light") { + return 61 + } + return 63 + } + + // Snow + if strings.Contains(s, "snow showers") { + if strings.Contains(s, "light") { + return 85 + } + return 86 + } + if strings.Contains(s, "snow grains") { + return 77 + } + if strings.Contains(s, "snow") { + if strings.Contains(s, "heavy") { + return 75 + } + if strings.Contains(s, "light") { + return 71 + } + return 73 + } + + // Fog + if strings.Contains(s, "rime fog") { + return 48 + } + if strings.Contains(s, "fog") || strings.Contains(s, "mist") { + return 45 + } + + // Sky-only + if strings.Contains(s, "overcast") { + return 3 + } + if strings.Contains(s, "cloudy") { + return 3 + } + if strings.Contains(s, "partly cloudy") { + return 2 + } + if strings.Contains(s, "mostly sunny") || strings.Contains(s, "mostly clear") || + strings.Contains(s, "mainly sunny") || strings.Contains(s, "mainly clear") { + return 1 + } + if strings.Contains(s, "clear") || strings.Contains(s, "sunny") { + return 0 + } + + return model.WMOUnknown +} + +func wmoFromCloudLayers(cloudLayers []model.CloudLayer) model.WMOCode { + // NWS cloud layer amount values commonly include: + // OVC, BKN, SCT, FEW, SKC, CLR, VV (vertical visibility / obscured sky) + // + // We interpret these conservatively: + // - OVC / BKN / VV => Cloudy (3) + // - SCT => Partly Cloudy (2) + // - FEW => Mainly Sunny/Clear (1) + // - CLR / SKC => Sunny/Clear (0) + // + // If multiple layers exist, we bias toward the "most cloudy" layer. + mostCloudy := "" + + for _, cl := range cloudLayers { + a := strings.ToUpper(strings.TrimSpace(cl.Amount)) + if a == "" { + continue + } + + switch a { + case "OVC": + return 3 + case "BKN", "VV": + if mostCloudy != "OVC" { + mostCloudy = a + } + case "SCT": + if mostCloudy == "" { + mostCloudy = "SCT" + } + case "FEW": + if mostCloudy == "" { + mostCloudy = "FEW" + } + case "CLR", "SKC": + if mostCloudy == "" { + mostCloudy = "CLR" + } + } + } + + switch mostCloudy { + case "BKN", "VV": + return 3 + case "SCT": + return 2 + case "FEW": + return 1 + case "CLR": + return 0 + default: + return model.WMOUnknown + } +} diff --git a/internal/sources/openmeteo/observation.go b/internal/sources/openmeteo/observation.go new file mode 100644 index 0000000..443e3a2 --- /dev/null +++ b/internal/sources/openmeteo/observation.go @@ -0,0 +1,238 @@ +package openmeteo + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "strings" + "time" + + "gitea.maximumdirect.net/ejr/feedkit/config" + "gitea.maximumdirect.net/ejr/feedkit/event" + "gitea.maximumdirect.net/ejr/weatherfeeder/internal/model" + "gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards" +) + +// ObservationSource polls an Open-Meteo endpoint and emits one Observation event. +// +// Typical URL shape (you provide this via config): +// +// https://api.open-meteo.com/v1/forecast?latitude=...&longitude=...¤t=temperature_2m,relative_humidity_2m,weather_code,wind_speed_10m,wind_direction_10m,wind_gusts_10m,surface_pressure,pressure_msl&timezone=GMT +type ObservationSource struct { + name string + url string + userAgent string + client *http.Client +} + +func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) { + if strings.TrimSpace(cfg.Name) == "" { + return nil, fmt.Errorf("openmeteo_observation: name is required") + } + if cfg.Params == nil { + return nil, fmt.Errorf("openmeteo_observation %q: params are required (need params.url)", cfg.Name) + } + + // Open-Meteo needs only a URL; everything else is optional. + url, ok := cfg.ParamString("url", "URL") + if !ok { + return nil, fmt.Errorf("openmeteo_observation %q: params.url is required", cfg.Name) + } + + // Open-Meteo doesn't require a special User-Agent, but including one is polite. + // If the caller doesn't provide one, we supply a reasonable default. + ua := cfg.ParamStringDefault("weatherfeeder (open-meteo client)", "user_agent", "userAgent") + + return &ObservationSource{ + name: cfg.Name, + url: url, + userAgent: ua, + client: &http.Client{ + Timeout: 10 * time.Second, + }, + }, nil +} + +func (s *ObservationSource) Name() string { return s.name } + +// Kind is used for routing/policy. Note that the TYPE is domain-agnostic (event.Kind). +func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") } + +func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { + obs, effectiveAt, eventID, err := s.fetchAndParse(ctx) + if err != nil { + return nil, err + } + + // Make EffectiveAt a stable pointer. + effectiveAtCopy := effectiveAt + + e := event.Event{ + ID: eventID, + Kind: s.Kind(), + Source: s.name, + EmittedAt: time.Now().UTC(), + EffectiveAt: &effectiveAtCopy, + + // Optional but useful for downstream consumers once multiple event types exist. + Schema: "weather.observation.v1", + + // The payload domain-specific (model.WeatherObservation). + // feedkit treats this as opaque. + Payload: obs, + } + + if err := e.Validate(); err != nil { + return nil, err + } + + return []event.Event{e}, nil +} + +// ---- Open-Meteo JSON parsing ---- + +type omResponse struct { + Latitude float64 `json:"latitude"` + Longitude float64 `json:"longitude"` + Timezone string `json:"timezone"` + UTCOffsetSeconds int `json:"utc_offset_seconds"` + Elevation float64 `json:"elevation"` + + Current omCurrent `json:"current"` +} + +type omCurrent struct { + Time string `json:"time"` // e.g. "2026-01-10T12:30" + Interval int `json:"interval"` + Temperature2m float64 `json:"temperature_2m"` + RelativeHumidity2m float64 `json:"relative_humidity_2m"` + WeatherCode int `json:"weather_code"` + + WindSpeed10m float64 `json:"wind_speed_10m"` // km/h + WindDirection10m float64 `json:"wind_direction_10m"` // degrees + WindGusts10m float64 `json:"wind_gusts_10m"` // km/h + + Precipitation float64 `json:"precipitation"` + + SurfacePressure float64 `json:"surface_pressure"` // hPa + PressureMSL float64 `json:"pressure_msl"` // hPa + + CloudCover float64 `json:"cloud_cover"` + ApparentTemperature float64 `json:"apparent_temperature"` + IsDay int `json:"is_day"` +} + +func (s *ObservationSource) fetchAndParse(ctx context.Context) (model.WeatherObservation, time.Time, string, error) { + req, err := http.NewRequestWithContext(ctx, "GET", s.url, nil) + if err != nil { + return model.WeatherObservation{}, time.Time{}, "", err + } + + req.Header.Set("User-Agent", s.userAgent) + req.Header.Set("Accept", "application/json") + + res, err := s.client.Do(req) + if err != nil { + return model.WeatherObservation{}, time.Time{}, "", err + } + defer res.Body.Close() + + if res.StatusCode < 200 || res.StatusCode >= 300 { + return model.WeatherObservation{}, time.Time{}, "", fmt.Errorf("openmeteo_observation %q: HTTP %s", s.name, res.Status) + } + + var parsed omResponse + if err := json.NewDecoder(res.Body).Decode(&parsed); err != nil { + return model.WeatherObservation{}, time.Time{}, "", err + } + + // Parse current.time. + // Open-Meteo "time" commonly looks like "YYYY-MM-DDTHH:MM" (no timezone suffix). + // We'll interpret it in the timezone returned by the API (best-effort). + t, err := parseOpenMeteoTime(parsed.Current.Time, parsed.Timezone, parsed.UTCOffsetSeconds) + if err != nil { + return model.WeatherObservation{}, time.Time{}, "", fmt.Errorf("openmeteo_observation %q: parse time %q: %w", s.name, parsed.Current.Time, err) + } + + // Normalize to UTC inside the domain model; presentation can localize later. + effectiveAt := t.UTC() + + // Measurements + tempC := parsed.Current.Temperature2m + rh := parsed.Current.RelativeHumidity2m + wdir := parsed.Current.WindDirection10m + wsKmh := parsed.Current.WindSpeed10m + wgKmh := parsed.Current.WindGusts10m + + surfacePa := parsed.Current.SurfacePressure * 100.0 + mslPa := parsed.Current.PressureMSL * 100.0 + + elevM := parsed.Elevation + + // Canonical condition (WMO) + isDay := parsed.Current.IsDay == 1 + wmo := model.WMOCode(parsed.Current.WeatherCode) + canonicalText := standards.WMOText(wmo, &isDay) + + obs := model.WeatherObservation{ + // Open-Meteo isn't a station feed; we’ll label this with a synthetic identifier. + StationID: fmt.Sprintf("OPENMETEO(%.5f,%.5f)", parsed.Latitude, parsed.Longitude), + StationName: "Open-Meteo", + + Timestamp: effectiveAt, + + // Canonical conditions + ConditionCode: wmo, + ConditionText: canonicalText, + IsDay: &isDay, + + // Provider evidence (Open-Meteo does not provide a separate raw description here) + ProviderRawDescription: "", + + // Human-facing fields: + // Populate TextDescription with canonical text so downstream output remains consistent. + TextDescription: canonicalText, + + TemperatureC: &tempC, + RelativeHumidityPercent: &rh, + + WindDirectionDegrees: &wdir, + WindSpeedKmh: &wsKmh, + WindGustKmh: &wgKmh, + + BarometricPressurePa: &surfacePa, + SeaLevelPressurePa: &mslPa, + + ElevationMeters: &elevM, + } + + // Build a stable event ID. + // Open-Meteo doesn't supply a unique ID, so we key by source + effective time. + eventID := fmt.Sprintf("openmeteo:%s:%s", s.name, effectiveAt.Format(time.RFC3339Nano)) + + return obs, effectiveAt, eventID, nil +} + +func parseOpenMeteoTime(s string, tz string, utcOffsetSeconds int) (time.Time, error) { + s = strings.TrimSpace(s) + if s == "" { + return time.Time{}, fmt.Errorf("empty time") + } + + // Typical Open-Meteo format: "2006-01-02T15:04" + const layout = "2006-01-02T15:04" + + // Best effort: try to load the timezone as an IANA name. + // Examples Open-Meteo might return: "GMT", "America/Chicago". + if tz != "" { + if loc, err := time.LoadLocation(tz); err == nil { + return time.ParseInLocation(layout, s, loc) + } + } + + // Fallback: use the offset seconds to create a fixed zone. + // (If offset is 0, this is UTC.) + loc := time.FixedZone("open-meteo", utcOffsetSeconds) + return time.ParseInLocation(layout, s, loc) +} diff --git a/internal/sources/openweather/observation.go b/internal/sources/openweather/observation.go new file mode 100644 index 0000000..18992e2 --- /dev/null +++ b/internal/sources/openweather/observation.go @@ -0,0 +1,438 @@ +package openweather + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/url" + "strings" + "time" + + "gitea.maximumdirect.net/ejr/feedkit/config" + "gitea.maximumdirect.net/ejr/feedkit/event" + "gitea.maximumdirect.net/ejr/weatherfeeder/internal/model" + "gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards" +) + +// ObservationSource polls the OpenWeatherMap "Current weather" endpoint and emits one Observation event. +// +// Typical URL shape (you provide this via config): +// +// https://api.openweathermap.org/data/2.5/weather?lat=...&lon=...&appid=...&units=metric +// +// Unit notes: +// - If `units` is omitted, OpenWeather uses "standard" units (temp Kelvin, wind m/s). +// - `units=metric` => temp Celsius, wind m/s. +// - `units=imperial` => temp Fahrenheit, wind mph. +// +// weatherd normalizes to: +// - TemperatureC in °C +// - WindSpeedKmh in km/h +// - Pressure in Pa (OpenWeather provides hPa) +type ObservationSource struct { + name string + url string + userAgent string + client *http.Client +} + +func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) { + if strings.TrimSpace(cfg.Name) == "" { + return nil, fmt.Errorf("openweather_observation: name is required") + } + if cfg.Params == nil { + return nil, fmt.Errorf("openweather_observation %q: params are required (need params.url)", cfg.Name) + } + + // Driver-specific settings live under cfg.Params to keep feedkit domain-agnostic. + url, ok := cfg.ParamString("url", "URL") + if !ok { + return nil, fmt.Errorf("openweather_observation %q: params.url is required", cfg.Name) + } + + // Optional User-Agent. + ua := cfg.ParamStringDefault("weatherfeeder (openweather client)", "user_agent", "userAgent") + + return &ObservationSource{ + name: cfg.Name, + url: url, + userAgent: ua, + client: &http.Client{ + Timeout: 10 * time.Second, + }, + }, nil +} + +func (s *ObservationSource) Name() string { return s.name } + +// Kind is used for routing/policy. +func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") } + +func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { + obs, eventID, err := s.fetchAndParse(ctx) + if err != nil { + return nil, err + } + + // EffectiveAt is optional. If we have a real observation timestamp, use it. + // We intentionally take a copy so the pointer is stable and not tied to a struct field. + var effectiveAt *time.Time + if !obs.Timestamp.IsZero() { + t := obs.Timestamp + effectiveAt = &t + } + + e := event.Event{ + ID: eventID, + Kind: s.Kind(), + Source: s.name, + EmittedAt: time.Now().UTC(), + EffectiveAt: effectiveAt, + + Schema: "weather.observation.v1", + Payload: obs, + } + + if err := e.Validate(); err != nil { + return nil, err + } + + return []event.Event{e}, nil +} + +// --- OpenWeather JSON parsing (minimal subset) --- + +type owmResponse struct { + Coord struct { + Lon float64 `json:"lon"` + Lat float64 `json:"lat"` + } `json:"coord"` + + Weather []struct { + ID int `json:"id"` + Main string `json:"main"` + Description string `json:"description"` + Icon string `json:"icon"` // e.g. "04d" or "01n" + } `json:"weather"` + + Main struct { + Temp float64 `json:"temp"` + Pressure float64 `json:"pressure"` // hPa + Humidity float64 `json:"humidity"` // % + SeaLevel *float64 `json:"sea_level"` // hPa (optional) + } `json:"main"` + + Visibility *float64 `json:"visibility"` // meters (optional) + + Wind struct { + Speed float64 `json:"speed"` // units depend on `units=...` + Deg *float64 `json:"deg"` + Gust *float64 `json:"gust"` // units depend on `units=...` + } `json:"wind"` + + Clouds struct { + All *float64 `json:"all"` // cloudiness % + } `json:"clouds"` + + Dt int64 `json:"dt"` // unix seconds, UTC + + Sys struct { + Country string `json:"country"` + Sunrise int64 `json:"sunrise"` // unix, UTC + Sunset int64 `json:"sunset"` // unix, UTC + } `json:"sys"` + + Timezone int `json:"timezone"` // seconds offset from UTC + ID int64 `json:"id"` // city id + Name string `json:"name"` // city name + Cod int `json:"cod"` +} + +func (s *ObservationSource) fetchAndParse(ctx context.Context) (model.WeatherObservation, string, error) { + req, err := http.NewRequestWithContext(ctx, "GET", s.url, nil) + if err != nil { + return model.WeatherObservation{}, "", err + } + + req.Header.Set("User-Agent", s.userAgent) + req.Header.Set("Accept", "application/json") + + res, err := s.client.Do(req) + if err != nil { + return model.WeatherObservation{}, "", err + } + defer res.Body.Close() + + if res.StatusCode < 200 || res.StatusCode >= 300 { + return model.WeatherObservation{}, "", fmt.Errorf("openweather_observation %q: HTTP %s", s.name, res.Status) + } + + var parsed owmResponse + if err := json.NewDecoder(res.Body).Decode(&parsed); err != nil { + return model.WeatherObservation{}, "", err + } + + // Timestamp: dt is unix seconds, UTC. + ts := time.Unix(parsed.Dt, 0).UTC() + + // Primary weather condition: OpenWeather returns a list; we treat [0] as primary. + // If missing, we degrade gracefully. + owmID := 0 + rawDesc := "" + icon := "" + if len(parsed.Weather) > 0 { + owmID = parsed.Weather[0].ID + rawDesc = strings.TrimSpace(parsed.Weather[0].Description) + icon = strings.TrimSpace(parsed.Weather[0].Icon) + } + + // Day/night inference: + // - Prefer icon suffix if present ("d" or "n") + // - Else fall back to sunrise/sunset bounds + var isDay *bool + if icon != "" { + last := icon[len(icon)-1] + switch last { + case 'd': + v := true + isDay = &v + case 'n': + v := false + isDay = &v + } + } + if isDay == nil && parsed.Sys.Sunrise > 0 && parsed.Sys.Sunset > 0 { + v := parsed.Dt >= parsed.Sys.Sunrise && parsed.Dt < parsed.Sys.Sunset + isDay = &v + } + + // Units handling based on the request URL. + unitSystem := getUnitsFromURL(s.url) + + // Temperature normalization to Celsius. + tempC := normalizeTempToC(parsed.Main.Temp, unitSystem) + + // Humidity is already percent. + rh := parsed.Main.Humidity + + // Pressure hPa -> Pa + surfacePa := parsed.Main.Pressure * 100.0 + var seaLevelPa *float64 + if parsed.Main.SeaLevel != nil { + v := (*parsed.Main.SeaLevel) * 100.0 + seaLevelPa = &v + } + + // Wind speed normalization to km/h + wsKmh := normalizeSpeedToKmh(parsed.Wind.Speed, unitSystem) + + var wgKmh *float64 + if parsed.Wind.Gust != nil { + v := normalizeSpeedToKmh(*parsed.Wind.Gust, unitSystem) + wgKmh = &v + } + + // Visibility in meters (already matches our model) + var visM *float64 + if parsed.Visibility != nil { + v := *parsed.Visibility + visM = &v + } + + // Map OpenWeather condition IDs -> canonical WMO code (our internal vocabulary). + wmo := mapOpenWeatherToWMO(owmID) + + // Canonical text from our shared table. + canonicalText := standards.WMOText(wmo, isDay) + + // Icon URL (optional). + iconURL := "" + if icon != "" { + iconURL = fmt.Sprintf("https://openweathermap.org/img/wn/%s@2x.png", icon) + } + + stationID := "" + if parsed.ID != 0 { + stationID = fmt.Sprintf("OPENWEATHER(%d)", parsed.ID) + } else { + stationID = fmt.Sprintf("OPENWEATHER(%.5f,%.5f)", parsed.Coord.Lat, parsed.Coord.Lon) + } + + stationName := strings.TrimSpace(parsed.Name) + if stationName == "" { + stationName = "OpenWeatherMap" + } + + obs := model.WeatherObservation{ + StationID: stationID, + StationName: stationName, + Timestamp: ts, + + // Canonical internal representation + ConditionCode: wmo, + ConditionText: canonicalText, + IsDay: isDay, + + // Provider evidence for troubleshooting mappings + ProviderRawDescription: rawDesc, + + // Human-facing legacy fields: we populate with canonical text for consistency + TextDescription: canonicalText, + IconURL: iconURL, + + TemperatureC: &tempC, + + WindDirectionDegrees: parsed.Wind.Deg, + WindSpeedKmh: &wsKmh, + WindGustKmh: wgKmh, + + BarometricPressurePa: &surfacePa, + SeaLevelPressurePa: seaLevelPa, + VisibilityMeters: visM, + + RelativeHumidityPercent: &rh, + } + + // Stable event ID: key by source + timestamp. + eventID := fmt.Sprintf("openweather:%s:%s", s.name, obs.Timestamp.UTC().Format(time.RFC3339Nano)) + + return obs, eventID, nil +} + +func getUnitsFromURL(raw string) string { + u, err := url.Parse(raw) + if err != nil { + return "standard" + } + q := u.Query() + units := strings.TrimSpace(strings.ToLower(q.Get("units"))) + if units == "" { + return "standard" + } + switch units { + case "standard", "metric", "imperial": + return units + default: + return "standard" + } +} + +func normalizeTempToC(v float64, unitSystem string) float64 { + switch unitSystem { + case "metric": + // Already °C + return v + case "imperial": + // °F -> °C + return (v - 32.0) * 5.0 / 9.0 + default: + // "standard" => Kelvin -> °C + return v - 273.15 + } +} + +func normalizeSpeedToKmh(v float64, unitSystem string) float64 { + switch unitSystem { + case "imperial": + // mph -> km/h + return v * 1.609344 + default: + // m/s -> km/h + return v * 3.6 + } +} + +// mapOpenWeatherToWMO maps OpenWeather weather condition IDs into your internal WMO code vocabulary. +// +// This is an approximate semantic mapping between two different code systems. +// Your current canonical WMO table is intentionally small and text-focused, +// so we map into that set (0/1/2/3/45/48/51/.../99) conservatively. +func mapOpenWeatherToWMO(owmID int) model.WMOCode { + switch { + // 2xx Thunderstorm + case owmID >= 200 && owmID <= 232: + return model.WMOCode(95) + + // 3xx Drizzle + case owmID >= 300 && owmID <= 321: + if owmID == 300 { + return model.WMOCode(51) + } + if owmID == 302 { + return model.WMOCode(55) + } + return model.WMOCode(53) + + // 5xx Rain + case owmID >= 500 && owmID <= 531: + // 511 is "freezing rain" + if owmID == 511 { + return model.WMOCode(67) + } + + // showers bucket (520-531) + if owmID >= 520 && owmID <= 531 { + if owmID == 520 { + return model.WMOCode(80) + } + if owmID == 522 { + return model.WMOCode(82) + } + return model.WMOCode(81) + } + + // normal rain intensity + if owmID == 500 { + return model.WMOCode(61) + } + if owmID == 501 { + return model.WMOCode(63) + } + if owmID >= 502 && owmID <= 504 { + return model.WMOCode(65) + } + return model.WMOCode(63) + + // 6xx Snow + case owmID >= 600 && owmID <= 622: + if owmID == 600 { + return model.WMOCode(71) + } + if owmID == 601 { + return model.WMOCode(73) + } + if owmID == 602 { + return model.WMOCode(75) + } + + // Snow showers bucket (620-622) + if owmID == 620 { + return model.WMOCode(85) + } + if owmID == 621 || owmID == 622 { + return model.WMOCode(86) + } + + return model.WMOCode(73) + + // 7xx Atmosphere (mist/smoke/haze/dust/fog/etc.) + case owmID >= 701 && owmID <= 781: + return model.WMOCode(45) + + // 800 Clear + case owmID == 800: + return model.WMOCode(0) + + // 80x Clouds + case owmID == 801: + return model.WMOCode(1) + case owmID == 802: + return model.WMOCode(2) + case owmID == 803 || owmID == 804: + return model.WMOCode(3) + + default: + return model.WMOUnknown + } +} diff --git a/internal/standards/doc.go b/internal/standards/doc.go new file mode 100644 index 0000000..e3c6a8c --- /dev/null +++ b/internal/standards/doc.go @@ -0,0 +1,8 @@ +// Package standards contains shared canonical vocabularies and lookup tables +// used across multiple providers. +// +// The guiding principle is: +// - Canonical types live in internal/model (provider-independent). +// - Shared reference tables and helpers live here. +// - Provider-specific mapping logic lives in internal/sources/. +package standards diff --git a/internal/standards/wmo.go b/internal/standards/wmo.go new file mode 100644 index 0000000..ab8ccf6 --- /dev/null +++ b/internal/standards/wmo.go @@ -0,0 +1,102 @@ +package standards + +import ( + "fmt" + + "gitea.maximumdirect.net/ejr/weatherfeeder/internal/model" +) + +type WMODescription struct { + Day string + Night string +} + +// WMODescriptions is the canonical internal mapping of WMO code -> day/night text. +// These are used to populate model.WeatherObservation.ConditionText. +var WMODescriptions = map[model.WMOCode]WMODescription{ + 0: {Day: "Sunny", Night: "Clear"}, + 1: {Day: "Mainly Sunny", Night: "Mainly Clear"}, + 2: {Day: "Partly Cloudy", Night: "Partly Cloudy"}, + 3: {Day: "Cloudy", Night: "Cloudy"}, + 45: {Day: "Foggy", Night: "Foggy"}, + 48: {Day: "Rime Fog", Night: "Rime Fog"}, + 51: {Day: "Light Drizzle", Night: "Light Drizzle"}, + 53: {Day: "Drizzle", Night: "Drizzle"}, + 55: {Day: "Heavy Drizzle", Night: "Heavy Drizzle"}, + 56: {Day: "Light Freezing Drizzle", Night: "Light Freezing Drizzle"}, + 57: {Day: "Freezing Drizzle", Night: "Freezing Drizzle"}, + 61: {Day: "Light Rain", Night: "Light Rain"}, + 63: {Day: "Rain", Night: "Rain"}, + 65: {Day: "Heavy Rain", Night: "Heavy Rain"}, + 66: {Day: "Light Freezing Rain", Night: "Light Freezing Rain"}, + 67: {Day: "Freezing Rain", Night: "Freezing Rain"}, + 71: {Day: "Light Snow", Night: "Light Snow"}, + 73: {Day: "Snow", Night: "Snow"}, + 75: {Day: "Heavy Snow", Night: "Heavy Snow"}, + 77: {Day: "Snow Grains", Night: "Snow Grains"}, + 80: {Day: "Light Showers", Night: "Light Showers"}, + 81: {Day: "Showers", Night: "Showers"}, + 82: {Day: "Heavy Showers", Night: "Heavy Showers"}, + 85: {Day: "Light Snow Showers", Night: "Light Snow Showers"}, + 86: {Day: "Snow Showers", Night: "Snow Showers"}, + 95: {Day: "Thunderstorm", Night: "Thunderstorm"}, + 96: {Day: "Light Thunderstorms With Hail", Night: "Light Thunderstorms With Hail"}, + 99: {Day: "Thunderstorm With Hail", Night: "Thunderstorm With Hail"}, +} + +// WMOText returns the canonical text description for a WMO code. +// If isDay is nil, it prefers the Day description (if present). +// +// This is intended to be used by drivers after they set ConditionCode. +func WMOText(code model.WMOCode, isDay *bool) string { + if code == model.WMOUnknown { + return "Unknown" + } + + desc, ok := WMODescriptions[code] + if !ok { + // Preserve the code in the message so it's diagnosable. + return fmt.Sprintf("Unknown (WMO %d)", int(code)) + } + + // If day/night is unknown, default to Day if it exists. + if isDay == nil { + if desc.Day != "" { + return desc.Day + } + if desc.Night != "" { + return desc.Night + } + return fmt.Sprintf("Unknown (WMO %d)", int(code)) + } + + if *isDay { + if desc.Day != "" { + return desc.Day + } + // Fallback + if desc.Night != "" { + return desc.Night + } + return fmt.Sprintf("Unknown (WMO %d)", int(code)) + } + + // Night + if desc.Night != "" { + return desc.Night + } + // Fallback + if desc.Day != "" { + return desc.Day + } + return fmt.Sprintf("Unknown (WMO %d)", int(code)) +} + +// IsKnownWMO returns true if the code exists in our mapping table. +func IsKnownWMO(code model.WMOCode) bool { + if code == model.WMOUnknown { + return false + } + _, ok := WMODescriptions[code] + return ok +} diff --git a/internal/standards/wmo_categories.go b/internal/standards/wmo_categories.go new file mode 100644 index 0000000..90b897e --- /dev/null +++ b/internal/standards/wmo_categories.go @@ -0,0 +1,127 @@ +package standards + +import "gitea.maximumdirect.net/ejr/weatherfeeder/internal/model" + +// This file provides small, shared helper functions for reasoning about WMO codes. +// These are intentionally "coarse" categories that are useful for business logic, +// dashboards, and alerting decisions. +// +// Example uses: +// - jogging suitability: precipitation? thunderstorm? freezing precip? +// - quick glance: "is it cloudy?" "is there any precip?" +// - downstream normalizers / aggregators + +func IsThunderstorm(code model.WMOCode) bool { + switch code { + case 95, 96, 99: + return true + default: + return false + } +} + +func IsHail(code model.WMOCode) bool { + switch code { + case 96, 99: + return true + default: + return false + } +} + +func IsFog(code model.WMOCode) bool { + switch code { + case 45, 48: + return true + default: + return false + } +} + +// IsPrecipitation returns true if the code represents any precipitation +// (drizzle, rain, snow, showers, etc.). +func IsPrecipitation(code model.WMOCode) bool { + switch code { + // Drizzle + case 51, 53, 55, 56, 57: + return true + + // Rain + case 61, 63, 65, 66, 67: + return true + + // Snow + case 71, 73, 75, 77: + return true + + // Showers + case 80, 81, 82, 85, 86: + return true + + // Thunderstorm (often includes rain/hail) + case 95, 96, 99: + return true + + default: + return false + } +} + +func IsRainFamily(code model.WMOCode) bool { + switch code { + // Drizzle + freezing drizzle + case 51, 53, 55, 56, 57: + return true + + // Rain + freezing rain + case 61, 63, 65, 66, 67: + return true + + // Rain showers + case 80, 81, 82: + return true + + // Thunderstorm often implies rain + case 95, 96, 99: + return true + + default: + return false + } +} + +func IsSnowFamily(code model.WMOCode) bool { + switch code { + // Snow and related + case 71, 73, 75, 77: + return true + + // Snow showers + case 85, 86: + return true + + default: + return false + } +} + +// IsFreezingPrecip returns true if the code represents freezing drizzle/rain. +func IsFreezingPrecip(code model.WMOCode) bool { + switch code { + case 56, 57, 66, 67: + return true + default: + return false + } +} + +// IsSkyOnly returns true for codes that represent "sky condition only" +// (clear/mostly/partly/cloudy) rather than fog/precip/etc. +func IsSkyOnly(code model.WMOCode) bool { + switch code { + case 0, 1, 2, 3: + return true + default: + return false + } +}