API Documentation¶
- class dask_sql.Context(logging_level=20)¶
Main object to communicate with dask_sql. It holds a store of all registered data frames (= tables) and can convert SQL queries to dask data frames. The tables in these queries are referenced by the name given when registering a dask dataframe.

Example

    from dask_sql import Context

    c = Context()

    # Register a table
    c.create_table("my_table", df)

    # Now execute an SQL query. The result is a dask dataframe
    result = c.sql("SELECT a, b FROM my_table")

    # Trigger the computation (or use the data frame for something else)
    result.compute()
Usually, you will only ever have a single context in your program.
- DEFAULT_CATALOG_NAME = 'dask_sql'¶
- DEFAULT_SCHEMA_NAME = 'root'¶
- alter_schema(old_schema_name, new_schema_name)¶
Alter (rename) an existing schema.
- Parameters
old_schema_name (str) – The current name of the schema
new_schema_name (str) – The new name for the schema
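Example

A small usage sketch (the schema names here are purely illustrative, and c is an existing Context):

    c.create_schema("staging")
    # ... register tables inside "staging" ...

    # afterwards, everything registered in "staging" is addressable under "production"
    c.alter_schema("staging", "production")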
- alter_table(old_table_name, new_table_name, schema_name=None)¶
Alter (rename) an existing table.
- Parameters
old_table_name (str) – The current name of the table
new_table_name (str) – The new name for the table
schema_name (str) – The schema the table belongs to. By default, the currently selected schema is used.
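Example

A small usage sketch (df is assumed to be an already loaded dask/pandas dataframe; the table names are illustrative):

    c.create_table("raw_data", df)

    # the same table is now addressable as "clean_data"
    c.alter_table("raw_data", "clean_data")
    df_result = c.sql("SELECT * FROM clean_data")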
- create_schema(schema_name)¶
Create a new schema in the database.
- Parameters
schema_name (str) – The name of the schema to create
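Example

A small usage sketch (df is assumed to be an already loaded dask/pandas dataframe; the schema and table names are illustrative):

    c.create_schema("my_schema")

    # register a table inside the newly created schema
    c.create_table("my_table", df, schema_name="my_schema")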
- create_table(table_name, input_table, format=None, persist=False, schema_name=None, statistics=None, gpu=False, **kwargs)¶
Registering a (dask/pandas) table makes it usable in SQL queries. The name you give here can be used as table name in the SQL later.
Please note that the table is stored as it is now. If you change the table later, you need to re-register it.

Instead of passing an already loaded table, it is also possible to pass a string pointing to a storage location. The library will then try to load the data using one of dask's read methods. If the file format can not be deduced automatically, it is also possible to specify it via the format parameter. Typical file formats are csv or parquet. Any additional parameters will get passed on to the read method. Please note that some file formats require additional libraries. By default, the data will be lazily loaded. If you would like to load the data directly into memory, you can do so by setting persist=True.

See Data Loading and Input for more information.
Example
This code registers a data frame as table “data” and then uses it in a query.
c.create_table("data", df) df_result = c.sql("SELECT a, b FROM data")
This code reads a file from disk. Please note that we assume that the file(s) are reachable under this path from every node in the cluster
c.create_table("data", "/home/user/data.csv") df_result = c.sql("SELECT a, b FROM data")
This example reads from a hive table.
from pyhive.hive import connect cursor = connect("localhost", 10000).cursor() c.create_table("data", cursor, hive_table_name="the_name_in_hive") df_result = c.sql("SELECT a, b FROM data")
- Parameters
table_name (str) – Under which name should the new table be addressable
input_table (dask.dataframe.DataFrame or pandas.DataFrame or str or hive.Cursor) – The data frame/location/hive connection to register
format (str) – Only used when passing a string into the input parameter. Specify the file format directly here if it can not be deduced from the extension. If set to "memory", load the data from a published dataset in the dask cluster.
persist (bool) – Only used when passing a string into the input parameter. Set to true to turn on loading the file data directly into memory.
schema_name (str) – In which schema to create the table. By default, will use the currently selected schema.
statistics (Statistics) – If given, use these statistics during the cost-based optimization.
gpu (bool) – If set to true, use dask-cudf to run the data frame calculations on your GPU. Please note that the GPU support is currently not covering all of dask-sql's SQL language.
**kwargs – Additional arguments for specific formats. See Data Loading and Input for more information.
- drop_schema(schema_name)¶
Remove a schema with the given name from the registered schemas. This will also delete all tables, functions etc.
- Parameters
schema_name (str) – Which schema to remove.
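Example

A small usage sketch (the schema name is illustrative):

    c.create_schema("tmp_schema")
    # ... register tables, functions, etc. in "tmp_schema" ...

    # remove the schema together with everything registered in it
    c.drop_schema("tmp_schema")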
- drop_table(table_name, schema_name=None)¶
Remove a table with the given name from the registered tables. This will also delete the dataframe.
- Parameters
table_name (str) – Which table to remove.
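Example

A small usage sketch (df is assumed to be an already loaded dask/pandas dataframe):

    c.create_table("tmp_table", df)

    # ... later, unregister the table again
    c.drop_table("tmp_table")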
- explain(sql, dataframes=None, gpu=False)¶
Return the stringified relational algebra that this query will produce once triggered (with sql()). Helpful to understand the inner workings of dask-sql, but typically not needed to query your data.

If the query is of DDL type (e.g. CREATE TABLE or DESCRIBE SCHEMA), no relational algebra plan is created and therefore nothing is returned.
- Parameters
sql (str) – The query string to use
dataframes (Dict[str, dask.dataframe.DataFrame]) – Additional Dask or pandas dataframes to register before executing this query
gpu (bool) – Whether or not to load the additional Dask or pandas dataframes (if any) on GPU; requires cuDF / dask-cuDF if enabled. Defaults to False.
- Returns
a description of the created relational algebra.
- Return type
str
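Example

A small usage sketch (the table "my_table" and its columns a and b are assumed to be registered already):

    plan = c.explain("SELECT a, b FROM my_table WHERE a > 5")

    # prints the stringified relational algebra of the query
    print(plan)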
- fqn(tbl)¶
Return the fully qualified name of an object, maybe including the schema name.
- Parameters
tbl (DaskTable) – The Rust DaskTable instance of the view or table.
- Returns
The fully qualified name of the object
- Return type
tuple of str
- ipython_magic(auto_include=False, disable_highlighting=True)¶
Register a new ipython/jupyter magic function "sql" which sends its input as string to the sql() function. After calling this magic function in a Jupyter notebook or an IPython shell, you can write

    %sql SELECT * from data

or

    %%sql
    SELECT * from data

instead of

    c.sql("SELECT * from data")
- Parameters
auto_include (bool) – If set to true, automatically create a table for every pandas or Dask dataframe in the calling context. That means, if you define a dataframe in your jupyter notebook, you can use it with the same name in your sql call. Use this setting with care, as any defined dataframe can easily override tables created via CREATE TABLE.

    df = ...

    # Later, without any calls to create_table
    %%sql
    SELECT * FROM df

disable_highlighting (bool) – If set to true, automatically disable syntax highlighting. If you are working in jupyter lab, disable_highlighting must be set to true to enable ipython_magic functionality. If you are working in a classic jupyter notebook, you may set disable_highlighting=False if desired.
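Example

A sketch of enabling the magic in a notebook session (this assumes it is run inside IPython/Jupyter and that a dataframe df exists in the calling context):

    from dask_sql import Context

    c = Context()
    c.ipython_magic(auto_include=True)

    # from now on, the %sql / %%sql magics are available in this session, e.g.
    # %sql SELECT * FROM df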
- register_aggregation(f, name, parameters, return_type, replace=False, schema_name=None)¶
Register a custom aggregation with the given name. The aggregation can be used (with this name) in every SQL query from now on - but only for aggregation operations (no scalar function calls). This means, if you register an aggregation "fagg", you can now call

    SELECT fagg(y) FROM df GROUP BY x
Please note that you can always only have one function with the same name; no matter if it is an aggregation or scalar function.
For the registration, you need to supply both the list of parameter names and parameter types as well as the return type. Use numpy dtypes if possible.
More information: Custom Functions and Aggregations
Example
The following code registers a new aggregation "fagg", which computes the sum of a column, and uses it on the y column.

    fagg = dd.Aggregation("fagg", lambda x: x.sum(), lambda x: x.sum())

    c.register_aggregation(fagg, "fagg", [("x", np.float64)], np.float64)

    sql = "SELECT fagg(y) FROM df GROUP BY x"
    df_result = c.sql(sql)
- Parameters
f (dask.dataframe.Aggregation) – The aggregation to register. See the dask documentation for more information.
name (str) – Under which name should the new aggregate be addressable in SQL
parameters (List[Tuple[str, type]]) – A list of tuples of parameter name and parameter type. Use numpy dtypes if possible.
return_type (type) – The return type of the function
replace (bool) – Do not raise an error if the function is already present
- register_dask_table(df, name, *args, **kwargs)¶
Outdated version of create_table().
- register_experiment(experiment_name, experiment_results, schema_name=None)¶
- register_function(f, name, parameters, return_type, replace=False, schema_name=None, row_udf=False)¶
Register a custom function with the given name. The function can be used (with this name) in every SQL query from now on - but only for scalar operations (no aggregations). This means, if you register a function "f", you can now call

    SELECT f(x) FROM df
Please keep in mind that you can only have one function with the same name, regardless of whether it is an aggregation or a scalar function. By default, attempting to register two functions with the same name will raise an error; setting replace=True will give precedence to the most recently registered function.
For the registration, you need to supply both the list of parameter names and parameter types as well as the return type. Use numpy dtypes if possible.
More information: Custom Functions and Aggregations
Example
This example registers a function "f", which calculates the square of an integer and applies it to the column x.

    def f(x):
        return x ** 2

    c.register_function(f, "f", [("x", np.int64)], np.int64)

    sql = "SELECT f(x) FROM df"
    df_result = c.sql(sql)
- Example of overwriting a function with the same name:

This example registers a different function "f", which calculates the floor division of an integer and applies it to the column x. It also shows how to overwrite the previous function with the replace parameter.

    def f(x):
        return x // 2

    c.register_function(f, "f", [("x", np.int64)], np.int64, replace=True)

    sql = "SELECT f(x) FROM df"
    df_result = c.sql(sql)
- Parameters
f (Callable) – The function to register
name (str) – Under which name should the new function be addressable in SQL
parameters (List[Tuple[str, type]]) – A list of tuples of parameter name and parameter type. Use numpy dtypes if possible. This function is sensitive to the order of specified parameters when row_udf=True, and it is assumed that column arguments are specified in order, followed by scalar arguments.
return_type (type) – The return type of the function
replace (bool) – If True, do not raise an error if a function with the same name is already present; instead, replace the original function. Defaults to False.
- register_model(model_name, model, training_columns, schema_name=None)¶
Add a model to the model registry. A model can be anything which has a .predict function that transforms a Dask dataframe into predicted labels (as a Dask series). After model registration, the model can be used in calls to SELECT … FROM PREDICT with the given name. Instead of creating your own model and registering it, you can also train a model directly in dask-sql. See the SQL command CREATE MODEL.
- Parameters
model_name (str) – The name of the model
model – The model to store
training_columns (list of str) – The names of the columns which were used during the training.
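Example

A sketch of registering a fitted scikit-learn model and using it in a PREDICT query. The dataframe df (a pandas dataframe with a feature column x and a target column y) and the table name "timeseries" are assumptions for illustration; depending on your setup you may want to wrap the estimator (e.g. with dask_ml.wrappers.ParallelPostFit) so that .predict works well on Dask dataframes.

    from sklearn.linear_model import LinearRegression

    # df is assumed to be a pandas dataframe with columns "x" and "y"
    model = LinearRegression()
    model.fit(df[["x"]], df["y"])

    c.create_table("timeseries", df)

    # afterwards, the model can be used in SELECT ... FROM PREDICT queries
    c.register_model("my_model", model, training_columns=["x"])
    df_result = c.sql("SELECT * FROM PREDICT (MODEL my_model, SELECT x FROM timeseries)")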
- run_server(**kwargs)¶
Run a HTTP server for answering SQL queries using dask-sql.

See SQL Server for more information.
- Parameters
client (dask.distributed.Client) – If set, use this dask client instead of a new one.
host (str) – The host interface to listen on (defaults to all interfaces)
port (int) – The port to listen on (defaults to 8080)
log_level (str) – The log level of the server and dask-sql
- sql(sql, return_futures=True, dataframes=None, gpu=False, config_options=None)¶
Query the registered tables with the given SQL. The SQL follows approximately the PostgreSQL standard - however, not all operations are implemented yet. In general, only SELECT statements (no data manipulation) work. For more information, see SQL Syntax.
Example
In this example, a query is called using the registered tables and then executed using dask.
    result = c.sql("SELECT a, b FROM my_table")
    print(result.compute())
- Parameters
sql (str) – The query string to execute
return_futures (bool) – Return the unexecuted dask dataframe or the data itself. Defaults to returning the dask dataframe.
dataframes (Dict[str, dask.dataframe.DataFrame]) – Additional Dask or pandas dataframes to register before executing this query
gpu (bool) – Whether or not to load the additional Dask or pandas dataframes (if any) on GPU; requires cuDF / dask-cuDF if enabled. Defaults to False.
config_options (Dict[str, Any]) – Specific configuration options to pass during query execution
- Returns
the created data frame of this query.
- Return type
dask.dataframe.DataFrame
- stop_server()¶
Stop a SQL server started by run_server().
- visualize(sql, filename='mydask.png')¶
Visualize the computation graph of the given SQL query and write it to the given png file.
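Example

A small usage sketch (the table "my_table" and its columns are assumed to be registered already; graphviz needs to be installed for dask's graph rendering):

    # writes the task graph of the query to "query_graph.png"
    c.visualize("SELECT a, b FROM my_table", filename="query_graph.png")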
- dask_sql.run_server(context=None, client=None, host='0.0.0.0', port=8080, startup=False, log_level=None, blocking=True, jdbc_metadata=False)¶
Run a HTTP server for answering SQL queries using dask-sql. It uses the Presto Wire Protocol for communication. This means, it has a single POST endpoint /v1/statement, which answers SQL queries (as string in the body) with the output as a JSON (in the format described in the documentation above). Every SQL expression that dask-sql understands can be used here.

See SQL Server for more information.
Note
The presto protocol also includes some statistics on the query in the response. These statistics are currently only filled with placeholder variables.
- Parameters
context (dask_sql.Context) – If set, use this context instead of an empty one.
client (dask.distributed.Client) – If set, use this dask client instead of a new one.
host (str) – The host interface to listen on (defaults to all interfaces)
port (int) – The port to listen on (defaults to 8080)
startup (bool) – Whether to wait until Apache Calcite was loaded
log_level (str) – The log level of the server and dask-sql
blocking (bool) – If running in an environment with an event loop (e.g. a jupyter notebook), do not block. The server can be stopped with context.stop_server() afterwards.
jdbc_metadata (bool) – If enabled, create JDBC metadata tables using schemas and tables in the current dask_sql context
Example
It is possible to run an SQL server by using the CLI script dask-sql-server or by calling this function directly in your user code:

    from dask_sql import run_server

    # Create your pre-filled context
    c = Context()
    ...

    run_server(context=c)
After starting the server, it is possible to send queries to it, e.g. with the presto CLI or via sqlalchemy (e.g. using the PyHive package):
    from sqlalchemy.engine import create_engine

    engine = create_engine('presto://localhost:8080/')

    import pandas as pd

    pd.read_sql_query("SELECT 1 + 1", con=engine)
Of course, it is also possible to call the usual CREATE TABLE commands.

If in a jupyter notebook, you should run the following code:

    from dask_sql import Context

    c = Context()
    c.run_server(blocking=False)

    ...

    c.stop_server()
- Note:
When running in a jupyter notebook without blocking, it is not possible to access the SQL server from within the notebook, e.g. using sqlalchemy. Doing so will deadlock infinitely.
- dask_sql.cmd_loop(context=None, client=None, startup=False, log_level=None)¶
Run a REPL for answering SQL queries using dask-sql. Every SQL expression that dask-sql understands can be used here.
- Parameters
context (dask_sql.Context) – If set, use this context instead of an empty one.
client (dask.distributed.Client) – If set, use this dask client instead of a new one.
startup (bool) – Whether to wait until Apache Calcite was loaded
log_level (str) – The log level of the server and dask-sql
Example
It is possible to run a REPL by using the CLI script dask-sql or by calling this function directly in your user code:

    from dask_sql import cmd_loop

    # Create your pre-filled context
    c = Context()
    ...

    cmd_loop(context=c)
Of course, it is also possible to call the usual CREATE TABLE commands.
- class dask_sql.integrations.fugue.DaskSQLExecutionEngine(*args, **kwargs)¶
Execution engine for fugue that is configured to use dask-sql as its SQL engine.
Please note that so far the native SQL engine in fugue understands a larger set of SQL commands, but in turn is (on average) slower in computation and scaling.
- dask_sql.integrations.fugue.fsql_dask(sql, ctx=None, register=False, fugue_conf=None)¶
FugueSQL utility function that can consume a Context directly. FugueSQL is a language extending standard SQL. It makes it possible to describe end-to-end workflows in SQL. It also enables you to invoke Python extensions from the SQL-like language.
For more, please read FugueSQL Tutorial
- Parameters
sql (str) – Fugue SQL statement
ctx (dask_sql.Context) – The context to operate on, defaults to None
register (bool) – Whether to register named steps back to the context (if provided), defaults to False
fugue_conf (Any) – A dictionary-like object containing Fugue specific configs
Example
    # define a custom prepartition function for FugueSQL
    def median(df: pd.DataFrame) -> pd.DataFrame:
        df["y"] = df["y"].median()
        return df.head(1)

    # create a context with some tables
    c = Context()
    ...

    # run a FugueSQL query using the context as input
    query = '''
    j = SELECT df1.*, df2.x
        FROM df1 INNER JOIN df2 ON df1.key = df2.key
        PERSIST
    TAKE 5 ROWS PREPARTITION BY x PRESORT key
    PRINT
    TRANSFORM j PREPARTITION BY x USING median
    PRINT
    '''
    result = fsql_dask(query, c, register=True)

    assert "j" in result
    assert "j" in c.tables