Pipeline Template Tutorial

In this tutorial we will build a data pipeline to ingest global marine data hosted by the National Oceanic and Atmospheric Administration’s (NOAA) National Centers for Environmental Information (NCEI). The data can be found at https://www.ncdc.noaa.gov/cdo-web/datasets under the “Global Marine Data” section.

We will walk through the following steps in this tutorial:

  1. Examine and download the data

  2. Set up a GitHub repository in which to build our ingestion pipeline

  3. Modify configuration files and ingestion pipeline for our NCEI dataset

  4. Run the ingest data pipeline on NCEI data

Now that we’ve outlined the goals of this tutorial and the steps that we will need to take to ingest this data, we can get started with step #1.

Examining and downloading the data

Navigate to https://www.ncdc.noaa.gov/cdo-web/datasets and download the documentation and a data sample from their global marine data section.

NOAA / NCEI Webpage for Global Marine Data sample data and documentation.

The documentation describes each variable in the sample dataset and will be extremely useful for updating our configuration file with the metadata for this dataset. The metadata we care most about are the units and user-friendly text descriptions of each variable, but we also need to be on the lookout for any inconsistencies or potential data problems that could complicate how we process this dataset. Take, for example, the following descriptions of the various temperature measurements that this dataset contains and note that the units are not necessarily the same between files in this dataset:

Global Marine Data documentation snippet indicating temperature measurements can be reported in Celsius or Fahrenheit depending on contributor preference.

If we were collecting this data from multiple users, we would need to be aware of possible unit differences between files from different users, and we would likely want to standardize the units so that they were all in Celsius or all in Fahrenheit (our preference is to use the metric system wherever possible). If we examine this data, it appears that the units are not metric – how unfortunate. Luckily, this is something that can easily be fixed using tsdat.

Snippet from a sample data file.

Selection from the sample dataset. It appears that units are recorded in the imperial system instead of the metric system – Sea Level Pressure is recorded in Hg instead of hPa (Hectopascal) and Air Temperature is recorded in degF (Fahrenheit) instead of degC (Celsius).

Creating a repository from a template

Now that we have the data and metadata that we will need, let’s move on to step #2 and set up a GitHub repository for our work. What we are looking to do is read in the NCEI “raw” data, apply variable names and metadata, apply quality control, and convert it into the netCDF format – an ‘ingest pipeline’, in other words. To do this, navigate to https://github.com/tsdat/pipeline-template and click “Use this template” (you must log into GitHub to see this button).

This will open https://github.com/tsdat/pipeline-template/generate (you can also just open this link directly) which will prompt you to name your repository, as well as to make it public or private.

Example shown is titled “ncei-global-marine-data-ingest”.

Click “Create repository from template” to create your own repository that you can work in for this example.

Go ahead and clone the repository to your local machine and open it up in VS Code.

Tip

You should open the project at the root, which is the Git repo’s root directory and where the file conda-environment.yaml is located.

Note

VS Code is not the only IDE that may be used, but we provide additional settings for VS Code to make it easier to set up.

Set up Python

Let’s set up a Python environment that we can develop code in. We will use Anaconda to create an isolated virtual environment in which we can install packages.

Note

When developing with intent to deploy to a production system on Windows, we recommend using Windows Subsystem for Linux (WSL) in addition to conda to manage your environment. See the Setting up WSL and VSCode for Windows Users tutorial for more information.

Once you have Anaconda (and optionally WSL) installed, you can run the following commands in the terminal from the project root (i.e., where conda-environment.yaml is located) to create and activate the development environment:

conda env create --file=conda-environment.yaml
conda activate tsdat-pipelines

Tip

You can find more details about using conda from Getting started with conda.

Note

Environments other than conda may be used as long as your python version is >=3.8 and you are able to install dependencies from the requirements-dev.txt file.

Configure Python interpreter in VS Code

Tell VS Code to use your new conda environment:

  1. Bring up the command pane in VS Code (shortcut “F1” or “Ctrl+Shift+P”)

  2. Type “Python: Select Interpreter” and select it.

  3. Select the newly-created “tsdat-pipelines” conda environment from the drop-down list. Note you may need to refresh the list (cycle icon in the top right) to see it.

  4. Bring up the command pane and type “Developer: Reload Window” to reload VS Code and ensure the settings changes propagate correctly.

Tip

A typical path to the Python interpreter in conda is “~/anaconda3/envs/<env-name>/bin/python”. You can find more details about using Python in VS Code from Using Python Environments in Visual Studio Code and Get Started Tutorial for Python in Visual Studio Code.

Run the Basic Template

If using VS Code, open the “Explorer” tab to see folder contents for the next step:

A few quick things on VS Code: in the left-hand toolbar, we will use the “Explorer”, “Search”, “Testing”, and “TODO tree” icons in this tutorial. Also useful to know are the commands “ctrl `” (toggle the terminal on/off) and “ctrl shift P” (open the command search bar).

Navigate to the runner.py file and run:

python runner.py pipelines/example_pipeline/test/data/input/buoy.z06.00.20201201.000000.waves.csv

This will run the example pipeline provided in the pipelines/ folder in the template. All pipelines that we create are stored in the pipelines/ folder and are run using python runner.py <path_to_data>.

Additional options for the runner can be queried by running:

python runner.py --help

After the code runs, notice that a new storage/ folder is created with the following contents:

These files contain the outputs of the example pipeline. Note that there are two subdirectories here – “data” and “ancillary”. “Data” contains the output data in either netCDF or CSV format (specified by the user), and “ancillary” holds optional plots that a user can create.

Note that the data directory name ends in “.a1”. This suffix is called the “data level” and indicates the level of processing of the data: “00” represents raw data that has been renamed according to the data standards that tsdat was developed under, “a1” refers to data that has been standardized and has had some quality control applied, and “b1” represents data that has been ingested, standardized, quality-controlled, and contains added value from further analysis if applicable.

For more information on the standards used to develop tsdat, please consult our data standards.

Creating a New Ingest

Now let’s start working on ingesting the NCEI data.

1. In the Explorer window pane you’ll see a list of all folders and files in this repository. Right-click the top-level README.md and select “Open Preview”. This tutorial more or less follows the steps in that README.


2. Before starting, we’ll run a quick test of the pipeline to make sure everything is set up properly. Navigate to the “Testing” pane and run all tests by hovering over the “ingest” dropdown and clicking the “Play” icon. Tsdat will automatically configure these tests, and they should all pass at this point, as indicated by green checkmarks. (You can find more details about testing from Python testing in Visual Studio Code.)


3. Navigate back to the “Explorer” pane and hit “ctrl `” to open the terminal. Create a new ingest by running the Python template creator “cookiecutter” in the terminal using:

make cookies

4. A series of prompts will follow; the answers are used to auto-fill the new ingest. Fill these in for the particular dataset of interest. For this ingest we will not be using custom QC functions, readers/writers, or converters, so select no for those prompts.

ingest_name [Name of the Ingest]: ncei_arctic_cruise_example
ingest_location [Location]: arctic_ocean
ingest_description [Brief description of the ingest]: Historical marine data that are comprised of ship, buoy and platform observations.
Select use_custom_data_reader [1]: 1
Select use_custom_data_converter [1]: 1
Select use_custom_qc [1]: 1
module [ncei_arctic_cruise_example]: ncei_arctic_cruise_example
classname [NceiArcticCruiseExample]: NceiArcticCruiseExample
location_id [arctic_ocean]: arctic_ocean

Once you fill that list out and hit the final enter, Tsdat will create a new ingest folder named with the “module” name (ncei_arctic_cruise_example):


5. Right-click the README.md in our new “ncei_arctic_cruise_example” ingest and select “Open Preview”. Scroll down to “Customizing your pipeline” (we have already accomplished the previous steps, but they are good to check).


6. We are now looking at step #1 of that README’s “Customizing your pipeline” section: use the “TODO tree” extension or the search tool to find occurrences of “# Developer”. (The “TODO tree” is the oak tree icon in the left-hand window pane.)

You may need to reload VS Code for these to show up in the new ingest. Hit “ctrl shift P” to open the command search bar, then type in and run the command “Reload Window”.


After the window reloads, all the newly created “TODOs” will show up in the new ingest folder. The rest of the tutorial consists of working through this list of “TODOs”.


Customizing the New Ingest

7. Navigate to your Explorer pane and open pipelines/*/config/pipeline.yaml.

This file lists the configuration files for the pipeline in the order that the pipeline initializes them.

The first line, “classname”, refers to the pipeline class path. This points to the class in your “pipeline/pipeline.py” file, which contains the hook functions. The only hook we’re using in this tutorial is the one that creates plots, which we’ll update after setting up the input data and configuration files. It isn’t necessary to edit this path name.

classname: pipelines.ncei_arctic_cruise_example.pipeline.NceiArcticCruiseExample

8. The second line, “triggers”, is the expected file pattern, or “regex” pattern, of the input data, shown below. A regex pattern is a set of symbols and ASCII characters that matches a file name or path. A full list of these symbols can be found in the Python regular expression documentation.

triggers:
- .*arctic_ocean.*\.csv

The file pattern that will trigger a pipeline to run is automatically set to .*<location_name>.*\.csv. It can be adjusted as the user or raw data requires. This pipeline’s auto trigger can be broken down into 5 parts:

  • .*

  • arctic_ocean

  • .*

  • \.

  • csv

  1. The first symbol, .*, means “match any number of any characters”.

  2. The next part, arctic_ocean, matches the literal ASCII characters “arctic_ocean”.

  3. Next we have the .* again.

  4. Fourth is \., an escaped period, which matches a literal “.” character (the backslash stops the “.” from meaning “any character”).

  5. Finally, csv matches the literal ASCII characters “csv”.

9. To match the raw data to the trigger, we will rename the sample datafile to “arctic_ocean.sample_data.csv” and move it to a new folder called “data” within our pipeline (ncei_arctic_cruise_example) directory.

How does “arctic_ocean.sample_data.csv” match .*arctic_ocean.*\.csv? Good question! A quick check with Python’s re module is sketched after this list:

  • the first “.*” matches the preceding file path (./pipelines/ncei_arctic_cruise_example/data/), which is assumed to exist

  • “arctic_ocean” matches itself

  • the second “.*” matches “.sample_data” (the match does not need to begin with a “.”)

  • “\.” matches the “.” before the file extension

  • “csv” matches itself
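
To convince yourself that the renamed file really does trigger the pipeline, you can test the pattern directly in Python (this only exercises the regex itself; tsdat applies its own matching internally):

import re

pattern = r".*arctic_ocean.*\.csv"
filepath = "pipelines/ncei_arctic_cruise_example/data/arctic_ocean.sample_data.csv"

print(bool(re.fullmatch(pattern, filepath)))                         # True
print(bool(re.fullmatch(pattern, "pipelines/other/data/buoy.csv")))  # False -- no "arctic_ocean"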

10. The third line, “retriever”, is the first of two required user-customized configuration files, which we’ll need to modify to capture the variables and metadata we want to retain in this ingest.

Start by opening retriever.yaml in the pipelines/*/config folder.

In the retriever file, we can specify several tasks to be run that apply to the input file and raw data:

  • Specify the file reader

  • Rename data variables

  • Apply conversions (timestamp format, unit conversion, basic calculations, etc)

  • Map particular data variables by input file regex pattern

The retriever is split into 4 blocks:

  1. “classname”: default retriever code used by tsdat, not necessary to edit

  2. “readers”: specifies details for the input file reader

  3. “coords”: short for coordinates, the number of which defines the number of dimensions of the dataset (i.e. data with a single coordinate are 1-dimensional)

  4. “data_vars”: short for data variables, these are scalar or vector data

For this pipeline, replace the text in the “retriever.yaml” file with the following:

classname: tsdat.io.retrievers.DefaultRetriever
readers:                                    # Block header
  .*:                                       # Secondary regex pattern to match files
    classname: tsdat.io.readers.CSVReader   # Name of file reader
    parameters:                             # File reader input arguments
      read_csv_kwargs:                      # keyword args for CSVReader (pandas.read_csv)
        sep: ", *"                          # csv "separator" or delimiter
        engine: "python"                    # csv read engine
        index_col: False                    # create index column from first column in csv

coords:
  time:
    .*:
      name: Time of Observation
      data_converters:
        - classname: tsdat.io.converters.StringToDatetime
          format: "%Y-%m-%dT%H:%M:%S"
          timezone: UTC

data_vars:
  latitude:
    .*:
      name: Latitude

  longitude:
    .*:
      name: Longitude

  pressure:
    .*:
      name: Sea Level Pressure
      data_converters:
        - classname: tsdat.io.converters.UnitsConverter
          input_units: hPa

  temperature:
    .*:
      name: Air Temperature
      data_converters:
        - classname: tsdat.io.converters.UnitsConverter
          input_units: degF

  dew_point:
    .*:
      name: Dew Point Temperature
      data_converters:
        - classname: tsdat.io.converters.UnitsConverter
          input_units: degF

  wave_period:
    .*:
      name: Wave Period

  wave_height:
    .*:
      name: Wave Height
      data_converters:
        - classname: tsdat.io.converters.UnitsConverter
          input_units: ft

  swell_direction:
    .*:
      name: Swell Direction

  swell_period:
    .*:
      name: Swell Period

  swell_height:
    .*:
      name: Swell Height
      data_converters:
        - classname: tsdat.io.converters.UnitsConverter
          input_units: ft

  wind_direction:
    .*:
      name: Wind Direction

  wind_speed:
    .*:
      name: Wind Speed
      data_converters:
        - classname: tsdat.io.converters.UnitsConverter
          input_units: dm/s
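
For intuition, the parameters under read_csv_kwargs are passed through to pandas.read_csv (as the comment above notes). Reading the renamed sample file by hand would look roughly like the sketch below; this is only for illustration, since the CSVReader handles it (and the conversion to xarray) for us:

import pandas as pd

# Roughly what the CSVReader does with the parameters above (illustration only)
df = pd.read_csv(
    "pipelines/ncei_arctic_cruise_example/data/arctic_ocean.sample_data.csv",
    sep=", *",        # comma followed by any number of spaces
    engine="python",  # the regex separator requires the python engine
    index_col=False,  # do not treat the first column as the index
)
print(df.columns)
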

I’ll break down the variable structure with the following code-block:

1  temperature:
2    .*:
3      name: Air Temperature
4      data_converters:
5        - classname: tsdat.io.converters.UnitsConverter
6          input_units: degF

Matching the line numbers of the above code-block:

  1. Desired name of the variable in the output data - user editable

  2. Secondary regex pattern (matching input key/file) to input name & converter(s) to run

  3. Name of the variable in the input data - should directly match raw input data

  4. Converter keyword - add if a converter is desired

  5. Classname of data converter to run, in this case unit conversion. See the customization tutorial for a how-to on applying custom data conversions.

  6. Data converter input for this variable, given as a parameter and value pair (a short unit-conversion illustration follows this list)
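
To make the unit conversion concrete: the converter takes the raw values in input_units (here degF) and converts them to the units declared for the same variable in dataset.yaml (degC, shown in the next step). The end result is equivalent to the following sketch using the third-party pint library (purely illustrative; this is not tsdat’s internal implementation):

import pint

ureg = pint.UnitRegistry()
air_temp = ureg.Quantity(68.0, "degF")  # raw value in input_units
print(air_temp.to("degC"))              # ~20 degC, the units declared in dataset.yaml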

11. Moving on now to the fourth line in pipeline.yaml: “dataset” refers to the dataset.yaml file. This file is where user-specified data types and metadata are added to the raw dataset.

This part of the process can take some time, as it involves knowing or learning a lot of the context around the dataset and then writing it up succinctly and clearly so that your data users can quickly get a good understanding of what this dataset is and how to start using it.

Replace the text in the “dataset.yaml” file with the following code-block.

  • Note that the units block is particularly important (you will get an error message if a variable doesn’t have units)

  • Variable names must match between retriever.yaml and dataset.yaml.

  • Variables not desired from retriever.yaml can be left out of dataset.yaml.

  • Notice the quality control (QC) attributes “_FillValue”, “valid_min”, and “valid_max”. These serve both as inputs to and as the keys that tell tsdat to run a particular QC function.

attrs:
  title: NCEI Arctic Cruise Example
  description: Historical marine data that are comprised of ship, buoy and platform observations.
  location_id: arctic_ocean
  dataset_name: ncei_arctic_cruise_example
  data_level: a1
  # qualifier:
  # temporal:
  # institution:

coords:
  time:
    dims: [time]
    dtype: datetime64[s]
    attrs:
      units: Seconds since 1970-01-01 00:00:00

data_vars:
  latitude:                 # Name of variable in retriever.yaml
    dims: [time]            # Variable dimension(s), separated by ","
    dtype: float            # Datatype
    attrs:
      long_name: Latitude   # Name used in plotting
      units: degN           # Units, necessary for unit conversion
      comment: ""           # Add a comment or description if necessary
      _FillValue: -999      # Bad data marker in raw dataset, otherwise -9999
      valid_max: 90         # Expected failure range for "CheckValidMax" QC test
      valid_min: -90        # Expected failure range for "CheckValidMin" QC test

  longitude:
    dims: [time]
    dtype: float
    attrs:
      long_name: Longitude
      units: degE
      comment: ""
      valid_max: 180
      valid_min: -180

  pressure:
    dims: [time]
    dtype: float
    attrs:
      long_name: Pressure at Sea Level
      units: dbar
      comment: ""

  temperature:
    dims: [time]
    dtype: float
    attrs:
      long_name: Air Temperature
      units: degC
      comment: ""

  dew_point:
    dims: [time]
    dtype: float
    attrs:
      long_name: Dew Point
      units: degC
      comment: ""

  wave_period:
    dims: [time]
    dtype: float
    attrs:
      long_name: Wave Period
      units: s
      comment: Assumed to refer to average wave period
      valid_max: 30         # Expected max for "CheckValidMax" QC test

  wave_height:
    dims: [time]
    dtype: float
    attrs:
      long_name: Wave Height
      units: m
      comment: Assumed to refer to average wave height

  swell_direction:
    dims: [time]
    dtype: float
    attrs:
      long_name: Swell Direction
      units: deg from N
      comment: Assumed to refer to peak wave direction
      valid_max: 360

  swell_period:
    dims: [time]
    dtype: float
    attrs:
      long_name: Swell Period
      units: s
      comment: Assumed to refer to peak wave period
      valid_max: 30

  swell_height:
    dims: [time]
    dtype: float
    attrs:
      long_name: Swell Height
      units: m
      comment: Assumed to refer to significant wave height

  wind_direction:
    dims: [time]
    dtype: float
    attrs:
      long_name: Wind Direction
      units: deg from N
      comment: ""
      valid_max: 360

  wind_speed:
    dims: [time]
    dtype: float
    attrs:
      long_name: Wind Speed
      units: m/s
      comment: ""

12. Finally we get to the last two lines in pipeline.yaml: “quality” and “storage”. In this tutorial, these files are located in the “shared” folder in the top-level directory. If custom QC is selected, these will also be located in the “config” folder.

The quality.yaml file defines the QC functions that we will run on this code, and the storage.yaml file defines the path to the output file writer.

The quality.yaml file contains a number of built-in tsdat quality control functions, which we will use as is for this ingest.

Quality control in tsdat is broken up into two types of functions: ‘checkers’ and ‘handlers’. Checkers are functions that perform a quality control test (e.g. check for missing values, check against a valid max/min range, etc.). Handlers are functions that act on the points flagged by a checker, for example by replacing them with a fill value or recording the test results in a QC variable.
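
To make that split concrete, here is a plain numpy/xarray sketch of what a checker/handler pair conceptually does. The built-in functions in quality.yaml do the equivalent for us, driven by the attributes in dataset.yaml; this is illustration only:

import numpy as np
import xarray as xr

ds = xr.Dataset({"wind_direction": ("time", np.array([10.0, 355.0, 400.0]))})
valid_max = 360  # from the variable's attributes in dataset.yaml

failures = ds["wind_direction"] > valid_max                    # "checker": flag bad points
ds["wind_direction"] = ds["wind_direction"].where(~failures)   # "handler": replace them with NaN
print(failures.values)  # [False False  True]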

See the API documentation for more built-in QC tests, and the customization tutorial for more details on how QC works in tsdat and how to create your own.

File output is handled by storage.yaml; the built-in output writers support the netCDF4 and CSV file formats.

I won’t do this here, but CSV output can be added by replacing the “handler” block in storage.yaml with:

handler:
  classname: tsdat.io.handlers.CSVHandler

13. Finally, “pipeline.py” is the last TODO we need to complete to get the pipeline working. As mentioned previously, it contains a series of hook functions that can be used along the pipeline for further data organization.

We shall set up “hook_plot_dataset”, which plots the processed data and saves the figures in the storage/ancillary folder. To keep things simple, only the temperature and wave height data are plotted here, but feel free to edit this code as desired:

import xarray as xr
import act  # Atmospheric Community Toolkit, used for the QC plot below
import cmocean
import matplotlib.pyplot as plt

from tsdat import IngestPipeline, get_start_date_and_time_str, get_filename


class NceiArcticCruiseExample(IngestPipeline):
    """---------------------------------------------------------------------------------
    NCEI ARCTIC CRUISE EXAMPLE INGESTION PIPELINE

    Historical marine data that are comprised of ship, buoy, and platform
    observations.
    ---------------------------------------------------------------------------------"""

    def hook_customize_dataset(self, dataset: xr.Dataset) -> xr.Dataset:
        # (Optional) Use this hook to modify the dataset before qc is applied
        return dataset

    def hook_finalize_dataset(self, dataset: xr.Dataset) -> xr.Dataset:
        # (Optional) Use this hook to modify the dataset after qc is applied
        # but before it gets saved to the storage area
        return dataset

    def hook_plot_dataset(self, dataset: xr.Dataset):
        location = self.dataset_config.attrs.location_id
        datastream: str = self.dataset_config.attrs.datastream

        date, time = get_start_date_and_time_str(dataset)

        plt.style.use("default")  # clear any styles that were set before
        plt.style.use("shared/styling.mplstyle")

        with self.storage.uploadable_dir(datastream) as tmp_dir:

            # Temperature timeseries plot
            fig, ax = plt.subplots()
            dataset["temperature"].plot(ax=ax, x="time", c=cmocean.cm.deep_r(0.5))
            fig.suptitle(f"Temperature measured at {location} on {date} {time}")

            plot_file = get_filename(dataset, title="temperature", extension="png")
            fig.savefig(tmp_dir / plot_file)
            plt.close(fig)

            # Create plot display for wave height data and its QC flags
            obj = dataset
            variable = "wave_height"
            display = act.plotting.TimeSeriesDisplay(
                obj, figsize=(15, 10), subplot_shape=(2,)
            )
            # Plot data in top plot
            display.plot(variable, subplot_index=(0,), label="Wave Height")
            # Plot QC data
            display.qc_flag_block_plot(variable, subplot_index=(1,))
            fig = display.fig

            plot_file = get_filename(dataset, title="wave_height", extension="png")
            fig.savefig(tmp_dir / plot_file)
            plt.close(fig)

Running the Pipeline

We can now re-run the pipeline using the “runner.py” file as before with:

python runner.py pipelines/ncei_arctic_cruise_example/data/arctic_ocean.sample_data.csv

This will run with the same sort of output as before:

Once the pipeline runs, if you look in the “storage” folder, you’ll see the plot as well as the netCDF file output (or csv if you changed the output writer earlier):

Viewing the Data

NetCDF files can be opened using the provided file_viewer.ipynb Jupyter notebook. This notebook can be opened in VS Code or through Jupyter.

Change the first code block to point to our netcdf data:

import xarray as xr

ds = xr.open_dataset(
    "../../storage/root/data/arctic_ocean.ncei_arctic_cruise_example.a1/arctic_ocean.ncei_arctic_cruise_example.a1.20150112.000000.nc"
)

Hit “shift” + “enter” to run this code block. Run the next code block to see an interactive display of the dataset.

ds

Use the drop-down arrows on the left and the text file and database icons on the right to explore the data.

There are two sets of variables here. The first set contains the original variables, saved with their data (adjusted by data converters and/or QC functions, if applicable) and associated metadata.

The second set contains the QC variables. Tsdat adds these variables if the RecordQualityResults handler is called in the quality configuration file. A few attributes, specified for this handler in the quality config file, are shared across all QC variables: flag_masks, flag_meanings, and flag_assessments. In this case, there are three flag masks: 1, 2, and 4. We can see in the data that flags 1 and 4 were tripped on this variable. Every point flagged as 1 corresponds to the first entry in flag_meanings: “Value is equal to _FillValue or NaN”, i.e. it is a missing data point. Likewise for flag 4: a few data points are above the valid maximum specified.

Note: if multiple QC flags are tripped, these flags will be added together. For instance, if a QC variable has a value of 5, this means that the QC tests corresponding to flag 1 and flag 4 were both tripped.
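
Because the flag masks are powers of two, you can recover which individual tests tripped with a bitwise AND. A quick sketch, using the flag values described above:

import numpy as np

qc_values = np.array([0, 1, 4, 5])  # example values from a qc_* variable
flag_masks = [1, 2, 4]              # from the QC variable's flag_masks attribute

for mask in flag_masks:
    tripped = (qc_values & mask) > 0
    print(f"flag {mask}: {tripped}")
# flag 1: [False  True False  True]
# flag 2: [False False False False]
# flag 4: [False False  True  True]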

The final two code blocks are shorthand for plotting variables in the dataset.

ds.temperature.plot()
ds.qc_temperature.plot()

Pipeline Tests

Testing is best completed as a last step, after everything is set up and the pipeline produces the expected output. If you will be running a large number of data files, it is a good idea to use one of those files as the test input here, along with its expected output, and keep a separate data folder for collecting input files.

Move the input and output files to the test/data/input/ and test/data/expected/ folders, respectively, and update the file paths.
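
Conceptually, the generated test just runs the pipeline on the test input and compares the result against the expected output file. Stripped of the template’s own helpers, the comparison amounts to something like the sketch below (the file names are hypothetical; substitute the files you placed in test/data/):

import xarray as xr

# Hypothetical paths -- use your actual test input/expected files
produced = xr.open_dataset(
    "storage/root/data/arctic_ocean.ncei_arctic_cruise_example.a1/"
    "arctic_ocean.ncei_arctic_cruise_example.a1.20150112.000000.nc"
)
expected = xr.open_dataset(
    "pipelines/ncei_arctic_cruise_example/test/data/expected/"
    "arctic_ocean.ncei_arctic_cruise_example.a1.20150112.000000.nc"
)

xr.testing.assert_allclose(produced, expected)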

Next Steps

Tsdat is highly configurable because of the range and variability of input data and output requirements. The next tutorial, the customization tutorial, goes over the steps needed to create custom file readers, data converters, and custom quality control. In the developers’ experience, many types of input data (i.e. file extensions) require a custom file reader, which also provides the freedom to easily pre-process and organize the raw data.