Pipelines

Pipelines are the core of Megatron. Pipelines contain all your transformations and are what you ultimately use to generate outputs.
class megatron.pipeline.Pipeline(inputs, outputs, metrics=[], explorers=[], name=None, version=None, storage=None, overwrite=False)

Bases: object

Holds the core computation graph that maps out Layers and manipulates data.

Parameters:
  • inputs (list of megatron.Node(s)) – input nodes of the Pipeline, where raw data is fed in.
  • outputs (list of megatron.Node(s)) – output nodes of the Pipeline, the processed features.
  • metrics (list of megatron.Node(s)) – metric nodes to be computed on the Pipeline's outputs by evaluate and evaluate_generator.
  • explorers (list of megatron.Node(s)) – explorer nodes to be computed on the Pipeline's outputs by explore_generator.
  • name (str) – unique identifying name of the Pipeline.
  • version (str) – version tag for the Pipeline's cache table in the database.
  • storage (Connection (default: None)) – database connection to be used for input and output data storage.
  • overwrite (bool (default: False)) – whether to overwrite the Pipeline's existing cached data, if any.
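
Below is a minimal construction sketch. The node and layer constructors shown (megatron.nodes.Input, megatron.layers.Add) are assumptions for illustration only, since this section documents only the Pipeline itself; check the Nodes and Layers references for the real signatures.

    import megatron

    # Hypothetical node construction; the exact Input/Layer APIs are
    # documented elsewhere, not in this section.
    x = megatron.nodes.Input('x')
    y = megatron.nodes.Input('y')
    z = megatron.layers.Add()([x, y])  # hypothetical transformation layer

    # Wire the computation graph from raw inputs to processed outputs.
    pipeline = megatron.pipeline.Pipeline(
        inputs=[x, y],
        outputs=[z],
        name='example_pipeline',
        version='0.1',
    )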
inputs

input nodes of the Pipeline, where raw data is fed in.

Type: list of megatron.Node(s)
outputs

output nodes of the Pipeline, the processed features.

Type: list of megatron.Node(s)
path

full topological sort of the Pipeline, from inputs to outputs.

Type: list of megatron.Node(s)
nodes

the Pipeline's Nodes, grouped by type.

Type: dict of list of megatron.Node(s)
eager

when True, TransformationNode outputs are calculated as soon as they are created. Eager execution is triggered by passing data to an InputNode as a function call.

Type: bool
name

unique identifying name of the Pipeline.

Type: str
version

version tag for Pipeline’s cache table in the database.

Type: str
storage

storage database for input and output data.

Type: Connection (default: None)
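
To illustrate the eager attribute above: passing actual data to an InputNode as a function call is what switches the Pipeline into eager execution. A sketch, with the Input constructor again assumed for illustration:

    import numpy as np
    import megatron

    # Hypothetical constructor; see the Nodes reference for the real one.
    x = megatron.nodes.Input('x')

    # Calling the node on data, rather than wiring it symbolically, signals
    # eager mode: downstream TransformationNode outputs are computed on creation.
    x_eager = x(np.random.rand(10, 3))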
evaluate(input_data, prune=True)

Execute the metric Nodes in the Pipeline and get their results.

Parameters:
  • input_data (dict of Numpy array) – the input data to be passed to InputNodes to begin execution.
  • prune (bool (default: True)) – whether to discard data from non-output Nodes after execution.
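
A usage sketch, assuming `pipeline` was constructed with metric nodes in its metrics list, has already been fit, and has InputNodes named 'x' and 'y' (hypothetical names):

    import numpy as np

    input_data = {'x': np.random.rand(100, 3), 'y': np.random.rand(100, 3)}

    # Runs the graph on the given data and returns the metric Nodes' results.
    results = pipeline.evaluate(input_data)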
evaluate_generator(input_generator, steps)

Execute the metric Nodes in the Pipeline for each batch from a generator.

Parameters:
  • input_generator (generator of dict of Numpy array) – generator producing input data to be passed to InputNodes.
  • steps (int) – number of batches to pull from input_generator before terminating.

explore_generator(input_generator, steps)

Execute the explorer Nodes in the Pipeline for each batch from a generator.

Parameters:
  • input_generator (generator of dict of Numpy array) – generator producing input data to be passed to InputNodes.
  • steps (int) – number of batches to pull from input_generator before terminating.

fit(input_data, epochs=1)

Fit to the input data, overwriting any previously learned metadata.

Parameters:
  • input_data (2-tuple of (dict of Numpy array, Numpy array)) – the input data to be passed to InputNodes to begin execution, along with the corresponding index array.
  • epochs (int (default: 1)) – number of passes to perform over the data.
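
A usage sketch, assuming `pipeline` has InputNodes named 'x' and 'y' (hypothetical names). Note that input_data is a 2-tuple of the feature dict and the index array:

    import numpy as np

    features = {'x': np.random.rand(100, 3), 'y': np.random.rand(100, 3)}
    index = np.arange(100)

    # Two full passes over the data, overwriting the learned metadata.
    pipeline.fit((features, index), epochs=2)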
fit_generator(input_generator, steps_per_epoch, epochs=1)

Fit to a generator of input data batches, executing partial_fit on each batch.

Parameters:
  • input_generator (generator of 2-tuple of (dict of Numpy array, Numpy array)) – generator that produces the features and index for each batch of data.
  • steps_per_epoch (int) – number of batches that are considered one full epoch.
  • epochs (int (default: 1)) – number of passes to perform over the data.
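
A sketch of the generator contract, assuming a `pipeline` whose only InputNode is named 'x' (hypothetical). Each yielded item is a (feature dict, index array) pair, and the generator should keep producing batches across epochs:

    import numpy as np

    def batch_generator(batch_size=32):
        # Endlessly yield (feature dict, index array) pairs, one per batch.
        start = 0
        while True:
            features = {'x': np.random.rand(batch_size, 3)}
            index = np.arange(start, start + batch_size)
            start += batch_size
            yield features, index

    # 10 batches make up one epoch; two epochs consume 20 batches in total.
    pipeline.fit_generator(batch_generator(), steps_per_epoch=10, epochs=2)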
partial_fit(input_data)

Fit to input data incrementally, where possible.

Parameters: input_data (dict of Numpy array) – the input data to be passed to InputNodes to begin execution.
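
A streaming sketch, again with a hypothetical InputNode named 'x'. partial_fit updates the learned metadata one batch at a time, which is useful when the full dataset does not fit in memory:

    import numpy as np

    for _ in range(10):
        batch = {'x': np.random.rand(32, 3)}
        pipeline.partial_fit(batch)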
save(save_dir)

Store the Pipeline and its learned metadata on disk, without the output data.

The filename will be {name of the pipeline}{version}.pkl.

Parameters: save_dir (str) – the desired location of the stored nodes, without the filename.
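
For example, with the hypothetical name and version used earlier ('example_pipeline', '0.1'), the following writes example_pipeline0.1.pkl into checkpoints/ (the directory is assumed to exist):

    pipeline.save('checkpoints/')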
transform(input_data, index_field=None, prune=True)

Execute the graph with the given input data and return the output Nodes' data.

Parameters:
  • input_data (dict of Numpy array) – the input data to be passed to InputNodes to begin execution.
  • index_field (str) – name of key from input_data to be used as index for storage and lookup.
  • prune (bool (default: True)) – whether to discard data from non-output Nodes after execution. Disabling this flag can be useful for debugging.
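
A usage sketch, assuming a fitted `pipeline` with hypothetical InputNodes 'x' and 'y'; 'id' is a hypothetical key serving only as the storage and lookup index:

    import numpy as np

    input_data = {
        'x': np.random.rand(100, 3),
        'y': np.random.rand(100, 3),
        'id': np.arange(100),
    }

    # Executes the graph and returns the output Nodes' data.
    outputs = pipeline.transform(input_data, index_field='id')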
transform_generator(input_generator, steps, index=None)

Execute the graph on input data drawn from a generator, returning a generator of outputs.

Parameters:
  • input_generator (generator of dict of Numpy array) – generator producing input data to be passed to InputNodes.
  • steps (int) – number of batches to pull from input_generator before terminating.
  • index (str) – name of key from each batch to be used as index for storage and lookup.
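
A sketch of batch-wise transformation, assuming a fitted `pipeline` whose only InputNode is named 'x' (hypothetical). Per the description above, the return value is itself a generator, yielding one item per input batch:

    import numpy as np

    def input_batches(n_batches, batch_size=32):
        # Yield one dict of Numpy arrays per batch.
        for _ in range(n_batches):
            yield {'x': np.random.rand(batch_size, 3)}

    for output in pipeline.transform_generator(input_batches(10), steps=10):
        print(output)  # one batch of output data at a time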
megatron.pipeline.load_pipeline(filepath, storage_db=None)

Load a Pipeline's nodes from a given file, previously stored with Pipeline.save().

Parameters:
  • filepath (str) – the file from which to load a Pipeline.
  • storage_db (Connection (default: sqlite3.connect('megatron_default.db'))) – database connection object to query for cached data from the Pipeline.
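
A usage sketch; the path is hypothetical and follows the {name}{version}.pkl convention from Pipeline.save():

    from megatron.pipeline import load_pipeline

    pipeline = load_pipeline('checkpoints/example_pipeline0.1.pkl')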