Skip to content

retrievers

Attributes#

OutputVarName module-attribute #

OutputVarName = str

Classes#

DefaultRetriever #

Bases: Retriever


Default API for retrieving data from one or more input sources.

Reads data from one or more inputs, renames coordinates and data variables according to retrieval and dataset configurations, and applies registered DataConverters to retrieved data.

Parameters:

Name Type Description Default
readers Dict[Pattern[str], DataReader]

A mapping of patterns to DataReaders that the retriever uses to determine which DataReader to use for reading any given input key.

required
coords Dict[str, Dict[Pattern[str], VariableRetriever]]

A dictionary mapping output coordinate variable names to rules for how they should be retrieved.

required
data_vars Dict[str, Dict[Pattern[str], VariableRetriever]]

A dictionary mapping output data variable names to rules for how they should be retrieved.

required

Attributes#

parameters class-attribute instance-attribute #
parameters: Parameters = Parameters()
readers instance-attribute #
readers: Dict[Pattern, DataReader]

A dictionary mapping input key patterns to the DataReaders that should be used to read data for a given input key.

Classes#

Parameters #

Bases: BaseModel

Attributes#
merge_kwargs class-attribute instance-attribute #
merge_kwargs: Dict[str, Any] = {}

Keyword arguments passed to xr.merge(). This is only relevant if multiple input keys are provided simultaneously, or if any registered DataReader objects could return a dataset mapping instead of a single dataset.

Functions#

retrieve #
retrieve(
    input_keys: List[str],
    dataset_config: DatasetConfig,
    **kwargs: Any
) -> xr.Dataset
Source code in tsdat/io/retrievers.py
def retrieve(
    self, input_keys: List[str], dataset_config: DatasetConfig, **kwargs: Any
) -> xr.Dataset:
    """Read the given input keys and return a single processed dataset.

    Each raw dataset returned by ``self._get_raw_mapping`` is processed on its own:
    its variables are renamed, its coordinates are reindexed, and the data
    converters are run, all according to the retrieval rules selected for that
    dataset's input key. The processed datasets are then combined by
    ``self._merge_raw_mapping``.

    Args:
        input_keys (List[str]): The input keys to read.
        dataset_config (DatasetConfig): The output dataset configuration.
        **kwargs (Any): Accepted for interface compatibility; not used here.

    Returns:
        xr.Dataset: The merged, processed dataset.
    """
    processed: Dict[str, xr.Dataset] = {}
    for input_key, raw_dataset in self._get_raw_mapping(input_keys).items():
        # Retrieval rules are resolved per input key, so each dataset gets its own
        rules = InputKeyRetrievalRules(
            input_key=input_key,
            coord_rules=self.coords,  # type: ignore
            data_var_rules=self.data_vars,  # type: ignore
        )
        renamed = _rename_variables(raw_dataset, rules)
        reindexed = _reindex_dataset_coords(renamed, dataset_config, rules)
        processed[input_key] = _run_data_converters(reindexed, dataset_config, rules)
    return self._merge_raw_mapping(processed)

GlobalARMTransformParams #

Bases: BaseModel

Attributes#

alignment instance-attribute #
alignment: Dict[
    Pattern, Dict[str, Literal["LEFT", "RIGHT", "CENTER"]]
]
dim_range class-attribute instance-attribute #
dim_range: Dict[Pattern, Dict[str, str]] = Field(
    ..., alias="range"
)
width instance-attribute #
width: Dict[Pattern, Dict[str, str]]

Functions#

