Custom Functions and Aggregations

In addition to the built-in SQL functionality, custom functions and aggregations can be used in the SQL queries of dask-sql. Custom functions fall into two groups: scalar functions and aggregations. If you want to combine machine learning with SQL, you might also be interested in Machine Learning.

Scalar Functions

A scalar function (such as \(x \to x^2\)) turns a given column into another column of the same length. It can be registered for usage in SQL with the register_function() method.

Example:

import numpy as np

def f(x):
    return x ** 2

# c is a dask_sql.Context instance
c.register_function(f, "f", [("x", np.int64)], np.int64)

The registration gives the function a name and records the names and types of its input parameters as well as its return type. All usual numpy types (e.g. np.int64) and pandas types (Int64) are supported.

After registration, the function can be used like any other SQL function:

c.sql("SELECT f(column) FROM data")

Scalar functions can have one or more input parameters and can combine columns and literal values.
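As a rough illustration of what such a function looks like, the sketch below defines a function with two column inputs and one literal input and calls it directly on pandas Series. The function name scale_and_add, the column names and the data are made up for this example, and calling the function by hand only approximates how dask-sql would invoke it (this assumes columns arrive as series-like objects and literals as plain scalars).

```python
import pandas as pd

def scale_and_add(x, y, k):
    # x and y are whole columns, k is a single literal value
    return (x + y) * k

data = pd.DataFrame({"a": [1, 2, 3], "b": [10, 20, 30]})

# Calling the function by hand mimics a query such as
#   SELECT scale_and_add(a, b, 2) FROM data
# after a registration following the pattern shown above:
#   c.register_function(scale_and_add, "scale_and_add",
#       [("x", np.int64), ("y", np.int64), ("k", np.int64)], np.int64)
result = scale_and_add(data["a"], data["b"], 2)

print(result.tolist())  # [22, 44, 66]
```

The result has one value per input row, which is what makes the function scalar rather than an aggregation.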

Row-Wise Pandas UDFs

In some cases it may be easier to write custom functions that process a dict-like row object, such as those consumed by pandas.DataFrame.apply. These functions are registered as above and flagged as row UDFs using the row_udf keyword argument:

def f(row):
    return row['a'] + row['b']

c.register_function(f, "f", [("a", np.int64), ("b", np.int64)], np.int64, row_udf=True)
c.sql("SELECT f(a, b) FROM data")

Note: Row UDFs use apply, which may have unpredictable performance characteristics depending on the function and the dataframe library.
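To make the performance remark more concrete, the following sketch runs the row function from above through pandas.DataFrame.apply with axis=1 and compares it with an equivalent column-wise expression. The data is invented for illustration, and plain pandas is used here only as a stand-in for what happens per partition.

```python
import pandas as pd

def f(row):
    # row is a dict-like object holding one value per column
    return row["a"] + row["b"]

data = pd.DataFrame({"a": [1, 2, 3], "b": [10, 20, 30]})

# Row-wise: the Python function is called once per row
row_wise = data.apply(f, axis=1)

# Column-wise: a single vectorized operation over whole columns
column_wise = data["a"] + data["b"]

print(row_wise.tolist())  # [11, 22, 33]
```

Both variants give the same values; the row-wise form is often more convenient to write, while the column-wise form avoids a Python call per row.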

UDFs written in this way can also be extended to accept scalar arguments along with the incoming row:

def f(row, k):
    return row['a'] + k

c.register_function(f, "f", [("a", np.int64), ("k", np.int64)], np.int64, row_udf=True)
c.sql("SELECT f(a, 42) FROM data")
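The extra scalar argument can be pictured with pandas.DataFrame.apply and its args parameter, which passes additional positional arguments after the row. This is only a sketch with made-up data; it assumes that dask-sql forwards literals to the row function in a comparable way.

```python
import pandas as pd

def f(row, k):
    # row holds the column values, k is the literal from the query
    return row["a"] + k

data = pd.DataFrame({"a": [1, 2, 3]})

# args supplies the values passed to f after the row itself
result = data.apply(f, axis=1, args=(42,))

print(result.tolist())  # [43, 44, 45]
```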

Aggregation Functions

Aggregation functions run on a single column and turn it into a single value. This means they can only be used in GROUP BY aggregations. They can be registered with the register_aggregation() method. This time, however, an instance of dask.dataframe.Aggregation needs to be passed instead of a plain function. More information on dask aggregations can be found in the dask documentation.

Example:

import dask.dataframe as dd

my_sum = dd.Aggregation("my_sum", lambda x: x.sum(), lambda x: x.sum())
c.register_aggregation(my_sum, "my_sum", [("x", np.float64)], np.float64)

c.sql("SELECT my_sum(other_column) FROM df GROUP BY column")

Note

Only a single function can exist under a given name, regardless of whether it is an aggregation function or a scalar function.