How does it work?

At the core, dask-sql does two things:

  • Translates the SQL query into a relational algebra, represented by a LogicalPlan enum, using Apache Arrow DataFusion - similar to many other SQL engines (Hive, Flink, …)

  • Converts this description of the query from the Rust enum into Dask API calls (and executes them) - returning a Dask dataframe.
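
Both halves can be observed directly from the Python API: Context.explain() returns the relational algebra as a string, while Context.sql() returns the resulting (lazy) Dask dataframe. A minimal sketch (the table name and data are made up for illustration):

import pandas as pd
import dask.dataframe as dd
from dask_sql import Context

c = Context()
c.create_table("timeseries", dd.from_pandas(
    pd.DataFrame({"x": [1.0, -1.0], "y": [2.0, 3.0]}), npartitions=1))

# (1) the SQL query translated into relational algebra:
print(c.explain("SELECT x + y FROM timeseries WHERE x > 0"))

# (2) the same query converted into Dask API calls - a lazy dask dataframe:
result = c.sql("SELECT x + y FROM timeseries WHERE x > 0")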

The following example explains this in some technical detail. For most users, this level of technical understanding is not needed.

1. SQL enters the library

Whether it arrives via the Python API (API Documentation), the command line client (Command Line Tool), or the server (SQL Server), the user's SQL statement eventually ends up as a string in the function sql().

2. SQL is parsed

This function first hands the SQL string to the dask_planner Rust crate via the PyO3 library. Inside this crate, Apache Arrow DataFusion is used to first parse the SQL string and then turn it into a relational algebra. For this, DataFusion uses the SQL language description specified in the sqlparser-rs library. We also include SQL extensions specific to Dask-SQL, which specify custom language features such as the CREATE MODEL statement.
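
To make the distinction concrete, here are both kinds of statements as plain strings: the first is standard SQL that the sqlparser-rs/DataFusion machinery handles on its own, the second uses the CREATE MODEL extension (syntax as described in the dask-sql documentation; the concrete model class and column names are only examples):

# Standard SQL - fully covered by the sqlparser-rs language description:
standard = "SELECT x + y FROM timeseries WHERE x > 0"

# Dask-SQL extension - recognized by the custom parser rules:
extension = """
CREATE MODEL my_model WITH (
    model_class = 'sklearn.linear_model.LogisticRegression',
    target_column = 'target'
) AS (
    SELECT x, y, target FROM timeseries
)
"""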

3. SQL is (maybe) optimized

Once the SQL string is parsed into a Statement enum, DataFusion can convert it into a relational algebra represented by a LogicalPlan enum and optimize it. As this is only implemented for DataFusion-supported syntax (and not for custom syntax such as SqlCreateModel), this conversion and optimization is not triggered for all SQL statements (have a look into Context._get_ral()).

The logical plan is a tree structure: most enum variants (such as Projection or Join) can contain other instances as “inputs”, creating a tree of the different steps in the SQL statement (see below for an example).

The result is an optimized LogicalPlan.
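
To illustrate the tree structure, the plan for the example query from below can be mirrored in plain Python roughly like this (a toy sketch only; the real LogicalPlan is a Rust enum, and these class names are purely illustrative):

from dataclasses import dataclass

# Illustrative stand-ins for three LogicalPlan variants:
@dataclass
class TableScan:
    table: str
    projection: list

@dataclass
class Filter:
    predicate: str
    input: object     # the plan step this filter consumes

@dataclass
class Projection:
    expr: str
    input: object     # the plan step this projection consumes

# SELECT x + y FROM timeseries WHERE x > 0, with Projection at the root:
plan = Projection(
    expr="timeseries.x + timeseries.y",
    input=Filter(
        predicate="timeseries.x > Float64(0)",
        input=TableScan(table="timeseries", projection=["x", "y"]),
    ),
)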

4. Translation to Dask API calls

Each step in the LogicalPlan is converted into calls to Python functions using different Python “converters”. For each enum variant (such as Projection or Join), there exists a converter class in the dask_sql.physical.rel folder; these converters are registered at the dask_sql.physical.rel.convert.RelConverter class.

Their job is to use the information stored in the logical plan enum variants and turn it into calls to Python functions (see the example below for more information).
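
A much-simplified sketch of this dispatch (the real class is dask_sql.physical.rel.convert.RelConverter; the method names and the plugin shape below are schematic assumptions, not the actual API):

class RelConverter:
    """Maps a plan-node variant name to the plugin that converts it."""
    _plugins = {}

    @classmethod
    def add_plugin_class(cls, plugin_class):
        cls._plugins[plugin_class.class_name] = plugin_class()

    @classmethod
    def convert(cls, rel, context):
        plugin = cls._plugins[type(rel).__name__]
        return plugin.convert(rel, context)

class Filter:                     # stand-in for one LogicalPlan variant
    def __init__(self, condition, input):
        self.condition = condition   # callable returning a boolean mask
        self.input = input           # the plan step this filter consumes

class FilterPlugin:
    class_name = "Filter"

    def convert(self, rel, context):
        # Convert the input step first (the depth-first recursion shown
        # in the example below); a TableScan plugin would terminate it ...
        df = RelConverter.convert(rel.input, context)
        # ... then apply this step's own logic to the resulting dataframe.
        return df[rel.condition(df)]

RelConverter.add_plugin_class(FilterPlugin)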

As many SQL statements contain calculations using literals and/or columns, these are split into their own functionality (dask_sql.physical.rex) following a similar plugin-based converter system. Have a look into the specific classes to understand how the conversion of a specific SQL language feature is implemented.
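
Sketched in the same spirit for expressions, a hypothetical bottom-up converter could turn an expression tree into operations on dask series like this (all names here are illustrative; the real plugins live in dask_sql.physical.rex):

import operator

def convert_rex(expr, df):
    kind = expr["kind"]
    if kind == "column":      # e.g. #timeseries.x  ->  df["x"]
        return df[expr["name"]]
    if kind == "literal":     # e.g. Float64(0)     ->  0.0
        return expr["value"]
    if kind == "call":        # e.g. x > 0          ->  df["x"] > 0.0
        ops = {">": operator.gt, "+": operator.add}
        left, right = (convert_rex(o, df) for o in expr["operands"])
        return ops[expr["operator"]](left, right)
    raise NotImplementedError(kind)

# convert_rex({"kind": "call", "operator": ">",
#              "operands": [{"kind": "column", "name": "x"},
#                           {"kind": "literal", "value": 0.0}]}, df)
# returns the boolean series df["x"] > 0.0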

5. Result

The result of each of the conversions is a dask.DataFrame, which is given to the user. In the case of the command line tool or the SQL server, it is evaluated immediately; otherwise, it can be used for further calculations by the user.
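
Since the returned object is an ordinary (lazy) dask dataframe, triggering the evaluation is up to the caller; a small self-contained sketch (table name and data are made up):

import pandas as pd
import dask.dataframe as dd
from dask_sql import Context

c = Context()
c.create_table("timeseries", dd.from_pandas(
    pd.DataFrame({"x": [0.5, -1.0, 2.0], "y": [1.0, 2.0, 3.0]}), npartitions=1))

result = c.sql("SELECT x + y AS total FROM timeseries WHERE x > 0")

print(result.compute())                # evaluate now, as the CLI/server would
larger = result[result["total"] > 2]   # ...or keep computing lazily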

Example

Let’s walk through the steps above using the example SQL statement

SELECT x + y FROM timeseries WHERE x > 0

assuming the table “timeseries” is already registered. If you want to follow along with the steps outlined below, start the command line tool in debug mode

dask-sql --load-test-data --startup --log-level DEBUG

and enter the SQL statement above.

First, the SQL is parsed by DataFusion and (as it is not a custom statement) transformed into a tree of relational algebra objects.

Projection: #timeseries.x + #timeseries.y
  Filter: #timeseries.x > Float64(0)
    TableScan: timeseries projection=[x, y]

The tree output above means that the outer instance (Projection) needs as its input the output of the inner instance (Filter), and so on.

Therefore, the conversion to Python API calls happens recursively (depth-first).

First, the TableScan is converted using the rel.logical.table_scan.LogicalTableScanPlugin plugin. It simply fetches the correct dask.DataFrame from the dictionary of already registered tables of the context.

Next, the Filter (having this dataframe as input) is converted via the rel.logical.filter.LogicalFilterPlugin. The filter expression #timeseries.x > Float64(0) is converted into df["x"] > 0 using a combination of REX plugins (have a look into the debug output to learn more) and applied to the dataframe.

The resulting dataframe is then passed to the rel.logical.project.LogicalProjectPlugin converter for the Projection. It calculates the expression df["x"] + df["y"] (after converting it via the RexCallPlugin plugin) and returns the final result to the user.

df_table_scan = context.tables["timeseries"]                        # TableScan
df_filter = df_table_scan[df_table_scan["x"] > 0]                   # Filter
df_project = df_filter.assign(col=df_filter["x"] + df_filter["y"])  # Projection
return df_project[["col"]]