Custom Functions and Aggregations¶
Additional to the included SQL functionalities, it is possible to include custom functions and aggregations into the SQL queries of dask-sql
.
The custom functions are classified into scalar functions and aggregations.
If you want to combine Machine Learning with SQL, you might also be interested in Machine Learning.
Scalar Functions¶
A scalar function (such as \(x \to x^2\)) turns a given column into another column of the same length.
It can be registered for usage in SQL with the register_function()
method.
Example:
def f(x):
return x ** 2
c.register_function(f, "f", [("x", np.int64)], np.int64)
The registration gives a name to the function and also adds type information on the input types and names, as well as the return type.
All usual numpy types (e.g. np.int64
) and pandas types (Int64
) are supported.
After registration, the function can be used as any other usual SQL function:
c.sql("SELECT f(column) FROM data")
Scalar functions can have one or more input parameters and can combine columns and literal values.
Row-Wise Pandas UDFs¶
In some cases it may be easier to write custom functions which process a dict like row object, such as those consumed by pandas.DataFrame.apply
.
These functions may be registered as above and flagged as row UDFs using the row_udf keyword argument:
def f(row):
return row['a'] + row['b']
c.register_function(f, "f", [("a", np.int64), ("b", np.int64)], np.int64, row_udf=True)
c.sql("SELECT f(a, b) FROM data")
** Note: Row UDFs use apply which may have unpredictable performance characteristics, depending on the function and dataframe library **
UDFs written in this way can also be extended to accept scalar arguments along with the incoming row:
def f(row, k):
return row['a'] + k
c.register_function(f, "f", [("a", np.int64), ("k", np.int64)], np.int64, row_udf=True)
c.sql("SELECT f(a, 42) FROM data")
Aggregation Functions¶
Aggregation functions run on a single column and turn them into a single value.
This means they can only be used in GROUP BY
aggregations.
They can be registered with the register_aggregation()
method.
This time however, an instance of a dask.dataframe.Aggregation
needs to be passed
instead of a plain function.
More information on dask aggregations can be found in the
dask documentation.
Example:
my_sum = dd.Aggregation("my_sum", lambda x: x.sum(), lambda x: x.sum())
c.register_aggregation(my_sum, "my_sum", [("x", np.float64)], np.float64)
c.sql("SELECT my_sum(other_colum) FROM df GROUP BY column")
Note
There can only ever exist a single function with the same name. No matter if this is an aggregation function or a scalar function.