default_pattern #
default_pattern(
    d: Dict[Any, Any]
) -> Dict[Pattern[str], Dict[str, str]]
Source code in tsdat/io/retrievers.py
@validator("alignment", "dim_range", "width", pre=True)
def default_pattern(cls, d: Dict[Any, Any]) -> Dict[Pattern[str], Dict[str, str]]:
    """Normalize a parameter mapping so that every key is a compiled pattern.

    Entries whose value is a dict are treated as ``pattern -> parameters`` and the
    key is compiled as a regular expression. Any other entry is treated as a plain
    ``name -> value`` parameter and is collected under the catch-all ``.*`` pattern.

    Args:
        d (Dict[Any, Any]): The raw mapping provided for the field.

    Returns:
        Dict[Pattern[str], Dict[str, str]]: The normalized mapping, or an empty dict
        if the input is empty or otherwise falsy.
    """
    if not d:
        return {}

    catch_all = re.compile(r".*")
    by_pattern: Dict[Pattern[str], Dict[str, str]] = defaultdict(dict)
    for key, value in d.items():
        if not isinstance(value, dict):
            # Bare parameter: applies to every input key
            by_pattern[catch_all][key] = value
            continue
        by_pattern[re.compile(key)] = value
    return by_pattern
select_parameters #
select_parameters(
    input_key: str,
) -> Dict[str, Dict[str, Any]]
Source code in tsdat/io/retrievers.py
def select_parameters(self, input_key: str) -> Dict[str, Dict[str, Any]]:
    """Pick the transformation parameters that apply to the given input key.

    For each of ``alignment``, ``dim_range`` and ``width``, the patterns are tried
    in order and a copy of the parameters under the first pattern that matches
    ``input_key`` is used. A group with no matching pattern yields an empty dict.

    Args:
        input_key (str): The input key to match the patterns against.

    Returns:
        Dict[str, Dict[str, Any]]: A dict with the keys ``"alignment"``,
        ``"range"`` and ``"width"``.
    """
    # Output label -> pattern-keyed parameters; note dim_range is reported as "range"
    groups = (
        ("alignment", self.alignment),
        ("range", self.dim_range),
        ("width", self.width),
    )
    chosen: Dict[str, Dict[str, Any]] = {}
    for label, by_pattern in groups:
        chosen[label] = next(
            (
                params.copy()
                for pattern, params in by_pattern.items()
                if pattern.match(input_key) is not None
            ),
            {},
        )
    return chosen

GlobalFetchParams #

Bases: BaseModel

Attributes#

time_padding instance-attribute #
time_padding: Optional[str]

How far in time to look ahead (+), behind (-), or both to search for files.

Functions#

default_to_seconds #
default_to_seconds(d: str) -> str
Source code in tsdat/io/retrievers.py
@validator("time_padding", pre=True)
def default_to_seconds(cls, d: str) -> str:
    """Append a seconds unit to a time padding value that ends in a digit.

    Args:
        d (str): The raw ``time_padding`` value.

    Returns:
        str: An empty string if the value is empty or otherwise falsy; the value
        with ``"s"`` appended if its last character is numeric; otherwise the
        value unchanged.
    """
    if not d:
        return ""
    # A trailing digit means no unit was given, so seconds are assumed
    return d + "s" if d[-1].isnumeric() else d
get_direction #
get_direction(d: str) -> int
Source code in tsdat/io/retrievers.py
def get_direction(self, d: str) -> Tuple[int, str]:
    """Split a signed padding string into a direction and the unsigned remainder.

    Args:
        d (str): The padding string, optionally containing ``+`` or ``-``.

    Returns:
        Tuple[int, str]: The direction and the input with the sign characters
        removed. The direction is ``1`` if the string contains ``+``, ``-1`` if it
        contains ``-`` (and no ``+``), and ``0`` if it contains neither, in which
        case the string is returned unchanged.
    """
    # "+" is checked first, so it wins if both signs are present
    if "+" in d:
        return 1, d.replace("+", "")
    elif "-" in d:
        return -1, d.replace("-", "")
    else:
        return 0, d

InputKeyRetrievalRules #

InputKeyRetrievalRules(
    input_key: InputKey,
    coord_rules: Dict[
        VarName, Dict[Pattern[Any], RetrievedVariable]
    ],
    data_var_rules: Dict[
        VarName, Dict[Pattern[Any], RetrievedVariable]
    ],
)

Gathers variable retrieval rules for the given input key.

