from dq_whistler.constraints.constraint import Constraint
from typing import Dict, Union
import pyspark.sql.functions as f
from pandas.core.series import Series as pandas_df
from pyspark.sql.dataframe import DataFrame as spark_df


class Equal(Constraint):
    """
    Equal constraint class that extends the base Constraint class

    Args:
        constraint (:obj:`Dict[str, str]`): The dict representing a constraint config::

            {
                "name": "eq",
                "values": 5
            }

        column_name (:obj:`str`): The name of the column for constraint check
    """

    def __init__(self, constraint: Dict[str, str], column_name: str):
        super().__init__(constraint, column_name)

    def get_failure_df(self, data_frame: Union[spark_df, pandas_df]) -> Union[spark_df, pandas_df]:
        """
        Args:
            data_frame (:obj:`pyspark.sql.DataFrame` | :obj:`pandas.core.series.Series`): Column data

        Returns:
            :obj:`pyspark.sql.DataFrame` | :obj:`pandas.core.series.Series`: The dataframe containing only the
            ``invalid cases`` for the constraint. For example, if the constraint is ``eq`` ``5``, the result
            holds the rows where values are ``!= 5`` (i.e., only the invalid cases)
        """
        if isinstance(data_frame, spark_df):
            return data_frame.filter(
                f.col(self._column_name) != self._values
            )
        if isinstance(data_frame, pandas_df):
            return data_frame[data_frame != self._values]
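
    # A hedged usage sketch for the pandas path, kept as a comment so the module
    # stays import-safe. It assumes the base Constraint class exposes the config's
    # "values" entry as ``self._values`` (which the accesses above rely on); the
    # column data below is hypothetical:
    #
    #     import pandas as pd
    #     eq = Equal({"name": "eq", "values": 5}, "age")
    #     eq.get_failure_df(pd.Series([3, 5, 7], name="age"))  # -> rows 3 and 7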


class NotEqual(Constraint):
    """
    NotEqual constraint class that extends the base Constraint class

    Args:
        constraint (:obj:`Dict[str, str]`): The dict representing a constraint config::

            {
                "name": "nt_eq",
                "values": 5
            }

        column_name (:obj:`str`): The name of the column for constraint check
    """

    def __init__(self, constraint: Dict[str, str], column_name: str):
        super().__init__(constraint, column_name)

    def get_failure_df(self, data_frame: Union[spark_df, pandas_df]) -> Union[spark_df, pandas_df]:
        """
        Args:
            data_frame (:obj:`pyspark.sql.DataFrame` | :obj:`pandas.core.series.Series`): Column data

        Returns:
            :obj:`pyspark.sql.DataFrame` | :obj:`pandas.core.series.Series`: The dataframe containing only the
            ``invalid cases`` for the constraint. For example, if the constraint is ``nt_eq`` ``5``, the result
            holds the rows where values are ``== 5`` (i.e., only the invalid cases)
        """
        if isinstance(data_frame, spark_df):
            return data_frame.filter(
                f.col(self._column_name) == self._values
            )
        if isinstance(data_frame, pandas_df):
            return data_frame[data_frame == self._values]


class LessThan(Constraint):
    """
    LessThan constraint class that extends the base Constraint class

    Args:
        constraint (:obj:`Dict[str, str]`): The dict representing a constraint config::

            {
                "name": "lt",
                "values": 5
            }

        column_name (:obj:`str`): The name of the column for constraint check
    """

    def __init__(self, constraint: Dict[str, str], column_name: str):
        super().__init__(constraint, column_name)

    def get_failure_df(self, data_frame: Union[spark_df, pandas_df]) -> Union[spark_df, pandas_df]:
        """
        Args:
            data_frame (:obj:`pyspark.sql.DataFrame` | :obj:`pandas.core.series.Series`): Column data

        Returns:
            :obj:`pyspark.sql.DataFrame` | :obj:`pandas.core.series.Series`: The dataframe containing only the
            ``invalid cases`` for the constraint. For example, if the constraint is ``lt`` ``5``, the result
            holds the rows where values are ``>= 5`` (i.e., only the invalid cases)
        """
        if isinstance(data_frame, spark_df):
            return data_frame.filter(
                f.col(self._column_name) >= self._values
            )
        if isinstance(data_frame, pandas_df):
            return data_frame[data_frame >= self._values]


class GreaterThan(Constraint):
    """
    GreaterThan constraint class that extends the base Constraint class

    Args:
        constraint (:obj:`Dict[str, str]`): The dict representing a constraint config::

            {
                "name": "gt",
                "values": 5
            }

        column_name (:obj:`str`): The name of the column for constraint check
    """

    def __init__(self, constraint: Dict[str, str], column_name: str):
        super().__init__(constraint, column_name)

    def get_failure_df(self, data_frame: Union[spark_df, pandas_df]) -> Union[spark_df, pandas_df]:
        """
        Args:
            data_frame (:obj:`pyspark.sql.DataFrame` | :obj:`pandas.core.series.Series`): Column data

        Returns:
            :obj:`pyspark.sql.DataFrame` | :obj:`pandas.core.series.Series`: The dataframe containing only the
            ``invalid cases`` for the constraint. For example, if the constraint is ``gt`` ``5``, the result
            holds the rows where values are ``<= 5`` (i.e., only the invalid cases)
        """
        if isinstance(data_frame, spark_df):
            return data_frame.filter(
                f.col(self._column_name) <= self._values
            )
        if isinstance(data_frame, pandas_df):
            return data_frame[data_frame <= self._values]


class LessThanEqualTo(Constraint):
    """
    LessThanEqualTo constraint class that extends the base Constraint class

    Args:
        constraint (:obj:`Dict[str, str]`): The dict representing a constraint config::

            {
                "name": "lt_eq",
                "values": 5
            }

        column_name (:obj:`str`): The name of the column for constraint check
    """

    def __init__(self, constraint: Dict[str, str], column_name: str):
        super().__init__(constraint, column_name)

    def get_failure_df(self, data_frame: Union[spark_df, pandas_df]) -> Union[spark_df, pandas_df]:
        """
        Args:
            data_frame (:obj:`pyspark.sql.DataFrame` | :obj:`pandas.core.series.Series`): Column data

        Returns:
            :obj:`pyspark.sql.DataFrame` | :obj:`pandas.core.series.Series`: The dataframe containing only the
            ``invalid cases`` for the constraint. For example, if the constraint is ``lt_eq`` ``5``, the result
            holds the rows where values are ``> 5`` (i.e., only the invalid cases)
        """
        if isinstance(data_frame, spark_df):
            return data_frame.filter(
                f.col(self._column_name) > self._values
            )
        if isinstance(data_frame, pandas_df):
            return data_frame[data_frame > self._values]


class GreaterThanEqualTo(Constraint):
    """
    GreaterThanEqualTo constraint class that extends the base Constraint class

    Args:
        constraint (:obj:`Dict[str, str]`): The dict representing a constraint config::

            {
                "name": "gt_eq",
                "values": 5
            }

        column_name (:obj:`str`): The name of the column for constraint check
    """

    def __init__(self, constraint: Dict[str, str], column_name: str):
        super().__init__(constraint, column_name)

    def get_failure_df(self, data_frame: Union[spark_df, pandas_df]) -> Union[spark_df, pandas_df]:
        """
        Args:
            data_frame (:obj:`pyspark.sql.DataFrame` | :obj:`pandas.core.series.Series`): Column data

        Returns:
            :obj:`pyspark.sql.DataFrame` | :obj:`pandas.core.series.Series`: The dataframe containing only the
            ``invalid cases`` for the constraint. For example, if the constraint is ``gt_eq`` ``5``, the result
            holds the rows where values are ``< 5`` (i.e., only the invalid cases)
        """
        if isinstance(data_frame, spark_df):
            return data_frame.filter(
                f.col(self._column_name) < self._values
            )
        if isinstance(data_frame, pandas_df):
            return data_frame[data_frame < self._values]


class Between(Constraint):
    """
    Between constraint class that extends the base Constraint class

    Args:
        constraint (:obj:`Dict[str, str]`): The dict representing a constraint config::

            {
                "name": "between",
                "values": [3, 4]
            }

        column_name (:obj:`str`): The name of the column for constraint check
    """

    def __init__(self, constraint: Dict[str, str], column_name: str):
        super().__init__(constraint, column_name)

    def get_failure_df(self, data_frame: Union[spark_df, pandas_df]) -> Union[spark_df, pandas_df]:
        """
        Args:
            data_frame (:obj:`pyspark.sql.DataFrame` | :obj:`pandas.core.series.Series`): Column data

        Returns:
            :obj:`pyspark.sql.DataFrame` | :obj:`pandas.core.series.Series`: The dataframe containing only the
            ``invalid cases`` for the constraint. For example, if the constraint is ``between`` ``[2, 8]``,
            the result holds the rows where values fall ``outside [2, 8]`` (i.e., only the invalid cases)
        """
        if isinstance(data_frame, spark_df):
            return data_frame.filter(
                ~f.col(self._column_name).between(*self._values)
            )
        if isinstance(data_frame, pandas_df):
            # Unpack the [lower, upper] pair; Series.between requires two bounds
            return data_frame[~data_frame.between(*self._values)]
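
    # Hedged sketch of the pandas path with hypothetical data (same base-class
    # assumption as above: ``self._values`` holds the config's "values" list):
    #
    #     import pandas as pd
    #     bt = Between({"name": "between", "values": [3, 4]}, "score")
    #     bt.get_failure_df(pd.Series([2, 3, 4, 5]))  # -> rows 2 and 5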


class NotBetween(Constraint):
    """
    NotBetween constraint class that extends the base Constraint class

    Args:
        constraint (:obj:`Dict[str, str]`): The dict representing a constraint config::

            {
                "name": "not_between",
                "values": [3, 5]
            }

        column_name (:obj:`str`): The name of the column for constraint check
    """

    def __init__(self, constraint: Dict[str, str], column_name: str):
        super().__init__(constraint, column_name)

    def get_failure_df(self, data_frame: Union[spark_df, pandas_df]) -> Union[spark_df, pandas_df]:
        """
        Args:
            data_frame (:obj:`pyspark.sql.DataFrame` | :obj:`pandas.core.series.Series`): Column data

        Returns:
            :obj:`pyspark.sql.DataFrame` | :obj:`pandas.core.series.Series`: The dataframe containing only the
            ``invalid cases`` for the constraint. For example, if the constraint is ``not_between`` ``[2, 8]``,
            the result holds the rows where values fall ``inside [2, 8]`` (i.e., only the invalid cases)
        """
        if isinstance(data_frame, spark_df):
            return data_frame.filter(
                f.col(self._column_name).between(*self._values)
            )
        if isinstance(data_frame, pandas_df):
            # Unpack the [lower, upper] pair; Series.between requires two bounds
            return data_frame[data_frame.between(*self._values)]


class IsIn(Constraint):
    """
    IsIn constraint class that extends the base Constraint class

    Args:
        constraint (:obj:`Dict[str, str]`): The dict representing a constraint config::

            {
                "name": "is_in",
                "values": [1, 2, 3]
            }

        column_name (:obj:`str`): The name of the column for constraint check
    """

    def __init__(self, constraint: Dict[str, str], column_name: str):
        super().__init__(constraint, column_name)

    def get_failure_df(self, data_frame: Union[spark_df, pandas_df]) -> Union[spark_df, pandas_df]:
        """
        Args:
            data_frame (:obj:`pyspark.sql.DataFrame` | :obj:`pandas.core.series.Series`): Column data

        Returns:
            :obj:`pyspark.sql.DataFrame` | :obj:`pandas.core.series.Series`: The dataframe containing only the
            ``invalid cases`` for the constraint. For example, if the constraint is ``is_in`` ``[1, 2, 3]``,
            the result holds the rows where values are ``not in [1, 2, 3]`` (i.e., only the invalid cases)
        """
        if isinstance(data_frame, spark_df):
            return data_frame.filter(
                ~f.col(self._column_name).isin(*self._values)
            )
        if isinstance(data_frame, pandas_df):
            return data_frame[~data_frame.isin(self._values)]
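
    # Design note: pyspark's Column.isin accepts varargs, hence the
    # ``*self._values`` unpacking on the Spark path, while pandas' Series.isin
    # accepts a single iterable, so the list is passed through unchanged.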


class NotIn(Constraint):
    """
    NotIn constraint class that extends the base Constraint class

    Args:
        constraint (:obj:`Dict[str, str]`): The dict representing a constraint config::

            {
                "name": "not_in",
                "values": [1, 2, 3]
            }

        column_name (:obj:`str`): The name of the column for constraint check
    """

    def __init__(self, constraint: Dict[str, str], column_name: str):
        super().__init__(constraint, column_name)

    def get_failure_df(self, data_frame: Union[spark_df, pandas_df]) -> Union[spark_df, pandas_df]:
        """
        Args:
            data_frame (:obj:`pyspark.sql.DataFrame` | :obj:`pandas.core.series.Series`): Column data

        Returns:
            :obj:`pyspark.sql.DataFrame` | :obj:`pandas.core.series.Series`: The dataframe containing only the
            ``invalid cases`` for the constraint. For example, if the constraint is ``not_in`` ``[1, 2, 3]``,
            the result holds the rows where values are ``in [1, 2, 3]`` (i.e., only the invalid cases)
        """
        if isinstance(data_frame, spark_df):
            return data_frame.filter(
                f.col(self._column_name).isin(*self._values)
            )
        if isinstance(data_frame, pandas_df):
            return data_frame[data_frame.isin(self._values)]
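

if __name__ == "__main__":
    # Hedged smoke test over the pandas path only. It assumes the base
    # Constraint class stores constraint["values"] on ``self._values`` and the
    # column name on ``self._column_name``, as the methods above imply; the
    # data below is made up for illustration.
    import pandas as pd

    col = pd.Series([1, 2, 3, 4, 5], name="score")
    print(GreaterThan({"name": "gt", "values": 3}, "score").get_failure_df(col))        # rows 1, 2, 3
    print(Between({"name": "between", "values": [2, 4]}, "score").get_failure_df(col))  # rows 1, 5
    print(IsIn({"name": "is_in", "values": [1, 2]}, "score").get_failure_df(col))       # rows 3, 4, 5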