Source code for dq_whistler.analyzer

import json
import numpy as np
from typing import Dict, List, Any, Union
import pyspark.sql.functions as F
from pandas.core.frame import DataFrame as pandas_df
from pyspark.sql.dataframe import DataFrame as spark_df
from dq_whistler.profiler.string_profiler import StringProfiler
from dq_whistler.profiler.number_profiler import NumberProfiler


class NpEncoder(json.JSONEncoder):
    """JSON encoder that serializes NumPy scalar and array types as native Python types."""

    def default(self, obj):
        # Convert NumPy scalars/arrays into their Python equivalents so that
        # json.dumps can serialize profiler outputs containing them.
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        return super(NpEncoder, self).default(obj)
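# A minimal usage sketch for NpEncoder (illustrative, not part of the module):
# json.dumps would normally raise TypeError on NumPy scalars/arrays, whereas
# passing cls=NpEncoder converts them to native Python types first.
#
#   stats = {"count": np.int64(42), "mean": np.float64(3.5), "bins": np.arange(3)}
#   json.dumps(stats, cls=NpEncoder)  # '{"count": 42, "mean": 3.5, "bins": [0, 1, 2]}'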
class DataQualityAnalyzer:
    """
    Analyzer class responsible for taking the column-level :obj:`JSON` config and executing it against the columnar data

    Args:
        data (:obj:`pyspark.sql.DataFrame` | :obj:`pandas.core.frame.DataFrame`): Dataframe containing the data
        config (:obj:`List[Dict[str, str]]`): The array of dicts containing the config for each column
    """

    _data: Union[spark_df, pandas_df]
    _config: List[Dict[str, str]]

    def __init__(self, data: Union[spark_df, pandas_df], config: List[Dict[str, str]]):
        """
        Creates an instance of DataQualityAnalyzer
        """
        self._data = data
        self._config = config
    def analyze(self) -> str:
        """
        Returns:
            :obj:`str`: :obj:`JSON` string containing the stats for every configured column
        """
        final_checks: List[Dict[str, Any]] = []
        # TODO: Add feature of automatic column detection, if config is not present
        for column_config in self._config:
            # TODO: checks for key existence
            column_name = column_config.get("name")
            column_data_type = column_config.get("datatype")
            # Extract the column as a single-column Spark dataframe or a pandas series
            if isinstance(self._data, spark_df):
                column_data = self._data.select(F.col(column_name))
            elif isinstance(self._data, pandas_df):
                column_data = self._data[column_name]
            else:
                raise NotImplementedError(f"Unsupported data container: {type(self._data)}")
            # Pick the profiler matching the configured datatype
            if column_data_type == "string":
                profiler = StringProfiler(column_data, column_config)
            elif column_data_type == "number":
                profiler = NumberProfiler(column_data, column_config)
            else:
                raise NotImplementedError(f"No profiler implemented for datatype: {column_data_type}")
            output = profiler.run()
            final_checks.append({
                "col_name": column_name,
                **output
            })
        return json.dumps(final_checks, cls=NpEncoder)
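

if __name__ == "__main__":
    # A minimal usage sketch (illustrative only): it assumes pandas is available
    # and that StringProfiler/NumberProfiler accept the column configs shown below.
    import pandas as pd

    sample_df = pd.DataFrame({"name": ["a", "b", None], "age": [10, 20, 30]})
    sample_config = [
        {"name": "name", "datatype": "string"},
        {"name": "age", "datatype": "number"},
    ]
    # Prints a JSON string with one stats object per configured column
    print(DataQualityAnalyzer(sample_df, sample_config).analyze())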