Moved HTTP polling helpers upstream into feedkit, and updated to feedkit v0.8.0
All checks were successful
ci/woodpecker/push/build-image Pipeline was successful
All checks were successful
ci/woodpecker/push/build-image Pipeline was successful
This commit is contained in:
2
go.mod
2
go.mod
@@ -2,7 +2,7 @@ module gitea.maximumdirect.net/ejr/weatherfeeder
|
|||||||
|
|
||||||
go 1.25
|
go 1.25
|
||||||
|
|
||||||
require gitea.maximumdirect.net/ejr/feedkit v0.7.2
|
require gitea.maximumdirect.net/ejr/feedkit v0.8.0
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/klauspost/compress v1.17.2 // indirect
|
github.com/klauspost/compress v1.17.2 // indirect
|
||||||
|
|||||||
4
go.sum
4
go.sum
@@ -1,5 +1,5 @@
|
|||||||
gitea.maximumdirect.net/ejr/feedkit v0.7.2 h1:hTg302SgSi7tw11lNzuc+3g7MvHT6jQQziuo2NoARt8=
|
gitea.maximumdirect.net/ejr/feedkit v0.8.0 h1:JdEEy6T3AQ97alLNYcQ3crN3tOEZPLMBD0Qr/MH5/dw=
|
||||||
gitea.maximumdirect.net/ejr/feedkit v0.7.2/go.mod h1:U6xC9xZLN3cL4yi7YBVyzGoHYRLJXusFCAKlj2kdYYQ=
|
gitea.maximumdirect.net/ejr/feedkit v0.8.0/go.mod h1:U6xC9xZLN3cL4yi7YBVyzGoHYRLJXusFCAKlj2kdYYQ=
|
||||||
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
|
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
|
||||||
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
|
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
|
||||||
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
|
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
|
||||||
|
|||||||
@@ -1,104 +0,0 @@
|
|||||||
// FILE: ./internal/sources/common/config.go
|
|
||||||
package common
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"strings"
|
|
||||||
|
|
||||||
"gitea.maximumdirect.net/ejr/feedkit/config"
|
|
||||||
)
|
|
||||||
|
|
||||||
// This file centralizes small, boring config-validation patterns shared across
|
|
||||||
// weatherfeeder source drivers.
|
|
||||||
//
|
|
||||||
// Goal: keep driver constructors (New*Source) easy to read and consistent, while
|
|
||||||
// keeping driver-specific options in cfg.Params (feedkit remains domain-agnostic).
|
|
||||||
|
|
||||||
// HTTPSourceConfig is the standard "HTTP-polling source" config shape used across drivers.
|
|
||||||
type HTTPSourceConfig struct {
|
|
||||||
Name string
|
|
||||||
URL string
|
|
||||||
UserAgent string
|
|
||||||
}
|
|
||||||
|
|
||||||
// RequireHTTPSourceConfig enforces weatherfeeder's standard HTTP source config:
|
|
||||||
//
|
|
||||||
// - cfg.Name must be present
|
|
||||||
// - cfg.Params must be present
|
|
||||||
// - params.url must be present (accepts "url" or "URL")
|
|
||||||
// - params.user_agent must be present (accepts "user_agent" or "userAgent")
|
|
||||||
//
|
|
||||||
// We intentionally require a User-Agent for *all* sources, even when upstreams
|
|
||||||
// do not strictly require one. This keeps config uniform across providers.
|
|
||||||
func RequireHTTPSourceConfig(driver string, cfg config.SourceConfig) (HTTPSourceConfig, error) {
|
|
||||||
if strings.TrimSpace(cfg.Name) == "" {
|
|
||||||
return HTTPSourceConfig{}, fmt.Errorf("%s: name is required", driver)
|
|
||||||
}
|
|
||||||
if cfg.Params == nil {
|
|
||||||
return HTTPSourceConfig{}, fmt.Errorf("%s %q: params are required (need params.url and params.user_agent)", driver, cfg.Name)
|
|
||||||
}
|
|
||||||
|
|
||||||
url, ok := cfg.ParamString("url", "URL")
|
|
||||||
if !ok {
|
|
||||||
return HTTPSourceConfig{}, fmt.Errorf("%s %q: params.url is required", driver, cfg.Name)
|
|
||||||
}
|
|
||||||
|
|
||||||
ua, ok := cfg.ParamString("user_agent", "userAgent")
|
|
||||||
if !ok {
|
|
||||||
return HTTPSourceConfig{}, fmt.Errorf("%s %q: params.user_agent is required", driver, cfg.Name)
|
|
||||||
}
|
|
||||||
|
|
||||||
return HTTPSourceConfig{
|
|
||||||
Name: cfg.Name,
|
|
||||||
URL: url,
|
|
||||||
UserAgent: ua,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// --- The helpers below remain useful for future drivers; they are not required
|
|
||||||
// --- by the observation sources after adopting RequireHTTPSourceConfig.
|
|
||||||
|
|
||||||
// RequireName ensures cfg.Name is present and non-whitespace.
|
|
||||||
func RequireName(driver string, cfg config.SourceConfig) error {
|
|
||||||
if strings.TrimSpace(cfg.Name) == "" {
|
|
||||||
return fmt.Errorf("%s: name is required", driver)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// RequireParams ensures cfg.Params is non-nil. The "want" string should be a short
|
|
||||||
// description of required keys, e.g. "need params.url and params.user_agent".
|
|
||||||
func RequireParams(driver string, cfg config.SourceConfig, want string) error {
|
|
||||||
if cfg.Params == nil {
|
|
||||||
return fmt.Errorf("%s %q: params are required (%s)", driver, cfg.Name, want)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// RequireURL returns the configured URL for a source.
|
|
||||||
// Canonical key is "url"; we also accept "URL" as a convenience.
|
|
||||||
func RequireURL(driver string, cfg config.SourceConfig) (string, error) {
|
|
||||||
if cfg.Params == nil {
|
|
||||||
return "", fmt.Errorf("%s %q: params are required (need params.url)", driver, cfg.Name)
|
|
||||||
}
|
|
||||||
|
|
||||||
u, ok := cfg.ParamString("url", "URL")
|
|
||||||
if !ok {
|
|
||||||
return "", fmt.Errorf("%s %q: params.url is required", driver, cfg.Name)
|
|
||||||
}
|
|
||||||
return u, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// RequireUserAgent returns the configured User-Agent for a source.
|
|
||||||
// Canonical key is "user_agent"; we also accept "userAgent" as a convenience.
|
|
||||||
func RequireUserAgent(driver string, cfg config.SourceConfig) (string, error) {
|
|
||||||
if cfg.Params == nil {
|
|
||||||
return "", fmt.Errorf("%s %q: params are required (need params.user_agent)", driver, cfg.Name)
|
|
||||||
}
|
|
||||||
|
|
||||||
ua, ok := cfg.ParamString("user_agent", "userAgent")
|
|
||||||
if !ok {
|
|
||||||
return "", fmt.Errorf("%s %q: params.user_agent is required", driver, cfg.Name)
|
|
||||||
}
|
|
||||||
return ua, nil
|
|
||||||
}
|
|
||||||
@@ -1,76 +0,0 @@
|
|||||||
// FILE: ./internal/sources/common/http_source.go
|
|
||||||
package common
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
|
||||||
"net/http"
|
|
||||||
|
|
||||||
"gitea.maximumdirect.net/ejr/feedkit/config"
|
|
||||||
"gitea.maximumdirect.net/ejr/feedkit/transport"
|
|
||||||
)
|
|
||||||
|
|
||||||
// HTTPSource is a tiny, reusable "HTTP polling spine" for weatherfeeder sources.
|
|
||||||
//
|
|
||||||
// It centralizes the boring parts:
|
|
||||||
// - standard config shape (url + user_agent) via RequireHTTPSourceConfig
|
|
||||||
// - a default http.Client with timeout
|
|
||||||
// - FetchBody / headers / max-body safety limit
|
|
||||||
// - consistent error wrapping (driver + source name)
|
|
||||||
//
|
|
||||||
// Individual drivers remain responsible for:
|
|
||||||
// - decoding minimal metadata (for Event.ID / EffectiveAt)
|
|
||||||
// - constructing the event envelope (kind/schema/payload)
|
|
||||||
type HTTPSource struct {
|
|
||||||
Driver string
|
|
||||||
Name string
|
|
||||||
URL string
|
|
||||||
UserAgent string
|
|
||||||
Accept string
|
|
||||||
Client *http.Client
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewHTTPSource builds an HTTPSource using weatherfeeder's standard HTTP source
|
|
||||||
// config (params.url + params.user_agent) and a default HTTP client.
|
|
||||||
func NewHTTPSource(driver string, cfg config.SourceConfig, accept string) (*HTTPSource, error) {
|
|
||||||
c, err := RequireHTTPSourceConfig(driver, cfg)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return &HTTPSource{
|
|
||||||
Driver: driver,
|
|
||||||
Name: c.Name,
|
|
||||||
URL: c.URL,
|
|
||||||
UserAgent: c.UserAgent,
|
|
||||||
Accept: accept,
|
|
||||||
Client: transport.NewHTTPClient(transport.DefaultHTTPTimeout),
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// FetchBytes fetches the URL and returns the raw response body bytes.
|
|
||||||
func (s *HTTPSource) FetchBytes(ctx context.Context) ([]byte, error) {
|
|
||||||
client := s.Client
|
|
||||||
if client == nil {
|
|
||||||
// Defensive: allow tests or callers to nil out Client; keep behavior sane.
|
|
||||||
client = transport.NewHTTPClient(transport.DefaultHTTPTimeout)
|
|
||||||
}
|
|
||||||
|
|
||||||
b, err := transport.FetchBody(ctx, client, s.URL, s.UserAgent, s.Accept)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("%s %q: %w", s.Driver, s.Name, err)
|
|
||||||
}
|
|
||||||
return b, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// FetchJSON fetches the URL and returns the raw body as json.RawMessage.
|
|
||||||
// json.Unmarshal accepts json.RawMessage directly, so callers can decode minimal
|
|
||||||
// metadata without keeping both []byte and RawMessage in their own structs.
|
|
||||||
func (s *HTTPSource) FetchJSON(ctx context.Context) (json.RawMessage, error) {
|
|
||||||
b, err := s.FetchBytes(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return json.RawMessage(b), nil
|
|
||||||
}
|
|
||||||
@@ -9,6 +9,7 @@ import (
|
|||||||
|
|
||||||
"gitea.maximumdirect.net/ejr/feedkit/config"
|
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||||
"gitea.maximumdirect.net/ejr/feedkit/event"
|
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||||
|
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
|
||||||
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
|
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
|
||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
|
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
|
||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
||||||
@@ -22,14 +23,14 @@ import (
|
|||||||
// Output schema:
|
// Output schema:
|
||||||
// - standards.SchemaRawNWSAlertsV1
|
// - standards.SchemaRawNWSAlertsV1
|
||||||
type AlertsSource struct {
|
type AlertsSource struct {
|
||||||
http *common.HTTPSource
|
http *fksources.HTTPSource
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewAlertsSource(cfg config.SourceConfig) (*AlertsSource, error) {
|
func NewAlertsSource(cfg config.SourceConfig) (*AlertsSource, error) {
|
||||||
const driver = "nws_alerts"
|
const driver = "nws_alerts"
|
||||||
|
|
||||||
// NWS alerts responses are GeoJSON-ish; allow fallback to plain JSON as well.
|
// NWS alerts responses are GeoJSON-ish; allow fallback to plain JSON as well.
|
||||||
hs, err := common.NewHTTPSource(driver, cfg, "application/geo+json, application/json")
|
hs, err := fksources.NewHTTPSource(driver, cfg, "application/geo+json, application/json")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -43,10 +44,13 @@ func (s *AlertsSource) Name() string { return s.http.Name }
|
|||||||
func (s *AlertsSource) Kind() event.Kind { return event.Kind("alert") }
|
func (s *AlertsSource) Kind() event.Kind { return event.Kind("alert") }
|
||||||
|
|
||||||
func (s *AlertsSource) Poll(ctx context.Context) ([]event.Event, error) {
|
func (s *AlertsSource) Poll(ctx context.Context) ([]event.Event, error) {
|
||||||
raw, meta, err := s.fetchRaw(ctx)
|
raw, meta, changed, err := s.fetchRaw(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if !changed {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
// EffectiveAt policy for alerts:
|
// EffectiveAt policy for alerts:
|
||||||
// Prefer the collection-level "updated" timestamp (best dedupe signal).
|
// Prefer the collection-level "updated" timestamp (best dedupe signal).
|
||||||
@@ -97,16 +101,19 @@ type alertsMeta struct {
|
|||||||
ParsedLatestFeatureTime time.Time `json:"-"`
|
ParsedLatestFeatureTime time.Time `json:"-"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *AlertsSource) fetchRaw(ctx context.Context) (json.RawMessage, alertsMeta, error) {
|
func (s *AlertsSource) fetchRaw(ctx context.Context) (json.RawMessage, alertsMeta, bool, error) {
|
||||||
raw, err := s.http.FetchJSON(ctx)
|
raw, changed, err := s.http.FetchJSONIfChanged(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, alertsMeta{}, err
|
return nil, alertsMeta{}, false, err
|
||||||
|
}
|
||||||
|
if !changed {
|
||||||
|
return nil, alertsMeta{}, false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var meta alertsMeta
|
var meta alertsMeta
|
||||||
if err := json.Unmarshal(raw, &meta); err != nil {
|
if err := json.Unmarshal(raw, &meta); err != nil {
|
||||||
// If metadata decode fails, still return raw; Poll will fall back to Source:EmittedAt.
|
// If metadata decode fails, still return raw; Poll will fall back to Source:EmittedAt.
|
||||||
return raw, alertsMeta{}, nil
|
return raw, alertsMeta{}, true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Top-level updated (preferred).
|
// Top-level updated (preferred).
|
||||||
@@ -143,5 +150,5 @@ func (s *AlertsSource) fetchRaw(ctx context.Context) (json.RawMessage, alertsMet
|
|||||||
}
|
}
|
||||||
meta.ParsedLatestFeatureTime = latest
|
meta.ParsedLatestFeatureTime = latest
|
||||||
|
|
||||||
return raw, meta, nil
|
return raw, meta, true, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ import (
|
|||||||
|
|
||||||
"gitea.maximumdirect.net/ejr/feedkit/config"
|
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||||
"gitea.maximumdirect.net/ejr/feedkit/event"
|
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||||
|
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
|
||||||
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
|
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
|
||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
|
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
|
||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
||||||
@@ -22,14 +23,14 @@ import (
|
|||||||
// Output schema (current implementation):
|
// Output schema (current implementation):
|
||||||
// - standards.SchemaRawNWSHourlyForecastV1
|
// - standards.SchemaRawNWSHourlyForecastV1
|
||||||
type HourlyForecastSource struct {
|
type HourlyForecastSource struct {
|
||||||
http *common.HTTPSource
|
http *fksources.HTTPSource
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewHourlyForecastSource(cfg config.SourceConfig) (*HourlyForecastSource, error) {
|
func NewHourlyForecastSource(cfg config.SourceConfig) (*HourlyForecastSource, error) {
|
||||||
const driver = "nws_forecast_hourly"
|
const driver = "nws_forecast_hourly"
|
||||||
|
|
||||||
// NWS forecast endpoints are GeoJSON (and sometimes also advertise json-ld/json).
|
// NWS forecast endpoints are GeoJSON (and sometimes also advertise json-ld/json).
|
||||||
hs, err := common.NewHTTPSource(driver, cfg, "application/geo+json, application/json")
|
hs, err := fksources.NewHTTPSource(driver, cfg, "application/geo+json, application/json")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -43,10 +44,13 @@ func (s *HourlyForecastSource) Name() string { return s.http.Name }
|
|||||||
func (s *HourlyForecastSource) Kind() event.Kind { return event.Kind("forecast") }
|
func (s *HourlyForecastSource) Kind() event.Kind { return event.Kind("forecast") }
|
||||||
|
|
||||||
func (s *HourlyForecastSource) Poll(ctx context.Context) ([]event.Event, error) {
|
func (s *HourlyForecastSource) Poll(ctx context.Context) ([]event.Event, error) {
|
||||||
raw, meta, err := s.fetchRaw(ctx)
|
raw, meta, changed, err := s.fetchRaw(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if !changed {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
// EffectiveAt is optional; for forecasts it’s most naturally the run “issued” time.
|
// EffectiveAt is optional; for forecasts it’s most naturally the run “issued” time.
|
||||||
// NWS gridpoint forecasts expose generatedAt (preferred) and updateTime/updated.
|
// NWS gridpoint forecasts expose generatedAt (preferred) and updateTime/updated.
|
||||||
@@ -94,16 +98,19 @@ type hourlyForecastMeta struct {
|
|||||||
ParsedUpdateTime time.Time `json:"-"`
|
ParsedUpdateTime time.Time `json:"-"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *HourlyForecastSource) fetchRaw(ctx context.Context) (json.RawMessage, hourlyForecastMeta, error) {
|
func (s *HourlyForecastSource) fetchRaw(ctx context.Context) (json.RawMessage, hourlyForecastMeta, bool, error) {
|
||||||
raw, err := s.http.FetchJSON(ctx)
|
raw, changed, err := s.http.FetchJSONIfChanged(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, hourlyForecastMeta{}, err
|
return nil, hourlyForecastMeta{}, false, err
|
||||||
|
}
|
||||||
|
if !changed {
|
||||||
|
return nil, hourlyForecastMeta{}, false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var meta hourlyForecastMeta
|
var meta hourlyForecastMeta
|
||||||
if err := json.Unmarshal(raw, &meta); err != nil {
|
if err := json.Unmarshal(raw, &meta); err != nil {
|
||||||
// If metadata decode fails, still return raw; Poll will fall back to Source:EmittedAt.
|
// If metadata decode fails, still return raw; Poll will fall back to Source:EmittedAt.
|
||||||
return raw, hourlyForecastMeta{}, nil
|
return raw, hourlyForecastMeta{}, true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// generatedAt (preferred)
|
// generatedAt (preferred)
|
||||||
@@ -125,5 +132,5 @@ func (s *HourlyForecastSource) fetchRaw(ctx context.Context) (json.RawMessage, h
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return raw, meta, nil
|
return raw, meta, true, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ import (
|
|||||||
|
|
||||||
"gitea.maximumdirect.net/ejr/feedkit/config"
|
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||||
"gitea.maximumdirect.net/ejr/feedkit/event"
|
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||||
|
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
|
||||||
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
|
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
|
||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
|
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
|
||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
||||||
@@ -22,14 +23,14 @@ import (
|
|||||||
// Output schema:
|
// Output schema:
|
||||||
// - standards.SchemaRawNWSNarrativeForecastV1
|
// - standards.SchemaRawNWSNarrativeForecastV1
|
||||||
type NarrativeForecastSource struct {
|
type NarrativeForecastSource struct {
|
||||||
http *common.HTTPSource
|
http *fksources.HTTPSource
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewNarrativeForecastSource(cfg config.SourceConfig) (*NarrativeForecastSource, error) {
|
func NewNarrativeForecastSource(cfg config.SourceConfig) (*NarrativeForecastSource, error) {
|
||||||
const driver = "nws_forecast_narrative"
|
const driver = "nws_forecast_narrative"
|
||||||
|
|
||||||
// NWS forecast endpoints are GeoJSON (and sometimes also advertise json-ld/json).
|
// NWS forecast endpoints are GeoJSON (and sometimes also advertise json-ld/json).
|
||||||
hs, err := common.NewHTTPSource(driver, cfg, "application/geo+json, application/json")
|
hs, err := fksources.NewHTTPSource(driver, cfg, "application/geo+json, application/json")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -43,10 +44,13 @@ func (s *NarrativeForecastSource) Name() string { return s.http.Name }
|
|||||||
func (s *NarrativeForecastSource) Kind() event.Kind { return event.Kind("forecast") }
|
func (s *NarrativeForecastSource) Kind() event.Kind { return event.Kind("forecast") }
|
||||||
|
|
||||||
func (s *NarrativeForecastSource) Poll(ctx context.Context) ([]event.Event, error) {
|
func (s *NarrativeForecastSource) Poll(ctx context.Context) ([]event.Event, error) {
|
||||||
raw, meta, err := s.fetchRaw(ctx)
|
raw, meta, changed, err := s.fetchRaw(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if !changed {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
// EffectiveAt is optional; for forecasts it’s most naturally the run “issued” time.
|
// EffectiveAt is optional; for forecasts it’s most naturally the run “issued” time.
|
||||||
// NWS gridpoint forecasts expose generatedAt (preferred) and updateTime/updated.
|
// NWS gridpoint forecasts expose generatedAt (preferred) and updateTime/updated.
|
||||||
@@ -94,16 +98,19 @@ type narrativeForecastMeta struct {
|
|||||||
ParsedUpdateTime time.Time `json:"-"`
|
ParsedUpdateTime time.Time `json:"-"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *NarrativeForecastSource) fetchRaw(ctx context.Context) (json.RawMessage, narrativeForecastMeta, error) {
|
func (s *NarrativeForecastSource) fetchRaw(ctx context.Context) (json.RawMessage, narrativeForecastMeta, bool, error) {
|
||||||
raw, err := s.http.FetchJSON(ctx)
|
raw, changed, err := s.http.FetchJSONIfChanged(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, narrativeForecastMeta{}, err
|
return nil, narrativeForecastMeta{}, false, err
|
||||||
|
}
|
||||||
|
if !changed {
|
||||||
|
return nil, narrativeForecastMeta{}, false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var meta narrativeForecastMeta
|
var meta narrativeForecastMeta
|
||||||
if err := json.Unmarshal(raw, &meta); err != nil {
|
if err := json.Unmarshal(raw, &meta); err != nil {
|
||||||
// If metadata decode fails, still return raw; Poll will fall back to Source:EmittedAt.
|
// If metadata decode fails, still return raw; Poll will fall back to Source:EmittedAt.
|
||||||
return raw, narrativeForecastMeta{}, nil
|
return raw, narrativeForecastMeta{}, true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// generatedAt (preferred)
|
// generatedAt (preferred)
|
||||||
@@ -125,5 +132,5 @@ func (s *NarrativeForecastSource) fetchRaw(ctx context.Context) (json.RawMessage
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return raw, meta, nil
|
return raw, meta, true, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ import (
|
|||||||
|
|
||||||
"gitea.maximumdirect.net/ejr/feedkit/config"
|
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||||
"gitea.maximumdirect.net/ejr/feedkit/event"
|
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||||
|
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
|
||||||
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
|
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
|
||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
|
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
|
||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
||||||
@@ -16,13 +17,13 @@ import (
|
|||||||
|
|
||||||
// ObservationSource polls an NWS station observation endpoint and emits a RAW observation Event.
|
// ObservationSource polls an NWS station observation endpoint and emits a RAW observation Event.
|
||||||
type ObservationSource struct {
|
type ObservationSource struct {
|
||||||
http *common.HTTPSource
|
http *fksources.HTTPSource
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) {
|
func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) {
|
||||||
const driver = "nws_observation"
|
const driver = "nws_observation"
|
||||||
|
|
||||||
hs, err := common.NewHTTPSource(driver, cfg, "application/geo+json, application/json")
|
hs, err := fksources.NewHTTPSource(driver, cfg, "application/geo+json, application/json")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -35,10 +36,13 @@ func (s *ObservationSource) Name() string { return s.http.Name }
|
|||||||
func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") }
|
func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") }
|
||||||
|
|
||||||
func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
|
func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
|
||||||
raw, meta, err := s.fetchRaw(ctx)
|
raw, meta, changed, err := s.fetchRaw(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if !changed {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
// EffectiveAt is optional; for observations it’s naturally the observation timestamp.
|
// EffectiveAt is optional; for observations it’s naturally the observation timestamp.
|
||||||
var effectiveAt *time.Time
|
var effectiveAt *time.Time
|
||||||
@@ -72,16 +76,19 @@ type observationMeta struct {
|
|||||||
ParsedTimestamp time.Time `json:"-"`
|
ParsedTimestamp time.Time `json:"-"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, observationMeta, error) {
|
func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, observationMeta, bool, error) {
|
||||||
raw, err := s.http.FetchJSON(ctx)
|
raw, changed, err := s.http.FetchJSONIfChanged(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, observationMeta{}, err
|
return nil, observationMeta{}, false, err
|
||||||
|
}
|
||||||
|
if !changed {
|
||||||
|
return nil, observationMeta{}, false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var meta observationMeta
|
var meta observationMeta
|
||||||
if err := json.Unmarshal(raw, &meta); err != nil {
|
if err := json.Unmarshal(raw, &meta); err != nil {
|
||||||
// If metadata decode fails, still return raw; envelope will fall back to Source:EffectiveAt.
|
// If metadata decode fails, still return raw; envelope will fall back to Source:EffectiveAt.
|
||||||
return raw, observationMeta{}, nil
|
return raw, observationMeta{}, true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
tsStr := strings.TrimSpace(meta.Properties.Timestamp)
|
tsStr := strings.TrimSpace(meta.Properties.Timestamp)
|
||||||
@@ -91,5 +98,5 @@ func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, obse
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return raw, meta, nil
|
return raw, meta, true, nil
|
||||||
}
|
}
|
||||||
|
|||||||
63
internal/sources/nws/observation_test.go
Normal file
63
internal/sources/nws/observation_test.go
Normal file
@@ -0,0 +1,63 @@
|
|||||||
|
package nws
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||||
|
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestObservationSourcePollReturnsNoEventsOn304(t *testing.T) {
|
||||||
|
var call int
|
||||||
|
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
call++
|
||||||
|
switch call {
|
||||||
|
case 1:
|
||||||
|
w.Header().Set("ETag", `"obs-v1"`)
|
||||||
|
_, _ = w.Write([]byte(`{"id":"obs-1","properties":{"timestamp":"2026-03-28T12:00:00Z"}}`))
|
||||||
|
case 2:
|
||||||
|
if got := r.Header.Get("If-None-Match"); got != `"obs-v1"` {
|
||||||
|
t.Fatalf("second request If-None-Match = %q", got)
|
||||||
|
}
|
||||||
|
w.WriteHeader(http.StatusNotModified)
|
||||||
|
default:
|
||||||
|
t.Fatalf("unexpected call count %d", call)
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
defer srv.Close()
|
||||||
|
|
||||||
|
src, err := NewObservationSource(config.SourceConfig{
|
||||||
|
Name: "NWSObservationTest",
|
||||||
|
Driver: "nws_observation",
|
||||||
|
Mode: config.SourceModePoll,
|
||||||
|
Params: map[string]any{
|
||||||
|
"url": srv.URL,
|
||||||
|
"user_agent": "test-agent",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("NewObservationSource() error = %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
first, err := src.Poll(context.Background())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("first Poll() error = %v", err)
|
||||||
|
}
|
||||||
|
if len(first) != 1 {
|
||||||
|
t.Fatalf("first Poll() len = %d, want 1", len(first))
|
||||||
|
}
|
||||||
|
if first[0].Kind != event.Kind("observation") {
|
||||||
|
t.Fatalf("first Poll() kind = %q", first[0].Kind)
|
||||||
|
}
|
||||||
|
|
||||||
|
second, err := src.Poll(context.Background())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("second Poll() error = %v", err)
|
||||||
|
}
|
||||||
|
if len(second) != 0 {
|
||||||
|
t.Fatalf("second Poll() len = %d, want 0", len(second))
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -8,6 +8,7 @@ import (
|
|||||||
|
|
||||||
"gitea.maximumdirect.net/ejr/feedkit/config"
|
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||||
"gitea.maximumdirect.net/ejr/feedkit/event"
|
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||||
|
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
|
||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openmeteo"
|
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openmeteo"
|
||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
|
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
|
||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
||||||
@@ -15,13 +16,13 @@ import (
|
|||||||
|
|
||||||
// ForecastSource polls an Open-Meteo hourly forecast endpoint and emits one RAW Forecast Event.
|
// ForecastSource polls an Open-Meteo hourly forecast endpoint and emits one RAW Forecast Event.
|
||||||
type ForecastSource struct {
|
type ForecastSource struct {
|
||||||
http *common.HTTPSource
|
http *fksources.HTTPSource
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewForecastSource(cfg config.SourceConfig) (*ForecastSource, error) {
|
func NewForecastSource(cfg config.SourceConfig) (*ForecastSource, error) {
|
||||||
const driver = "openmeteo_forecast"
|
const driver = "openmeteo_forecast"
|
||||||
|
|
||||||
hs, err := common.NewHTTPSource(driver, cfg, "application/json")
|
hs, err := fksources.NewHTTPSource(driver, cfg, "application/json")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -34,10 +35,13 @@ func (s *ForecastSource) Name() string { return s.http.Name }
|
|||||||
func (s *ForecastSource) Kind() event.Kind { return event.Kind("forecast") }
|
func (s *ForecastSource) Kind() event.Kind { return event.Kind("forecast") }
|
||||||
|
|
||||||
func (s *ForecastSource) Poll(ctx context.Context) ([]event.Event, error) {
|
func (s *ForecastSource) Poll(ctx context.Context) ([]event.Event, error) {
|
||||||
raw, meta, err := s.fetchRaw(ctx)
|
raw, meta, changed, err := s.fetchRaw(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if !changed {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Open-Meteo does not expose a true "issued at" timestamp for forecast runs.
|
// Open-Meteo does not expose a true "issued at" timestamp for forecast runs.
|
||||||
// We use current.time when present; otherwise we fall back to the first hourly time
|
// We use current.time when present; otherwise we fall back to the first hourly time
|
||||||
@@ -79,16 +83,19 @@ type forecastMeta struct {
|
|||||||
ParsedTimestamp time.Time `json:"-"`
|
ParsedTimestamp time.Time `json:"-"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ForecastSource) fetchRaw(ctx context.Context) (json.RawMessage, forecastMeta, error) {
|
func (s *ForecastSource) fetchRaw(ctx context.Context) (json.RawMessage, forecastMeta, bool, error) {
|
||||||
raw, err := s.http.FetchJSON(ctx)
|
raw, changed, err := s.http.FetchJSONIfChanged(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, forecastMeta{}, err
|
return nil, forecastMeta{}, false, err
|
||||||
|
}
|
||||||
|
if !changed {
|
||||||
|
return nil, forecastMeta{}, false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var meta forecastMeta
|
var meta forecastMeta
|
||||||
if err := json.Unmarshal(raw, &meta); err != nil {
|
if err := json.Unmarshal(raw, &meta); err != nil {
|
||||||
// If metadata decode fails, still return raw; Poll will fall back to Source:EmittedAt.
|
// If metadata decode fails, still return raw; Poll will fall back to Source:EmittedAt.
|
||||||
return raw, forecastMeta{}, nil
|
return raw, forecastMeta{}, true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
ts := strings.TrimSpace(meta.Current.Time)
|
ts := strings.TrimSpace(meta.Current.Time)
|
||||||
@@ -106,5 +113,5 @@ func (s *ForecastSource) fetchRaw(ctx context.Context) (json.RawMessage, forecas
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return raw, meta, nil
|
return raw, meta, true, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ import (
|
|||||||
|
|
||||||
"gitea.maximumdirect.net/ejr/feedkit/config"
|
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||||
"gitea.maximumdirect.net/ejr/feedkit/event"
|
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||||
|
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
|
||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openmeteo"
|
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openmeteo"
|
||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
|
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
|
||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
||||||
@@ -15,13 +16,13 @@ import (
|
|||||||
|
|
||||||
// ObservationSource polls an Open-Meteo endpoint and emits one RAW Observation Event.
|
// ObservationSource polls an Open-Meteo endpoint and emits one RAW Observation Event.
|
||||||
type ObservationSource struct {
|
type ObservationSource struct {
|
||||||
http *common.HTTPSource
|
http *fksources.HTTPSource
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) {
|
func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) {
|
||||||
const driver = "openmeteo_observation"
|
const driver = "openmeteo_observation"
|
||||||
|
|
||||||
hs, err := common.NewHTTPSource(driver, cfg, "application/json")
|
hs, err := fksources.NewHTTPSource(driver, cfg, "application/json")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -34,10 +35,13 @@ func (s *ObservationSource) Name() string { return s.http.Name }
|
|||||||
func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") }
|
func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") }
|
||||||
|
|
||||||
func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
|
func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
|
||||||
raw, meta, err := s.fetchRaw(ctx)
|
raw, meta, changed, err := s.fetchRaw(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if !changed {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
var effectiveAt *time.Time
|
var effectiveAt *time.Time
|
||||||
if !meta.ParsedTimestamp.IsZero() {
|
if !meta.ParsedTimestamp.IsZero() {
|
||||||
@@ -72,21 +76,24 @@ type openMeteoMeta struct {
|
|||||||
ParsedTimestamp time.Time `json:"-"`
|
ParsedTimestamp time.Time `json:"-"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, openMeteoMeta, error) {
|
func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, openMeteoMeta, bool, error) {
|
||||||
raw, err := s.http.FetchJSON(ctx)
|
raw, changed, err := s.http.FetchJSONIfChanged(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, openMeteoMeta{}, err
|
return nil, openMeteoMeta{}, false, err
|
||||||
|
}
|
||||||
|
if !changed {
|
||||||
|
return nil, openMeteoMeta{}, false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var meta openMeteoMeta
|
var meta openMeteoMeta
|
||||||
if err := json.Unmarshal(raw, &meta); err != nil {
|
if err := json.Unmarshal(raw, &meta); err != nil {
|
||||||
// If metadata decode fails, still return raw; envelope will omit EffectiveAt.
|
// If metadata decode fails, still return raw; envelope will omit EffectiveAt.
|
||||||
return raw, openMeteoMeta{}, nil
|
return raw, openMeteoMeta{}, true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if t, err := openmeteo.ParseTime(meta.Current.Time, meta.Timezone, meta.UTCOffsetSeconds); err == nil {
|
if t, err := openmeteo.ParseTime(meta.Current.Time, meta.Timezone, meta.UTCOffsetSeconds); err == nil {
|
||||||
meta.ParsedTimestamp = t.UTC()
|
meta.ParsedTimestamp = t.UTC()
|
||||||
}
|
}
|
||||||
|
|
||||||
return raw, meta, nil
|
return raw, meta, true, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -9,19 +9,20 @@ import (
|
|||||||
|
|
||||||
"gitea.maximumdirect.net/ejr/feedkit/config"
|
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||||
"gitea.maximumdirect.net/ejr/feedkit/event"
|
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||||
|
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
|
||||||
owcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openweather"
|
owcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openweather"
|
||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
|
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
|
||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
||||||
)
|
)
|
||||||
|
|
||||||
type ObservationSource struct {
|
type ObservationSource struct {
|
||||||
http *common.HTTPSource
|
http *fksources.HTTPSource
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) {
|
func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) {
|
||||||
const driver = "openweather_observation"
|
const driver = "openweather_observation"
|
||||||
|
|
||||||
hs, err := common.NewHTTPSource(driver, cfg, "application/json")
|
hs, err := fksources.NewHTTPSource(driver, cfg, "application/json")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -42,10 +43,13 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
|
|||||||
return nil, fmt.Errorf("%s %q: %w", s.http.Driver, s.http.Name, err)
|
return nil, fmt.Errorf("%s %q: %w", s.http.Driver, s.http.Name, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
raw, meta, err := s.fetchRaw(ctx)
|
raw, meta, changed, err := s.fetchRaw(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if !changed {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
var effectiveAt *time.Time
|
var effectiveAt *time.Time
|
||||||
if !meta.ParsedTimestamp.IsZero() {
|
if !meta.ParsedTimestamp.IsZero() {
|
||||||
@@ -75,20 +79,23 @@ type openWeatherMeta struct {
|
|||||||
ParsedTimestamp time.Time `json:"-"`
|
ParsedTimestamp time.Time `json:"-"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, openWeatherMeta, error) {
|
func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, openWeatherMeta, bool, error) {
|
||||||
raw, err := s.http.FetchJSON(ctx)
|
raw, changed, err := s.http.FetchJSONIfChanged(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, openWeatherMeta{}, err
|
return nil, openWeatherMeta{}, false, err
|
||||||
|
}
|
||||||
|
if !changed {
|
||||||
|
return nil, openWeatherMeta{}, false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var meta openWeatherMeta
|
var meta openWeatherMeta
|
||||||
if err := json.Unmarshal(raw, &meta); err != nil {
|
if err := json.Unmarshal(raw, &meta); err != nil {
|
||||||
return raw, openWeatherMeta{}, nil
|
return raw, openWeatherMeta{}, true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if meta.Dt > 0 {
|
if meta.Dt > 0 {
|
||||||
meta.ParsedTimestamp = time.Unix(meta.Dt, 0).UTC()
|
meta.ParsedTimestamp = time.Unix(meta.Dt, 0).UTC()
|
||||||
}
|
}
|
||||||
|
|
||||||
return raw, meta, nil
|
return raw, meta, true, nil
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user