tsdat.pipeline
This module contains pipeline classes that are used to process time series data from start to finish.
Submodules
Classes
- IngestPipeline: The IngestPipeline class is designed to read in raw, non-standardized data and convert it to a standardized format.
- Pipeline: This class serves as the base class for all tsdat data pipelines.
class tsdat.pipeline.IngestPipeline(pipeline_config: Union[str, tsdat.config.Config], storage_config: Union[str, tsdat.io.DatastreamStorage])
Bases: tsdat.pipeline.pipeline.Pipeline
The IngestPipeline class is designed to read in raw, non-standardized data and convert it to a standardized format by embedding metadata, applying quality checks and quality controls, and by saving the now-processed data in a standard file format.
Class Methods
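- hook_customize_dataset: Hook to allow for user customizations to the standardized dataset.
- hook_customize_raw_datasets: Hook to allow for user customizations to one or more raw xarray Datasets before they are merged.
- hook_finalize_dataset: Hook to apply any final customizations to the dataset before it is saved.
- hook_generate_and_persist_plots: Hook to allow users to create plots from the xarray dataset after it has been finalized.
- read_and_persist_raw_files: Renames the provided raw files according to ME Data Standards file naming conventions.
- run: Runs the IngestPipeline from start to finish.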
Method Descriptions
hook_customize_dataset(self, dataset: xarray.Dataset, raw_mapping: Dict[str, xarray.Dataset]) → xarray.Dataset
Hook to allow for user customizations to the standardized dataset, such as inserting a derived variable based on other variables in the dataset. This method is called immediately after the standardize_dataset method and before QualityManagement has been run.
- Parameters
dataset (xr.Dataset) – The dataset to customize.
raw_mapping (Dict[str, xr.Dataset]) – The raw dataset mapping.
- Returns
The customized dataset.
- Return type
xr.Dataset
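The kind of logic this hook is meant for can be sketched as follows. This is a minimal illustration, not tsdat code: it assumes the standardized dataset holds hypothetical u and v wind-component variables, and it is written as a standalone function (customize_dataset, also hypothetical) so that it runs with only NumPy and xarray. In a pipeline, the same body would go in the hook_customize_dataset override of an IngestPipeline subclass.

```python
from typing import Dict

import numpy as np
import xarray as xr


def customize_dataset(dataset: xr.Dataset, raw_mapping: Dict[str, xr.Dataset]) -> xr.Dataset:
    """Hypothetical hook body: derive wind speed from hypothetical u/v components."""
    # NumPy ufuncs applied to DataArrays return DataArrays with the same dims/coords
    speed = np.sqrt(dataset["u"] ** 2 + dataset["v"] ** 2)
    # Attach metadata to the derived variable before inserting it
    dataset["wind_speed"] = speed.assign_attrs(units="m/s", long_name="Wind speed")
    return dataset


# Usage with a small synthetic dataset (raw_mapping is unused in this sketch)
ds = xr.Dataset(
    {"u": ("time", [3.0, 0.0]), "v": ("time", [4.0, 2.0])},
    coords={"time": [0, 1]},
)
result = customize_dataset(ds, raw_mapping={})
print(result["wind_speed"].values)
```

Returning the dataset (rather than relying only on in-place mutation) matches the documented return type, so the pipeline can use whatever object the hook hands back.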
hook_customize_raw_datasets(self, raw_dataset_mapping: Dict[str, xarray.Dataset]) → Dict[str, xarray.Dataset]
Hook to allow for user customizations to one or more raw xarray Datasets before they are merged and used to create the standardized dataset. The raw_dataset_mapping will contain one entry for each file being used as input to the pipeline. The keys are the standardized raw file names, and the values are the datasets.
This method would typically only be used if the user is combining multiple files into a single dataset. In this case, this method may be used to correct coordinates if they don’t match for all the files, or to change variable (column) names if two files have the same name for a variable, but they are two distinct variables.
This method can also be used to check for unique conditions in the raw data that should cause a pipeline failure if they are not met.
This method is called before the inputs are merged and converted to standard format as specified by the config file.
- Parameters
raw_dataset_mapping (Dict[str, xr.Dataset]) – The raw datasets to customize.
- Returns
The customized raw datasets.
- Return type
Dict[str, xr.Dataset]
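The two uses described above (renaming a variable that two files share and failing on an unmet condition) can be sketched as below. This is a hypothetical, standalone function rather than tsdat's implementation; the file keys, the "temp" variable, and the tag-from-file-name rule are all assumptions made for illustration.

```python
from typing import Dict

import xarray as xr


def customize_raw_datasets(raw_dataset_mapping: Dict[str, xr.Dataset]) -> Dict[str, xr.Dataset]:
    """Hypothetical hook body: validate each raw file and rename a clashing variable."""
    customized = {}
    for name, ds in raw_dataset_mapping.items():
        # Fail the pipeline early if a raw file lacks the coordinate the merge relies on
        if "time" not in ds.coords:
            raise ValueError(f"Raw file {name!r} has no 'time' coordinate")
        # Both hypothetical instruments call a distinct quantity "temp"; give each a
        # file-specific name so the two variables survive the merge separately
        if "temp" in ds.data_vars:
            tag = "buoy" if "buoy" in name else "met"
            ds = ds.rename({"temp": f"temp_{tag}"})
        customized[name] = ds
    return customized


# Usage: two hypothetical raw files that both contain a "temp" variable
mapping = {
    "buoy.csv": xr.Dataset({"temp": ("time", [10.0])}, coords={"time": [0]}),
    "met.csv": xr.Dataset({"temp": ("time", [12.5])}, coords={"time": [0]}),
}
merged = xr.merge(list(customize_raw_datasets(mapping).values()))
print(sorted(merged.data_vars))  # ['temp_buoy', 'temp_met']
```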
hook_finalize_dataset(self, dataset: xarray.Dataset) → xarray.Dataset
Hook to apply any final customizations to the dataset before it is saved. This hook is called after QualityManagement has been run and immediately before the dataset is saved to file.
- Parameters
dataset (xr.Dataset) – The dataset to finalize.
- Returns
The finalized dataset to save.
- Return type
xr.Dataset
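One possible final customization is sketched below: removing scratch variables that were only needed earlier in processing. It is a hypothetical example, not tsdat behavior; the "tmp_" prefix convention, the processing_note attribute, and the standalone finalize_dataset function are all assumptions.

```python
import xarray as xr


def finalize_dataset(dataset: xr.Dataset) -> xr.Dataset:
    """Hypothetical hook body: drop scratch variables and note it in the attributes."""
    # Variables prefixed "tmp_" are assumed to be intermediate results not meant for output
    scratch = [name for name in dataset.data_vars if str(name).startswith("tmp_")]
    # drop_vars returns a new Dataset; the input dataset is left unchanged
    finalized = dataset.drop_vars(scratch)
    finalized.attrs["processing_note"] = "Removed intermediate tmp_* variables before saving."
    return finalized


ds = xr.Dataset(
    {"sea_level": ("time", [1.2, 1.3]), "tmp_flag": ("time", [0, 1])},
    coords={"time": [0, 1]},
)
final = finalize_dataset(ds)
print(list(final.data_vars))  # ['sea_level']
```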
hook_generate_and_persist_plots(self, dataset: xarray.Dataset) → None
Hook to allow users to create plots from the xarray dataset after the dataset has been finalized and just before the dataset is saved to disk.
To save on filesystem space (which is limited when running on the cloud via a lambda function), this method should only write one plot to local storage at a time. An example of how this could be done is below:
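    import matplotlib.pyplot as plt
    import xarray as xr
    from tsdat.utils import DSUtil


    # Defined as a method on an IngestPipeline subclass
    def hook_generate_and_persist_plots(self, dataset: xr.Dataset) -> None:
        # Plot the sea_level time series and persist it
        filename = DSUtil.get_plot_filename(dataset, "sea_level", "png")
        with self.storage._tmp.get_temp_filepath(filename) as tmp_path:
            fig, ax = plt.subplots(figsize=(10, 5))
            ax.plot(dataset["time"].data, dataset["sea_level"].data)
            fig.savefig(tmp_path)
            plt.close(fig)  # release the figure before creating the next one
            self.storage.save(tmp_path)

        # Plot the QC results for sea_level and persist them
        filename = DSUtil.get_plot_filename(dataset, "qc_sea_level", "png")
        with self.storage._tmp.get_temp_filepath(filename) as tmp_path:
            DSUtil.plot_qc(dataset, "sea_level", tmp_path)
            self.storage.save(tmp_path)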
- Parameters
dataset (xr.Dataset) – The xarray dataset with customizations and QualityManagement applied.
read_and_persist_raw_files(self, file_paths: List[str]) → List[str]
Renames the provided raw files according to ME Data Standards file naming conventions for raw data files, and returns a list of the paths to the renamed files.
- Parameters
file_paths (List[str]) – A list of paths to the original raw files.
- Returns
A list of paths to the renamed files.
- Return type
List[str]
run(self, filepath: Union[str, List[str]]) → xarray.Dataset
Runs the IngestPipeline from start to finish.
- Parameters
filepath (Union[str, List[str]]) – The path or list of paths to the file(s) to run the pipeline on.
class tsdat.pipeline.Pipeline(pipeline_config: Union[str, tsdat.config.Config], storage_config: Union[str, tsdat.io.DatastreamStorage])
Bases: abc.ABC
This class serves as the base class for all tsdat data pipelines.
- Parameters
pipeline_config (Union[str, Config]) – The pipeline config file. Can be either a config object, or the path to the pipeline config file that should be used with this pipeline.
storage_config (Union[str, DatastreamStorage]) – The storage config. Can be either a DatastreamStorage object, or the path to the storage config file that should be used with this pipeline.
Class Methods
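- add_attrs: Adds global and variable-level attributes to the dataset from the DatasetDefinition object.
- add_missing_variables: Uses the dataset definition to initialize variables that are defined in the dataset definition but did not have input.
- add_static_variables: Uses the DatasetDefinition to add static variables (variables whose data are defined in the pipeline config file) to the output dataset.
- check_required_variables: Raises an error if a required variable could not be retrieved.
- decode_cf: Decodes the dataset according to CF conventions.
- get_previous_dataset: Utility method to retrieve the previous set of data for the same datastream as the provided dataset.
- reduce_raw_dataset: Removes unused variables from the raw dataset provided and keeps only the variables and coordinates pertaining to the provided variable definitions.
- reduce_raw_datasets: Removes unused variables from each raw dataset in the raw mapping.
- run: The entry point for the pipeline. It takes one or more file paths and processes them from start to finish.
- standardize_dataset: Standardizes the dataset by applying variable name and units conversions.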
Method Descriptions
add_attrs(self, dataset: xarray.Dataset, raw_mapping: Dict[str, xarray.Dataset], dod: tsdat.config.DatasetDefinition) → xarray.Dataset
Adds global and variable-level attributes to the dataset from the DatasetDefinition object.
- Parameters
dataset (xr.Dataset) – The dataset to add attributes to.
raw_mapping (Dict[str, xr.Dataset]) – The raw dataset mapping. Used to set the input_files global attribute.
dod (DatasetDefinition) – The DatasetDefinition containing the attributes to add.
- Returns
The original dataset with the attributes added.
- Return type
xr.Dataset
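A minimal sketch of what adding attributes involves is shown below. It is not tsdat's implementation: plain dictionaries stand in for the DatasetDefinition, the add_attrs_sketch name is hypothetical, and storing input_files as a comma-separated string of the raw mapping's keys is an assumed format.

```python
from typing import Dict

import xarray as xr


def add_attrs_sketch(
    dataset: xr.Dataset,
    raw_mapping: Dict[str, xr.Dataset],
    global_attrs: Dict[str, str],
    variable_attrs: Dict[str, Dict[str, str]],
) -> xr.Dataset:
    """Hypothetical stand-in: dicts replace the DatasetDefinition."""
    # Global attributes
    dataset.attrs.update(global_attrs)
    # Record which raw files produced this dataset (comma-separated format is assumed)
    dataset.attrs["input_files"] = ", ".join(raw_mapping.keys())
    # Variable-level attributes; definitions for absent variables are skipped
    for name, attrs in variable_attrs.items():
        if name in dataset.variables:
            dataset[name].attrs.update(attrs)
    return dataset


ds = xr.Dataset({"temp": ("time", [10.0])}, coords={"time": [0]})
raw = {"a.csv": ds.copy(), "b.csv": ds.copy()}
out = add_attrs_sketch(ds, raw, {"title": "Example"}, {"temp": {"units": "degC"}})
print(out.attrs["input_files"])  # a.csv, b.csv
```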
add_missing_variables(self, dataset: xarray.Dataset, dod: tsdat.config.DatasetDefinition) → xarray.Dataset
Uses the dataset definition to initialize variables that are defined in the dataset definition but did not have input. Uses the appropriate shape and _FillValue to initialize each variable.
- Parameters
dataset (xr.Dataset) – The dataset to add the variables to.
dod (DatasetDefinition) – The DatasetDefinition to use.
- Returns
The original dataset, with each variable that had no input initialized.
- Return type
xr.Dataset
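The "appropriate shape and _FillValue" step can be sketched like this. The function name and the (dims, dtype, fill_value) tuple format are hypothetical stand-ins for what the DatasetDefinition provides, and recording the fill value in attrs is an assumption; the shape is taken from the sizes of the dimensions already present in the dataset.

```python
from typing import Dict, Tuple

import numpy as np
import xarray as xr


def add_missing_variables_sketch(
    dataset: xr.Dataset,
    definitions: Dict[str, Tuple[Tuple[str, ...], str, float]],
) -> xr.Dataset:
    """definitions maps name -> (dims, dtype, fill_value); a hypothetical DatasetDefinition stand-in."""
    for name, (dims, dtype, fill_value) in definitions.items():
        if name in dataset.variables:
            continue  # the variable already had input
        # Shape comes from the existing dimension sizes in the dataset
        shape = tuple(dataset.sizes[dim] for dim in dims)
        data = np.full(shape, fill_value, dtype=dtype)
        dataset[name] = xr.DataArray(data, dims=dims, attrs={"_FillValue": fill_value})
    return dataset


ds = xr.Dataset({"temp": ("time", [1.0, 2.0, 3.0])}, coords={"time": [0, 1, 2]})
defs = {
    "temp": (("time",), "float64", -9999.0),
    "pressure": (("time",), "float32", -9999.0),
}
out = add_missing_variables_sketch(ds, defs)
print(out["pressure"].values)
```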
add_static_variables(self, dataset: xarray.Dataset, dod: tsdat.config.DatasetDefinition) → xarray.Dataset
Uses the DatasetDefinition to add static variables (variables whose data are defined in the pipeline config file) to the output dataset.
- Parameters
dataset (xr.Dataset) – The dataset to add static variables to.
dod (DatasetDefinition) – The DatasetDefinition to pull data from.
- Returns
The original dataset, with the static variables from the config file added.
- Return type
xr.Dataset
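Static variables are ones whose values come straight from configuration rather than from input files, so adding them amounts to building DataArrays from literal data. The sketch below assumes a hypothetical (dims, data, attrs) tuple per variable in place of the DatasetDefinition; the variable names and values are illustrative only.

```python
from typing import Any, Dict, Tuple

import numpy as np
import xarray as xr


def add_static_variables_sketch(
    dataset: xr.Dataset,
    static_vars: Dict[str, Tuple[Tuple[str, ...], Any, Dict[str, str]]],
) -> xr.Dataset:
    """static_vars maps name -> (dims, data, attrs); a hypothetical DatasetDefinition stand-in."""
    for name, (dims, data, attrs) in static_vars.items():
        # Data are taken as-is from the (hypothetical) config values
        dataset[name] = xr.DataArray(np.asarray(data), dims=dims, attrs=attrs)
    return dataset


ds = xr.Dataset(coords={"time": [0, 1]})
out = add_static_variables_sketch(
    ds,
    {
        "lat": ((), 46.1, {"units": "degrees_north"}),
        "depth": (("time",), [1.5, 1.5], {"units": "m"}),
    },
)
print(float(out["lat"]))  # 46.1
```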
check_required_variables(self, dataset: xarray.Dataset, dod: tsdat.config.DatasetDefinition)
Raises an error if a required variable could not be retrieved.
- Parameters
dataset (xr.Dataset) – The dataset to check.
dod (DatasetDefinition) – The DatasetDefinition used to specify required variables.
- Raises
Exception – Raises an exception to indicate the variable could not be retrieved.
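The check can be sketched as a set-membership test that reports every missing name at once. Here a plain list of names stands in for the required variables in the DatasetDefinition, and the function name is hypothetical; ValueError is used because it is a subclass of Exception.

```python
from typing import Iterable

import xarray as xr


def check_required_variables_sketch(dataset: xr.Dataset, required: Iterable[str]) -> None:
    """Hypothetical stand-in: raise if any required variable is absent."""
    missing = [name for name in required if name not in dataset.variables]
    if missing:
        raise ValueError(f"Required variable(s) could not be retrieved: {', '.join(missing)}")


ds = xr.Dataset({"temp": ("time", [1.0])}, coords={"time": [0]})
check_required_variables_sketch(ds, ["time", "temp"])  # passes silently
```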
decode_cf(self, dataset: xarray.Dataset) → xarray.Dataset
Decodes the dataset according to CF conventions. This helps ensure that the dataset is formatted correctly after it has been constructed from unstandardized sources or heavily modified.
- Parameters
dataset (xr.Dataset) – The dataset to decode.
- Returns
The decoded dataset.
- Return type
xr.Dataset
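What CF decoding does to a dataset built from unstandardized input can be seen with xarray's own xr.decode_cf. Whether this method calls xr.decode_cf internally is not stated here, so treat the snippet as an illustration of CF decoding in general; the sea_level values and time units are made up.

```python
import numpy as np
import xarray as xr

# Time is stored as plain integers, with a CF-style "units" attribute
# describing how to interpret them
raw = xr.Dataset(
    {"sea_level": ("time", [1.2, 1.3, 1.1])},
    coords={
        "time": ("time", np.array([0, 60, 120]), {"units": "seconds since 2021-01-01 00:00:00"})
    },
)

# xr.decode_cf reads the CF metadata and converts time to datetime64 values
decoded = xr.decode_cf(raw)
print(decoded["time"].values)
```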
get_previous_dataset(self, dataset: xarray.Dataset) → xarray.Dataset
Utility method to retrieve the previous set of data for the same datastream as the provided dataset from the DatastreamStorage.
- Parameters
dataset (xr.Dataset) – The reference dataset that will be used to search the DatastreamStorage for prior data.
- Returns
The previous dataset from the DatastreamStorage if it exists, otherwise None.
- Return type
xr.Dataset
reduce_raw_dataset(self, raw_dataset: xarray.Dataset, variable_definitions: List[tsdat.config.VariableDefinition], definition: tsdat.config.DatasetDefinition) → xarray.Dataset
Removes unused variables from the raw dataset provided and keeps only the variables and coordinates pertaining to the provided variable definitions. Also performs input-to-output naming and unit conversions as defined in the DatasetDefinition.
- Parameters
raw_dataset (xr.Dataset) – The raw dataset to reduce.
variable_definitions (List[VariableDefinition]) – List of variables to keep.
definition (DatasetDefinition) – The DatasetDefinition used to select the variables to keep.
- Returns
The reduced dataset.
- Return type
xr.Dataset
reduce_raw_datasets(self, raw_mapping: Dict[str, xarray.Dataset], definition: tsdat.config.DatasetDefinition) → List[xarray.Dataset]
Removes unused variables from each raw dataset in the raw mapping and performs input-to-output naming and unit conversions as defined in the dataset definition.
- Parameters
raw_mapping (Dict[str, xr.Dataset]) – The raw xarray dataset mapping.
definition (DatasetDefinition) – The DatasetDefinition used to select the variables to keep.
- Returns
A list of reduced datasets.
- Return type
List[xr.Dataset]
abstract run(self, filepath: Union[str, List[str]])
This method is the entry point for the pipeline. It will take one or more file paths and process them from start to finish. All classes extending the Pipeline class must implement this method.
- Parameters
filepath (Union[str, List[str]]) – The path or list of paths to the file(s) to run the pipeline on.
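Because Pipeline derives from abc.ABC and run is abstract, Python refuses to instantiate any subclass that does not implement run. The sketch below demonstrates that contract with a hypothetical stand-in base class (PipelineSketch) and a trivial subclass (EchoPipeline) that only normalizes its filepath argument; neither is part of tsdat.

```python
import abc
from typing import List, Union


class PipelineSketch(abc.ABC):
    """Hypothetical stand-in mirroring the abstract run() contract of Pipeline."""

    @abc.abstractmethod
    def run(self, filepath: Union[str, List[str]]):
        """Process one or more files from start to finish."""


class EchoPipeline(PipelineSketch):
    def run(self, filepath: Union[str, List[str]]) -> List[str]:
        # Accept a single path or a list of paths, as the documented signature allows
        return [filepath] if isinstance(filepath, str) else list(filepath)


print(EchoPipeline().run("data.csv"))  # ['data.csv']
try:
    PipelineSketch()  # abstract run() is not implemented
except TypeError as err:
    print("TypeError:", err)
```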
standardize_dataset(self, raw_mapping: Dict[str, xarray.Dataset]) → xarray.Dataset
Standardizes the dataset by applying variable name and units conversions as defined by the pipeline config file. This method returns the standardized dataset.
- Parameters
raw_mapping (Dict[str, xr.Dataset]) – The raw dataset mapping.
- Returns
The standardized dataset.
- Return type
xr.Dataset
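Taken together with the hook descriptions above (raw inputs are reduced and renamed, then merged into one standardized dataset), the flow can be sketched as follows. The standardize_sketch function and its name_map argument are hypothetical; unit conversion is omitted to keep the focus on the reduce-then-merge step.

```python
from typing import Dict

import xarray as xr


def standardize_sketch(raw_mapping: Dict[str, xr.Dataset], name_map: Dict[str, str]) -> xr.Dataset:
    """Hypothetical stand-in: reduce and rename each raw dataset, then merge them."""
    reduced = []
    for raw in raw_mapping.values():
        # Keep only defined variables from this file and give them their output names
        keep = [name for name in name_map if name in raw.data_vars]
        reduced.append(raw[keep].rename({name: name_map[name] for name in keep}))
    # Combine the per-file datasets into a single dataset on shared coordinates
    return xr.merge(reduced)


raw_mapping = {
    "a.csv": xr.Dataset(
        {"T": ("time", [10.0, 11.0]), "extra": ("time", [0, 0])}, coords={"time": [0, 1]}
    ),
    "b.csv": xr.Dataset({"P": ("time", [1000.0, 1001.0])}, coords={"time": [0, 1]}),
}
out = standardize_sketch(raw_mapping, {"T": "temperature", "P": "pressure"})
print(sorted(out.data_vars))  # ['pressure', 'temperature']
```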