tsdat.io.aws_storage

Module Contents

Classes

S3Path

This class wraps a 'special' path string that lets us include the bucket name and region in the path.

AwsTemporaryStorage

Class used to store temporary files or perform filesystem actions on non-datastream files in the same AWS S3 bucket as the DatastreamStorage.

AwsStorage

DatastreamStorage subclass for an AWS S3-based filesystem.

Attributes

SEPARATOR

tsdat.io.aws_storage.SEPARATOR = $$$
class tsdat.io.aws_storage.S3Path(bucket_name: str, bucket_path: str = '', region_name: str = None)

Bases: str

This class wraps a ‘special’ path string that lets us include the bucket name and region in the path, so that we can use it seamlessly in boto3 APIs. We are creating our own string to hold the region, bucket & key (i.e., path), since boto3 needs all three in order to access a file.

Example:

    s3_client = boto3.client('s3', region_name='eu-central-1')
    s3_client.download_file(bucket, key, download_path)

Parameters
  • bucket_name (str) – The S3 bucket name where this file is located

  • bucket_path (str, optional) – The key to access this file in the bucket

  • region_name (str, optional) – The AWS region where this file is located, defaults to None, which inherits the default configured region.

__str__(self)

Return str(self).

property bucket_name(self)
property bucket_path(self)
property region_name(self)
join(self, *args)

Joins segments in an S3 path. This method behaves exactly like os.path.join.

Returns

A new S3Path with the additional segments added.

Return type

S3Path
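The idea behind S3Path can be sketched as a str subclass that packs the region, bucket, and key into a single string using the module's SEPARATOR ($$$), so the three parts boto3 needs can be recovered from one value. The sketch below is an illustrative reimplementation under that assumption, not tsdat's actual code; the class name S3PathSketch is hypothetical.

```python
import posixpath


class S3PathSketch(str):
    """Illustrative str subclass packing region, bucket, and key into one string."""

    SEPARATOR = "$$$"

    def __new__(cls, bucket_name: str, bucket_path: str = "", region_name: str = None):
        # Encode all three parts so boto3 calls can recover them later.
        raw = cls.SEPARATOR.join([region_name or "", bucket_name, bucket_path])
        obj = super().__new__(cls, raw)
        obj._bucket_name = bucket_name
        obj._bucket_path = bucket_path
        obj._region_name = region_name
        return obj

    @property
    def bucket_name(self):
        return self._bucket_name

    @property
    def bucket_path(self):
        return self._bucket_path

    @property
    def region_name(self):
        return self._region_name

    def join(self, *args):
        # Behaves like os.path.join (POSIX flavor, matching S3 keys) and
        # returns a new path object with the extra segments appended.
        segments = ([self._bucket_path] if self._bucket_path else []) + list(args)
        return S3PathSketch(self._bucket_name, posixpath.join(*segments), self._region_name)


# Usage: build a path, then append key segments.
path = S3PathSketch("my-bucket", "root/datastream", "eu-central-1").join("a1", "file.nc")
```

Because the result is still a str, it can be passed anywhere a plain path string is expected, while the properties expose the pieces boto3 needs.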

class tsdat.io.aws_storage.AwsTemporaryStorage(*args, **kwargs)

Bases: tsdat.io.TemporaryStorage

Class used to store temporary files or perform filesystem actions on files other than datastream files that reside in the same AWS S3 bucket as the DatastreamStorage. This is a helper class intended to be used in the internals of pipeline implementations only. It is not meant as an external API for interacting with files in DatastreamStorage.

property base_path(self) → S3Path
clean(self)

Clean any extraneous files from the temp working dirs. Temp files could be in two places:

  1. the local temp folder - used when fetching files from the store

  2. the storage temp folder - used when extracting zip files in some stores (e.g., AWS)

This method removes the local temp folder. Child classes can extend this method to clean up their respective storage temp folders.

is_tarfile(self, filepath)
is_zipfile(self, filepath)
extract_tarfile(self, filepath: S3Path) → List[S3Path]
extract_zipfile(self, filepath) → List[S3Path]
extract_files(self, list_or_filepath: Union[S3Path, List[S3Path]]) → tsdat.io.DisposableStorageTempFileList

If provided a path to an archive file, this function will extract the archive into a temp directory IN THE SAME FILESYSTEM AS THE STORAGE. This means, for example, that if storage is in an S3 bucket, then the files will be extracted to a temp dir in that S3 bucket. This is to prevent local disk limitations when running via Lambda.

If the file is not an archive, then the same file will be returned.

This method supports zip, tar, and tar.gz file formats.

Parameters

list_or_filepath (Union[S3Path, List[S3Path]]) – The path of a file or a list of files that should be processed together, located in the same filesystem as the storage.

Returns

A list of paths to the files that were extracted. Files will be located in the temp area of the storage filesystem.

Return type

DisposableStorageTempFileList
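The local-filesystem analogue of this archive handling can be sketched with the standard library; the real class performs the equivalent work inside the S3 bucket. This is a simplified illustration, not tsdat's implementation, and the function name extract_files_sketch is hypothetical.

```python
import os
import tarfile
import tempfile
import zipfile
from typing import List


def extract_files_sketch(filepath: str, temp_dir: str) -> List[str]:
    """If filepath is a zip/tar/tar.gz archive, extract it into temp_dir and
    return the extracted paths; otherwise return [filepath] unchanged."""
    if zipfile.is_zipfile(filepath):
        with zipfile.ZipFile(filepath) as zf:
            zf.extractall(temp_dir)
            names = zf.namelist()
    elif tarfile.is_tarfile(filepath):  # covers both .tar and .tar.gz
        with tarfile.open(filepath) as tf:
            tf.extractall(temp_dir)
            names = tf.getnames()
    else:
        # Not an archive: pass the same file through untouched.
        return [filepath]
    return [os.path.join(temp_dir, name) for name in names]
```

The pass-through branch mirrors the documented behavior that non-archive files are returned as-is.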

fetch(self, file_path: S3Path, local_dir=None, disposable=True) → tsdat.io.DisposableLocalTempFile

Fetch a file from temp storage to a local temp folder. If disposable is True, then a DisposableLocalTempFile will be returned so that it can be used with a context manager.

Parameters
  • file_path (S3Path) – The path of a file located in the same filesystem as the storage.

  • local_dir (str, optional) – The destination folder for the file. If not specified, it will be created in the storage-approved local temp folder. Defaults to None.

  • disposable (bool, optional) – True if this file should be auto-deleted when it goes out of scope. Defaults to True.

Returns

If disposable, return a DisposableLocalTempFile, otherwise return the path to the local file.

Return type

Union[DisposableLocalTempFile, str]

fetch_previous_file(self, datastream_name: str, start_time: str) → tsdat.io.DisposableLocalTempFile

Look in DatastreamStorage for the first processed file before the given date.

Parameters
  • datastream_name (str) – The datastream_name as defined by ME Data Standards.

  • start_time (str) – The start time or date to start searching for data (inclusive). Should be like “20210106” to search for data beginning on or after January 6th, 2021.

Returns

If a previous file was found, return the local path to the fetched file. Otherwise return None. (Return value wrapped in DisposableLocalTempFile so it can be auto-deleted if needed.)

Return type

DisposableLocalTempFile
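Because ME Data Standards filenames embed a sortable timestamp, "first processed file before the given date" reduces to a lexicographic search over filenames. A minimal sketch, assuming hypothetical names like 'abc.buoy_z05.a1.20210105.000000.nc' where the fourth and fifth dot-separated fields are the date and time:

```python
from typing import List, Optional


def fetch_previous_file_sketch(filenames: List[str], start_time: str) -> Optional[str]:
    """Return the latest file whose embedded timestamp sorts before start_time,
    or None if there is no earlier file."""

    def timestamp(name: str) -> str:
        # Fields 4-5 of 'location.instrument.level.YYYYMMDD.HHMMSS.ext'
        parts = name.split(".")
        return parts[3] + "." + parts[4]

    earlier = sorted(f for f in filenames if timestamp(f) < start_time)
    return earlier[-1] if earlier else None
```

Since 'YYYYMMDD.HHMMSS' strings sort chronologically, plain string comparison is sufficient; a bare date like "20210106" also compares correctly against them.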

delete(self, filepath: S3Path) → None

Remove a file from storage temp area if the file exists. If the file does not exist, this method will NOT raise an exception.

Parameters

filepath (S3Path) – The path of a file located in the same filesystem as the storage.

listdir(self, filepath: S3Path) → List[S3Path]
upload(self, local_path: str, s3_path: S3Path)
class tsdat.io.aws_storage.AwsStorage(parameters: Union[Dict, None] = None)

Bases: tsdat.io.DatastreamStorage

DatastreamStorage subclass for an AWS S3-based filesystem.

Parameters

parameters (dict, optional) –

Dictionary of parameters that should be set automatically from the storage config file when this class is instantiated via the DatastreamStorage.from_config() method. Defaults to {}.

Key parameters that should be set in the config file include

retain_input_files

Whether the input files should be cleaned up after they are done processing

root_dir

The bucket ‘key’ to use to prepend to all processed files created in the persistent store. Defaults to ‘root’

temp_dir

The bucket ‘key’ to use to prepend to all temp files created in the S3 bucket. Defaults to ‘temp’

bucket_name

The name of the S3 bucket to store to
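Assuming a YAML storage config file, these parameters might be set as follows; the key names come from the list above, while the surrounding structure (storage/classname nesting) and the bucket name are illustrative assumptions:

```yaml
storage:
  classname: tsdat.io.AwsStorage
  parameters:
    retain_input_files: True        # keep input files after processing
    root_dir: root                  # key prefix for processed files
    temp_dir: temp                  # key prefix for temp files
    bucket_name: my-pipeline-bucket # hypothetical S3 bucket name
```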

property s3_resource(self)
property s3_client(self)
property tmp(self)

Each subclass should define the tmp property, which provides access to a TemporaryStorage object that is used to efficiently handle reading/writing temporary files used during the processing pipeline, or to perform filesystem actions on files other than processed datastream files that reside in the same filesystem as the DatastreamStorage. It is not intended to be used outside of the pipeline.

Raises

NotImplementedError – If the subclass does not implement this property.

property root(self)
property temp_path(self)
find(self, datastream_name: str, start_time: str, end_time: str, filetype: str = None) → List[S3Path]

Finds all files of the given type from the datastream store with the given datastream_name and timestamps from start_time (inclusive) up to end_time (exclusive). Returns a list of paths to files that match the criteria.

Parameters
  • datastream_name (str) – The datastream_name as defined by ME Data Standards.

  • start_time (str) – The start time or date to start searching for data (inclusive). Should be like “20210106.000000” to search for data beginning on or after January 6th, 2021.

  • end_time (str) – The end time or date to stop searching for data (exclusive). Should be like “20210108.000000” to search for data ending before January 8th, 2021.

  • filetype (str, optional) – A file type from the DatastreamStorage.file_filters keys. If no type is specified, all files will be returned. Defaults to None.

Returns

A list of paths in datastream storage in ascending order

Return type

List[str]
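The inclusive-start/exclusive-end filtering that find performs can be sketched over plain filenames, again assuming hypothetical ME-style names like 'abc.buoy_z05.a1.20210106.000000.nc' with the date and time in the fourth and fifth dot-separated fields; this is an illustration of the range semantics, not tsdat's S3 listing code:

```python
from typing import List


def find_sketch(filenames: List[str], start_time: str, end_time: str) -> List[str]:
    """Return files whose embedded 'YYYYMMDD.HHMMSS' timestamp falls in
    [start_time, end_time), sorted in ascending order."""

    def timestamp(name: str) -> str:
        parts = name.split(".")
        return parts[3] + "." + parts[4]

    # Lexicographic comparison of 'YYYYMMDD.HHMMSS' strings is chronological.
    return sorted(f for f in filenames if start_time <= timestamp(f) < end_time)
```

The half-open interval matches the documented behavior: start_time is inclusive, end_time exclusive.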

fetch(self, datastream_name: str, start_time: str, end_time: str, local_path: str = None, filetype: int = None) → tsdat.io.DisposableLocalTempFileList

Fetches files from the datastream store using the datastream_name, start_time, and end_time to specify the file(s) to retrieve. If the local path is not specified, it is up to the subclass to determine where to put the retrieved file(s).

Parameters
  • datastream_name (str) – The datastream_name as defined by ME Data Standards.

  • start_time (str) – The start time or date to start searching for data (inclusive). Should be like “20210106” to search for data beginning on or after January 6th, 2021.

  • end_time (str) – The end time or date to stop searching for data (exclusive). Should be like “20210108” to search for data ending before January 8th, 2021.

  • local_path (str, optional) – The path to the directory where the data should be stored. Defaults to None.

  • filetype (int, optional) – A file type from the DatastreamStorage.file_filters keys. If no type is specified, all files will be returned. Defaults to None.

Returns

A list of paths where the retrieved files were stored in local storage. This is a context manager class, so this method should be called via the 'with' statement, and all files referenced by the list will be cleaned up when it goes out of scope.

Return type

DisposableLocalTempFileList
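The cleanup behavior of the returned context-manager list can be sketched as a list subclass that deletes its files on exit; this is a simplification to show the pattern, not tsdat's DisposableLocalTempFileList implementation, and the class name is hypothetical:

```python
import os
from typing import List


class DisposableTempFileListSketch(list):
    """A list of local file paths that deletes its files on context exit."""

    def __init__(self, filepaths: List[str], disposable: bool = True):
        super().__init__(filepaths)
        self.disposable = disposable

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Remove every referenced file when the 'with' block ends.
        if self.disposable:
            for path in self:
                if os.path.isfile(path):
                    os.remove(path)
```

Usage would then mirror the documented pattern: `with storage.fetch(...) as files:` gives the local paths, and they are removed when the block exits.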

save_local_path(self, local_path: str, new_filename: str = None)

Given a path to a local file, save that file to the storage.

Parameters
  • local_path (str) – Local path to the file to save. The file should be named according to ME Data Standards naming conventions so that this method can automatically parse the datastream, date, and time from the file name.

  • new_filename (str, optional) – If provided, the new filename to save as. This parameter should ONLY be provided when saving from a local file path. Must also follow ME Data Standards naming conventions. Defaults to None.

Returns

The path where this file was stored in storage. Path type is dependent upon the specific storage subclass.

Return type

Any

exists(self, datastream_name: str, start_time: str, end_time: str, filetype: int = None) → bool

Checks if any data exists in the datastream store for the provided datastream and time range.

Parameters
  • datastream_name (str) – The datastream_name as defined by ME Data Standards.

  • start_time (str) – The start time or date to start searching for data (inclusive). Should be like “20210106” to search for data beginning on or after January 6th, 2021.

  • end_time (str) – The end time or date to stop searching for data (exclusive). Should be like “20210108” to search for data ending before January 8th, 2021.

  • filetype (str, optional) – A file type from the DatastreamStorage.file_filters keys. If none specified, all files will be checked. Defaults to None.

Returns

True if data exists, False otherwise.

Return type

bool

delete(self, datastream_name: str, start_time: str, end_time: str, filetype: int = None) → None

Deletes datastream data in the datastream store within the specified time range.

Parameters
  • datastream_name (str) – The datastream_name as defined by ME Data Standards.

  • start_time (str) – The start time or date to start searching for data (inclusive). Should be like “20210106” to search for data beginning on or after January 6th, 2021.

  • end_time (str) – The end time or date to stop searching for data (exclusive). Should be like “20210108” to search for data ending before January 8th, 2021.

  • filetype (str, optional) – A file type from the DatastreamStorage.file_filters keys. If no type is specified, all files will be deleted. Defaults to None.