
storage

Classes#

FileSystem #

Bases: Storage

Handles data storage and retrieval for file-based data formats.

Formats that write to directories (such as zarr) are not supported by the FileSystem storage class.

Parameters:

  • parameters (Parameters): File-system specific parameters, such as the root path to where files should be saved, or additional keyword arguments to specific functions used by the storage API. See the FileSystem.Parameters class for more details. Required.
  • handler (FileHandler): The FileHandler class that should be used to handle data I/O within the storage API. Required.
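
For orientation, here is a minimal sketch of constructing this storage class directly in Python. The parameter values are illustrative, and storage_root is assumed to be a field of the base Parameters class (it is referenced by the path substitutions documented below); tsdat pipelines typically build the storage class from a storage config file instead.

    from tsdat.io.storage import FileSystem

    # A minimal, illustrative configuration sketch (values are not verified defaults)
    storage = FileSystem(
        parameters=FileSystem.Parameters(
            storage_root="storage/root",  # assumed base Parameters field
            data_storage_path="data/{location_id}/{datastream}",
            data_filename_template="{datastream}.{date_time}.{extension}",
        ),
        # handler defaults to NetCDFHandler (see the attributes below)
    )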

Attributes#

handler class-attribute instance-attribute #
handler: FileHandler = Field(default_factory=NetCDFHandler)
parameters class-attribute instance-attribute #
parameters: Parameters = Field(default_factory=Parameters)

Classes#

Parameters #

Bases: Parameters

Attributes#
data_filename_template class-attribute instance-attribute #
data_filename_template: str = (
    "{datastream}.{date_time}.{extension}"
)

Template string to use for data filenames.

Allows substitution of the following parameters using curly braces '{}':

  • ext: the file extension from the storage data handler
  • datastream from the dataset's global attributes
  • location_id from the dataset's global attributes
  • data_level from the dataset's global attributes
  • date_time: the first timestamp in the file formatted as "YYYYMMDD.hhmmss"
  • Any other global attribute that has a string or integer data type.

At a minimum the template must include {date_time}.
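
As a quick illustration of how the default template resolves (the attribute values below are hypothetical):

    # Hypothetical substitution of the default data_filename_template
    template = "{datastream}.{date_time}.{extension}"
    filename = template.format(
        datastream="abc.buoy_z06.a1",   # hypothetical value from the dataset's global attributes
        date_time="20230101.000000",    # first timestamp, formatted as YYYYMMDD.hhmmss
        extension="nc",                 # extension reported by the storage data handler
    )
    print(filename)  # abc.buoy_z06.a1.20230101.000000.nc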

data_storage_path class-attribute instance-attribute #
data_storage_path: Path = Path(
    "data/{location_id}/{datastream}"
)

The directory structure under storage_root where data files are saved.

Allows substitution of the following parameters using curly braces '{}':

  • storage_root: the value from the storage_root parameter.
  • datastream: the datastream as defined in the dataset config file.
  • location_id: the location_id as defined in the dataset config file.
  • data_level: the data_level as defined in the dataset config file.
  • year: the year of the first timestamp in the file.
  • month: the month of the first timestamp in the file.
  • day: the day of the first timestamp in the file.
  • extension: the file extension used by the output file writer.

Defaults to data/{location_id}/{datastream}.
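
Combined with the filename template above, a saved file would land under a directory like the following (values are hypothetical):

    from pathlib import Path

    storage_root = Path("storage/root")                 # hypothetical storage_root value
    data_storage_path = "data/{location_id}/{datastream}"

    directory = storage_root / data_storage_path.format(
        location_id="buoy_z06",        # hypothetical location_id
        datastream="abc.buoy_z06.a1",  # hypothetical datastream
    )
    print(directory)  # storage/root/data/buoy_z06/abc.buoy_z06.a1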

merge_fetched_data_kwargs class-attribute instance-attribute #
merge_fetched_data_kwargs: Dict[str, Any] = dict()

Keyword arguments passed to xr.merge.

Note that this will only be called if the DataReader returns a dictionary of xr.Datasets for a single input key.
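
For example, the merge behavior can be tuned with any keyword that xr.merge accepts (compat is a standard xr.merge option); this is a hedged sketch, not a recommended setting:

    from tsdat.io.storage import FileSystem

    params = FileSystem.Parameters(
        merge_fetched_data_kwargs={"compat": "override"},  # forwarded to xr.merge when fetched files are combined
    )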

Functions#

fetch_data #
fetch_data(
    start: datetime,
    end: datetime,
    datastream: str,
    metadata_kwargs: Union[Dict[str, str], None] = None,
    **kwargs: Any
) -> xr.Dataset

Fetches data for a given datastream between a specified time range.

Parameters:

  • start (datetime): The minimum datetime to fetch. Required.
  • end (datetime): The maximum datetime to fetch. Required.
  • datastream (str): The datastream id to search for. Required.
  • metadata_kwargs (dict[str, str], optional): Metadata substitutions to help resolve the data storage path. This is only required if the template data storage path includes any properties other than datastream or fields contained in the datastream. Defaults to None.

Returns:

  • xr.Dataset: A dataset containing all the data in the storage area that spans the specified datetimes.
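
A hedged usage sketch, assuming a FileSystem instance named storage configured as in the sketch near the top of this page (the datastream id is hypothetical):

    from datetime import datetime

    dataset = storage.fetch_data(
        start=datetime(2023, 1, 1),
        end=datetime(2023, 1, 2),
        datastream="abc.buoy_z06.a1",  # hypothetical datastream id
    )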


Source code in tsdat/io/storage.py
def fetch_data(
    self,
    start: datetime,
    end: datetime,
    datastream: str,
    metadata_kwargs: Union[Dict[str, str], None] = None,
    **kwargs: Any,
) -> xr.Dataset:
    """-----------------------------------------------------------------------------
    Fetches data for a given datastream between a specified time range.

    Args:
        start (datetime): The minimum datetime to fetch.
        end (datetime): The maximum datetime to fetch.
        datastream (str): The datastream id to search for.
        metadata_kwargs (dict[str, str], optional): Metadata substitutions to help
            resolve the data storage path. This is only required if the template
            data storage path includes any properties other than datastream or
            fields contained in the datastream. Defaults to None.

    Returns:
        xr.Dataset: A dataset containing all the data in the storage area that spans
        the specified datetimes.

    -----------------------------------------------------------------------------"""
    if metadata_kwargs is None:
        metadata_kwargs = {}
    metadata_kwargs = {
        "datastream": datastream,
        **get_fields_from_datastream(datastream),
        **metadata_kwargs,
    }
    data_files = self._find_data(
        start,
        end,
        datastream,
        metadata_kwargs=metadata_kwargs,
    )
    datasets = self._open_data_files(*data_files)
    dataset = xr.merge(datasets, **self.parameters.merge_fetched_data_kwargs)  # type: ignore
    if not dataset:
        logger.warning(
            "No data found for %s in range %s - %s", datastream, start, end
        )
        return dataset  # empty
    return dataset.sel(time=slice(start, end))
save_ancillary_file #
save_ancillary_file(
    filepath: Path, target_path: Union[Path, None] = None
)

Saves an ancillary filepath to the datastream's ancillary storage area.

NOTE: In most cases this function should not be used directly. Instead, prefer using the self.uploadable_dir(*args, **kwargs) method.

Parameters:

  • filepath (Path): The path to the ancillary file. This is expected to have a standardized filename and should be saved under the ancillary storage path. Required.
  • target_path (Path, optional): The path to where the data should be saved. Defaults to None.

Source code in tsdat/io/storage.py
def save_ancillary_file(
    self, filepath: Path, target_path: Union[Path, None] = None
):
    """Saves an ancillary filepath to the datastream's ancillary storage area.

    NOTE: In most cases this function should not be used directly. Instead, prefer
    using the ``self.uploadable_dir(*args, **kwargs)`` method.

    Args:
        filepath (Path): The path to the ancillary file. This is expected to have
            a standardized filename and should be saved under the ancillary storage
            path.
        target_path (str): The path to where the data should be saved.
    """
    target_path.parent.mkdir(exist_ok=True, parents=True)
    saved_filepath = shutil.copy2(filepath, target_path)
    logger.info("Saved ancillary file to: %s", saved_filepath)
save_data #
save_data(dataset: xr.Dataset, **kwargs: Any)

Saves a dataset to the storage area.

At a minimum, the dataset must have a 'datastream' global attribute and must have a 'time' variable with a np.datetime64-like data type.

Parameters:

  • dataset (Dataset): The dataset to save. Required.
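
A minimal sketch of a dataset that satisfies these requirements, saved with a FileSystem instance named storage as sketched above (attribute and variable values are hypothetical):

    import numpy as np
    import pandas as pd
    import xarray as xr

    dataset = xr.Dataset(
        data_vars={"temperature": ("time", np.array([10.2, 10.4]))},
        coords={"time": pd.date_range("2023-01-01", periods=2, freq="10min")},  # np.datetime64 time coordinate
        attrs={"datastream": "abc.buoy_z06.a1"},  # required 'datastream' global attribute
    )
    storage.save_data(dataset)  # written under data_storage_path using data_filename_template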

Source code in tsdat/io/storage.py
def save_data(self, dataset: xr.Dataset, **kwargs: Any):
    """-----------------------------------------------------------------------------
    Saves a dataset to the storage area.

    At a minimum, the dataset must have a 'datastream' global attribute and must
    have a 'time' variable with a np.datetime64-like data type.

    Args:
        dataset (xr.Dataset): The dataset to save.

    -----------------------------------------------------------------------------"""
    datastream = dataset.attrs["datastream"]
    filepath = self._get_dataset_filepath(dataset)
    filepath.parent.mkdir(exist_ok=True, parents=True)
    self.handler.writer.write(dataset, filepath)
    logger.info("Saved %s dataset to %s", datastream, filepath.as_posix())

FileSystemS3 #

Bases: FileSystem

Handles data storage and retrieval for file-based data in an AWS S3 bucket.

Parameters:

  • parameters (Parameters): File-system and AWS-specific parameters, such as the path to where files should be saved or additional keyword arguments to specific functions used by the storage API. See the FileSystemS3.Parameters class for more details. Required.
  • handler (FileHandler): The FileHandler class that should be used to handle data I/O within the storage API. Required.
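
A hedged configuration sketch (the bucket name is hypothetical, AWS credentials must already be available to boto3, and both values can instead come from the TSDAT_S3_BUCKET_NAME and AWS_DEFAULT_REGION environment variables described under Parameters below):

    from tsdat.io.storage import FileSystemS3

    storage = FileSystemS3(
        parameters=FileSystemS3.Parameters(
            bucket="my-tsdat-bucket",  # hypothetical S3 bucket
            region="us-west-2",
        ),
    )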

Attributes#

parameters class-attribute instance-attribute #
parameters: Parameters = Field(default_factory=Parameters)

Classes#

Parameters #

Bases: Parameters

Additional parameters for S3 storage.

Note that all settings and parameters from FileSystem.Parameters are also supported by FileSystemS3.Parameters.

Attributes#
bucket class-attribute instance-attribute #
bucket: str = Field(
    "tsdat-storage", env="TSDAT_S3_BUCKET_NAME"
)

The name of the S3 bucket that the storage class should use.

Note

This parameter can also be set via the TSDAT_S3_BUCKET_NAME environment variable.

region class-attribute instance-attribute #
region: str = Field('us-west-2', env='AWS_DEFAULT_REGION')

The AWS region of the storage bucket.

Note

This parameter can also be set via the AWS_DEFAULT_REGION environment variable.

Defaults to us-west-2.

Functions#

last_modified #
last_modified(datastream: str) -> Union[datetime, None]

Returns the datetime of the last modification to the datastream's storage area.

Source code in tsdat/io/storage.py
def last_modified(self, datastream: str) -> Union[datetime, None]:
    """Returns the datetime of the last modification to the datastream's storage area."""
    substitutions = get_fields_from_datastream(datastream)
    substitutions["datastream"] = datastream
    prefix = self._get_data_directory(substitutions).as_posix()

    last_modified = None
    for obj in self._bucket.objects.filter(Prefix=prefix):
        if obj.last_modified is not None:
            last_modified = (
                obj.last_modified.astimezone(timezone.utc)
                if last_modified is None
                else max(last_modified, obj.last_modified)
            )
    return last_modified
modified_since #
modified_since(
    datastream: str, last_modified: datetime
) -> List[datetime]

Returns the data times of all files modified after the specified datetime.
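
For illustration, an incremental-processing check using both last_modified and modified_since, assuming a configured FileSystemS3 instance named storage and a hypothetical datastream id:

    last_run = storage.last_modified("abc.buoy_z06.a1")
    if last_run is not None:
        new_file_times = storage.modified_since("abc.buoy_z06.a1", last_run)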

Source code in tsdat/io/storage.py
def modified_since(
    self, datastream: str, last_modified: datetime
) -> List[datetime]:
    """Returns the data times of all files modified after the specified datetime."""
    substitutions = get_fields_from_datastream(datastream)
    substitutions["datastream"] = datastream
    prefix = self._get_data_directory(substitutions).as_posix()
    return [
        datetime.strptime(get_file_datetime_str(obj.key), "%Y%m%d.%H%M%S")
        for obj in self._bucket.objects.filter(Prefix=prefix)
        if obj.last_modified is not None
        and obj.last_modified.astimezone(timezone.utc) > last_modified
    ]
save_ancillary_file #
save_ancillary_file(
    filepath: Path, target_path: Union[Path, None] = None
)

Saves an ancillary filepath to the datastream's ancillary storage area.

NOTE: In most cases this function should not be used directly. Instead, prefer using the self.uploadable_dir(*args, **kwargs) method.

Parameters:

  • filepath (Path): The path to the ancillary file. This is expected to have a standardized filename and should be saved under the ancillary storage path. Required.
  • target_path (Path, optional): The path to where the data should be saved. Defaults to None.

Source code in tsdat/io/storage.py
def save_ancillary_file(
    self, filepath: Path, target_path: Union[Path, None] = None
):
    """Saves an ancillary filepath to the datastream's ancillary storage area.

    NOTE: In most cases this function should not be used directly. Instead, prefer
    using the ``self.uploadable_dir(*args, **kwargs)`` method.

    Args:
        filepath (Path): The path to the ancillary file. This is expected to have
            a standardized filename and should be saved under the ancillary storage
            path.
        target_path (str): The path to where the data should be saved.
    """
    self._bucket.upload_file(Filename=str(filepath), Key=target_path.as_posix())
    logger.info("Saved ancillary file to: %s", target_path.as_posix())
save_data #
save_data(dataset: xr.Dataset, **kwargs: Any)
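
Saves a dataset to the datastream's storage area in the configured S3 bucket. The source has no docstring; as the code below shows, the dataset is first written to a temporary directory and the resulting file(s) are then uploaded to S3.
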
Source code in tsdat/io/storage.py
def save_data(self, dataset: xr.Dataset, **kwargs: Any):
    datastream: str = dataset.attrs["datastream"]
    standard_fpath = self._get_dataset_filepath(dataset)
    with tempfile.TemporaryDirectory() as tmp_dir:
        tmp_filepath = Path(tmp_dir) / standard_fpath.name
        self.handler.writer.write(dataset, tmp_filepath)
        for filepath in Path(tmp_dir).glob("**/*"):
            if filepath.is_dir():
                continue
            s3_key = (
                standard_fpath.parent / filepath.relative_to(tmp_dir)
            ).as_posix()
            self._bucket.upload_file(Filename=filepath.as_posix(), Key=s3_key)
            logger.info(
                "Saved %s data file to s3://%s/%s",
                datastream,
                self.parameters.bucket,
                s3_key,
            )

ZarrLocalStorage #

Bases: FileSystem


Handles data storage and retrieval for zarr archives on a local filesystem.

Zarr is a special format that writes chunked data to a number of files underneath a given directory. This distribution of data into chunks and distinct files makes zarr an extremely well-suited format for quickly storing and retrieving large quantities of data.

Parameters:

  • parameters (Parameters): File-system specific parameters, such as the root path to where the Zarr archives should be saved, or additional keyword arguments to specific functions used by the storage API. See the Parameters class for more details. Required.
  • handler (ZarrHandler): The ZarrHandler class that should be used to handle data I/O within the storage API. Required.
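
A hedged configuration sketch (values are illustrative, storage_root is assumed to be a field of the base Parameters class, and the handler defaults to ZarrHandler as shown under Attributes below):

    from tsdat.io.storage import ZarrLocalStorage

    storage = ZarrLocalStorage(
        parameters=ZarrLocalStorage.Parameters(
            storage_root="storage/root",  # assumed base Parameters field
            data_storage_path="data/{location_id}",
            data_filename_template="{datastream}.{extension}",
        ),
    )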

Attributes#

handler class-attribute instance-attribute #
handler: ZarrHandler = Field(default_factory=ZarrHandler)
parameters class-attribute instance-attribute #
parameters: Parameters = Field(default_factory=Parameters)

Classes#

Parameters #

Bases: Parameters

Attributes#
data_filename_template class-attribute instance-attribute #
data_filename_template: str = '{datastream}.{extension}'

Template string to use for data filenames.

Allows substitution of the following parameters using curly braces '{}':

  • ext: the file extension from the storage data handler
  • datastream from the dataset's global attributes
  • location_id from the dataset's global attributes
  • data_level from the dataset's global attributes
  • Any other global attribute that has a string or integer data type.
data_storage_path class-attribute instance-attribute #
data_storage_path: Path = Path('data/{location_id}')

The directory structure under storage_root where data files are saved.

Allows substitution of the following parameters using curly braces '{}':

  • storage_root: the value from the storage_root parameter.
  • datastream: the datastream as defined in the dataset config file.
  • location_id: the location_id as defined in the dataset config file.
  • data_level: the data_level as defined in the dataset config file.
  • year: the year of the first timestamp in the file.
  • month: the month of the first timestamp in the file.
  • day: the day of the first timestamp in the file.
  • extension: the file extension used by the output file writer.

Functions#