Custom Functions and Aggregations
Custom Functions and Aggregations¶
Additional to the included SQL functionalities, it is possible to include custom functions and aggregations into the SQL queries of
The custom functions are classified into scalar functions and aggregations.
If you want to combine Machine Learning with SQL, you might also be interested in Machine Learning.
A scalar function (such as \(x \to x^2\)) turns a given column into another column of the same length.
It can be registered for usage in SQL with the
def f(x): return x ** 2 c.register_function(f, "f", [("x", np.int64)], np.int64)
The registration gives a name to the function and also adds type information on the input types and names, as well as the return type.
All usual numpy types (e.g.
np.int64) and pandas types (
Int64) are supported.
After registration, the function can be used as any other usual SQL function:
c.sql("SELECT f(column) FROM data")
Scalar functions can have one or more input parameters and can combine columns and literal values.
Row-Wise Pandas UDFs¶
In some cases it may be easier to write custom functions which process a dict like row object, such as those consumed by
These functions may be registered as above and flagged as row UDFs using the row_udf keyword argument:
def f(row): return row['a'] + row['b'] c.register_function(f, "f", [("a", np.int64), ("b", np.int64)], np.int64, row_udf=True) c.sql("SELECT f(a, b) FROM data")
** Note: Row UDFs use apply which may have unpredictable performance characteristics, depending on the function and dataframe library **
UDFs written in this way can also be extended to accept scalar arguments along with the incoming row:
def f(row, k): return row['a'] + k c.register_function(f, "f", [("a", np.int64), ("k", np.int64)], np.int64, row_udf=True) c.sql("SELECT f(a, 42) FROM data")
Aggregation functions run on a single column and turn them into a single value.
This means they can only be used in
GROUP BY aggregations.
They can be registered with the
This time however, an instance of a
dask.dataframe.Aggregation needs to be passed
instead of a plain function.
More information on dask aggregations can be found in the
my_sum = dd.Aggregation("my_sum", lambda x: x.sum(), lambda x: x.sum()) c.register_aggregation(my_sum, "my_sum", [("x", np.float64)], np.float64) c.sql("SELECT my_sum(other_colum) FROM df GROUP BY column")
There can only ever exist a single function with the same name. No matter if this is an aggregation function or a scalar function.