API Documentation

class dask_sql.Context

Main object for communicating with dask_sql. It holds a store of all registered data frames (= tables) and can convert SQL queries to dask data frames. Tables in these queries are referenced by the name given when registering a dask dataframe.

Example

from dask_sql import Context
c = Context()

# Register a table
c.create_table("my_table", df)

# Now execute an SQL query. The result is a dask dataframe
result = c.sql("SELECT a, b FROM my_table")

# Trigger the computation (or use the data frame for something else)
result.compute()

Usually, you will only ever have a single context in your program.

create_table(table_name, input_table, file_format=None, persist=True, hive_table_name=None, hive_schema_name='default', **kwargs)

Registering a (dask/pandas) table makes it usable in SQL queries. The name you give here can be used as table name in the SQL later.

Please note that the table is stored as it is at registration time. If you change the table later, you need to re-register it.

Instead of passing an already loaded table, you can also pass a string pointing to a storage location. The library will then try to load the data using one of dask’s read methods. If the file format cannot be deduced automatically, you can specify it via the file_format parameter. Typical file formats are csv or parquet. Any additional parameters are passed on to the read method. Please note that some file formats require additional libraries. By default, the data is loaded directly into the memory of the nodes. If you do not want that, set persist to False.

See Data Loading and Input for more information.

Example

This code registers a data frame as table “data” and then uses it in a query.

c.create_table("data", df)
df_result = c.sql("SELECT a, b FROM data")

This code reads a file from disk. Please note that we assume the file(s) are reachable under this path from every node in the cluster.

c.create_table("data", "/home/user/data.csv")
df_result = c.sql("SELECT a, b FROM data")

This example reads from a hive table.

from pyhive.hive import connect

cursor = connect("localhost", 10000).cursor()
c.create_table("data", cursor, hive_table_name="the_name_in_hive")
df_result = c.sql("SELECT a, b FROM data")

Parameters
  • table_name (str) – The name under which the new table is addressable.

  • input_table (dask.dataframe.DataFrame or pandas.DataFrame or str or hive.Cursor) – The data frame/location/hive connection to register.

  • file_format (str) – Only used when input_table is a string. Specify the file format directly here if it cannot be deduced from the extension. If set to “memory”, load the data from a published dataset in the dask cluster.

  • persist (bool) – Only used when input_table is a string. Set to False to turn off loading the file data directly into memory.

  • hive_table_name (str) – If using input from a hive table, you can specify the hive table name if different from the table_name.

  • hive_schema_name (str) – If using input from a hive table, you can specify the hive schema name.
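
The format lookup described above for string inputs can be pictured with a small sketch. This is an illustration only, not dask-sql's actual code: the helper name resolve_read_method is hypothetical, and it assumes that readers follow the read_<format> naming of dask.dataframe.read_csv and dask.dataframe.read_parquet.

```python
import os


def resolve_read_method(location, file_format=None):
    """Return the name of the dask read function to try for a location string.

    Hypothetical helper for illustration; it is not part of dask-sql.
    """
    if file_format is None:
        # Deduce the format from the extension, e.g. "data.csv" -> "csv".
        file_format = os.path.splitext(location)[1].lstrip(".").lower()
    if not file_format:
        # No extension and no explicit format: the caller has to decide.
        raise ValueError(
            f"Cannot deduce a file format for {location!r}; pass file_format"
        )
    return f"read_{file_format}"


print(resolve_read_method("/home/user/data.csv"))
print(resolve_read_method("/home/user/export", file_format="parquet"))
```

An explicit file_format takes precedence over the extension in this sketch, which mirrors the role the parameter plays in create_table when the extension is missing or misleading.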

drop_table(table_name)

Remove a table with the given name from the registered tables. This will also delete the dataframe.

Parameters

table_name (str) – The table to remove.

explain(sql)

Return the stringified relational algebra that this query will produce once triggered (with sql()). Helpful to understand the inner workings of dask-sql, but typically not needed to query your data.

If the query is of DDL type (e.g. CREATE TABLE or DESCRIBE SCHEMA), no relational algebra plan is created and therefore nothing is returned.

Parameters

sql (str) – The query string to use

Returns

a description of the created relational algebra.

Return type

str

register_aggregation(f, name, parameters, return_type)

Register a custom aggregation with the given name. The aggregation can be used (with this name) in every SQL query from now on, but only for aggregation operations (no scalar function calls). This means that if you register an aggregation “fagg”, you can now call

SELECT fagg(y)
FROM df
GROUP BY x

Please note that you can always only have one function with the same name; no matter if it is an aggregation or scalar function.

For the registration, you need to supply the list of parameter names and parameter types as well as the return type. Use numpy dtypes if possible.

More information: Custom Functions and Aggregations

Example

The following code registers a new aggregation “fagg”, which computes the sum of a column and uses it on the y column.

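import dask.dataframe as dd
import numpy as np

# First lambda: sum within each partition; second lambda: sum the partial results
fagg = dd.Aggregation("fagg", lambda x: x.sum(), lambda x: x.sum())
c.register_aggregation(fagg, "fagg", [("x", np.float64)], np.float64)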

sql = "SELECT fagg(y) FROM df GROUP BY x"
df_result = c.sql(sql)

Parameters
  • f (dask.dataframe.Aggregation) – The aggregation to register. See the dask documentation for more information.

  • name (str) – The name under which the new aggregation is addressable in SQL.

  • parameters (List[Tuple[str, type]]) – A list of tuples of parameter name and parameter type. Use numpy dtypes if possible.

  • return_type (type) – The return type of the function.
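
The example above passes two functions to dd.Aggregation: the first (chunk) runs on each partition, the second (agg) combines the per-partition results. The following sketch imitates this two-stage scheme in plain Python for a grouped sum. The partitions and both helper functions are made up for illustration and do not use dask itself.

```python
from collections import defaultdict

# Two hypothetical partitions of a table with columns (x, y).
partitions = [
    [("a", 1.0), ("b", 2.0), ("a", 3.0)],
    [("b", 4.0), ("a", 5.0)],
]


def chunk(rows):
    """First stage: sum y per group x within a single partition."""
    partial = defaultdict(float)
    for x, y in rows:
        partial[x] += y
    return dict(partial)


def agg(partials):
    """Second stage: sum the per-partition results for each group."""
    total = defaultdict(float)
    for partial in partials:
        for x, value in partial.items():
            total[x] += value
    return dict(total)


result = agg(chunk(rows) for rows in partitions)
print(result)  # {'a': 9.0, 'b': 6.0}
```

For a sum, both stages are the same operation, which is why the registered “fagg” uses x.sum() twice. Aggregations such as a mean need different logic in the two stages.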

register_dask_table(df, name)

Outdated version of create_table().

register_function(f, name, parameters, return_type)

Register a custom function with the given name. The function can be used (with this name) in every SQL query from now on, but only for scalar operations (no aggregations). This means that if you register a function “f”, you can now call

SELECT f(x)
FROM df

Please note that you can always only have one function with the same name; no matter if it is an aggregation or scalar function.

For the registration, you need to supply the list of parameter names and parameter types as well as the return type. Use numpy dtypes if possible.

More information: Custom Functions and Aggregations

Example

This example registers a function “f”, which calculates the square of an integer and applies it to the column x.

import numpy as np

def f(x):
    return x ** 2

c.register_function(f, "f", [("x", np.int64)], np.int64)

sql = "SELECT f(x) FROM df"
df_result = c.sql(sql)

Parameters
  • f (Callable) – The function to register.

  • name (str) – The name under which the new function is addressable in SQL.

  • parameters (List[Tuple[str, type]]) – A list of tuples of parameter name and parameter type. Use numpy dtypes if possible.

  • return_type (type) – The return type of the function.

sql(sql, return_futures=True)

Query the registered tables with the given SQL. The SQL approximately follows the PostgreSQL standard; however, not all operations are implemented yet. In general, only SELECT statements (no data manipulation) work.

For more information, see SQL Syntax.

Example

In this example, a query is called using the registered tables and then executed using dask.

result = c.sql("SELECT a, b FROM my_table")
print(result.compute())

Parameters
  • sql (str) – The query string to execute.

  • return_futures (bool) – If True (the default), return the unexecuted dask dataframe; if False, return the data itself.

Returns

the created data frame of this query.

Return type

dask.dataframe.DataFrame

dask_sql.run_server(context=None, client=None, host='0.0.0.0', port=8080)

Run an HTTP server for answering SQL queries using dask-sql. It uses the Presto Wire Protocol for communication. This means it has a single POST endpoint /v1/statement, which answers SQL queries (as a string in the body) with the output as JSON (in the format described in the documentation above). Every SQL expression that dask-sql understands can be used here.

See SQL Server for more information.

Note

The Presto protocol also includes some statistics on the query in the response. These statistics are currently filled only with placeholder values.

Parameters
  • context (dask_sql.Context) – If set, use this context instead of an empty one.

  • client (dask.distributed.Client) – If set, use this dask client instead of a new one.

  • host (str) – The host interface to listen on (defaults to all interfaces)

  • port (int) – The port to listen on (defaults to 8080)

Example

It is possible to run an SQL server by using the CLI script in dask_sql.server.app or by calling this function directly in your user code:

from dask_sql import Context, run_server

# Create your pre-filled context
c = Context()
...

run_server(context=c)

After starting the server, it is possible to send queries to it, e.g. with the presto CLI or via sqlalchemy (e.g. using the PyHive package):

from sqlalchemy.engine import create_engine
engine = create_engine('presto://localhost:8080/')

import pandas as pd
pd.read_sql_query("SELECT 1 + 1", con=engine)

Of course, it is also possible to call the usual CREATE TABLE commands.
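
Because the server exposes a single POST endpoint /v1/statement that takes the SQL string as the request body, a query can in principle also be sent with nothing but the Python standard library. The following is a minimal sketch under that assumption. The helper names build_statement_request and run_statement are hypothetical, and the layout of the decoded JSON is not described here, so it is returned unchanged.

```python
import json
import urllib.request


def build_statement_request(sql, host="localhost", port=8080):
    """Build (but do not send) a POST request with the SQL string as body."""
    url = f"http://{host}:{port}/v1/statement"
    return urllib.request.Request(url, data=sql.encode("utf-8"), method="POST")


def run_statement(sql, host="localhost", port=8080):
    """Send the query to a running server and decode the JSON answer."""
    request = build_statement_request(sql, host, port)
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))


# Building the request works without a server; run_statement needs one running.
request = build_statement_request("SELECT 1 + 1")
print(request.get_method(), request.full_url)
```

Calling run_statement("SELECT 1 + 1") only works after run_server has been started on the same host and port.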