import numpy as np
from abc import ABC, abstractmethod
from typing import Dict, Any, List, Union
from pandas.core.series import Series as pandas_df
from pyspark.sql.dataframe import DataFrame as spark_df
import pyspark.sql.functions as f
from pyspark.sql.types import StringType, DoubleType, IntegerType
from dq_whistler.constraints.constraint import Constraint
import json
[docs]class ColumnProfiler(ABC):
"""
Base class for column profiler
"""
_column_data: Union[spark_df, pandas_df]
_config: Dict[str, Any]
_constraints: List[Constraint]
def __init__(self, column_data: Union[spark_df, pandas_df], config: Dict[str, Any]):
"""
Creates an instance of :obj:`ColumnProfiler`
Args:
column_data (:obj:`pyspark.sql.DataFrame` | :obj:`pandas.core.series.Series`): Column data to
execute constraints
config (Dict[str, Any]): Config containing all the constraints of a column along with expected data types
Sample Dict::
{
"name": "col_name",
"datatype": "col_data_type(number/string/date)",
"constraints":[
{
"name": "gt_eq",
"values": 5
},
{
"name": "is_in",
"values": [1, 2]
}...
]
}
"""
self._column_data = column_data
self._config = config
self._column_name = config.get("name")
self._data_type = config.get("datatype")
self._constraints = []
[docs] def prepare_df_for_constraints(self) -> None:
"""
Prepares a dataframe by doing pre validations
"""
if isinstance(self._column_data, spark_df):
if self._data_type == "string":
self._column_data.withColumn(self._column_name, f.col(self._column_name).cast(StringType()))
elif self._data_type == "number":
self._column_data.withColumn(self._column_name, f.col(self._column_name).cast(DoubleType()))
elif self._data_type == "integer":
self._column_data.withColumn(self._column_name, f.col(self._column_name).cast(IntegerType()))
else:
raise NotImplementedError
elif isinstance(self._column_data, pandas_df):
if self._data_type == "string":
self._column_data = self._column_data.apply(np.str)
elif self._data_type == "number":
self._column_data = self._column_data.apply(np.float)
elif self._data_type == "integer":
self._column_data = self._column_data.apply(np.int)
else:
raise NotImplementedError
else:
raise NotImplementedError
[docs] def add_constraint(self, constraint: Constraint):
"""
Adds an instance of :obj:`Constraint` to the the parent list of constraints for this profiler
Args:
constraint (dq_whistler.constraints.constraint.Constraint): An instance of :obj:`Constraint` class
"""
existing = filter(
lambda c:
c.constraint_name() == constraint.constraint_name()
and c.get_column_name() == constraint.get_column_name(), self._constraints)
if list(existing):
raise ValueError(f"A similar constraint for the column {constraint.get_column_name()} already exists.")
self._constraints.append(constraint)
[docs] def get_constraints_config(self) -> List[Dict[str, str]]:
"""
Returns:
:obj:`List[Dict[str, str]]`: The array containing the constraints for the column
"""
return self._config.get("constraints") if self._config.get("constraints") else []
[docs] def get_column_info(self) -> str:
"""
Returns:
:obj:`str`: The column info for which the instance has been created
Sample output::
str({
"fields":[
{
"metadata":{},
"name":"col_name",
"nullable":True,
"type":"string"
}
],
"type":"struct"
})
"""
return self._column_data.schema.json()
[docs] def get_column_config(self) -> Dict[str, Any]:
"""
Returns:
:obj:`Dict[str, Any]`: The data quality config for the column
"""
return self._config
[docs] def get_null_count(self) -> int:
"""
Returns:
:obj:`int`: Count of null values in a column data
"""
col_name = self._column_name
if isinstance(self._column_data, spark_df):
return int(self._column_data.select(
f.count(
f.when(
f.col(col_name).contains("None") |
f.col(col_name).contains("NULL") |
(f.col(col_name) == "") |
f.col(col_name).isNull() |
f.isnan(col_name), col_name
)
).alias("null_count")
).first()[0])
if isinstance(self._column_data, pandas_df):
return int(self._column_data.isnull().sum(axis=0))
[docs] def get_unique_count(self) -> int:
"""
Returns:
:obj:`int`: Count of unique values in a column data
"""
if isinstance(self._column_data, spark_df):
return int(self._column_data.distinct().count())
if isinstance(self._column_data, pandas_df):
return int(self._column_data.nunique(dropna=True))
[docs] def get_total_count(self) -> int:
"""
Returns:
:obj:`int`: Count of total values in a column data
"""
return int(self._column_data.count())
[docs] def get_quality_score(self) -> float:
"""
Returns:
:obj:`float`: Overall quality score of a column
"""
return 0.0
[docs] def get_topn(self) -> Dict[str, Any]:
"""
Returns:
:obj:`Dict[str, Any]`: Dict containing the top 10 values along with their counts
Sample Output::
{
"value1": count1,
"value2": count2
}
"""
col_name = self._column_name
if isinstance(self._column_data, spark_df):
top_values = dict()
if self._data_type == "string":
top_values_rows = self._column_data \
.filter((f.col(col_name) != "") & (f.col(col_name).isNotNull()) & (f.col(col_name) != "null")) \
.groupby(col_name) \
.count() \
.sort(f.desc("count")) \
.toJSON() \
.take(10)
elif self._data_type == "number":
top_values_rows = self._column_data \
.filter(f.col(col_name).isNotNull()) \
.groupby(col_name) \
.count() \
.sort(f.desc("count")) \
.toJSON() \
.take(10)
else:
raise NotImplementedError
[
top_values.update(
{json.loads(row).get(col_name): json.loads(row)["count"]})
for row in top_values_rows
]
return top_values
if isinstance(self._column_data, pandas_df):
return json.loads(self._column_data.value_counts().iloc[:9].to_json())
[docs] def get_custom_constraint_check(self) -> List[Dict[str, str]]:
"""
Returns:
:obj:`List[Dict[str, str]]`: An array containing the output of each of the constraint for a column
Sample Output::
[
{
"name": "eq",
"values", 5,
"constraint_status": "failed/success",
"invalid_count": 21,
"invalid_values": [4, 6, 7, 1]
}...
]
"""
constraints_output = []
for constraint in self._constraints:
output = constraint.execute_check(self._column_data)
constraints_output.append(output)
return constraints_output
[docs] @abstractmethod
def run(self) -> Dict[str, Any]:
"""
Returns:
:obj:`Dict[str, Any]`: The final stats of the column containing null count, total count,
regex count, invalid rows, quality score etc.
"""
pass