Source code in tsdat/io/retrievers.py
def __init__(
    self,
    input_key: InputKey,
    coord_rules: Dict[VarName, Dict[Pattern[Any], RetrievedVariable]],
    data_var_rules: Dict[VarName, Dict[Pattern[Any], RetrievedVariable]],
):
    """Select, for each output variable, the retrieval rule matching the input key.

    For every variable name, the patterns are tried in order and the rule under
    the first pattern that matches ``input_key`` is selected. Variables with no
    matching pattern are left out of ``self.coords`` / ``self.data_vars``.

    Args:
        input_key (InputKey): The input key the rules are being selected for.
        coord_rules (Dict[VarName, Dict[Pattern[Any], RetrievedVariable]]): The
            pattern-keyed retrieval rules for each output coordinate.
        data_var_rules (Dict[VarName, Dict[Pattern[Any], RetrievedVariable]]): The
            pattern-keyed retrieval rules for each output data variable.
    """
    self.input_key = input_key
    self.coords: Dict[VarName, RetrievedVariable] = {}
    self.data_vars: Dict[VarName, RetrievedVariable] = {}

    for name, retriever_dict in coord_rules.items():
        for pattern, variable_retriever in retriever_dict.items():
            if pattern.match(input_key):
                self.coords[name] = variable_retriever
                # Stop only once a pattern has matched; an unconditional break
                # here would never try any pattern after the first one.
                break

    for name, retriever_dict in data_var_rules.items():
        for pattern, variable_retriever in retriever_dict.items():
            if pattern.match(input_key):
                self.data_vars[name] = variable_retriever
                break

Attributes#

coords instance-attribute #
coords: Dict[VarName, RetrievedVariable] = {}
data_vars instance-attribute #
data_vars: Dict[VarName, RetrievedVariable] = {}
input_key instance-attribute #
input_key = input_key

StorageRetriever #

Bases: Retriever

Retriever API for pulling input data from the storage area.

Attributes#

parameters class-attribute instance-attribute #
parameters: Optional[TransParameters] = None

Classes#

TransParameters #

Bases: BaseModel

Attributes#
fetch_params class-attribute instance-attribute #
fetch_params: Optional[GlobalFetchParams] = Field(
    default=None, alias="fetch_parameters"
)
trans_params class-attribute instance-attribute #
trans_params: Optional[GlobalARMTransformParams] = Field(
    default=None, alias="transformation_parameters"
)

Functions#

retrieve #
retrieve(
    input_keys: List[str],
    dataset_config: DatasetConfig,
    storage: Optional[Storage] = None,
    input_data_hook: Optional[
        Callable[
            [Dict[str, xr.Dataset]], Dict[str, xr.Dataset]
        ]
    ] = None,
    **kwargs: Any
) -> xr.Dataset

Retrieves input data from the storage area.

Note that each input_key is expected to be formatted according to the following format:

"--key1 value1 --key2 value2",

e.g.,

"--datastream sgp.met.b0 --start 20230801 --end 20230901"
"--datastream sgp.met.b0 --start 20230801 --end 20230901 --location_id sgp --data_level b0"

This format allows the retriever to pull datastream data from the Storage API for the desired dates for each desired input source.

Parameters:

Name Type Description Default
input_keys List[str]

A list of input keys formatted as described above.

required
dataset_config DatasetConfig

The output dataset configuration.

required
storage Storage

Instance of a Storage class used to fetch saved data.

None

Returns:

Type Description
Dataset

xr.Dataset: The retrieved dataset


