tsdat

Package Contents

Classes

Config

Wrapper for the pipeline configuration file.

PipelineDefinition

Wrapper for the pipeline portion of the pipeline config file.

DatasetDefinition

Wrapper for the dataset_definition portion of the pipeline config file.

DimensionDefinition

Class to represent dimensions defined in the pipeline config file.

VariableDefinition

Class to encode variable definitions from the config file. Also provides a few utility methods.

ATTS

Class that adds constants for interacting with tsdat data-model specific attributes.

VARS

Class that adds keywords for referring to variables.

DatastreamStorage

DatastreamStorage is the base class for providing access to processed data files in a persistent archive.

AwsStorage

DatastreamStorage subclass for an AWS S3-based filesystem.

S3Path

This class wraps a ‘special’ path string that lets us include the bucket name and region in the path, so that we can use it seamlessly in boto3 APIs.

FilesystemStorage

DatastreamStorage subclass for a local Linux-based filesystem.

AbstractFileHandler

Abstract class to define methods required by all FileHandlers. Classes derived from AbstractFileHandler should implement one or more of the read and write methods.

FileHandler

Class to provide methods to read and write files with a variety of extensions.

CsvHandler

FileHandler to read from and write to CSV files. Takes a number of parameters that are passed in from the storage config file.

NetCdfHandler

FileHandler to read from and write to netCDF files. Takes a number of parameters that are passed in from the storage config file.

Pipeline

This class serves as the base class for all tsdat data pipelines.

IngestPipeline

The IngestPipeline class is designed to read in raw, non-standardized data and convert it to a standardized format.

QualityChecker

Class containing the code to perform a single Quality Check on a Dataset variable.

QualityHandler

Class containing code to be executed if a particular quality check fails.

DSUtil

Provides helper functions for xarray.Dataset

Converter

Base class for converting data arrays from one set of units to another.

DefaultConverter

Default class for converting units on data arrays.

StringTimeConverter

Convert a time string to a np.datetime64, which is needed for xarray.

TimestampTimeConverter

Convert a numeric UTC timestamp to a np.datetime64, which is needed for xarray.

Functions

register_filehandler(patterns: Union[str, List[str]]) → AbstractFileHandler

Python decorator to register an AbstractFileHandler in the FileHandler object.

class tsdat.Config(dictionary: Dict)

Wrapper for the pipeline configuration file.

Note: in most cases, Config.load(filepath) should be used to instantiate the Config class.

Parameters

dictionary (Dict) – The pipeline configuration file as a dictionary.

_parse_quality_managers(self, dictionary: Dict) → Dict[str, tsdat.config.quality_manager_definition.QualityManagerDefinition]

Extracts QualityManagerDefinitions from the config file.

Parameters

dictionary (Dict) – The quality_management dictionary.

Returns

Mapping of quality manager name to QualityManagerDefinition

Return type

Dict[str, QualityManagerDefinition]

classmethod load(self, filepaths: List[str])

Load one or more yaml pipeline configuration files. Multiple files should only be passed as input if the pipeline configuration file is split across multiple files.

Parameters

filepaths (List[str]) – The path(s) to yaml configuration files to load.

Returns

A Config object wrapping the yaml configuration file(s).

Return type

Config
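
Example usage (a sketch; the configuration file path is hypothetical):

from tsdat import Config

# Load a single pipeline config file; pass multiple paths if the
# configuration is split across several files.
config = Config.load(["pipeline_config.yml"])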

static lint_yaml(filename: str)

Lints a yaml file and raises an exception if an error is found.

Parameters

filename (str) – The path to the file to lint.

Raises

Exception – Raises an exception if an error is found.

class tsdat.PipelineDefinition(dictionary: Dict[str, Dict])

Wrapper for the pipeline portion of the pipeline config file.

Parameters

dictionary (Dict[str]) – The pipeline component of the pipeline config file.

Raises

DefinitionError – Raises DefinitionError if one of the file naming components contains an illegal character.

check_file_name_components(self)

Performs sanity checks on the config properties used in naming files output by tsdat pipelines.

Raises

DefinitionError – Raises DefinitionError if a component has been set improperly.

class tsdat.DatasetDefinition(dictionary: Dict, datastream_name: str)

Wrapper for the dataset_definition portion of the pipeline config file.

Parameters
  • dictionary (Dict) – The portion of the config file corresponding with the dataset definition.

  • datastream_name (str) – The name of the datastream that the config file is for.

_parse_dimensions(self, dictionary: Dict) → Dict[str, tsdat.config.dimension_definition.DimensionDefinition]

Extracts the dimensions from the dataset_definition portion of the config file.

Parameters

dictionary (Dict) – The dataset_definition dictionary from the config file.

Returns

Returns a mapping of output dimension names to DimensionDefinition objects.

Return type

Dict[str, DimensionDefinition]

_parse_variables(self, dictionary: Dict, available_dimensions: Dict[str, tsdat.config.dimension_definition.DimensionDefinition]) → Dict[str, tsdat.config.variable_definition.VariableDefinition]

Extracts the variables from the dataset_definition portion of the config file.

Parameters
  • dictionary (Dict) – The dataset_definition dictionary from the config file.

  • available_dimensions (Dict[str, DimensionDefinition]) – The DimensionDefinition objects that have already been parsed.

Returns

Returns a mapping of output variable names to VariableDefinition objects.

Return type

Dict[str, VariableDefinition]

_parse_coordinates(self, vars: Dict[str, tsdat.config.variable_definition.VariableDefinition]) → Tuple[Dict[str, tsdat.config.variable_definition.VariableDefinition], Dict[str, tsdat.config.variable_definition.VariableDefinition]]

Separates coordinate variables and data variables.

Determines which variables are coordinate variables and moves those variables from self.vars to self.coords. Coordinate variables are defined as variables that are dimensioned by themselves, i.e., var.name == var.dim.name is a true statement for coordinate variables, but false for data variables.

Parameters

vars (Dict[str, VariableDefinition]) – The dictionary of VariableDefinition objects to check.

Returns

The dictionary of dimensions in the dataset.

Return type

Tuple[Dict[str, VariableDefinition], Dict[str, VariableDefinition]]

_validate_dataset_definition(self)

Performs sanity checks on the DatasetDefinition object.

Raises

DefinitionError – If any sanity checks fail.

get_attr(self, attribute_name) → Any

Retrieves the value of the attribute requested, or None if it does not exist.

Parameters

attribute_name (str) – The name of the attribute to retrieve.

Returns

The value of the attribute, or None.

Return type

Any

get_variable_names(self) → List[str]

Retrieves the list of variable names. Note that this excludes coordinate variables.

Returns

The list of variable names.

Return type

List[str]

get_variable(self, variable_name: str) → tsdat.config.variable_definition.VariableDefinition

Attempts to retrieve the requested variable. First searches the data variables, then searches the coordinate variables. Returns None if no data or coordinate variables have been defined with the requested variable name.

Parameters

variable_name (str) – The name of the variable to retrieve.

Returns

Returns the VariableDefinition for the variable, or None if the variable could not be found.

Return type

VariableDefinition

get_coordinates(self, variable: tsdat.config.variable_definition.VariableDefinition) → List[tsdat.config.variable_definition.VariableDefinition]

Returns the coordinate VariableDefinition object(s) that dimension the requested VariableDefinition.

Parameters

variable (VariableDefinition) – The VariableDefinition whose coordinate variables should be retrieved.

Returns

A list of VariableDefinition coordinate variables that dimension the provided VariableDefinition.

Return type

List[VariableDefinition]

get_static_variables(self) → List[tsdat.config.variable_definition.VariableDefinition]

Retrieves a list of static VariableDefinition objects. A variable is defined as static if it has a “data” section in the config file, which would mean that the variable’s data is defined statically. For example, in the config file snippet below, “depth” is a static variable:

depth:
  data: [4, 8, 12]
  dims: [depth]
  type: int
  attrs:
    long_name: Depth
    units: m
Returns

The list of static VariableDefinition objects.

Return type

List[VariableDefinition]

class tsdat.DimensionDefinition(name: str, length: Union[str, int])

Class to represent dimensions defined in the pipeline config file.

Parameters
  • name (str) – The name of the dimension

  • length (Union[str, int]) – The length of the dimension. This should be one of: "unlimited", "variable", or a positive int. The ‘time’ dimension should always have length of "unlimited".
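
For example, dimensions might be defined in the pipeline config file like the following sketch (the dimension names and lengths are illustrative):

dimensions:
  time:
    length: unlimited
  depth:
    length: 3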

is_unlimited(self) → bool

Returns True if the dimension has unlimited length. Represented by setting the length attribute to "unlimited".

Returns

True if the dimension has unlimited length.

Return type

bool

is_variable_length(self) → bool

Returns True if the dimension has variable length, meaning that the dimension’s length is set at runtime. Represented by setting the length to "variable".

Returns

True if the dimension has variable length, False otherwise.

Return type

bool

class tsdat.VariableDefinition(name: str, dictionary: Dict, available_dimensions: Dict[str, tsdat.config.dimension_definition.DimensionDefinition], defaults: Union[Dict, None] = None)

Class to encode variable definitions from the config file. Also provides a few utility methods.

Parameters
  • name (str) – The name of the variable in the output file.

  • dictionary (Dict) – The dictionary entry corresponding with this variable in the config file.

  • available_dimensions (Dict[str, DimensionDefinition]) – A mapping of dimension name to DimensionDefinition objects.

  • defaults (Dict, optional) – The defaults to use when instantiating this VariableDefinition object, defaults to {}.

_parse_input(self, dictionary: Dict, defaults: Union[Dict, None] = None) → VarInput

Parses the variable’s input property, if it has one, from the variable dictionary.

Parameters
  • dictionary (Dict) – The dictionary entry corresponding with this variable in the config file.

  • defaults (Dict, optional) – The defaults to use when instantiating the VariableDefinition object, defaults to {}.

Returns

A VarInput object for this VariableDefinition, or None.

Return type

VarInput

_parse_attributes(self, dictionary: Dict, defaults: Union[Dict, None] = None) → Dict[str, Any]

Parses the variable’s attributes from the variable dictionary.

Parameters
  • dictionary (Dict) – The dictionary entry corresponding with this variable in the config file.

  • defaults (Dict, optional) – The defaults to use when instantiating the VariableDefinition object, defaults to {}.

Returns

A mapping of attribute name to attribute value.

Return type

Dict[str, Any]

_parse_dimensions(self, dictionary: Dict, available_dimensions: Dict[str, tsdat.config.dimension_definition.DimensionDefinition], defaults: Union[Dict, None] = None) → Dict[str, tsdat.config.dimension_definition.DimensionDefinition]

Parses the variable’s dimensions from the variable dictionary.

Parameters
  • dictionary (Dict) – The dictionary entry corresponding with this variable in the config file.

  • available_dimensions – A mapping of dimension name to DimensionDefinition.

  • defaults (Dict, optional) – The defaults to use when instantiating the VariableDefinition object, defaults to {}.

Returns

A mapping of dimension name to DimensionDefinition objects.

Return type

Dict[str, DimensionDefinition]

_parse_data_type(self, dictionary: Dict, defaults: Union[Dict, None] = None) → object

Parses the data_type string and returns the appropriate numpy data type (e.g., “float” -> np.float).

Parameters
  • dictionary (Dict) – The dictionary entry corresponding with this variable in the config file.

  • defaults (Dict, optional) – The defaults to use when instantiating the VariableDefinition object, defaults to {}.

Raises

KeyError – Raises KeyError if the data type in the dictionary does not match a valid data type.

Returns

The numpy data type corresponding with the type provided in the yaml file, or data_type if the provided data_type is not in the ME Data Standards list of data types.

Return type

object

add_fillvalue_if_none(self, attributes: Dict[str, Any]) → Dict[str, Any]

Adds the _FillValue attribute to the provided attributes dictionary if the _FillValue attribute has not already been defined and returns the modified attributes dictionary.

Parameters

attributes (Dict[str, Any]) – The dictionary containing user-defined variable attributes.

Returns

The dictionary containing user-defined variable attributes. Is guaranteed to have a _FillValue attribute.

Return type

Dict[str, Any]

is_constant(self) → bool

Returns True if the variable is a constant. A variable is constant if it does not have any dimensions.

Returns

True if the variable is constant, False otherwise.

Return type

bool

is_predefined(self) → bool

Returns True if the variable’s data was predefined in the config yaml file.

Returns

True if the variable is predefined, False otherwise.

Return type

bool

is_coordinate(self) → bool

Returns True if the variable is a coordinate variable. A variable is defined as a coordinate variable if it is dimensioned by itself.

Returns

True if the variable is a coordinate variable, False otherwise.

Return type

bool

is_derived(self) → bool

Return True if the variable is derived. A variable is derived if it does not have an input and it is not predefined.

Returns

True if the Variable is derived, False otherwise.

Return type

bool

has_converter(self) → bool

Returns True if the variable has an input converter defined, False otherwise.

Returns

True if the Variable has a converter defined, False otherwise.

Return type

bool

is_required(self) → bool

Returns True if the variable has the ‘required’ property defined and the ‘required’ property evaluates to True. A required variable is a variable that must be retrieved from the input dataset. If a required variable is not in the input dataset, the process should crash.

Returns

True if the variable is required, False otherwise.

Return type

bool

has_input(self) → bool

Return True if the variable is copied from an input dataset, regardless of whether or not unit and/or naming conversions should be applied.

Returns

True if the Variable has an input defined, False otherwise.

Return type

bool

get_input_name(self) → str

Returns the name of the variable in the input if defined, otherwise returns None.

Returns

The name of the variable in the input, or None.

Return type

str

get_input_units(self) → str

If the variable has input, returns the units of the input variable or the output units if no input units are defined.

Returns

The units of the input variable data.

Return type

str

get_output_units(self) → str

Returns the units of the output data or None if no units attribute has been defined.

Returns

The units of the output variable data.

Return type

str

get_coordinate_names(self) → List[str]

Returns the names of the coordinate VariableDefinition(s) that this VariableDefinition is dimensioned by.

Returns

A list of dimension/coordinate variable names.

Return type

List[str]

get_shape(self) → Tuple[int]

Returns the shape of the data attribute on the VariableDefinition.

Raises

KeyError – Raises a KeyError if the data attribute has not been set yet.

Returns

The shape of the VariableDefinition’s data, or None.

Return type

Tuple[int]

get_data_type(self) → numpy.dtype

Retrieves the variable’s data type.

Returns

Returns the data type of the variable’s data as a numpy dtype.

Return type

np.dtype

get_FillValue(self) → int

Retrieves the variable’s _FillValue attribute, using -9999 as a default if it has not been defined.

Returns

Returns the variable’s _FillValue.

Return type

int

run_converter(self, data: numpy.ndarray) → numpy.ndarray

If the variable has an input converter, runs the input converter for the input/output units on the provided data.

Parameters

data (np.ndarray) – The data to be converted.

Returns

Returns the data after it has been run through the variable’s converter.

Return type

np.ndarray

to_dict(self) → Dict

Returns the Variable as a dictionary to be used to initialize an empty xarray Dataset or DataArray.

Returns a dictionary like the following (example is for temperature):

{
    "dims": ["time"],
    "data": [],
    "attrs": {"units": "degC"}
}
Returns

A dictionary representation of the variable.

Return type

Dict
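
As an illustrative sketch, a dictionary in this form can be passed straight to xarray to initialize a DataArray:

import xarray as xr

# A dictionary in the form returned by VariableDefinition.to_dict()
var_dict = {"dims": ["time"], "data": [], "attrs": {"units": "degC"}}
da = xr.DataArray(var_dict["data"], dims=var_dict["dims"], attrs=var_dict["attrs"])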

class tsdat.ATTS

Class that adds constants for interacting with tsdat data-model specific attributes.

TITLE = title
DESCRIPTION = description
CONVENTIONS = conventions
HISTORY = history
DOI = doi
INSTITUTION = institution
CODE_URL = code_url
REFERENCES = references
INPUT_FILES = input_files
LOCATION_ID = location_id
DATASTREAM = datastream_name
DATA_LEVEL = data_level
LOCATION_DESCRPTION = location_description
INSTRUMENT_NAME = instrument_name
SERIAL_NUMBER = serial_number
INSTRUMENT_DESCRPTION = instrument_description
INSTRUMENT_MANUFACTURER = instrument_manufacturer
AVERAGING_INTERVAL = averaging_interval
SAMPLING_INTERVAL = sampling_interval
UNITS = units
VALID_DELTA = valid_delta
VALID_RANGE = valid_range
FAIL_RANGE = fail_range
WARN_RANGE = warn_range
FILL_VALUE = _FillValue
CORRECTIONS_APPLIED = corrections_applied
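
These constants can be used in place of hard-coded attribute-name strings. A minimal sketch (the dataset contents are hypothetical):

import xarray as xr
from tsdat import ATTS

ds = xr.Dataset(attrs={ATTS.TITLE: "Example dataset"})
title = ds.attrs.get(ATTS.TITLE)
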
class tsdat.VARS

Class that adds keywords for referring to variables.

ALL = ALL
COORDS = COORDS
DATA_VARS = DATA_VARS
exception tsdat.DefinitionError

Bases: Exception

Indicates a fatal error within the YAML Dataset Definition.

exception tsdat.QCError

Bases: Exception

Indicates that a given Quality Manager failed with a fatal error.

class tsdat.DatastreamStorage(parameters: Union[Dict, None] = None)

Bases: abc.ABC

DatastreamStorage is the base class for providing access to processed data files in a persistent archive. DatastreamStorage provides shortcut methods to find files based upon date, datastream name, file type, etc. This is the class that should be used to save and retrieve processed data files. Use the DatastreamStorage.from_config() method to construct the appropriate subclass instance based upon a storage config file.

default_file_type
file_filters
output_file_extensions
static from_config(storage_config_file: str)

Load a yaml config file which provides the storage constructor parameters.

Parameters

storage_config_file (str) – The path to the config file to load

Returns

A subclass instance created from the config file.

Return type

DatastreamStorage
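
Example usage (a sketch; the storage config file path is hypothetical):

from tsdat import DatastreamStorage

# Returns the appropriate subclass (FilesystemStorage, AwsStorage, ...)
# based on the contents of the config file
storage = DatastreamStorage.from_config("storage_config.yml")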

property tmp(self)

Each subclass should define the tmp property, which provides access to a TemporaryStorage object that is used to efficiently handle reading/writing temporary files used during the processing pipeline, or to perform filesystem actions on files other than processed datastream files that reside in the same filesystem as the DatastreamStorage. It is not intended to be used outside of the pipeline.

Raises

NotImplementedError – Raised if the property is not implemented by the subclass.

abstract find(self, datastream_name: str, start_time: str, end_time: str, filetype: str = None) → List[str]

Finds all files of the given type from the datastream store with the given datastream_name and timestamps from start_time (inclusive) up to end_time (exclusive). Returns a list of paths to files that match the criteria.

Parameters
  • datastream_name (str) – The datastream_name as defined by ME Data Standards.

  • start_time (str) – The start time or date to start searching for data (inclusive). Should be like “20210106.000000” to search for data beginning on or after January 6th, 2021.

  • end_time (str) – The end time or date to stop searching for data (exclusive). Should be like “20210108.000000” to search for data ending before January 8th, 2021.

  • filetype (str, optional) – A file type from the DatastreamStorage.file_filters keys. If no type is specified, all files will be returned. Defaults to None.

Returns

A list of paths in datastream storage in ascending order

Return type

List[str]
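
A usage sketch (the storage config path and datastream name are hypothetical):

from tsdat import DatastreamStorage

storage = DatastreamStorage.from_config("storage_config.yml")
files = storage.find(
    "abc.example_instrument.b1",  # hypothetical datastream_name
    "20210106.000000",
    "20210108.000000",
)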

abstract fetch(self, datastream_name: str, start_time: str, end_time: str, local_path: str = None, filetype: int = None)

Fetches files from the datastream store using the datastream_name, start_time, and end_time to specify the file(s) to retrieve. If the local path is not specified, it is up to the subclass to determine where to put the retrieved file(s).

Parameters
  • datastream_name (str) – The datastream_name as defined by ME Data Standards.

  • start_time (str) – The start time or date to start searching for data (inclusive). Should be like “20210106” to search for data beginning on or after January 6th, 2021.

  • end_time (str) – The end time or date to stop searching for data (exclusive). Should be like “20210108” to search for data ending before January 8th, 2021.

  • local_path (str, optional) – The path to the directory where the data should be stored. Defaults to None.

  • filetype (int, optional) – A file type from the DatastreamStorage.file_filters keys. If no type is specified, all files will be returned. Defaults to None.

Returns

A list of paths where the retrieved files were stored in local storage. This is a context manager class, so this method should be called via the ‘with’ statement; all files referenced by the list will be cleaned up when it goes out of scope.

Return type

DisposableLocalTempFileList
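
A usage sketch showing the recommended ‘with’ statement (the paths and datastream name are hypothetical):

from tsdat import DatastreamStorage

storage = DatastreamStorage.from_config("storage_config.yml")

# The returned list is disposable: the temporary local copies are
# cleaned up automatically when the 'with' block exits.
with storage.fetch("abc.example_instrument.b1", "20210106", "20210108") as files:
    for path in files:
        print(path)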

save(self, dataset_or_path: Union[str, xarray.Dataset], new_filename: str = None) → List[Any]

Saves a local file to the datastream store.

Parameters
  • dataset_or_path (Union[str, xr.Dataset]) – The dataset or local path to the file to save. The file should be named according to ME Data Standards naming conventions so that this method can automatically parse the datastream, date, and time from the file name.

  • new_filename (str, optional) – If provided, the new filename to save as. This parameter should ONLY be provided if using a local path for dataset_or_path. Must also follow ME Data Standards naming conventions. Defaults to None.

Returns

A list of paths where the saved files were stored in storage. Path type is dependent upon the specific storage subclass.

Return type

List[Any]

abstract save_local_path(self, local_path: str, new_filename: str = None) → Any

Given a path to a local file, save that file to the storage.

Parameters
  • local_path (str) – Local path to the file to save. The file should be named according to ME Data Standards naming conventions so that this method can automatically parse the datastream, date, and time from the file name.

  • new_filename (str, optional) – If provided, the new filename to save as. This parameter should ONLY be provided if using a local path for dataset_or_path. Must also follow ME Data Standards naming conventions. Defaults to None.

Returns

The path where this file was stored in storage. Path type is dependent upon the specific storage subclass.

Return type

Any

abstract exists(self, datastream_name: str, start_time: str, end_time: str, filetype: str = None) → bool

Checks if any data exists in the datastream store for the provided datastream and time range.

Parameters
  • datastream_name (str) – The datastream_name as defined by ME Data Standards.

  • start_time (str) – The start time or date to start searching for data (inclusive). Should be like “20210106” to search for data beginning on or after January 6th, 2021.

  • end_time (str) – The end time or date to stop searching for data (exclusive). Should be like “20210108” to search for data ending before January 8th, 2021.

  • filetype (str, optional) – A file type from the DatastreamStorage.file_filters keys. If none specified, all files will be checked. Defaults to None.

Returns

True if data exists, False otherwise.

Return type

bool

abstract delete(self, datastream_name: str, start_time: str, end_time: str, filetype: str = None) → None

Deletes datastream data in the datastream store in between the specified time range.

Parameters
  • datastream_name (str) – The datastream_name as defined by ME Data Standards.

  • start_time (str) – The start time or date to start searching for data (inclusive). Should be like “20210106” to search for data beginning on or after January 6th, 2021.

  • end_time (str) – The end time or date to stop searching for data (exclusive). Should be like “20210108” to search for data ending before January 8th, 2021.

  • filetype (str, optional) – A file type from the DatastreamStorage.file_filters keys. If no type is specified, all files will be deleted. Defaults to None.

class tsdat.AwsStorage(parameters: Union[Dict, None] = None)

Bases: tsdat.io.DatastreamStorage

DatastreamStorage subclass for an AWS S3-based filesystem.

Parameters

parameters (dict, optional) –

Dictionary of parameters that should be set automatically from the storage config file when this class is instantiated via the DatastreamStorage.from_config() method. Defaults to {}.

Key parameters that should be set in the config file include:

retain_input_files

Whether the input files should be cleaned up after they are done processing

root_dir

The bucket ‘key’ to use to prepend to all processed files created in the persistent store. Defaults to ‘root’

temp_dir

The bucket ‘key’ to use to prepend to all temp files created in the S3 bucket. Defaults to ‘temp’

bucket_name

The name of the S3 bucket to store to

property s3_resource(self)
property s3_client(self)
property tmp(self)

Each subclass should define the tmp property, which provides access to a TemporaryStorage object that is used to efficiently handle reading/writing temporary files used during the processing pipeline, or to perform filesystem actions on files other than processed datastream files that reside in the same filesystem as the DatastreamStorage. It is not intended to be used outside of the pipeline.

Raises

NotImplementedError – Raised if the property is not implemented by the subclass.

property root(self)
property temp_path(self)
find(self, datastream_name: str, start_time: str, end_time: str, filetype: str = None) → List[S3Path]

Finds all files of the given type from the datastream store with the given datastream_name and timestamps from start_time (inclusive) up to end_time (exclusive). Returns a list of paths to files that match the criteria.

Parameters
  • datastream_name (str) – The datastream_name as defined by ME Data Standards.

  • start_time (str) – The start time or date to start searching for data (inclusive). Should be like “20210106.000000” to search for data beginning on or after January 6th, 2021.

  • end_time (str) – The end time or date to stop searching for data (exclusive). Should be like “20210108.000000” to search for data ending before January 8th, 2021.

  • filetype (str, optional) – A file type from the DatastreamStorage.file_filters keys. If no type is specified, all files will be returned. Defaults to None.

Returns

A list of paths in datastream storage in ascending order

Return type

List[str]

fetch(self, datastream_name: str, start_time: str, end_time: str, local_path: str = None, filetype: int = None) → tsdat.io.DisposableLocalTempFileList

Fetches files from the datastream store using the datastream_name, start_time, and end_time to specify the file(s) to retrieve. If the local path is not specified, it is up to the subclass to determine where to put the retrieved file(s).

Parameters
  • datastream_name (str) – The datastream_name as defined by ME Data Standards.

  • start_time (str) – The start time or date to start searching for data (inclusive). Should be like “20210106” to search for data beginning on or after January 6th, 2021.

  • end_time (str) – The end time or date to stop searching for data (exclusive). Should be like “20210108” to search for data ending before January 8th, 2021.

  • local_path (str, optional) – The path to the directory where the data should be stored. Defaults to None.

  • filetype (int, optional) – A file type from the DatastreamStorage.file_filters keys. If no type is specified, all files will be returned. Defaults to None.

Returns

A list of paths where the retrieved files were stored in local storage. This is a context manager class, so this method should be called via the ‘with’ statement; all files referenced by the list will be cleaned up when it goes out of scope.

Return type

DisposableLocalTempFileList

save_local_path(self, local_path: str, new_filename: str = None)

Given a path to a local file, save that file to the storage.

Parameters
  • local_path (str) – Local path to the file to save. The file should be named according to ME Data Standards naming conventions so that this method can automatically parse the datastream, date, and time from the file name.

  • new_filename (str, optional) – If provided, the new filename to save as. This parameter should ONLY be provided if using a local path for dataset_or_path. Must also follow ME Data Standards naming conventions. Defaults to None.

Returns

The path where this file was stored in storage. Path type is dependent upon the specific storage subclass.

Return type

Any

exists(self, datastream_name: str, start_time: str, end_time: str, filetype: int = None) → bool

Checks if any data exists in the datastream store for the provided datastream and time range.

Parameters
  • datastream_name (str) – The datastream_name as defined by ME Data Standards.

  • start_time (str) – The start time or date to start searching for data (inclusive). Should be like “20210106” to search for data beginning on or after January 6th, 2021.

  • end_time (str) – The end time or date to stop searching for data (exclusive). Should be like “20210108” to search for data ending before January 8th, 2021.

  • filetype (str, optional) – A file type from the DatastreamStorage.file_filters keys. If none specified, all files will be checked. Defaults to None.

Returns

True if data exists, False otherwise.

Return type

bool

delete(self, datastream_name: str, start_time: str, end_time: str, filetype: int = None) → None

Deletes datastream data in the datastream store in between the specified time range.

Parameters
  • datastream_name (str) – The datastream_name as defined by ME Data Standards.

  • start_time (str) – The start time or date to start searching for data (inclusive). Should be like “20210106” to search for data beginning on or after January 6th, 2021.

  • end_time (str) – The end time or date to stop searching for data (exclusive). Should be like “20210108” to search for data ending before January 8th, 2021.

  • filetype (str, optional) – A file type from the DatastreamStorage.file_filters keys. If no type is specified, all files will be deleted. Defaults to None.

class tsdat.S3Path(bucket_name: str, bucket_path: str = '', region_name: str = None)

Bases: str

This class wraps a ‘special’ path string that lets us include the bucket name and region in the path, so that we can use it seamlessly in boto3 APIs. We are creating our own string to hold the region, bucket & key (i.e., path), since boto3 needs all three in order to access a file.

Example:

s3_client = boto3.client('s3', region_name='eu-central-1')
s3_client.download_file(bucket, key, download_path)

Parameters
  • bucket_name (str) – The S3 bucket name where this file is located

  • bucket_path (str, optional) – The key to access this file in the bucket

  • region_name (str, optional) – The AWS region where this file is located, defaults to None, which inherits the default configured region.

__str__(self)

Return str(self).

property bucket_name(self)
property bucket_path(self)
property region_name(self)
join(self, *args)

Joins segments in an S3 path. This method behaves exactly like os.path.join.

Returns

A new S3Path with the additional segments added.

Return type

S3Path
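
A usage sketch (the bucket, key, region, and file names are hypothetical):

from tsdat import S3Path

path = S3Path("my-bucket", "root/abc.example_instrument.b1", region_name="us-west-2")
nc_file = path.join("abc.example_instrument.b1.20210106.000000.nc")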

class tsdat.FilesystemStorage(parameters: Union[Dict, None] = None)

Bases: tsdat.io.DatastreamStorage

DatastreamStorage subclass for a local Linux-based filesystem.

TODO: rename to LocalStorage as this is more intuitive.

Parameters

parameters (dict, optional) –

Dictionary of parameters that should be set automatically from the storage config file when this class is instantiated via the DatastreamStorage.from_config() method. Defaults to {}.

Key parameters that should be set in the config file include:

retain_input_files

Whether the input files should be cleaned up after they are done processing

root_dir

The root path under which processed files will be stored.

property tmp(self)

Each subclass should define the tmp property, which provides access to a TemporaryStorage object that is used to efficiently handle reading/writing temporary files used during the processing pipeline, or to perform filesystem actions on files other than processed datastream files that reside in the same filesystem as the DatastreamStorage. It is not intended to be used outside of the pipeline.

Raises

NotImplementedError – Raised if the property is not implemented by the subclass.

find(self, datastream_name: str, start_time: str, end_time: str, filetype: str = None) → List[str]

Finds all files of the given type from the datastream store with the given datastream_name and timestamps from start_time (inclusive) up to end_time (exclusive). Returns a list of paths to files that match the criteria.

Parameters
  • datastream_name (str) – The datastream_name as defined by ME Data Standards.

  • start_time (str) – The start time or date to start searching for data (inclusive). Should be like “20210106.000000” to search for data beginning on or after January 6th, 2021.

  • end_time (str) – The end time or date to stop searching for data (exclusive). Should be like “20210108.000000” to search for data ending before January 8th, 2021.

  • filetype (str, optional) – A file type from the DatastreamStorage.file_filters keys. If no type is specified, all files will be returned. Defaults to None.

Returns

A list of paths in datastream storage in ascending order

Return type

List[str]

fetch(self, datastream_name: str, start_time: str, end_time: str, local_path: str = None, filetype: int = None) → tsdat.io.DisposableLocalTempFileList

Fetches files from the datastream store using the datastream_name, start_time, and end_time to specify the file(s) to retrieve. If the local path is not specified, it is up to the subclass to determine where to put the retrieved file(s).

Parameters
  • datastream_name (str) – The datastream_name as defined by ME Data Standards.

  • start_time (str) – The start time or date to start searching for data (inclusive). Should be like “20210106” to search for data beginning on or after January 6th, 2021.

  • end_time (str) – The end time or date to stop searching for data (exclusive). Should be like “20210108” to search for data ending before January 8th, 2021.

  • local_path (str, optional) – The path to the directory where the data should be stored. Defaults to None.

  • filetype (int, optional) – A file type from the DatastreamStorage.file_filters keys. If no type is specified, all files will be returned. Defaults to None.

Returns

A list of paths where the retrieved files were stored in local storage. This is a context manager class, so this method should be called via the ‘with’ statement; all files referenced by the list will be cleaned up when it goes out of scope.

Return type

DisposableLocalTempFileList

save_local_path(self, local_path: str, new_filename: str = None) → Any

Given a path to a local file, save that file to the storage.

Parameters
  • local_path (str) – Local path to the file to save. The file should be named according to ME Data Standards naming conventions so that this method can automatically parse the datastream, date, and time from the file name.

  • new_filename (str, optional) – If provided, the new filename to save as. This parameter should ONLY be provided if using a local path for dataset_or_path. Must also follow ME Data Standards naming conventions. Defaults to None.

Returns

The path where this file was stored in storage. Path type is dependent upon the specific storage subclass.

Return type

Any

exists(self, datastream_name: str, start_time: str, end_time: str, filetype: int = None) → bool

Checks if any data exists in the datastream store for the provided datastream and time range.

Parameters
  • datastream_name (str) – The datastream_name as defined by ME Data Standards.

  • start_time (str) – The start time or date to start searching for data (inclusive). Should be like “20210106” to search for data beginning on or after January 6th, 2021.

  • end_time (str) – The end time or date to stop searching for data (exclusive). Should be like “20210108” to search for data ending before January 8th, 2021.

  • filetype (str, optional) – A file type from the DatastreamStorage.file_filters keys. If none specified, all files will be checked. Defaults to None.

Returns

True if data exists, False otherwise.

Return type

bool

delete(self, datastream_name: str, start_time: str, end_time: str, filetype: int = None) → None

Deletes datastream data in the datastream store in between the specified time range.

Parameters
  • datastream_name (str) – The datastream_name as defined by ME Data Standards.

  • start_time (str) – The start time or date to start searching for data (inclusive). Should be like “20210106” to search for data beginning on or after January 6th, 2021.

  • end_time (str) – The end time or date to stop searching for data (exclusive). Should be like “20210108” to search for data ending before January 8th, 2021.

  • filetype (str, optional) – A file type from the DatastreamStorage.file_filters keys. If no type is specified, all files will be deleted. Defaults to None.

class tsdat.AbstractFileHandler(parameters: Union[Dict, None] = None)

Abstract class to define methods required by all FileHandlers. Classes derived from AbstractFileHandler should implement one or more of the following methods:

write(ds: xr.Dataset, filename: str, config: Config, **kwargs)

read(filename: str, **kwargs) -> xr.Dataset

Parameters

parameters (Dict, optional) – Parameters that were passed to the FileHandler when it was registered in the storage config file, defaults to {}.

write(self, ds: xarray.Dataset, filename: str, config: tsdat.config.Config = None, **kwargs) → None

Saves the given dataset to a file.

Parameters
  • ds (xr.Dataset) – The dataset to save.

  • filename (str) – The path to where the file should be written to.

  • config (Config, optional) – Optional Config object, defaults to None

read(self, filename: str, **kwargs) → xarray.Dataset

Reads in the given file and converts it into an Xarray dataset for use in the pipeline.

Parameters

filename (str) – The path to the file to read in.

Returns

An xr.Dataset object.

Return type

xr.Dataset

class tsdat.FileHandler

Class to provide methods to read and write files with a variety of extensions.

FILEREADERS: Dict[str, AbstractFileHandler]
FILEWRITERS: Dict[str, AbstractFileHandler]
static _get_handler(filename: str, method: Literal["read", "write"]) → AbstractFileHandler

Given the filepath of the file to read or write and the FileHandler method to apply to the filepath, this method determines which previously-registered FileHandler should be used on the provided filepath.

Parameters
  • filename (str) – The path to the file to read or write to.

  • method (Literal["read", "write"]) – The method to apply to the file. Must be one of: "read", "write".

Returns

The FileHandler that should be applied.

Return type

AbstractFileHandler

static write(ds: xarray.Dataset, filename: str, config: tsdat.config.Config = None, **kwargs) → None

Calls the appropriate FileHandler to write the dataset to the provided filename.

Parameters
  • ds (xr.Dataset) – The dataset to save.

  • filename (str) – The path to the file where the dataset should be written.

  • config (Config, optional) – Optional Config object. Defaults to None.

static read(filename: str, **kwargs) → xarray.Dataset

Reads in the given file and converts it into an xarray dataset object using the registered FileHandler for the provided filepath.

Parameters

filename (str) – The path to the file to read in.

Returns

The raw file as an Xarray.Dataset object.

Return type

xr.Dataset
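
A usage sketch, assuming handlers are registered for these extensions (the file names are hypothetical):

import xarray as xr
from tsdat import FileHandler

ds: xr.Dataset = FileHandler.read("input.csv")  # handler selected by extension
FileHandler.write(ds, "output.nc")              # handler selected by extension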

static register_file_handler(method: Literal["read", "write"], patterns: Union[str, List[str]], handler: AbstractFileHandler)

Method to register a FileHandler for reading from or writing to files matching one or more provided file patterns.

Parameters
  • method (Literal["read", "write"]) – The method the FileHandler should call if the pattern is matched. Must be one of: "read", "write".

  • patterns (Union[str, List[str]]) – The file pattern(s) that determine if this FileHandler should be run on a given filepath.

  • handler (AbstractFileHandler) – The AbstractFileHandler to register.

class tsdat.CsvHandler(parameters: Union[Dict, None] = None)

Bases: tsdat.io.filehandlers.file_handlers.AbstractFileHandler

FileHandler to read from and write to CSV files. Takes a number of parameters that are passed in from the storage config file. Parameters specified in the config file should follow the following example:

parameters:
  write:
    to_dataframe:
      # Parameters here will be passed to xr.Dataset.to_dataframe()
    to_csv:
      # Parameters here will be passed to pd.DataFrame.to_csv()
  read:
    read_csv:
      # Parameters here will be passed to pd.read_csv()
    to_xarray:
      # Parameters here will be passed to pd.DataFrame.to_xarray()
Parameters

parameters (Dict, optional) – Parameters that were passed to the FileHandler when it was registered in the storage config file, defaults to {}.

write(self, ds: xarray.Dataset, filename: str, config: tsdat.config.Config = None, **kwargs) → None

Saves the given dataset to a csv file.

Parameters
  • ds (xr.Dataset) – The dataset to save.

  • filename (str) – The path to where the file should be written to.

  • config (Config, optional) – Optional Config object, defaults to None

read(self, filename: str, **kwargs) → xarray.Dataset

Reads in the given file and converts it into an Xarray dataset for use in the pipeline.

Parameters

filename (str) – The path to the file to read in.

Returns

An xr.Dataset object.

Return type

xr.Dataset

class tsdat.NetCdfHandler(parameters: Union[Dict, None] = None)

Bases: tsdat.io.filehandlers.file_handlers.AbstractFileHandler

FileHandler to read from and write to netCDF files. Takes a number of parameters that are passed in from the storage config file. Parameters specified in the config file should follow the following example:

parameters:
  write:
    to_netcdf:
      # Parameters here will be passed to xr.Dataset.to_netcdf()
  read:
    load_dataset:
      # Parameters here will be passed to xr.load_dataset()
Parameters

parameters (Dict, optional) – Parameters that were passed to the FileHandler when it was registered in the storage config file, defaults to {}.

write(self, ds: xarray.Dataset, filename: str, config: tsdat.config.Config = None, **kwargs) → None

Saves the given dataset to a netCDF file.

Parameters
  • ds (xr.Dataset) – The dataset to save.

  • filename (str) – The path to where the file should be written to.

  • config (Config, optional) – Optional Config object, defaults to None

read(self, filename: str, **kwargs) → xarray.Dataset

Reads in the given file and converts it into an Xarray dataset for use in the pipeline.

Parameters

filename (str) – The path to the file to read in.

Returns

An xr.Dataset object.

Return type

xr.Dataset

tsdat.register_filehandler(patterns: Union[str, List[str]]) → AbstractFileHandler

Python decorator to register an AbstractFileHandler in the FileHandler object. The FileHandler object will be used by tsdat pipelines to read and write raw, intermediate, and processed data.

This decorator can be used to work with a specific AbstractFileHandler without having to specify a config file. This is useful when using an AbstractFileHandler for analysis or for tests outside of a pipeline. For tsdat pipelines, handlers should always be specified via the storage config file.

Example Usage:

import xarray as xr
from tsdat import Config
from tsdat.io import register_filehandler, AbstractFileHandler

@register_filehandler(["*.nc", "*.cdf"])
class NetCdfHandler(AbstractFileHandler):
    def write(self, ds: xr.Dataset, filename: str, config: Config = None, **kwargs):
        ds.to_netcdf(filename)

    def read(self, filename: str, **kwargs) -> xr.Dataset:
        return xr.load_dataset(filename)
Parameters

patterns (Union[str, List[str]]) – The patterns (regex) that should be used to match a filepath to the AbstractFileHandler provided.

Returns

The original AbstractFileHandler class, after it has been registered for use in tsdat pipelines.

Return type

AbstractFileHandler

class tsdat.Pipeline(pipeline_config: Union[str, tsdat.config.Config], storage_config: Union[str, tsdat.io.DatastreamStorage])

Bases: abc.ABC

This class serves as the base class for all tsdat data pipelines.

Parameters
  • pipeline_config (Union[str, Config]) – The pipeline config file. Can be either a config object, or the path to the pipeline config file that should be used with this pipeline.

  • storage_config (Union[str, DatastreamStorage]) – The storage config file. Can be either a config object, or the path to the storage config file that should be used with this pipeline.

abstract run(self, filepath: Union[str, List[str]])

This method is the entry point for the pipeline. It will take one or more file paths and process them from start to finish. All classes extending the Pipeline class must implement this method.

Parameters

filepath (Union[str, List[str]]) – The path or list of paths to the file(s) to run the pipeline on.

standardize_dataset(self, raw_mapping: Dict[str, xarray.Dataset]) → xarray.Dataset

Standardizes the dataset by applying variable name and units conversions as defined by the pipeline config file. This method returns the standardized dataset.

Parameters

raw_mapping (Dict[str, xr.Dataset]) – The raw dataset mapping.

Returns

The standardized dataset.

Return type

xr.Dataset

check_required_variables(self, dataset: xarray.Dataset, dod: tsdat.config.DatasetDefinition)

Function to throw an error if a required variable could not be retrieved.

Parameters
  • dataset (xr.Dataset) – The dataset to check.

  • dod (DatasetDefinition) – The DatasetDefinition used to specify required variables.

Raises

Exception – Raises an exception to indicate the variable could not be retrieved.

add_static_variables(self, dataset: xarray.Dataset, dod: tsdat.config.DatasetDefinition) → xarray.Dataset

Uses the DatasetDefinition to add static variables (variables whose data are defined in the pipeline config file) to the output dataset.

Parameters
  • dataset (xr.Dataset) – The dataset to add static variables to.

  • dod (DatasetDefinition) – The DatasetDefinition to pull data from.

Returns

The original dataset with added variables from the config

Return type

xr.Dataset

add_missing_variables(self, dataset: xarray.Dataset, dod: tsdat.config.DatasetDefinition) → xarray.Dataset

Uses the dataset definition to initialize variables that are defined in the dataset definition but did not have input. Uses the appropriate shape and _FillValue to initialize each variable.

Parameters
  • dataset (xr.Dataset) – The dataset to add the variables to.

  • dod (DatasetDefinition) – The DatasetDefinition to use.

Returns

The original dataset, with any variables that lacked input now initialized.

Return type

xr.Dataset

add_attrs(self, dataset: xarray.Dataset, raw_mapping: Dict[str, xarray.Dataset], dod: tsdat.config.DatasetDefinition) → xarray.Dataset

Adds global and variable-level attributes to the dataset from the DatasetDefinition object.

Parameters
  • dataset (xr.Dataset) – The dataset to add attributes to.

  • raw_mapping (Dict[str, xr.Dataset]) – The raw dataset mapping. Used to set the input_files global attribute.

  • dod (DatasetDefinition) – The DatasetDefinition containing the attributes to add.

Returns

The original dataset with the attributes added.

Return type

xr.Dataset

get_previous_dataset(self, dataset: xarray.Dataset) → xarray.Dataset

Utility method to retrieve the previous set of data for the same datastream as the provided dataset from the DatastreamStorage.

Parameters

dataset (xr.Dataset) – The reference dataset that will be used to search the DatastreamStorage for prior data.

Returns

The previous dataset from the DatastreamStorage if it exists, otherwise None.

Return type

xr.Dataset

reduce_raw_datasets(self, raw_mapping: Dict[str, xarray.Dataset], definition: tsdat.config.DatasetDefinition) → List[xarray.Dataset]

Removes unused variables from each raw dataset in the raw mapping and performs input to output naming and unit conversions as defined in the dataset definition.

Parameters
  • raw_mapping (Dict[str, xr.Dataset]) – The raw xarray dataset mapping.

  • definition (DatasetDefinition) – The DatasetDefinition used to select the variables to keep.

Returns

A list of reduced datasets.

Return type

List[xr.Dataset]

reduce_raw_dataset(self, raw_dataset: xarray.Dataset, variable_definitions: List[tsdat.config.VariableDefinition], definition: tsdat.config.DatasetDefinition) → xarray.Dataset

Removes unused variables from the raw dataset provided and keeps only the variables and coordinates pertaining to the provided variable definitions. Also performs input to output naming and unit conversions as defined in the DatasetDefinition.

Parameters
  • raw_dataset (xr.Dataset) – The raw dataset mapping.

  • variable_definitions (List[VariableDefinition]) – List of variables to keep.

  • definition (DatasetDefinition) – The DatasetDefinition used to select the variables to keep.

Returns

The reduced dataset.

Return type

xr.Dataset

decode_cf(self, dataset: xarray.Dataset) → xarray.Dataset

Decodes the dataset according to CF conventions. This helps ensure that the dataset is formatted correctly after it has been constructed from unstandardized sources or heavily modified.

Parameters

dataset (xr.Dataset) – The dataset to decode.

Returns

The decoded dataset.

Return type

xr.Dataset

class tsdat.IngestPipeline(pipeline_config: Union[str, tsdat.config.Config], storage_config: Union[str, tsdat.io.DatastreamStorage])

Bases: tsdat.pipeline.pipeline.Pipeline

The IngestPipeline class is designed to read in raw, non-standardized data and convert it to a standardized format by embedding metadata, applying quality checks and quality controls, and by saving the now-processed data in a standard file format.

run(self, filepath: Union[str, List[str]]) → xarray.Dataset

Runs the IngestPipeline from start to finish.

Parameters

filepath (Union[str, List[str]]) – The path or list of paths to the file(s) to run the pipeline on.
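
Example usage (a sketch; the config and input file paths are hypothetical):

from tsdat import IngestPipeline

pipeline = IngestPipeline("pipeline_config.yml", "storage_config.yml")
pipeline.run("path/to/raw_file.csv")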

hook_customize_dataset(self, dataset: xarray.Dataset, raw_mapping: Dict[str, xarray.Dataset]) → xarray.Dataset

Hook to allow for user customizations to the standardized dataset such as inserting a derived variable based on other variables in the dataset. This method is called immediately after the standardize_dataset method and before QualityManagement has been run.

Parameters
  • dataset (xr.Dataset) – The dataset to customize.

  • raw_mapping (Dict[str, xr.Dataset]) – The raw dataset mapping.

Returns

The customized dataset.

Return type

xr.Dataset

hook_customize_raw_datasets(self, raw_dataset_mapping: Dict[str, xarray.Dataset]) → Dict[str, xarray.Dataset]

Hook to allow for user customizations to one or more raw xarray Datasets before they are merged and used to create the standardized dataset. The raw_dataset_mapping will contain one entry for each file being used as input to the pipeline. The keys are the standardized raw file name, and the values are the datasets.

This method would typically only be used if the user is combining multiple files into a single dataset. In this case, this method may be used to correct coordinates if they don’t match for all the files, or to change variable (column) names if two files have the same name for a variable, but they are two distinct variables.

This method can also be used to check for unique conditions in the raw data that should cause a pipeline failure if they are not met.

This method is called before the inputs are merged and converted to standard format as specified by the config file.

Parameters

raw_dataset_mapping (Dict[str, xr.Dataset]) – The raw datasets to customize.

Returns

The customized raw datasets.

Return type

Dict[str, xr.Dataset]

hook_finalize_dataset(self, dataset: xarray.Dataset) → xarray.Dataset

Hook to apply any final customizations to the dataset before it is saved. This hook is called after QualityManagement has been run and immediately before the dataset is saved to file.

Parameters

dataset (xr.Dataset) – The dataset to finalize.

Returns

The finalized dataset to save.

Return type

xr.Dataset

hook_generate_and_persist_plots(self, dataset: xarray.Dataset) → None

Hook to allow users to create plots from the xarray dataset after the dataset has been finalized and just before the dataset is saved to disk.

To save on filesystem space (which is limited when running on the cloud via a lambda function), this method should only write one plot to local storage at a time. An example of how this could be done is below:

filename = DSUtil.get_plot_filename(dataset, "sea_level", "png")
with self.storage._tmp.get_temp_filepath(filename) as tmp_path:
    fig, ax = plt.subplots(figsize=(10, 5))
    ax.plot(dataset["time"].data, dataset["sea_level"].data)
    fig.savefig(tmp_path)
    self.storage.save(tmp_path)

filename = DSUtil.get_plot_filename(dataset, "qc_sea_level", "png")
with self.storage._tmp.get_temp_filepath(filename) as tmp_path:
    DSUtil.plot_qc(dataset, "sea_level", tmp_path)
    self.storage.save(tmp_path)
Parameters

dataset (xr.Dataset) – The xarray dataset with customizations and QualityManagement applied.

read_and_persist_raw_files(self, file_paths: List[str]) → List[str]

Renames the provided raw files according to ME Data Standards file naming conventions for raw data files, and returns a list of the paths to the renamed files.

Parameters

file_paths (List[str]) – A list of paths to the original raw files.

Returns

A list of paths to the renamed files.

Return type

List[str]

class tsdat.QualityChecker(ds: xarray.Dataset, previous_data: xarray.Dataset, definition: tsdat.config.QualityManagerDefinition, parameters: Union[Dict, None] = None)

Bases: abc.ABC

Class containing the code to perform a single Quality Check on a Dataset variable.

Parameters
  • ds (xr.Dataset) – The dataset the checker will be applied to

  • previous_data (xr.Dataset) – A dataset from the previous processing interval (i.e., file). This is used to check for consistency between files, such as for monotonic or delta checks when we need to check the previous value.

  • definition (QualityManagerDefinition) – The quality manager definition as specified in the pipeline config file

  • parameters (dict, optional) – A dictionary of checker-specific parameters specified in the pipeline config file. Defaults to {}

abstract run(self, variable_name: str) → Optional[numpy.ndarray]

Check a dataset’s variable to see if it passes a quality check. These checks can be performed on the entire variable at one time by using xarray vectorized numerical operators.

Parameters

variable_name (str) – The name of the variable to check

Returns

If the check was performed, return a ndarray of the same shape as the variable. Each value in the data array will be either True or False, depending upon the results of the check. True means the check failed. False means it succeeded.

Note that we are using an np.ndarray instead of an xr.DataArray because the DataArray contains coordinate indexes which can sometimes get out of sync when performing np arithmetic vector operations. So it’s easier to just use numpy arrays.

If the check was skipped for some reason (i.e., it was not relevant given the current attributes defined for this dataset), then the run method should return None.

Return type

Optional[np.ndarray]
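
A minimal checker sketch, assuming the dataset passed to the constructor is available as self.ds (the class name and fill value below are illustrative):

from tsdat import QualityChecker

class CheckMissing(QualityChecker):
    """Hypothetical checker that flags values equal to -9999."""

    def run(self, variable_name: str):
        # True where the check fails (the value is the assumed fill value)
        return self.ds[variable_name].data == -9999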

class tsdat.QualityHandler(ds: xarray.Dataset, previous_data: xarray.Dataset, quality_manager: tsdat.config.QualityManagerDefinition, parameters: Union[Dict, None] = None)

Bases: abc.ABC

Class containing code to be executed if a particular quality check fails.

Parameters
  • ds (xr.Dataset) – The dataset the handler will be applied to

  • previous_data (xr.Dataset) – A dataset from the previous processing interval (i.e., file). This is used to check for consistency between files, such as for monotonic or delta checks when we need to check the previous value.

  • quality_manager (QualityManagerDefinition) – The quality_manager definition as specified in the pipeline config file

  • parameters (dict, optional) – A dictionary of handler-specific parameters specified in the pipeline config file. Defaults to {}

abstract run(self, variable_name: str, results_array: numpy.ndarray)

Perform a follow-on action if a quality check fails. This can be used to correct data if needed (such as replacing a bad value with a missing value, emailing a contact person, or raising an exception if the failure constitutes a critical error).

Parameters
  • variable_name (str) – Name of the variable that failed

  • results_array (np.ndarray) – An array of True/False values for each data value of the variable. True means the check failed.

record_correction(self, variable_name: str)

If a correction was made to variable data to fix invalid values as detected by a quality check, this method will record the fix to the appropriate variable attribute. The correction description will come from the handler params which get set in the pipeline config file.

Parameters

variable_name (str) – Name of the variable that was corrected
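
As an illustration, a handler that replaces failed values with the variable’s _FillValue might look like the sketch below. It assumes the base class exposes the dataset as self.ds; the class name is hypothetical:

    import numpy as np
    from tsdat import DSUtil, QualityHandler

    class ReplaceWithFillValue(QualityHandler):
        """Hypothetical handler that overwrites failed values with _FillValue."""

        def run(self, variable_name: str, results_array: np.ndarray):
            if results_array.any():
                fill_value = DSUtil.get_fill_value(self.ds, variable_name)
                # Overwrite only the positions where the check failed (True)
                self.ds[variable_name].data[results_array] = fill_value
                self.record_correction(variable_name)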

class tsdat.DSUtil

Provides helper functions for xarray.Dataset

static record_corrections_applied(ds: xarray.Dataset, variable: str, correction: str)

Records a description of a correction made to a variable in its corresponding corrections_applied attribute.

Parameters
  • ds (xr.Dataset) – Dataset containing the corrected variable

  • variable (str) – The name of the variable that was corrected

  • correction (str) – A description of the correction

static datetime64_to_string(datetime64: numpy.datetime64) → Tuple[str, str]

Convert a datetime64 object to a formatted string.

Parameters

datetime64 (Union[np.ndarray, np.datetime64]) – The datetime64 object

Returns

A tuple of strings representing the formatted date. The first string is the day in ‘yyyymmdd’ format. The second string is the time in ‘hhmmss’ format.

Return type

Tuple[str, str]
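
A quick usage sketch; the output values follow the documented formats:

    import numpy as np
    from tsdat import DSUtil

    day, time = DSUtil.datetime64_to_string(np.datetime64("2021-01-20T01:02:03"))
    # day == "20210120", time == "010203"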

static datetime64_to_timestamp(variable_data: numpy.ndarray) → numpy.ndarray

Converts each datetime64 value to a timestamp in the same units as the variable (e.g., seconds, nanoseconds).

Parameters

variable_data (np.ndarray) – ndarray of variable data

Returns

An ndarray of the same shape, with time values converted to long timestamps (e.g., int64)

Return type

np.ndarray

static get_datastream_name(ds: xarray.Dataset = None, config=None) → str

Returns the datastream name defined in the dataset or in the provided pipeline configuration.

Parameters
  • ds (xr.Dataset, optional) – The data as an xarray dataset; defaults to None

  • config (Config, optional) – The Config object used to assist reading time data from the raw_dataset; defaults to None.

Returns

The datastream name

Return type

str

static get_end_time(ds: xarray.Dataset) → Tuple[str, str]

Convenience method to get the end date and time from an xarray dataset.

Parameters

ds (xr.Dataset) – The dataset

Returns

A tuple of [day, time] as formatted strings representing the last time point in the dataset.

Return type

Tuple[str, str]

static get_fill_value(ds: xarray.Dataset, variable_name: str)

Get the value of the _FillValue attribute for the given variable.

Parameters
  • ds (xr.Dataset) – The dataset

  • variable_name (str) – A variable in the dataset

Returns

The value of the _FillValue attribute, or None if it is not defined

Return type

Same data type as the variable (int, float, etc.), or None

static get_non_qc_variable_names(ds: xarray.Dataset) → List[str]

Get a list of all data variables in the dataset that are NOT qc variables.

Parameters

ds (xr.Dataset) – A dataset

Returns

List of non-qc data variable names

Return type

List[str]

static get_raw_end_time(raw_ds: xarray.Dataset, time_var_definition: tsdat.config.VariableDefinition) → Tuple[str, str]

Convenience method to get the end date and time from a raw xarray dataset. This uses time_var_definition.get_input_name() as the dataset key for the time variable and additionally uses the input’s Converter object if applicable.

Parameters
  • raw_ds (xr.Dataset) – A raw dataset (not standardized)

  • time_var_definition (VariableDefinition) – The ‘time’ variable definition from the pipeline config

Returns

A tuple of strings representing the last time data point in the dataset. The first string is the day in ‘yyyymmdd’ format. The second string is the time in ‘hhmmss’ format.

Return type

Tuple[str, str]

static get_raw_start_time(raw_ds: xarray.Dataset, time_var_definition: tsdat.config.VariableDefinition) → Tuple[str, str]

Convenience method to get the start date and time from a raw xarray dataset. This uses time_var_definition.get_input_name() as the dataset key for the time variable and additionally uses the input’s Converter object if applicable.

Parameters
  • raw_ds (xr.Dataset) – A raw dataset (not standardized)

  • time_var_definition (VariableDefinition) – The ‘time’ variable definition from the pipeline config

Returns

A tuple of strings representing the first time data point in the dataset. The first string is the day in ‘yyyymmdd’ format. The second string is the time in ‘hhmmss’ format.

Return type

Tuple[str, str]

static get_coordinate_variable_names(ds: xarray.Dataset) → List[str]

Get a list of all coordinate variables in this dataset.

Parameters

ds (xr.Dataset) – The dataset

Returns

List of coordinate variable names

Return type

List[str]

static get_start_time(ds: xarray.Dataset) → Tuple[str, str]

Convenience method to get the start date and time from an xarray dataset.

Parameters

ds (xr.Dataset) – A standardized dataset

Returns

A tuple of strings representing the first time data point in the dataset. The first string is the day in ‘yyyymmdd’ format. The second string is the time in ‘hhmmss’ format.

Return type

Tuple[str, str]

static get_metadata(ds: xarray.Dataset) → Dict

Get a dictionary of all global and variable attributes in a dataset. Global attributes are found under the ‘attributes’ key and variable attributes are found under the ‘variables’ key.

Parameters

ds (xr.Dataset) – A dataset

Returns

A dictionary of global & variable attributes

Return type

Dict
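
For example, the returned structure can be unpacked like so:

    import xarray as xr
    from tsdat import DSUtil

    ds = xr.Dataset(attrs={"title": "example"})  # any dataset works here
    metadata = DSUtil.get_metadata(ds)
    global_atts = metadata["attributes"]   # global attributes
    variable_atts = metadata["variables"]  # per-variable attributes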

static plot_qc(ds: xarray.Dataset, variable_name: str, filename: str = None, **kwargs) → act.plotting.TimeSeriesDisplay

Create a QC plot for the given variable. This is based on the ACT library: https://arm-doe.github.io/ACT/source/auto_examples/plot_qc.html#sphx-glr-source-auto-examples-plot-qc-py

We provide a convenience wrapper method for basic QC plots of a variable, but we recommend using ACT directly and looking at their examples for more complex plots, such as plotting variables from two different datasets.

TODO: Depending on use cases, we will likely add more arguments to be able to quickly produce the most common types of QC plots.

Parameters
  • ds (xr.Dataset) – A dataset

  • variable_name (str) – The variable to plot

  • filename (str, optional) – The filename for the image. Saves the plot as this filename if provided.

static get_plot_filename(dataset: xarray.Dataset, plot_description: str, extension: str) → str

Returns the filename for a plot according to MHKiT-Cloud Data Standards. The dataset is used to determine the datastream_name and start date/time. The standards dictate that a plot filename should follow the format: datastream_name.date.time.description.extension.

Parameters
  • dataset (xr.Dataset) – The dataset from which the plot data is drawn. This is used to collect the datastream_name and start date/time.

  • plot_description (str) – The description of the plot. Should be as brief as possible and contain no spaces. Underscores may be used.

  • extension (str) – The file extension for the plot.

Returns

The standardized plot filename.

Return type

str
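
For instance, assuming dataset is a standardized dataset whose datastream name is the humboldt_ca.buoy_data.b1 example used elsewhere in these docs, the result would look like:

    filename = DSUtil.get_plot_filename(dataset, "sea_level", "png")
    # e.g. "humboldt_ca.buoy_data.b1.20210120.000000.sea_level.png"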

static get_dataset_filename(dataset: xarray.Dataset, file_extension='.nc') → str

Given an xarray dataset, this function will return the base filename of the dataset according to MHKiT-Cloud Data Standards. The base filename does not include the directory structure where the file should be saved, only the name of the file itself, e.g. z05.ExampleBuoyDatastream.b1.20201230.000000.nc

Parameters
  • dataset (xr.Dataset) – The dataset whose filename should be generated.

  • file_extension (str, optional) – The file extension to use. Defaults to “.nc”

Returns

The base filename of the dataset.

Return type

str

static get_raw_filename(raw_dataset: xarray.Dataset, old_filename: str, config) → str

Returns the appropriate raw filename of the raw dataset according to MHKiT-Cloud naming conventions. Uses the config object to parse the start date and time from the raw dataset for use in the new filename.

The new filename will follow the MHKiT-Cloud Data Standards for raw filenames, i.e., datastream_name.date.time.raw.old_filename, where the data level used in the datastream_name is 00.

Parameters
  • raw_dataset (xr.Dataset) – The raw data as an xarray dataset.

  • old_filename (str) – The name of the original raw file.

  • config (Config) – The Config object used to assist reading time data from the raw_dataset.

Returns

The standardized filename of the raw file.

Return type

str

static get_date_from_filename(filename: str) → str

Given a filename that conforms to MHKiT-Cloud Data Standards, return the date of the first point of data in the file.

Parameters

filename (str) – The filename or path to the file.

Returns

The date, in “yyyymmdd.hhmmss” format.

Return type

str

static get_datastream_name_from_filename(filename: str) → Optional[str]

Given a filename that conforms to MHKiT-Cloud Data Standards, return the datastream name. Datastream name is everything to the left of the third ‘.’ in the filename.

e.g., humboldt_ca.buoy_data.b1.20210120.000000.nc

Parameters

filename (str) – The filename or path to the file.

Returns

The datastream name, or None if the filename is not in the proper format.

Return type

Optional[str]
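
For example, using the filename above:

    from tsdat import DSUtil

    name = DSUtil.get_datastream_name_from_filename(
        "humboldt_ca.buoy_data.b1.20210120.000000.nc"
    )
    # name == "humboldt_ca.buoy_data.b1" (everything left of the third '.')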

static get_datastream_directory(datastream_name: str, root: str = '') → str

Given the datastream_name and an optional root, returns the path to where the datastream should be located. Does NOT create the directory where the datastream should be located.

Parameters
  • datastream_name (str) – The name of the datastream whose directory path should be generated.

  • root (str, optional) – The directory to use as the root of the directory structure. Defaults to “”.

Returns

The path to the directory where the datastream should be located.

Return type

str

static is_image(filename: str) → bool

Detect the mimetype from the file extension and use it to determine whether or not the file is an image.

Parameters

filename (str) – The name of the file to check

Returns

True if the file extension matches an image mimetype

Return type

bool
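
For example (filenames illustrative):

    from tsdat import DSUtil

    DSUtil.is_image("humboldt_ca.buoy_data.b1.20210120.000000.sea_level.png")  # True
    DSUtil.is_image("humboldt_ca.buoy_data.b1.20210120.000000.nc")             # False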

class tsdat.Converter(parameters: Union[Dict, None] = None)

Bases: abc.ABC

Base class for converting data arrays from one set of units to another. Users can extend this class if they have a special units conversion for their input data that cannot be resolved with the default converter classes.

Parameters

parameters (dict, optional) – A dictionary of converter-specific parameters which get passed from the pipeline config file. Defaults to {}

abstract run(self, data: numpy.ndarray, in_units: str, out_units: str) → numpy.ndarray

Convert the input data from in_units to out_units.

Parameters
  • data (np.ndarray) – Data array to be modified.

  • in_units (str) – Current units of the data array.

  • out_units (str) – Units to be converted to.

Returns

Data array converted into the new units.

Return type

np.ndarray
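
A minimal custom converter sketch; the class name and unit strings are illustrative, and most conversions should go through DefaultConverter instead:

    import numpy as np
    from tsdat import Converter

    class MilesToKilometers(Converter):
        """Hypothetical converter for a unit pair not handled elsewhere."""

        def run(self, data: np.ndarray, in_units: str, out_units: str) -> np.ndarray:
            if (in_units, out_units) == ("mi", "km"):
                return data * 1.609344
            return data  # pass unrecognized unit pairs through unchanged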

class tsdat.DefaultConverter(parameters: Union[Dict, None] = None)

Bases: Converter

Default class for converting units on data arrays. This class utilizes act.utils.data_utils.convert_units, and should work for most variables except time (see StringTimeConverter and TimestampTimeConverter).

run(self, data: numpy.ndarray, in_units: str, out_units: str) → numpy.ndarray

Convert the input data from in_units to out_units.

Parameters
  • data (np.ndarray) – Data array to be modified.

  • in_units (str) – Current units of the data array.

  • out_units (str) – Units to be converted to.

Returns

Data array converted into the new units.

Return type

np.ndarray
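
A usage sketch; the unit strings must be ones ACT recognizes, and “degC”/“degF” are shown here as an assumption:

    import numpy as np
    from tsdat import DefaultConverter

    converter = DefaultConverter()
    # Convert Celsius to Fahrenheit (unit strings assumed to be ACT-compatible)
    data_degf = converter.run(np.array([0.0, 100.0]), in_units="degC", out_units="degF")
    # expected to be approximately [32.0, 212.0]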

class tsdat.StringTimeConverter(parameters: Union[Dict, None] = None)

Bases: Converter

Convert a time string to a np.datetime64, which is needed for xarray. This class utilizes pd.to_datetime to perform the conversion.

One of the parameters should be ‘time_format’, which is the strftime format used to parse the time, e.g., “%d/%m/%Y”. Note that “%f” will parse all the way up to nanoseconds. See the strftime documentation for more information on format choices.

Parameters

parameters (dict, optional) – dictionary of converter-specific parameters. Defaults to {}.
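
Because the conversion delegates to pd.to_datetime, the parsing behaves like this sketch (format string and values illustrative):

    import pandas as pd

    # Equivalent of StringTimeConverter with time_format "%d/%m/%Y %H:%M:%S"
    times = pd.to_datetime(["20/01/2021 00:00:00"], format="%d/%m/%Y %H:%M:%S")
    print(times.values.dtype)  # datetime64[ns], the dtype xarray expects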

run(self, data: numpy.ndarray, in_units: str, out_units: str) → numpy.ndarray

Convert the input data from in_units to out_units.

Parameters
  • data (np.ndarray) – Data array to be modified.

  • in_units (str) – Current units of the data array.

  • out_units (str) – Units to be converted to.

Returns

Data array converted into the new units.

Return type

np.ndarray

class tsdat.TimestampTimeConverter(parameters: Union[Dict, None] = None)

Bases: Converter

Convert a numeric UTC timestamp to a np.datetime64, which is needed for xarray. This class utilizes pd.to_datetime to perform the conversion.

One of the parameters should be ‘unit’. This parameter denotes the time unit of the numeric values (e.g., ‘D’, ‘s’, ‘ms’, ‘us’, ‘ns’), which may be integers or floats. Timestamps are interpreted as offsets from the Unix epoch start.

Parameters

parameters (dict, optional) – A dictionary of converter-specific parameters which get passed from the pipeline config file. Defaults to {}
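
Because the conversion delegates to pd.to_datetime, numeric timestamps behave like this sketch (values illustrative):

    import pandas as pd

    # Equivalent of TimestampTimeConverter with unit "s" (seconds since the Unix epoch)
    times = pd.to_datetime([1611100800, 1611100860], unit="s")
    # -> 2021-01-20 00:00:00 and 2021-01-20 00:01:00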

run(self, data: numpy.ndarray, in_units: str, out_units: str) → numpy.ndarray

Convert the input data from in_units to out_units.

Parameters
  • data (np.ndarray) – Data array to be modified.

  • in_units (str) – Current units of the data array.

  • out_units (str) – Units to be converted to.

Returns

Data array converted into the new units.

Return type

np.ndarray