Skip to content

Commit

Permalink
pipelineImpl: de-pointerize plugin interface fields (algorand#113)
Browse files Browse the repository at this point in the history
  • Loading branch information
tzaffi committed Jun 30, 2023
1 parent 9eb8e62 commit ba19d62
Show file tree
Hide file tree
Showing 9 changed files with 73 additions and 76 deletions.
69 changes: 33 additions & 36 deletions conduit/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ type pipelineImpl struct {

initProvider *data.InitProvider

importer *importers.Importer
processors []*processors.Processor
exporter *exporters.Exporter
importer importers.Importer
processors []processors.Processor
exporter exporters.Exporter
completeCallback []conduit.OnCompleteFunc

pipelineMetadata state
Expand All @@ -72,30 +72,30 @@ func (p *pipelineImpl) setError(err error) {
}

func (p *pipelineImpl) registerLifecycleCallbacks() {
if v, ok := (*p.importer).(conduit.Completed); ok {
if v, ok := p.importer.(conduit.Completed); ok {
p.completeCallback = append(p.completeCallback, v.OnComplete)
}
for _, processor := range p.processors {
if v, ok := (*processor).(conduit.Completed); ok {
if v, ok := processor.(conduit.Completed); ok {
p.completeCallback = append(p.completeCallback, v.OnComplete)
}
}
if v, ok := (*p.exporter).(conduit.Completed); ok {
if v, ok := p.exporter.(conduit.Completed); ok {
p.completeCallback = append(p.completeCallback, v.OnComplete)
}
}

func (p *pipelineImpl) registerPluginMetricsCallbacks() {
var collectors []prometheus.Collector
if v, ok := (*p.importer).(conduit.PluginMetrics); ok {
if v, ok := p.importer.(conduit.PluginMetrics); ok {
collectors = append(collectors, v.ProvideMetrics(p.cfg.Metrics.Prefix)...)
}
for _, processor := range p.processors {
if v, ok := (*processor).(conduit.PluginMetrics); ok {
if v, ok := processor.(conduit.PluginMetrics); ok {
collectors = append(collectors, v.ProvideMetrics(p.cfg.Metrics.Prefix)...)
}
}
if v, ok := (*p.exporter).(conduit.PluginMetrics); ok {
if v, ok := p.exporter.(conduit.PluginMetrics); ok {
collectors = append(collectors, v.ProvideMetrics(p.cfg.Metrics.Prefix)...)
}
for _, c := range collectors {
Expand Down Expand Up @@ -143,23 +143,23 @@ func (p *pipelineImpl) pluginRoundOverride() (uint64, error) {
}
var parts []overridePart

if v, ok := (*p.importer).(conduit.RoundRequestor); ok {
if v, ok := p.importer.(conduit.RoundRequestor); ok {
parts = append(parts, overridePart{
RoundRequest: v.RoundRequest,
cfg: p.cfg.Importer,
t: plugins.Importer,
})
}
for idx, processor := range p.processors {
if v, ok := (*processor).(conduit.RoundRequestor); ok {
if v, ok := processor.(conduit.RoundRequestor); ok {
parts = append(parts, overridePart{
RoundRequest: v.RoundRequest,
cfg: p.cfg.Processors[idx],
t: plugins.Processor,
})
}
}
if v, ok := (*p.exporter).(conduit.RoundRequestor); ok {
if v, ok := p.exporter.(conduit.RoundRequestor); ok {
parts = append(parts, overridePart{
RoundRequest: v.RoundRequest,
cfg: p.cfg.Exporter,
Expand Down Expand Up @@ -306,11 +306,11 @@ func (p *pipelineImpl) Init() error {
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not make %s config: %w", p.cfg.Importer.Name, err)
}
err = (*p.importer).Init(p.ctx, *p.initProvider, pluginConfig, importerLogger)
err = p.importer.Init(p.ctx, *p.initProvider, pluginConfig, importerLogger)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not initialize importer (%s): %w", p.cfg.Importer.Name, err)
}
genesis, err := (*p.importer).GetGenesis()
genesis, err := p.importer.GetGenesis()
if err != nil {
return fmt.Errorf("Pipeline.GetGenesis(): could not obtain Genesis from the importer (%s): %w", p.cfg.Importer.Name, err)
}
Expand Down Expand Up @@ -339,7 +339,7 @@ func (p *pipelineImpl) Init() error {
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not initialize processor (%s): %w", ncPair, err)
}
err = (*processor).Init(p.ctx, *p.initProvider, config, logger)
err = processor.Init(p.ctx, *p.initProvider, config, logger)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not initialize processor (%s): %w", ncPair.Name, err)
}
Expand All @@ -352,7 +352,7 @@ func (p *pipelineImpl) Init() error {
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not initialize processor (%s): %w", p.cfg.Exporter.Name, err)
}
err = (*p.exporter).Init(p.ctx, *p.initProvider, config, logger)
err = p.exporter.Init(p.ctx, *p.initProvider, config, logger)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not initialize Exporter (%s): %w", p.cfg.Exporter.Name, err)
}
Expand Down Expand Up @@ -388,20 +388,20 @@ func (p *pipelineImpl) Stop() {
}
}

if err := (*p.importer).Close(); err != nil {
if err := p.importer.Close(); err != nil {
// Log and continue on closing the rest of the pipeline
p.logger.Errorf("Pipeline.Stop(): Importer (%s) error on close: %v", (*p.importer).Metadata().Name, err)
p.logger.Errorf("Pipeline.Stop(): Importer (%s) error on close: %v", p.importer.Metadata().Name, err)
}

for _, processor := range p.processors {
if err := (*processor).Close(); err != nil {
if err := processor.Close(); err != nil {
// Log and continue on closing the rest of the pipeline
p.logger.Errorf("Pipeline.Stop(): Processor (%s) error on close: %v", (*processor).Metadata().Name, err)
p.logger.Errorf("Pipeline.Stop(): Processor (%s) error on close: %v", processor.Metadata().Name, err)
}
}

if err := (*p.exporter).Close(); err != nil {
p.logger.Errorf("Pipeline.Stop(): Exporter (%s) error on close: %v", (*p.exporter).Metadata().Name, err)
if err := p.exporter.Close(); err != nil {
p.logger.Errorf("Pipeline.Stop(): Exporter (%s) error on close: %v", p.exporter.Metadata().Name, err)
}
}

Expand Down Expand Up @@ -460,7 +460,7 @@ func (p *pipelineImpl) Start() {
p.logger.Infof("Pipeline round: %v", p.pipelineMetadata.NextRound)
// fetch block
importStart := time.Now()
blkData, err := (*p.importer).GetBlock(p.pipelineMetadata.NextRound)
blkData, err := p.importer.GetBlock(p.pipelineMetadata.NextRound)
if err != nil {
p.logger.Errorf("%v", err)
p.setError(err)
Expand All @@ -477,18 +477,18 @@ func (p *pipelineImpl) Start() {
start := time.Now()
for _, proc := range p.processors {
processorStart := time.Now()
blkData, err = (*proc).Process(blkData)
blkData, err = proc.Process(blkData)
if err != nil {
p.logger.Errorf("%v", err)
p.setError(err)
retry++
goto pipelineRun
}
metrics.ProcessorTimeSeconds.WithLabelValues((*proc).Metadata().Name).Observe(time.Since(processorStart).Seconds())
metrics.ProcessorTimeSeconds.WithLabelValues(proc.Metadata().Name).Observe(time.Since(processorStart).Seconds())
}
// run through exporter
exporterStart := time.Now()
err = (*p.exporter).Receive(blkData)
err = p.exporter.Receive(blkData)
if err != nil {
p.logger.Errorf("%v", err)
p.setError(err)
Expand Down Expand Up @@ -563,47 +563,44 @@ func MakePipeline(ctx context.Context, cfg *data.Config, logger *log.Logger) (Pi
logger: logger,
initProvider: nil,
importer: nil,
processors: []*processors.Processor{},
processors: []processors.Processor{},
exporter: nil,
}

importerName := cfg.Importer.Name

importerBuilder, err := importers.ImporterBuilderByName(importerName)
importerConstructor, err := importers.ImporterConstructorByName(importerName)
if err != nil {
return nil, fmt.Errorf("MakePipeline(): could not build importer '%s': %w", importerName, err)
}

importer := importerBuilder.New()
pipeline.importer = &importer
pipeline.importer = importerConstructor.New()
logger.Infof("Found Importer: %s", importerName)

// ---

for _, processorConfig := range cfg.Processors {
processorName := processorConfig.Name

processorBuilder, err := processors.ProcessorBuilderByName(processorName)
processorConstructor, err := processors.ProcessorConstructorByName(processorName)
if err != nil {
return nil, fmt.Errorf("MakePipeline(): could not build processor '%s': %w", processorName, err)
}

processor := processorBuilder.New()
pipeline.processors = append(pipeline.processors, &processor)
pipeline.processors = append(pipeline.processors, processorConstructor.New())
logger.Infof("Found Processor: %s", processorName)
}

// ---

exporterName := cfg.Exporter.Name

exporterBuilder, err := exporters.ExporterBuilderByName(exporterName)
exporterConstructor, err := exporters.ExporterConstructorByName(exporterName)
if err != nil {
return nil, fmt.Errorf("MakePipeline(): could not build exporter '%s': %w", exporterName, err)
}

exporter := exporterBuilder.New()
pipeline.exporter = &exporter
pipeline.exporter = exporterConstructor.New()
logger.Infof("Found Exporter: %s", exporterName)

return pipeline, nil
Expand Down
32 changes: 16 additions & 16 deletions conduit/pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,9 @@ func mockPipeline(t *testing.T, dataDir string) (*pipelineImpl, *test.Hook, *moc
},
logger: l,
initProvider: nil,
importer: &pImporter,
processors: []*processors.Processor{&pProcessor},
exporter: &pExporter,
importer: pImporter,
processors: []processors.Processor{pProcessor},
exporter: pExporter,
pipelineMetadata: state{
GenesisHash: "",
Network: "",
Expand Down Expand Up @@ -271,9 +271,9 @@ func TestPipelineRun(t *testing.T) {
cf: cf,
logger: l,
initProvider: nil,
importer: &pImporter,
processors: []*processors.Processor{&pProcessor},
exporter: &pExporter,
importer: pImporter,
processors: []processors.Processor{pProcessor},
exporter: pExporter,
completeCallback: []conduit.OnCompleteFunc{cbComplete.OnComplete},
pipelineMetadata: state{
NextRound: 0,
Expand Down Expand Up @@ -371,9 +371,9 @@ func TestPipelineErrors(t *testing.T) {
},
logger: l,
initProvider: nil,
importer: &pImporter,
processors: []*processors.Processor{&pProcessor},
exporter: &pExporter,
importer: pImporter,
processors: []processors.Processor{pProcessor},
exporter: pExporter,
completeCallback: []conduit.OnCompleteFunc{cbComplete.OnComplete},
pipelineMetadata: state{},
}
Expand Down Expand Up @@ -440,9 +440,9 @@ func Test_pipelineImpl_registerLifecycleCallbacks(t *testing.T) {
cfg: &data.Config{},
logger: l,
initProvider: nil,
importer: &pImporter,
processors: []*processors.Processor{&pProcessor, &pProcessor},
exporter: &pExporter,
importer: pImporter,
processors: []processors.Processor{pProcessor, pProcessor},
exporter: pExporter,
}

// Each plugin implements the Completed interface, so there should be 4
Expand Down Expand Up @@ -485,7 +485,7 @@ func TestGenesisHash(t *testing.T) {

// mock a different genesis hash
var pImporter importers.Importer = &mockImporter{genesis: sdk.Genesis{Network: "dev"}}
pImpl.importer = &pImporter
pImpl.importer = pImporter
err = pImpl.Init()
assert.Contains(t, err.Error(), "genesis hash in metadata does not match")
}
Expand Down Expand Up @@ -797,9 +797,9 @@ func TestPipelineRetryVariables(t *testing.T) {
},
logger: l,
initProvider: nil,
importer: &pImporter,
processors: []*processors.Processor{&pProcessor},
exporter: &pExporter,
importer: pImporter,
processors: []processors.Processor{pProcessor},
exporter: pExporter,
pipelineMetadata: state{
GenesisHash: "",
Network: "",
Expand Down
4 changes: 2 additions & 2 deletions conduit/plugins/exporters/exporter_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ func Register(name string, constructor ExporterConstructor) {
Exporters[name] = constructor
}

// ExporterBuilderByName returns a Processor constructor for the name provided
func ExporterBuilderByName(name string) (ExporterConstructor, error) {
// ExporterConstructorByName returns a Processor constructor for the name provided
func ExporterConstructorByName(name string) (ExporterConstructor, error) {
constructor, ok := Exporters[name]
if !ok {
return nil, fmt.Errorf("no Exporter Constructor for %s", name)
Expand Down
4 changes: 2 additions & 2 deletions conduit/plugins/exporters/exporter_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ func TestExporterByNameSuccess(t *testing.T) {
me := mockExporter{}
Register("foobar", &mockExporterConstructor{&me})

expC, err := ExporterBuilderByName("foobar")
expC, err := ExporterConstructorByName("foobar")
assert.NoError(t, err)
exp := expC.New()
assert.Implements(t, (*Exporter)(nil), exp)
}

func TestExporterByNameNotFound(t *testing.T) {
_, err := ExporterBuilderByName("barfoo")
_, err := ExporterConstructorByName("barfoo")
expectedErr := "no Exporter Constructor for barfoo"
assert.EqualError(t, err, expectedErr)
}
Expand Down
2 changes: 1 addition & 1 deletion conduit/plugins/exporters/noop/noop_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ var ne = nc.New()
func TestExporterBuilderByName(t *testing.T) {
// init() has already registered the noop exporter
assert.Contains(t, exporters.Exporters, metadata.Name)
neBuilder, err := exporters.ExporterBuilderByName(metadata.Name)
neBuilder, err := exporters.ExporterConstructorByName(metadata.Name)
assert.NoError(t, err)
ne := neBuilder.New()
assert.Implements(t, (*exporters.Exporter)(nil), ne)
Expand Down
12 changes: 6 additions & 6 deletions conduit/plugins/importers/importer_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"fmt"
)

// Constructor must be implemented by each Importer.
// ImporterConstructor must be implemented by each Importer.
// It provides a basic no-arg constructor for instances of an ImporterImpl.
type Constructor interface {
type ImporterConstructor interface {
// New should return an instantiation of a Importer.
// Configuration values should be passed and can be processed during `Init()`.
New() Importer
Expand All @@ -21,20 +21,20 @@ func (f ImporterConstructorFunc) New() Importer {
}

// Importers are the constructors to build importer plugins.
var Importers = make(map[string]Constructor)
var Importers = make(map[string]ImporterConstructor)

// Register is used to register Constructor implementations. This mechanism allows
// for loose coupling between the configuration and the implementation. It is extremely similar to the way sql.DB
// drivers are configured and used.
func Register(name string, constructor Constructor) {
func Register(name string, constructor ImporterConstructor) {
if _, ok := Importers[name]; ok {
panic(fmt.Errorf("importer %s already registered", name))
}
Importers[name] = constructor
}

// ImporterBuilderByName returns a Importer constructor for the name provided
func ImporterBuilderByName(name string) (Constructor, error) {
// ImporterConstructorByName returns a Importer constructor for the name provided
func ImporterConstructorByName(name string) (ImporterConstructor, error) {
constructor, ok := Importers[name]
if !ok {
return nil, fmt.Errorf("no Importer Constructor for %s", name)
Expand Down
Loading

0 comments on commit ba19d62

Please sign in to comment.