Moved broadly useful helper functions upstream into feedkit
This commit is contained in:
84
processors/normalize/helpers.go
Normal file
84
processors/normalize/helpers.go
Normal file
@@ -0,0 +1,84 @@
|
||||
package normalize
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||
)
|
||||
|
||||
// PayloadJSONBytes extracts a JSON payload into bytes suitable for json.Unmarshal.
|
||||
//
|
||||
// Supported payload shapes:
|
||||
// - json.RawMessage
|
||||
// - []byte
|
||||
// - string
|
||||
// - map[string]any
|
||||
func PayloadJSONBytes(e event.Event) ([]byte, error) {
|
||||
if e.Payload == nil {
|
||||
return nil, fmt.Errorf("payload is nil")
|
||||
}
|
||||
|
||||
switch v := e.Payload.(type) {
|
||||
case json.RawMessage:
|
||||
if len(v) == 0 {
|
||||
return nil, fmt.Errorf("payload is empty json.RawMessage")
|
||||
}
|
||||
return []byte(v), nil
|
||||
case []byte:
|
||||
if len(v) == 0 {
|
||||
return nil, fmt.Errorf("payload is empty []byte")
|
||||
}
|
||||
return v, nil
|
||||
case string:
|
||||
if v == "" {
|
||||
return nil, fmt.Errorf("payload is empty string")
|
||||
}
|
||||
return []byte(v), nil
|
||||
case map[string]any:
|
||||
b, err := json.Marshal(v)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("marshal map payload: %w", err)
|
||||
}
|
||||
return b, nil
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported payload type %T", e.Payload)
|
||||
}
|
||||
}
|
||||
|
||||
// DecodeJSONPayload extracts the event payload as bytes and unmarshals it into T.
|
||||
func DecodeJSONPayload[T any](in event.Event) (T, error) {
|
||||
var zero T
|
||||
|
||||
b, err := PayloadJSONBytes(in)
|
||||
if err != nil {
|
||||
return zero, fmt.Errorf("extract payload: %w", err)
|
||||
}
|
||||
|
||||
var parsed T
|
||||
if err := json.Unmarshal(b, &parsed); err != nil {
|
||||
return zero, fmt.Errorf("decode raw payload: %w", err)
|
||||
}
|
||||
|
||||
return parsed, nil
|
||||
}
|
||||
|
||||
// FinalizeEvent builds the output event envelope by copying the input and applying
|
||||
// the new schema/payload, plus optional EffectiveAt.
|
||||
func FinalizeEvent(in event.Event, outSchema string, outPayload any, effectiveAt time.Time) (*event.Event, error) {
|
||||
out := in
|
||||
out.Schema = outSchema
|
||||
out.Payload = outPayload
|
||||
|
||||
if !effectiveAt.IsZero() {
|
||||
t := effectiveAt.UTC()
|
||||
out.EffectiveAt = &t
|
||||
}
|
||||
|
||||
if err := out.Validate(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &out, nil
|
||||
}
|
||||
118
processors/normalize/helpers_test.go
Normal file
118
processors/normalize/helpers_test.go
Normal file
@@ -0,0 +1,118 @@
|
||||
package normalize
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||
)
|
||||
|
||||
func TestPayloadJSONBytesSupportedShapes(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
payload any
|
||||
want string
|
||||
}{
|
||||
{name: "rawmessage", payload: json.RawMessage(`{"a":1}`), want: `{"a":1}`},
|
||||
{name: "bytes", payload: []byte(`{"a":2}`), want: `{"a":2}`},
|
||||
{name: "string", payload: `{"a":3}`, want: `{"a":3}`},
|
||||
{name: "map", payload: map[string]any{"a": 4}, want: `{"a":4}`},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
got, err := PayloadJSONBytes(event.Event{Payload: tc.payload})
|
||||
if err != nil {
|
||||
t.Fatalf("PayloadJSONBytes() unexpected error: %v", err)
|
||||
}
|
||||
if string(got) != tc.want {
|
||||
t.Fatalf("PayloadJSONBytes() = %s, want %s", string(got), tc.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestPayloadJSONBytesRejectsInvalidPayloads(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
payload any
|
||||
want string
|
||||
}{
|
||||
{name: "nil", payload: nil, want: "payload is nil"},
|
||||
{name: "empty rawmessage", payload: json.RawMessage{}, want: "payload is empty json.RawMessage"},
|
||||
{name: "empty bytes", payload: []byte{}, want: "payload is empty []byte"},
|
||||
{name: "empty string", payload: "", want: "payload is empty string"},
|
||||
{name: "unsupported", payload: 123, want: "unsupported payload type"},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
_, err := PayloadJSONBytes(event.Event{Payload: tc.payload})
|
||||
if err == nil {
|
||||
t.Fatalf("PayloadJSONBytes() expected error")
|
||||
}
|
||||
if !strings.Contains(err.Error(), tc.want) {
|
||||
t.Fatalf("PayloadJSONBytes() error = %q, want substring %q", err, tc.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestDecodeJSONPayload(t *testing.T) {
|
||||
type payload struct {
|
||||
Name string `json:"name"`
|
||||
}
|
||||
|
||||
got, err := DecodeJSONPayload[payload](event.Event{
|
||||
Payload: json.RawMessage(`{"name":"alice"}`),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("DecodeJSONPayload() unexpected error: %v", err)
|
||||
}
|
||||
if got.Name != "alice" {
|
||||
t.Fatalf("DecodeJSONPayload() = %#v, want name alice", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFinalizeEventPreservesEnvelopeAndEffectiveAtBehavior(t *testing.T) {
|
||||
existingEffectiveAt := time.Date(2026, 3, 28, 11, 0, 0, 0, time.UTC)
|
||||
in := event.Event{
|
||||
ID: "evt-1",
|
||||
Kind: event.Kind("observation"),
|
||||
Source: "source-a",
|
||||
EmittedAt: time.Date(2026, 3, 28, 12, 0, 0, 0, time.UTC),
|
||||
EffectiveAt: &existingEffectiveAt,
|
||||
Schema: "raw.example.v1",
|
||||
Payload: map[string]any{"old": true},
|
||||
}
|
||||
|
||||
out, err := FinalizeEvent(in, "example.v1", map[string]any{"value": 1.234567}, time.Time{})
|
||||
if err != nil {
|
||||
t.Fatalf("FinalizeEvent() unexpected error: %v", err)
|
||||
}
|
||||
if out.ID != in.ID || out.Kind != in.Kind || out.Source != in.Source || out.EmittedAt != in.EmittedAt {
|
||||
t.Fatalf("FinalizeEvent() changed preserved envelope fields: %#v", out)
|
||||
}
|
||||
if out.EffectiveAt == nil || !out.EffectiveAt.Equal(existingEffectiveAt) {
|
||||
t.Fatalf("FinalizeEvent() effectiveAt = %#v, want preserved existing value", out.EffectiveAt)
|
||||
}
|
||||
|
||||
nextEffectiveAt := time.Date(2026, 3, 28, 13, 0, 0, 0, time.FixedZone("x", -4*3600))
|
||||
out, err = FinalizeEvent(in, "example.v1", map[string]any{"value": 1.234567}, nextEffectiveAt)
|
||||
if err != nil {
|
||||
t.Fatalf("FinalizeEvent() unexpected overwrite error: %v", err)
|
||||
}
|
||||
if out.EffectiveAt == nil || !out.EffectiveAt.Equal(nextEffectiveAt.UTC()) {
|
||||
t.Fatalf("FinalizeEvent() effectiveAt = %#v, want %s", out.EffectiveAt, nextEffectiveAt.UTC())
|
||||
}
|
||||
|
||||
payloadMap, ok := out.Payload.(map[string]any)
|
||||
if !ok {
|
||||
t.Fatalf("FinalizeEvent() payload type = %T, want map[string]any", out.Payload)
|
||||
}
|
||||
if payloadMap["value"] != 1.234567 {
|
||||
t.Fatalf("FinalizeEvent() payload value = %#v, want unrounded 1.234567", payloadMap["value"])
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user