
handlers

Classes#

DataQualityError #

Bases: ValueError

Raised when the quality of a variable indicates a fatal error has occurred. Manual review of the data in question is often recommended in this case.

FailPipeline #

Bases: QualityHandler


Raises a DataQualityError, halting the pipeline, if the data quality is sufficiently bad. This usually indicates that manual inspection of the data is recommended.

Raises:

Type              Description
DataQualityError  If the fraction of failed values exceeds the configured tolerance.


Attributes#

parameters class-attribute instance-attribute #
parameters: Parameters = Parameters()

Classes#

Parameters #

Bases: BaseModel

Attributes#
context class-attribute instance-attribute #
context: str = ''

Additional context set by users that ends up in the traceback message.

display_limit class-attribute instance-attribute #
display_limit: int = 5

The maximum number of failed indexes and values to show in the error message. Defaults to 5.

tolerance class-attribute instance-attribute #
tolerance: float = 0

Tolerance for failures, expressed as the ratio of allowed failures to the total number of values checked. Defaults to 0, meaning that any failed check will result in a DataQualityError being raised.

Functions#

run #
run(
    dataset: xr.Dataset,
    variable_name: str,
    failures: NDArray[np.bool_],
)
Source code in tsdat/qc/handlers.py
def run(self, dataset: xr.Dataset, variable_name: str, failures: NDArray[np.bool_]):
    if self._exceeds_tolerance(failures):  # default failure tolerance is 0%
        msg = (
            f"Quality results for variable '{variable_name}' indicate a fatal error"
            " has occurred. Manual review of the data is recommended.\n"
        )

        # Report the number and percentage of failures relative to the allowed tolerance
        fail_rate: float = np.average(failures)  # type: ignore
        msg += (
            f" {np.count_nonzero(failures)} / {failures.size} values failed"  # type: ignore
            f" ({100*fail_rate:.2f}%), exceeding the allowable threshold of"
            f" {100*self.parameters.tolerance}%.\n"
        )

        # Want to show the first few indexes where the test failed and also the
        # corresponding data values. Careful to not show too many, otherwise the
        # message will be bloated and hard to read. Note that np.nonzero(failures)
        # returns a hard-to-read tuple of indexes, so we modify that to be easier to
        # read, and show at most the first self.parameters.display_limit failures.
        failed_where = np.nonzero(failures)  # type: ignore
        failed_values = list(dataset[variable_name].values[failed_where][: self.parameters.display_limit])  # type: ignore
        failed_indexes: Union[List[int], List[List[int]]]
        if len(failed_where) == 1:  # 1D
            failed_indexes = list(failed_where[0][: self.parameters.display_limit])
        else:
            failed_indexes = [
                [dim_idxs[i] for dim_idxs in failed_where]
                for i in range(
                    min(self.parameters.display_limit, len(failed_where[0]))
                )
            ]
        msg += (
            f"The first failures occur at indexes: {failed_indexes}. The"
            f" corresponding values are: {failed_values}.\n"
        )

        raise DataQualityError(msg)
    return dataset
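
For illustration, a minimal sketch of calling this handler directly, outside of a pipeline. The dataset, variable name, and failure mask below are hypothetical; passing parameters as a dict assumes the usual pydantic coercion into the Parameters model:

import numpy as np
import xarray as xr
from tsdat.qc.handlers import FailPipeline

# Hypothetical data: 5 values, 2 of which failed some quality check (40%).
ds = xr.Dataset({"temperature": ("time", [10.2, 10.4, -9999.0, 10.7, -9999.0])})
failures = np.array([False, False, True, False, True])

handler = FailPipeline(parameters={"tolerance": 0.5})  # allow up to 50% failures
handler.run(ds, "temperature", failures)  # 40% < 50%, so no error is raised

strict = FailPipeline()  # default tolerance of 0
strict.run(ds, "temperature", failures)  # raises DataQualityError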

RecordQualityResults #

Bases: QualityHandler


Records the results of the quality check in an ancillary qc variable. Creates the ancillary qc variable if one does not already exist.


Attributes#

parameters instance-attribute #
parameters: Parameters

Classes#

Parameters #

Bases: BaseModel

Attributes#
assessment instance-attribute #
assessment: Literal['bad', 'indeterminate']

Indicates the quality of the data if the test results indicate a failure.

bit class-attribute instance-attribute #
bit: Optional[int] = None

DEPRECATED

The bit number (e.g., 1, 2, 3, ...) used to indicate if the check passed.

The quality results are bit-packed into an integer array to save space. For example, if 'check #0' uses bit 0 and fails, and 'check #1' uses bit 1 and fails, then the resulting value on the qc variable would be 2^(0) + 2^(1) = 3. If a third check also failed, the value would be 2^(0) + 2^(1) + 2^(2) = 7.
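
A short sketch decoding bit-packed qc values with numpy (the names here are illustrative, not part of the tsdat API):

import numpy as np

flag_masks = [1, 2, 4]        # one mask (bit) per check, as stored in the qc variable's attrs
qc_values = np.array([3, 0])  # first sample failed checks 1 and 2 (1 + 2 = 3)

# Test whether each sample failed the second check (mask = 2):
failed_second = (qc_values & flag_masks[1]).astype(bool)  # -> array([ True, False])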

meaning instance-attribute #
meaning: str

A string that describes the test applied.

Functions#
deprecate_bit_parameter #
deprecate_bit_parameter(
    values: Dict[str, Any]
) -> Dict[str, Any]
Source code in tsdat/qc/handlers.py
@root_validator(pre=True)
def deprecate_bit_parameter(cls, values: Dict[str, Any]) -> Dict[str, Any]:
    if "bit" in values:
        logger.warning("The 'bit' argument is deprecated, please remove it.")
    return values
to_lower #
to_lower(assessment: Any) -> str
Source code in tsdat/qc/handlers.py
@validator("assessment", pre=True)
def to_lower(cls, assessment: Any) -> str:
    if isinstance(assessment, str):
        return assessment.lower()
    raise ValueError(
        f"assessment must be 'bad' or 'indeterminate', not {assessment}"
    )

Functions#

get_next_bit_number #
get_next_bit_number(
    dataset: xr.Dataset, variable_name: str
) -> int
Source code in tsdat/qc/handlers.py
def get_next_bit_number(self, dataset: xr.Dataset, variable_name: str) -> int:
    if (qc_var := dataset.get(f"qc_{variable_name}")) is None:
        return 1
    masks = qc_var.attrs.get("flag_masks")
    if not isinstance(masks, list):
        raise ValueError(
            f"QC Variable {qc_var.name} is not standardized. Expected 'flag_masks'"
            f" attribute to be like [1, 2, ...], but found '{masks}'"
        )
    return len(masks) + 1  # type: ignore
run #
run(
    dataset: xr.Dataset,
    variable_name: str,
    failures: NDArray[np.bool_],
) -> xr.Dataset
Source code in tsdat/qc/handlers.py
def run(
    self, dataset: xr.Dataset, variable_name: str, failures: NDArray[np.bool_]
) -> xr.Dataset:
    dataset.qcfilter.add_test(
        variable_name,
        index=failures if failures.any() else None,
        test_number=self.get_next_bit_number(dataset, variable_name),
        test_meaning=self.parameters.meaning,
        test_assessment=self.parameters.assessment,
    )
    return dataset
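
A rough usage sketch with a hypothetical dataset and failure mask. This assumes the ACT qcfilter accessor used by add_test is registered, which happens when act-atmos is imported (tsdat does this internally):

import numpy as np
import xarray as xr
from tsdat.qc.handlers import RecordQualityResults

ds = xr.Dataset({"temperature": ("time", [10.2, -9999.0, 10.7])})
failures = np.array([False, True, False])

handler = RecordQualityResults(
    parameters={"assessment": "Bad", "meaning": "Value exceeds the valid range"}
)
ds = handler.run(ds, "temperature", failures)
# A bit-packed "qc_temperature" variable now records the result, with the test's
# meaning and assessment appended to its flag attributes. Note that "Bad" is
# lowercased to "bad" by the to_lower validator.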

RemoveFailedValues #

Bases: QualityHandler


Replaces all failed values with the variable's _FillValue. If the variable does not have a _FillValue attribute, then NaN is used instead. If the variable is itself a dimension coordinate, the failed values are dropped from the dataset rather than replaced.


Functions#

run #
run(
    dataset: xr.Dataset,
    variable_name: str,
    failures: NDArray[np.bool_],
) -> xr.Dataset
Source code in tsdat/qc/handlers.py
def run(
    self, dataset: xr.Dataset, variable_name: str, failures: NDArray[np.bool_]
) -> xr.Dataset:
    if failures.any():
        if variable_name in dataset.dims:
            mask = xr.DataArray(
                failures, coords={variable_name: dataset[variable_name]}
            )
            dataset = dataset.where(~mask, drop=True)
        else:
            fill_value = dataset[variable_name].attrs.get("_FillValue", None)
            dataset[variable_name] = dataset[variable_name].where(~failures, fill_value)  # type: ignore
    return dataset
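
A minimal sketch of the replacement behavior, using a hypothetical dataset and failure mask:

import numpy as np
import xarray as xr
from tsdat.qc.handlers import RemoveFailedValues

ds = xr.Dataset({"temperature": ("time", [10.2, 999.0, 10.7])})
ds["temperature"].attrs["_FillValue"] = -9999.0
failures = np.array([False, True, False])

ds = RemoveFailedValues().run(ds, "temperature", failures)
# ds["temperature"].values -> array([ 10.2, -9999. , 10.7])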

SortDatasetByCoordinate #

Bases: QualityHandler


Sorts the dataset by the coordinate variable whose quality check failed, if there are any failures.


Attributes#

parameters class-attribute instance-attribute #
parameters: Parameters = Parameters()

Classes#

Parameters #

Bases: BaseModel

Attributes#
ascending class-attribute instance-attribute #
ascending: bool = True

Whether to sort the dataset in ascending order. Defaults to True.

correction class-attribute instance-attribute #
correction: str = (
    "Coordinate data was sorted in order to ensure monotonicity."
)

Functions#

run #
run(
    dataset: xr.Dataset,
    variable_name: str,
    failures: NDArray[np.bool_],
) -> xr.Dataset
Source code in tsdat/qc/handlers.py
def run(
    self, dataset: xr.Dataset, variable_name: str, failures: NDArray[np.bool_]
) -> xr.Dataset:
    if failures.any():
        dataset = dataset.sortby(variable_name, ascending=self.parameters.ascending)  # type: ignore
        record_corrections_applied(
            dataset, variable_name, self.parameters.correction
        )
    return dataset
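
A minimal sketch with a hypothetical out-of-order time coordinate, such as one flagged by a monotonicity check; this assumes record_corrections_applied can annotate the coordinate's attributes:

import numpy as np
import xarray as xr
from tsdat.qc.handlers import SortDatasetByCoordinate

ds = xr.Dataset(coords={"time": [3, 1, 2]})
failures = np.array([False, True, True])

ds = SortDatasetByCoordinate().run(ds, "time", failures)
# ds["time"].values -> array([1, 2, 3]), with the correction recorded on the variable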

Functions#