Source code for dq_whistler.constraints.string_type

from dq_whistler.constraints.constraint import Constraint
from typing import Any, Dict, Union
from pandas.core.series import Series as pandas_df
from pyspark.sql.dataframe import DataFrame as spark_df
import pyspark.sql.functions as f


class Equal(Constraint):
    """Equal constraint class that extends the base Constraint class.

    Args:
        constraint (:obj:`Dict[str, str]`): The dict representing a constraint config::

            {
                "name": "eq",
                "values": "abc"
            }

        column_name (:obj:`str`): The name of the column for the constraint check
    """

    def __init__(self, constraint: Dict[str, str], column_name: str):
        super().__init__(constraint, column_name)

    def get_failure_df(self, data_frame: Union[spark_df, pandas_df]) -> Union[spark_df, pandas_df]:
        """
        Args:
            data_frame (:obj:`pyspark.sql.DataFrame` | :obj:`pandas.core.series.Series`): Column data

        Returns:
            :obj:`pyspark.sql.DataFrame` | :obj:`pandas.core.series.Series`: The dataframe containing the
            ``invalid cases`` for this constraint. For example, if the constraint is ``eq`` to ``"abc"``,
            the returned dataframe holds only the rows where values are ``!= "abc"``.
        """
        if isinstance(data_frame, spark_df):
            return data_frame.filter(
                f.col(self._column_name) != self._values
            )
        if isinstance(data_frame, pandas_df):
            return data_frame[data_frame != self._values]
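

# A minimal usage sketch (assumptions: the base Constraint class stores the
# config's "values" entry as self._values, and the sample data below is made up):
#
#   import pandas as pd
#   eq = Equal({"name": "eq", "values": "abc"}, column_name="city")
#   failures = eq.get_failure_df(pd.Series(["abc", "xyz", "abc"], name="city"))
#   # failures -> only the entries != "abc", i.e. ["xyz"]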


class NotEqual(Constraint):
    """NotEqual constraint class that extends the base Constraint class.

    Args:
        constraint (:obj:`Dict[str, str]`): The dict representing a constraint config::

            {
                "name": "nt_eq",
                "values": "abc"
            }

        column_name (:obj:`str`): The name of the column for the constraint check
    """

    def __init__(self, constraint: Dict[str, str], column_name: str):
        super().__init__(constraint, column_name)

    def get_failure_df(self, data_frame: Union[spark_df, pandas_df]) -> Union[spark_df, pandas_df]:
        """
        Args:
            data_frame (:obj:`pyspark.sql.DataFrame` | :obj:`pandas.core.series.Series`): Column data

        Returns:
            :obj:`pyspark.sql.DataFrame` | :obj:`pandas.core.series.Series`: The dataframe containing the
            ``invalid cases`` for this constraint. For example, if the constraint is ``nt_eq`` to ``"abc"``,
            the returned dataframe holds only the rows where values are ``== "abc"``.
        """
        if isinstance(data_frame, spark_df):
            return data_frame.filter(
                f.col(self._column_name) == self._values
            )
        if isinstance(data_frame, pandas_df):
            return data_frame[data_frame == self._values]


class Contains(Constraint):
    """Contains constraint class that extends the base Constraint class.

    Args:
        constraint (:obj:`Dict[str, str]`): The dict representing a constraint config::

            {
                "name": "contains",
                "values": "abc"
            }

        column_name (:obj:`str`): The name of the column for the constraint check
    """

    def __init__(self, constraint: Dict[str, str], column_name: str):
        super().__init__(constraint, column_name)

    def get_failure_df(self, data_frame: Union[spark_df, pandas_df]) -> Union[spark_df, pandas_df]:
        """
        Args:
            data_frame (:obj:`pyspark.sql.DataFrame` | :obj:`pandas.core.series.Series`): Column data

        Returns:
            :obj:`pyspark.sql.DataFrame` | :obj:`pandas.core.series.Series`: The dataframe containing the
            ``invalid cases`` for this constraint. For example, if the constraint is ``contains`` ``"abc"``,
            the returned dataframe holds only the rows where values ``do not contain "abc"``.
        """
        if isinstance(data_frame, spark_df):
            return data_frame.filter(
                ~f.col(self._column_name).contains(self._values)
            )
        if isinstance(data_frame, pandas_df):
            return data_frame[~data_frame.str.contains(self._values)]
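

# Note: pandas' Series.str.contains propagates NaN for missing values, so the
# boolean mask above can raise on columns containing nulls; callers expecting
# nulls may want na=False semantics (not applied here, to keep the original
# behaviour).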


class NotContains(Constraint):
    """NotContains constraint class that extends the base Constraint class.

    Args:
        constraint (:obj:`Dict[str, str]`): The dict representing a constraint config::

            {
                "name": "not_contains",
                "values": "abc"
            }

        column_name (:obj:`str`): The name of the column for the constraint check
    """

    def __init__(self, constraint: Dict[str, str], column_name: str):
        super().__init__(constraint, column_name)

    def get_failure_df(self, data_frame: Union[spark_df, pandas_df]) -> Union[spark_df, pandas_df]:
        """
        Args:
            data_frame (:obj:`pyspark.sql.DataFrame` | :obj:`pandas.core.series.Series`): Column data

        Returns:
            :obj:`pyspark.sql.DataFrame` | :obj:`pandas.core.series.Series`: The dataframe containing the
            ``invalid cases`` for this constraint. For example, if the constraint is ``not_contains`` ``"abc"``,
            the returned dataframe holds only the rows where values ``contain "abc"``.
        """
        if isinstance(data_frame, spark_df):
            return data_frame.filter(
                f.col(self._column_name).contains(self._values)
            )
        if isinstance(data_frame, pandas_df):
            return data_frame[data_frame.str.contains(self._values)]


class StartsWith(Constraint):
    """StartsWith constraint class that extends the base Constraint class.

    Args:
        constraint (:obj:`Dict[str, str]`): The dict representing a constraint config::

            {
                "name": "starts_with",
                "values": "abc"
            }

        column_name (:obj:`str`): The name of the column for the constraint check
    """

    def __init__(self, constraint: Dict[str, str], column_name: str):
        super().__init__(constraint, column_name)

    def get_failure_df(self, data_frame: Union[spark_df, pandas_df]) -> Union[spark_df, pandas_df]:
        """
        Args:
            data_frame (:obj:`pyspark.sql.DataFrame` | :obj:`pandas.core.series.Series`): Column data

        Returns:
            :obj:`pyspark.sql.DataFrame` | :obj:`pandas.core.series.Series`: The dataframe containing the
            ``invalid cases`` for this constraint. For example, if the constraint is ``starts_with`` ``"abc"``,
            the returned dataframe holds only the rows where values ``do not start with "abc"``.
        """
        if isinstance(data_frame, spark_df):
            return data_frame.filter(
                ~f.col(self._column_name).startswith(self._values)
            )
        if isinstance(data_frame, pandas_df):
            return data_frame[~data_frame.str.startswith(self._values)]


class NotStartsWith(Constraint):
    """NotStartsWith constraint class that extends the base Constraint class.

    Args:
        constraint (:obj:`Dict[str, str]`): The dict representing a constraint config::

            {
                "name": "not_starts_with",
                "values": "abc"
            }

        column_name (:obj:`str`): The name of the column for the constraint check
    """

    def __init__(self, constraint: Dict[str, str], column_name: str):
        super().__init__(constraint, column_name)

    def get_failure_df(self, data_frame: Union[spark_df, pandas_df]) -> Union[spark_df, pandas_df]:
        """
        Args:
            data_frame (:obj:`pyspark.sql.DataFrame` | :obj:`pandas.core.series.Series`): Column data

        Returns:
            :obj:`pyspark.sql.DataFrame` | :obj:`pandas.core.series.Series`: The dataframe containing the
            ``invalid cases`` for this constraint. For example, if the constraint is ``not_starts_with`` ``"abc"``,
            the returned dataframe holds only the rows where values ``start with "abc"``.
        """
        if isinstance(data_frame, spark_df):
            return data_frame.filter(
                f.col(self._column_name).startswith(self._values)
            )
        if isinstance(data_frame, pandas_df):
            return data_frame[data_frame.str.startswith(self._values)]


class EndsWith(Constraint):
    """EndsWith constraint class that extends the base Constraint class.

    Args:
        constraint (:obj:`Dict[str, str]`): The dict representing a constraint config::

            {
                "name": "ends_with",
                "values": "abc"
            }

        column_name (:obj:`str`): The name of the column for the constraint check
    """

    def __init__(self, constraint: Dict[str, str], column_name: str):
        super().__init__(constraint, column_name)

    def get_failure_df(self, data_frame: Union[spark_df, pandas_df]) -> Union[spark_df, pandas_df]:
        """
        Args:
            data_frame (:obj:`pyspark.sql.DataFrame` | :obj:`pandas.core.series.Series`): Column data

        Returns:
            :obj:`pyspark.sql.DataFrame` | :obj:`pandas.core.series.Series`: The dataframe containing the
            ``invalid cases`` for this constraint. For example, if the constraint is ``ends_with`` ``"abc"``,
            the returned dataframe holds only the rows where values ``do not end with "abc"``.
        """
        if isinstance(data_frame, spark_df):
            return data_frame.filter(
                ~f.col(self._column_name).endswith(self._values)
            )
        if isinstance(data_frame, pandas_df):
            return data_frame[~data_frame.str.endswith(self._values)]


class NotEndsWith(Constraint):
    """NotEndsWith constraint class that extends the base Constraint class.

    Args:
        constraint (:obj:`Dict[str, str]`): The dict representing a constraint config::

            {
                "name": "not_ends_with",
                "values": "abc"
            }

        column_name (:obj:`str`): The name of the column for the constraint check
    """

    def __init__(self, constraint: Dict[str, str], column_name: str):
        super().__init__(constraint, column_name)

    def get_failure_df(self, data_frame: Union[spark_df, pandas_df]) -> Union[spark_df, pandas_df]:
        """
        Args:
            data_frame (:obj:`pyspark.sql.DataFrame` | :obj:`pandas.core.series.Series`): Column data

        Returns:
            :obj:`pyspark.sql.DataFrame` | :obj:`pandas.core.series.Series`: The dataframe containing the
            ``invalid cases`` for this constraint. For example, if the constraint is ``not_ends_with`` ``"abc"``,
            the returned dataframe holds only the rows where values ``end with "abc"``.
        """
        if isinstance(data_frame, spark_df):
            return data_frame.filter(
                f.col(self._column_name).endswith(self._values)
            )
        if isinstance(data_frame, pandas_df):
            return data_frame[data_frame.str.endswith(self._values)]


class IsIn(Constraint):
    """IsIn constraint class that extends the base Constraint class.

    Args:
        constraint (:obj:`Dict[str, Any]`): The dict representing a constraint config::

            {
                "name": "is_in",
                "values": ["abc", "xyz"]
            }

        column_name (:obj:`str`): The name of the column for the constraint check
    """

    def __init__(self, constraint: Dict[str, Any], column_name: str):
        super().__init__(constraint, column_name)

    def get_failure_df(self, data_frame: Union[spark_df, pandas_df]) -> Union[spark_df, pandas_df]:
        """
        Args:
            data_frame (:obj:`pyspark.sql.DataFrame` | :obj:`pandas.core.series.Series`): Column data

        Returns:
            :obj:`pyspark.sql.DataFrame` | :obj:`pandas.core.series.Series`: The dataframe containing the
            ``invalid cases`` for this constraint. For example, if the constraint is ``is_in`` ``["abc", "xyz"]``,
            the returned dataframe holds only the rows where values ``are not in ["abc", "xyz"]``.
        """
        if isinstance(data_frame, spark_df):
            return data_frame.filter(
                ~f.col(self._column_name).isin(*self._values)
            )
        if isinstance(data_frame, pandas_df):
            return data_frame[~data_frame.isin(self._values)]
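

# A minimal Spark usage sketch (assumptions: a SparkSession named `spark` is
# already available; the column name and values are illustrative):
#
#   df = spark.createDataFrame([("abc",), ("foo",)], ["city"])
#   is_in = IsIn({"name": "is_in", "values": ["abc", "xyz"]}, column_name="city")
#   is_in.get_failure_df(df).show()  # rows whose city is not in ["abc", "xyz"]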


class NotIn(Constraint):
    """NotIn constraint class that extends the base Constraint class.

    Args:
        constraint (:obj:`Dict[str, Any]`): The dict representing a constraint config::

            {
                "name": "not_in",
                "values": ["abc", "xyz"]
            }

        column_name (:obj:`str`): The name of the column for the constraint check
    """

    def __init__(self, constraint: Dict[str, Any], column_name: str):
        super().__init__(constraint, column_name)

    def get_failure_df(self, data_frame: Union[spark_df, pandas_df]) -> Union[spark_df, pandas_df]:
        """
        Args:
            data_frame (:obj:`pyspark.sql.DataFrame` | :obj:`pandas.core.series.Series`): Column data

        Returns:
            :obj:`pyspark.sql.DataFrame` | :obj:`pandas.core.series.Series`: The dataframe containing the
            ``invalid cases`` for this constraint. For example, if the constraint is ``not_in`` ``["abc", "xyz"]``,
            the returned dataframe holds only the rows where values ``are in ["abc", "xyz"]``.
        """
        if isinstance(data_frame, spark_df):
            return data_frame.filter(
                f.col(self._column_name).isin(*self._values)
            )
        if isinstance(data_frame, pandas_df):
            return data_frame[data_frame.isin(self._values)]


class Regex(Constraint):
    """Regex constraint class that extends the base Constraint class.

    Args:
        constraint (:obj:`Dict[str, str]`): The dict representing a constraint config::

            {
                "name": "regex",
                "values": "^[A-Za-z]$"
            }

        column_name (:obj:`str`): The name of the column for the constraint check
    """

    def __init__(self, constraint: Dict[str, str], column_name: str):
        super().__init__(constraint, column_name)

    def get_failure_df(self, data_frame: Union[spark_df, pandas_df]) -> Union[spark_df, pandas_df]:
        """
        Args:
            data_frame (:obj:`pyspark.sql.DataFrame` | :obj:`pandas.core.series.Series`): Column data

        Returns:
            :obj:`pyspark.sql.DataFrame` | :obj:`pandas.core.series.Series`: The dataframe containing the
            ``invalid cases`` for this constraint. For example, if the constraint is ``regex``
            ``^[A-Za-z]$``, the returned dataframe holds only the rows where values ``do not match``
            the regex ``^[A-Za-z]$``.
        """
        if isinstance(data_frame, spark_df):
            return data_frame.filter(
                ~f.col(self._column_name).rlike(self._values)
            )
        if isinstance(data_frame, pandas_df):
            return data_frame[~data_frame.str.match(pat=self._values)]
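

# Note on regex semantics (a caveat, not part of the original docstrings):
# Spark's rlike succeeds if the pattern matches anywhere in the string, while
# pandas' Series.str.match anchors at the start; for fully anchored patterns
# such as "^[A-Za-z]$" both branches agree, but unanchored patterns may
# classify rows differently across the two backends.
#
#   import pandas as pd
#   regex = Regex({"name": "regex", "values": "^[A-Za-z]$"}, column_name="code")
#   regex.get_failure_df(pd.Series(["a", "ab"], name="code"))  # -> ["ab"]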