Source code for tsdat.qc.checkers

import abc
import numpy as np
import xarray as xr
from typing import List, Optional, Dict, Union

from tsdat.config import QualityManagerDefinition
from tsdat.constants import ATTS
from tsdat.utils import DSUtil


class QualityChecker(abc.ABC):
    """Class containing the code to perform a single Quality Check on a
    Dataset variable.

    :param ds: The dataset the checker will be applied to
    :type ds: xr.Dataset
    :param previous_data: A dataset from the previous processing interval
        (i.e., file). This is used to check for consistency between files,
        such as for monotonic or delta checks when we need to check the
        previous value.
    :type previous_data: xr.Dataset
    :param definition: The quality manager definition as specified in the
        pipeline config file
    :type definition: QualityManagerDefinition
    :param parameters: A dictionary of checker-specific parameters specified
        in the pipeline config file. Defaults to {}
    :type parameters: dict, optional
    """

    def __init__(
        self,
        ds: xr.Dataset,
        previous_data: xr.Dataset,
        definition: QualityManagerDefinition,
        parameters: Union[Dict, None] = None,
    ):
        self.ds = ds
        self.previous_data = previous_data
        self.definition = definition
        self.params = parameters if parameters is not None else dict()

    @abc.abstractmethod
    def run(self, variable_name: str) -> Optional[np.ndarray]:
        """Check a dataset's variable to see if it passes a quality check.
        These checks can be performed on the entire variable at one time by
        using xarray vectorized numerical operators.

        :param variable_name: The name of the variable to check
        :type variable_name: str
        :return: If the check was performed, return an ndarray of the same
            shape as the variable. Each value in the data array will be
            either True or False, depending upon the results of the check.
            True means the check failed. False means it succeeded.

            Note that we are using an np.ndarray instead of an xr.DataArray
            because the DataArray contains coordinate indexes which can
            sometimes get out of sync when performing np arithmetic vector
            operations. So it's easier to just use numpy arrays.

            If the check was skipped for some reason (i.e., it was not
            relevant given the current attributes defined for this dataset),
            then the run method should return None.
        :rtype: Optional[np.ndarray]
        """
        pass
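

# What follows is a minimal sketch (not part of tsdat) of a custom checker
# built on the interface above, shown only to illustrate the run() contract:
# return a boolean mask the same shape as the variable, with True marking a
# failed check, or return None to skip. The class name and the non-numeric
# skip rule are assumptions made for this example.
class CheckPositive(QualityChecker):
    """Example checker: flags values that are not strictly positive."""

    def run(self, variable_name: str) -> Optional[np.ndarray]:
        data = self.ds[variable_name].data
        # Skip the check entirely for non-numeric variables (e.g., datetimes)
        if not np.issubdtype(data.dtype, np.number):
            return None
        # True marks a failed check, per the QualityChecker contract
        return np.less_equal(data, 0)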


class CheckMissing(QualityChecker):
    """Checks if any values are assigned to _FillValue or 'NaN' (for
    non-time variables) or checks if values are assigned to 'NaT' (for time
    variables). Also, for non-time variables, checks if values are above or
    below valid_range, as this is considered missing as well.
    """

    def run(self, variable_name: str) -> Optional[np.ndarray]:

        # If this is a time variable, we check for 'NaT'
        if self.ds[variable_name].data.dtype.type == np.datetime64:
            results_array = np.isnat(self.ds[variable_name].data)

        else:
            fill_value = DSUtil.get_fill_value(self.ds, variable_name)

            # If the variable has no _FillValue attribute, then
            # we select a default value to use
            if fill_value is None:
                fill_value = -9999

            # Make sure fill value has same data type as the variable
            fill_value = np.array(
                fill_value, dtype=self.ds[variable_name].data.dtype.type
            )

            # First check if any values are assigned to _FillValue
            results_array = np.equal(self.ds[variable_name].data, fill_value)

            # Then, if the value is numeric, we should also check if any
            # values are assigned to NaN
            if self.ds[variable_name].data.dtype.type in (
                type(0.0),
                np.float16,
                np.float32,
                np.float64,
            ):
                results_array |= np.isnan(self.ds[variable_name].data)

        return results_array
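

# A usage sketch for CheckMissing (illustrative, not part of tsdat). The toy
# dataset and the None stand-ins for previous_data and definition are
# assumptions; in a real pipeline the QualityManagerDefinition comes from the
# pipeline config file.
def _example_check_missing() -> np.ndarray:
    ds = xr.Dataset({"temp": ("time", np.array([10.5, -9999.0, np.nan]))})
    checker = CheckMissing(ds, previous_data=None, definition=None)
    # Assuming no _FillValue attribute is set, the default of -9999 is used,
    # so this returns array([False, True, True]): -9999 and NaN are flagged.
    return checker.run("temp")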


class CheckMin(QualityChecker):
    """Check that no values for the specified variable are less than a
    specified minimum threshold. The threshold value is an attribute set on
    the variable in question. The attribute name is specified in the quality
    checker definition in the pipeline config file by setting a param called
    'key: ATTRIBUTE_NAME'.

    If the key parameter is not set or the variable does not possess the
    specified attribute, this check will be skipped.
    """

    def run(self, variable_name: str) -> Optional[np.ndarray]:

        # Get the minimum value; if the 'key' param was not provided, the
        # lookup falls through to None and the check is skipped
        _min = self.ds[variable_name].attrs.get(self.params.get("key"), None)
        if isinstance(_min, List):
            _min = _min[0]

        # If no minimum value is available, then we just skip this check
        results_array = None
        if _min is not None:
            results_array = np.less(self.ds[variable_name].data, _min)

        return results_array


class CheckMax(QualityChecker):
    """Check that no values for the specified variable are greater than a
    specified maximum threshold. The threshold value is an attribute set on
    the variable in question. The attribute name is specified in the quality
    checker definition in the pipeline config file by setting a param called
    'key: ATTRIBUTE_NAME'.

    If the key parameter is not set or the variable does not possess the
    specified attribute, this check will be skipped.
    """

    def run(self, variable_name: str) -> Optional[np.ndarray]:

        # Get the maximum value; if the 'key' param was not provided, the
        # lookup falls through to None and the check is skipped
        _max = self.ds[variable_name].attrs.get(self.params.get("key"), None)
        if isinstance(_max, List):
            _max = _max[-1]

        # If no maximum value is available, then we just skip this check
        results_array = None
        if _max is not None:
            results_array = np.greater(self.ds[variable_name].data, _max)

        return results_array
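

# A usage sketch for CheckMin/CheckMax (illustrative, not part of tsdat). The
# dataset, the 'sensor_min'/'sensor_max' attribute names, and the None
# stand-ins are assumptions; in a real pipeline the 'key' param is set in the
# quality manager's section of the config file.
def _example_check_min_max():
    ds = xr.Dataset({"temp": ("time", np.array([-40.0, 10.0, 60.0]))})
    ds["temp"].attrs["sensor_min"] = -30.0
    ds["temp"].attrs["sensor_max"] = 50.0
    too_low = CheckMin(ds, None, None, {"key": "sensor_min"}).run("temp")
    too_high = CheckMax(ds, None, None, {"key": "sensor_max"}).run("temp")
    # too_low  == array([ True, False, False])
    # too_high == array([False, False,  True])
    return too_low, too_high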


class CheckValidMin(CheckMin):
    """Check that no values for the specified variable are less than the
    minimum value set by the 'valid_range' attribute. If the variable in
    question does not possess the 'valid_range' attribute, this check will
    be skipped.
    """

    def __init__(
        self,
        ds: xr.Dataset,
        previous_data: xr.Dataset,
        definition: QualityManagerDefinition,
        parameters,
    ):
        super().__init__(ds, previous_data, definition, parameters=parameters)
        self.params["key"] = "valid_range"


class CheckValidMax(CheckMax):
    """Check that no values for the specified variable are greater than the
    maximum value set by the 'valid_range' attribute. If the variable in
    question does not possess the 'valid_range' attribute, this check will
    be skipped.
    """

    def __init__(
        self,
        ds: xr.Dataset,
        previous_data: xr.Dataset,
        definition: QualityManagerDefinition,
        parameters,
    ):
        super().__init__(ds, previous_data, definition, parameters=parameters)
        self.params["key"] = "valid_range"


class CheckFailMin(CheckMin):
    """Check that no values for the specified variable are less than the
    minimum value set by the 'fail_range' attribute. If the variable in
    question does not possess the 'fail_range' attribute, this check will
    be skipped.
    """

    def __init__(
        self,
        ds: xr.Dataset,
        previous_data: xr.Dataset,
        definition: QualityManagerDefinition,
        parameters,
    ):
        super().__init__(ds, previous_data, definition, parameters=parameters)
        self.params["key"] = "fail_range"


class CheckFailMax(CheckMax):
    """Check that no values for the specified variable are greater than the
    maximum value set by the 'fail_range' attribute. If the variable in
    question does not possess the 'fail_range' attribute, this check will
    be skipped.
    """

    def __init__(
        self,
        ds: xr.Dataset,
        previous_data: xr.Dataset,
        definition: QualityManagerDefinition,
        parameters,
    ):
        super().__init__(ds, previous_data, definition, parameters=parameters)
        self.params["key"] = "fail_range"


class CheckWarnMin(CheckMin):
    """Check that no values for the specified variable are less than the
    minimum value set by the 'warn_range' attribute. If the variable in
    question does not possess the 'warn_range' attribute, this check will
    be skipped.
    """

    def __init__(
        self,
        ds: xr.Dataset,
        previous_data: xr.Dataset,
        definition: QualityManagerDefinition,
        parameters,
    ):
        super().__init__(ds, previous_data, definition, parameters=parameters)
        self.params["key"] = "warn_range"


class CheckWarnMax(CheckMax):
    """Check that no values for the specified variable are greater than the
    maximum value set by the 'warn_range' attribute. If the variable in
    question does not possess the 'warn_range' attribute, this check will
    be skipped.
    """

    def __init__(
        self,
        ds: xr.Dataset,
        previous_data: xr.Dataset,
        definition: QualityManagerDefinition,
        parameters,
    ):
        super().__init__(ds, previous_data, definition, parameters=parameters)
        self.params["key"] = "warn_range"
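

# An illustrative sketch (not part of tsdat) of the two-element range
# convention the six subclasses above rely on: the *Min checkers read element
# 0 of the range attribute (via CheckMin) and the *Max checkers read element
# -1 (via CheckMax). The dataset and the None stand-ins are assumptions.
def _example_range_checkers():
    ds = xr.Dataset({"rh": ("time", np.array([-5.0, 50.0, 120.0]))})
    ds["rh"].attrs["valid_range"] = [0, 100]
    below = CheckValidMin(ds, None, None, None).run("rh")
    above = CheckValidMax(ds, None, None, None).run("rh")
    # below == array([ True, False, False]); above == array([False, False,  True])
    return below, above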


class CheckValidDelta(QualityChecker):
    """Check that the difference between any two consecutive values is not
    greater than the threshold set by the 'valid_delta' attribute. If the
    variable in question does not possess the 'valid_delta' attribute, this
    check will be skipped.
    """

    def run(self, variable_name: str) -> Optional[np.ndarray]:

        valid_delta = self.ds[variable_name].attrs.get(ATTS.VALID_DELTA, None)

        # If no valid_delta is available, then we just skip this check
        results_array = None

        if valid_delta is not None:
            # We need to get the dim to diff on from the parameters.
            # If dim is not specified, then we use the first dim for the
            # variable.
            dim = self.params.get("dim", None)

            if dim is None and len(self.ds[variable_name].dims) > 0:
                dim = self.ds[variable_name].dims[0]

            if dim is not None:
                # If previous data exists, then we must add the last row of
                # previous data as the first row of the variable's data
                # array. This is so that the diff function can compare the
                # first value of the file to make sure it is consistent with
                # the previous file.

                # convert to np array
                variable_data = self.ds[variable_name].data
                axis = self.ds[variable_name].get_axis_num(dim)
                previous_row = None

                # Load the previous row from the other dataset
                if self.previous_data is not None:
                    previous_variable_data = self.previous_data.get(
                        variable_name, None
                    )
                    if previous_variable_data is not None:
                        # convert to np array
                        previous_variable_data = previous_variable_data.data

                        # Get the last value from the first axis
                        previous_row = previous_variable_data[-1]

                        # Insert that value as the first value of the first axis
                        variable_data = np.insert(
                            variable_data, 0, previous_row, axis=axis
                        )

                # If the variable is a time variable, then we convert to
                # nanoseconds before doing our check
                if self.ds[variable_name].data.dtype.type == np.datetime64:
                    variable_data = DSUtil.datetime64_to_timestamp(variable_data)

                # Compute the difference between each two numbers and check
                # if it exceeds valid_delta
                diff = np.absolute(np.diff(variable_data, axis=axis))
                results_array = np.greater(diff, valid_delta)

                if previous_row is None:
                    # This means our results array is missing one value for
                    # the first row, which is not included in the diff
                    # computation. We need to add False for the first row of
                    # results, since it won't fail the check.
                    first_row = np.zeros(results_array[0].size, dtype=bool)
                    results_array = np.insert(
                        results_array, 0, first_row, axis=axis
                    )

        return results_array
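

# A usage sketch for CheckValidDelta (illustrative, not part of tsdat). The
# dataset and the None stand-ins are assumptions. With no previous_data the
# first element cannot be compared to anything, so its result is padded with
# False.
def _example_check_valid_delta():
    ds = xr.Dataset({"temp": ("time", np.array([10.0, 11.0, 25.0, 26.0]))})
    ds["temp"].attrs[ATTS.VALID_DELTA] = 5.0
    # Only the 11 -> 25 jump exceeds the threshold, so this returns
    # array([False, False, True, False])
    return CheckValidDelta(ds, None, None).run("temp")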


class CheckMonotonic(QualityChecker):
    """Checks that all values for the specified variable are either strictly
    increasing or strictly decreasing.
    """

    def run(self, variable_name: str) -> Optional[np.ndarray]:

        results_array = None

        # We need to get the dim to diff on from the parameters.
        # If dim is not specified, then we use the first dim for the variable.
        dim = self.params.get("dim", None)

        if dim is None and len(self.ds[variable_name].dims) > 0:
            dim = self.ds[variable_name].dims[0]

        if dim is not None:
            # If previous data exists, then we must add the last row of
            # previous data as the first row of the variable's data array.
            # This is so that the diff function can compare the first value
            # of the file to make sure it is consistent with the previous
            # file.

            # convert to np array
            variable_data = self.ds[variable_name].data
            axis = self.ds[variable_name].get_axis_num(dim)
            previous_row = None

            # Load the previous row from the other dataset
            if self.previous_data is not None and dim == "time":
                previous_variable_data = self.previous_data.get(variable_name, None)
                if previous_variable_data is not None:
                    # convert to np array
                    previous_variable_data = previous_variable_data.data

                    # Get the last value from the first axis
                    previous_row = previous_variable_data[-1]

                    # Insert that value as the first value of the first axis
                    variable_data = np.insert(variable_data, 0, previous_row, axis=axis)

            # If the variable is a time variable, then we convert to
            # nanoseconds before doing our check
            if self.ds[variable_name].data.dtype.type == np.datetime64:
                variable_data = DSUtil.datetime64_to_timestamp(variable_data)

            # Compute the difference between each two numbers and check if
            # they are either all increasing or all decreasing
            diff = np.diff(variable_data, axis=axis)
            is_monotonic = np.all(diff > 0) | np.all(diff < 0)  # this returns a scalar

            # Create a results array, with all values set to the results of
            # the is_monotonic check
            results_array = np.full(variable_data.shape, not is_monotonic, dtype=bool)

        return results_array
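

# A usage sketch for CheckMonotonic (illustrative, not part of tsdat). The
# dataset is an assumption. Because the check is pass/fail for the variable
# as a whole, the returned mask is all False (monotonic) or all True (not).
def _example_check_monotonic():
    ds = xr.Dataset({"pressure": ("time", np.array([1.0, 2.0, 3.0, 2.5]))})
    # diff == [1.0, 1.0, -0.5] is neither all positive nor all negative,
    # so this returns array([True, True, True, True])
    return CheckMonotonic(ds, None, None).run("pressure")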


# TODO: Other checks we might implement
# check_outlier(std_dev)