How does it work?
At its core, dask-sql does two things:
Translates the SQL query into a relational algebra using Apache Arrow DataFusion. The result is represented by a LogicalPlan enum, similar to many other SQL engines (Hive, Flink, …).
Converts this description of the query from the Rust enum into Dask API calls (and executes them), returning a Dask dataframe.
The following example explains this in some technical detail. Most users do not need this level of technical understanding.
1. SQL enters the library
Whether it arrives via the Python API (API Documentation), the command line client (Command Line Tool) or the server (SQL Server), the user's SQL statement eventually ends up as a string in the function
2. SQL is parsed
This function will first give the SQL string to the dask_planner Rust crate via the
Inside this crate, Apache Arrow DataFusion is used to first parse the SQL string and then turn it into a relational algebra.
For this, DataFusion uses the SQL language description specified in the sqlparser-rs library.
We also include SQL extensions specific to Dask-SQL. They specify custom language features, such as the CREATE MODEL statement.
3. SQL is (maybe) optimized
Once the SQL string is parsed into a Statement enum, DataFusion can convert it into a relational algebra represented by a LogicalPlan enum and optimize it. As this is only implemented for syntax that DataFusion supports (and not for custom syntax such as SqlCreateModel), this conversion and optimization is not triggered for all SQL statements (have a look
The logical plan is a tree structure, and most enum variants (such as Join) can contain other instances as “inputs”, creating a tree of the different steps in the SQL statement (see below for an example).
The result is an optimized
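To make the tree structure concrete, here is a minimal sketch in plain Python. It does not call DataFusion: PlanNode and render are hypothetical names invented for this illustration, and the real LogicalPlan is a Rust enum. The sketch only shows how nodes that hold other nodes as inputs form a tree that can be printed with indentation.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class PlanNode:
    """Toy stand-in for one logical plan step (not the real Rust enum)."""

    description: str
    inputs: List["PlanNode"] = field(default_factory=list)


def render(node: PlanNode, depth: int = 0) -> str:
    """Render a node and, recursively, its inputs, indenting by two spaces per level."""
    lines = ["  " * depth + node.description]
    for child in node.inputs:
        lines.append(render(child, depth + 1))
    return "\n".join(lines)


# Build the tree bottom-up: each step takes the previous one as its input.
scan = PlanNode("TableScan: timeseries projection=[x, y]")
filter_node = PlanNode("Filter: #timeseries.x > Float64(0)", [scan])
plan = PlanNode("Projection: #timeseries.x + #timeseries.y", [filter_node])

print(render(plan))
```

The outermost node is printed first and its inputs are nested below it, which is also how the plan in the example further down should be read.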
4. Translation to Dask API calls
Each step in the LogicalPlan is converted into calls to Python functions using different Python “converters”.
For each enum variant (such as Join), there exists a converter class in the dask_sql.physical.rel folder, which is registered at the
Their job is to use the information stored in the logical plan enum variants and turn it into calls to Python functions (see the example below for more information).
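The converter idea can be sketched in a few lines of plain Python. This is an illustration under stated assumptions, not dask-sql's implementation: the node classes, the CONVERTERS registry and the register decorator are hypothetical, plain functions are used instead of converter classes, and a list of dictionaries stands in for a Dask dataframe.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List

Row = Dict[str, float]
Table = List[Row]  # stand-in for a Dask dataframe in this sketch


@dataclass
class TableScan:
    table_name: str


@dataclass
class Filter:
    input: object
    predicate: Callable[[Row], bool]


@dataclass
class Projection:
    input: object
    name: str
    expression: Callable[[Row], float]


# Hypothetical registry mapping a plan node type to its converter function.
CONVERTERS: Dict[type, Callable] = {}


def register(node_type):
    """Decorator that registers a converter for one plan node type."""
    def decorator(func):
        CONVERTERS[node_type] = func
        return func
    return decorator


def convert(node, tables: Dict[str, Table]) -> Table:
    """Look up the converter registered for this node type and call it."""
    return CONVERTERS[type(node)](node, tables)


@register(TableScan)
def convert_table_scan(node, tables):
    # Leaf node: fetch the already registered table.
    return tables[node.table_name]


@register(Filter)
def convert_filter(node, tables):
    rows = convert(node.input, tables)  # convert the input first (depth-first)
    return [row for row in rows if node.predicate(row)]


@register(Projection)
def convert_projection(node, tables):
    rows = convert(node.input, tables)
    return [{node.name: node.expression(row)} for row in rows]


tables = {"timeseries": [{"x": 1.0, "y": 2.0}, {"x": -1.0, "y": 5.0}, {"x": 0.5, "y": 0.25}]}
plan = Projection(
    input=Filter(input=TableScan("timeseries"), predicate=lambda row: row["x"] > 0),
    name="col",
    expression=lambda row: row["x"] + row["y"],
)
result = convert(plan, tables)
print(result)
```

Because every converter first converts its own input, the leaf (the table scan) is evaluated first and the outermost step last.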
As many SQL statements contain calculations using literals and/or columns, these are split into their own functionality (dask_sql.physical.rex), following a similar plugin-based converter system.
Have a look at the specific classes to understand how the conversion of a specific SQL language feature is implemented.
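A comparable sketch for expressions, again in plain Python and with hypothetical names (Column, Literal, Call, REX_CONVERTERS and OPERATIONS are not dask-sql classes): each kind of expression has its own small converter, and a call expression evaluates its operands recursively before applying the operator. A single dictionary row stands in for a dataframe here.

```python
import operator
from dataclasses import dataclass
from typing import Any, Callable, Dict, Tuple


@dataclass
class Column:
    name: str


@dataclass
class Literal:
    value: Any


@dataclass
class Call:
    op: str
    operands: Tuple[Any, ...]


# Hypothetical table of supported operators for this sketch.
OPERATIONS: Dict[str, Callable] = {"+": operator.add, ">": operator.gt}


def convert_column(expr, row):
    return row[expr.name]


def convert_literal(expr, row):
    return expr.value


def convert_call(expr, row):
    # Evaluate every operand first, then apply the operator to the results.
    args = [evaluate(operand, row) for operand in expr.operands]
    return OPERATIONS[expr.op](*args)


# Hypothetical registry: expression type -> converter function.
REX_CONVERTERS: Dict[type, Callable] = {
    Column: convert_column,
    Literal: convert_literal,
    Call: convert_call,
}


def evaluate(expr, row):
    """Dispatch to the converter registered for this expression type."""
    return REX_CONVERTERS[type(expr)](expr, row)


row = {"x": 1.5, "y": 2.0}
is_positive = evaluate(Call(">", (Column("x"), Literal(0))), row)
total = evaluate(Call("+", (Column("x"), Column("y"))), row)
print(is_positive, total)
```

Keeping the operators in a lookup table means a new operator can be supported by adding one entry, without touching the dispatch logic.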
The result of each conversion is a dask.DataFrame, which is given to the user. In the case of the command line tool or the SQL server, it is evaluated immediately; otherwise, the user can use it for further calculations.
Let’s walk through the steps above using the example SQL statement
SELECT x + y FROM timeseries WHERE x > 0
assuming the table “timeseries” is already registered. If you want to follow along with the steps below, start the command line tool in debug mode
dask-sql --load-test-data --startup --log-level DEBUG
and enter the SQL statement above.
First, the SQL is parsed by DataFusion and (as it is not a custom statement) transformed into a tree of relational algebra objects.
Projection: #timeseries.x + #timeseries.y
  Filter: #timeseries.x > Float64(0)
    TableScan: timeseries projection=[x, y]
The tree output above means that the outer instance (
Projection) needs as input the output of the previous instance (
Therefore the conversion to Python API calls is called recursively (depth-first). First, the
LogicalTableScan is converted using the
rel.logical.table_scan.LogicalTableScanPlugin plugin. It will just get the correct
dask.DataFrame from the dictionary of already registered tables of the context.
LogicalFilter (having the dataframe as input), is converted via the
The filter expression
#timeseries.x > Float64(0) is converted into
df["x"] > 0 using a combination of REX plugins (have a look at the debug output to learn more) and applied to the dataframe.
The resulting dataframe is then passed to the converter
rel.logical.project.LogicalProjectPlugin for the
This will calculate the expression
df["x"] + df["y"] (after having converted it via the RexCallPlugin plugin) and return the final result to the user.
df_table_scan = context.tables["timeseries"]
df_filter = df_table_scan[df_table_scan["x"] > 0]
df_project = df_filter.assign(col=df_filter["x"] + df_filter["y"])
return df_project[["col"]]