177 lines
4.8 KiB
Go
177 lines
4.8 KiB
Go
package pipeline
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
|
|
"gitea.maximumdirect.net/eric/seriatim/internal/config"
|
|
"gitea.maximumdirect.net/eric/seriatim/internal/model"
|
|
"gitea.maximumdirect.net/eric/seriatim/internal/report"
|
|
)
|
|
|
|
// Identity values stamped into the metadata of every transcript and report
// produced by this package.
const (
	// applicationName fills the Application metadata field.
	applicationName = "seriatim"

	// version fills the Version metadata field.
	version = "dev"
)
|
|
|
|
// Run validates module composition, executes the pipeline, and emits outputs.
|
|
func Run(ctx context.Context, cfg config.Config, registry *Registry) error {
|
|
plan, err := resolvePlan(cfg, registry)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
events := make([]report.Event, 0)
|
|
|
|
raw, newEvents, err := plan.inputReader.Read(ctx, cfg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
events = append(events, newEvents...)
|
|
|
|
state := PreprocessState{
|
|
State: StateRaw,
|
|
Raw: raw,
|
|
}
|
|
for _, module := range plan.preprocessors {
|
|
state, newEvents, err = module.Process(ctx, state, cfg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
events = append(events, newEvents...)
|
|
}
|
|
if state.State != StateCanonical {
|
|
return fmt.Errorf("preprocessing ended in state %q; expected %q", state.State, StateCanonical)
|
|
}
|
|
|
|
merged, newEvents, err := plan.merger.Merge(ctx, state.Canonical, cfg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
events = append(events, newEvents...)
|
|
|
|
for _, module := range plan.postprocessors {
|
|
merged, newEvents, err = module.Process(ctx, merged, cfg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
events = append(events, newEvents...)
|
|
}
|
|
|
|
final := finalizeTranscript(cfg, merged)
|
|
rpt := finalizeReport(cfg, events)
|
|
|
|
for _, module := range plan.outputWriters {
|
|
newEvents, err = module.Write(ctx, final, rpt, cfg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
events = append(events, newEvents...)
|
|
}
|
|
|
|
rpt = finalizeReport(cfg, events)
|
|
if cfg.ReportFile != "" {
|
|
if err := report.WriteJSON(cfg.ReportFile, rpt); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
type executionPlan struct {
|
|
inputReader InputReader
|
|
preprocessors []Preprocessor
|
|
merger Merger
|
|
postprocessors []Postprocessor
|
|
outputWriters []OutputWriter
|
|
}
|
|
|
|
func resolvePlan(cfg config.Config, registry *Registry) (executionPlan, error) {
|
|
inputReader, err := registry.resolveInputReader(cfg.InputReader)
|
|
if err != nil {
|
|
return executionPlan{}, err
|
|
}
|
|
|
|
preprocessors, err := registry.resolvePreprocessors(cfg.PreprocessingModules)
|
|
if err != nil {
|
|
return executionPlan{}, err
|
|
}
|
|
if err := validatePreprocessors(preprocessors); err != nil {
|
|
return executionPlan{}, err
|
|
}
|
|
|
|
merger, err := registry.resolveMerger()
|
|
if err != nil {
|
|
return executionPlan{}, err
|
|
}
|
|
|
|
postprocessors, err := registry.resolvePostprocessors(cfg.PostprocessingModules)
|
|
if err != nil {
|
|
return executionPlan{}, err
|
|
}
|
|
|
|
outputWriters, err := registry.resolveOutputWriters(cfg.OutputModules)
|
|
if err != nil {
|
|
return executionPlan{}, err
|
|
}
|
|
|
|
return executionPlan{
|
|
inputReader: inputReader,
|
|
preprocessors: preprocessors,
|
|
merger: merger,
|
|
postprocessors: postprocessors,
|
|
outputWriters: outputWriters,
|
|
}, nil
|
|
}
|
|
|
|
func validatePreprocessors(modules []Preprocessor) error {
|
|
state := StateRaw
|
|
for _, module := range modules {
|
|
if module.Requires() != state {
|
|
return fmt.Errorf("preprocessing module %q requires state %q but current state is %q", module.Name(), module.Requires(), state)
|
|
}
|
|
state = module.Produces()
|
|
}
|
|
if state != StateCanonical {
|
|
return fmt.Errorf("preprocessing chain ends in state %q; expected %q", state, StateCanonical)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func finalizeTranscript(cfg config.Config, merged model.MergedTranscript) model.FinalTranscript {
|
|
segments := make([]model.Segment, len(merged.Segments))
|
|
copy(segments, merged.Segments)
|
|
overlapGroups := make([]model.OverlapGroup, len(merged.OverlapGroups))
|
|
copy(overlapGroups, merged.OverlapGroups)
|
|
|
|
return model.FinalTranscript{
|
|
Metadata: model.OutputMetadata{
|
|
Application: applicationName,
|
|
Version: version,
|
|
InputReader: cfg.InputReader,
|
|
InputFiles: append([]string(nil), cfg.InputFiles...),
|
|
PreprocessingModules: append([]string(nil), cfg.PreprocessingModules...),
|
|
PostprocessingModules: append([]string(nil), cfg.PostprocessingModules...),
|
|
OutputModules: append([]string(nil), cfg.OutputModules...),
|
|
},
|
|
Segments: segments,
|
|
OverlapGroups: overlapGroups,
|
|
}
|
|
}
|
|
|
|
func finalizeReport(cfg config.Config, events []report.Event) report.Report {
|
|
return report.Report{
|
|
Metadata: report.Metadata{
|
|
Application: applicationName,
|
|
Version: version,
|
|
InputReader: cfg.InputReader,
|
|
InputFiles: append([]string(nil), cfg.InputFiles...),
|
|
PreprocessingModules: append([]string(nil), cfg.PreprocessingModules...),
|
|
PostprocessingModules: append([]string(nil), cfg.PostprocessingModules...),
|
|
OutputModules: append([]string(nil), cfg.OutputModules...),
|
|
},
|
|
Events: append([]report.Event(nil), events...),
|
|
}
|
|
}
|