create a custom pipeline loader

load pipeline not on the local filesystem

A pype loader is responsible for loading a pipeline.

The default pype loader is pypyr.loaders.file. You can change this default with the default_loader config setting.

This default loader loads pipelines from the local file-system, following the usual pypyr pipeline look-up sequence.

You can find all of pypyr’s built-in loaders here.

If you want to load pipelines from somewhere else - maybe a shared pipeline library, or a store like s3 or consul - or implement your own caching, you can roll your own pype loader.

pypyr automatically caches a pipeline by name for each loader on 1st load, so subsequent calls in the same session to run that pipeline use the cached pipeline rather than invoking the loader each time.

This is the case both when you invoke pipelines from the api/cli, and when you are calling a pipeline from within a pipeline with pypyr.steps.pype.
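
For example, when using the API (covered in more detail below), only the first run() in a session invokes the loader; subsequent runs of the same pipeline get the already-parsed definition from cache. A minimal sketch, assuming a pipeline named my-pipe exists in the usual look-up locations:

import pypyr.pipelinerunner

# 1st call invokes the loader & parses the pipeline yaml
pypyr.pipelinerunner.run(pipeline_name='my-pipe')

# 2nd call in the same session gets the cached pipeline instead -
# the loader does not run again
pypyr.pipelinerunner.run(pipeline_name='my-pipe')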

custom loader function signature

A loader is any Python module with a function like this:

get_pipeline_definition(name: str, parent: Any) -> pypyr.pipedef.PipelineDefinition | Mapping

Here it is in more detail - all you have to do is replace get_filelike_obj_from_somewhere with your own code that, well, gets a file-like object from somewhere:

import logging
from pypyr.errors import PipelineNotFoundError
import pypyr.yaml

logger = logging.getLogger(__name__)

def get_pipeline_definition(pipeline_name, parent):
    """Open and parse the pipeline yaml.

    Parse the pipeline yaml and return dictionary representing the pipeline.

    pipeline_name is whatever is passed in from the shell like:
    $ pypyr pipelinename args

    Args:
        pipeline_name (str): Name of pipeline. Passed in from the 1st 
                             positional cmd argument, or the pipeline_name
                             run() api arg.
        parent (any): Set by the parent's loader when this call is to get the
                      definition for a child pipeline called by that parent
                      pipeline with pype.
        
    Returns:
        pypyr.pipedef.PipelineDefinition, or Dict-like object describing the 
        pipeline's yaml, parsed from the pipeline file/stream.

    Raises:
        PipelineNotFoundError: pipeline_name not found.

    """
    logger.debug("starting")

    # it's good form only to use .info and higher log levels when you must.
    # For .debug() being verbose is very much encouraged.
    logger.info("Your clever code goes here. . . ")

    # if something goes wrong you might want to raise PipelineNotFoundError
    with get_filelike_obj_from_somewhere(pipeline_name) as yaml_file:
        # pass the file-like object here to have pypyr parse the yaml for you.
        # yaml_file has to be open and ready to read at this point.
        # if you prefer to use your own yaml parser, do that here instead of
        # calling get_pipeline_yaml.
        pipeline_yaml = pypyr.yaml.get_pipeline_yaml(yaml_file)

    # remember to close yaml_file here if necessary if not using "with"

    # some arbitrary log output
    logger.debug(f"found {len(pipeline_yaml)} stages in pipeline.")

    logger.debug("done")
    return pipeline_yaml
The parent argument only has a value when loading a child pipeline via a pypyr.steps.pype step.

code example

Here is a minimalist custom loader that just loads a pipeline from the current working directory without doing anything special.

Create a file ./mydir/myloader.py as follows (the file-name is not important; what is important is that the .py file has a def get_pipeline_definition in it):

"""Naive custom loader without any error handling."""
from pathlib import Path
import pypyr.yaml

CWD = Path.cwd()

def get_pipeline_definition(pipeline_name, parent):
    """Simplified loader that gets pipeline_name.yaml in working dir."""
    with open(CWD.joinpath(f'{pipeline_name}.yaml')) as yaml_file:
        return pypyr.yaml.get_pipeline_yaml(yaml_file)
pypyr will automatically cache the return value of your custom loader's get_pipeline_definition function for you; you do not have to do anything special.

set which loader to use

The usual custom module import resolution rules apply. If your loader's location is not in sys.path already, pypyr looks for ad hoc loader modules relative to the py_dir directory, or in packages installed in the current Python environment.

set default loader in config

You can set the default_loader in config to specify your custom loader as the default per project directory, per user or globally for all pipelines on your system.

If you do this then you don’t need to set your custom loader explicitly whenever you run a pipeline.

Remember that if you specify default_loader in the user or global config files, you should probably install your custom loader as a package into your active Python environment. This is because you could run pypyr from anywhere on your system, so it might not always run from the same directory.

If you set default_loader in a project config file (./pypyr-config.yaml or ./pyproject.toml), then you can set mydir.myloader relative to the project directory without having to install it as a package.

# ./pypyr-config.yaml
default_loader: mypackage.myloader

# ./pyproject.toml
[tool.pypyr]
default_loader = "mypackage.myloader"

You can still override whatever the default_loader is by explicitly passing a value to loader wherever you run a pipeline.

use custom loader from the API

from pathlib import Path
import pypyr.pipelinerunner

CWD = Path.cwd()

# load pipeline with get_pipeline_definition() in ./mydir/myloader.py
pypyr.pipelinerunner.run(
    pipeline_name='pipeline-name',
    args_in=['arb', 'context', 'input'],
    loader='mydir.myloader',
    py_dir=CWD)

If you set your default_loader in config, then you do not need to pass your custom loader explicitly to the API or to pype, but you will still need to tell pypyr with py_dir from which directory to resolve it if you've not installed it as a package into your environment.

If you package your loader and install the package into the active Python environment (i.e. you did a $ pip install mypackage), you can of course use the usual Python package name instead:

import pypyr.pipelinerunner

# mypackage.myloader is installed in the current Python env
pypyr.pipelinerunner.run(
    pipeline_name='pipeline-name',
    args_in=['arb', 'context', 'input'],
    loader='mypackage.myloader')

Since in this case the loader exists in the Python environment, you do NOT need to pass py_dir to run().

See here for full details on how to run a pipeline from code.

use custom loader with pype

When you call a child pipeline from a parent pipeline with pypyr.steps.pype you can explicitly set the loader when you invoke the child:

steps:
  - name: pypyr.steps.pype
    comment: loader ./mydir/myloader.py will load pipeline subdir/child-pipeline
    in:
      pype:
        name: subdir/child-pipeline
        loader: mydir.myloader

By default, pypyr will try to load a child pipeline with the same loader that handled the parent, so you do NOT explicitly need to set loader on the pype step if you just want the child to use the same loader.

If you do NOT want the child to inherit the parent's loader setting by default for your custom loader, see the non-cascading loader section below for how to set is_loader_cascading on the PipelineInfo object.

custom loader with shortcut

You can configure a shortcut to use your custom loader.

# ./pypyr-config.yaml
shortcuts:
  my-shortcut:
    pipeline_name: my-pipeline-uri
    loader: mydir.myloader

# ./pyproject.toml
[tool.pypyr.shortcuts.my-shortcut]
pipeline_name = "my-pipeline-uri"
loader = "mydir.myloader"

Now when you run my-shortcut pypyr will load the specified pipeline_name using your custom loader:

$ pypyr my-shortcut

custom pipeline metadata

Your custom get_pipeline_definition function can return either just the bare dict-like yaml payload of the pipeline itself, or a pypyr.pipedef.PipelineDefinition object. Internally, the pypyr core will wrap the pipeline body in a default PipelineDefinition instance if you did not do so yourself.
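
As a rough sketch of the idea (illustrative, not the exact core internals), returning the bare parsed yaml from your loader ends up much the same as wrapping it yourself:

from pypyr.pipedef import PipelineDefinition, PipelineInfo

def get_pipeline_definition(pipeline_name, parent):
    pipeline_yaml = ...  # your parsed dict-like pipeline body

    # returning pipeline_yaml as-is also works - the pypyr core will
    # wrap it in a default PipelineDefinition for you, roughly like this:
    return PipelineDefinition(
        pipeline=pipeline_yaml,
        info=PipelineInfo(pipeline_name=pipeline_name,
                          loader=__name__,
                          parent=parent))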

If you want to add custom metadata from your loader to your pipeline you can do so by setting a pypyr.pipedef.PipelineInfo object on your PipelineDefinition.

# namespace: pypyr.pipedef

class PipelineDefinition():
    pipeline: Mapping # the dict-like pipeline body
    info: PipelineInfo # pipeline metadata

class PipelineInfo():
    pipeline_name: str # name of pipeline
    loader: str # absolute module name of loader (string)
    parent: Any = None # relative path hint for child pipelines
    is_parent_cascading: bool = True
    is_loader_cascading: bool = True

If you want to add extra custom metadata properties to your pipeline, derive your own class from PipelineInfo and add such properties as you want.

This is entirely optional - you don't get any particular advantage from explicitly constructing & returning a PipelineDefinition in your custom get_pipeline_definition function, since the pypyr core wraps the return value in one for you if you did not do so yourself.

You can control whether pipelines loaded by your custom loader will cascade their parent and loader properties down to child pipelines when using pype by setting the is_parent_cascading and is_loader_cascading boolean flags.

example

Here is a silly example where we expand on the naive loader from earlier with an extra meta-data property called arb_property, by creating a custom metadata class ArbPipeInfo for this loader.

from pathlib import Path

from pypyr.pipedef import PipelineDefinition, PipelineInfo
import pypyr.yaml

CWD = Path.cwd()

class ArbPipeInfo(PipelineInfo):
    """Custom pipeline meta-data."""
    # you don't HAVE to use slots, but it's a tiny bit more efficient
    __slots__ = ['arb_property']

    def __init__(self, pipeline_name, loader, parent, arb):
        # is_parent_cascading & is_loader_cascading default True if not specified
        super().__init__(pipeline_name=pipeline_name,
                         loader=loader,
                         parent=parent)
        self.arb_property = arb


def get_pipeline_definition(pipeline_name, parent):
    """Simple loader gets pipeline_name.yaml in working dir + custom metadata."""
    # get & parse the pipeline yaml payload
    with open(CWD.joinpath(f'{pipeline_name}.yaml')) as yaml_file:
        pipeline_yaml = pypyr.yaml.get_pipeline_yaml(yaml_file)
    
    # custom meta-data to associate with pipeline
    info = ArbPipeInfo(pipeline_name=pipeline_name,
                       parent=parent,
                       loader=__name__,
                       arb_property='some useful value')

    # wrap pipeline body in a PipelineDefinition alongside its metadata
    return PipelineDefinition(pipeline=pipeline_yaml, info=info)

If you want to access your custom property from the pipeline at run-time, you can get it from the current_pipeline on the Context object:

steps:
  - name: pypyr.steps.py
    comment: arb_property of the current pipeline, as set by the custom pipeline loader.
    in:
      pycode: print(context.current_pipeline.pipeline_definition.info.arb_property)
  
  - name: pypyr.steps.py
    comment: arb_property on the 1st pipeline in the pipeline call-stack.
             this is useful when pypyr.steps.pype calls pipelines within pipelines.
    in:
      pycode: print(context.get_root_pipeline().pipeline_definition.info.arb_property)

You can access your custom metadata properties in exactly the same way on the Context object in a custom step.
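
For example, a minimal custom step (a module with a run_step(context) function) could read the same property, assuming a loader like the one above set arb_property:

def run_step(context):
    """Print arb_property set by the custom loader's PipelineInfo."""
    info = context.current_pipeline.pipeline_definition.info
    print(info.arb_property)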

non-cascading loader

On pypyr.steps.pype, if you do not want to inherit which loader to use from the parent you can set is_loader_cascading to False on the PipelineInfo object.

If you do not cascade your loader setting to child pipelines, your pipeline authors will explicitly have to set loader on each pype step, even if it is the same as the parent's.

from pypyr.pipedef import PipelineDefinition, PipelineInfo
import pypyr.yaml

class ArbPipeInfo(PipelineInfo):
    """Non-cascading custom pipeline meta-data."""
    # you don't HAVE to use slots, but it's a tiny bit more efficient
    __slots__ = ['arb_property']

    def __init__(self, pipeline_name, loader, parent, arb):
        # This is a non-cascading loader.
        super().__init__(pipeline_name=pipeline_name,
                         loader=loader,
                         parent=parent,
                         is_loader_cascading=False)
        self.arb_property = arb


def get_pipeline_definition(pipeline_name, parent):
    """Non-cascading because PipelineInfo has is_loader_cascading False."""
    # get & parse the pipeline yaml payload from somewhere
    with get_filelike_obj_from_somewhere(pipeline_name) as yaml_file:
        pipeline_yaml = pypyr.yaml.get_pipeline_yaml(yaml_file)
        actual_name = yaml_file.some_api_property
        actual_parent = yaml_file.another_api_property
    
    # custom meta-data to associate with pipeline
    info = ArbPipeInfo(pipeline_name=actual_name,
                       parent=actual_parent,
                       loader=__name__,
                       arb_property='some useful value')

    # wrap pipeline body in a PipelineDefinition alongside its metadata
    return PipelineDefinition(pipeline=pipeline_yaml, info=info)

load child pipeline relative to parent

If wherever you store your pipelines has a hierarchical structure, you can load child pipelines invoked by pypyr.steps.pype so that the child pipeline_name resolves relative to the parent pipeline’s location. Use the parent argument on def get_pipeline_definition(pipeline_name, parent) to do so.

The parent property of the calling pipeline will cascade down to the child by default if is_parent_cascading on the PipelineInfo instance is True - which it is by default.

from pathlib import Path

from pypyr.pipedef import PipelineDefinition, PipelineInfo
import pypyr.yaml

CWD = Path.cwd()

def get_pipeline_definition(pipeline_name, parent):
    """Simple loader that gets pipeline_name.yaml in parent or working dir."""
    # pipeline_name could be "subdir/mydir/subdir/mypipe"
    pipeline_path = (parent.joinpath(f'{pipeline_name}.yaml')
                     if parent else CWD.joinpath(f'{pipeline_name}.yaml'))

    with open(pipeline_path) as yaml_file:
        pipeline_yaml = pypyr.yaml.get_pipeline_yaml(yaml_file)
    
    # set parent property so child pipelines can resolve relative to it
    info = PipelineInfo(pipeline_name=pipeline_name,
                        parent=pipeline_path.parent,
                        loader=__name__)

    # wrap pipeline body in a PipelineDefinition alongside its metadata
    return PipelineDefinition(pipeline=pipeline_yaml, info=info)

The root pipeline will always have parent=None. When this root pipeline calls child pipelines with pypyr.steps.pype, the pype step will pass the calling pipeline’s info.parent property to the loader when retrieving the child pipeline.

In this simple example the parent argument is of type pathlib.Path, but it can be any type that works for you.

Although it is up to your loader to interpret the parent argument as it sees fit, and it could therefore be any type, whatever type parent is should have a sensible __str__ representation.

This is a bit tough to explain with a typing hint. 😬

The reason is that pypyr caches child pipelines by the key "{parent}+{pipeline_name}", so if the string representation of parent is not unique enough, you could end up overwriting existing pipelines in cache when you load a child pipeline with the same name but different parents.
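
For instance, if your parent hint is not a pathlib.Path but some custom type, give it a __str__ that is unique per location. A hypothetical sketch (S3Parent is illustrative, not a pypyr type):

class S3Parent:
    """Hypothetical parent hint pointing at an s3 bucket + prefix."""

    def __init__(self, bucket, prefix):
        self.bucket = bucket
        self.prefix = prefix

    def __str__(self):
        # unique per location, so the "{parent}+{pipeline_name}" cache key
        # cannot collide for same-named pipelines under different prefixes
        return f's3://{self.bucket}/{self.prefix}'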

If your pipeline store is flat and/or you identify pipelines solely by name, there is no need to use the parent property. The parent property is there to deal with the complexities that arise from the file system where it is convenient to address a child pipeline relative to the parent’s location. If wherever you store your pipelines does not allow for this, there is no point in implementing anything special for the parent property.

Pipeline authors can use the parent input on the pype step to override the loader's cascading info.parent value for the child pipeline load. This only changes the value of parent for the child pipeline load; it does not touch the calling pipeline's info.parent property. (info.parent refers to the PipelineInfo object assigned to .info on the PipelineDefinition.)

steps:
- name: pypyr.steps.pype
  comment: call child pipeline from here
  in:
    pype:
      name: pipeline-name # mandatory. string.
      parent: myparent

multi-address pipelines

If different combinations of parent + pipeline_name could refer to the same underlying pipeline, it is up to your loader to implement a loader-specific cache if you do not want to parse & load that pipeline more than once in a session.

For example:

  • parent /dir + pipeline name sub/pipe
  • parent /dir/sub + pipeline name pipe

On the filesystem, both of these are the same - both refer to /dir/sub/pipe.

If this is a concern for you, you can use pypyr’s built-in cache class to make your life a bit easier:

from pypyr.cache.cache import Cache
from pypyr.errors import PipelineNotFoundError
from pypyr.pipedef import PipelineDefinition, PipelineInfo
import pypyr.yaml

_pipedef_cache = Cache() 

def get_pipeline_definition(pipeline_name, parent):
    """Loader with its own cache for combinations of parent+name."""
    # loader has to find UNIQUE canonical path for pipeline here somehow
    canonical_path = get_canonical_path_somehow(parent, pipeline_name)

    # if canonical_path in cache, return it. if not, put it in cache with lambda.
    pipeline_definition = _pipedef_cache.get(
        canonical_path,
        lambda: load_pipeline_from_somewhere(canonical_path))

    return pipeline_definition

def load_pipeline_from_somewhere(canonical_path):
    """Use whatever API you gotta use to get the pipeline here."""
    # if something goes wrong you might want to raise PipelineNotFoundError
    with get_filelike_obj_from_somewhere(canonical_path) as yaml_file:
        actual_name = yaml_file.some_api_property
        actual_parent = yaml_file.another_api_property
        
        # if you prefer to use your own yaml parser, do that here instead of 
        # calling get_pipeline_yaml.
        pipeline_yaml = pypyr.yaml.get_pipeline_yaml(yaml_file)

    # set parent property so child pipelines can resolve relative to it
    info = PipelineInfo(pipeline_name=actual_name,
                        parent=actual_parent,
                        loader=__name__)

    # wrap pipeline body in a PipelineDefinition alongside its metadata
    return PipelineDefinition(pipeline=pipeline_yaml, info=info)    

You do NOT need to worry about this additional caching complexity if a pipeline is only addressable by one and only one combination of parent and pipeline_name for your loader, because the pypyr core already caches the reference returned from get_pipeline_definition for you. If CPU & memory are not particularly of concern for you, then you also don’t need to worry about this.

The yaml loading & parsing is the most resource intensive operation in pypyr, so this extra cache is only really helpful if you're seeking to optimize by preventing parsing the same underlying pipeline more than once in the edge case where a different parent+name combination refers to an already loaded pipeline.