Source code in tsdat/io/retrievers.py
def retrieve(
    self,
    input_keys: List[str],
    dataset_config: DatasetConfig,
    storage: Optional[Storage] = None,
    input_data_hook: Optional[
        Callable[[Dict[str, xr.Dataset]], Dict[str, xr.Dataset]]
    ] = None,
    **kwargs: Any,
) -> xr.Dataset:
    """------------------------------------------------------------------------------------
    Retrieves input data from the storage area.

    Note that each input_key is expected to be formatted according to the following
    format:

    ```python
    "--key1 value1 --key2 value2",
    ```

    e.g.,

    ```python
    "--datastream sgp.met.b0 --start 20230801 --end 20230901"
    "--datastream sgp.met.b0 --start 20230801 --end 20230901 --location_id sgp --data_level b0"
    ```

    This format allows the retriever to pull datastream data from the Storage API
    for the desired dates for each desired input source.

    The retrieval proceeds in these steps: the input keys are parsed, the input
    datasets are fetched, the optional hook is applied, coordinates and data
    variables are selected, each selected coordinate is rebuilt so it is indexed by
    itself, data converters are run (coordinates first, then data variables), the
    output dataset is assembled and trimmed, and finally data variables are cast to
    the dtype given in the dataset configuration.

    Args:
        input_keys (List[str]): A list of input keys formatted as described above.
        dataset_config (DatasetConfig): The output dataset configuration.
        storage (Storage): Instance of a Storage class used to fetch saved data.
            Although the default is None, a value must be provided.
        input_data_hook (Optional[Callable]): Optional callable that is given the
            mapping of fetched input datasets. If it returns a value other than
            None, that value replaces the fetched mapping; otherwise the fetched
            mapping is used as-is.
        **kwargs (Any): Accepted for interface compatibility; not used here.

    Returns:
        xr.Dataset: The retrieved dataset

    Raises:
        AssertionError: If storage is None.

    ------------------------------------------------------------------------------------
    """
    assert storage is not None, "Missing required 'storage' parameter."

    # Parse each raw input key string into its structured representation
    storage_input_keys = [StorageRetrieverInput(key) for key in input_keys]

    input_data = self.__fetch_inputs(storage_input_keys, storage)

    # Let the caller adjust the fetched inputs; a None return value means the hook
    # did not provide a replacement mapping, so the fetched mapping is kept.
    if input_data_hook is not None:
        modded_input_data = input_data_hook(input_data)
        if modded_input_data is not None:
            input_data = modded_input_data

    # Perform coord/variable retrieval
    retrieved_data, retrieval_selections = perform_data_retrieval(
        input_data=input_data,
        coord_rules=self.coords,  # type: ignore
        data_var_rules=self.data_vars,  # type: ignore
    )

    # Ensure selected coords are indexed by themselves
    for name, coord_data in retrieved_data.coords.items():
        # An empty DataArray is the placeholder used when the coordinate was not
        # found in the input dataset; there is nothing to re-index in that case.
        if coord_data.equals(xr.DataArray([])):
            continue
        new_coord = xr.DataArray(
            data=coord_data.data,
            coords={name: coord_data.data},
            dims=(name,),
            attrs=coord_data.attrs,
            name=name,
        )
        retrieved_data.coords[name] = new_coord
    # Q: Do data_vars need to be renamed or reindexed before data converters run?

    # Run data converters on coordinates, then on data variables
    for name, coord_def in retrieval_selections.coords.items():
        for converter in coord_def.data_converters:
            # Re-read the coordinate on every pass so each converter sees the
            # output of the previous one.
            coord_data = retrieved_data.coords[name]
            # The time span always comes from the first input key, regardless of
            # which input the coordinate was retrieved from.
            data = converter.convert(
                data=coord_data,
                variable_name=name,
                dataset_config=dataset_config,
                retrieved_dataset=retrieved_data,
                time_span=(storage_input_keys[0].start, storage_input_keys[0].end),
                input_dataset=input_data.get(coord_def.source),
                retriever=self,
                input_key=coord_def.source,
            )
            # A converter that returns None leaves the stored data untouched
            if data is not None:
                retrieved_data.coords[name] = data

    for name, var_def in retrieval_selections.data_vars.items():
        for converter in var_def.data_converters:
            var_data = retrieved_data.data_vars[name]
            # Unlike the coordinate converters above, no time_span is passed here
            data = converter.convert(
                data=var_data,
                variable_name=name,
                dataset_config=dataset_config,
                retrieved_dataset=retrieved_data,
                retriever=self,
                input_dataset=input_data.get(var_def.source),
                input_key=var_def.source,
            )
            if data is not None:
                retrieved_data.data_vars[name] = data

    # Construct the retrieved dataset structure
    # TODO: validate dimension alignment
    retrieved_dataset = xr.Dataset(
        coords=retrieved_data.coords,
        data_vars=retrieved_data.data_vars,
    )

    # Double check that the dataset is trimmed to the start and end time.
    # This is needed if adi_py is not used and more than one file is pulled in.
    retrieved_dataset = self.__trim_dataset(retrieved_dataset, storage_input_keys)

    # Fix the dtype encoding
    # Only variables that also appear in the output dataset config are cast.
    for var_name, var_data in retrieved_dataset.data_vars.items():
        output_var_cfg = dataset_config.data_vars.get(var_name)
        if output_var_cfg is not None:
            dtype = output_var_cfg.dtype
            retrieved_dataset[var_name] = var_data.astype(dtype)
            # NOTE(review): the encoding is set on `var_data`, which was obtained
            # before the cast above -- confirm that this reaches the variable now
            # stored in `retrieved_dataset`.
            var_data.encoding["dtype"] = dtype

    return retrieved_dataset

