# Source code for dq_whistler.constraints.constraint

from abc import ABC, abstractmethod
from typing import Dict, List, Any, Union
from pandas.core.series import Series as pandas_df
from pyspark.sql.dataframe import DataFrame as spark_df
import json


class Constraint(ABC):
    """Base class for a single data-quality constraint applied to one column.

    A subclass implements :meth:`get_failure_df` to return the subset of the
    column data that violates the constraint; :meth:`execute_check` then turns
    that failure set into the final result dict.
    """

    _name: str                     # constraint name, e.g. "eq"
    _values: Any                   # comparison value(s) for the check
    _column_name: str              # column the constraint is evaluated on
    _constraint: Dict[str, str]    # raw constraint config, echoed in the result

    def __init__(self, constraint: Dict[str, str], column_name: str):
        """
        Creates an instance of Constraint with constraint config and column name

        Args:
            constraint (Dict[str, str]): Dict containing the name of constraint and the value of constraint check
            column_name (str): Column name to perform the constraint checks
        """
        self._name = constraint.get("name")
        self._values = constraint.get("values")
        self._column_name = column_name
        self._constraint = constraint

    def constraint_name(self) -> str:
        """
        Returns:
            :obj:`str`: The name of the constraint
        """
        return self._name

    def get_column_name(self) -> str:
        """
        Returns:
            :obj:`str`: The name of the column for which the Constraint instance was created
        """
        return self._column_name

    @abstractmethod
    def get_failure_df(self, data_frame: Union[spark_df, pandas_df]) -> Union[spark_df, pandas_df]:
        """
        Args:
            data_frame (:obj:`pyspark.sql.DataFrame` | :obj:`pandas.core.series.Series`): Column data

        Returns:
            :obj:`pyspark.sql.DataFrame`: The dataframe containing failed cases for a constraint
        """
        return data_frame

    def get_sample_invalid_values(self, data_frame: Union[spark_df, pandas_df]) -> List:
        """
        Args:
            data_frame (:obj:`pyspark.sql.DataFrame` | :obj:`pandas.core.series.Series`): Column data

        Returns:
            :obj:`list`: A list containing up to 10 invalid values as per the given constraint
        """
        sample_invalid_values: List = []
        if isinstance(data_frame, spark_df):
            # Serialize each row to JSON and pull out this column's value.
            sample_invalid_values = [
                json.loads(row)[self._column_name]
                for row in data_frame.toJSON().take(10)
            ]
        if isinstance(data_frame, pandas_df):
            # iloc[0:10] yields up to 10 values, matching the Spark branch.
            # (The previous slice iloc[0:9] returned only 9 — off by one.)
            sample_invalid_values = list(data_frame.iloc[0:10])
        return sample_invalid_values

    def execute_check(self, data_frame: Union[spark_df, pandas_df]) -> Dict[str, str]:
        """
        Args:
            data_frame (:obj:`pyspark.sql.DataFrame` | :obj:`pandas.core.series.Series`): Column data

        Returns:
            :obj:`dict[str, str]`: The dict containing the final output for one constraint

        Example Output::

            {
                "name": "eq",
                "values": 5,
                "constraint_status": "failed/success",
                "invalid_count": 21,
                "invalid_values": [4, 6, 7, 1]
            }
        """
        unmatched_df = self.get_failure_df(data_frame)
        # NOTE(review): for a pandas Series, .count() excludes NaN values —
        # confirm failure sets never contain NaN, otherwise len() is safer.
        unmatched_count = unmatched_df.count()
        sample_invalid_values = self.get_sample_invalid_values(unmatched_df)
        return {
            **self._constraint,
            "constraint_status": "failed" if unmatched_count > 0 else "success",
            "invalid_count": unmatched_count,
            "invalid_values": sample_invalid_values
        }