Pipeline Code Hooks
Each pipeline base class provides hook methods that developers can override to customize pipeline functionality. In your template repository, your Pipeline class comes with all the hook methods stubbed out automatically (i.e., they are included with an empty definition).
The following hook methods (easily identified by their 'hook_' prefix) are provided in the pipeline template at ingest/<ingest_name>/pipeline/pipeline.py. They are listed in the order in which they are executed (see the image in Configuring Tsdat).
hook_customize_raw_datasets: Hook to allow for user customizations to one or more raw xarray Datasets before they are merged and used to create the standardized dataset.
hook_customize_dataset: Hook to allow for user customizations to the standardized dataset such as inserting a derived variable based on other variables in the dataset.
hook_finalize_dataset: Hook to apply any final customizations to the dataset before it is saved.
hook_generate_and_persist_plots: Hook to allow users to create plots from the xarray dataset after the dataset has been finalized and just before the dataset is saved to disk.
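The execution order of the four hooks (hook_customize_raw_datasets, hook_customize_dataset, hook_finalize_dataset, hook_generate_and_persist_plots) can be illustrated with a small stand-in. This is only a sketch: SketchPipeline, its run method, and the plain-dict "datasets" are hypothetical and are not tsdat's implementation. It shows that hooks are pass-through stubs by default and that a subclass overrides only the ones it needs.

```python
from typing import Dict, List


class SketchPipeline:
    """Hypothetical stand-in for a pipeline base class (not tsdat code)."""

    def __init__(self) -> None:
        self.calls: List[str] = []  # records the order in which hooks run

    # Each hook is a pass-through stub, mirroring the empty template definitions.
    def hook_customize_raw_datasets(self, raw_dataset_mapping: Dict[str, dict]) -> Dict[str, dict]:
        self.calls.append("hook_customize_raw_datasets")
        return raw_dataset_mapping

    def hook_customize_dataset(self, dataset: dict, raw_mapping: Dict[str, dict]) -> dict:
        self.calls.append("hook_customize_dataset")
        return dataset

    def hook_finalize_dataset(self, dataset: dict) -> dict:
        self.calls.append("hook_finalize_dataset")
        return dataset

    def hook_generate_and_persist_plots(self, dataset: dict) -> None:
        self.calls.append("hook_generate_and_persist_plots")

    def run(self, raw_dataset_mapping: Dict[str, dict]) -> dict:
        raw = self.hook_customize_raw_datasets(raw_dataset_mapping)
        # Stand-in for merging the raw inputs into one standardized dataset.
        dataset: dict = {}
        for raw_dataset in raw.values():
            dataset.update(raw_dataset)
        dataset = self.hook_customize_dataset(dataset, raw)
        # Quality management would run at this point in a real pipeline.
        dataset = self.hook_finalize_dataset(dataset)
        self.hook_generate_and_persist_plots(dataset)
        return dataset


class MyPipeline(SketchPipeline):
    """Overrides a single hook; the others keep their default behavior."""

    def hook_customize_dataset(self, dataset: dict, raw_mapping: Dict[str, dict]) -> dict:
        dataset = super().hook_customize_dataset(dataset, raw_mapping)
        # Hypothetical derived variable computed from an existing one.
        dataset["wave_height_squared"] = [h ** 2 for h in dataset["wave_height"]]
        return dataset


pipeline = MyPipeline()
result = pipeline.run({"buoy.csv": {"wave_height": [1.0, 2.0]}})
print(pipeline.calls)
```

In a real ingest you would subclass the pipeline class from your template instead and leave the orchestration to the library.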
The plotting hook (hook_generate_and_persist_plots) is likely to be the most useful for users. This hook creates plots and saves them to the storage directory alongside the output dataset, which makes it a good way to check the pipeline output. An example pipeline.py file is shown below:
import os
import cmocean
import pandas as pd
import xarray as xr
import matplotlib as mpl
import matplotlib.pyplot as plt

from tsdat.pipeline import IngestPipeline
from tsdat.utils import DSUtil

example_dir = os.path.abspath(os.path.dirname(__file__))
style_file = os.path.join(example_dir, "styling.mplstyle")
plt.style.use(style_file)


class WaveIngestPipeline(IngestPipeline):
    """-------------------------------------------------------------------
    This is an example class that extends the default IngestPipeline in
    order to hook in custom behavior such as creating custom plots.
    If users need to apply custom changes to the dataset, instrument
    corrections, or create custom plots, they should follow this example
    to extend the IngestPipeline class.
    -------------------------------------------------------------------"""

    def hook_generate_and_persist_plots(self, dataset: xr.Dataset) -> None:
        """-------------------------------------------------------------------
        Hook to allow users to create plots from the xarray dataset after
        processing and QC have been applied and just before the dataset is
        saved to disk.

        Args:
            dataset (xr.Dataset): The xarray dataset with customizations and
            QC applied.
        -------------------------------------------------------------------"""

        def format_time_xticks(ax, start=4, stop=21, step=4, date_format="%H-%M"):
            ax.xaxis.set_major_locator(
                mpl.dates.HourLocator(byhour=range(start, stop, step))
            )
            ax.xaxis.set_major_formatter(mpl.dates.DateFormatter(date_format))
            plt.setp(ax.xaxis.get_majorticklabels(), rotation=0, ha="center")

        # Useful variables
        ds = dataset
        date = pd.to_datetime(ds.time.data[0]).strftime("%d-%b-%Y")

        # Create wave statistics plot
        filename = DSUtil.get_plot_filename(dataset, "wave_statistics", "png")
        with self.storage._tmp.get_temp_filepath(filename) as tmp_path:

            # Create figure and axes objects
            fig, axs = plt.subplots(nrows=3, figsize=(14, 8), constrained_layout=True)
            fig.suptitle(f"Wave Statistics at {ds.attrs['location_meaning']} on {date}")

            # Plot wave heights
            cmap = cmocean.cm.amp_r
            ds.average_wave_height.plot(
                ax=axs[0], c=cmap(0.10), linewidth=2, label=r"H$_{avg}$"
            )
            ds.significant_wave_height.plot(
                ax=axs[0], c=cmap(0.5), linewidth=2, label=r"H$_{sig}$"
            )
            ds.max_wave_height.plot(
                ax=axs[0], c=cmap(0.85), linewidth=2, label=r"H$_{max}$"
            )
            axs[0].set_ylabel("Wave Height (m)")
            axs[0].legend(bbox_to_anchor=(1, -0.10), ncol=3)

            # Plot wave periods
            cmap = cmocean.cm.dense
            ds.average_wave_period.plot(
                ax=axs[1], c=cmap(0.15), linewidth=2, label=r"T$_{avg}$"
            )
            ds.significant_wave_period.plot(
                ax=axs[1], c=cmap(0.5), linewidth=2, label=r"T$_{sig}$"
            )
            ds.peak_wave_period.plot(
                ax=axs[1], c=cmap(0.8), linewidth=2, label=r"T$_{peak}$"
            )
            axs[1].set_ylabel("Wave Period (s)")
            axs[1].legend(bbox_to_anchor=(1, -0.10), ncol=3)

            # Plot mean direction
            cmap = cmocean.cm.haline
            ds.mean_wave_direction.plot(
                ax=axs[2], c=cmap(0.4), linewidth=2, label=r"$\theta_{mean}$"
            )
            axs[2].set_ylabel(r"Wave Direction (deg)")
            axs[2].legend(bbox_to_anchor=(1, -0.10))

            # Set xlabels and ticks
            for i in range(3):
                axs[i].set_xlabel("Time (UTC)")
                format_time_xticks(axs[i])

            # Save figure
            fig.savefig(tmp_path, dpi=100)
            self.storage.save(tmp_path)
            plt.close()

class tsdat.pipeline.ingest_pipeline.IngestPipeline(pipeline_config: Union[str, tsdat.config.config.Config], storage_config: Union[str, tsdat.io.storage.DatastreamStorage])
The IngestPipeline class is designed to read in raw, non-standardized data and convert it to a standardized format by embedding metadata, applying quality checks and quality controls, and by saving the now-processed data in a standard file format.

hook_customize_dataset(dataset: xarray.core.dataset.Dataset, raw_mapping: Dict[str, xarray.core.dataset.Dataset]) → xarray.core.dataset.Dataset
Hook to allow for user customizations to the standardized dataset such as inserting a derived variable based on other variables in the dataset. This method is called immediately after the standardize_dataset method and before QualityManagement has been run.
Parameters:
    dataset (xr.Dataset) – The dataset to customize.
    raw_mapping (Dict[str, xr.Dataset]) – The raw dataset mapping.
Returns: The customized dataset.
Return type: xr.Dataset
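As a minimal sketch of inserting a derived variable, the function below mirrors the hook's signature but is written as a standalone function (no self) so it can run on its own. The variable name wave_height_ratio and its attributes are hypothetical, and the sample dataset is made up for illustration. Whether a derived variable must also be declared in the pipeline config file is not covered here.

```python
from typing import Dict

import numpy as np
import xarray as xr


def hook_customize_dataset(
    dataset: xr.Dataset, raw_mapping: Dict[str, xr.Dataset]
) -> xr.Dataset:
    """Standalone sketch; in a pipeline class this would be a method taking self."""
    # Hypothetical derived variable: ratio of maximum to significant wave height.
    ratio = dataset["max_wave_height"] / dataset["significant_wave_height"]
    ratio.attrs = {"long_name": "Max to significant wave height ratio", "units": "1"}
    dataset["wave_height_ratio"] = ratio
    return dataset


# Made-up standardized dataset with two time steps.
time = np.array(["2021-01-01T00:00", "2021-01-01T01:00"], dtype="datetime64[ns]")
example = xr.Dataset(
    {
        "significant_wave_height": ("time", np.array([1.0, 2.0])),
        "max_wave_height": ("time", np.array([1.5, 4.0])),
    },
    coords={"time": time},
)

customized = hook_customize_dataset(example, raw_mapping={})
print(customized["wave_height_ratio"].values)
```

Because this hook runs before QualityManagement, a variable added here is present in the dataset when the quality checks run.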

hook_customize_raw_datasets(raw_dataset_mapping: Dict[str, xarray.core.dataset.Dataset]) → Dict[str, xarray.core.dataset.Dataset]
Hook to allow for user customizations to one or more raw xarray Datasets before they are merged and used to create the standardized dataset. The raw_dataset_mapping will contain one entry for each file being used as input to the pipeline. The keys are the standardized raw file names, and the values are the datasets.
This method would typically only be used if the user is combining multiple files into a single dataset. In this case, this method may be used to correct coordinates if they don't match across all the files, or to rename variables (columns) if two files use the same name for two distinct variables.
This method can also be used to check for conditions in the raw data that should cause a pipeline failure if they are not met.
This method is called before the inputs are merged and converted to the standard format specified by the config file.
Parameters:
    raw_dataset_mapping (Dict[str, xr.Dataset]) – The raw datasets to customize.
Returns: The customized raw datasets.
Return type: Dict[str, xr.Dataset]
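The two uses described for hook_customize_raw_datasets (renaming a variable that clashes across files, and failing the pipeline when a required condition is not met) might look roughly like the sketch below. It is a standalone function rather than a method, and the file names, the temp variable, and the "surface"/"bottom" naming rule are all hypothetical.

```python
from typing import Dict

import numpy as np
import xarray as xr


def hook_customize_raw_datasets(
    raw_dataset_mapping: Dict[str, xr.Dataset]
) -> Dict[str, xr.Dataset]:
    """Standalone sketch; in a pipeline class this would be a method taking self."""
    customized: Dict[str, xr.Dataset] = {}
    for filename, raw_dataset in raw_dataset_mapping.items():
        # Fail early if an input lacks the variable every file is expected to have.
        if "time" not in raw_dataset.variables:
            raise ValueError(f"{filename} has no 'time' variable")
        # Hypothetical rule: both files call their temperature column "temp",
        # but they are distinct quantities, so rename based on the file name.
        if "temp" in raw_dataset.data_vars:
            prefix = "surface" if "surface" in filename else "bottom"
            raw_dataset = raw_dataset.rename({"temp": f"{prefix}_temp"})
        customized[filename] = raw_dataset
    return customized


# Made-up raw inputs that share a variable name.
time = np.array(["2021-01-01T00:00", "2021-01-01T01:00"], dtype="datetime64[ns]")
raw = {
    "surface.csv": xr.Dataset({"temp": ("time", [12.0, 12.5])}, coords={"time": time}),
    "bottom.csv": xr.Dataset({"temp": ("time", [8.0, 8.1])}, coords={"time": time}),
}

customized = hook_customize_raw_datasets(raw)
print({name: list(ds.data_vars) for name, ds in customized.items()})
```

After the rename, the two temperature series no longer collide when the inputs are merged.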

hook_finalize_dataset(dataset: xarray.core.dataset.Dataset) → xarray.core.dataset.Dataset
Hook to apply any final customizations to the dataset before it is saved. This hook is called after QualityManagement has been run and immediately before the dataset is saved to file.
Parameters:
    dataset (xr.Dataset) – The dataset to finalize.
Returns: The finalized dataset to save.
Return type: xr.Dataset
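One possible final customization is stamping summary metadata onto the dataset just before it is written. The sketch below is a standalone function rather than a method; the choice of the time_coverage_start and time_coverage_end global attributes is an assumption made for illustration, not something the source prescribes.

```python
import numpy as np
import xarray as xr


def hook_finalize_dataset(dataset: xr.Dataset) -> xr.Dataset:
    """Standalone sketch; in a pipeline class this would be a method taking self."""
    times = dataset["time"].values
    # Format the first and last timestamps to second precision.
    start = str(np.datetime_as_string(times.min(), unit="s"))
    end = str(np.datetime_as_string(times.max(), unit="s"))
    # assign_attrs returns a new Dataset with the extra global attributes.
    return dataset.assign_attrs(time_coverage_start=start, time_coverage_end=end)


# Made-up dataset standing in for the quality-controlled output.
time = np.array(["2021-01-01T00:00", "2021-01-01T01:00"], dtype="datetime64[ns]")
example = xr.Dataset({"sea_level": ("time", [0.4, 0.6])}, coords={"time": time})

finalized = hook_finalize_dataset(example)
print(finalized.attrs)
```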

hook_generate_and_persist_plots(dataset: xarray.core.dataset.Dataset) → None
Hook to allow users to create plots from the xarray dataset after the dataset has been finalized and just before the dataset is saved to disk.
To save on filesystem space (which is limited when running on the cloud via a lambda function), this method should only write one plot to local storage at a time. An example of how this could be done is below:

    # First plot: write to a temporary path, then hand it to storage
    filename = DSUtil.get_plot_filename(dataset, "sea_level", "png")
    with self.storage._tmp.get_temp_filepath(filename) as tmp_path:
        fig, ax = plt.subplots(figsize=(10,5))
        ax.plot(dataset["time"].data, dataset["sea_level"].data)
        fig.savefig(tmp_path)
        self.storage.save(tmp_path)

    # Second plot: created only after the first has been saved
    filename = DSUtil.get_plot_filename(dataset, "qc_sea_level", "png")
    with self.storage._tmp.get_temp_filepath(filename) as tmp_path:
        fig, ax = plt.subplots(figsize=(10,5))
        DSUtil.plot_qc(dataset, "sea_level", tmp_path)
        self.storage.save(tmp_path)

Parameters:
    dataset (xr.Dataset) – The xarray dataset with customizations and QualityManagement applied.
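The "one plot on local disk at a time" pattern can be tried outside a pipeline with the sketch below. The helpers temp_filepath and save_to_storage are hypothetical stand-ins for self.storage._tmp.get_temp_filepath and self.storage.save. In particular, the assumption that the temporary file is removed when the with block exits belongs to this sketch and is not a statement about tsdat internals.

```python
import os
import shutil
import tempfile
from contextlib import contextmanager

import matplotlib

matplotlib.use("Agg")  # headless backend, so no display is required
import matplotlib.pyplot as plt


@contextmanager
def temp_filepath(filename: str):
    """Hypothetical stand-in: yield a temp path and clean it up afterwards."""
    tmp_dir = tempfile.mkdtemp()
    try:
        yield os.path.join(tmp_dir, filename)
    finally:
        shutil.rmtree(tmp_dir, ignore_errors=True)


def save_to_storage(tmp_path: str, storage_dir: str) -> str:
    """Hypothetical stand-in: copy the finished plot into the storage directory."""
    os.makedirs(storage_dir, exist_ok=True)
    return shutil.copy(tmp_path, storage_dir)


storage_dir = tempfile.mkdtemp()
saved_paths = []
tmp_paths = []

# Each plot is rendered, persisted, and cleaned up before the next one starts.
for filename, values in {"sea_level.png": [1, 2, 3], "tide.png": [3, 2, 1]}.items():
    with temp_filepath(filename) as tmp_path:
        fig, ax = plt.subplots(figsize=(10, 5))
        ax.plot(values)
        fig.savefig(tmp_path)
        plt.close(fig)  # release the figure's memory as well
        saved_paths.append(save_to_storage(tmp_path, storage_dir))
        tmp_paths.append(tmp_path)

print(sorted(os.listdir(storage_dir)))
```

Only the copies in the storage directory remain after the loop, so peak local disk use stays at roughly one plot.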