import copy
from typing import List, Dict
from fast_causal_inference.dataframe import ais_dataframe_pb2 as DfPb
from fast_causal_inference.dataframe.df_base import (
DfColumnInternalNode,
DfColumnLeafNode,
aggregrate,
register_fn,
OlapEngineType,
define_args,
DfFunction,
DfColumnNode,
FnArg,
DfContext,
)
class DfFnColWrapper:
def __init__(self, fn: DfFunction, params: Dict, columns: List[DfColumnNode]):
self._fn = fn
self._params = copy.deepcopy(params)
self._columns = copy.deepcopy(columns)
@property
def fn(self):
return self._fn
@property
def params(self):
return self._params
@property
def columns(self):
return self._columns
def has_agg_func(self):
if self._fn.is_agg_func():
return True
for col in self._columns:
if isinstance(col, DfFnColWrapper) and col.has_agg_func():
return True
return False
def alias(self, alias_):
self._fn.alias = alias_
return self
def __add__(self, rhs):
return add(self, rhs)
def __radd__(self, lhs):
return add(lhs, self)
def __sub__(self, rhs):
return subtract(self, rhs)
def __rsub__(self, lhs):
return subtract(lhs, self)
def __mul__(self, rhs):
return multiply(self, rhs)
def __rmul__(self, lhs):
return multiply(lhs, self)
def __truediv__(self, rhs):
return divide(self, rhs)
def __rtruediv__(self, lhs):
return divide(lhs, self)
def __mod__(self, rhs):
return modulo(self, rhs)
def __rmod__(self, lhs):
return modulo(lhs, self)
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="")
@register_fn(engine=OlapEngineType.STARROCKS, name="")
@define_args(FnArg(name="x"), FnArg(name="y"))
class AddDfFunction(DfFunction):
def sql_impl_default(
self,
ctx: DfContext,
fn_args: List[FnArg],
fn_params: List[FnArg],
arg_dict: Dict,
) -> str:
return "(" + arg_dict["x"].sql(ctx) + " + " + arg_dict["y"].sql(ctx) + ")"
def add(x, y):
return DfFnColWrapper(AddDfFunction(), {}, [x, y])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="")
@register_fn(engine=OlapEngineType.STARROCKS, name="")
@define_args(FnArg(name="x"), FnArg(name="y"))
class SubtractDfFunction(DfFunction):
def sql_impl_default(
self,
ctx: DfContext,
fn_args: List[FnArg],
fn_params: List[FnArg],
arg_dict: Dict,
) -> str:
return "(" + arg_dict["x"].sql(ctx) + " - " + arg_dict["y"].sql(ctx) + ")"
def subtract(x, y):
return DfFnColWrapper(SubtractDfFunction(), {}, [x, y])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="")
@register_fn(engine=OlapEngineType.STARROCKS, name="")
@define_args(FnArg(name="x"), FnArg(name="y"))
class MultiplyDfFunction(DfFunction):
def sql_impl_default(
self,
ctx: DfContext,
fn_args: List[FnArg],
fn_params: List[FnArg],
arg_dict: Dict,
) -> str:
return "(" + arg_dict["x"].sql(ctx) + " * " + arg_dict["y"].sql(ctx) + ")"
def multiply(x, y):
return DfFnColWrapper(MultiplyDfFunction(), {}, [x, y])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="")
@register_fn(engine=OlapEngineType.STARROCKS, name="")
@define_args(FnArg(name="x"), FnArg(name="y"))
class DivideDfFunction(DfFunction):
def sql_impl_default(
self,
ctx: DfContext,
fn_args: List[FnArg],
fn_params: List[FnArg],
arg_dict: Dict,
) -> str:
return "(" + arg_dict["x"].sql(ctx) + " / " + arg_dict["y"].sql(ctx) + ")"
def divide(x, y):
return DfFnColWrapper(DivideDfFunction(), {}, [x, y])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="")
@register_fn(engine=OlapEngineType.STARROCKS, name="")
@define_args(FnArg(name="x"), FnArg(name="y"))
class ModuloDfFunction(DfFunction):
def sql_impl_default(
self,
ctx: DfContext,
fn_args: List[FnArg],
fn_params: List[FnArg],
arg_dict: Dict,
) -> str:
return "(" + arg_dict["x"].sql(ctx) + " % " + arg_dict["y"].sql(ctx) + ")"
def modulo(x, y):
return DfFnColWrapper(ModuloDfFunction(), {}, [x, y])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="")
@register_fn(engine=OlapEngineType.STARROCKS, name="")
@define_args(FnArg(name="self"))
class SelfRefDfFunction(DfFunction):
def sql_impl_default(
self,
ctx: DfContext,
fn_args: List[FnArg],
fn_params: List[FnArg],
arg_dict: Dict,
) -> str:
return "(" + arg_dict["self"].sql(ctx) + ")"
def col(*cols):
if len(cols) != 1:
raise Exception(f"number of columns({len(cols)}) is not 1.")
return DfFnColWrapper(SelfRefDfFunction(), {}, cols)
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="")
@register_fn(engine=OlapEngineType.STARROCKS, name="")
@define_args(FnArg(name="col"))
class LitDfFunction(DfFunction):
def sql_impl_default(
self,
ctx: DfContext,
fn_args: List[FnArg],
fn_params: List[FnArg],
arg_dict: Dict,
) -> str:
col = arg_dict["col"].column
if isinstance(col, DfColumnLeafNode):
return "'" + arg_dict["col"].sql(ctx) + "'"
elif isinstance(col, DfColumnInternalNode):
return arg_dict["col"].sql(ctx)
else:
raise Exception(
"Logical Error: col can only be DfColumnLeafNode|DfColumnInternalNode."
)
[docs]def lit(*cols):
"""
lit is used to create a constant column.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('constant', Fn.lit(1))
"""
if len(cols) != 1:
raise Exception(f"number of columns({len(cols)}) is not 1.")
return DfFnColWrapper(LitDfFunction(), {}, cols)
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="any")
@register_fn(engine=OlapEngineType.STARROCKS, name="any")
@define_args(FnArg(name="col"))
@aggregrate
class AggAnyDfFunction(DfFunction):
pass
[docs]def any(col):
"""
any is used to aggregate a column with any value.
"""
return DfFnColWrapper(AggAnyDfFunction(), {}, [col])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="stddevPop")
@register_fn(engine=OlapEngineType.STARROCKS, name="stddev_pop")
@define_args(FnArg(name="col"))
@aggregrate
class AggStddevPopDfFunction(DfFunction):
pass
[docs]def stddevPop(col):
"""
stddevPop is used to calculate the population standard deviation of a column.
Example:
----------
.. code-block:: python
import fast_causal_inference.dataframe.functions as Fn
df = fast_causal_inference.readClickHouse('test_data_small')
df.stddevPop('numerator').show()
df.groupBy('treatment').stddevPop('numerator').show()
df.groupBy('treatment').agg(Fn.stddevPop('numerator').alias('numerator')).show()
df.groupBy('treatment').agg({'numerator':'stddevPop', 'numerator_pre':'stddevPop'}).show()
"""
return DfFnColWrapper(AggStddevPopDfFunction(), {}, [col])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="stddevSamp")
@register_fn(engine=OlapEngineType.STARROCKS, name="stddev_samp")
@define_args(FnArg(name="col"))
@aggregrate
class AggStddevSampDfFunction(DfFunction):
pass
[docs]def stddevSamp(col):
"""
stddevSamp is used to calculate the sample standard deviation of a column.
Example:
----------
.. code-block:: python
import fast_causal_inference.dataframe.functions as Fn
df = fast_causal_inference.readClickHouse('test_data_small')
df.stddevSamp('numerator').show()
df.groupBy('treatment').stddevSamp('numerator').show()
df.groupBy('treatment').agg(Fn.stddevSamp('numerator').alias('numerator')).show()
df.groupBy('treatment').agg({'numerator':'stddevSamp', 'numerator_pre':'stddevSamp'}).show()
"""
return DfFnColWrapper(AggStddevSampDfFunction(), {}, [col])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="varPop")
@register_fn(engine=OlapEngineType.STARROCKS, name="var_pop")
@define_args(FnArg(name="col"))
@aggregrate
class AggVarPopDfFunction(DfFunction):
pass
[docs]def varPop(col):
"""
varPop is used to calculate the population variance of a column.
Example:
----------
.. code-block:: python
import fast_causal_inference.dataframe.functions as Fn
df = fast_causal_inference.readClickHouse('test_data_small')
df.varPop('numerator').show()
df.groupBy('treatment').agg(Fn.varPop('numerator').alias('numerator')).show()
df.groupBy('treatment').varPop('numerator').show()
df.groupBy('treatment').agg({'numerator':'varPop', 'numerator_pre':'varPop'}).show()
"""
return DfFnColWrapper(AggVarPopDfFunction(), {}, [col])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="varSamp")
@register_fn(engine=OlapEngineType.STARROCKS, name="var_samp")
@define_args(FnArg(name="col"))
@aggregrate
class AggVarSampDfFunction(DfFunction):
pass
[docs]def varSamp(col):
"""
varSamp is used to calculate the sample variance of a column.
Example:
----------
.. code-block:: python
import fast_causal_inference.dataframe.functions as Fn
df = fast_causal_inference.readClickHouse('test_data_small')
df.varSamp('numerator').show()
df.groupBy('treatment').agg(Fn.varSamp('numerator').alias('numerator')).show()
df.groupBy('treatment').varSamp('numerator').show()
df.groupBy('treatment').agg({'numerator':'varSamp', 'numerator_pre':'varSamp'}).show()
"""
return DfFnColWrapper(AggVarSampDfFunction(), {}, [col])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="corr")
@register_fn(engine=OlapEngineType.STARROCKS, name="corr")
@define_args(FnArg(name="x"), FnArg(name="y"))
@aggregrate
class AggCorrDfFunction(DfFunction):
pass
[docs]def corr(x, y):
"""
corr is used to calculate the correlation between two columns.
Example:
----------
.. code-block:: python
import fast_causal_inference.dataframe.functions as Fn
df = fast_causal_inference.readClickHouse('test_data_small')
df.corr('numerator', 'numerator_pre').show()
df.groupBy('treatment').agg(Fn.corr('numerator', 'numerator_pre').alias('numerator')).show()
df.groupBy('treatment').corr('numerator', 'numerator_pre').show()
"""
return DfFnColWrapper(AggCorrDfFunction(), {}, [x, y])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="count")
@register_fn(engine=OlapEngineType.STARROCKS, name="count")
@define_args(FnArg(name="expr", is_param=True))
@aggregrate
class AggCountDfFunction(DfFunction):
pass
[docs]def count(*, expr="*"):
"""
count is used to count the number of rows.
Example:
----------
.. code-block:: python
import fast_causal_inference.dataframe.functions as Fn
df = fast_causal_inference.readClickHouse('test_data_small')
df.count().show()
df.groupBy('treatment').count().show()
df.groupBy('treatment').agg(Fn.count().alias('numerator')).show()
"""
return DfFnColWrapper(AggCountDfFunction(), {"expr": expr}, [])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="max")
@register_fn(engine=OlapEngineType.STARROCKS, name="max")
@define_args(FnArg(name="lhs"), FnArg(name="rhs"))
class MaxDfFunction(DfFunction):
pass
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="max")
@register_fn(engine=OlapEngineType.STARROCKS, name="max")
@define_args(FnArg(name="x"))
@aggregrate
class AggMaxDfFunction(DfFunction):
pass
[docs]def max(*cols):
"""
max is used to calculate the maximum value of a column.
Example:
----------
.. code-block:: python
import fast_causal_inference.dataframe.functions as Fn
df = fast_causal_inference.readClickHouse('test_data_small')
df.max('numerator').show()
df.groupBy('treatment').max('numerator').show()
df.groupBy('treatment').agg(Fn.max('numerator').alias('numerator')).show()
"""
if len(cols) == 1:
return DfFnColWrapper(AggMaxDfFunction(), {}, cols)
if len(cols) == 2:
return DfFnColWrapper(MaxDfFunction(), {}, cols)
raise Exception(f"number of columns({len(cols)}) is neither 1 nor 2.")
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="min")
@register_fn(engine=OlapEngineType.STARROCKS, name="min")
@define_args(FnArg(name="lhs"), FnArg(name="rhs"))
class MinDfFunction(DfFunction):
pass
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="min")
@register_fn(engine=OlapEngineType.STARROCKS, name="min")
@define_args(FnArg(name="x"))
@aggregrate
class AggMinDfFunction(DfFunction):
pass
[docs]def min(*cols):
"""
min is used to calculate the minimum value of a column.
Example:
----------
.. code-block:: python
import fast_causal_inference.dataframe.functions as Fn
df = fast_causal_inference.readClickHouse('test_data_small')
df.min('numerator').show()
df.groupBy('treatment').min('numerator').show()
df.groupBy('treatment').agg(Fn.min('numerator').alias('numerator')).show()
"""
if len(cols) == 1:
return DfFnColWrapper(AggMinDfFunction(), {}, cols)
if len(cols) == 2:
return DfFnColWrapper(MinDfFunction(), {}, cols)
raise Exception(f"number of columns({len(cols)}) is neither 1 nor 2.")
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="avg")
@register_fn(engine=OlapEngineType.STARROCKS, name="avg")
@define_args(FnArg(name="x"))
@aggregrate
class AggAvgDfFunction(DfFunction):
pass
[docs]def avg(col1):
"""
avg is used to calculate the average value of a column.
Example:
----------
.. code-block:: python
import fast_causal_inference.dataframe.functions as Fn
df = fast_causal_inference.readClickHouse('test_data_small')
df.avg('numerator').show()
df.groupBy('treatment').avg('numerator').show()
df.groupBy('treatment').agg(Fn.avg('numerator').alias('numerator')).show()
"""
return DfFnColWrapper(AggAvgDfFunction(), {}, [col1])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="sum")
@register_fn(engine=OlapEngineType.STARROCKS, name="sum")
@define_args(FnArg(name="x"))
@aggregrate
class AggSumDfFunction(DfFunction):
pass
[docs]def sum(col1):
"""
sum is used to calculate the sum of a column.
Example:
----------
.. code-block:: python
import fast_causal_inference.dataframe.functions as Fn
df = fast_causal_inference.readClickHouse('test_data_small')
df.sum('numerator').show()
df.groupBy('treatment').sum('numerator').show()
df.groupBy('treatment').agg(Fn.sum('numerator').alias('numerator')).show()
"""
return DfFnColWrapper(AggSumDfFunction(), {}, [col1])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="mean")
@register_fn(engine=OlapEngineType.STARROCKS, name="mean")
@define_args(FnArg(name="x"))
@aggregrate
class AggMeanDfFunction(DfFunction):
pass
[docs]def mean(col1):
"""
mean is used to calculate the mean of a column.
Example:
----------
.. code-block:: python
import fast_causal_inference.dataframe.functions as Fn
df = fast_causal_inference.readClickHouse('test_data_small')
df.mean('numerator').show()
df.groupBy('treatment').mean('numerator').show()
df.groupBy('treatment').agg(Fn.mean('numerator').alias('numerator')).show()
"""
return DfFnColWrapper(AggMeanDfFunction(), {}, [col1])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="quantile")
@register_fn(engine=OlapEngineType.STARROCKS, name="percentile_disc")
@define_args(FnArg(name="level", is_param=True), FnArg(name="x"))
@aggregrate
class AggQuantileDfFunction(DfFunction):
def sql_impl_starrocks(
self,
ctx: DfContext,
fn_args: List[FnArg],
fn_params: List[FnArg],
arg_dict: Dict,
) -> str:
x = arg_dict["x"].sql(ctx)
level = arg_dict["level"].sql(ctx)
return self.fn_name(ctx) + f"({x}, {level})"
[docs]def quantile(x, *, level):
"""
quantile is used to calculate the quantile of a column.
Example:
----------
.. code-block:: python
import fast_causal_inference.dataframe.functions as Fn
df = fast_causal_inference.readClickHouse('test_data_small')
df.quantile('numerator', level=0.5).show()
df.groupBy('treatment').quantile('numerator', level=0.5).show()
df.groupBy('treatment').agg(Fn.quantile('numerator', level=0.5).alias('numerator')).show()
"""
return DfFnColWrapper(AggQuantileDfFunction(), {"level": level}, [x])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="covarPop")
@define_args(FnArg(name="x"), FnArg(name="y"))
@aggregrate
class AggCovarPopDfFunction(DfFunction):
pass
[docs]def covarPop(x, y):
"""
covarPop is used to calculate the population covariance between two columns.
Example:
----------
.. code-block:: python
import fast_causal_inference.dataframe.functions as Fn
df = fast_causal_inference.readClickHouse('test_data_small')
df.covarPop('numerator', 'numerator_pre').show()
df.groupBy('treatment').agg(Fn.covarPop('numerator', 'numerator_pre').alias('numerator')).show()
df.groupBy('treatment').covarPop('numerator', 'numerator_pre').show()
"""
return DfFnColWrapper(AggCovarPopDfFunction(), {}, [x, y])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="covarSamp")
@define_args(FnArg(name="x"), FnArg(name="y"))
@aggregrate
class AggCovarSampDfFunction(DfFunction):
pass
[docs]def covarSamp(x, y):
"""
covarSamp is used to calculate the sample covariance between two columns.
Example:
----------
.. code-block:: python
import fast_causal_inference.dataframe.functions as Fn
df = fast_causal_inference.readClickHouse('test_data_small')
df.covarSamp('numerator', 'numerator_pre').show()
df.groupBy('treatment').agg(Fn.covarSamp('numerator', 'numerator_pre').alias('numerator')).show()
df.groupBy('treatment').covarSamp('numerator', 'numerator_pre').show()
"""
return DfFnColWrapper(AggCovarSampDfFunction(), {}, [x, y])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="anyLast")
@define_args(FnArg(name="x"), FnArg(name="y"))
@aggregrate
class AggAnyLastDfFunction(DfFunction):
pass
def anyLast(x, y):
return DfFnColWrapper(AggAnyLastDfFunction(), {}, [x, y])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="anyMin")
@define_args(FnArg(name="x"), FnArg(name="y"))
@aggregrate
class AggAnyMinDfFunction(DfFunction):
pass
def anyMin(x, y):
# """
# anyMin is used to calculate the minimum value of two columns.
# >>> import fast_causal_inference.dataframe.functions as Fn
# >>> df.anyMin('x1', 'x2').show()
# """
return DfFnColWrapper(AggAnyMinDfFunction(), {}, [x, y])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="anyMax")
@define_args(FnArg(name="x"), FnArg(name="y"))
@aggregrate
class AggAnyMaxDfFunction(DfFunction):
pass
def anyMax(x, y):
# """
# anyMax is used to calculate the maximum value of two columns.
# >>> import fast_causal_inference.dataframe.functions as Fn
# >>> df_new = df.withColumn('new_column', Fn.anyMax('x1', 'x2'))
# """
return DfFnColWrapper(AggAnyMaxDfFunction(), {}, [x, y])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="sqrt")
@register_fn(engine=OlapEngineType.STARROCKS, name="sqrt")
@define_args(FnArg(name="x"))
class SqrtDfFunction(DfFunction):
pass
[docs]def sqrt(x):
"""
sqrt is used to calculate the square root of a column.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.sqrt('weight'))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(SqrtDfFunction(), {}, [x])
"""
If somehow we need to bypass Calcite, you need to implement the function `sql_impl_{engine}`
"""
# basic functions
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="abs")
@register_fn(engine=OlapEngineType.STARROCKS, name="abs")
@define_args(FnArg(name="col"))
class AbsDfFunction(DfFunction):
pass
[docs]def abs(col):
"""
abs is used to calculate the absolute value of a column.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.abs('weight'))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(AbsDfFunction(), {}, [col])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="mod")
@register_fn(engine=OlapEngineType.STARROCKS, name="mod")
@define_args(FnArg(name="x"), FnArg(name="y"))
class ModDfFunction(DfFunction):
pass
[docs]def mod(x, y):
"""
mod is used to calculate the modulo of column x by y.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.mod('weight', 2))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(ModDfFunction(), {}, [x, y])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="floor")
@register_fn(engine=OlapEngineType.STARROCKS, name="floor")
@define_args(FnArg(name="x"))
class FloorDfFunction(DfFunction):
pass
[docs]def floor(x):
"""
floor is used to calculate the largest integer less than or equal to the column x.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.floor('weight'))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(FloorDfFunction(), {}, [x])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="ceil")
@register_fn(engine=OlapEngineType.STARROCKS, name="ceil")
@define_args(FnArg(name="x"))
class CeilDfFunction(DfFunction):
pass
[docs]def ceil(x):
"""
ceil is used to calculate the smallest integer greater than or equal to the column x.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.ceil('weight'))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(CeilDfFunction(), {}, [x])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="rand")
@register_fn(engine=OlapEngineType.STARROCKS, name="rand")
@define_args()
class RandDfFunction(DfFunction):
pass
[docs]def rand():
"""
rand is used to generate a random number.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.rand())
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(RandDfFunction(), {}, [])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="pow")
@register_fn(engine=OlapEngineType.STARROCKS, name="pow")
@define_args(FnArg(name="x"), FnArg(name="y"))
class PowDfFunction(DfFunction):
pass
[docs]def pow(x, y):
"""
pow is used to calculate the column x raised to the power y.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.pow('weight', 2))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(PowDfFunction(), {}, [x, y])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="power")
@register_fn(engine=OlapEngineType.STARROCKS, name="power")
@define_args(FnArg(name="x"), FnArg(name="y"))
class PowerDfFunction(DfFunction):
pass
[docs]def power(x, y):
"""
power is used to calculate the column x raised to the power y.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.power('weight', 2))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(PowerDfFunction(), {}, [x, y])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="exp")
@register_fn(engine=OlapEngineType.STARROCKS, name="exp")
@define_args(FnArg(name="x"))
class ExpDfFunction(DfFunction):
pass
[docs]def exp(x):
"""
exp is used to calculate e raised to the power of column x.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.exp('weight'))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(ExpDfFunction(), {}, [x])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="log")
@register_fn(engine=OlapEngineType.STARROCKS, name="log")
@define_args(FnArg(name="base"), FnArg(name="x"))
class LogDfFunction(DfFunction):
pass
def log(base, x):
"""
log is used to calculate the natural logarithm of column x.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.log('weight'))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(LogDfFunction(), {}, [base, x])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="ln")
@register_fn(engine=OlapEngineType.STARROCKS, name="ln")
@define_args(FnArg(name="x"))
class LnDfFunction(DfFunction):
pass
[docs]def ln(x):
"""
ln is used to calculate the natural logarithm of column x.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.ln('weight'))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(LnDfFunction(), {}, [x])
[docs]def log(x):
"""
log is used to calculate the natural logarithm of column x.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.log('weight'))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(LnDfFunction(), {}, [x])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="exp2")
@define_args(FnArg(name="x"))
class Exp2DfFunction(DfFunction):
pass
[docs]def exp2(x):
"""
exp2 is used to calculate 2 raised to the power of column x.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.exp2('weight'))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(Exp2DfFunction(), {}, [x])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="log2")
@define_args(FnArg(name="x"))
class Log2DfFunction(DfFunction):
pass
[docs]def log2(x):
"""
log2 is used to calculate the base 2 logarithm of column x.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.log2('weight'))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(Log2DfFunction(), {}, [x])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="murmurHash3_64")
@define_args(FnArg(name="x"))
class MurmurHash3_64DfFunction(DfFunction):
pass
[docs]def murmur_hash3_64(x):
"""
murmur_hash3_64 is used to calculate the murmur3 hash of column x.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.murmur_hash3_64('weight'))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(MurmurHash3_64DfFunction(), {}, [x])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="murmurHash3_32")
@register_fn(engine=OlapEngineType.STARROCKS, name="murmur_hash3_32")
@define_args(FnArg(name="x"))
class MurmurHash3_32DfFunction(DfFunction):
pass
[docs]def murmur_hash3_32(x):
"""
murmur_hash3_32 is used to calculate the murmur3 hash of column x.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.murmur_hash3_32('weight'))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(MurmurHash3_32DfFunction(), {}, [x])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="isNull")
@register_fn(engine=OlapEngineType.STARROCKS, name="isnull")
@define_args(FnArg(name="x"))
class IsNullDfFunction(DfFunction):
pass
[docs]def isnull(x):
"""
isnull is used to check if column x is null.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.isnull('weight'))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(IsNullDfFunction(), {}, [x])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="isNotNull")
@register_fn(engine=OlapEngineType.STARROCKS, name="isnotnull")
@define_args(FnArg(name="x"))
class IsNotNullDfFunction(DfFunction):
pass
[docs]def isnotnull(x):
"""
isnotnull is used to check if column x is not null.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.isnotnull('weight'))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(IsNotNullDfFunction(), {}, [x])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="round")
@register_fn(engine=OlapEngineType.STARROCKS, name="round")
@define_args(FnArg(name="x"), FnArg(name="n"))
class RoundDfFunction(DfFunction):
def sql_impl_default(
self,
ctx: DfContext,
fn_args: List[FnArg],
fn_params: List[FnArg],
arg_dict: Dict,
) -> str:
x = arg_dict["x"].sql(ctx)
n = arg_dict["n"].sql(ctx)
if n == "":
return self.fn_name(ctx) + "(" + x + ")"
return self.fn_name(ctx) + "(" + x + ", " + n + ")"
[docs]def round(x, n=""):
"""
round is used to round column x to y decimal places.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.round('weight', 2))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(RoundDfFunction(), {}, [x, n])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="cbrt")
@define_args(FnArg(name="x"))
class CBRTDfFunction(DfFunction):
pass
[docs]def cbrt(x):
"""
cbrt is used to calculate the cube root of column x.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.cbrt('weight'))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(CBRTDfFunction(), {}, [x])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="erf")
@define_args(FnArg(name="x"))
class ERFDfFunction(DfFunction):
pass
[docs]def erf(x):
"""
erf is used to calculate the error function of column x.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.erf('weight'))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(ERFDfFunction(), {}, [x])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="erfc")
@define_args(FnArg(name="x"))
class ERFCDfFunction(DfFunction):
pass
[docs]def erfc(x):
"""
erfc is used to calculate the complementary error function of column x.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.erfc('weight'))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(ERFCDfFunction(), {}, [x])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="lgamma")
@define_args(FnArg(name="x"))
class LGammaDfFunction(DfFunction):
pass
[docs]def lgamma(x):
"""
lgamma is used to calculate the log gamma function of column x.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.lgamma('weight'))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(LGammaDfFunction(), {}, [x])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="tgamma")
@define_args(FnArg(name="x"))
class TGammaDfFunction(DfFunction):
pass
[docs]def tgamma(x):
"""
tgamma is used to calculate the gamma function of column x.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.tgamma('weight'))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(TGammaDfFunction(), {}, [x])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="sin")
@register_fn(engine=OlapEngineType.STARROCKS, name="sin")
@define_args(FnArg(name="x"))
class SinDfFunction(DfFunction):
pass
[docs]def sin(x):
"""
sin is used to calculate the sine of column x.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.sin('weight'))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(SinDfFunction(), {}, [x])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="cos")
@register_fn(engine=OlapEngineType.STARROCKS, name="cos")
@define_args(FnArg(name="x"))
class CosDfFunction(DfFunction):
pass
[docs]def cos(x):
"""
cos is used to calculate the cosine of column x.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.cos('weight'))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(CosDfFunction(), {}, [x])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="tan")
@register_fn(engine=OlapEngineType.STARROCKS, name="tan")
@define_args(FnArg(name="x"))
class TanDfFunction(DfFunction):
pass
[docs]def tan(x):
"""
tan is used to calculate the tangent of column x.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.tan('weight'))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(TanDfFunction(), {}, [x])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="asin")
@register_fn(engine=OlapEngineType.STARROCKS, name="asin")
@define_args(FnArg(name="x"))
class AsinDfFunction(DfFunction):
pass
[docs]def asin(x):
"""
asin is used to calculate the arcsine of column x.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.asin('weight'))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(AsinDfFunction(), {}, [x])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="acos")
@register_fn(engine=OlapEngineType.STARROCKS, name="acos")
@define_args(FnArg(name="x"))
class AcosDfFunction(DfFunction):
pass
[docs]def acos(x):
"""
acos is used to calculate the arccosine of column x.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.acos('weight'))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(AcosDfFunction(), {}, [x])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="atan")
@register_fn(engine=OlapEngineType.STARROCKS, name="atan")
@define_args(FnArg(name="x"))
class AtanDfFunction(DfFunction):
pass
[docs]def atan(x):
"""
atan is used to calculate the arctangent of column x.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.atan('weight'))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(AtanDfFunction(), {}, [x])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="exp10")
@define_args(FnArg(name="x"))
class Exp10DfFunction(DfFunction):
pass
[docs]def exp10(x):
"""
exp10 is used to calculate 10 raised to the power of column x.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.exp10('weight'))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(Exp10DfFunction(), {}, [x])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="log10")
@define_args(FnArg(name="x"))
class Log10DfFunction(DfFunction):
pass
[docs]def log10(x):
"""
log10 is used to calculate the base 10 logarithm of column x.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.log10('weight'))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(Log10DfFunction(), {}, [x])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="intExp2")
@define_args(FnArg(name="x"))
class IntExp2DfFunction(DfFunction):
pass
[docs]def intExp2(x):
"""
intExp2 is used to calculate 2 raised to the power of column x, and the result is an integer.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.intExp2('weight'))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(IntExp2DfFunction(), {}, [x])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="intExp10")
@define_args(FnArg(name="x"))
class IntExp10DfFunction(DfFunction):
pass
[docs]def intExp10(x):
"""
intExp10 is used to calculate 10 raised to the power of column x, and the result is an integer.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.intExp10('weight'))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(IntExp10DfFunction(), {}, [x])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="gcd")
@define_args(FnArg(name="x"), FnArg(name="y"))
class GCDDfFunction(DfFunction):
pass
[docs]def gcd(x, y):
"""
gcd is used to calculate the greatest common divisor of column x and y.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.gcd('weight', 'height'))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(GCDDfFunction(), {}, [x, y])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="lcm")
@define_args(FnArg(name="x"), FnArg(name="y"))
class LCMDfFunction(DfFunction):
pass
[docs]def lcm(x, y):
"""
lcm is used to calculate the least common multiple of column x and y.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.lcm('weight', 'height'))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(LCMDfFunction(), {}, [x, y])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="If")
@register_fn(engine=OlapEngineType.STARROCKS, name="if")
@define_args(FnArg(name="cond"), FnArg(name="x"), FnArg(name="y"))
class IfDfFunction(DfFunction):
pass
[docs]def If(cond, x, y):
"""
If is used to create a new column based on the condition x. If x is true, y is returned, otherwise z is returned.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.If(df['weight'] > 0.5, '>0.5', '<0.5'))
>>> df_new = df.withColumn('new_column', Fn.If('weight>0.5', 1, 0))
>>> df_new.show()
"""
return DfFnColWrapper(IfDfFunction(), {}, [cond, x, y])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="e")
@register_fn(engine=OlapEngineType.STARROCKS, name="e")
@define_args()
class ConstantEDfFunction(DfFunction):
pass
[docs]def e():
"""
e is used to get the mathematical constant e.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.e())
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(ConstantEDfFunction(), {}, [])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="pi")
@register_fn(engine=OlapEngineType.STARROCKS, name="pi")
@define_args()
class ConstantPiDfFunction(DfFunction):
pass
[docs]def pi():
"""
pi is used to get the mathematical constant pi.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.pi())
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(ConstantPiDfFunction(), {}, [])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="L1Norm")
@define_args(FnArg(name="x"))
class L1NormDfFunction(DfFunction):
pass
[docs]def L1Norm(x):
"""
L1Norm is used to calculate the L1 norm of column x.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.L1Norm('weight'))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(L1NormDfFunction(), {}, [x])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="L2Norm")
@define_args(FnArg(name="x"))
class L2NormDfFunction(DfFunction):
pass
[docs]def L2Norm(x):
"""
L2Norm is used to calculate the L2 norm of column x.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.L2Norm('weight'))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(L2NormDfFunction(), {}, [x])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="LinfNorm")
@define_args(FnArg(name="x"))
class LinfNormDfFunction(DfFunction):
pass
[docs]def LinfNorm(x):
"""
LinfNorm is used to calculate the L-infinity norm of column x.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.LinfNorm('weight'))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(LinfNormDfFunction(), {}, [x])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="LpNorm")
@define_args(FnArg(name="x"))
class LpNormDfFunction(DfFunction):
pass
[docs]def LpNorm(x):
"""
LpNorm is used to calculate the Lp norm of column x.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.LpNorm('weight', 2))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(LpNormDfFunction(), {}, [x])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="L1Distance")
@define_args(FnArg(name="x"), FnArg(name="y"))
class L1DistanceDfFunction(DfFunction):
pass
[docs]def L1Distance(x, y):
"""
L1Distance is used to calculate the L1 distance between column x and y.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.L1Distance('weight', 'height'))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(L1DistanceDfFunction(), {}, [x, y])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="L2Distance")
@define_args(FnArg(name="x"), FnArg(name="y"))
class L2DistanceDfFunction(DfFunction):
pass
[docs]def L2Distance(x, y):
"""
L2Distance is used to calculate the L2 distance between column x and y.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.L2Distance('weight', 'height'))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(L2DistanceDfFunction(), {}, [x, y])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="L2SquaredDistance")
@define_args(FnArg(name="x"), FnArg(name="y"))
class L2SquaredDistanceDfFunction(DfFunction):
pass
[docs]def L2SquaredDistance(x, y):
"""
L2SquaredDistance is used to calculate the squared L2 distance between column x and y.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.L2SquaredDistance('weight', 'height'))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(L2SquaredDistanceDfFunction(), {}, [x, y])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="LinfDistance")
@define_args(FnArg(name="x"), FnArg(name="y"))
class LinfDistanceDfFunction(DfFunction):
pass
[docs]def LinfDistance(x, y):
"""
LinfDistance is used to calculate the L-infinity distance between column x and y.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.LinfDistance('weight', 'height'))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(LinfDistanceDfFunction(), {}, [x, y])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="LpDistance")
@define_args(FnArg(name="x"), FnArg(name="y"))
class LpDistanceDfFunction(DfFunction):
pass
[docs]def LpDistance(x, y):
"""
LpDistance is used to calculate the Lp distance between column x and y.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.LpDistance('weight', 'height', 2))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(LpDistanceDfFunction(), {}, [x, y])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="L1Normalize")
@define_args(FnArg(name="x"))
class L1NormalizeDfFunction(DfFunction):
pass
[docs]def L1Normalize(x):
"""
L1Normalize is used to normalize column x using L1 norm.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.L1Normalize('weight'))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(L1NormalizeDfFunction(), {}, [x])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="L2Normalize")
@define_args(FnArg(name="x"))
class L2NormalizeDfFunction(DfFunction):
pass
[docs]def L2Normalize(x):
"""
L2Normalize is used to normalize column x using L2 norm.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.L2Normalize('weight'))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(L2NormalizeDfFunction(), {}, [x])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="LinfNormalize")
@define_args(FnArg(name="x"))
class LinfNormalizeDfFunction(DfFunction):
pass
[docs]def LinfNormalize(x):
"""
LinfNormalize is used to normalize column x using L-infinity norm.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.LinfNormalize('weight'))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(LinfNormalizeDfFunction(), {}, [x])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="LpNormalize")
@define_args(FnArg(name="x"))
class LpNormalizeDfFunction(DfFunction):
pass
[docs]def LpNormalize(x):
"""
LpNormalize is used to normalize column x using Lp norm.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.LpNormalize('weight', 2))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(LpNormalizeDfFunction(), {}, [x])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="cosineDistance")
@define_args(FnArg(name="x"), FnArg(name="y"))
class cosineDistanceDfFunction(DfFunction):
pass
[docs]def cosineDistance(x, y):
"""
cosineDistance is used to calculate the cosine distance between column x and y.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.cosineDistance('weight', 'height'))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(cosineDistanceDfFunction(), {}, [x, y])
@register_fn(engine=OlapEngineType.CLICKHOUSE, name="cosineSimilarity")
@define_args(FnArg(name="x"), FnArg(name="y"))
class cosineSimilarityDfFunction(DfFunction):
pass
[docs]def cosineSimilarity(x, y):
"""
cosineSimilarity is used to calculate the cosine similarity between column x and y.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.withColumn('new_column', Fn.cosineSimilarity('weight', 'height'))
>>> df_new.avg('new_column').show()
"""
return DfFnColWrapper(cosineSimilarityDfFunction(), {}, [x, y])
[docs]def desc(column):
"""
desc is used to sort column x in descending order.
>>> import fast_causal_inference.dataframe.functions as Fn
>>> df_new = df.orderBy(Fn.desc('weight'))
>>> df_new.show()
"""
order = DfPb.Order()
order.column.name = column
order.desc = True
return order