From 4910440756a8cdc93e74ac802c5ab83c428a1ae1 Mon Sep 17 00:00:00 2001 From: Eric Rakestraw Date: Sat, 28 Mar 2026 09:59:58 -0500 Subject: [PATCH] Moved common HTTP polling helpers into feedkit and implemented support for ETag and Last-Modified --- doc.go | 13 +++ sources/doc.go | 10 ++ sources/http.go | 147 ++++++++++++++++++++++++++ sources/http_test.go | 96 +++++++++++++++++ transport/http.go | 123 ++++++++++++++++++++-- transport/http_test.go | 232 +++++++++++++++++++++++++++++++++++++++++ 6 files changed, 610 insertions(+), 11 deletions(-) create mode 100644 sources/http.go create mode 100644 sources/http_test.go create mode 100644 transport/http_test.go diff --git a/doc.go b/doc.go index ced3e43..047cff0 100644 --- a/doc.go +++ b/doc.go @@ -50,6 +50,19 @@ // Both share Input{Name()}. A source may emit 0..N events per poll/run step, // and may emit multiple event kinds. // +// For HTTP-backed polling sources, sources.NewHTTPSource provides a shared +// helper for generic params: +// +// - params.url +// +// - params.user_agent +// +// - params.conditional (optional, default true) +// +// When conditional polling is enabled, feedkit opportunistically uses ETag +// and Last-Modified validators. A 304 Not Modified response is treated as a +// successful poll that emits no events. +// // - scheduler // Runs one goroutine per job: // diff --git a/sources/doc.go b/sources/doc.go index 939eb48..7bf9f19 100644 --- a/sources/doc.go +++ b/sources/doc.go @@ -11,4 +11,14 @@ // // A single source may emit 0..N events per poll or stream iteration, and those // events may span multiple event kinds. +// +// HTTP-backed polling sources can share NewHTTPSource for generic HTTP config +// parsing and conditional GET behavior. The helper understands: +// - params.url +// - params.user_agent +// - params.conditional (optional, default true) +// +// When validators are available, NewHTTPSource prefers ETag/If-None-Match and +// falls back to Last-Modified/If-Modified-Since. A 304 Not Modified response is +// treated as a successful unchanged poll. package sources diff --git a/sources/http.go b/sources/http.go new file mode 100644 index 0000000..f4b636d --- /dev/null +++ b/sources/http.go @@ -0,0 +1,147 @@ +package sources + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "strconv" + "strings" + "sync" + + "gitea.maximumdirect.net/ejr/feedkit/config" + "gitea.maximumdirect.net/ejr/feedkit/transport" +) + +// HTTPSource is a reusable helper for polling HTTP-backed sources. +// +// It centralizes generic source config parsing (`params.url`, +// `params.user_agent`, and optional `params.conditional`), default HTTP client +// setup, and conditional GET validator handling. Concrete daemon sources remain +// responsible for decoding the response body and constructing events. +type HTTPSource struct { + Driver string + Name string + URL string + UserAgent string + Accept string + Conditional bool + Client *http.Client + + mu sync.Mutex + validators transport.HTTPValidators +} + +// NewHTTPSource builds a generic HTTP polling helper from SourceConfig. +// +// Required params: +// - params.url +// - params.user_agent +// +// Optional params: +// - params.conditional (default true): enable conditional GET using cached +// ETag / Last-Modified validators +func NewHTTPSource(driver string, cfg config.SourceConfig, accept string) (*HTTPSource, error) { + name := strings.TrimSpace(cfg.Name) + if name == "" { + return nil, fmt.Errorf("%s: name is required", driver) + } + if cfg.Params == nil { + return nil, fmt.Errorf("%s %q: params are required (need params.url and params.user_agent)", driver, cfg.Name) + } + + url, ok := cfg.ParamString("url", "URL") + if !ok { + return nil, fmt.Errorf("%s %q: params.url is required", driver, cfg.Name) + } + + userAgent, ok := cfg.ParamString("user_agent", "userAgent") + if !ok { + return nil, fmt.Errorf("%s %q: params.user_agent is required", driver, cfg.Name) + } + + conditional, err := parseConditionalParam(cfg) + if err != nil { + return nil, err + } + + return &HTTPSource{ + Driver: driver, + Name: name, + URL: url, + UserAgent: userAgent, + Accept: accept, + Conditional: conditional, + Client: transport.NewHTTPClient(transport.DefaultHTTPTimeout), + }, nil +} + +// FetchBytesIfChanged fetches the configured URL and reports whether the +// upstream content changed. An unchanged 304 response returns changed=false +// with no body and no error. +func (s *HTTPSource) FetchBytesIfChanged(ctx context.Context) ([]byte, bool, error) { + client := s.Client + if client == nil { + client = transport.NewHTTPClient(transport.DefaultHTTPTimeout) + } + + s.mu.Lock() + validators := s.validators + s.mu.Unlock() + + body, changed, next, err := transport.FetchBodyIfChanged( + ctx, + client, + s.URL, + s.UserAgent, + s.Accept, + s.Conditional, + validators, + ) + if err != nil { + return nil, false, fmt.Errorf("%s %q: %w", s.Driver, s.Name, err) + } + + if s.Conditional { + s.mu.Lock() + s.validators = next + s.mu.Unlock() + } + + return body, changed, nil +} + +// FetchJSONIfChanged fetches the configured URL and returns the raw response +// body as json.RawMessage when content changed. An unchanged 304 response +// returns changed=false with a nil body and no error. +func (s *HTTPSource) FetchJSONIfChanged(ctx context.Context) (json.RawMessage, bool, error) { + body, changed, err := s.FetchBytesIfChanged(ctx) + if err != nil || !changed { + return nil, changed, err + } + return json.RawMessage(body), true, nil +} + +func parseConditionalParam(cfg config.SourceConfig) (bool, error) { + raw, ok := cfg.Params["conditional"] + if !ok || raw == nil { + return true, nil + } + + switch v := raw.(type) { + case bool: + return v, nil + case string: + s := strings.TrimSpace(v) + if s == "" { + return false, fmt.Errorf("source %q: params.conditional must be a boolean", cfg.Name) + } + parsed, err := strconv.ParseBool(s) + if err != nil { + return false, fmt.Errorf("source %q: params.conditional must be a boolean", cfg.Name) + } + return parsed, nil + default: + return false, fmt.Errorf("source %q: params.conditional must be a boolean", cfg.Name) + } +} diff --git a/sources/http_test.go b/sources/http_test.go new file mode 100644 index 0000000..5b664b7 --- /dev/null +++ b/sources/http_test.go @@ -0,0 +1,96 @@ +package sources + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" + + "gitea.maximumdirect.net/ejr/feedkit/config" +) + +func TestNewHTTPSourceConditionalDefaultsTrue(t *testing.T) { + src, err := NewHTTPSource("test_driver", config.SourceConfig{ + Name: "test-source", + Driver: "test_driver", + Params: map[string]any{ + "url": "https://example.invalid", + "user_agent": "test-agent", + }, + }, "application/json") + if err != nil { + t.Fatalf("NewHTTPSource() error = %v", err) + } + if !src.Conditional { + t.Fatalf("Conditional = false, want true") + } +} + +func TestNewHTTPSourceRejectsInvalidConditional(t *testing.T) { + _, err := NewHTTPSource("test_driver", config.SourceConfig{ + Name: "test-source", + Driver: "test_driver", + Params: map[string]any{ + "url": "https://example.invalid", + "user_agent": "test-agent", + "conditional": "sometimes", + }, + }, "application/json") + if err == nil { + t.Fatalf("NewHTTPSource() error = nil, want error") + } +} + +func TestHTTPSourceFetchJSONIfChanged(t *testing.T) { + var call int + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + call++ + switch call { + case 1: + w.Header().Set("ETag", `"v1"`) + _, _ = w.Write([]byte(`{"ok":true}`)) + case 2: + if got := r.Header.Get("If-None-Match"); got != `"v1"` { + t.Fatalf("second request If-None-Match = %q", got) + } + w.WriteHeader(http.StatusNotModified) + default: + t.Fatalf("unexpected call count %d", call) + } + })) + defer srv.Close() + + src, err := NewHTTPSource("test_driver", config.SourceConfig{ + Name: "test-source", + Driver: "test_driver", + Params: map[string]any{ + "url": srv.URL, + "user_agent": "test-agent", + }, + }, "application/json") + if err != nil { + t.Fatalf("NewHTTPSource() error = %v", err) + } + + raw, changed, err := src.FetchJSONIfChanged(context.Background()) + if err != nil { + t.Fatalf("first FetchJSONIfChanged() error = %v", err) + } + if !changed { + t.Fatalf("first FetchJSONIfChanged() changed = false, want true") + } + if got := string(raw); got != `{"ok":true}` { + t.Fatalf("first FetchJSONIfChanged() body = %q", got) + } + + raw, changed, err = src.FetchJSONIfChanged(context.Background()) + if err != nil { + t.Fatalf("second FetchJSONIfChanged() error = %v", err) + } + if changed { + t.Fatalf("second FetchJSONIfChanged() changed = true, want false") + } + if raw != nil { + t.Fatalf("second FetchJSONIfChanged() body = %q, want nil", string(raw)) + } +} diff --git a/transport/http.go b/transport/http.go index 03c1b48..79d068c 100644 --- a/transport/http.go +++ b/transport/http.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "net/http" + "strings" "time" ) @@ -28,7 +29,80 @@ func NewHTTPClient(timeout time.Duration) *http.Client { } func FetchBody(ctx context.Context, client *http.Client, url, userAgent, accept string) ([]byte, error) { - req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + res, err := doRequest(ctx, client, http.MethodGet, url, userAgent, accept, "", "") + if err != nil { + return nil, err + } + defer res.Body.Close() + + if res.StatusCode < 200 || res.StatusCode >= 300 { + return nil, fmt.Errorf("HTTP %s", res.Status) + } + + return readValidatedBody(res.Body) +} + +// HTTPValidators are cache validators learned from prior successful GET responses. +// +// ETag is preferred when present. LastModified is used as a fallback validator +// when ETag is unavailable. +type HTTPValidators struct { + ETag string + LastModified string +} + +// FetchBodyIfChanged performs an HTTP GET and opportunistically uses conditional +// request headers based on the provided validators. +// +// Behavior: +// - if conditional is false, this behaves like a normal GET and leaves validators unchanged +// - if validators.ETag is set, sends If-None-Match +// - else if validators.LastModified is set, sends If-Modified-Since +// - 304 Not Modified is treated as success with changed=false and no body +// - 200 responses are treated as changed=true and still enforce the normal body checks +// +// Returned validators reflect any updates learned from the response headers. +func FetchBodyIfChanged( + ctx context.Context, + client *http.Client, + url, userAgent, accept string, + conditional bool, + validators HTTPValidators, +) ([]byte, bool, HTTPValidators, error) { + headerName, headerValue := conditionalHeader(conditional, validators) + + res, err := doRequest(ctx, client, http.MethodGet, url, userAgent, accept, headerName, headerValue) + if err != nil { + return nil, false, validators, err + } + defer res.Body.Close() + + switch res.StatusCode { + case http.StatusNotModified: + if conditional { + validators = refreshValidators(validators, res.Header) + } + return nil, false, validators, nil + default: + if res.StatusCode < 200 || res.StatusCode >= 300 { + return nil, false, validators, fmt.Errorf("HTTP %s", res.Status) + } + } + + b, err := readValidatedBody(res.Body) + if err != nil { + return nil, false, validators, err + } + + if conditional { + validators = replaceValidators(res.Header) + } + + return b, true, validators, nil +} + +func doRequest(ctx context.Context, client *http.Client, method, url, userAgent, accept, headerName, headerValue string) (*http.Response, error) { + req, err := http.NewRequestWithContext(ctx, method, url, nil) if err != nil { return nil, err } @@ -39,19 +113,46 @@ func FetchBody(ctx context.Context, client *http.Client, url, userAgent, accept if accept != "" { req.Header.Set("Accept", accept) } - - res, err := client.Do(req) - if err != nil { - return nil, err - } - defer res.Body.Close() - - if res.StatusCode < 200 || res.StatusCode >= 300 { - return nil, fmt.Errorf("HTTP %s", res.Status) + if headerName != "" && headerValue != "" { + req.Header.Set(headerName, headerValue) } + return client.Do(req) +} + +func conditionalHeader(enabled bool, validators HTTPValidators) (string, string) { + if !enabled { + return "", "" + } + if etag := strings.TrimSpace(validators.ETag); etag != "" { + return "If-None-Match", etag + } + if lastModified := strings.TrimSpace(validators.LastModified); lastModified != "" { + return "If-Modified-Since", lastModified + } + return "", "" +} + +func replaceValidators(header http.Header) HTTPValidators { + return HTTPValidators{ + ETag: strings.TrimSpace(header.Get("ETag")), + LastModified: strings.TrimSpace(header.Get("Last-Modified")), + } +} + +func refreshValidators(current HTTPValidators, header http.Header) HTTPValidators { + if etag := strings.TrimSpace(header.Get("ETag")); etag != "" { + current.ETag = etag + } + if lastModified := strings.TrimSpace(header.Get("Last-Modified")); lastModified != "" { + current.LastModified = lastModified + } + return current +} + +func readValidatedBody(r io.Reader) ([]byte, error) { // Read at most maxResponseBodyBytes + 1 so we can detect overflow. - limited := io.LimitReader(res.Body, maxResponseBodyBytes+1) + limited := io.LimitReader(r, maxResponseBodyBytes+1) b, err := io.ReadAll(limited) if err != nil { diff --git a/transport/http_test.go b/transport/http_test.go new file mode 100644 index 0000000..01a0e00 --- /dev/null +++ b/transport/http_test.go @@ -0,0 +1,232 @@ +package transport + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" +) + +func TestFetchBodyIfChangedPrefersETagAndTreats304AsUnchanged(t *testing.T) { + t.Helper() + + var call int + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + call++ + switch call { + case 1: + if got := r.Header.Get("If-None-Match"); got != "" { + t.Fatalf("first request If-None-Match = %q, want empty", got) + } + if got := r.Header.Get("If-Modified-Since"); got != "" { + t.Fatalf("first request If-Modified-Since = %q, want empty", got) + } + w.Header().Set("ETag", `"v1"`) + w.Header().Set("Last-Modified", "Mon, 02 Jan 2006 15:04:05 GMT") + _, _ = w.Write([]byte(`{"ok":true}`)) + case 2: + if got := r.Header.Get("If-None-Match"); got != `"v1"` { + t.Fatalf("second request If-None-Match = %q, want %q", got, `"v1"`) + } + if got := r.Header.Get("If-Modified-Since"); got != "" { + t.Fatalf("second request If-Modified-Since = %q, want empty when ETag is cached", got) + } + w.WriteHeader(http.StatusNotModified) + default: + t.Fatalf("unexpected call count %d", call) + } + })) + defer srv.Close() + + validators := HTTPValidators{} + body, changed, next, err := FetchBodyIfChanged(context.Background(), srv.Client(), srv.URL, "test-agent", "application/json", true, validators) + if err != nil { + t.Fatalf("first FetchBodyIfChanged() error = %v", err) + } + if !changed { + t.Fatalf("first FetchBodyIfChanged() changed = false, want true") + } + if got := string(body); got != `{"ok":true}` { + t.Fatalf("first FetchBodyIfChanged() body = %q", got) + } + if got := next.ETag; got != `"v1"` { + t.Fatalf("cached ETag = %q, want %q", got, `"v1"`) + } + if got := next.LastModified; got != "Mon, 02 Jan 2006 15:04:05 GMT" { + t.Fatalf("cached Last-Modified = %q", got) + } + + body, changed, next, err = FetchBodyIfChanged(context.Background(), srv.Client(), srv.URL, "test-agent", "application/json", true, next) + if err != nil { + t.Fatalf("second FetchBodyIfChanged() error = %v", err) + } + if changed { + t.Fatalf("second FetchBodyIfChanged() changed = true, want false") + } + if body != nil { + t.Fatalf("second FetchBodyIfChanged() body = %q, want nil", string(body)) + } + if got := next.ETag; got != `"v1"` { + t.Fatalf("cached ETag after 304 = %q, want %q", got, `"v1"`) + } +} + +func TestFetchBodyIfChangedFallsBackToIfModifiedSince(t *testing.T) { + t.Helper() + + var call int + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + call++ + switch call { + case 1: + w.Header().Set("Last-Modified", "Tue, 03 Jan 2006 15:04:05 GMT") + _, _ = w.Write([]byte(`first`)) + case 2: + if got := r.Header.Get("If-None-Match"); got != "" { + t.Fatalf("second request If-None-Match = %q, want empty", got) + } + if got := r.Header.Get("If-Modified-Since"); got != "Tue, 03 Jan 2006 15:04:05 GMT" { + t.Fatalf("second request If-Modified-Since = %q", got) + } + w.WriteHeader(http.StatusNotModified) + default: + t.Fatalf("unexpected call count %d", call) + } + })) + defer srv.Close() + + _, changed, validators, err := FetchBodyIfChanged(context.Background(), srv.Client(), srv.URL, "test-agent", "", true, HTTPValidators{}) + if err != nil { + t.Fatalf("first FetchBodyIfChanged() error = %v", err) + } + if !changed { + t.Fatalf("first FetchBodyIfChanged() changed = false, want true") + } + if got := validators.LastModified; got != "Tue, 03 Jan 2006 15:04:05 GMT" { + t.Fatalf("cached Last-Modified = %q", got) + } + + _, changed, _, err = FetchBodyIfChanged(context.Background(), srv.Client(), srv.URL, "test-agent", "", true, validators) + if err != nil { + t.Fatalf("second FetchBodyIfChanged() error = %v", err) + } + if changed { + t.Fatalf("second FetchBodyIfChanged() changed = true, want false") + } +} + +func TestFetchBodyIfChangedClearsValidatorsOn200WithoutValidators(t *testing.T) { + t.Helper() + + var call int + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + call++ + switch call { + case 1: + w.Header().Set("ETag", `"v1"`) + _, _ = w.Write([]byte(`first`)) + case 2: + if got := r.Header.Get("If-None-Match"); got != `"v1"` { + t.Fatalf("second request If-None-Match = %q", got) + } + _, _ = w.Write([]byte(`second`)) + case 3: + if got := r.Header.Get("If-None-Match"); got != "" { + t.Fatalf("third request If-None-Match = %q, want empty", got) + } + if got := r.Header.Get("If-Modified-Since"); got != "" { + t.Fatalf("third request If-Modified-Since = %q, want empty", got) + } + _, _ = w.Write([]byte(`third`)) + default: + t.Fatalf("unexpected call count %d", call) + } + })) + defer srv.Close() + + _, _, validators, err := FetchBodyIfChanged(context.Background(), srv.Client(), srv.URL, "test-agent", "", true, HTTPValidators{}) + if err != nil { + t.Fatalf("first FetchBodyIfChanged() error = %v", err) + } + _, _, validators, err = FetchBodyIfChanged(context.Background(), srv.Client(), srv.URL, "test-agent", "", true, validators) + if err != nil { + t.Fatalf("second FetchBodyIfChanged() error = %v", err) + } + if validators.ETag != "" || validators.LastModified != "" { + t.Fatalf("validators after 200 without validators = %+v, want cleared", validators) + } + _, _, _, err = FetchBodyIfChanged(context.Background(), srv.Client(), srv.URL, "test-agent", "", true, validators) + if err != nil { + t.Fatalf("third FetchBodyIfChanged() error = %v", err) + } +} + +func TestFetchBodyIfChangedConditionalDisabledSkipsConditionalHeaders(t *testing.T) { + t.Helper() + + var calls int + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + calls++ + if got := r.Header.Get("If-None-Match"); got != "" { + t.Fatalf("request If-None-Match = %q, want empty", got) + } + if got := r.Header.Get("If-Modified-Since"); got != "" { + t.Fatalf("request If-Modified-Since = %q, want empty", got) + } + _, _ = w.Write([]byte(`body`)) + })) + defer srv.Close() + + validators := HTTPValidators{ETag: `"v1"`, LastModified: "Wed, 04 Jan 2006 15:04:05 GMT"} + _, changed, next, err := FetchBodyIfChanged(context.Background(), srv.Client(), srv.URL, "test-agent", "", false, validators) + if err != nil { + t.Fatalf("FetchBodyIfChanged() error = %v", err) + } + if !changed { + t.Fatalf("FetchBodyIfChanged() changed = false, want true") + } + if next != validators { + t.Fatalf("validators changed when conditional disabled: got %+v want %+v", next, validators) + } + if calls != 1 { + t.Fatalf("calls = %d, want 1", calls) + } +} + +func TestFetchBodyIfChangedAllowsEmpty304ButRejectsEmpty200(t *testing.T) { + t.Helper() + + notModified := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusNotModified) + })) + defer notModified.Close() + + _, changed, _, err := FetchBodyIfChanged( + context.Background(), + notModified.Client(), + notModified.URL, + "test-agent", + "", + true, + HTTPValidators{ETag: `"v1"`}, + ) + if err != nil { + t.Fatalf("304 FetchBodyIfChanged() error = %v", err) + } + if changed { + t.Fatalf("304 FetchBodyIfChanged() changed = true, want false") + } + + emptyBody := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + })) + defer emptyBody.Close() + + _, _, _, err = FetchBodyIfChanged(context.Background(), emptyBody.Client(), emptyBody.URL, "test-agent", "", true, HTTPValidators{}) + if err == nil { + t.Fatalf("empty 200 FetchBodyIfChanged() error = nil, want error") + } + if err.Error() != "empty response body" { + t.Fatalf("empty 200 FetchBodyIfChanged() error = %q", err) + } +}