Moved common HTTP polling helpers into feedkit and implemented support for ETag and Last-Modified

This commit is contained in:
2026-03-28 09:59:58 -05:00
parent 3b92c2284d
commit 4910440756
6 changed files with 610 additions and 11 deletions

View File

@@ -11,4 +11,14 @@
//
// A single source may emit 0..N events per poll or stream iteration, and those
// events may span multiple event kinds.
//
// HTTP-backed polling sources can share NewHTTPSource for generic HTTP config
// parsing and conditional GET behavior. The helper understands:
// - params.url
// - params.user_agent
// - params.conditional (optional, default true)
//
// When validators are available, NewHTTPSource prefers ETag/If-None-Match and
// falls back to Last-Modified/If-Modified-Since. A 304 Not Modified response is
// treated as a successful unchanged poll.
package sources

147
sources/http.go Normal file
View File

@@ -0,0 +1,147 @@
package sources
import (
"context"
"encoding/json"
"fmt"
"net/http"
"strconv"
"strings"
"sync"
"gitea.maximumdirect.net/ejr/feedkit/config"
"gitea.maximumdirect.net/ejr/feedkit/transport"
)
// HTTPSource is a reusable helper for polling HTTP-backed sources.
//
// It centralizes generic source config parsing (`params.url`,
// `params.user_agent`, and optional `params.conditional`), default HTTP client
// setup, and conditional GET validator handling. Concrete daemon sources remain
// responsible for decoding the response body and constructing events.
type HTTPSource struct {
Driver string
Name string
URL string
UserAgent string
Accept string
Conditional bool
Client *http.Client
mu sync.Mutex
validators transport.HTTPValidators
}
// NewHTTPSource builds a generic HTTP polling helper from SourceConfig.
//
// Required params:
// - params.url
// - params.user_agent
//
// Optional params:
// - params.conditional (default true): enable conditional GET using cached
// ETag / Last-Modified validators
func NewHTTPSource(driver string, cfg config.SourceConfig, accept string) (*HTTPSource, error) {
name := strings.TrimSpace(cfg.Name)
if name == "" {
return nil, fmt.Errorf("%s: name is required", driver)
}
if cfg.Params == nil {
return nil, fmt.Errorf("%s %q: params are required (need params.url and params.user_agent)", driver, cfg.Name)
}
url, ok := cfg.ParamString("url", "URL")
if !ok {
return nil, fmt.Errorf("%s %q: params.url is required", driver, cfg.Name)
}
userAgent, ok := cfg.ParamString("user_agent", "userAgent")
if !ok {
return nil, fmt.Errorf("%s %q: params.user_agent is required", driver, cfg.Name)
}
conditional, err := parseConditionalParam(cfg)
if err != nil {
return nil, err
}
return &HTTPSource{
Driver: driver,
Name: name,
URL: url,
UserAgent: userAgent,
Accept: accept,
Conditional: conditional,
Client: transport.NewHTTPClient(transport.DefaultHTTPTimeout),
}, nil
}
// FetchBytesIfChanged fetches the configured URL and reports whether the
// upstream content changed. An unchanged 304 response returns changed=false
// with no body and no error.
func (s *HTTPSource) FetchBytesIfChanged(ctx context.Context) ([]byte, bool, error) {
client := s.Client
if client == nil {
client = transport.NewHTTPClient(transport.DefaultHTTPTimeout)
}
s.mu.Lock()
validators := s.validators
s.mu.Unlock()
body, changed, next, err := transport.FetchBodyIfChanged(
ctx,
client,
s.URL,
s.UserAgent,
s.Accept,
s.Conditional,
validators,
)
if err != nil {
return nil, false, fmt.Errorf("%s %q: %w", s.Driver, s.Name, err)
}
if s.Conditional {
s.mu.Lock()
s.validators = next
s.mu.Unlock()
}
return body, changed, nil
}
// FetchJSONIfChanged fetches the configured URL and returns the raw response
// body as json.RawMessage when content changed. An unchanged 304 response
// returns changed=false with a nil body and no error.
func (s *HTTPSource) FetchJSONIfChanged(ctx context.Context) (json.RawMessage, bool, error) {
body, changed, err := s.FetchBytesIfChanged(ctx)
if err != nil || !changed {
return nil, changed, err
}
return json.RawMessage(body), true, nil
}
func parseConditionalParam(cfg config.SourceConfig) (bool, error) {
raw, ok := cfg.Params["conditional"]
if !ok || raw == nil {
return true, nil
}
switch v := raw.(type) {
case bool:
return v, nil
case string:
s := strings.TrimSpace(v)
if s == "" {
return false, fmt.Errorf("source %q: params.conditional must be a boolean", cfg.Name)
}
parsed, err := strconv.ParseBool(s)
if err != nil {
return false, fmt.Errorf("source %q: params.conditional must be a boolean", cfg.Name)
}
return parsed, nil
default:
return false, fmt.Errorf("source %q: params.conditional must be a boolean", cfg.Name)
}
}

