package normalize import ( "context" "fmt" "gitea.maximumdirect.net/ejr/feedkit/event" ) // Processor applies ordered normalization rules to pipeline events. // // Selection rule: // - iterate in Normalizers order // - the first Normalizer whose Match returns true is applied // // If no normalizer matches, the default behavior is pass-through. type Processor struct { Normalizers []Normalizer // If true, events that do not match any normalizer cause an error. // Default is false (pass-through). RequireMatch bool } // NewProcessor constructs a normalization processor from an ordered normalizer list. func NewProcessor(normalizers []Normalizer, requireMatch bool) Processor { return Processor{ Normalizers: append([]Normalizer(nil), normalizers...), RequireMatch: requireMatch, } } // Process implements processors.Processor. func (p Processor) Process(ctx context.Context, in event.Event) (*event.Event, error) { for _, n := range p.Normalizers { if n == nil { continue } if !n.Match(in) { continue } out, err := n.Normalize(ctx, in) if err != nil { return nil, fmt.Errorf("normalize: normalizer failed: %w", err) } return out, nil } if p.RequireMatch { return nil, fmt.Errorf("normalize: no normalizer matched event (id=%s kind=%s source=%s schema=%q)", in.ID, in.Kind, in.Source, in.Schema) } out := in return &out, nil }