from dq_whistler.profiler.column_profiler import ColumnProfiler
from dq_whistler.constraints.number_type import *
from pandas.core.series import Series as pandas_df
from pyspark.sql.dataframe import DataFrame as spark_df
import pyspark.sql.functions as f
from typing import Dict, Any
import json
[docs]class NumberProfiler(ColumnProfiler):
"""
Class for Numeric datatype profiler
"""
def __init__(self, column_data: Union[spark_df, pandas_df], config: Dict[str, str]):
"""
Creates an instance of :obj:`NumberProfiler`
Args:
column_data (:obj:`pyspark.sql.DataFrame` | :obj:`pandas.core.series.Series`): Column data to execute constraints
config (Dict[str, Any]): Config containing all the constraints of a column along with expected data types
{
"name": "col_name",
"datatype": "col_data_type(number/string/date)",
"constraints":[
{
"name": "gt_eq",
"values": 5
},
{
"name": "is_in",
"values": [1, 2]
}...
]
}
"""
super(NumberProfiler, self).__init__(column_data, config)
[docs] def get_min_value(self) -> float:
"""
Returns:
:obj:`float`: Min value of the column data
"""
if isinstance(self._column_data, spark_df):
return float(json.loads(self._column_data.select(
f.min(
f.col(self._column_name)
.cast("double")
).alias("min")
).toJSON().take(1)[0])["min"])
if isinstance(self._column_data, pandas_df):
return float(self._column_data.min())
[docs] def get_max_value(self) -> float:
"""
Returns:
:obj:`float`: Max value of the column data
"""
if isinstance(self._column_data, spark_df):
return float(json.loads(self._column_data.select(
f.max(
f.col(self._column_name)
.cast("double")
).alias("max")
).toJSON().take(1)[0])["max"])
if isinstance(self._column_data, pandas_df):
return float(self._column_data.max())
[docs] def get_mean_value(self) -> float:
"""
Returns:
:obj:`float`: Mean value of the column data
"""
if isinstance(self._column_data, spark_df):
return float(json.loads(self._column_data.select(
f.mean(
f.col(self._column_name)
.cast("double")
).alias("mean")
).toJSON().take(1)[0])["mean"])
if isinstance(self._column_data, pandas_df):
return float(self._column_data.mean())
[docs] def get_stddev_value(self) -> float:
"""
Returns:
:obj:`float`: Standard deviation value of the column value
"""
if isinstance(self._column_data, spark_df):
return float(json.loads(self._column_data.select(
f.stddev(
f.col(self._column_name)
.cast("double")
).alias("stddev")
).toJSON().take(1)[0])["stddev"])
if isinstance(self._column_data, pandas_df):
return float(self._column_data.std())
[docs] def run(self) -> Dict[str, Any]:
"""
Returns:
:obj:`Dict[str, Any]`: The final dict with all the metrics of a numeric column
Example Output::
{
"total_count": 100,
"null_count": 50,
"unique_count": 20,
"topn_values": {"1": 24, "2": 25},
"min": 2.0,
"max": 30.0,
"mean": 18.0,
"stddev": 5.0,
"quality_score": 0,
"constraints": [
{
"name": "eq",
"values", 5,
"constraint_status": "failed/success",
"invalid_count": 21,
"invalid_values": [4, 6, 7, 1]
}
]
}
"""
column_name = self._column_name
for constraint in self._config.get("constraints"):
name = constraint.get("name")
if name == "eq":
self.add_constraint(
Equal(constraint=constraint, column_name=column_name)
)
elif name == "not_eq":
self.add_constraint(
NotEqual(constraint=constraint, column_name=column_name)
)
elif name == "lt":
self.add_constraint(
LessThan(constraint=constraint, column_name=column_name)
)
elif name == "gt":
self.add_constraint(
GreaterThan(constraint=constraint, column_name=column_name)
)
elif name == "lt_eq":
self.add_constraint(
LessThanEqualTo(constraint=constraint, column_name=column_name)
)
elif name == "gt_eq":
self.add_constraint(
GreaterThanEqualTo(constraint=constraint, column_name=column_name)
)
elif name == "between":
self.add_constraint(
Between(constraint=constraint, column_name=column_name)
)
elif name == "not_between":
self.add_constraint(
NotBetween(constraint=constraint, column_name=column_name)
)
elif name == "is_in":
self.add_constraint(
IsIn(constraint=constraint, column_name=column_name)
)
elif name == "not_in":
self.add_constraint(
NotIn(constraint=constraint, column_name=column_name)
)
else:
raise NotImplementedError
# Preparing data frame for constraints execution
self.prepare_df_for_constraints()
# Get final output of constraints
output = self.get_custom_constraint_check()
return {
"total_count": self.get_total_count(),
"null_count": self.get_null_count(),
"unique_count": self.get_unique_count(),
"topn_values": self.get_topn(),
"min": self.get_min_value(),
"max": self.get_max_value(),
"mean": self.get_mean_value(),
"stddev": self.get_stddev_value(),
"quality_score": self.get_quality_score(),
"constraints": output
}