96
sources/http_test.go Normal file
View File

@@ -0,0 +1,96 @@
package sources
import (
"context"
"net/http"
"net/http/httptest"
"testing"
"gitea.maximumdirect.net/ejr/feedkit/config"
)
func TestNewHTTPSourceConditionalDefaultsTrue(t *testing.T) {
src, err := NewHTTPSource("test_driver", config.SourceConfig{
Name: "test-source",
Driver: "test_driver",
Params: map[string]any{
"url": "https://example.invalid",
"user_agent": "test-agent",
},
}, "application/json")
if err != nil {
t.Fatalf("NewHTTPSource() error = %v", err)
}
if !src.Conditional {
t.Fatalf("Conditional = false, want true")
}
}
func TestNewHTTPSourceRejectsInvalidConditional(t *testing.T) {
_, err := NewHTTPSource("test_driver", config.SourceConfig{
Name: "test-source",
Driver: "test_driver",
Params: map[string]any{
"url": "https://example.invalid",
"user_agent": "test-agent",
"conditional": "sometimes",
},
}, "application/json")
if err == nil {
t.Fatalf("NewHTTPSource() error = nil, want error")
}
}
func TestHTTPSourceFetchJSONIfChanged(t *testing.T) {
var call int
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
call++
switch call {
case 1:
w.Header().Set("ETag", `"v1"`)
_, _ = w.Write([]byte(`{"ok":true}`))
case 2:
if got := r.Header.Get("If-None-Match"); got != `"v1"` {
t.Fatalf("second request If-None-Match = %q", got)
}
w.WriteHeader(http.StatusNotModified)
default:
t.Fatalf("unexpected call count %d", call)
}
}))
defer srv.Close()
src, err := NewHTTPSource("test_driver", config.SourceConfig{
Name: "test-source",
Driver: "test_driver",
Params: map[string]any{
"url": srv.URL,
"user_agent": "test-agent",
},
}, "application/json")
if err != nil {
t.Fatalf("NewHTTPSource() error = %v", err)
}
raw, changed, err := src.FetchJSONIfChanged(context.Background())
if err != nil {
t.Fatalf("first FetchJSONIfChanged() error = %v", err)
}
if !changed {
t.Fatalf("first FetchJSONIfChanged() changed = false, want true")
}
if got := string(raw); got != `{"ok":true}` {
t.Fatalf("first FetchJSONIfChanged() body = %q", got)
}
raw, changed, err = src.FetchJSONIfChanged(context.Background())
if err != nil {
t.Fatalf("second FetchJSONIfChanged() error = %v", err)
}
if changed {
t.Fatalf("second FetchJSONIfChanged() changed = true, want false")
}
if raw != nil {
t.Fatalf("second FetchJSONIfChanged() body = %q, want nil", string(raw))
}
}