Data Loading and Input¶
Before data can be queried with dask-sql, it needs to be loaded into the dask cluster (or local instance) and registered with the Context. dask-sql supports all dask-compatible input formats, plus some additional formats only suitable for dask-sql.
1. Load it via python¶
You can either use an already created dask dataframe or create one using the create_table() function. Chances are high that there already exists a function to load your favorite format or location (e.g. S3 or HDFS). See below for all formats understood by dask-sql. Make sure to install the required libraries on both the driver and the worker machines:
import dask.dataframe as dd
from dask_sql import Context
c = Context()
df = dd.read_csv("s3://nyc-tlc/trip data/yellow_tripdata_2019-01.csv")
c.create_table("my_data", df)
import dask.dataframe as dd
from dask_sql import Context
c = Context()
df = dd.read_csv("s3://nyc-tlc/trip data/yellow_tripdata_2019-01.csv")
c.create_table("my_data", df, gpu=True)
or in short (equivalent):
from dask_sql import Context
c = Context()
c.create_table("my_data", "s3://nyc-tlc/trip data/yellow_tripdata_2019-01.csv")
from dask_sql import Context
c = Context()
c.create_table("my_data", "s3://nyc-tlc/trip data/yellow_tripdata_2019-01.csv", gpu=True)
2. Load it via SQL¶
If you are connected to the SQL server implementation, or if you do not want to issue Python calls, you can also achieve the data loading via SQL only.
CREATE TABLE my_data WITH (
format = 'csv',
location = 's3://nyc-tlc/trip data/yellow_tripdata_2019-01.csv'
)
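or, to create a GPU-backed table: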
CREATE TABLE my_data WITH (
format = 'csv',
location = 's3://nyc-tlc/trip data/yellow_tripdata_2019-01.csv',
gpu = True
)
The parameters are the same as in the python function described above. You can find more information in Table Creation.
Input Formats¶
dask-sql understands (thanks to the large Dask ecosystem) a wide variety of input formats and input locations.
All formats and locations mentioned in the Dask documentation are supported, including csv, parquet and json. Just pass in the location as a string (and possibly the format, e.g. "csv", if it is not clear from the file extension). The data can come from local disk or from many remote locations (S3, HDFS, Azure Filesystem, HTTP, Google Filesystem, ...) - just prefix the path with the matching protocol. Additional arguments passed to create_table() or CREATE TABLE are given to the read_<format> calls.
Example:
c.create_table(
"my_data",
"s3://bucket-name/my-data-*.csv",
storage_options={'anon': True}
)
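or equivalently via SQL: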
CREATE TABLE my_data WITH (
format = 'csv', -- can also be omitted, as clear from the extension
location = 's3://bucket-name/my-data-*.csv',
storage_options = (
anon = True
)
)
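The same works for GPU-backed tables by adding gpu=True (Python) or gpu = True (SQL):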
c.create_table(
"my_data",
"s3://bucket-name/my-data-*.csv",
gpu=True,
storage_options={'anon': True}
)
CREATE TABLE my_data WITH (
format = 'csv', -- can also be omitted, as clear from the extension
location = 's3://bucket-name/my-data-*.csv',
gpu = True,
storage_options = (
anon = True
)
)
If your data is already in Pandas (or Dask) DataFrame format, you can just use it as it is via the Python API by giving it to create_table() directly.
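For example (a minimal sketch; the table name, column names and values are made up for illustration):
import pandas as pd
from dask_sql import Context

c = Context()

# a plain pandas dataframe can be handed to create_table() directly
df = pd.DataFrame({"id": [1, 2, 3], "value": [10.0, 20.0, 30.0]})
c.create_table("my_pandas_data", df)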
You can connect dask-sql to an intake catalog and use the data registered there. Assuming you have an intake catalog stored in "catalog.yaml" (this can also be the URL of an intake server), you can read in a table stored in the catalog (here "intake_table") either via Python
import intake

catalog = intake.open_catalog("catalog.yaml")
c.create_table("my_data", catalog, intake_table_name="intake_table")
# or
c.create_table("my_data", "catalog.yaml", format="intake", intake_table_name="intake_table")
or via SQL:
CREATE TABLE my_data WITH (
format = 'intake',
location = 'catalog.yaml'
)
The argument intake_table_name is optional and defaults to the table name in dask-sql. With the argument catalog_kwargs you can control how the intake catalog object is created. Additional arguments are forwarded to the to_dask() call of intake.
As an experimental feature, it is also possible to use data stored in the Apache Hive metastore. For this, dask-sql will retrieve the information on the storage location and format from the metastore and will then register the raw data directly in the context. This means no Hive data query will be issued, and you might see a speed improvement. It is possible to use either a pyhive.hive.Cursor or an sqlalchemy connection.
from dask_sql import Context
from pyhive.hive import connect
import sqlalchemy

c = Context()

cursor = connect("hive-server", 10000).cursor()
# or
cursor = sqlalchemy.create_engine("hive://hive-server:10000").connect()

c.create_table("my_data", cursor, hive_table_name="the_name_in_hive")
or in SQL:
CREATE TABLE my_data WITH (
location = 'hive://hive-server:10000',
hive_table_name = 'the_name_in_hive'
)
Again, hive_table_name is optional and defaults to the table name in dask-sql. You can also control the database used in Hive via the hive_schema_name parameter. Additional arguments are pushed to the internally called read_<format> functions.
Similarly, it is possible to load data from a Databricks cluster (which is similar to a Hive metastore).
You need to have the databricks-dbapi package installed and fsspec >= 0.8.7. A token needs to be generated for the accessing user. The host, port and http_path information can be found in the JDBC tab of the cluster.
from dask_sql import Context
from sqlalchemy import create_engine

c = Context()

# host, port, token and http_path are the connection details described above
cursor = create_engine(
    f"databricks+pyhive://token:{token}@{host}:{port}/",
    connect_args={"http_path": http_path},
).connect()

c.create_table(
    "my_data",
    cursor,
    hive_table_name="schema.table",
    storage_options={"instance": host, "token": token},
)
or in SQL:
CREATE TABLE my_data WITH (
location = 'databricks+pyhive://token:{token}@{host}:{port}/',
connect_args = (
http_path = '{http_path}'
),
hive_table_name = 'schema.table',
storage_options = (
instance = '{host}',
token = '{token}'
)
)
Note
For dask-sql it does not matter how you load your data. In all shown cases you can then use the specified table name to query your data in a SELECT call.
Please note however that un-persisted data will be re-read from its source (e.g. on S3 or disk) on every query, whereas persisted data is only read once. Persisting therefore speeds up repeated queries, but it also prevents you from seeing external updates to your data (until you reload it explicitly).
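As a minimal sketch of querying a registered table (the persist flag shown here is the create_table() option for keeping the input in memory; check the create_table() documentation for the exact behavior in your version):
from dask_sql import Context

c = Context()

# persist=True keeps the data in (distributed) memory after the first read,
# so subsequent queries do not go back to the source files
c.create_table("my_data", "s3://bucket-name/my-data-*.csv", persist=True)

# query the registered table; the result is a lazy dask dataframe
result = c.sql("SELECT COUNT(*) FROM my_data")
print(result.compute())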