.. _data_input:

Data Loading and Input
======================

Before data can be queried with ``dask-sql``, it needs to be loaded into the Dask cluster (or local instance) and registered with the :class:`~dask_sql.Context`.
``dask-sql`` supports all ``dask``-compatible `input formats <https://docs.dask.org/en/latest/dataframe-create.html>`_, plus some additional formats only suitable for ``dask-sql``.

1. Load it via Python
---------------------

You can either use already created Dask DataFrames or create one with the :func:`~dask_sql.Context.create_table` function.
Chances are high that there already exists a function to load your favorite format or location (e.g. S3 or HDFS).
See below for all formats understood by ``dask-sql``.
Make sure to install the required libraries both on the driver and the worker machines:

.. tabs::

    .. group-tab:: CPU

        .. code-block:: python

            import dask.dataframe as dd
            from dask_sql import Context

            c = Context()
            df = dd.read_csv("s3://nyc-tlc/trip data/yellow_tripdata_2019-01.csv")

            c.create_table("my_data", df)

    .. group-tab:: GPU

        .. code-block:: python

            import dask.dataframe as dd
            from dask_sql import Context

            c = Context()
            df = dd.read_csv("s3://nyc-tlc/trip data/yellow_tripdata_2019-01.csv")

            c.create_table("my_data", df, gpu=True)

or, in short (equivalent):

.. tabs::

    .. group-tab:: CPU

        .. code-block:: python

            from dask_sql import Context

            c = Context()
            c.create_table("my_data", "s3://nyc-tlc/trip data/yellow_tripdata_2019-01.csv")

    .. group-tab:: GPU

        .. code-block:: python

            from dask_sql import Context

            c = Context()
            c.create_table("my_data", "s3://nyc-tlc/trip data/yellow_tripdata_2019-01.csv", gpu=True)

2. Load it via SQL
------------------

If you are connected to the SQL server implementation or do not want to issue Python commands, you can also achieve the data loading via SQL only:

.. tabs::

    .. group-tab:: CPU

        .. code-block:: sql

            CREATE TABLE my_data WITH (
               format = 'csv',
               location = 's3://nyc-tlc/trip data/yellow_tripdata_2019-01.csv'
            )

    .. group-tab:: GPU

        .. code-block:: sql

            CREATE TABLE my_data WITH (
               format = 'csv',
               location = 's3://nyc-tlc/trip data/yellow_tripdata_2019-01.csv',
               gpu = True
            )

The parameters are the same as in the Python function described above.
You can find more information in :ref:`creation`.

3. Persist and share data on the cluster
------------------------------------------

In ``dask``, you can publish datasets with names into the cluster memory.
This allows the same data to be reused from multiple clients/users in multiple sessions.

For example, you can publish your data using the ``client.publish_dataset`` function of the ``distributed.Client``
and then later register it in the :class:`~dask_sql.Context` via SQL:

.. code-block:: python

    # a dask.distributed Client
    client = Client(...)
    client.publish_dataset(my_df=df)

Later, in SQL:

.. tabs::

    .. group-tab:: CPU

        .. code-block:: sql

            CREATE TABLE my_data WITH (
               format = 'memory',
               location = 'my_df'
            )

    .. group-tab:: GPU

        .. code-block:: sql

            CREATE TABLE my_data WITH (
               format = 'memory',
               location = 'my_df',
               gpu = True
            )

Note that the format is set to ``memory`` and the location is the name that was chosen when publishing the dataset.

To achieve the same thing from Python, you can just use Dask's methods to get the dataset:

.. tabs::

    .. group-tab:: CPU

        .. code-block:: python

            df = client.get_dataset("my_df")
            c.create_table("my_data", df)

    .. group-tab:: GPU

        .. code-block:: python

            df = client.get_dataset("my_df")
            c.create_table("my_data", df, gpu=True)
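Putting these pieces together, the following is a minimal sketch of sharing data between two sessions on the same cluster.
The scheduler address is a placeholder, and the result of ``c.sql`` (a Dask DataFrame) is computed at the end to materialize it:

.. code-block:: python

    from dask.distributed import Client
    import dask.dataframe as dd
    from dask_sql import Context

    # session 1: load the data and publish it under a name on the cluster
    client = Client("tcp://scheduler:8786")  # placeholder scheduler address
    df = dd.read_csv("s3://nyc-tlc/trip data/yellow_tripdata_2019-01.csv")
    client.publish_dataset(my_df=df)

    # session 2 (possibly another user or process connected to the same cluster):
    client = Client("tcp://scheduler:8786")
    c = Context()
    c.create_table("my_data", client.get_dataset("my_df"))

    # c.sql() returns a Dask DataFrame; compute() materializes the result
    print(c.sql("SELECT COUNT(*) AS n FROM my_data").compute())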
Input Formats
-------------

Thanks to the large Dask ecosystem, ``dask-sql`` understands a wide variety of input formats and input locations.

* All formats and locations mentioned in `the Dask documentation <https://docs.dask.org/en/latest/dataframe-create.html>`_, including CSV, Parquet, and JSON.
  Just pass in the location as a string (and possibly the format, e.g. ``"csv"``, if it is not clear from the file extension).
  The data can come from the local disk or from many remote locations (S3, HDFS, Azure Filesystem, HTTP, Google Filesystem, ...) - just prefix the path with the matching protocol.
  Additional arguments passed to :func:`~dask_sql.Context.create_table` or ``CREATE TABLE`` are given to the ``read_`` calls.

  Example:

  .. tabs::

      .. group-tab:: CPU

          .. code-block:: python

              c.create_table(
                  "my_data",
                  "s3://bucket-name/my-data-*.csv",
                  storage_options={'anon': True}
              )

          .. code-block:: sql

              CREATE TABLE my_data WITH (
                 format = 'csv',  -- can also be omitted, as it is clear from the extension
                 location = 's3://bucket-name/my-data-*.csv',
                 storage_options = (
                    anon = True
                 )
              )

      .. group-tab:: GPU

          .. code-block:: python

              c.create_table(
                  "my_data",
                  "s3://bucket-name/my-data-*.csv",
                  gpu=True,
                  storage_options={'anon': True}
              )

          .. code-block:: sql

              CREATE TABLE my_data WITH (
                 format = 'csv',  -- can also be omitted, as it is clear from the extension
                 location = 's3://bucket-name/my-data-*.csv',
                 gpu = True,
                 storage_options = (
                    anon = True
                 )
              )

* If your data is already in Pandas (or Dask) DataFrame format, you can use it as-is via the Python API by giving it to :func:`~dask_sql.Context.create_table` directly, as in the sketch below.
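  A minimal sketch (the table and column names here are made up for illustration; ``c.sql`` returns a Dask DataFrame that you can ``compute()``):

  .. code-block:: python

      import pandas as pd
      from dask_sql import Context

      c = Context()

      # a small in-memory pandas DataFrame; dask-sql wraps it in a Dask DataFrame internally
      df = pd.DataFrame({"id": [1, 2, 3], "value": [10.0, 20.0, 30.0]})
      c.create_table("my_pandas_data", df)

      # query it like any other registered table
      print(c.sql("SELECT SUM(value) AS total FROM my_pandas_data").compute())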
* You can connect ``dask-sql`` to an `intake <https://intake.readthedocs.io/>`_ catalog and use the data registered there.
  Assuming you have an intake catalog stored in ``catalog.yaml`` (this can also be the URL of an intake server), you can read in a stored table ``intake_table`` either via Python

  .. code-block:: python

      catalog = intake.open_catalog("catalog.yaml")
      c.create_table("my_data", catalog, intake_table_name="intake_table")
      # or
      c.create_table("my_data", "catalog.yaml", format="intake", intake_table_name="intake_table")

  or via SQL:

  .. code-block:: sql

      CREATE TABLE my_data WITH (
         format = 'intake',
         location = 'catalog.yaml'
      )

  The argument ``intake_table_name`` is optional and defaults to the table name in ``dask-sql``.
  With the argument ``catalog_kwargs`` you can control how the intake catalog object is created.
  Additional arguments are forwarded to intake's ``to_dask()`` call.

* As an experimental feature, it is also possible to use data stored in the `Apache Hive <https://hive.apache.org/>`_ metastore.
  For this, ``dask-sql`` retrieves the information on the storage location and format from the metastore and then registers the raw data directly in the context.
  This means that no data query is issued against Hive itself, which might give you a speed improvement.
  It is possible to use either a ``pyhive.hive.Cursor`` or a ``sqlalchemy`` connection.

  .. code-block:: python

      from dask_sql import Context
      from pyhive.hive import connect
      import sqlalchemy

      c = Context()
      cursor = connect("hive-server", 10000).cursor()
      # or
      cursor = sqlalchemy.create_engine("hive://hive-server:10000").connect()
      c.create_table("my_data", cursor, hive_table_name="the_name_in_hive")

  or in SQL:

  .. code-block:: sql

      CREATE TABLE my_data WITH (
         location = 'hive://hive-server:10000',
         hive_table_name = 'the_name_in_hive'
      )

  Again, ``hive_table_name`` is optional and defaults to the table name in ``dask-sql``.
  You can also control the database used in Hive via the ``hive_schema_name`` parameter.
  Additional arguments are passed to the internally called ``read_`` functions.

* Similarly, it is possible to load data from a `Databricks cluster <https://www.databricks.com/>`_ (which is similar to a Hive metastore).
  You need to have the ``databricks-dbapi`` package installed as well as ``fsspec >= 0.8.7``.
  A personal access token needs to be generated for the accessing user (see the Databricks documentation).
  The ``host``, ``port`` and ``http_path`` information can be found in the JDBC tab of the cluster.

  .. code-block:: python

      from dask_sql import Context
      from sqlalchemy import create_engine

      c = Context()

      # token, host, port and http_path are taken from the cluster's JDBC settings
      cursor = create_engine(
          f"databricks+pyhive://token:{token}@{host}:{port}/",
          connect_args={"http_path": http_path},
      ).connect()
      c.create_table(
          "my_data",
          cursor,
          hive_table_name="schema.table",
          storage_options={"instance": host, "token": token},
      )

  or in SQL:

  .. code-block:: sql

      CREATE TABLE my_data WITH (
         location = 'databricks+pyhive://token:{token}@{host}:{port}/',
         connect_args = (
            http_path = '{http_path}'
         ),
         hive_table_name = 'schema.table',
         storage_options = (
            instance = '{host}',
            token = '{token}'
         )
      )

.. note::

    For ``dask-sql`` it does not matter how you load your data.
    In all of the cases shown above, you can then use the specified table name to query your data in a ``SELECT`` call.

    Please note, however, that un-persisted data is re-read from its source (e.g. from S3 or disk) on every query, whereas persisted data is only read once.
    Persisting therefore speeds up repeated queries, but it also prevents you from seeing external updates to your data (until you reload it explicitly).
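As a sketch of the trade-off described in the note above, one simple way to persist a table is to persist the underlying Dask DataFrame in (distributed) memory before registering it, using plain Dask APIs (the S3 path is the example path from above):

.. code-block:: python

    import dask.dataframe as dd
    from dask_sql import Context

    c = Context()

    df = dd.read_csv("s3://nyc-tlc/trip data/yellow_tripdata_2019-01.csv")

    # keep the data in (distributed) memory so repeated queries do not re-read from S3;
    # external changes to the files will not be visible until the table is re-created
    df = df.persist()

    c.create_table("my_data", df)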