StorageRetrieverInput #

StorageRetrieverInput(input_key: str)

Returns an object representation of an input storage key.

Input storage keys should be formatted like:

"--datastream sgp.met.b0 --start 20230801 --end 20230901"
"--datastream sgp.met.b0 --start 20230801 --end 20230901 --location_id sgp --data_level b0"
Source code in tsdat/io/retrievers.py
def __init__(self, input_key: str):
    """Parse a storage input key string into its individual fields.

    Two formats are accepted: the old ``datastream::start::end`` format (which
    logs a warning) and the ``--key1 value1 --key2 value2`` format. The
    ``datastream``, ``start`` and ``end`` entries are stored as attributes, with
    the start and end parsed as ``%Y%m%d.%H%M%S`` if they contain a ``.`` and as
    ``%Y%m%d`` otherwise. All remaining entries are kept in ``self.kwargs``.

    Args:
        input_key (str): The input key string to parse.

    Raises:
        ValueError: If a value appears without a preceding ``-``-prefixed key.
    """
    parsed: Dict[str, str] = {}

    legacy_parts = input_key.split("::")
    if len(legacy_parts) == 3:
        logger.warning(
            "Using old Storage input key format (datastream::start::end)."
        )
        parsed["datastream"], parsed["start"], parsed["end"] = legacy_parts
    else:
        # Name of the most recent key that is still waiting for its value
        pending = ""
        for token in shlex.split(input_key):
            if token.startswith("-"):
                pending = token.lstrip("-")
                parsed[pending] = ""
            elif pending in parsed:
                parsed[pending] = token
                pending = ""
            else:
                raise ValueError(
                    "Bad storage retriever input key. Expected format like"
                    f" '--key1 value1 --key2 value2 ...', got '{input_key}'."
                )

    def _format_for(stamp: str) -> str:
        # A "." separates the date from an optional time-of-day component
        return "%Y%m%d.%H%M%S" if "." in stamp else "%Y%m%d"

    self.input_key = input_key
    self.datastream = parsed.pop("datastream")
    self._start = parsed.pop("start")
    self._end = parsed.pop("end")

    self.start = datetime.strptime(self._start, _format_for(self._start))
    self.end = datetime.strptime(self._end, _format_for(self._end))

    # Whatever is left over are the extra, non-required key/value pairs
    self.kwargs = parsed

Attributes#

datastream instance-attribute #
datastream = pop('datastream')
end instance-attribute #
end = strptime(_end, end_format)
input_key instance-attribute #
input_key = input_key
kwargs instance-attribute #
kwargs = kwargs
start instance-attribute #
start = strptime(_start, start_format)

Functions#

perform_data_retrieval #

