package pipeline import ( "context" "fmt" "gitea.maximumdirect.net/eric/seriatim/internal/artifact" "gitea.maximumdirect.net/eric/seriatim/internal/buildinfo" "gitea.maximumdirect.net/eric/seriatim/internal/config" "gitea.maximumdirect.net/eric/seriatim/internal/model" "gitea.maximumdirect.net/eric/seriatim/internal/report" "gitea.maximumdirect.net/eric/seriatim/schema" ) const ( applicationName = artifact.ApplicationName ) // Run validates module composition, executes the pipeline, and emits outputs. func Run(ctx context.Context, cfg config.Config, registry *Registry) error { plan, err := resolvePlan(cfg, registry) if err != nil { return err } events := make([]report.Event, 0) raw, newEvents, err := plan.inputReader.Read(ctx, cfg) if err != nil { return err } events = append(events, newEvents...) state := PreprocessState{ State: StateRaw, Raw: raw, } for _, module := range plan.preprocessors { state, newEvents, err = module.Process(ctx, state, cfg) if err != nil { return err } events = append(events, newEvents...) } if state.State != StateCanonical { return fmt.Errorf("preprocessing ended in state %q; expected %q", state.State, StateCanonical) } merged, newEvents, err := plan.merger.Merge(ctx, state.Canonical, cfg) if err != nil { return err } events = append(events, newEvents...) for _, module := range plan.postprocessors { merged, newEvents, err = module.Process(ctx, merged, cfg) if err != nil { return err } events = append(events, newEvents...) } final := finalizeTranscript(cfg, merged) rpt := finalizeReport(cfg, events) for _, module := range plan.outputWriters { newEvents, err = module.Write(ctx, final, rpt, cfg) if err != nil { return err } events = append(events, newEvents...) } rpt = finalizeReport(cfg, events) if cfg.ReportFile != "" { if err := report.WriteJSON(cfg.ReportFile, rpt); err != nil { return err } } return nil } type executionPlan struct { inputReader InputReader preprocessors []Preprocessor merger Merger postprocessors []Postprocessor outputWriters []OutputWriter } func resolvePlan(cfg config.Config, registry *Registry) (executionPlan, error) { inputReader, err := registry.resolveInputReader(cfg.InputReader) if err != nil { return executionPlan{}, err } preprocessors, err := registry.resolvePreprocessors(cfg.PreprocessingModules) if err != nil { return executionPlan{}, err } if err := validatePreprocessors(preprocessors); err != nil { return executionPlan{}, err } merger, err := registry.resolveMerger() if err != nil { return executionPlan{}, err } postprocessors, err := registry.resolvePostprocessors(cfg.PostprocessingModules) if err != nil { return executionPlan{}, err } outputWriters, err := registry.resolveOutputWriters(cfg.OutputModules) if err != nil { return executionPlan{}, err } return executionPlan{ inputReader: inputReader, preprocessors: preprocessors, merger: merger, postprocessors: postprocessors, outputWriters: outputWriters, }, nil } func validatePreprocessors(modules []Preprocessor) error { state := StateRaw for _, module := range modules { if module.Requires() != state { return fmt.Errorf("preprocessing module %q requires state %q but current state is %q", module.Name(), module.Requires(), state) } state = module.Produces() } if state != StateCanonical { return fmt.Errorf("preprocessing chain ends in state %q; expected %q", state, StateCanonical) } return nil } func finalizeTranscript(cfg config.Config, merged model.MergedTranscript) schema.Transcript { return artifact.FromMerged(cfg, merged) } func finalizeReport(cfg config.Config, events []report.Event) report.Report { return report.Report{ Metadata: report.Metadata{ Application: applicationName, Version: buildinfo.Version, InputReader: cfg.InputReader, InputFiles: append([]string(nil), cfg.InputFiles...), PreprocessingModules: append([]string(nil), cfg.PreprocessingModules...), PostprocessingModules: append([]string(nil), cfg.PostprocessingModules...), OutputModules: append([]string(nil), cfg.OutputModules...), }, Events: append([]report.Event(nil), events...), } }