Source code for haychecker.dhc.metrics

"""
Module containing metrics for the distributed version of hay_checker.
"""

import pyspark
from pyspark.sql.functions import isnan, when, count, col, sum, countDistinct, avg, to_date, lit, \
    abs, datediff, to_timestamp, current_timestamp, current_date, approx_count_distinct, log2, log

from haychecker.dhc import task


def _completeness_todo(columns, df):
    """
    Returns what (columns, as in spark columns) to compute to get the results requested by
    the parameters.

    :param columns:
    :type columns: list
    :param df:
    :type df: DataFrame
    :return: Pyspark columns representing what to compute.
    """
    if columns is None:
        columns = df.columns
    todo = [count(c).alias(c) for c in columns]
    return todo


[docs]def completeness(columns=None, df=None): """ If a df is passed, the completeness metric will be run and result returned as a list of scores, otherwise an instance of the Task class containing this metric wil be returned, to be later run (possibly after adding to it other tasks/metrics). :param columns: Columns on which to run the metric, None to run the completeness metric on the whole table. :type columns: list :param df: Dataframe on which to run the metric, None to have this function return a Task instance containing this metric to be run later. :type df: DataFrame :return: Either a list of scores or a Task instance containing this metric (with these parameters) to be run later. :rtype: list/Task """ # make a dict representing the parameters params = {"metric": "completeness"} if not (columns is None): params["columns"] = columns t = task.Task([params]) if df is None: return t else: return t.run(df)[0]["scores"]
def _deduplication_todo(columns, df): """ Returns what (columns, as in spark columns) to compute to get the results requested by the parameters. :param columns: :type columns: list :param df: :type df: DataFrame :return: Pyspark columns representing what to compute. """ if columns is None: # 1 count distinct, on all columns todo = [countDistinct(*[col(c) for c in df.columns])] else: # multiple count distinct, one column each todo = [countDistinct(col(c)) for c in columns] return todo
[docs]def deduplication(columns=None, df=None): """ If a df is passed, the deduplication metric will be run and result returned as a list of scores, otherwise an instance of the Task class containing this metric wil be returned, to be later run (possibly after adding to it other tasks/metrics). :param columns: Columns on which to run the metric, None to run the deduplication metric on the whole table (deduplication on rows). :type columns: list :param df: Dataframe on which to run the metric, None to have this function return a Task instance containing this metric to be run later. :type df: DataFrame :return: Either a list of scores or a Task instance containing this metric (with these parameters) to be run later. :rtype: list/Task """ # make a dict representing the parameters params = {"metric": "deduplication"} if not (columns is None): params["columns"] = columns t = task.Task([params]) if df is None: return t else: return t.run(df)[0]["scores"]
def _deduplication_approximated_todo(columns, df): """ Returns what (columns, as in spark columns) to compute to get the results requested by the parameters. :param columns: :type columns: list :param df: :type df: DataFrame :return: Pyspark columns representing what to compute. """ if columns is None: print("Approximated count distinct spanning over the whole row is currently not supported") else: # multiple count distinct, one column each todo = [approx_count_distinct(col(c)).alias(c) for c in columns] return todo
[docs]def deduplication_approximated(columns, df=None): """ If a df is passed, the deduplication_approximated metric will be run and result returned as a list of scores, otherwise an instance of the Task class containing this metric wil be returned, to be later run (possibly after adding to it other tasks/metrics). Differently from deduplication, here columns must be specified (deduplication_approximated does not work on a whole row level). :param columns: Columns on which to run the metric. :type columns: list :param df: Dataframe on which to run the metric, None to have this function return a Task instance containing this metric to be run later. :type df: DataFrame :return: Either a list of scores or a Task instance containing this metric (with these parameters) to be run later. :rtype: list/Task """ # make a dict representing the parameters params = {"metric": "deduplication_approximated"} if not (columns is None): params["columns"] = columns t = task.Task([params]) if df is None: return t else: return t.run(df)[0]["scores"]
def _contains_date(format): """ Check if a format (string) contains a date. (It currently check if the string contains tokens from the simpledateformat https://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html that represent, years, months, days). :param format: A string format representing a simple date format. :type format: str :return: True if values part of a date are contained in the format string. :rtype: bool """ part_of_date_tokens = "GyYMwWdDFEu" for token in part_of_date_tokens: if token in format: return True return False def _timeliness_todo(columns, value, df, dateFormat=None, timeFormat=None): """ Returns what (columns, as in spark columns) to compute to get the results requested by the parameters. :param columns: :type columns: list :param value :type value: str :param df: :type df: DataFrame :param dateFormat: :type dateFormat: str :param timeFormat: :type timeFormat: str :return: Pyspark columns representing what to compute. """ assert (dateFormat is None or timeFormat is None) and ( not dateFormat is None or not timeFormat is None), "Pass either a dateFormat or a timeFormat, " \ "not both. " todo = [] types = dict(df.dtypes) if dateFormat: value_date = to_date(lit(value), dateFormat) for c in columns: if types[c] == "timestamp" or types[c] == "date": todo.append(sum(when(datediff(value_date, c) > 0, 1).otherwise(0)).alias(c)) elif types[c] == "string": todo.append(sum(when(datediff(value_date, to_date(c, dateFormat)) > 0, 1).otherwise(0)).alias(c)) else: print( "Type of a column on which the timeliness metric is run must be either timestamp, " "date or string, if the metric is being run on dateFormat.") exit() elif timeFormat: value_long = to_timestamp(lit(value), timeFormat).cast("long") # check if value contains a date and not only hours, minutes, seconds has_date = _contains_date(timeFormat) if has_date: for c in columns: if types[c] == "timestamp": todo.append(sum(when(value_long - col(c).cast("long") > 0, 1).otherwise(0)).alias(c)) elif types[c] == "string": todo.append( sum(when(value_long - to_timestamp(col(c), timeFormat).cast("long") > 0, 1).otherwise(0)).alias( c)) else: print( "Type of a column on which the timeliness metric is run must be either timestamp or string, if " "the metric is being run on a timeFormat") exit() else: for c in columns: if types[c] == "timestamp": """ If there is no years, months, days we must ignore the years, months, days in the timestamp. """ value_long = to_timestamp(lit(value), timeFormat) # remove years, months, days value_long = value_long.cast("long") - value_long.cast("date").cast("timestamp").cast("long") # check for difference, but only considering hours, minutes, seconds todo.append(sum( when( value_long - (col(c).cast("long") - col(c).cast("date").cast("timestamp").cast("long")) > 0, 1).otherwise(0)).alias(c)) elif types[c] == "string": """ If there are no years, months, days and the column is in the same format, meaning that it also has no years, months, days, this means that they will be both initialized to the same year, month, day; so years, months, days will be basically ignored. """ todo.append( sum(when((value_long - to_timestamp(c, timeFormat).cast("long")) > 0, 1).otherwise(0)).alias(c)) else: print( "Type of a column on which the timeliness metric is run must be either timestamp or string, if " "the metric is being run on a timeFormat") exit() return todo
[docs]def timeliness(columns, value, df=None, dateFormat=None, timeFormat=None): """ If a df is passed, the timeliness metric will be run and result returned as a list of scores, otherwise an instance of the Task class containing this metric wil be returned, to be later run (possibly after adding to it other tasks/metrics). Use https://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html directives to express formats. :param columns: Columns on which to run the metric, columns of type string will be casted to timestamp using the dateFormat or timeFormat argument. :type columns: list :param value: Value used to run the metric, confronting values in the specified columns against it. :type value: str :param dateFormat: Format in which the value (and values in columns, if they are of string type) are; used to cast columns if they contain dates as strings. Either dateFormat or timeFormat must be passed, but not both. Use https://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html directives to express formats. :type dateFormat: str :param timeFormat: Format in which the value (and values in columns, if they are of string type) are; used to cast columns if they contain dates as strings. Either dateFormat or timeFormat must be passed, but not both. Use https://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html directives to express formats. :type timeFormat: str :param df: Dataframe on which to run the metric, None to have this function return a Task instance containing this metric to be run later. :type df: DataFrame :return: Either a list of scores or a Task instance containing this metric (with these parameters) to be run later. :rtype: list/Task """ assert (dateFormat is None or timeFormat is None) and ( not dateFormat is None or not timeFormat is None), "Pass either a dateFormat or a timeFormat, not both." # make a dict representing the parameters params = {"metric": "timeliness", "columns": columns, "value": value} if dateFormat: params["dateFormat"] = dateFormat elif timeFormat: params["timeFormat"] = timeFormat t = task.Task([params]) if df is None: return t else: return t.run(df)[0]["scores"]
def _freshness_todo(columns, df, dateFormat=None, timeFormat=None): """ Returns what (columns, as in spark columns) to compute to get the results requested by the parameters. :param columns: :type columns: list :param df: :type df: DataFrame :param dateFormat: :type dateFormat: str :param timeFormat: :type timeFormat: str :return: Pyspark columns representing what to compute. """ assert (dateFormat is None or timeFormat is None) and ( not dateFormat is None or not timeFormat is None), "Pass either a dateFormat or a timeFormat, " \ "not both. " types = dict(df.dtypes) todo = [] if dateFormat: now = current_date() for c in columns: if types[c] == "timestamp" or types[c] == "date": todo.append(avg(abs(datediff(c, now))).alias(c)) elif types[c] == "string": todo.append(avg(abs(datediff(to_date(c, dateFormat), now))).alias(c)) else: print( "Type of a column on which the freshness metric is run must be either timestamp, " "date or string, if the metric is being run on dateFormat.") exit() elif timeFormat: # check if value contains a date and not only hours, minutes, seconds has_date = _contains_date(timeFormat) current = current_timestamp() if has_date: """ If the time format also contains a date it means the user is also interested in comparing years, months, days, etc. """ now = current.cast("long") for c in columns: if types[c] == "timestamp": todo.append(avg(abs(col(c).cast("long") - now)).alias(c)) elif types[c] == "string": todo.append(avg(abs(to_timestamp(c, timeFormat).cast("long") - now)).alias(c)) else: print( "Type of a column on which the freshness metric is run must be either timestamp" "or string, if the metric is being run on timeFormat.") exit() else: """ If the timestamp has no date the user is not interested in differences that consider years, months, days, but only hours, minutes, seconds. """ now = current now = now.cast("long") - now.cast("date").cast("timestamp").cast("long") for c in columns: if types[c] == "timestamp": todo.append(avg( abs((col(c).cast("long") - col(c).cast("date").cast("timestamp").cast("long")) - now)).alias(c)) elif types[c] == "string": """ Need to remove seconds from years, months and days here as well because even if the format does not specify anything for those values they are initialized to something by default. """ todo.append(avg(abs((to_timestamp(c, timeFormat).cast("long") - to_timestamp(c, timeFormat).cast( "date").cast("timestamp").cast("long")) - now)).alias(c)) else: print( "Type of a column on which the freshness metric is run must be either timestamp" "or string, if the metric is being run on timeFormat.") exit() return todo
[docs]def freshness(columns, df=None, dateFormat=None, timeFormat=None): """ If a df is passed, the freshness metric will be run and result returned as a list of scores, otherwise an instance of the Task class containing this metric wil be returned, to be later run (possibly after adding to it other tasks/metrics). Use https://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html directives to express formats. :param columns: Columns on which to run the metric, columns of type string will be casted to timestamp using the dateFormat or timeFormat argument. :type columns: list :param dateFormat: Format in which the values in columns are if those columns are of type string; otherwise they must be of type date or timestamp. Use this parameter if you are interested in a result in terms of days. Either dateFormat or timeFormat must be passed, but not both. Use https://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html directives to express formats. :type dateFormat: str :param timeFormat: Format in which the values in columns are if those columns are of type string; otherwise they must be of type timestamp. Use this parameter if you are interested in results in terms of seconds. Either dateFormat or timeFormat must be passed, but not both. Use https://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html directives to express formats. :type timeFormat: str :param df: Dataframe on which to run the metric, None to have this function return a Task instance containing this metric to be run later. :type df: DataFrame :return: Either a list of scores or a Task instance containing this metric (with these parameters) to be run later. :rtype: list/Task """ # make a dict representing the parameters params = {"metric": "freshness", "columns": columns} if dateFormat: params["dateFormat"] = dateFormat elif timeFormat: params["timeFormat"] = timeFormat t = task.Task([params]) if df is None: return t else: return t.run(df)[0]["scores"]
def _and_conditions_as_columns(conditions): """ Return conditions as an "and" concatenation of columns. :param conditions: :type conditions: list :return: """ # add first condition cond = conditions[0] if "casted_to" in cond: casted_to = cond["casted_to"] if cond["operator"] == "gt": result = col(cond["column"]).cast(casted_to) > cond["value"] elif cond["operator"] == "lt": result = col(cond["column"]).cast(casted_to) < cond["value"] elif cond["operator"] == "eq": result = col(cond["column"]).cast(casted_to) == cond["value"] else: if cond["operator"] == "gt": result = col(cond["column"]) > cond["value"] elif cond["operator"] == "lt": result = col(cond["column"]) < cond["value"] elif cond["operator"] == "eq": result = col(cond["column"]) == cond["value"] # add the rest for cond in conditions[1:]: if "casted_to" in cond: casted_to = cond["casted_to"] if cond["operator"] == "gt": result = result & (col(cond["column"]).cast(casted_to) > cond["value"]) elif cond["operator"] == "lt": result = result & (col(cond["column"]).cast(casted_to) < cond["value"]) elif cond["operator"] == "eq": result = result & (col(cond["column"]).cast(casted_to) == cond["value"]) else: if cond["operator"] == "gt": result = result & (col(cond["column"]) > cond["value"]) elif cond["operator"] == "lt": result = result & (col(cond["column"]) < cond["value"]) elif cond["operator"] == "eq": result = result & (col(cond["column"]) == cond["value"]) return result def _andcheckjoin(then): """ Returns a column which is the and concatenation of different checks, 1 for each column in 'then', checking for each distinct_then%s, where %s is a column in 'then', to be equal to 1. :param then: 'then' columns of the constraint metric :type then: list :return: """ res = col("distinct_then%s" % then[0]) == 1 for c in then[1:]: res = res & (col("distinct_then%s" % c) == 1) return res def _constraint_todo(when, then, conditions, df): """ Returns what (columns, as in spark columns) to compute to get the results requested by the parameters. :param when: :type when: list :param then: :type then: list :param conditions: :type conditions: list :param df: :type df: DataFrame :return: Pyspark columns representing what to compute. """ todo = df # filter if needed if conditions: filtering_conditions = _and_conditions_as_columns(conditions) todo = todo.filter(filtering_conditions) # groupby the when columns todo = todo.groupBy(*when) # for each group, count the total and the number of distinct 'thens' (should be 1 if the constraint is respected) todo = todo.agg(count("*").alias("metrics_check_count_1"), *[countDistinct(c).alias("distinct_then%s" % c) for c in then]) # given the new 'table', aggregate over it, summing over all total rows to get the total number of filtered # rows, and summing the count only of groups that have one distinct then value todo = todo.agg(sum("metrics_check_count_1").alias("all_filtered"), sum( pyspark.sql.functions.when(_andcheckjoin(then), col("metrics_check_count_1")).otherwise(0)).alias( "respecting")) # get the ratio between the tuples respecting the constraint and the total, where total is the number of # rows that have passed the filtering todo = todo.select(col("respecting") / col("all_filtered")) return todo
[docs]def constraint(when, then, conditions=None, df=None): """ If a df is passed, the constraint metric will be run and result returned as a list of scores, otherwise an instance of the Task class containing this metric wil be returned, to be later run (possibly after adding to it other tasks/metrics). :param when: A list of columns in the df to use as the precondition of a functional constraint. No column should be in both when and then. :type when: list :param then: A list of columns in the df to use as the postcondition of a functional constraint. No column should be in both when and then. :type then: list :param conditions: Conditions on which to filter data before applying the metric. :type conditions: list :param df: Dataframe on which to run the metric, None to have this function return a Task instance containing this metric to be run later. :type df: DataFrame :return: Either a list of scores or a Task instance containing this metric (with these parameters) to be run later. :rtype: list/Task """ # make a dict representing the parameters params = {"metric": "constraint", "when": when, "then": then} if conditions: params["conditions"] = conditions t = task.Task([params]) if df is None: return t else: return t.run(df)[0]["scores"]
def _rule_todo(conditions): """ Returns what (columns, as in spark columns) to compute to get the results requested by the parameters. :param conditions: :type conditions: list :return: Pyspark columns representing what to compute. """ filtering_conditions = _and_conditions_as_columns(conditions) todo = sum(when(filtering_conditions, 1.0).otherwise(0.)) return [todo]
[docs]def rule(conditions, df=None): """ If a df is passed, the rule metric will be run and result returned as a list of scores, otherwise an instance of the Task class containing this metric wil be returned, to be later run (possibly after adding to it other tasks/metrics). :param conditions: Conditions on which to run the metric. :type conditions: list :param df: Dataframe on which to run the metric, None to have this function return a Task instance containing this metric to be run later. :type df: DataFrame :return: Either a list of scores or a Task instance containing this metric (with these parameters) to be :rtype: list/Task """ # make a dict representing the parameters params = {"metric": "rule", "conditions": conditions} t = task.Task([params]) if df is None: return t else: return t.run(df)[0]["scores"]
def _having_aggregations_as_columns(condition): """ Return an "having" aggregation as a column. :param condition: :type condition: dict :return: """ column = condition["column"] aggregator = condition["aggregator"] if "aggregator" in condition else None if "casted_to" in condition: casted_to = condition["casted_to"] if aggregator == "count": return count(column) elif aggregator == "min": return pyspark.sql.functions.min(col(column).cast(casted_to)) elif aggregator == "max": return pyspark.sql.functions.max(col(column).cast(casted_to)) elif aggregator == "avg": return pyspark.sql.functions.avg(col(column).cast(casted_to)) elif aggregator == "sum": return pyspark.sql.functions.sum(col(column).cast(casted_to)) else: print("Aggregator %s not recognized" % aggregator) exit() else: if aggregator == "count": return count(column) elif aggregator == "min": return pyspark.sql.functions.min(column) elif aggregator == "max": return pyspark.sql.functions.max(column) elif aggregator == "avg": return pyspark.sql.functions.avg(column) elif aggregator == "sum": return pyspark.sql.functions.sum(column) else: print("Aggregator %s not recognized" % aggregator) exit() def _having_constraints_as_column(having): """ Return "having" conditions as an "and" concatenation of columns. :param having: :type having: list :return: """ # add first condition index = 0 cond = having[0] if cond["operator"] == "gt": result = col("_grouprule_h%i" % index) > cond["value"] elif cond["operator"] == "lt": result = col("_grouprule_h%i" % index) < cond["value"] elif cond["operator"] == "eq": result = col("_grouprule_h%i" % index) == cond["value"] index += 1 # add the rest for cond in having[1:]: if cond["operator"] == "gt": result = result & (col("_grouprule_h%i" % index) > cond["value"]) elif cond["operator"] == "lt": result = result & (col("_grouprule_h%i" % index) < cond["value"]) elif cond["operator"] == "eq": result = result & (col("_grouprule_h%i" % index) == cond["value"]) index += 1 return result def _grouprule_todo(columns, conditions, having, df): """ Returns what (columns, as in spark columns) to compute to get the results requested by the parameters. :param columns: :type columns: list :param conditions: :type conditions: list :param having: :type having: list :param df: :type df: DataFrame :return: Pyspark columns representing what to compute. """ todo = df # filter if needed if conditions: filtering_conditions = _and_conditions_as_columns(conditions) todo = todo.filter(filtering_conditions) # get groups todo = todo.groupBy(*columns) # aggregate groups over 'having' conditions aggregations = [_having_aggregations_as_columns(cond).alias("_grouprule_h%i" % i) for i, cond in enumerate(having)] todo = todo.agg(*aggregations) # aggregate the tuples (each tuple representing a group # to 1) count them 2) count the ones passing the having conditions having_constraints = _having_constraints_as_column(having) todo = todo.agg(sum(when(having_constraints, 1).otherwise(0)).alias("_having_filtered"), count("*").alias("_having_all")) # normalize (divide passing groups by total groups) todo = todo.select(col("_having_filtered") / col("_having_all")) return todo
[docs]def grouprule(columns, having, conditions=None, df=None): """ If a df is passed, the groupRule metric will be run and result returned as a list of scores, otherwise an instance of the Task class containing this metric wil be returned, to be later run (possibly after adding to it other tasks/metrics). :param columns: Columns on which to run the metric, grouping data. :param conditions: Conditions on which to run the metric, filtering data before grouping, can be None. :type conditions: list :param having: Conditions to apply to groups. :type having: list :param df: Dataframe on which to run the metric, None to have this function return a Task instance containing this metric to be run later. :type df: DataFrame :return: Either a list of scores or a Task instance containing this metric (with these parameters) to be run later. :rtype: list/Task """ # make a dict representing the parameters params = {"metric": "groupRule", "columns": columns, "having": having} if conditions is not None: params["conditions"] = conditions t = task.Task([params]) if df is None: return t else: return t.run(df)[0]["scores"]
def _entropy_todo(column, df): """ Returns what (columns, as in spark columns) to compute to get the results requested by the parameters. :param column: :type column: str/int :param df: :type df: DataFrame :return: Pyspark columns representing what to compute. """ # group on that column todo = df.groupBy(column) # count instances of each group todo = todo.agg(count("*").alias("_entropy_ci")) # ignore nans/null for computing entropy todo = todo.filter(~ col(column).isNull()) todo = todo.select(sum(col("_entropy_ci") * log2("_entropy_ci")).alias("_sumcilogci"), sum("_entropy_ci").alias("_total")) todo = todo.select(log2(col("_total")) - col("_sumcilogci") / col("_total")) return todo
[docs]def entropy(column, df=None): """ If a df is passed, the entropy metric will be run and result returned as a list of scores, otherwise an instance of the Task class containing this metric wil be returned, to be later run (possibly after adding to it other tasks/metrics). :param column: Column on which to run the metric. :type column: str/int :param df: Dataframe on which to run the metric, None to have this function return a Task instance containing this metric to be run later. :type df: DataFrame :return: Either a list of scores or a Task instance containing this metric (with these parameters) to be run later. :rtype: list/Task """ # make a dict representing the parameters params = {"metric": "entropy", "column": column} t = task.Task([params]) if df is None: return t else: return t.run(df)[0]["scores"]
def _mutual_info_todo(when, then, df): """ Returns what (columns, as in spark columns) to compute to get the results requested by the parameters. :param when: :type when: str/int :param then: :type then: str/int :param df: :type df: DataFrame :return: Pyspark columns representing what to compute. """ # group on the pair of columns, count occurrences pairs_table = df.groupBy([when, then]).agg(count("*").alias("_pairs_count")) # ignore nulls pairs_table = pairs_table.filter((~ col(when).isNull()) & (~ col(then).isNull())) pairs_table.cache() when_table = pairs_table.groupBy(col(when).alias("wt")).agg(sum("_pairs_count").alias("_when_count")) then_table = pairs_table.groupBy(col(then).alias("tt")).agg(sum("_pairs_count").alias("_then_count")) final_table = pairs_table.join(when_table, pairs_table[when].eqNullSafe(when_table["wt"])) final_table = final_table.join(then_table, final_table[then].eqNullSafe(then_table["tt"])) # prepare 4 subformulas of MI to later sum, plus the total todo = final_table.select(sum(col("_pairs_count") * log(col("_pairs_count"))).alias("_s1"), # c_xy * logc_xy sum(col("_pairs_count")).alias("_s2"), # c_xy sum(col("_pairs_count") * log(col("_when_count"))).alias("_s3"), # c_xy * logc_x sum(col("_pairs_count") * log(col("_then_count"))).alias("_s4"), # c_xy * logc_y sum(col("_pairs_count")).alias("_total") # total ) todo = todo.select((col("_s1") / col("_total")) + (log(col("_total")) * (col("_s2") / col("_total"))) - ( (col("_s3")) / col("_total")) - ((col("_s4")) / col("_total")).alias("mutual_info")) return todo
[docs]def mutual_info(when, then, df=None): """ If a df is passed, the mutual_info metric will be run and result returned as a list of scores, otherwise an instance of the Task class containing this metric wil be returned, to be later run (possibly after adding to it other tasks/metrics). :param when: First column on which to compute MI. :type when: str/int :param then: Second column on which to compute MI. :type then: str/int :param df: Dataframe on which to run the metric, None to have this function return a Task instance containing this metric to be run later. :type df: DataFrame :return: Either a list of scores or a Task instance containing this metric (with these parameters) to be run later. :rtype: list/Task """ # make a dict representing the parameters params = {"metric": "mutual_info", "when": when, "then": then} # create tak containing parameters t = task.Task([params]) if df is None: return t else: return t.run(df)[0]["scores"]