pipeline_config

Modules:

Name Description
matches_overridable_schema
read_yaml
recursive_instantiate

Classes:

Name Description
PipelineConfig

Contains configuration parameters for tsdat pipelines.

Functions:

Name Description
get_resolved_cfg_path

Classes#

PipelineConfig #

Bases: ParameterizedConfigClass

Contains configuration parameters for tsdat pipelines.

This class is ultimately converted into a tsdat.pipeline.base.Pipeline subclass that will be used to process data.

Provides methods to support yaml parsing and validation, including the generation of json schema for immediate validation. This class also provides a method to instantiate a tsdat.pipeline.base.Pipeline subclass from a parsed configuration file.

Methods:

Name Description
from_yaml

Creates a python configuration object from a yaml file.

generate_schema

Generates JSON schema from the model fields and type annotations.

instantiate_pipeline

Loads the tsdat.pipeline.base.Pipeline subclass specified by the classname property.

merge_overridable_yaml

Attributes:

Name Type Description
cfg_filepath Optional[Path]

The path to the yaml config file used to instantiate this class. Set via the 'from_yaml()' classmethod.

dataset Union[Overrideable[DatasetConfig], DatasetConfig]

Either the path to the dataset configuration yaml file and any overrides that should be applied, or the dataset configurations themselves.

quality Union[Overrideable[QualityConfig], QualityConfig]

Either the path to the quality configuration yaml file and any overrides that should be applied, or the quality configurations themselves.

retriever Union[Overrideable[RetrieverConfig], RetrieverConfig]

Either the path to the retriever configuration yaml file and any overrides that should be applied, or the retriever configurations themselves.

storage Union[Overrideable[StorageConfig], StorageConfig]

Either the path to the storage configuration yaml file and any overrides that should be applied, or the storage configurations themselves.

triggers List[Pattern]

A list of regex patterns that should trigger this pipeline when matched with an input key.

Attributes#

cfg_filepath class-attribute instance-attribute #
cfg_filepath: Optional[Path] = None

The path to the yaml config file used to instantiate this class. Set via the 'from_yaml()' classmethod.

dataset class-attribute instance-attribute #
dataset: Union[
    Overrideable[DatasetConfig], DatasetConfig
] = Field(
    description="Specify the dataset configurations that describe the structure and metadata of the dataset produced by this pipeline."
)

Either the path to the dataset configuration yaml file and any overrides that should be applied, or the dataset configurations themselves.

quality class-attribute instance-attribute #
quality: Union[
    Overrideable[QualityConfig], QualityConfig
] = Field(
    description="Specify the quality checks and controls that should be applied to the dataset as part of this pipeline."
)

Either the path to the quality configuration yaml file and any overrides that should be applied, or the quality configurations themselves.

retriever class-attribute instance-attribute #
retriever: Union[
    Overrideable[RetrieverConfig], RetrieverConfig
] = Field(
    description="Specify the retrieval configurations that the pipeline should use."
)

Either the path to the retriever configuration yaml file and any overrides that should be applied, or the retriever configurations themselves.

storage class-attribute instance-attribute #
storage: Union[
    Overrideable[StorageConfig], StorageConfig
] = Field(
    description="Specify the Storage configurations that should be used to save data produced by this pipeline."
)

Either the path to the storage configuration yaml file and any overrides that should be applied, or the storage configurations themselves.

triggers class-attribute instance-attribute #
triggers: List[Pattern] = Field(
    description="A list of regex patterns matching input keys to determine if the pipeline should be run. Please ensure these are specific as possible in order to match the desired input keys without any false positive matches (this is more important in repositories with many pipelines)."
)

A list of regex patterns that should trigger this pipeline when matched with an input key.
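The advice about keeping triggers specific can be illustrated with a small sketch. The input keys and patterns below are made up, and the `is_triggered` helper is hypothetical: it assumes a pipeline is selected when any pattern matches the input key via `Pattern.match`, which may differ from how tsdat performs the check internally.

```python
import re
from typing import List, Pattern

# Hypothetical input keys, as they might arrive from a file system or bucket.
buoy_key = "data/buoy_z06/2024-01-01.csv"
met_key = "data/met_tower/2024-01-01.csv"

# A specific trigger for a buoy pipeline and an overly broad alternative.
specific: List[Pattern] = [re.compile(r".*/buoy_z06/.*\.csv")]
broad: List[Pattern] = [re.compile(r".*\.csv")]


def is_triggered(input_key: str, triggers: List[Pattern]) -> bool:
    """Hypothetical helper: True if any trigger pattern matches the key."""
    return any(pattern.match(input_key) for pattern in triggers)


# The specific pattern selects only the buoy file.
buoy_hit = is_triggered(buoy_key, specific)
met_hit = is_triggered(met_key, specific)

# The broad pattern also fires on the met tower file, a false positive
# for a pipeline that is only meant to process buoy data.
false_positive = is_triggered(met_key, broad)
```

In a repository with many pipelines, a broad pattern like the second one would cause several pipelines to claim the same input key.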

Functions#

from_yaml classmethod #
from_yaml(
    filepath: Path,
    overrides: Optional[Dict[str, Any]] = None,
) -> Self

Creates a python configuration object from a yaml file.

Parameters:

Name Type Description Default
filepath Path

The path to the yaml file

required
overrides Optional[Dict[str, Any]]

Overrides to apply to the yaml before instantiating the YamlModel object. Defaults to None.

None

Returns:

Name Type Description
YamlModel Self

A YamlModel subclass

Source code in tsdat/config/pipeline/pipeline_config.py
@classmethod
def from_yaml(
    cls, filepath: Path, overrides: Optional[Dict[str, Any]] = None
) -> Self:
    """Creates a python configuration object from a yaml file.

    Args:
        filepath (Path): The path to the yaml file
        overrides (Optional[Dict[str, Any]], optional): Overrides to apply to the
            yaml before instantiating the YamlModel object. Defaults to None.

    Returns:
        YamlModel: A YamlModel subclass

    """
    config = read_yaml(filepath)
    if overrides:
        for pointer, new_value in overrides.items():
            set_pointer(config, pointer, new_value)
    try:
        return cls(cfg_filepath=filepath, **config)
    except (ValidationError, Exception) as e:
        raise ConfigError(
            f"Error encountered while instantiating {filepath}"
        ) from e
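The `overrides` argument maps pointers to new values, which are written into the parsed yaml before validation. The sketch below shows the idea with the standard library only. It assumes the pointers follow JSON Pointer (RFC 6901) syntax such as `/triggers/0`; `set_by_pointer` is a hypothetical stand-in for the `set_pointer` call in the listing above, and the config contents are invented for illustration.

```python
from typing import Any, Dict


def set_by_pointer(doc: Any, pointer: str, value: Any) -> None:
    """Hypothetical stand-in: set a value in place at a JSON Pointer path."""
    if not pointer.startswith("/"):
        raise ValueError(f"Pointer must start with '/': {pointer!r}")
    # Split into reference tokens and undo the '~1' and '~0' escapes.
    tokens = [
        token.replace("~1", "/").replace("~0", "~")
        for token in pointer.split("/")[1:]
    ]
    target = doc
    # Walk down to the container that holds the final token.
    for token in tokens[:-1]:
        target = target[int(token)] if isinstance(target, list) else target[token]
    last = tokens[-1]
    if isinstance(target, list):
        target[int(last)] = value
    else:
        target[last] = value


# Invented stand-in for the dictionary returned by read_yaml(filepath).
config: Dict[str, Any] = {
    "triggers": [r".*\.csv"],
    "dataset": {"path": "../shared/dataset.yaml"},
}

# Same shape as the 'overrides' argument: pointer -> new value.
overrides: Dict[str, Any] = {
    "/triggers/0": r".*/buoy_z06/.*\.csv",
    "/dataset/path": "./dataset.yaml",
}

for pointer, new_value in overrides.items():
    set_by_pointer(config, pointer, new_value)
```

After the loop, `config` holds the overridden values and would be passed on to the class constructor for validation.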

generate_schema classmethod #
generate_schema(output_file: Path)

Generates JSON schema from the model fields and type annotations.

Parameters:

Name Type Description Default
output_file Path

The path to store the JSON schema.

required
Source code in tsdat/config/pipeline/pipeline_config.py
@classmethod
def generate_schema(cls, output_file: Path):
    """Generates JSON schema from the model fields and type annotations.

    Args:
        output_file (Path): The path to store the JSON schema.
    """
    output_file.write_text(cls.schema_json(indent=4))

instantiate_pipeline #
instantiate_pipeline() -> Pipeline

Loads the tsdat.pipeline.base.Pipeline subclass specified by the classname property.

Properties and sub-properties of the PipelineConfig class that are subclasses of tsdat.config.utils.ParameterizedConfigClass (e.g., classes that define a 'classname' and optional 'parameters' properties) will also be instantiated in a similar fashion. See tsdat.config.utils.recursive_instantiate for implementation details.

Returns:

Name Type Description
Pipeline Pipeline

An instance of a tsdat.pipeline.base.Pipeline subclass.

Source code in tsdat/config/pipeline/pipeline_config.py
def instantiate_pipeline(self) -> Pipeline:
    """Loads the tsdat.pipeline.BasePipeline subclass specified by the classname property.

    Properties and sub-properties of the PipelineConfig class that are subclasses of
    tsdat.config.utils.ParameterizedConfigClass (e.g, classes that define a 'classname' and
    optional 'parameters' properties) will also be instantiated in similar fashion. See
    tsdat.config.utils.recursive_instantiate for implementation details.

    Returns:
        Pipeline: An instance of a tsdat.pipeline.base.Pipeline subclass.
    """
    return recursive_instantiate(self)
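
merge_overridable_yaml #
merge_overridable_yaml(values: Dict[str, Any])

Root validator that runs before field validation. For each of the retriever, dataset, quality, and storage fields, if the value matches the overridable schema, the linked yaml file is read from its resolved path, any overrides are applied to it, and the result replaces the original value. Each field is then converted to its corresponding config class.
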
Source code in tsdat/config/pipeline/pipeline_config.py
@root_validator(pre=True)
def merge_overridable_yaml(cls, values: Dict[str, Any]):
    object_field_mapping = {
        "retriever": RetrieverConfig,
        "dataset": DatasetConfig,
        "quality": QualityConfig,
        "storage": StorageConfig,
    }
    for field, config_cls in object_field_mapping.items():
        v = values[field]
        if matches_overridable_schema(v):
            cfg_path = get_resolved_cfg_path(v["path"], values.get("cfg_filepath"))
            defaults = read_yaml(cfg_path)
            overrides = v.get("overrides", {})
            for pointer, new_value in overrides.items():
                set_pointer(defaults, pointer, new_value)
            v = defaults
        values[field] = config_cls(**v)
    return values

Functions#

get_resolved_cfg_path #

get_resolved_cfg_path(
    linked_path: str | Path,
    pipeline_cfg_path: str | Path | None,
) -> Path
Source code in tsdat/config/pipeline/pipeline_config.py
def get_resolved_cfg_path(
    linked_path: str | Path, pipeline_cfg_path: str | Path | None
) -> Path:
    if pipeline_cfg_path is not None and (
        str(linked_path).startswith("../") or str(linked_path).startswith("./")
    ):
        return (Path(pipeline_cfg_path).parent / linked_path).resolve()
    return Path(linked_path)
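
The relative-path check is easiest to understand by exercising it on a few inputs. The sketch below repeats the function from the listing above so that it runs on its own; the file paths are hypothetical.

```python
from __future__ import annotations

from pathlib import Path


def get_resolved_cfg_path(
    linked_path: str | Path, pipeline_cfg_path: str | Path | None
) -> Path:
    # Same logic as the listing above, repeated to keep the example self-contained.
    if pipeline_cfg_path is not None and (
        str(linked_path).startswith("../") or str(linked_path).startswith("./")
    ):
        return (Path(pipeline_cfg_path).parent / linked_path).resolve()
    return Path(linked_path)


# Hypothetical location of a pipeline config file.
pipeline_cfg = "/repo/pipelines/example/config/pipeline.yaml"

# Links starting with './' or '../' are resolved against the config file's folder.
sibling = get_resolved_cfg_path("./dataset.yaml", pipeline_cfg)
shared = get_resolved_cfg_path("../shared/quality.yaml", pipeline_cfg)

# Any other link, including a bare relative path, is returned as given.
bare = get_resolved_cfg_path("shared/quality.yaml", pipeline_cfg)

# Without a pipeline config path, nothing is resolved.
unresolved = get_resolved_cfg_path("./dataset.yaml", None)
```

Note that only the `./` and `../` prefixes trigger resolution. A bare relative path such as `shared/quality.yaml` is returned unchanged, so it would later be interpreted relative to the current working directory rather than the pipeline config file.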

Modules#