tsdat
Subpackages¶
Package Contents¶
Classes¶
- Wrapper for the pipeline configuration file.
- Wrapper for the pipeline portion of the pipeline config file.
- Wrapper for the dataset_definition portion of the pipeline config file.
- Class to represent dimensions defined in the pipeline config file.
- Class to encode variable definitions from the config file. Also provides a few utility methods.
- Class that adds constants for interacting with tsdat data-model specific attributes.
- Class that adds keywords for referring to variables.
- DatastreamStorage is the base class for providing access to processed data files in a persistent archive.
- DatastreamStorage subclass for an AWS S3-based filesystem.
- This class wraps a ‘special’ path string that lets us include the bucket name and region in the path.
- DatastreamStorage subclass for a local Linux-based filesystem.
- Abstract class to define methods required by all FileHandlers.
- Class to provide methods to read and write files with a variety of file formats.
- FileHandler to read from and write to CSV files.
- FileHandler to read from and write to netCDF files.
- This class serves as the base class for all tsdat data pipelines.
- The IngestPipeline class is designed to read in raw, non-standardized data.
- Class containing the code to perform a single Quality Check.
- Class containing code to be executed if a particular quality check fails.
- Provides helper functions for xarray.Dataset.
- Base class for converting data arrays from one unit to another.
- Default class for converting units on data arrays.
- Convert a time string to a np.datetime64, which is needed for xarray.
- Convert a numeric UTC timestamp to a np.datetime64, which is needed for xarray.
Functions¶
- Python decorator to register an AbstractFileHandler in the FileHandler class.
-
class
tsdat.
Config
(dictionary: Dict)¶ Wrapper for the pipeline configuration file.
Note: in most cases, Config.load(filepath) should be used to instantiate the Config class.
- Parameters
dictionary (Dict) – The pipeline configuration file as a dictionary.
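The three top-level sections that Config parses (pipeline, dataset_definition, and quality_management) might be laid out roughly as follows. This is an illustrative sketch: the nested key names (dimensions, variables) are assumptions, while the depth entry mirrors the static-variable example under get_static_variables().

```yaml
pipeline:
  # file-naming components validated by PipelineDefinition.check_file_name_components()
dataset_definition:
  dimensions:          # parsed by _parse_dimensions (key name assumed)
    depth:
      length: 3
  variables:           # parsed by _parse_variables (key name assumed)
    depth:
      data: [4, 8, 12]
      dims: [depth]
      type: int
      attrs:
        long_name: Depth
        units: m
quality_management:
  # quality managers parsed by _parse_quality_managers
```

If the configuration is split across several files, each file can hold a subset of these sections and the pieces can be passed together to Config.load().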
-
_parse_quality_managers
(self, dictionary: Dict) → Dict[str, tsdat.config.quality_manager_definition.QualityManagerDefinition]¶ Extracts QualityManagerDefinitions from the config file.
- Parameters
dictionary (Dict) – The quality_management dictionary.
- Returns
Mapping of quality manager name to QualityManagerDefinition
- Return type
Dict[str, QualityManagerDefinition]
-
classmethod
load
(self, filepaths: List[str])¶ Load one or more yaml pipeline configuration files. Multiple files should only be passed as input if the pipeline configuration file is split across multiple files.
- Parameters
filepaths (List[str]) – The path(s) to yaml configuration files to load.
- Returns
A Config object wrapping the yaml configuration file(s).
- Return type
Config
-
static
lint_yaml
(filename: str)¶ Lints a yaml file and raises an exception if an error is found.
- Parameters
filename (str) – The path to the file to lint.
- Raises
Exception – Raises an exception if an error is found.
-
class
tsdat.
PipelineDefinition
(dictionary: Dict[str, Dict])¶ Wrapper for the pipeline portion of the pipeline config file.
- Parameters
dictionary (Dict[str, Dict]) – The pipeline component of the pipeline config file.
- Raises
DefinitionError – Raises DefinitionError if one of the file naming components contains an illegal character.
-
check_file_name_components
(self)¶ Performs sanity checks on the config properties used in naming files output by tsdat pipelines.
- Raises
DefinitionError – Raises DefinitionError if a component has been set improperly.
-
class
tsdat.
DatasetDefinition
(dictionary: Dict, datastream_name: str)¶ Wrapper for the dataset_definition portion of the pipeline config file.
- Parameters
dictionary (Dict) – The portion of the config file corresponding with the dataset definition.
datastream_name (str) – The name of the datastream that the config file is for.
-
_parse_dimensions
(self, dictionary: Dict) → Dict[str, tsdat.config.dimension_definition.DimensionDefinition]¶ Extracts the dimensions from the dataset_definition portion of the config file.
- Parameters
dictionary (Dict) – The dataset_definition dictionary from the config file.
- Returns
Returns a mapping of output dimension names to DimensionDefinition objects.
- Return type
Dict[str, DimensionDefinition]
-
_parse_variables
(self, dictionary: Dict, available_dimensions: Dict[str, tsdat.config.dimension_definition.DimensionDefinition]) → Dict[str, tsdat.config.variable_definition.VariableDefinition]¶ Extracts the variables from the dataset_definition portion of the config file.
- Parameters
dictionary (Dict) – The dataset_definition dictionary from the config file.
available_dimensions (Dict[str, DimensionDefinition]) – The DimensionDefinition objects that have already been parsed.
- Returns
Returns a mapping of output variable names to VariableDefinition objects.
- Return type
Dict[str, VariableDefinition]
-
_parse_coordinates
(self, vars: Dict[str, tsdat.config.variable_definition.VariableDefinition]) → Tuple[Dict[str, tsdat.config.variable_definition.VariableDefinition], Dict[str, tsdat.config.variable_definition.VariableDefinition]]¶ Separates coordinate variables and data variables.
Determines which variables are coordinate variables and moves those variables from self.vars to self.coords. Coordinate variables are defined as variables that are dimensioned by themselves, i.e., var.name == var.dim.name is true for coordinate variables, but false for data variables.
- Parameters
vars (Dict[str, VariableDefinition]) – The dictionary of VariableDefinition objects to check.
- Returns
The tuple of coordinate variable and data variable dictionaries.
- Return type
Tuple[Dict[str, VariableDefinition], Dict[str, VariableDefinition]]
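The coordinate/data split described above can be sketched in plain Python. This is a simplified stand-in for the real VariableDefinition objects, where each variable is represented as a dict with a "dims" list:

```python
def separate_coords(variables):
    """Split a {name: {"dims": [...]}} mapping into (coords, data_vars).

    A variable is a coordinate variable when it is dimensioned by itself,
    i.e. its only dimension has the same name as the variable.
    """
    coords = {
        name: var for name, var in variables.items()
        if list(var["dims"]) == [name]
    }
    data_vars = {
        name: var for name, var in variables.items() if name not in coords
    }
    return coords, data_vars
```

With variables {"time": {"dims": ["time"]}, "temp": {"dims": ["time"]}}, "time" ends up in coords and "temp" in data_vars.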
-
_validate_dataset_definition
(self)¶ Performs sanity checks on the DatasetDefinition object.
- Raises
DefinitionError – If any sanity checks fail.
-
get_attr
(self, attribute_name) → Any¶ Retrieves the value of the attribute requested, or None if it does not exist.
- Parameters
attribute_name (str) – The name of the attribute to retrieve.
- Returns
The value of the attribute, or None.
- Return type
Any
-
get_variable_names
(self) → List[str]¶ Retrieves the list of variable names. Note that this excludes coordinate variables.
- Returns
The list of variable names.
- Return type
List[str]
-
get_variable
(self, variable_name: str) → tsdat.config.variable_definition.VariableDefinition¶ Attempts to retrieve the requested variable. First searches the data variables, then searches the coordinate variables. Returns None if no data or coordinate variables have been defined with the requested variable name.
- Parameters
variable_name (str) – The name of the variable to retrieve.
- Returns
Returns the VariableDefinition for the variable, or None if the variable could not be found.
- Return type
VariableDefinition
-
get_coordinates
(self, variable: tsdat.config.variable_definition.VariableDefinition) → List[tsdat.config.variable_definition.VariableDefinition]¶ Returns the coordinate VariableDefinition object(s) that dimension the requested VariableDefinition.
- Parameters
variable (VariableDefinition) – The VariableDefinition whose coordinate variables should be retrieved.
- Returns
A list of VariableDefinition coordinate variables that dimension the provided VariableDefinition.
- Return type
List[VariableDefinition]
-
get_static_variables
(self) → List[tsdat.config.variable_definition.VariableDefinition]¶ Retrieves a list of static VariableDefinition objects. A variable is defined as static if it has a “data” section in the config file, which would mean that the variable’s data is defined statically. For example, in the config file snippet below, “depth” is a static variable:
depth:
  data: [4, 8, 12]
  dims: [depth]
  type: int
  attrs:
    long_name: Depth
    units: m
- Returns
The list of static VariableDefinition objects.
- Return type
List[VariableDefinition]
-
class
tsdat.
DimensionDefinition
(name: str, length: Union[str, int])¶ Class to represent dimensions defined in the pipeline config file.
- Parameters
name (str) – The name of the dimension
length (Union[str, int]) – The length of the dimension. This should be one of: "unlimited", "variable", or a positive int. The ‘time’ dimension should always have a length of "unlimited".
-
is_unlimited
(self) → bool¶ Returns True if the dimension has unlimited length, represented by setting the length attribute to "unlimited".
- Returns
True if the dimension has unlimited length.
- Return type
bool
-
is_variable_length
(self) → bool¶ Returns True if the dimension has variable length, meaning that the dimension’s length is set at runtime, represented by setting the length to "variable".
- Returns
True if the dimension has variable length, False otherwise.
- Return type
bool
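The length semantics above can be sketched with a minimal class. This is illustrative only, not the real tsdat implementation:

```python
class DimensionDefinition:
    """Minimal sketch of dimension-length semantics (illustrative)."""

    def __init__(self, name, length):
        self.name = name
        self.length = length  # "unlimited", "variable", or a positive int

    def is_unlimited(self):
        # Unlimited length is represented by the literal string "unlimited".
        return self.length == "unlimited"

    def is_variable_length(self):
        # Variable length (resolved at runtime) is the string "variable".
        return self.length == "variable"
```

For example, a "time" dimension defined with length "unlimited" answers is_unlimited() with True, while a fixed-size "depth" dimension answers False to both predicates.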
-
class
tsdat.
VariableDefinition
(name: str, dictionary: Dict, available_dimensions: Dict[str, tsdat.config.dimension_definition.DimensionDefinition], defaults: Union[Dict, None] = None)¶ Class to encode variable definitions from the config file. Also provides a few utility methods.
- Parameters
name (str) – The name of the variable in the output file.
dictionary (Dict) – The dictionary entry corresponding with this variable in the config file.
available_dimensions (Dict[str, DimensionDefinition]) – A mapping of dimension name to DimensionDefinition objects.
defaults (Dict, optional) – The defaults to use when instantiating this VariableDefinition object, defaults to {}.
-
_parse_input
(self, dictionary: Dict, defaults: Union[Dict, None] = None) → VarInput¶ Parses the variable’s input property, if it has one, from the variable dictionary.
- Parameters
dictionary (Dict) – The dictionary entry corresponding with this variable in the config file.
defaults (Dict, optional) – The defaults to use when instantiating the VariableDefinition object, defaults to {}.
- Returns
A VarInput object for this VariableDefinition, or None.
- Return type
VarInput
-
_parse_attributes
(self, dictionary: Dict, defaults: Union[Dict, None] = None) → Dict[str, Any]¶ Parses the variable’s attributes from the variable dictionary.
- Parameters
dictionary (Dict) – The dictionary entry corresponding with this variable in the config file.
defaults (Dict, optional) – The defaults to use when instantiating the VariableDefinition object, defaults to {}.
- Returns
A mapping of attribute name to attribute value.
- Return type
Dict[str, Any]
-
_parse_dimensions
(self, dictionary: Dict, available_dimensions: Dict[str, tsdat.config.dimension_definition.DimensionDefinition], defaults: Union[Dict, None] = None) → Dict[str, tsdat.config.dimension_definition.DimensionDefinition]¶ Parses the variable’s dimensions from the variable dictionary.
- Parameters
dictionary (Dict) – The dictionary entry corresponding with this variable in the config file.
available_dimensions – A mapping of dimension name to DimensionDefinition.
defaults (Dict, optional) – The defaults to use when instantiating the VariableDefinition object, defaults to {}.
- Returns
A mapping of dimension name to DimensionDefinition objects.
- Return type
Dict[str, DimensionDefinition]
-
_parse_data_type
(self, dictionary: Dict, defaults: Union[Dict, None] = None) → object¶ Parses the data_type string and returns the appropriate numpy data type (i.e. “float” -> np.float).
- Parameters
dictionary (Dict) – The dictionary entry corresponding with this variable in the config file.
defaults (Dict, optional) – The defaults to use when instantiating the VariableDefinition object, defaults to {}.
- Raises
KeyError – Raises KeyError if the data type in the dictionary does not match a valid data type.
- Returns
The numpy data type corresponding with the type provided in the yaml file, or data_type if the provided data_type is not in the ME Data Standards list of data types.
- Return type
object
-
add_fillvalue_if_none
(self, attributes: Dict[str, Any]) → Dict[str, Any]¶ Adds the _FillValue attribute to the provided attributes dictionary if the _FillValue attribute has not already been defined and returns the modified attributes dictionary.
- Parameters
attributes (Dict[str, Any]) – The dictionary containing user-defined variable attributes.
- Returns
The dictionary containing user-defined variable attributes. Is guaranteed to have a _FillValue attribute.
- Return type
Dict[str, Any]
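The behavior described above amounts to a defaulting setdefault on the attributes dictionary; a minimal sketch (illustrative, using the -9999 default that get_FillValue() documents):

```python
def add_fillvalue_if_none(attributes, default=-9999):
    """Add a _FillValue attribute if one is not already defined.

    Returns the (possibly modified) attributes dictionary, which is
    guaranteed to contain a _FillValue key afterwards.
    """
    attributes.setdefault("_FillValue", default)
    return attributes
```

A user-supplied _FillValue is left untouched; only a missing one is filled in.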
-
is_constant
(self) → bool¶ Returns True if the variable is a constant. A variable is constant if it does not have any dimensions.
- Returns
True if the variable is constant, False otherwise.
- Return type
bool
-
is_predefined
(self) → bool¶ Returns True if the variable’s data was predefined in the config yaml file.
- Returns
True if the variable is predefined, False otherwise.
- Return type
bool
-
is_coordinate
(self) → bool¶ Returns True if the variable is a coordinate variable. A variable is defined as a coordinate variable if it is dimensioned by itself.
- Returns
True if the variable is a coordinate variable, False otherwise.
- Return type
bool
-
is_derived
(self) → bool¶ Return True if the variable is derived. A variable is derived if it does not have an input and it is not predefined.
- Returns
True if the Variable is derived, False otherwise.
- Return type
bool
-
has_converter
(self) → bool¶ Returns True if the variable has an input converter defined, False otherwise.
- Returns
True if the Variable has a converter defined, False otherwise.
- Return type
bool
-
is_required
(self) → bool¶ Returns True if the variable has the ‘required’ property defined and the ‘required’ property evaluates to True. A required variable is a variable which must be retrieved in the input dataset. If a required variable is not in the input dataset, the process should crash.
- Returns
True if the variable is required, False otherwise.
- Return type
bool
-
has_input
(self) → bool¶ Return True if the variable is copied from an input dataset, regardless of whether or not unit and/or naming conversions should be applied.
- Returns
True if the Variable has an input defined, False otherwise.
- Return type
bool
-
get_input_name
(self) → str¶ Returns the name of the variable in the input if defined, otherwise returns None.
- Returns
The name of the variable in the input, or None.
- Return type
str
-
get_input_units
(self) → str¶ If the variable has input, returns the units of the input variable or the output units if no input units are defined.
- Returns
The units of the input variable data.
- Return type
str
-
get_output_units
(self) → str¶ Returns the units of the output data or None if no units attribute has been defined.
- Returns
The units of the output variable data.
- Return type
str
-
get_coordinate_names
(self) → List[str]¶ Returns the names of the coordinate VariableDefinition(s) that this VariableDefinition is dimensioned by.
- Returns
A list of dimension/coordinate variable names.
- Return type
List[str]
-
get_shape
(self) → Tuple[int]¶ Returns the shape of the data attribute on the VariableDefinition.
- Raises
KeyError – Raises a KeyError if the data attribute has not been set yet.
- Returns
The shape of the VariableDefinition’s data, or None.
- Return type
Tuple[int]
-
get_data_type
(self) → numpy.dtype¶ Retrieves the variable’s data type.
- Returns
Returns the data type of the variable’s data as a numpy dtype.
- Return type
np.dtype
-
get_FillValue
(self) → int¶ Retrieves the variable’s _FillValue attribute, using -9999 as a default if it has not been defined.
- Returns
Returns the variable’s _FillValue.
- Return type
int
-
run_converter
(self, data: numpy.ndarray) → numpy.ndarray¶ If the variable has an input converter, runs the input converter for the input/output units on the provided data.
- Parameters
data (np.ndarray) – The data to be converted.
- Returns
Returns the data after it has been run through the variable’s converter.
- Return type
np.ndarray
-
to_dict
(self) → Dict¶ Returns the Variable as a dictionary to be used to initialize an empty xarray Dataset or DataArray.
Returns a dictionary like (example is for temperature):
{
    "dims": ["time"],
    "data": [],
    "attrs": {"units": "degC"}
}
- Returns
A dictionary representation of the variable.
- Return type
Dict
-
class
tsdat.
ATTS
¶ Class that adds constants for interacting with tsdat data-model specific attributes.
-
TITLE
= title¶
-
DESCRIPTION
= description¶
-
CONVENTIONS
= conventions¶
-
HISTORY
= history¶
-
DOI
= doi¶
-
INSTITUTION
= institution¶
-
CODE_URL
= code_url¶
-
REFERENCES
= references¶
-
INPUT_FILES
= input_files¶
-
LOCATION_ID
= location_id¶
-
DATASTREAM
= datastream_name¶
-
DATA_LEVEL
= data_level¶
-
LOCATION_DESCRPTION
= location_description¶
-
INSTRUMENT_NAME
= instrument_name¶
-
SERIAL_NUMBER
= serial_number¶
-
INSTRUMENT_DESCRPTION
= instrument_description¶
-
INSTRUMENT_MANUFACTURER
= instrument_manufacturer¶
-
AVERAGING_INTERVAL
= averaging_interval¶
-
SAMPLING_INTERVAL
= sampling_interval¶
-
UNITS
= units¶
-
VALID_DELTA
= valid_delta¶
-
VALID_RANGE
= valid_range¶
-
FAIL_RANGE
= fail_range¶
-
WARN_RANGE
= warn_range¶
-
FILL_VALUE
= _FillValue¶
-
CORRECTIONS_APPLIED
= corrections_applied¶
-
-
class
tsdat.
VARS
¶ Class that adds keywords for referring to variables.
-
ALL
= ALL¶
-
COORDS
= COORDS¶
-
DATA_VARS
= DATA_VARS¶
-
-
exception
tsdat.
DefinitionError
¶ Bases:
Exception
Indicates a fatal error within the YAML Dataset Definition.
-
exception
tsdat.
QCError
¶ Bases:
Exception
Indicates that a given Quality Manager failed with a fatal error.
-
class
tsdat.
DatastreamStorage
(parameters: Union[Dict, None] = None)¶ Bases:
abc.ABC
DatastreamStorage is the base class for providing access to processed data files in a persistent archive. DatastreamStorage provides shortcut methods to find files based upon date, datastream name, file type, etc. This is the class that should be used to save and retrieve processed data files. Use the DatastreamStorage.from_config() method to construct the appropriate subclass instance based upon a storage config file.
-
default_file_type
¶
-
file_filters
¶
-
output_file_extensions
¶
-
static
from_config
(storage_config_file: str)¶ Load a yaml config file which provides the storage constructor parameters.
- Parameters
storage_config_file (str) – The path to the config file to load
- Returns
A subclass instance created from the config file.
- Return type
DatastreamStorage
-
property
tmp
(self)¶ Each subclass should define the tmp property, which provides access to a TemporaryStorage object that is used to efficiently handle reading/writing temporary files used during the processing pipeline, or to perform filesystem actions on files other than processed datastream files that reside in the same filesystem as the DatastreamStorage. It is not intended to be used outside of the pipeline.
- Raises
NotImplementedError – If the subclass does not implement this property.
-
abstract
find
(self, datastream_name: str, start_time: str, end_time: str, filetype: str = None) → List[str]¶ Finds all files of the given type from the datastream store with the given datastream_name and timestamps from start_time (inclusive) up to end_time (exclusive). Returns a list of paths to files that match the criteria.
- Parameters
datastream_name (str) – The datastream_name as defined by ME Data Standards.
start_time (str) – The start time or date to start searching for data (inclusive). Should be like “20210106.000000” to search for data beginning on or after January 6th, 2021.
end_time (str) – The end time or date to stop searching for data (exclusive). Should be like “20210108.000000” to search for data ending before January 8th, 2021.
filetype (str, optional) – A file type from the DatastreamStorage.file_filters keys. If no type is specified, all files will be returned. Defaults to None.
- Returns
A list of paths in datastream storage in ascending order
- Return type
List[str]
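The inclusive/exclusive time-window semantics can be illustrated with plain string comparison, since "YYYYMMDD.HHMMSS" timestamps sort lexicographically. The filename layout assumed below is for illustration only:

```python
def find(filenames, start_time, end_time):
    """Return sorted filenames whose timestamp is in [start_time, end_time).

    Assumes names like '<datastream>.<YYYYMMDD>.<HHMMSS>.<ext>', so the
    timestamp is the two dot-separated fields before the extension.
    """
    def timestamp(name):
        parts = name.split(".")
        return parts[-3] + "." + parts[-2]

    return sorted(
        name for name in filenames
        if start_time <= timestamp(name) < end_time
    )
```

Searching from "20210106.000000" to "20210108.000000" returns files from January 6th and 7th, 2021, but excludes January 8th, matching the inclusive-start/exclusive-end contract above.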
-
abstract
fetch
(self, datastream_name: str, start_time: str, end_time: str, local_path: str = None, filetype: int = None)¶ Fetches files from the datastream store using the datastream_name, start_time, and end_time to specify the file(s) to retrieve. If the local path is not specified, it is up to the subclass to determine where to put the retrieved file(s).
- Parameters
datastream_name (str) – The datastream_name as defined by ME Data Standards.
start_time (str) – The start time or date to start searching for data (inclusive). Should be like “20210106” to search for data beginning on or after January 6th, 2021.
end_time (str) – The end time or date to stop searching for data (exclusive). Should be like “20210108” to search for data ending before January 8th, 2021.
local_path (str, optional) – The path to the directory where the data should be stored. Defaults to None.
filetype (int, optional) – A file type from the DatastreamStorage.file_filters keys. If no type is specified, all files will be returned. Defaults to None.
- Returns
A list of paths where the retrieved files were stored in local storage. This is a context manager class, so this method should be called via the ‘with’ statement and all files referenced by the list will be cleaned up when it goes out of scope.
- Return type
DisposableLocalTempFileList
-
save
(self, dataset_or_path: Union[str, xarray.Dataset], new_filename: str = None) → List[Any]¶ Saves a local file to the datastream store.
- Parameters
dataset_or_path (Union[str, xr.Dataset]) – The dataset or local path to the file to save. The file should be named according to ME Data Standards naming conventions so that this method can automatically parse the datastream, date, and time from the file name.
new_filename (str, optional) – If provided, the new filename to save as. This parameter should ONLY be provided if using a local path for dataset_or_path. Must also follow ME Data Standards naming conventions. Defaults to None.
- Returns
A list of paths where the saved files were stored in storage. Path type is dependent upon the specific storage subclass.
- Return type
List[Any]
-
abstract
save_local_path
(self, local_path: str, new_filename: str = None) → Any¶ Given a path to a local file, save that file to the storage.
- Parameters
local_path (str) – Local path to the file to save. The file should be named according to ME Data Standards naming conventions so that this method can automatically parse the datastream, date, and time from the file name.
new_filename (str, optional) – If provided, the new filename to save as. This parameter should ONLY be provided if using a local path for dataset_or_path. Must also follow ME Data Standards naming conventions. Defaults to None.
- Returns
The path where this file was stored in storage. Path type is dependent upon the specific storage subclass.
- Return type
Any
-
abstract
exists
(self, datastream_name: str, start_time: str, end_time: str, filetype: str = None) → bool¶ Checks if any data exists in the datastream store for the provided datastream and time range.
- Parameters
datastream_name (str) – The datastream_name as defined by ME Data Standards.
start_time (str) – The start time or date to start searching for data (inclusive). Should be like “20210106” to search for data beginning on or after January 6th, 2021.
end_time (str) – The end time or date to stop searching for data (exclusive). Should be like “20210108” to search for data ending before January 8th, 2021.
filetype (str, optional) – A file type from the DatastreamStorage.file_filters keys. If none specified, all files will be checked. Defaults to None.
- Returns
True if data exists, False otherwise.
- Return type
bool
-
abstract
delete
(self, datastream_name: str, start_time: str, end_time: str, filetype: str = None) → None¶ Deletes datastream data in the datastream store in between the specified time range.
- Parameters
datastream_name (str) – The datastream_name as defined by ME Data Standards.
start_time (str) – The start time or date to start searching for data (inclusive). Should be like “20210106” to search for data beginning on or after January 6th, 2021.
end_time (str) – The end time or date to stop searching for data (exclusive). Should be like “20210108” to search for data ending before January 8th, 2021.
filetype (str, optional) – A file type from the DatastreamStorage.file_filters keys. If no type is specified, all files will be deleted. Defaults to None.
-
-
class
tsdat.
AwsStorage
(parameters: Union[Dict, None] = None)¶ Bases:
tsdat.io.DatastreamStorage
DatastreamStorage subclass for an AWS S3-based filesystem.
- Parameters
parameters (dict, optional) –
Dictionary of parameters that should be set automatically from the storage config file when this class is instantiated via the DatastreamStorage.from_config() method. Defaults to {}.
Key parameters that should be set in the config file include:
- retain_input_files
Whether the input files should be cleaned up after they are done processing
- root_dir
The bucket ‘key’ to use to prepend to all processed files created in the persistent store. Defaults to ‘root’
- temp_dir
The bucket ‘key’ to use to prepend to all temp files created in the S3 bucket. Defaults to ‘temp’
- bucket_name
The name of the S3 bucket to store to
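The parameters above might appear in a storage config file roughly like this. The surrounding key layout (storage, classname, parameters) is an assumption; only retain_input_files, root_dir, temp_dir, and bucket_name come from the documentation above:

```yaml
storage:
  classname: tsdat.io.AwsStorage   # hypothetical key layout
  parameters:
    retain_input_files: True
    bucket_name: my-tsdat-bucket   # placeholder bucket name
    root_dir: root
    temp_dir: temp
```

Passing such a file to DatastreamStorage.from_config() would construct an AwsStorage instance with these parameters.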
-
property
s3_resource
(self)¶
-
property
s3_client
(self)¶
-
property
tmp
(self)¶ Each subclass should define the tmp property, which provides access to a TemporaryStorage object that is used to efficiently handle reading/writing temporary files used during the processing pipeline, or to perform filesystem actions on files other than processed datastream files that reside in the same filesystem as the DatastreamStorage. It is not intended to be used outside of the pipeline.
- Raises
NotImplementedError – If the subclass does not implement this property.
-
property
root
(self)¶
-
property
temp_path
(self)¶
-
find
(self, datastream_name: str, start_time: str, end_time: str, filetype: str = None) → List[S3Path]¶ Finds all files of the given type from the datastream store with the given datastream_name and timestamps from start_time (inclusive) up to end_time (exclusive). Returns a list of paths to files that match the criteria.
- Parameters
datastream_name (str) – The datastream_name as defined by ME Data Standards.
start_time (str) – The start time or date to start searching for data (inclusive). Should be like “20210106.000000” to search for data beginning on or after January 6th, 2021.
end_time (str) – The end time or date to stop searching for data (exclusive). Should be like “20210108.000000” to search for data ending before January 8th, 2021.
filetype (str, optional) – A file type from the DatastreamStorage.file_filters keys. If no type is specified, all files will be returned. Defaults to None.
- Returns
A list of paths in datastream storage in ascending order
- Return type
List[str]
-
fetch
(self, datastream_name: str, start_time: str, end_time: str, local_path: str = None, filetype: int = None) → tsdat.io.DisposableLocalTempFileList¶ Fetches files from the datastream store using the datastream_name, start_time, and end_time to specify the file(s) to retrieve. If the local path is not specified, it is up to the subclass to determine where to put the retrieved file(s).
- Parameters
datastream_name (str) – The datastream_name as defined by ME Data Standards.
start_time (str) – The start time or date to start searching for data (inclusive). Should be like “20210106” to search for data beginning on or after January 6th, 2021.
end_time (str) – The end time or date to stop searching for data (exclusive). Should be like “20210108” to search for data ending before January 8th, 2021.
local_path (str, optional) – The path to the directory where the data should be stored. Defaults to None.
filetype (int, optional) – A file type from the DatastreamStorage.file_filters keys. If no type is specified, all files will be returned. Defaults to None.
- Returns
A list of paths where the retrieved files were stored in local storage. This is a context manager class, so this method should be called via the ‘with’ statement and all files referenced by the list will be cleaned up when it goes out of scope.
- Return type
DisposableLocalTempFileList
-
save_local_path
(self, local_path: str, new_filename: str = None)¶ Given a path to a local file, save that file to the storage.
- Parameters
local_path (str) – Local path to the file to save. The file should be named according to ME Data Standards naming conventions so that this method can automatically parse the datastream, date, and time from the file name.
new_filename (str, optional) – If provided, the new filename to save as. This parameter should ONLY be provided if using a local path for dataset_or_path. Must also follow ME Data Standards naming conventions. Defaults to None.
- Returns
The path where this file was stored in storage. Path type is dependent upon the specific storage subclass.
- Return type
Any
-
exists
(self, datastream_name: str, start_time: str, end_time: str, filetype: int = None) → bool¶ Checks if any data exists in the datastream store for the provided datastream and time range.
- Parameters
datastream_name (str) – The datastream_name as defined by ME Data Standards.
start_time (str) – The start time or date to start searching for data (inclusive). Should be like “20210106” to search for data beginning on or after January 6th, 2021.
end_time (str) – The end time or date to stop searching for data (exclusive). Should be like “20210108” to search for data ending before January 8th, 2021.
filetype (str, optional) – A file type from the DatastreamStorage.file_filters keys. If none specified, all files will be checked. Defaults to None.
- Returns
True if data exists, False otherwise.
- Return type
bool
-
delete
(self, datastream_name: str, start_time: str, end_time: str, filetype: int = None) → None¶ Deletes datastream data in the datastream store in between the specified time range.
- Parameters
datastream_name (str) – The datastream_name as defined by ME Data Standards.
start_time (str) – The start time or date to start searching for data (inclusive). Should be like “20210106” to search for data beginning on or after January 6th, 2021.
end_time (str) – The end time or date to stop searching for data (exclusive). Should be like “20210108” to search for data ending before January 8th, 2021.
filetype (str, optional) – A file type from the DatastreamStorage.file_filters keys. If no type is specified, all files will be deleted. Defaults to None.
-
class
tsdat.
S3Path
(bucket_name: str, bucket_path: str = '', region_name: str = None)¶ Bases:
str
This class wraps a ‘special’ path string that lets us include the bucket name and region in the path, so that we can use it seamlessly in boto3 APIs. We are creating our own string to hold the region, bucket & key (i.e., path), since boto3 needs all three in order to access a file.
Example:
s3_client = boto3.client('s3', region_name='eu-central-1')
s3_client.download_file(bucket, key, download_path)
- Parameters
bucket_name (str) – The S3 bucket name where this file is located
bucket_path (str, optional) – The key to access this file in the bucket
region_name (str, optional) – The AWS region where this file is located, defaults to None, which inherits the default configured region.
-
__str__
(self)¶ Return str(self).
-
property
bucket_name
(self)¶
-
property
bucket_path
(self)¶
-
property
region_name
(self)¶
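The behavior described above can be sketched as a str subclass that carries the extra metadata. This is an illustrative re-implementation, not the actual tsdat source:

```python
class S3Path(str):
    """Sketch of a path string that also carries bucket and region info."""

    def __new__(cls, bucket_name, bucket_path="", region_name=None):
        # The string value is the bucket-qualified path; the extra
        # attributes ride along so boto3 calls can recover all three
        # pieces (region, bucket, and key) from one object.
        obj = super().__new__(cls, f"{bucket_name}/{bucket_path}")
        obj._bucket_name = bucket_name
        obj._bucket_path = bucket_path
        obj._region_name = region_name
        return obj

    @property
    def bucket_name(self):
        return self._bucket_name

    @property
    def bucket_path(self):
        return self._bucket_path

    @property
    def region_name(self):
        return self._region_name
```

Because the object is still a str, it can be passed anywhere an ordinary path string is expected, while storage code can pull out p.bucket_name and p.region_name when making boto3 calls.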
-
class
tsdat.
FilesystemStorage
(parameters: Union[Dict, None] = None)¶ Bases:
tsdat.io.DatastreamStorage
DatastreamStorage subclass for a local Linux-based filesystem.
TODO: rename to LocalStorage as this is more intuitive.
- Parameters
parameters (dict, optional) –
Dictionary of parameters that should be set automatically from the storage config file when this class is instantiated via the DatastreamStorage.from_config() method. Defaults to {}.
Key parameters that should be set in the config file include:
- retain_input_files
Whether the input files should be cleaned up after they are done processing
- root_dir
The root path under which processed files will be stored.
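A storage config file for this class might look roughly like the following. The surrounding key layout (storage, classname, parameters) is an assumption; only retain_input_files and root_dir come from the documentation above:

```yaml
storage:
  classname: tsdat.io.FilesystemStorage   # hypothetical key layout
  parameters:
    retain_input_files: True
    root_dir: /data/storage/root          # placeholder path
```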
-
property
tmp
(self)¶ Each subclass should define the tmp property, which provides access to a TemporaryStorage object that is used to efficiently handle reading/writing temporary files used during the processing pipeline, or to perform fileystem actions on files other than processed datastream files that reside in the same filesystem as the DatastreamStorage. Is is not intended to be used outside of the pipeline.
- Raises
NotImplementedError – If the subclass does not define the tmp property.
-
find
(self, datastream_name: str, start_time: str, end_time: str, filetype: str = None) → List[str]¶ Finds all files of the given type from the datastream store with the given datastream_name and timestamps from start_time (inclusive) up to end_time (exclusive). Returns a list of paths to files that match the criteria.
- Parameters
datastream_name (str) – The datastream_name as defined by ME Data Standards.
start_time (str) – The start time or date to start searching for data (inclusive). Should be like “20210106.000000” to search for data beginning on or after January 6th, 2021.
end_time (str) – The end time or date to stop searching for data (exclusive). Should be like “20210108.000000” to search for data ending before January 8th, 2021.
filetype (str, optional) – A file type from the DatastreamStorage.file_filters keys. If no type is specified, all files will be returned. Defaults to None.
- Returns
A list of paths in datastream storage in ascending order
- Return type
List[str]
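The inclusive-start / exclusive-end filtering that find() performs can be sketched with plain string comparison, since "yyyymmdd.hhmmss" timestamps sort lexicographically in chronological order. The helper and filenames below are hypothetical, not part of the tsdat API.

```python
def in_time_range(file_time: str, start_time: str, end_time: str) -> bool:
    # "yyyymmdd.hhmmss" strings compare chronologically, so a plain
    # string comparison implements start-inclusive / end-exclusive.
    return start_time <= file_time < end_time

# Hypothetical store contents: filename -> timestamp
files = {
    "buoy.z05.a1.20210105.230000.nc": "20210105.230000",
    "buoy.z05.a1.20210106.000000.nc": "20210106.000000",
    "buoy.z05.a1.20210107.120000.nc": "20210107.120000",
    "buoy.z05.a1.20210108.000000.nc": "20210108.000000",
}
matches = sorted(
    path for path, t in files.items()
    if in_time_range(t, "20210106.000000", "20210108.000000")
)
```

Note that the file stamped exactly at the end time is excluded, matching the exclusive end_time behavior documented above.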
-
fetch
(self, datastream_name: str, start_time: str, end_time: str, local_path: str = None, filetype: str = None) → tsdat.io.DisposableLocalTempFileList¶ Fetches files from the datastream store using the datastream_name, start_time, and end_time to specify the file(s) to retrieve. If the local path is not specified, it is up to the subclass to determine where to put the retrieved file(s).
- Parameters
datastream_name (str) – The datastream_name as defined by ME Data Standards.
start_time (str) – The start time or date to start searching for data (inclusive). Should be like “20210106” to search for data beginning on or after January 6th, 2021.
end_time (str) – The end time or date to stop searching for data (exclusive). Should be like “20210108” to search for data ending before January 8th, 2021.
local_path (str, optional) – The path to the directory where the data should be stored. Defaults to None.
filetype (str, optional) – A file type from the DatastreamStorage.file_filters keys. If no type is specified, all files will be returned. Defaults to None.
- Returns
A list of paths where the retrieved files were stored in local storage. This is a context manager class, so this method should be called via the ‘with’ statement; all files referenced by the list will be cleaned up when it goes out of scope.
- Return type
DisposableLocalTempFileList:
-
save_local_path
(self, local_path: str, new_filename: str = None) → Any¶ Given a path to a local file, save that file to the storage.
- Parameters
local_path (str) – Local path to the file to save. The file should be named according to ME Data Standards naming conventions so that this method can automatically parse the datastream, date, and time from the file name.
new_filename (str, optional) – If provided, the new filename to save as. This parameter should ONLY be provided if using a local path for dataset_or_path. Must also follow ME Data Standards naming conventions. Defaults to None.
- Returns
The path where this file was stored in storage. Path type is dependent upon the specific storage subclass.
- Return type
Any
-
exists
(self, datastream_name: str, start_time: str, end_time: str, filetype: str = None) → bool¶ Checks if any data exists in the datastream store for the provided datastream and time range.
- Parameters
datastream_name (str) – The datastream_name as defined by ME Data Standards.
start_time (str) – The start time or date to start searching for data (inclusive). Should be like “20210106” to search for data beginning on or after January 6th, 2021.
end_time (str) – The end time or date to stop searching for data (exclusive). Should be like “20210108” to search for data ending before January 8th, 2021.
filetype (str, optional) – A file type from the DatastreamStorage.file_filters keys. If none specified, all files will be checked. Defaults to None.
- Returns
True if data exists, False otherwise.
- Return type
bool
-
delete
(self, datastream_name: str, start_time: str, end_time: str, filetype: str = None) → None¶ Deletes datastream data in the datastream store between the specified time range.
- Parameters
datastream_name (str) – The datastream_name as defined by ME Data Standards.
start_time (str) – The start time or date to start searching for data (inclusive). Should be like “20210106” to search for data beginning on or after January 6th, 2021.
end_time (str) – The end time or date to stop searching for data (exclusive). Should be like “20210108” to search for data ending before January 8th, 2021.
filetype (str, optional) – A file type from the DatastreamStorage.file_filters keys. If no type is specified, all files will be deleted. Defaults to None.
-
class
tsdat.
AbstractFileHandler
(parameters: Union[Dict, None] = None)¶ Abstract class to define methods required by all FileHandlers. Classes derived from AbstractFileHandler should implement one or more of the following methods:
write(ds: xr.Dataset, filename: str, config: Config, **kwargs)
read(filename: str, **kwargs) -> xr.Dataset
- Parameters
parameters (Dict, optional) – Parameters that were passed to the FileHandler when it was registered in the storage config file, defaults to {}.
-
write
(self, ds: xarray.Dataset, filename: str, config: tsdat.config.Config = None, **kwargs) → None¶ Saves the given dataset to a file.
- Parameters
ds (xr.Dataset) – The dataset to save.
filename (str) – The path to where the file should be written to.
config (Config, optional) – Optional Config object, defaults to None
-
read
(self, filename: str, **kwargs) → xarray.Dataset¶ Reads in the given file and converts it into an Xarray dataset for use in the pipeline.
- Parameters
filename (str) – The path to the file to read in.
- Returns
A xr.Dataset object.
- Return type
xr.Dataset
-
class
tsdat.
FileHandler
¶ Class to provide methods to read and write files with a variety of extensions.
-
FILEREADERS
:Dict[str, AbstractFileHandler]¶
-
FILEWRITERS
:Dict[str, AbstractFileHandler]¶
-
static
_get_handler
(filename: str, method: Literal[read, write]) → AbstractFileHandler¶ Given the filepath of the file to read or write and the FileHandler method to apply to the filepath, this method determines which previously-registered FileHandler should be used on the provided filepath.
- Parameters
filename (str) – The path to the file to read or write to.
method (Literal["read", "write"]) – The method to apply to the file. Must be one of: "read", "write".
- Returns
The FileHandler that should be applied.
- Return type
AbstractFileHandler
-
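The dispatch step behind _get_handler can be sketched with glob-style pattern matching: the filepath is compared against registered patterns and the first matching handler wins. The registry contents and function name below are illustrative, not the tsdat internals.

```python
import fnmatch

# Hypothetical registry: file pattern -> handler (strings stand in for
# AbstractFileHandler instances here).
FILEREADERS = {
    "*.nc": "NetCdfHandler",
    "*.cdf": "NetCdfHandler",
    "*.csv": "CsvHandler",
}

def get_handler(filename: str):
    """Return the first registered handler whose pattern matches."""
    for pattern, handler in FILEREADERS.items():
        if fnmatch.fnmatch(filename, pattern):
            return handler
    return None  # no registered handler for this file type
```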
static
write
(ds: xarray.Dataset, filename: str, config: tsdat.config.Config = None, **kwargs) → None¶ Calls the appropriate FileHandler to write the dataset to the provided filename.
- Parameters
ds (xr.Dataset) – The dataset to save.
filename (str) – The path to the file where the dataset should be written.
config (Config, optional) – Optional Config object. Defaults to None.
-
static
read
(filename: str, **kwargs) → xarray.Dataset¶ Reads in the given file and converts it into an xarray dataset object using the registered FileHandler for the provided filepath.
- Parameters
filename (str) – The path to the file to read in.
- Returns
The raw file as an Xarray.Dataset object.
- Return type
xr.Dataset
-
static
register_file_handler
(method: Literal[read, write], patterns: Union[str, List[str]], handler: AbstractFileHandler)¶ Method to register a FileHandler for reading from or writing to files matching one or more provided file patterns.
- Parameters
method (Literal["read", "write"]) – The method the FileHandler should call if the pattern is matched. Must be one of: "read", "write".
patterns (Union[str, List[str]]) – The file pattern(s) that determine if this FileHandler should be run on a given filepath.
handler (AbstractFileHandler) – The AbstractFileHandler to register.
-
-
class
tsdat.
CsvHandler
(parameters: Union[Dict, None] = None)¶ Bases:
tsdat.io.filehandlers.file_handlers.AbstractFileHandler
FileHandler to read from and write to CSV files. Takes a number of parameters that are passed in from the storage config file. Parameters specified in the config file should follow the following example:
parameters:
  write:
    to_dataframe:
      # Parameters here will be passed to xr.Dataset.to_dataframe()
    to_csv:
      # Parameters here will be passed to pd.DataFrame.to_csv()
  read:
    read_csv:
      # Parameters here will be passed to pd.read_csv()
    to_xarray:
      # Parameters here will be passed to pd.DataFrame.to_xarray()
- Parameters
parameters (Dict, optional) – Parameters that were passed to the FileHandler when it was registered in the storage config file, defaults to {}.
-
write
(self, ds: xarray.Dataset, filename: str, config: tsdat.config.Config = None, **kwargs) → None¶ Saves the given dataset to a csv file.
- Parameters
ds (xr.Dataset) – The dataset to save.
filename (str) – The path to where the file should be written to.
config (Config, optional) – Optional Config object, defaults to None
-
read
(self, filename: str, **kwargs) → xarray.Dataset¶ Reads in the given file and converts it into an Xarray dataset for use in the pipeline.
- Parameters
filename (str) – The path to the file to read in.
- Returns
A xr.Dataset object.
- Return type
xr.Dataset
-
class
tsdat.
NetCdfHandler
(parameters: Union[Dict, None] = None)¶ Bases:
tsdat.io.filehandlers.file_handlers.AbstractFileHandler
FileHandler to read from and write to netCDF files. Takes a number of parameters that are passed in from the storage config file. Parameters specified in the config file should follow the following example:
parameters:
  write:
    to_netcdf:
      # Parameters here will be passed to xr.Dataset.to_netcdf()
  read:
    load_dataset:
      # Parameters here will be passed to xr.load_dataset()
- Parameters
parameters (Dict, optional) – Parameters that were passed to the FileHandler when it was registered in the storage config file, defaults to {}.
-
write
(self, ds: xarray.Dataset, filename: str, config: tsdat.config.Config = None, **kwargs) → None¶ Saves the given dataset to a netCDF file.
- Parameters
ds (xr.Dataset) – The dataset to save.
filename (str) – The path to where the file should be written to.
config (Config, optional) – Optional Config object, defaults to None
-
read
(self, filename: str, **kwargs) → xarray.Dataset¶ Reads in the given file and converts it into an Xarray dataset for use in the pipeline.
- Parameters
filename (str) – The path to the file to read in.
- Returns
A xr.Dataset object.
- Return type
xr.Dataset
-
tsdat.
register_filehandler
(patterns: Union[str, List[str]]) → AbstractFileHandler¶ Python decorator to register an AbstractFileHandler in the FileHandler object. The FileHandler object will be used by tsdat pipelines to read and write raw, intermediate, and processed data.
This decorator can be used to work with a specific AbstractFileHandler without having to specify a config file. This is useful when using an AbstractFileHandler for analysis or for tests outside of a pipeline. For tsdat pipelines, handlers should always be specified via the storage config file.
Example Usage:
import xarray as xr
from tsdat.io import register_filehandler, AbstractFileHandler

@register_filehandler(["*.nc", "*.cdf"])
class NetCdfHandler(AbstractFileHandler):
    def write(ds: xr.Dataset, filename: str, config: Config = None, **kwargs):
        ds.to_netcdf(filename)

    def read(filename: str, **kwargs) -> xr.Dataset:
        return xr.load_dataset(filename)
- Parameters
patterns (Union[str, List[str]]) – The patterns (regex) that should be used to match a filepath to the AbstractFileHandler provided.
- Returns
The original AbstractFileHandler class, after it has been registered for use in tsdat pipelines.
- Return type
AbstractFileHandler
-
class
tsdat.
Pipeline
(pipeline_config: Union[str, tsdat.config.Config], storage_config: Union[str, tsdat.io.DatastreamStorage])¶ Bases:
abc.ABC
This class serves as the base class for all tsdat data pipelines.
- Parameters
pipeline_config (Union[str, Config]) – The pipeline config file. Can be either a config object, or the path to the pipeline config file that should be used with this pipeline.
storage_config (Union[str, DatastreamStorage]) – The storage config file. Can be either a config object, or the path to the storage config file that should be used with this pipeline.
-
abstract
run
(self, filepath: Union[str, List[str]])¶ This method is the entry point for the pipeline. It will take one or more file paths and process them from start to finish. All classes extending the Pipeline class must implement this method.
- Parameters
filepath (Union[str, List[str]]) – The path or list of paths to the file(s) to run the pipeline on.
-
standardize_dataset
(self, raw_mapping: Dict[str, xarray.Dataset]) → xarray.Dataset¶ Standardizes the dataset by applying variable name and units conversions as defined by the pipeline config file. This method returns the standardized dataset.
- Parameters
raw_mapping (Dict[str, xr.Dataset]) – The raw dataset mapping.
- Returns
The standardized dataset.
- Return type
xr.Dataset
-
check_required_variables
(self, dataset: xarray.Dataset, dod: tsdat.config.DatasetDefinition)¶ Function to throw an error if a required variable could not be retrieved.
- Parameters
dataset (xr.Dataset) – The dataset to check.
dod (DatasetDefinition) – The DatasetDefinition used to specify required variables.
- Raises
Exception – Raises an exception to indicate the variable could not be retrieved.
-
add_static_variables
(self, dataset: xarray.Dataset, dod: tsdat.config.DatasetDefinition) → xarray.Dataset¶ Uses the DatasetDefinition to add static variables (variables whose data are defined in the pipeline config file) to the output dataset.
- Parameters
dataset (xr.Dataset) – The dataset to add static variables to.
dod (DatasetDefinition) – The DatasetDefinition to pull data from.
- Returns
The original dataset with added variables from the config
- Return type
xr.Dataset
-
add_missing_variables
(self, dataset: xarray.Dataset, dod: tsdat.config.DatasetDefinition) → xarray.Dataset¶ Uses the dataset definition to initialize variables that are defined in the dataset definition but did not have input. Uses the appropriate shape and _FillValue to initialize each variable.
- Parameters
dataset (xr.Dataset) – The dataset to add the variables to.
dod (DatasetDefinition) – The DatasetDefinition to use.
- Returns
The original dataset with variables that still need to be initialized, initialized.
- Return type
xr.Dataset
-
add_attrs
(self, dataset: xarray.Dataset, raw_mapping: Dict[str, xarray.Dataset], dod: tsdat.config.DatasetDefinition) → xarray.Dataset¶ Adds global and variable-level attributes to the dataset from the DatasetDefinition object.
- Parameters
dataset (xr.Dataset) – The dataset to add attributes to.
raw_mapping (Dict[str, xr.Dataset]) – The raw dataset mapping. Used to set the input_files global attribute.
dod (DatasetDefinition) – The DatasetDefinition containing the attributes to add.
- Returns
The original dataset with the attributes added.
- Return type
xr.Dataset
-
get_previous_dataset
(self, dataset: xarray.Dataset) → xarray.Dataset¶ Utility method to retrieve the previous set of data for the same datastream as the provided dataset from the DatastreamStorage.
- Parameters
dataset (xr.Dataset) – The reference dataset that will be used to search the DatastreamStore for prior data.
- Returns
The previous dataset from the DatastreamStorage if it exists, otherwise None.
- Return type
xr.Dataset
-
reduce_raw_datasets
(self, raw_mapping: Dict[str, xarray.Dataset], definition: tsdat.config.DatasetDefinition) → List[xarray.Dataset]¶ Removes unused variables from each raw dataset in the raw mapping and performs input to output naming and unit conversions as defined in the dataset definition.
- Parameters
raw_mapping (Dict[str, xr.Dataset]) – The raw xarray dataset mapping.
definition (DatasetDefinition) – The DatasetDefinition used to select the variables to keep.
- Returns
A list of reduced datasets.
- Return type
List[xr.Dataset]
-
reduce_raw_dataset
(self, raw_dataset: xarray.Dataset, variable_definitions: List[tsdat.config.VariableDefinition], definition: tsdat.config.DatasetDefinition) → xarray.Dataset¶ Removes unused variables from the raw dataset provided and keeps only the variables and coordinates pertaining to the provided variable definitions. Also performs input to output naming and unit conversions as defined in the DatasetDefinition.
- Parameters
raw_dataset (xr.Dataset) – The raw dataset mapping.
variable_definitions (List[VariableDefinition]) – List of variables to keep.
definition (DatasetDefinition) – The DatasetDefinition used to select the variables to keep.
- Returns
The reduced dataset.
- Return type
xr.Dataset
-
decode_cf
(self, dataset: xarray.Dataset) → xarray.Dataset¶ Decodes the dataset according to CF conventions. This helps ensure that the dataset is formatted correctly after it has been constructed from unstandardized sources or heavily modified.
- Parameters
dataset (xr.Dataset) – The dataset to decode.
- Returns
The decoded dataset.
- Return type
xr.Dataset
-
class
tsdat.
IngestPipeline
(pipeline_config: Union[str, tsdat.config.Config], storage_config: Union[str, tsdat.io.DatastreamStorage])¶ Bases:
tsdat.pipeline.pipeline.Pipeline
The IngestPipeline class is designed to read in raw, non-standardized data and convert it to a standardized format by embedding metadata, applying quality checks and quality controls, and by saving the now-processed data in a standard file format.
-
run
(self, filepath: Union[str, List[str]]) → xarray.Dataset¶ Runs the IngestPipeline from start to finish.
- Parameters
filepath (Union[str, List[str]]) – The path or list of paths to the file(s) to run the pipeline on.
-
hook_customize_dataset
(self, dataset: xarray.Dataset, raw_mapping: Dict[str, xarray.Dataset]) → xarray.Dataset¶ Hook to allow for user customizations to the standardized dataset, such as inserting a derived variable based on other variables in the dataset. This method is called immediately after the standardize_dataset method and before QualityManagement has been run.
- Parameters
dataset (xr.Dataset) – The dataset to customize.
raw_mapping (Dict[str, xr.Dataset]) – The raw dataset mapping.
- Returns
The customized dataset.
- Return type
xr.Dataset
-
hook_customize_raw_datasets
(self, raw_dataset_mapping: Dict[str, xarray.Dataset]) → Dict[str, xarray.Dataset]¶ Hook to allow for user customizations to one or more raw xarray Datasets before they are merged and used to create the standardized dataset. The raw_dataset_mapping will contain one entry for each file being used as input to the pipeline. The keys are the standardized raw file names, and the values are the datasets.
This method would typically only be used if the user is combining multiple files into a single dataset. In this case, this method may be used to correct coordinates if they don’t match for all the files, or to change variable (column) names if two files have the same name for a variable, but they are two distinct variables.
This method can also be used to check for unique conditions in the raw data that should cause a pipeline failure if they are not met.
This method is called before the inputs are merged and converted to standard format as specified by the config file.
- Parameters
raw_dataset_mapping (Dict[str, xr.Dataset]) – The raw datasets to customize.
- Returns
The customized raw datasets.
- Return type
Dict[str, xr.Dataset]
-
hook_finalize_dataset
(self, dataset: xarray.Dataset) → xarray.Dataset¶ Hook to apply any final customizations to the dataset before it is saved. This hook is called after QualityManagement has been run and immediately before the dataset is saved to file.
- Parameters
dataset (xr.Dataset) – The dataset to finalize.
- Returns
The finalized dataset to save.
- Return type
xr.Dataset
-
hook_generate_and_persist_plots
(self, dataset: xarray.Dataset) → None¶ Hook to allow users to create plots from the xarray dataset after the dataset has been finalized and just before the dataset is saved to disk.
To save on filesystem space (which is limited when running on the cloud via a lambda function), this method should only write one plot to local storage at a time. An example of how this could be done is below:
filename = DSUtil.get_plot_filename(dataset, "sea_level", "png")
with self.storage._tmp.get_temp_filepath(filename) as tmp_path:
    fig, ax = plt.subplots(figsize=(10, 5))
    ax.plot(dataset["time"].data, dataset["sea_level"].data)
    fig.savefig(tmp_path)
    self.storage.save(tmp_path)

filename = DSUtil.get_plot_filename(dataset, "qc_sea_level", "png")
with self.storage._tmp.get_temp_filepath(filename) as tmp_path:
    DSUtil.plot_qc(dataset, "sea_level", tmp_path)
    self.storage.save(tmp_path)
- Parameters
dataset (xr.Dataset) – The xarray dataset with customizations and QualityManagement applied.
-
read_and_persist_raw_files
(self, file_paths: List[str]) → List[str]¶ Renames the provided raw files according to ME Data Standards file naming conventions for raw data files, and returns a list of the paths to the renamed files.
- Parameters
file_paths (List[str]) – A list of paths to the original raw files.
- Returns
A list of paths to the renamed files.
- Return type
List[str]
-
-
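The hook ordering described above (customize raw inputs before merging, customize the dataset after standardization, finalize after quality management, plot just before saving) can be illustrated with a pure-Python stand-in. The real pipeline operates on xarray Datasets; strings are used here only to record the call sequence, and the class below is not part of tsdat.

```python
class MiniIngestPipeline:
    """Illustrative stand-in showing the IngestPipeline hook order."""

    def __init__(self):
        self.calls = []

    # User-overridable hooks (record their invocation order here).
    def hook_customize_raw_datasets(self, raw_mapping):
        self.calls.append("customize_raw")

    def hook_customize_dataset(self, dataset, raw_mapping):
        self.calls.append("customize_dataset")

    def hook_finalize_dataset(self, dataset):
        self.calls.append("finalize_dataset")

    def hook_generate_and_persist_plots(self, dataset):
        self.calls.append("plots")

    def run(self, raw_mapping):
        self.hook_customize_raw_datasets(raw_mapping)   # before merge
        self.calls.append("standardize_dataset")
        self.hook_customize_dataset(None, raw_mapping)  # after standardize
        self.calls.append("quality_management")
        self.hook_finalize_dataset(None)                # after QC
        self.hook_generate_and_persist_plots(None)      # just before save
        self.calls.append("save")
        return self.calls
```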
class
tsdat.
QualityChecker
(ds: xarray.Dataset, previous_data: xarray.Dataset, definition: tsdat.config.QualityManagerDefinition, parameters: Union[Dict, None] = None)¶ Bases:
abc.ABC
Class containing the code to perform a single Quality Check on a Dataset variable.
- Parameters
ds (xr.Dataset) – The dataset the checker will be applied to
previous_data (xr.Dataset) – A dataset from the previous processing interval (i.e., file). This is used to check for consistency between files, such as for monotonic or delta checks when we need to check the previous value.
definition (QualityManagerDefinition) – The quality manager definition as specified in the pipeline config file
parameters (dict, optional) – A dictionary of checker-specific parameters specified in the pipeline config file. Defaults to {}
-
abstract
run
(self, variable_name: str) → Optional[numpy.ndarray]¶ Check a dataset’s variable to see if it passes a quality check. These checks can be performed on the entire variable at one time by using xarray vectorized numerical operators.
- Parameters
variable_name (str) – The name of the variable to check
- Returns
If the check was performed, return a ndarray of the same shape as the variable. Each value in the data array will be either True or False, depending upon the results of the check. True means the check failed. False means it succeeded.
Note that we are using an np.ndarray instead of an xr.DataArray because the DataArray contains coordinate indexes, which can sometimes get out of sync when performing vectorized numpy arithmetic operations. So it’s easier to just use numpy arrays.
If the check was skipped for some reason (i.e., it was not relevant given the current attributes defined for this dataset), then the run method should return None.
- Return type
Optional[np.ndarray]
-
class
tsdat.
QualityHandler
(ds: xarray.Dataset, previous_data: xarray.Dataset, quality_manager: tsdat.config.QualityManagerDefinition, parameters: Union[Dict, None] = None)¶ Bases:
abc.ABC
Class containing code to be executed if a particular quality check fails.
- Parameters
ds (xr.Dataset) – The dataset the handler will be applied to
previous_data (xr.Dataset) – A dataset from the previous processing interval (i.e., file). This is used to check for consistency between files, such as for monotonic or delta checks when we need to check the previous value.
quality_manager (QualityManagerDefinition) – The quality_manager definition as specified in the pipeline config file
parameters (dict, optional) – A dictionary of handler-specific parameters specified in the pipeline config file. Defaults to {}
-
abstract
run
(self, variable_name: str, results_array: numpy.ndarray)¶ Perform a follow-on action if a quality check fails. This can be used to correct data if needed (such as replacing a bad value with a missing value, emailing a contact person, or raising an exception if the failure constitutes a critical error).
- Parameters
variable_name (str) – Name of the variable that failed
results_array (np.ndarray) – An array of True/False values for each data value of the variable. True means the check failed.
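The checker/handler contract can be sketched in a few lines: a checker returns a boolean array where True marks a value that FAILED, and a handler then acts on the flagged values. Plain lists stand in for np.ndarray here, and the function names are hypothetical rather than tsdat built-ins.

```python
def valid_range_check(values, valid_min, valid_max):
    """Checker sketch: True means the check FAILED for that element."""
    return [not (valid_min <= v <= valid_max) for v in values]

def replace_failed_values(values, results, fill_value=-9999):
    """Handler sketch: replace flagged values with a fill value."""
    return [fill_value if failed else v for v, failed in zip(values, results)]

data = [1.0, 250.0, 3.5, -7.0]
results = valid_range_check(data, 0.0, 100.0)
cleaned = replace_failed_values(data, results)
```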
-
record_correction
(self, variable_name: str)¶ If a correction was made to variable data to fix invalid values as detected by a quality check, this method will record the fix to the appropriate variable attribute. The correction description will come from the handler params which get set in the pipeline config file.
- Parameters
variable_name (str) – Name
-
class
tsdat.
DSUtil
¶ Provides helper functions for xarray.Dataset
-
static
record_corrections_applied
(ds: xarray.Dataset, variable: str, correction: str)¶ Records a description of a correction made to a variable in the corresponding corrections_applied attribute.
- Parameters
ds (xr.Dataset) – Dataset containing the corrected variable
variable (str) – The name of the variable that was corrected
correction (str) – A description of the correction
-
static
datetime64_to_string
(datetime64: numpy.datetime64) → Tuple[str, str]¶ Convert a datetime64 object to a formatted string.
- Parameters
datetime64 (Union[np.ndarray, np.datetime64]) – The datetime64 object
- Returns
A tuple of strings representing the formatted date. The first string is the day in ‘yyyymmdd’ format. The second string is the time in ‘hhmmss’ format.
- Return type
Tuple[str, str]
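The ‘yyyymmdd’ / ‘hhmmss’ split that datetime64_to_string produces can be sketched with the stdlib datetime in place of np.datetime64. The helper name is hypothetical.

```python
from datetime import datetime

def to_date_time_strings(dt: datetime):
    """Return ('yyyymmdd', 'hhmmss') strings for a datetime."""
    return dt.strftime("%Y%m%d"), dt.strftime("%H%M%S")
```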
-
static
datetime64_to_timestamp
(variable_data: numpy.ndarray) → numpy.ndarray¶ Converts each datetime64 value to a timestamp in the same units as the variable (e.g., seconds, nanoseconds).
- Parameters
variable_data (np.ndarray) – ndarray of variable data
- Returns
An ndarray of the same shape, with time values converted to long timestamps (e.g., int64)
- Return type
np.ndarray
-
static
get_datastream_name
(ds: xarray.Dataset = None, config=None) → str¶ Returns the datastream name defined in the dataset or in the provided pipeline configuration.
- Parameters
ds (xr.Dataset, optional.) – The data as an xarray dataset; defaults to None
config (Config, optional) – The Config object used to assist reading time data from the raw_dataset; defaults to None.
- Returns
The datastream name
- Return type
str
-
static
get_end_time
(ds: xarray.Dataset) → Tuple[str, str]¶ Convenience method to get the end date and time from a xarray dataset.
- Parameters
ds (xr.Dataset) – The dataset
- Returns
A tuple of [day, time] as formatted strings representing the last time point in the dataset.
- Return type
Tuple[str, str]
-
static
get_fill_value
(ds: xarray.Dataset, variable_name: str)¶ Get the value of the _FillValue attribute for the given variable.
- Parameters
ds (xr.Dataset) – The dataset
variable_name (str) – A variable in the dataset
- Returns
The value of the _FillValue attr or None if it is not defined
- Return type
same data type of the variable (int, float, etc.) or None
-
static
get_non_qc_variable_names
(ds: xarray.Dataset) → List[str]¶ Get a list of all data variables in the dataset that are NOT qc variables.
- Parameters
ds (xr.Dataset) – A dataset
- Returns
List of non-qc data variable names
- Return type
List[str]
-
static
get_raw_end_time
(raw_ds: xarray.Dataset, time_var_definition: tsdat.VariableDefinition) → Tuple[str, str]¶ Convenience method to get the end date and time from a raw xarray dataset. This uses time_var_definition.get_input_name() as the dataset key for the time variable and additionally uses the input’s Converter object if applicable.
- Parameters
raw_ds (xr.Dataset) – A raw dataset (not standardized)
time_var_definition (VariableDefinition) – The ‘time’ variable definition from the pipeline config
- Returns
A tuple of strings representing the last time data point in the dataset. The first string is the day in ‘yyyymmdd’ format. The second string is the time in ‘hhmmss’ format.
- Return type
Tuple[str, str]
-
static
get_raw_start_time
(raw_ds: xarray.Dataset, time_var_definition: tsdat.config.VariableDefinition) → Tuple[str, str]¶ Convenience method to get the start date and time from a raw xarray dataset. This uses time_var_definition.get_input_name() as the dataset key for the time variable and additionally uses the input’s Converter object if applicable.
- Parameters
raw_ds (xr.Dataset) – A raw dataset (not standardized)
time_var_definition (VariableDefinition) – The ‘time’ variable definition from the pipeline config
- Returns
A tuple of strings representing the first time data point in the dataset. The first string is the day in ‘yyyymmdd’ format. The second string is the time in ‘hhmmss’ format.
- Return type
Tuple[str, str]
-
static
get_coordinate_variable_names
(ds: xarray.Dataset) → List[str]¶ Get a list of all coordinate variables in this dataset.
- Parameters
ds (xr.Dataset) – The dataset
- Returns
List of coordinate variable names
- Return type
List[str]
-
static
get_start_time
(ds: xarray.Dataset) → Tuple[str, str]¶ Convenience method to get the start date and time from a xarray dataset.
- Parameters
ds (xr.Dataset) – A standardized dataset
- Returns
A tuple of strings representing the first time data point in the dataset. The first string is the day in ‘yyyymmdd’ format. The second string is the time in ‘hhmmss’ format.
- Return type
Tuple[str, str]
-
static
get_metadata
(ds: xarray.Dataset) → Dict¶ Get a dictionary of all global and variable attributes in a dataset. Global atts are found under the ‘attributes’ key and variable atts are found under the ‘variables’ key.
- Parameters
ds (xr.Dataset) – A dataset
- Returns
A dictionary of global & variable attributes
- Return type
Dict
-
static
plot_qc
(ds: xarray.Dataset, variable_name: str, filename: str = None, **kwargs) → act.plotting.TimeSeriesDisplay¶ Create a QC plot for the given variable. This is based on the ACT library: https://arm-doe.github.io/ACT/source/auto_examples/plot_qc.html#sphx-glr-source-auto-examples-plot-qc-py
We provide a convenience wrapper method for basic QC plots of a variable, but we recommend using ACT directly and looking at their examples for more complex plots, such as plotting variables from two different datasets.
TODO: Depending on use cases, we will likely add more arguments to be able to quickly produce the most common types of QC plots.
- Parameters
ds (xr.Dataset) – A dataset
variable_name (str) – The variable to plot
filename (str, optional) – The filename for the image. Saves the plot as this filename if provided.
-
static
get_plot_filename
(dataset: xarray.Dataset, plot_description: str, extension: str) → str¶ Returns the filename for a plot according to MHKIT-Cloud Data standards. The dataset is used to determine the datastream_name and start date/time. The standards dictate that a plot filename should follow the format: datastream_name.date.time.description.extension.
- Parameters
dataset (xr.Dataset) – The dataset from which the plot data is drawn from. This is used to collect the datastream_name and start date/time.
plot_description (str) – The description of the plot. Should be as brief as possible and contain no spaces. Underscores may be used.
extension (str) – The file extension for the plot.
- Returns
The standardized plot filename.
- Return type
str
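The datastream_name.date.time.description.extension layout described above can be sketched as a plain string join. This helper is hypothetical, not the tsdat API, which derives the date and time from the dataset itself.

```python
def plot_filename(datastream_name, date, time, description, extension):
    """Assemble a plot filename per the documented format."""
    ext = extension.lstrip(".")  # tolerate ".png" or "png"
    return f"{datastream_name}.{date}.{time}.{description}.{ext}"
```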
-
static
get_dataset_filename
(dataset: xarray.Dataset, file_extension='.nc') → str¶ Given an xarray dataset, this function will return the base filename of the dataset according to MHKiT-Cloud data standards. The base filename does not include the directory structure where the file should be saved, only the name of the file itself, e.g. z05.ExampleBuoyDatastream.b1.20201230.000000.nc
- Parameters
dataset (xr.Dataset) – The dataset whose filename should be generated.
file_extension (str, optional) – The file extension to use. Defaults to “.nc”
- Returns
The base filename of the dataset.
- Return type
str
-
static
get_raw_filename
(raw_dataset: xarray.Dataset, old_filename: str, config) → str¶ Returns the appropriate raw filename of the raw dataset according to MHKiT-Cloud naming conventions. Uses the config object to parse the start date and time from the raw dataset for use in the new filename.
The new filename will follow the MHKiT-Cloud Data Standards for raw filenames, i.e., datastream_name.date.time.raw.old_filename, where the data level used in the datastream_name is 00.
- Parameters
raw_dataset (xr.Dataset) – The raw data as an xarray dataset.
old_filename (str) – The name of the original raw file.
config (Config) – The Config object used to assist reading time data from the raw_dataset.
- Returns
The standardized filename of the raw file.
- Return type
str
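The raw-filename format described above can be sketched in a few lines. This is an illustrative helper, not tsdat's implementation; the datastream name and raw filename are made up:

```python
# Hypothetical sketch of the raw-filename convention:
# datastream_name.date.time.raw.old_filename, with data level '00'
def make_raw_filename(datastream_name, date, time, old_filename):
    """The original raw filename is preserved as the final segment."""
    return ".".join([datastream_name, date, time, "raw", old_filename])

name = make_raw_filename("humboldt_ca.buoy_data.00", "20210120", "000000", "buoy.csv")
print(name)  # humboldt_ca.buoy_data.00.20210120.000000.raw.buoy.csv
```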
-
static
get_date_from_filename
(filename: str) → str¶ Given a filename that conforms to MHKiT-Cloud Data Standards, return the date of the first point of data in the file.
- Parameters
filename (str) – The filename or path to the file.
- Returns
The date, in “yyyymmdd.hhmmss” format.
- Return type
str
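Because the standards place the date and time in fixed positions, the extraction can be sketched with plain string splitting. This is an assumption-laden illustration, not the library's code:

```python
import os

def date_from_filename(filename):
    """Extract 'yyyymmdd.hhmmss' from a standards-conforming filename.

    The date and time are the 4th and 5th '.'-separated fields, e.g.
    humboldt_ca.buoy_data.b1.20210120.000000.nc -> '20210120.000000'.
    """
    parts = os.path.basename(filename).split(".")
    return f"{parts[3]}.{parts[4]}"

print(date_from_filename("humboldt_ca.buoy_data.b1.20210120.000000.nc"))
# 20210120.000000
```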
-
static
get_datastream_name_from_filename
(filename: str) → Optional[str]¶ Given a filename that conforms to MHKiT-Cloud Data Standards, return the datastream name. Datastream name is everything to the left of the third ‘.’ in the filename.
e.g., humboldt_ca.buoy_data.b1.20210120.000000.nc
- Parameters
filename (str) – The filename or path to the file.
- Returns
The datastream name, or None if the filename is not in the proper format.
- Return type
Optional[str]
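The "everything left of the third '.'" rule can be sketched directly. A hypothetical helper, shown only to make the rule concrete:

```python
import os

def datastream_name_from_filename(filename):
    """Return everything left of the third '.' in the basename, or None."""
    parts = os.path.basename(filename).split(".")
    if len(parts) < 4:
        return None  # not in the standard format
    return ".".join(parts[:3])

print(datastream_name_from_filename("humboldt_ca.buoy_data.b1.20210120.000000.nc"))
# humboldt_ca.buoy_data.b1
```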
-
static
get_datastream_directory
(datastream_name: str, root: str = '') → str¶ Given the datastream_name and an optional root, returns the path to where the datastream should be located. Does NOT create the directory where the datastream should be located.
- Parameters
datastream_name (str) – The name of the datastream whose directory path should be generated.
root (str, optional) – The directory to use as the root of the directory structure. Defaults to “”.
- Returns
The path to the directory where the datastream should be located.
- Return type
str
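A minimal sketch of the path construction, assuming the directory is simply the datastream name joined under the root (the real layout may nest additional levels):

```python
import os

def datastream_directory(datastream_name, root=""):
    """Join root and datastream_name; does NOT create the directory."""
    return os.path.join(root, datastream_name)

print(datastream_directory("humboldt_ca.buoy_data.b1", root="/data"))
# /data/humboldt_ca.buoy_data.b1
```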
-
static
is_image
(filename: str) → bool¶ Detect the mimetype from the file extension and use it to determine whether the file is an image.
- Parameters
filename (str) – The name of the file to check
- Returns
True if the file extension matches an image mimetype
- Return type
bool
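The mimetype check can be reproduced with the standard library. A sketch of the idea, not the tsdat source:

```python
import mimetypes

def is_image(filename):
    """True if the file extension maps to an 'image/*' mimetype."""
    mimetype, _ = mimetypes.guess_type(filename)
    return mimetype is not None and mimetype.startswith("image/")

print(is_image("qc_plot.png"))  # True
print(is_image("data.nc"))      # False (not an image mimetype)
```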
-
class
tsdat.
Converter
(parameters: Union[Dict, None] = None)¶ Bases:
abc.ABC
Base class for converting data arrays from one units to another. Users can extend this class if they have a special units conversion for their input data that cannot be resolved with the default converter classes.
- Parameters
parameters (dict, optional) – A dictionary of converter-specific parameters which get passed from the pipeline config file. Defaults to {}
-
abstract
run
(self, data: numpy.ndarray, in_units: str, out_units: str) → numpy.ndarray¶ Convert the input data from in_units to out_units.
- Parameters
data (np.ndarray) – Data array to be modified.
in_units (str) – Current units of the data array.
out_units (str) – Units to be converted to.
- Returns
Data array converted into the new units.
- Return type
np.ndarray
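Extending the Converter base class amounts to implementing run. The sketch below inlines a stand-in base class so it is self-contained; the millimeters-to-meters conversion is a made-up example, and a real subclass would be wired up through the pipeline config file:

```python
import numpy as np

class MillimetersToMeters:
    """Stand-in for tsdat.Converter: a hypothetical custom converter
    that handles a single special-case units conversion."""

    def __init__(self, parameters=None):
        # Converter-specific parameters come from the pipeline config file
        self.parameters = parameters or {}

    def run(self, data, in_units, out_units):
        if in_units == "mm" and out_units == "m":
            return data / 1000.0
        raise ValueError(f"Cannot convert {in_units} to {out_units}")

converter = MillimetersToMeters()
print(converter.run(np.array([1000.0, 2500.0]), "mm", "m"))  # [1.  2.5]
```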
-
class
tsdat.
DefaultConverter
(parameters: Union[Dict, None] = None)¶ Bases:
Converter
Default class for converting units on data arrays. This class utilizes ACT.utils.data_utils.convert_units, and should work for most variables except time (see StringTimeConverter and TimestampTimeConverter)
-
run
(self, data: numpy.ndarray, in_units: str, out_units: str) → numpy.ndarray¶ Convert the input data from in_units to out_units.
- Parameters
data (np.ndarray) – Data array to be modified.
in_units (str) – Current units of the data array.
out_units (str) – Units to be converted to.
- Returns
Data array converted into the new units.
- Return type
np.ndarray
-
-
class
tsdat.
StringTimeConverter
(parameters: Union[Dict, None] = None)¶ Bases:
Converter
Convert a time string to a np.datetime64, which is needed for xarray. This class utilizes pd.to_datetime to perform the conversion.
One of the parameters should be ‘time_format’, which is the strftime format used to parse the time, e.g. “%d/%m/%Y”. Note that “%f” will parse all the way up to nanoseconds. See the strftime documentation for more information on format choices.
- Parameters
parameters (dict, optional) – dictionary of converter-specific parameters. Defaults to {}.
-
run
(self, data: numpy.ndarray, in_units: str, out_units: str) → numpy.ndarray¶ Convert the input data from in_units to out_units.
- Parameters
data (np.ndarray) – Data array to be modified.
in_units (str) – Current units of the data array.
out_units (str) – Units to be converted to.
- Returns
Data array converted into the new units.
- Return type
np.ndarray
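The time_format parameter is a strftime pattern. tsdat delegates the parsing to pd.to_datetime; the stdlib equivalent below illustrates the same idea with the example format from the description:

```python
from datetime import datetime

# What a 'time_format' of "%d/%m/%Y" means when applied to a raw time string
time_format = "%d/%m/%Y"
parsed = datetime.strptime("30/12/2020", time_format)
print(parsed.isoformat())  # 2020-12-30T00:00:00
```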
-
class
tsdat.
TimestampTimeConverter
(parameters: Union[Dict, None] = None)¶ Bases:
Converter
Convert a numeric UTC timestamp to a np.datetime64, which is needed for xarray. This class utilizes pd.to_datetime to perform the conversion.
One of the parameters should be ‘unit’, which denotes the unit of the numeric timestamp (e.g., D, s, ms, us, ns). Timestamps are interpreted as offsets from the unix epoch start.
- Parameters
parameters (dict, optional) – A dictionary of converter-specific parameters which get passed from the pipeline config file. Defaults to {}
-
run
(self, data: numpy.ndarray, in_units: str, out_units: str) → numpy.ndarray¶ Convert the input data from in_units to out_units.
- Parameters
data (np.ndarray) – Data array to be modified.
in_units (str) – Current units of the data array.
out_units (str) – Units to be converted to.
- Returns
Data array converted into the new units.
- Return type
np.ndarray
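The ‘unit’ parameter tells the converter what each numeric value counts since the unix epoch. tsdat delegates to pd.to_datetime(data, unit=...); the stdlib sketch below shows the seconds ('s') case:

```python
from datetime import datetime, timezone

# A numeric UTC timestamp with unit 's': seconds since 1970-01-01T00:00:00Z
timestamp_s = 1609459200
dt = datetime.fromtimestamp(timestamp_s, tz=timezone.utc)
print(dt.isoformat())  # 2021-01-01T00:00:00+00:00
```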