tsdat.io

The tsdat.io package provides the classes that tsdat pipelines use to manage I/O. Specifically, it includes:

  1. The FileHandler infrastructure used to read from and write to specific file formats, and

  2. The Storage infrastructure used to store/access processed data files

We warmly welcome community contributions to increase the list of supported FileHandlers and Storage destinations.

Package Contents

Classes

AbstractFileHandler

Abstract class to define methods required by all FileHandlers.

FileHandler

Class to provide methods to read and write files with a variety of extensions.

CsvHandler

FileHandler to read from and write to CSV files. Takes a number of parameters from the storage config file.

NetCdfHandler

FileHandler to read from and write to netCDF files. Takes a number of parameters from the storage config file.

FilesystemStorage

DatastreamStorage subclass for a local Linux-based filesystem.

AwsStorage

DatastreamStorage subclass for an AWS S3-based filesystem.

S3Path

This class wraps a ‘special’ path string that lets us include the bucket name and region in the path, for seamless use in boto3 APIs.

Functions

register_filehandler(patterns: Union[str, List[str]]) → AbstractFileHandler

Python decorator to register an AbstractFileHandler in the FileHandler object.

class tsdat.io.AbstractFileHandler(parameters: Dict = {})

Abstract class to define methods required by all FileHandlers. Classes derived from AbstractFileHandler should implement one or more of the following methods:

write(ds: xr.Dataset, filename: str, config: Config, **kwargs)

read(filename: str, **kwargs) -> xr.Dataset

Parameters

parameters (Dict, optional) – Parameters that were passed to the FileHandler when it was registered in the storage config file, defaults to {}.

write(self, ds: xarray.Dataset, filename: str, config: tsdat.config.Config = None, **kwargs) → None

Saves the given dataset to a file.

Parameters
  • ds (xr.Dataset) – The dataset to save.

  • filename (str) – The path to where the file should be written to.

  • config (Config, optional) – Optional Config object, defaults to None

read(self, filename: str, **kwargs) → xarray.Dataset

Reads in the given file and converts it into an Xarray dataset for use in the pipeline.

Parameters

filename (str) – The path to the file to read in.

Returns

An xr.Dataset object.

Return type

xr.Dataset

class tsdat.io.FileHandler

Class to provide methods to read and write files with a variety of extensions.

FILEHANDLERS : Dict[str, AbstractFileHandler]
static _get_handler(filename: str) → AbstractFileHandler

Given the name of the file to read or write, this method applies a regular expression to match the name of the file with a handler that has been registered in its internal dictionary of FileHandler objects and returns the appropriate FileHandler, or None if a match is not found.

Parameters

filename (str) – The name of the file whose handler should be retrieved.

Returns

The FileHandler registered for use with the provided filename.

Return type

AbstractFileHandler

static write(ds: xarray.Dataset, filename: str, config: tsdat.config.Config = None, **kwargs) → None

Saves the given dataset to file using the registered FileHandler for the provided filename.

Parameters
  • ds (xr.Dataset) – The dataset to save.

  • filename (str) – The path to where the file should be written to.

  • config (Config, optional) – Optional Config object, defaults to None

static read(filename: str, **kwargs) → xarray.Dataset

Reads in the given file and converts it into an Xarray dataset using the registered FileHandler for the provided filename.

Parameters

filename (str) – The path to the file to read in.

Returns

An xr.Dataset object.

Return type

xr.Dataset
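
A minimal usage sketch, assuming a handler matching netCDF paths has already been registered (via the storage config file or the register_filehandler decorator); the file paths below are hypothetical:

from tsdat.io import FileHandler

# Dispatches to whichever registered FileHandler matches the filename
ds = FileHandler.read("input/buoy.z05.a0.20210106.000000.nc")
FileHandler.write(ds, "output/buoy.z05.a0.20210106.000000.nc")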

static register_file_handler(patterns: Union[str, List[str]], handler: AbstractFileHandler)

Static method to register an AbstractFileHandler for one or more file patterns. Once an AbstractFileHandler has been registered it may be used by this class to read or write files whose paths match one or more of the patterns provided at registration.

Parameters
  • patterns (Union[str, List[str]]) – The patterns (regex) that should be used to match a filepath to the AbstractFileHandler provided.

  • handler (AbstractFileHandler) – The AbstractFileHandler to register.

tsdat.io.register_filehandler(patterns: Union[str, List[str]]) → AbstractFileHandler

Python decorator to register an AbstractFileHandler in the FileHandler object. The FileHandler object will be used by tsdat pipelines to read and write raw, intermediate, and processed data.

This decorator can be used to work with a specific AbstractFileHandler without having to specify a config file. This is useful when using an AbstractFileHandler for analysis or for tests outside of a pipeline. For tsdat pipelines, handlers should always be specified via the storage config file.

Example Usage:

import xarray as xr
from tsdat.config import Config
from tsdat.io import register_filehandler, AbstractFileHandler

@register_filehandler([r".*\.nc", r".*\.cdf"])
class NetCdfHandler(AbstractFileHandler):
    def write(ds: xr.Dataset, filename: str, config: Config = None, **kwargs):
        ds.to_netcdf(filename)
    def read(filename: str, **kwargs) -> xr.Dataset:
        return xr.load_dataset(filename)

Parameters

patterns (Union[str, List[str]]) – The patterns (regex) that should be used to match a filepath to the AbstractFileHandler provided.

Returns

The original AbstractFileHandler class, after it has been registered for use in tsdat pipelines.

Return type

AbstractFileHandler

class tsdat.io.CsvHandler(parameters: Dict = {})

Bases: tsdat.io.filehandlers.file_handlers.AbstractFileHandler

FileHandler to read from and write to CSV files. Takes a number of parameters that are passed in from the storage config file. Parameters specified in the config file should follow this example:

parameters:
  write:
    to_dataframe:
      # Parameters here will be passed to xr.Dataset.to_dataframe()
    to_csv:
      # Parameters here will be passed to pd.DataFrame.to_csv()
  read:
    read_csv:
      # Parameters here will be passed to pd.read_csv()
    to_xarray:
      # Parameters here will be passed to pd.DataFrame.to_xarray()

Parameters

parameters (Dict, optional) – Parameters that were passed to the FileHandler when it was registered in the storage config file, defaults to {}.
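
The same parameters can also be supplied as a plain dictionary when instantiating the handler directly; a minimal sketch, where the options and file path are hypothetical:

from tsdat.io import CsvHandler

parameters = {
    "write": {"to_csv": {"index": False}},  # passed to pd.DataFrame.to_csv()
    "read": {"read_csv": {"header": 0}},    # passed to pd.read_csv()
}
handler = CsvHandler(parameters=parameters)
ds = handler.read("input/example.csv")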

write(self, ds: xarray.Dataset, filename: str, config: tsdat.config.Config = None, **kwargs) → None

Saves the given dataset to a csv file.

Parameters
  • ds (xr.Dataset) – The dataset to save.

  • filename (str) – The path to where the file should be written to.

  • config (Config, optional) – Optional Config object, defaults to None

read(self, filename: str, **kwargs) → xarray.Dataset

Reads in the given file and converts it into an Xarray dataset for use in the pipeline.

Parameters

filename (str) – The path to the file to read in.

Returns

An xr.Dataset object.

Return type

xr.Dataset

class tsdat.io.NetCdfHandler(parameters: Dict = {})

Bases: tsdat.io.filehandlers.file_handlers.AbstractFileHandler

FileHandler to read from and write to netCDF files. Takes a number of parameters that are passed in from the storage config file. Parameters specified in the config file should follow this example:

parameters:
  write:
    to_netcdf:
      # Parameters here will be passed to xr.Dataset.to_netcdf()
  read:
    load_dataset:
      # Parameters here will be passed to xr.load_dataset()

Parameters

parameters (Dict, optional) – Parameters that were passed to the FileHandler when it was registered in the storage config file, defaults to {}.
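
As with CsvHandler, the parameters can be supplied as a dictionary when instantiating the handler directly; a minimal sketch with hypothetical options and file paths:

from tsdat.io import NetCdfHandler

handler = NetCdfHandler(parameters={"read": {"load_dataset": {"decode_times": True}}})
ds = handler.read("input/example.nc")   # options passed through to xr.load_dataset()
handler.write(ds, "output/example.nc")  # options passed through to xr.Dataset.to_netcdf()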

write(self, ds: xarray.Dataset, filename: str, config: tsdat.config.Config = None, **kwargs) → None

Saves the given dataset to a netCDF file.

Parameters
  • ds (xr.Dataset) – The dataset to save.

  • filename (str) – The path to where the file should be written to.

  • config (Config, optional) – Optional Config object, defaults to None

read(self, filename: str, **kwargs) → xarray.Dataset

Reads in the given file and converts it into an Xarray dataset for use in the pipeline.

Parameters

filename (str) – The path to the file to read in.

Returns

An xr.Dataset object.

Return type

xr.Dataset

class tsdat.io.FilesystemStorage(parameters={})

Bases: tsdat.io.DatastreamStorage

DatastreamStorage subclass for a local Linux-based filesystem.

TODO: rename to LocalStorage as this is more intuitive.

Parameters

parameters (dict, optional) –

Dictionary of parameters that should be set automatically from the storage config file when this class is instantiated via the DatastreamStorage.from_config() method. Defaults to {}.

Key parameters that should be set in the config file include the following (see the config sketch after this list):

retain_input_files

Whether the input files should be cleaned up after they are done processing

root_dir

The root path under which processed files will be stored.
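
A hypothetical storage config excerpt setting these parameters might look like the following (the classname key and the values shown are illustrative, not prescriptive):

storage:
  classname: tsdat.io.FilesystemStorage
  parameters:
    retain_input_files: True
    root_dir: /data/storage/root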

property tmp(self)

Each subclass should define the tmp property, which provides access to a TemporaryStorage object used to efficiently handle reading/writing temporary files during the processing pipeline, or to perform filesystem actions on files other than processed datastream files that reside in the same filesystem as the DatastreamStorage. It is not intended to be used outside of the pipeline.

Raises

NotImplementedError – If the property is not implemented by the subclass.

find(self, datastream_name: str, start_time: str, end_time: str, filetype: str = None) → List[str]

Finds all files of the given type from the datastream store with the given datastream_name and timestamps from start_time (inclusive) up to end_time (exclusive). Returns a list of paths to files that match the criteria.

Parameters
  • datastream_name (str) – The datastream_name as defined by ME Data Standards.

  • start_time (str) – The start time or date to start searching for data (inclusive). Should be like “20210106.000000” to search for data beginning on or after January 6th, 2021.

  • end_time (str) – The end time or date to stop searching for data (exclusive). Should be like “20210108.000000” to search for data ending before January 8th, 2021.

  • filetype (str, optional) – A file type from the DatastreamStorage.file_filters keys. If no type is specified, all files will be returned. Defaults to None.

Returns

A list of paths in datastream storage in ascending order.

Return type

List[str]
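
For example, listing all processed files for a hypothetical datastream over a two-day window:

files = storage.find("abc.buoy_z05.a0", "20210106.000000", "20210108.000000")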

fetch(self, datastream_name: str, start_time: str, end_time: str, local_path: str = None, filetype: int = None) → tsdat.io.DisposableLocalTempFileList

Fetches files from the datastream store using the datastream_name, start_time, and end_time to specify the file(s) to retrieve. If the local path is not specified, it is up to the subclass to determine where to put the retrieved file(s).

Parameters
  • datastream_name (str) – The datastream_name as defined by ME Data Standards.

  • start_time (str) – The start time or date to start searching for data (inclusive). Should be like “20210106” to search for data beginning on or after January 6th, 2021.

  • end_time (str) – The end time or date to stop searching for data (exclusive). Should be like “20210108” to search for data ending before January 8th, 2021.

  • local_path (str, optional) – The path to the directory where the data should be stored. Defaults to None.

  • filetype (int, optional) – A file type from the DatastreamStorage.file_filters keys. If no type is specified, all files will be returned. Defaults to None.

Returns

A list of paths where the retrieved files were stored in local storage. This is a context manager class, so this method should be called via the ‘with’ statement; all files referenced by the list will be cleaned up when the context goes out of scope.

Return type

DisposableLocalTempFileList
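
Because the returned list is a context manager, fetch is intended to be used with a with statement; a sketch with a hypothetical datastream name and dates:

with storage.fetch("abc.buoy_z05.a0", "20210106", "20210108") as files:
    for local_file in files:
        print(local_file)
# Temporary local copies are cleaned up when the with block exits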

save_local_path(self, local_path: str, new_filename: str = None) → Any

Given a path to a local file, save that file to the storage.

Parameters
  • local_path (str) – Local path to the file to save. The file should be named according to ME Data Standards naming conventions so that this method can automatically parse the datastream, date, and time from the file name.

  • new_filename (str, optional) – If provided, the new filename to save as. This parameter should ONLY be provided if using a local path for dataset_or_path. Must also follow ME Data Standards naming conventions. Defaults to None.

Returns

The path where this file was stored in storage. Path type is dependent upon the specific storage subclass.

Return type

Any
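
For example, saving a hypothetical file whose name follows ME Data Standards naming conventions:

stored_path = storage.save_local_path("/tmp/abc.buoy_z05.a0.20210106.000000.nc")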

exists(self, datastream_name: str, start_time: str, end_time: str, filetype: int = None) → bool

Checks if any data exists in the datastream store for the provided datastream and time range.

Parameters
  • datastream_name (str) – The datastream_name as defined by ME Data Standards.

  • start_time (str) – The start time or date to start searching for data (inclusive). Should be like “20210106” to search for data beginning on or after January 6th, 2021.

  • end_time (str) – The end time or date to stop searching for data (exclusive). Should be like “20210108” to search for data ending before January 8th, 2021.

  • filetype (str, optional) – A file type from the DatastreamStorage.file_filters keys. If none is specified, all files will be checked. Defaults to None.

Returns

True if data exists, False otherwise.

Return type

bool

delete(self, datastream_name: str, start_time: str, end_time: str, filetype: int = None) → None

Deletes datastream data in the datastream store within the specified time range.

Parameters
  • datastream_name (str) – The datastream_name as defined by ME Data Standards.

  • start_time (str) – The start time or date to start searching for data (inclusive). Should be like “20210106” to search for data beginning on or after January 6th, 2021.

  • end_time (str) – The end time or date to stop searching for data (exclusive). Should be like “20210108” to search for data ending before January 8th, 2021.

  • filetype (str, optional) – A file type from the DatastreamStorage.file_filters keys. If no type is specified, all files will be deleted. Defaults to None.
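
A short sketch combining exists and delete for a hypothetical datastream and date range:

if storage.exists("abc.buoy_z05.a0", "20210106", "20210108"):
    storage.delete("abc.buoy_z05.a0", "20210106", "20210108")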

class tsdat.io.AwsStorage(parameters={})

Bases: tsdat.io.DatastreamStorage

DatastreamStorage subclass for an AWS S3-based filesystem.

Parameters

parameters (dict, optional) –

Dictionary of parameters that should be set automatically from the storage config file when this class is instantiated via the DatastreamStorage.from_config() method. Defaults to {}.

Key parameters that should be set in the config file include the following (see the config sketch after this list):

retain_input_files

Whether the input files should be cleaned up after they are done processing

root_dir

The bucket ‘key’ prefix prepended to all processed files created in the persistent store. Defaults to ‘root’.

temp_dir

The bucket ‘key’ prefix prepended to all temp files created in the S3 bucket. Defaults to ‘temp’.

bucket_name

The name of the S3 bucket where files will be stored.
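
A hypothetical storage config excerpt for S3-backed storage might look like the following (the bucket name and key prefixes are illustrative):

storage:
  classname: tsdat.io.AwsStorage
  parameters:
    retain_input_files: True
    root_dir: root
    temp_dir: temp
    bucket_name: my-tsdat-bucket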

property s3_resource(self)
property s3_client(self)
property tmp(self)

Each subclass should define the tmp property, which provides access to a TemporaryStorage object used to efficiently handle reading/writing temporary files during the processing pipeline, or to perform filesystem actions on files other than processed datastream files that reside in the same filesystem as the DatastreamStorage. It is not intended to be used outside of the pipeline.

Raises

NotImplementedError – If the property is not implemented by the subclass.

property root(self)
property temp_path(self)
find(self, datastream_name: str, start_time: str, end_time: str, filetype: str = None) → List[S3Path]

Finds all files of the given type from the datastream store with the given datastream_name and timestamps from start_time (inclusive) up to end_time (exclusive). Returns a list of paths to files that match the criteria.

Parameters
  • datastream_name (str) – The datastream_name as defined by ME Data Standards.

  • start_time (str) – The start time or date to start searching for data (inclusive). Should be like “20210106.000000” to search for data beginning on or after January 6th, 2021.

  • end_time (str) – The end time or date to stop searching for data (exclusive). Should be like “20210108.000000” to search for data ending before January 8th, 2021.

  • filetype (str, optional) – A file type from the DatastreamStorage.file_filters keys. If no type is specified, all files will be returned. Defaults to None.

Returns

A list of paths in datastream storage in ascending order.

Return type

List[S3Path]

fetch(self, datastream_name: str, start_time: str, end_time: str, local_path: str = None, filetype: int = None) → tsdat.io.DisposableLocalTempFileList

Fetches files from the datastream store using the datastream_name, start_time, and end_time to specify the file(s) to retrieve. If the local path is not specified, it is up to the subclass to determine where to put the retrieved file(s).

Parameters
  • datastream_name (str) – The datastream_name as defined by ME Data Standards.

  • start_time (str) – The start time or date to start searching for data (inclusive). Should be like “20210106” to search for data beginning on or after January 6th, 2021.

  • end_time (str) – The end time or date to stop searching for data (exclusive). Should be like “20210108” to search for data ending before January 8th, 2021.

  • local_path (str, optional) – The path to the directory where the data should be stored. Defaults to None.

  • filetype (int, optional) – A file type from the DatastreamStorage.file_filters keys. If no type is specified, all files will be returned. Defaults to None.

Returns

A list of paths where the retrieved files were stored in local storage. This is a context manager class, so this method should be called via the ‘with’ statement; all files referenced by the list will be cleaned up when the context goes out of scope.

Return type

DisposableLocalTempFileList

save_local_path(self, local_path: str, new_filename: str = None)

Given a path to a local file, save that file to the storage.

Parameters
  • local_path (str) – Local path to the file to save. The file should be named according to ME Data Standards naming conventions so that this method can automatically parse the datastream, date, and time from the file name.

  • new_filename (str, optional) – If provided, the new filename to save as. This parameter should ONLY be provided if using a local path for dataset_or_path. Must also follow ME Data Standards naming conventions. Defaults to None.

Returns

The path where this file was stored in storage. Path type is dependent upon the specific storage subclass.

Return type

Any

exists(self, datastream_name: str, start_time: str, end_time: str, filetype: int = None) → bool

Checks if any data exists in the datastream store for the provided datastream and time range.

Parameters
  • datastream_name (str) – The datastream_name as defined by ME Data Standards.

  • start_time (str) – The start time or date to start searching for data (inclusive). Should be like “20210106” to search for data beginning on or after January 6th, 2021.

  • end_time (str) – The end time or date to stop searching for data (exclusive). Should be like “20210108” to search for data ending before January 8th, 2021.

  • filetype (str, optional) – A file type from the DatastreamStorage.file_filters keys. If none is specified, all files will be checked. Defaults to None.

Returns

True if data exists, False otherwise.

Return type

bool

delete(self, datastream_name: str, start_time: str, end_time: str, filetype: int = None) → None

Deletes datastream data in the datastream store within the specified time range.

Parameters
  • datastream_name (str) – The datastream_name as defined by ME Data Standards.

  • start_time (str) – The start time or date to start searching for data (inclusive). Should be like “20210106” to search for data beginning on or after January 6th, 2021.

  • end_time (str) – The end time or date to stop searching for data (exclusive). Should be like “20210108” to search for data ending before January 8th, 2021.

  • filetype (str, optional) – A file type from the DatastreamStorage.file_filters keys. If no type is specified, all files will be deleted. Defaults to None.

class tsdat.io.S3Path(bucket_name: str, bucket_path: str = '', region_name: str = None)

Bases: str

This class wraps a ‘special’ path string that lets us include the bucket name and region in the path, so that we can use it seamlessly in boto3 APIs. We are creating our own string to hold the region, bucket & key (i.e., path), since boto3 needs all three in order to access a file.

Example:

s3_client = boto3.client('s3', region_name='eu-central-1')
s3_client.download_file(bucket, key, download_path)

Parameters
  • bucket_name (str) – The S3 bucket name where this file is located

  • bucket_path (str, optional) – The key to access this file in the bucket

  • region_name (str, optional) – The AWS region where this file is located, defaults to None, which inherits the default configured region.

__str__(self)

Return str(self).

property bucket_name(self)
property bucket_path(self)
property region_name(self)
join(self, *args)

Joins segments in an S3 path. This method behaves exactly like os.path.join.

Returns

A new S3Path with the additional segments added.

Return type

S3Path
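
A minimal sketch of constructing and extending an S3Path (the bucket name, keys, and region below are hypothetical):

from tsdat.io import S3Path

path = S3Path("my-tsdat-bucket", "root/abc.buoy_z05.a0", region_name="us-west-2")
nc_file = path.join("abc.buoy_z05.a0.20210106.000000.nc")
print(nc_file.bucket_name)  # my-tsdat-bucket
print(nc_file.bucket_path)  # root/abc.buoy_z05.a0/abc.buoy_z05.a0.20210106.000000.nc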