perform_data_retrieval(
    input_data: Dict[InputKey, xr.Dataset],
    coord_rules: Dict[
        VarName, Dict[Pattern[Any], RetrievedVariable]
    ],
    data_var_rules: Dict[
        VarName, Dict[Pattern[Any], RetrievedVariable]
    ],
) -> Tuple[RetrievedDataset, RetrievalRuleSelections]
Source code in tsdat/io/retrievers.py
def perform_data_retrieval(
    input_data: Dict[InputKey, xr.Dataset],
    coord_rules: Dict[VarName, Dict[Pattern[Any], RetrievedVariable]],
    data_var_rules: Dict[VarName, Dict[Pattern[Any], RetrievedVariable]],
) -> Tuple[RetrievedDataset, RetrievalRuleSelections]:
    """Select and extract coordinates and data variables from the input datasets.

    For each output variable, the patterns are tried in order; for each pattern the
    input keys are tried in order. The first (pattern, input key) pair that matches
    decides which rule is selected and which dataset the variable is read from. If
    the selected dataset does not contain the variable, an empty ``xr.DataArray``
    is stored in its place. A warning is logged for every variable that no input
    key matches.

    Args:
        input_data (Dict[InputKey, xr.Dataset]): The input datasets by input key.
        coord_rules (Dict[VarName, Dict[Pattern[Any], RetrievedVariable]]): The
            pattern-keyed retrieval rules for each output coordinate.
        data_var_rules (Dict[VarName, Dict[Pattern[Any], RetrievedVariable]]): The
            pattern-keyed retrieval rules for each output data variable.

    Returns:
        Tuple[RetrievedDataset, RetrievalRuleSelections]: The retrieved data and
        the rules that were selected to retrieve it.
    """
    # TODO: Also retrieve QC and Bounds variables -- possibly in ancillary structure?

    def _first_matching_input(pattern: Pattern[Any]):
        # Returns the first (input_key, dataset) pair whose key matches, else None
        return next(
            ((key, ds) for key, ds in input_data.items() if pattern.match(key)),
            None,
        )

    retrieved_coords: Dict[VarName, xr.DataArray] = {}
    retrieved_vars: Dict[VarName, xr.DataArray] = {}
    chosen_coord_rules: Dict[VarName, RetrievedVariable] = {}
    chosen_var_rules: Dict[VarName, RetrievedVariable] = {}

    # Retrieve coordinates
    for name, rules_by_pattern in coord_rules.items():
        for pattern, rule in rules_by_pattern.items():
            hit = _first_matching_input(pattern)
            if hit is None:
                continue
            input_key, dataset = hit
            logger.info(
                "Coordinate '%s' retrieved from '%s': '%s'",
                name,
                input_key,
                rule.name,
            )
            retrieved_coords[name] = dataset.get(rule.name, xr.DataArray([]))
            # The source is only recorded when the coordinate was actually found
            if not retrieved_coords[name].equals(xr.DataArray([])):
                rule.source = input_key
            chosen_coord_rules[name] = rule
            break
        else:
            logger.warning("Could not retrieve coordinate '%s'.", name)

    # Retrieve data variables
    for name, rules_by_pattern in data_var_rules.items():
        for pattern, rule in rules_by_pattern.items():
            hit = _first_matching_input(pattern)
            if hit is None:
                continue
            input_key, dataset = hit
            logger.info(
                "Variable '%s' retrieved from '%s': '%s'",
                name,
                input_key,
                rule.name,
            )
            retrieved_vars[name] = dataset.get(rule.name, xr.DataArray([]))
            if retrieved_vars[name].equals(xr.DataArray([])):
                logger.warning(
                    "Input key matched regex pattern but no matching variable"
                    " could be found in the input dataset. A value of"
                    " xr.DataArray([]) will be used instead.\n"
                    "\tVariable: %s\n"
                    "\tInput Variable: %s\n"
                    "\tPattern: %s\n"
                    "\tInput Key: %s\n",
                    name,
                    rule.name,
                    pattern.pattern,
                    input_key,
                )
            # Unlike coordinates, the source is recorded even for a missing variable
            rule.source = input_key
            chosen_var_rules[name] = rule
            break
        else:
            logger.warning("Could not retrieve variable '%s'.", name)

    retrieved = RetrievedDataset(coords=retrieved_coords, data_vars=retrieved_vars)
    selections = RetrievalRuleSelections(
        coords=chosen_coord_rules, data_vars=chosen_var_rules
    )
    return retrieved, selections