create a custom pipeline loader
load pipeline not on the local filesystem
A pype loader is responsible for loading a pipeline.
The default pype loader is pypyr.loaders.file. You can change this default with the default_loader setting in config.
This default loader loads pipelines from the local file-system, following the usual pypyr pipeline look-up sequence.
You can find all of pypyr’s built-in loaders here.
If you want to load pipelines from somewhere else, like a shared pipeline library, your own caching layer, or a store like s3 or consul, you can roll your own pype loader.
pypyr internally automatically caches a pipeline by name for each loader on 1st load, so that subsequent calls in the same session to run that pipeline will use the cached pipeline rather than invoke the loader each time.
This is the case both when you invoke pipelines from the api/cli, and when you are calling a pipeline from within a pipeline with pypyr.steps.pype.
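For example, here is a sketch of that caching behaviour, assuming a pipeline named my-pipe and the hypothetical mydir.myloader custom loader described further down - calling run() twice in the same Python session only invokes the loader once:

from pathlib import Path

import pypyr.pipelinerunner

# 1st run: pypyr invokes mydir.myloader to load & parse my-pipe,
# then caches the parsed pipeline for this loader.
pypyr.pipelinerunner.run(pipeline_name='my-pipe',
                         loader='mydir.myloader',
                         py_dir=Path.cwd())

# 2nd run in the same session: the cached pipeline is re-used,
# mydir.myloader is NOT invoked again.
pypyr.pipelinerunner.run(pipeline_name='my-pipe',
                         loader='mydir.myloader',
                         py_dir=Path.cwd())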
custom loader function signature
A loader is any Python module with a function like this:
get_pipeline_definition(name: str, parent: Any) -> pypyr.pipedef.PipelineDefinition | Mapping

Here it is in more detail - all you have to do is replace get_filelike_obj_from_somewhere with your own code that, well, gets a file-like object from somewhere:
import logging

from pypyr.errors import PipelineNotFoundError
import pypyr.yaml

logger = logging.getLogger(__name__)


def get_pipeline_definition(pipeline_name, parent):
    """Open and parse the pipeline yaml.

    Parse the pipeline yaml and return dictionary representing the pipeline.

    pipeline_name is whatever is passed in from the shell like:
    $ pypyr pipelinename args

    Args:
        pipeline_name (str): Name of pipeline. Passed in from the 1st
            positional cmd argument, or the pipeline_name run() api arg.
        parent (any): Set by the parent's loader when this call is to get the
            definition for a child pipeline called by that parent pipeline
            with pype.

    Returns:
        pypyr.pipedef.PipelineDefinition, or Dict-like object describing the
        pipeline's yaml, parsed from the pipeline file/stream.

    Raises:
        PipelineNotFoundError: pipeline_name not found.
    """
    logger.debug("starting")

    # it's good form only to use .info and higher log levels when you must.
    # For .debug() being verbose is very much encouraged.
    logger.info("Your clever code goes here. . . ")

    # if something goes wrong you might want to raise PipelineNotFoundError

    with get_filelike_obj_from_somewhere(pipeline_name) as yaml_file:
        # pass the file-like object here to have pypyr parse the yaml for you
        # yaml_file has to be open and ready to read at this point.
        # if you prefer to use your own yaml parser, do that here instead of
        # calling get_pipeline_yaml.
        pipeline_yaml = pypyr.yaml.get_pipeline_yaml(yaml_file)

    # remember to close yaml_file here if necessary if not using "with"

    # some arbitrary log output
    logger.debug(f"found {len(pipeline_yaml)} stages in pipeline.")

    logger.debug("done")
    return pipeline_yaml

parent argument only has a value when loading a child pipeline via a pypyr.steps.pype step.

code example
Here is a minimalist custom loader that just loads a pipeline from the current working directory without doing anything special.
Create a file ./mydir/myloader.py as follows (the filename is not important; what is important is that the .py file has a def get_pipeline_definition in it):
"""Naive custom loader without any error handling."""
from pathlib import Path
import pypyr.yaml
CWD = Path.cwd()
def get_pipeline_definition(pipeline_name, parent):
"""Simplified loader that gets pipeline_name.yaml in working dir."""
with open(CWD.joinpath(f'{pipeline_name}.yaml')) as yaml_file:
return pypyr.yaml.get_pipeline_yaml(yaml_file)get_pipeline_definition function of your custom loader for you, you do not
have to do anything special.set which loader to use
The usual custom module import resolution rules apply. If your loader path is not in
sys.path already, pypyr can only look for ad hoc loader modules relative to
the py_dir directory or in packages installed in the current Python
environment.
set default loader in config
You can set the default_loader in config to specify your custom loader as the default per project directory, per user or globally for all pipelines on your system.
If you do this then you don’t need to set your custom loader explicitly whenever you run a pipeline.
Remember that if you specify default_loader in the user or global config
files you probably should install your custom loader as a package into your
active Python environment. This is because you could run pypyr from anywhere
in your system, so it might not always run from the same directory.
If you set default_loader in a project config file (./pypyr-config.yaml or
./pyproject.toml), then you can set mydir.myloader relative to the
project directory without having to install it as a package.
default_loader: mypackage.myloader

[tool.pypyr]
default_loader = "mypackage.myloader"

You can still override whatever the default_loader is by explicitly passing a value to loader wherever you run a pipeline.
use custom loader from the API
from pathlib import Path

import pypyr.pipelinerunner

CWD = Path.cwd()

# load pipeline with get_pipeline_definition() in ./mydir/myloader.py
pypyr.pipelinerunner.run(
    pipeline_name='pipeline-name',
    args_in=['arb', 'context', 'input'],
    loader='mydir.myloader',
    py_dir=CWD
)

If you set your default_loader in config, then you do not explicitly need to pass your custom value to the API or to pype, but you will still need to tell pypyr from which directory location to resolve it with py_dir if you've not installed it as a package into your environment.
If you package your loader and install the package into the active Python environment (i.e. you did a $ pip install mypackage), you can of course use the usual Python package name instead:
import pypyr.pipelinerunner

# mypackage.myloader is installed in the current Python env
pypyr.pipelinerunner.run(
    pipeline_name='pipeline-name',
    args_in=['arb', 'context', 'input'],
    loader='mypackage.myloader'
)

Since in this case the loader exists in the Python environment, you do NOT need to pass py_dir to run().
See here for full details on how to run a pipeline from code.
use custom loader with pype
When you call a child pipeline from a parent pipeline with pypyr.steps.pype you can explicitly set the loader when you invoke the child:
steps:
  - name: pypyr.steps.pype
    comment: loader ./mydir/myloader.py will load pipeline subdir/child-pipeline
    in:
      pype:
        name: subdir/child-pipeline
        loader: mydir.myloader

By default, pypyr will try to load a child pipeline with the same loader that handled the parent, so you do NOT explicitly need to set loader on the pype step if you just want the child to use the same loader.
If you do NOT want the child to inherit the parent’s loader setting by default
for your custom loader, see the next section for how to set
is_loader_cascading on the PipelineInfo object.
custom loader with shortcut
You can configure a shortcut to use your custom loader.
shortcuts:
  my-shortcut:
    pipeline_name: my-pipeline-uri
    loader: mydir.myloader

[tool.pypyr.shortcuts]
[tool.pypyr.shortcuts.my-shortcut]
pipeline_name = "my-pipeline-uri"
loader = "mydir.myloader"

Now when you run my-shortcut pypyr will load the specified pipeline_name using your custom loader:

$ pypyr my-shortcut

custom pipeline metadata
Your custom get_pipeline_definition function can return either just the bare
dict-like yaml payload of the pipeline itself, or a
pypyr.pipedef.PipelineDefinition object. Internally, the pypyr core will
wrap the pipeline body in a default PipelineDefinition instance if you did
not do so yourself.
If you want to add custom metadata from your loader to your pipeline you can do
so by setting a pypyr.pipedef.PipelineInfo object on your
PipelineDefinition.
# namespace: pypyr.pipedef

class PipelineDefinition():
    pipeline: Mapping  # the dict-like pipeline body
    info: PipelineInfo  # pipeline metadata


class PipelineInfo():
    pipeline_name: str  # name of pipeline
    loader: str  # absolute module name of loader (string)
    parent: Any = None  # relative path hint for child pipelines
    is_parent_cascading: bool = True
    is_loader_cascading: bool = True

If you want to add extra custom metadata properties to your pipeline, derive your own class from PipelineInfo and add such properties as you want.
This is entirely optional - you don’t get any particular advantages by
explicitly setting & returning a PipelineDefinition from your custom
get_pipeline_definition function, since the pypyr core wraps the return value
from get_pipeline_definition in a PipelineDefinition for you if you did not
do so yourself.
You can control whether pipelines loaded by your custom loader will cascade
their parent and loader properties down to child pipelines when using
pype by setting the is_parent_cascading and is_loader_cascading boolean
flags.
example
Here is a silly example where we expand on the naive loader from earlier with
an extra meta-data property called arb_property, by creating a custom
metadata class ArbPipeInfo for this loader.
from pathlib import Path

from pypyr.pipedef import PipelineDefinition, PipelineInfo
import pypyr.yaml

CWD = Path.cwd()


class ArbPipeInfo(PipelineInfo):
    """Custom pipeline meta-data."""
    # you don't HAVE to use slots, but it's a tiny bit more efficient
    __slots__ = ['arb_property']

    def __init__(self, pipeline_name, loader, parent, arb_property):
        # is_parent_cascading & is_loader_cascading default True if not specified
        super().__init__(pipeline_name=pipeline_name,
                         loader=loader,
                         parent=parent)
        self.arb_property = arb_property


def get_pipeline_definition(pipeline_name, parent):
    """Simple loader gets pipeline_name.yaml in working dir + custom metadata."""
    # get & parse the pipeline yaml payload
    with open(CWD.joinpath(f'{pipeline_name}.yaml')) as yaml_file:
        pipeline_yaml = pypyr.yaml.get_pipeline_yaml(yaml_file)

    # custom meta-data to associate with pipeline
    info = ArbPipeInfo(pipeline_name=pipeline_name,
                       parent=parent,
                       loader=__name__,
                       arb_property='some useful value')

    # wrap pipeline body in a PipelineDefinition alongside its metadata
    return PipelineDefinition(pipeline=pipeline_yaml, info=info)

If you want to access your custom property from the pipeline at run-time, you can get it from the current_pipeline on the Context object:
steps:
  - name: pypyr.steps.py
    comment: arb_property of the current pipeline, as set by the custom pipeline loader.
    in:
      pycode: print(context.current_pipeline.pipeline_definition.info.arb_property)
  - name: pypyr.steps.py
    comment: arb_property on the 1st pipeline in the pipeline call-stack.
             this is useful when pypyr.steps.pype calls pipelines within pipelines.
    in:
      pycode: print(context.get_root_pipeline().pipeline_definition.info.arb_property)

You can access your custom metadata properties in exactly the same way on the Context object in a custom step.
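For example, a minimal sketch of a hypothetical custom step (say ./mydir/arbstep.py) that reads the same arb_property metadata:

import logging

logger = logging.getLogger(__name__)


def run_step(context):
    """Log the arb_property metadata set by the custom loader."""
    # current_pipeline points at the currently running pipeline's definition
    info = context.current_pipeline.pipeline_definition.info
    logger.info("arb_property is: %s", info.arb_property)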
non-cascading loader
On pypyr.steps.pype, if you do not want to
inherit which loader to use from the parent you can set is_loader_cascading to
False on the PipelineInfo object.
If you do not cascade your loader setting to child pipelines, your pipeline authors will have to set loader explicitly on each pype step, even if it is the same as the parent.
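So with a non-cascading loader, each pype call has to repeat the loader along these lines (a sketch re-using the hypothetical mydir.myloader module from earlier):

steps:
  - name: pypyr.steps.pype
    comment: loader does NOT cascade from the parent, so set it explicitly here
    in:
      pype:
        name: child-pipeline
        loader: mydir.myloader

The loader itself opts out of cascading by passing is_loader_cascading=False when it constructs its PipelineInfo: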
from pypyr.pipedef import PipelineDefinition, PipelineInfo
import pypyr.yaml


class ArbPipeInfo(PipelineInfo):
    """Non-cascading custom pipeline meta-data."""
    # you don't HAVE to use slots, but it's a tiny bit more efficient
    __slots__ = ['arb_property']

    def __init__(self, pipeline_name, loader, parent, arb_property):
        # This is a non-cascading loader.
        super().__init__(pipeline_name=pipeline_name,
                         loader=loader,
                         parent=parent,
                         is_loader_cascading=False)
        self.arb_property = arb_property


def get_pipeline_definition(pipeline_name, parent):
    """Non-cascading because PipelineInfo has is_loader_cascading False."""
    # get & parse the pipeline yaml payload from somewhere
    with get_filelike_obj_from_somewhere(pipeline_name) as yaml_file:
        pipeline_yaml = pypyr.yaml.get_pipeline_yaml(yaml_file)
        actual_name = yaml_file.some_api_property
        actual_parent = yaml_file.another_api_property

    # custom meta-data to associate with pipeline
    info = ArbPipeInfo(pipeline_name=actual_name,
                       parent=actual_parent,
                       loader=__name__,
                       arb_property='some useful value')

    # wrap pipeline body in a PipelineDefinition alongside its metadata
    return PipelineDefinition(pipeline=pipeline_yaml, info=info)

load child pipeline relative to parent
If wherever you store your pipelines has a hierarchical structure, you can load
child pipelines invoked by pypyr.steps.pype
so that the child pipeline_name resolves relative to the parent pipeline’s
location. Use the parent argument on
def get_pipeline_definition(pipeline_name, parent) to do so.
The parent property of the calling pipeline will cascade down to the child
by default if is_parent_cascading on the PipelineInfo instance is True -
which it is by default.
from pathlib import Path

from pypyr.pipedef import PipelineDefinition, PipelineInfo
import pypyr.yaml

CWD = Path.cwd()


def get_pipeline_definition(pipeline_name, parent):
    """Simple loader that gets pipeline_name.yaml in parent or working dir."""
    # pipeline_name could be "subdir/mydir/subdir/mypipe"
    pipeline_path = (parent.joinpath(f'{pipeline_name}.yaml')
                     if parent else CWD.joinpath(f'{pipeline_name}.yaml'))

    with open(pipeline_path) as yaml_file:
        pipeline_yaml = pypyr.yaml.get_pipeline_yaml(yaml_file)

    # set parent property so child pipelines can resolve relative to it
    info = PipelineInfo(pipeline_name=pipeline_name,
                        parent=pipeline_path.parent,
                        loader=__name__)

    # wrap pipeline body in a PipelineDefinition alongside its metadata
    return PipelineDefinition(pipeline=pipeline_yaml, info=info)

The root pipeline will always have parent=None. When this root pipeline calls
child pipelines with pypyr.steps.pype, the pype step will pass the calling
pipeline’s info.parent property to the loader when retrieving the child
pipeline.
In this simple example the parent argument is of type pathlib.Path, but it
can be any type that works for you.
Although it is up to your loader to interpret the parent argument as it sees
fit, and it therefore could be any type, whatever type of object parent is
should have a sensible __str__ representation.
This is a bit tough to explain with a typing hint. 😬
The reason is that pypyr caches child pipelines by the key
"{parent}+{pipeline_name}", so if the string representation of parent is
not unique enough, you could end up overwriting existing pipelines in cache
when you load a child pipeline with the same name but different parents.
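As an illustration of that pitfall (hypothetical ArbParent type, not part of pypyr), a parent whose __str__ is not unique enough makes different parents produce the same cache key:

class ArbParent:
    """Hypothetical parent type with a __str__ that is NOT unique enough."""

    def __init__(self, bucket, prefix):
        self.bucket = bucket
        self.prefix = prefix

    def __str__(self):
        # BAD: ignores bucket, so different buckets stringify identically
        return self.prefix


pipeline_name = 'my-pipe'
parent_a = ArbParent('bucket-a', 'shared')
parent_b = ArbParent('bucket-b', 'shared')

# both produce the cache key 'shared+my-pipe', so the 2nd load would
# overwrite the 1st in pypyr's pipeline cache.
print(f'{parent_a}+{pipeline_name}')
print(f'{parent_b}+{pipeline_name}')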
If your pipeline store is flat and/or you identify pipelines solely by name,
there is no need to use the parent property. The parent property is there
to deal with the complexities that arise from the file system where it is
convenient to address a child pipeline relative to the parent’s location.
If wherever you store your pipelines does not allow for this, there is no point
in implementing anything special for the parent property.
Pipeline authors can use the parent input on the pype step to override the loader's info.parent value that would otherwise cascade to the child pipeline. This only changes the value of parent for the child pipeline load, it does not touch the calling pipeline's info.parent property. (Here info refers to the PipelineInfo object assigned to .info on the PipelineDefinition.)
steps:
  - name: pypyr.steps.pype
    comment: call child pipeline from here
    in:
      pype:
        name: pipeline-name # mandatory. string.
        parent: myparent

multi-address pipelines
If different combinations of parent + pipeline_name could refer to the same underlying pipeline, it is up to your loader to implement a loader-specific cache if you do not want to parse & load that pipeline more than once in a session.
For example:
- parent /dir + pipeline name sub/pipe
- parent /dir/sub + pipeline name pipe

On the filesystem, both of these are the same - both refer to /dir/sub/pipe.
If this is a concern for you, you can use pypyr’s built-in cache class to make your life a bit easier:
from pypyr.cache.cache import Cache
from pypyr.errors import PipelineNotFoundError
from pypyr.pipedef import PipelineDefinition, PipelineInfo
import pypyr.yaml

_pipedef_cache = Cache()


def get_pipeline_definition(pipeline_name, parent):
    """Loader with its own cache for combinations of parent+name."""
    # loader has to find UNIQUE canonical path for pipeline here somehow
    canonical_path = get_canonical_path_somehow(parent, pipeline_name)

    # if canonical_path in cache, return it. if not, put it in cache with lambda.
    pipeline_definition = _pipedef_cache.get(
        canonical_path,
        lambda: load_pipeline_from_somewhere(canonical_path))

    return pipeline_definition


def load_pipeline_from_somewhere(canonical_path):
    """Use whatever API you gotta use to get the pipeline here."""
    # if something goes wrong you might want to raise PipelineNotFoundError
    with get_filelike_obj_from_somewhere(canonical_path) as yaml_file:
        actual_name = yaml_file.some_api_property
        actual_parent = yaml_file.another_api_property

        # if you prefer to use your own yaml parser, do that here instead of
        # calling get_pipeline_yaml.
        pipeline_yaml = pypyr.yaml.get_pipeline_yaml(yaml_file)

    # set parent property so child pipelines can resolve relative to it
    info = PipelineInfo(pipeline_name=actual_name,
                        parent=actual_parent,
                        loader=__name__)

    # wrap pipeline body in a PipelineDefinition alongside its metadata
    return PipelineDefinition(pipeline=pipeline_yaml, info=info)

You do NOT need to worry about this additional caching complexity if a pipeline
is only addressable by one and only one combination of parent and
pipeline_name for your loader, because the pypyr core already caches the
reference returned from get_pipeline_definition for you. If CPU & memory are not a particular concern for you, then you also don't need to worry about this.
The yaml loading & parsing is the most resource intensive operation in pypyr, so this extra cache only really helps if you want to avoid parsing the same underlying pipeline more than once in the edge case where different parent + pipeline_name combinations refer to an already loaded pipeline.