pipeline

Classes#

PipelineConfig #

Bases: ParameterizedConfigClass, YamlModel


Contains configuration parameters for tsdat pipelines.

This class is ultimately converted into a tsdat.pipeline.base.Pipeline subclass that will be used to process data.

Provides methods to support YAML parsing and validation, including generation of a JSON schema for immediate validation. This class also provides a method to instantiate a tsdat.pipeline.base.Pipeline subclass from a parsed configuration file.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| classname | str | The dotted module path to the pipeline that the specified configurations should apply to. To use the built-in IngestPipeline, for example, you would set 'tsdat.pipeline.pipelines.IngestPipeline' as the classname. | required |
| triggers | List[Pattern[str]] | A list of regex patterns that should trigger this pipeline when matched with an input key. | required |
| retriever | Union[Overrideable[RetrieverConfig], RetrieverConfig] | Either the path to the retriever configuration yaml file and any overrides that should be applied, or the retriever configurations themselves. | required |
| dataset | Union[Overrideable[DatasetConfig], DatasetConfig] | Either the path to the dataset configuration yaml file and any overrides that should be applied, or the dataset configurations themselves. | required |
| quality | Union[Overrideable[QualityConfig], QualityConfig] | Either the path to the quality configuration yaml file and any overrides that should be applied, or the quality configurations themselves. | required |
| storage | Union[Overrideable[StorageConfig], StorageConfig] | Either the path to the storage configuration yaml file and any overrides that should be applied, or the storage configurations themselves. | required |
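
The parameters above allow each of retriever, dataset, quality, and storage to be given either as a file reference (a path plus optional overrides) or inline. The following is a minimal sketch of that shape, written as the dict a YAML parser might produce. The file paths, the trigger pattern, the override pointer, and the inline storage classname are illustrative assumptions, and describe_section is a hypothetical helper that is not part of tsdat.

```python
from typing import Any, Dict

# Hypothetical pipeline configuration; every path and value here is illustrative.
pipeline_config: Dict[str, Any] = {
    "classname": "tsdat.pipeline.pipelines.IngestPipeline",
    "triggers": [r".*buoy.*\.csv"],
    # File reference without overrides.
    "retriever": {"path": "config/retriever.yaml"},
    # File reference with overrides applied on top of the file's contents.
    "dataset": {
        "path": "config/dataset.yaml",
        "overrides": {"/attrs/location_id": "site_a"},
    },
    "quality": {"path": "config/quality.yaml"},
    # Inline configuration (assumed classname, shown only for shape).
    "storage": {"classname": "tsdat.io.storage.FileSystem"},
}


def describe_section(section: Dict[str, Any]) -> str:
    """Hypothetical helper: report which of the two accepted forms a section uses."""
    return "file reference" if "path" in section else "inline"


forms = {
    name: describe_section(pipeline_config[name])
    for name in ("retriever", "dataset", "quality", "storage")
}
print(forms)
```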

Attributes#

dataset class-attribute instance-attribute #
dataset: Union[
    Overrideable[DatasetConfig], DatasetConfig
] = Field(
    description="Specify the dataset configurations that describe the structure and metadata of the dataset produced by this pipeline."
)
quality class-attribute instance-attribute #
quality: Union[
    Overrideable[QualityConfig], QualityConfig
] = Field(
    description="Specify the quality checks and controls that should be applied to the dataset as part of this pipeline."
)
retriever class-attribute instance-attribute #
retriever: Union[
    Overrideable[RetrieverConfig], RetrieverConfig
] = Field(
    description="Specify the retrieval configurations that the pipeline should use."
)
storage class-attribute instance-attribute #
storage: Union[
    Overrideable[StorageConfig], StorageConfig
] = Field(
    description="Specify the Storage configurations that should be used to save data produced by this pipeline."
)
triggers class-attribute instance-attribute #
triggers: List[Pattern] = Field(
    description="A list of regex patterns matching input keys to determine if the pipeline should be run. Please ensure these are as specific as possible in order to match the desired input keys without any false positive matches (this is more important in repositories with many pipelines)."
)
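
To illustrate why trigger patterns should be specific, here is a small sketch that checks one input key against the triggers of several pipelines. The pipeline names, patterns, and input key are made up for the example, and the use of Pattern.match (anchored at the start of the key) is an assumption about matching semantics, not a statement of how tsdat dispatches pipelines.

```python
import re
from typing import Dict, List, Pattern

# Hypothetical trigger lists keyed by made-up pipeline names.
triggers_by_pipeline: Dict[str, List[Pattern[str]]] = {
    "buoy_ingest": [re.compile(r".*buoy.*\.csv")],
    "lidar_ingest": [re.compile(r".*lidar.*\.nc")],
    # An overly broad pattern: it matches every input key.
    "catch_all": [re.compile(r".*")],
}


def matching_pipelines(
    input_key: str, triggers: Dict[str, List[Pattern[str]]]
) -> List[str]:
    """Return the names of all pipelines with at least one matching trigger."""
    return [
        name
        for name, patterns in triggers.items()
        if any(pattern.match(input_key) for pattern in patterns)
    ]


matches = matching_pipelines("data/buoy_2024.csv", triggers_by_pipeline)
print(matches)  # the broad "catch_all" pattern produces a false positive
```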

Functions#

instantiate_pipeline #
instantiate_pipeline() -> Pipeline

Loads the tsdat.pipeline.base.Pipeline subclass specified by the classname property.

Properties and sub-properties of the PipelineConfig class that are subclasses of tsdat.config.utils.ParameterizedConfigClass (e.g., classes that define a 'classname' property and an optional 'parameters' property) will also be instantiated in similar fashion. See tsdat.config.utils.recursive_instantiate for implementation details.

Returns:

| Name | Type | Description |
| --- | --- | --- |
| Pipeline | Pipeline | An instance of a tsdat.pipeline.base.Pipeline subclass. |


Source code in tsdat/config/pipeline.py
def instantiate_pipeline(self) -> Pipeline:
    """------------------------------------------------------------------------------------
    Loads the tsdat.pipeline.base.Pipeline subclass specified by the classname property.

    Properties and sub-properties of the PipelineConfig class that are subclasses of
    tsdat.config.utils.ParameterizedConfigClass (e.g., classes that define a 'classname'
    property and an optional 'parameters' property) will also be instantiated in similar
    fashion. See tsdat.config.utils.recursive_instantiate for implementation details.


    Returns:
        Pipeline: An instance of a tsdat.pipeline.base.Pipeline subclass.

    ------------------------------------------------------------------------------------"""
    return recursive_instantiate(self)
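
The general idea of instantiating objects from a 'classname' and optional 'parameters' can be sketched as follows. This is not tsdat's recursive_instantiate; instantiate_tree is a hypothetical stand-in that works on plain dicts, and it uses the standard-library fractions.Fraction class only so the example runs without tsdat installed.

```python
import importlib
from typing import Any


def instantiate_tree(node: Any) -> Any:
    """Hypothetical sketch: walk nested dicts/lists and replace every dict that
    has a 'classname' key with an instance of the named class."""
    if isinstance(node, dict):
        # Resolve children first so nested 'classname' entries are built bottom-up.
        resolved = {key: instantiate_tree(value) for key, value in node.items()}
        if "classname" in resolved:
            module_path, _, class_name = resolved["classname"].rpartition(".")
            cls = getattr(importlib.import_module(module_path), class_name)
            # 'parameters' is optional and is passed as keyword arguments.
            return cls(**resolved.get("parameters", {}))
        return resolved
    if isinstance(node, list):
        return [instantiate_tree(item) for item in node]
    return node


config = {
    "label": "demo",
    "ratio": {
        "classname": "fractions.Fraction",
        "parameters": {"numerator": 1, "denominator": 3},
    },
}
result = instantiate_tree(config)
print(result["label"], result["ratio"])
```

Resolving children before the parent means a nested entry is already an object by the time the enclosing class is constructed.
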
merge_overrideable_yaml classmethod #
merge_overrideable_yaml(
    v: Dict[str, Any],
    values: Dict[str, Any],
    field: ModelField,
)
Source code in tsdat/config/pipeline.py
@validator("retriever", "dataset", "quality", "storage", pre=True)
@classmethod
def merge_overrideable_yaml(
    cls, v: Dict[str, Any], values: Dict[str, Any], field: ModelField
):
    object_field_mapping = {
        "retriever": RetrieverConfig,
        "dataset": DatasetConfig,
        "quality": QualityConfig,
        "storage": StorageConfig,
    }
    config_cls = object_field_mapping[field.name]

    if matches_overrideable_schema(v):
        defaults = read_yaml(Path(v["path"]))
        overrides = v.get("overrides", {})
        for pointer, new_value in overrides.items():
            set_pointer(defaults, pointer, new_value)
        v = defaults

    return config_cls(**v)
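
The validator above reads defaults from the file at "path" and then applies each override by pointer. The sketch below reproduces that flow on in-memory data so it can run without tsdat or any YAML file. The names matches_overrideable, set_by_pointer, and merge_overrides are hypothetical stand-ins for matches_overrideable_schema, set_pointer, and the validator body; the pointer syntax is assumed to be JSON Pointer style ("/a/b"), and the defaults and overrides are invented for illustration.

```python
import copy
from typing import Any, Callable, Dict


def matches_overrideable(v: Any) -> bool:
    """Hypothetical check: a dict with 'path' and, optionally, 'overrides'."""
    return isinstance(v, dict) and "path" in v and set(v) <= {"path", "overrides"}


def set_by_pointer(doc: Any, pointer: str, value: Any) -> None:
    """Set the value addressed by a JSON Pointer style path such as '/a/b'."""
    keys = [k.replace("~1", "/").replace("~0", "~") for k in pointer.split("/")[1:]]
    target = doc
    for key in keys[:-1]:
        target = target[int(key)] if isinstance(target, list) else target[key]
    if isinstance(target, list):
        target[int(keys[-1])] = value
    else:
        target[keys[-1]] = value


def merge_overrides(
    v: Dict[str, Any], load_defaults: Callable[[str], Dict[str, Any]]
) -> Dict[str, Any]:
    """Load defaults for an overrideable section and apply its overrides;
    inline sections are returned unchanged."""
    if matches_overrideable(v):
        defaults = load_defaults(v["path"])
        for pointer, new_value in v.get("overrides", {}).items():
            set_by_pointer(defaults, pointer, new_value)
        return defaults
    return v


# In-memory stand-in for the contents of a dataset YAML file (illustrative).
FAKE_FILES = {
    "config/dataset.yaml": {
        "attrs": {"title": "Default title", "location_id": "unknown"},
    }
}

section = {
    "path": "config/dataset.yaml",
    "overrides": {"/attrs/location_id": "site_a"},
}
merged = merge_overrides(section, lambda path: copy.deepcopy(FAKE_FILES[path]))
print(merged)
```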

Functions#