storage_retriever

Classes:

| Name | Description |
| --- | --- |
| StorageRetriever | Retriever API for pulling input data from the storage area. |

Classes

StorageRetriever

Bases: Retriever

Retriever API for pulling input data from the storage area.

Classes:

Name Description
TransParameters

Methods:

Name Description
retrieve

Attributes:

Name Type Description
parameters Optional[TransParameters]

Attributes

parameters class-attribute instance-attribute
parameters: Optional[TransParameters] = None

Classes

TransParameters

Bases: BaseModel

Attributes:

Name Type Description
fetch_params Optional[GlobalFetchParams]
trans_params Optional[GlobalARMTransformParams]

Attributes

fetch_params class-attribute instance-attribute
fetch_params: Optional[GlobalFetchParams] = Field(
    default=None, alias="fetch_parameters"
)
trans_params class-attribute instance-attribute
trans_params: Optional[GlobalARMTransformParams] = Field(
    default=None, alias="transformation_parameters"
)

Functions

retrieve
retrieve(
    input_keys: List[str],
    dataset_config: DatasetConfig,
    storage: Optional[Storage] = None,
    input_data_hook: Optional[
        Callable[
            [Dict[str, xr.Dataset]], Dict[str, xr.Dataset]
        ]
    ] = None,
    **kwargs: Any
) -> xr.Dataset

Retrieves input data from the storage area.

Each input key must be a string of space-separated "--key value" pairs:

"--key1 value1 --key2 value2"

e.g.,

"--datastream sgp.met.b0 --start 20230801 --end 20230901"
"--datastream sgp.met.b0 --start 20230801 --end 20230901 --location_id sgp --data_level b0"

This format lets the retriever pull data from the Storage API for each input datastream over the requested date range.
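
The key format described above can be illustrated with a small parser. This is a minimal sketch, not tsdat's own parsing code: `parse_input_key` is a hypothetical helper introduced here for illustration, and the sketch assumes every flag is followed by exactly one value.

```python
import shlex
from typing import Dict


def parse_input_key(input_key: str) -> Dict[str, str]:
    """Hypothetical helper: split "--key value" pairs into a dict of strings."""
    tokens = shlex.split(input_key)
    if len(tokens) % 2 != 0:
        raise ValueError(f"Expected '--key value' pairs, got: {input_key!r}")
    parsed: Dict[str, str] = {}
    # Even-indexed tokens are flags, odd-indexed tokens are their values.
    for flag, value in zip(tokens[0::2], tokens[1::2]):
        if not flag.startswith("--"):
            raise ValueError(f"Expected a '--key' flag, got: {flag!r}")
        parsed[flag[2:]] = value
    return parsed


key = "--datastream sgp.met.b0 --start 20230801 --end 20230901"
params = parse_input_key(key)
print(params["datastream"], params["start"], params["end"])
```

Values stay as strings in this sketch; converting `start` and `end` to timestamps is left to the caller.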

Parameters:

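| Name | Type | Description | Default |
| --- | --- | --- | --- |
| input_keys | List[str] | A list of input keys formatted as described above. | required |
| dataset_config | DatasetConfig | The output dataset configuration. | required |
| storage | Optional[Storage] | Instance of a Storage class used to fetch saved data. Although the default is None, retrieve asserts that a storage instance is provided. | None |
| input_data_hook | Optional[Callable[[Dict[str, xr.Dataset]], Dict[str, xr.Dataset]]] | Optional callable applied to the dictionary of fetched input datasets before retrieval. If it returns a value other than None, that value replaces the fetched inputs. | None |
| **kwargs | Any | Additional keyword arguments; the source shown below does not use them. | {} |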

Returns:

| Type | Description |
| --- | --- |
| xr.Dataset | The retrieved dataset. |


Source code in tsdat/io/retrievers/storage_retriever.py
def retrieve(
    self,
    input_keys: List[str],
    dataset_config: DatasetConfig,
    storage: Optional[Storage] = None,
    input_data_hook: Optional[
        Callable[[Dict[str, xr.Dataset]], Dict[str, xr.Dataset]]
    ] = None,
    **kwargs: Any,
) -> xr.Dataset:
    """------------------------------------------------------------------------------------
    Retrieves input data from the storage area.

    Note that each input_key is expected to be formatted according to the following
    format:

    ```python
    "--key1 value1 --key2 value2",
    ```

    e.g.,

    ```python
    "--datastream sgp.met.b0 --start 20230801 --end 20230901"
    "--datastream sgp.met.b0 --start 20230801 --end 20230901 --location_id sgp --data_level b0"
    ```

    This format allows the retriever to pull datastream data from the Storage API
    for the desired dates for each desired input source.

    Args:
        input_keys (List[str]): A list of input keys formatted as described above.
        dataset_config (DatasetConfig): The output dataset configuration.
        storage (Storage): Instance of a Storage class used to fetch saved data.

    Returns:
        xr.Dataset: The retrieved dataset

    ------------------------------------------------------------------------------------
    """
    assert storage is not None, "Missing required 'storage' parameter."

    storage_input_keys = [StorageRetrieverInput(key) for key in input_keys]

    input_data = self.__fetch_inputs(storage_input_keys, storage)

    if input_data_hook is not None:
        modded_input_data = input_data_hook(input_data)
        if modded_input_data is not None:
            input_data = modded_input_data

    # Perform coord/variable retrieval
    retrieved_data, retrieval_selections = perform_data_retrieval(
        input_data=input_data,
        coord_rules=self.coords,  # type: ignore
        data_var_rules=self.data_vars,  # type: ignore
    )

    # Ensure selected coords are indexed by themselves
    for name, coord_data in retrieved_data.coords.items():
        if coord_data.equals(xr.DataArray([])):
            continue
        new_coord = xr.DataArray(
            data=coord_data.data,
            coords={name: coord_data.data},
            dims=(name,),
            attrs=coord_data.attrs,
            name=name,
        )
        retrieved_data.coords[name] = new_coord
    # Q: Do data_vars need to be renamed or reindexed before data converters run?

    # Run data converters on coordinates, then on data variables
    for name, coord_def in retrieval_selections.coords.items():
        for converter in coord_def.data_converters:
            coord_data = retrieved_data.coords[name]
            data = converter.convert(
                data=coord_data,
                variable_name=name,
                dataset_config=dataset_config,
                retrieved_dataset=retrieved_data,
                time_span=(storage_input_keys[0].start, storage_input_keys[0].end),
                input_dataset=input_data.get(coord_def.source),
                retriever=self,
                input_key=coord_def.source,
            )
            if data is not None:
                retrieved_data.coords[name] = data

    for name, var_def in retrieval_selections.data_vars.items():
        if not retrieved_data.data_vars[name].size:
            retrieved_data.data_vars.pop(name)
            continue
        for converter in var_def.data_converters:
            var_data = retrieved_data.data_vars[name]
            data = converter.convert(
                data=var_data,
                variable_name=name,
                dataset_config=dataset_config,
                retrieved_dataset=retrieved_data,
                retriever=self,
                input_dataset=input_data.get(var_def.source),
                input_key=var_def.source,
            )
            if data is not None:
                retrieved_data.data_vars[name] = data

    # Construct the retrieved dataset structure
    # TODO: validate dimension alignment
    retrieved_dataset = xr.Dataset(
        coords=retrieved_data.coords,
        data_vars=retrieved_data.data_vars,
    )

    # Double check that the dataset is trimmed to the start and end time.
    # This is needed when adi_py is not used and more than one file
    # is pulled in.
    retrieved_dataset = self.__trim_dataset(retrieved_dataset, storage_input_keys)

    # Fix the dtype encoding
    for var_name, var_data in retrieved_dataset.data_vars.items():
        output_var_cfg = dataset_config.data_vars.get(var_name)
        if output_var_cfg is not None:
            dtype = output_var_cfg.dtype
            retrieved_dataset[var_name] = var_data.astype(dtype)
            var_data.encoding["dtype"] = dtype

    return retrieved_dataset
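
In the source above, the result of `input_data_hook` replaces the fetched inputs only when it is not None, so a hook can either return a new mapping or modify the inputs in place and return None. The sketch below reproduces only that contract. It is a simplified illustration: plain dicts stand in for `xr.Dataset` objects, and `apply_hook`, `drop_empty_inputs`, and `tag_in_place` are hypothetical names that are not part of tsdat.

```python
from typing import Callable, Dict, Optional

# Plain dicts stand in for xr.Dataset objects so the sketch has no dependencies.
FakeDataset = Dict[str, list]
Inputs = Dict[str, FakeDataset]
Hook = Callable[[Inputs], Optional[Inputs]]


def apply_hook(input_data: Inputs, hook: Optional[Hook]) -> Inputs:
    """Mirror the hook handling in retrieve(): keep the original mapping
    unless the hook returns a replacement."""
    if hook is not None:
        modded = hook(input_data)
        if modded is not None:
            input_data = modded
    return input_data


def drop_empty_inputs(input_data: Inputs) -> Inputs:
    """Hypothetical hook: return a new mapping without empty inputs."""
    return {key: ds for key, ds in input_data.items() if ds}


def tag_in_place(input_data: Inputs) -> None:
    """Hypothetical hook: mutate the inputs in place and return None."""
    for ds in input_data.values():
        ds["tagged"] = [True]


inputs: Inputs = {"a": {"temp": [1.0, 2.0]}, "b": {}}
filtered = apply_hook(inputs, drop_empty_inputs)  # new mapping replaces inputs
tagged = apply_hook(inputs, tag_in_place)  # None returned, original mapping kept
```

Note that the annotated signature of `retrieve` declares the hook's return type as `Dict[str, xr.Dataset]`; the None case is tolerated by the implementation rather than advertised by the type hint.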
