A demonstration of distributed queries on TileDB Cloud.
How to run this tutorial
We recommend running this tutorial, as well as the other tutorials in the Tutorials section, inside TileDB Cloud. By using TileDB Cloud, you can experiment while avoiding all the installation, deployment, and configuration hassles. Sign up for the free tier, spin up a TileDB Cloud notebook with a Python kernel, and follow the tutorial instructions. If you wish to learn how to run tutorials locally on your machine, read the Tutorials: Running Locally tutorial.
This tutorial shows distributed querying on TileDB Cloud. It assumes you have already completed the Get Started section.
You can interface with TileDB Cloud using the same TileDB libraries used in the rest of the TileDB Tables tutorials. To authenticate with TileDB Cloud, you need to pass a REST API token into the configuration of a TileDB context object. While it's possible to pass your username and password to the TileDB Cloud API for authentication, we strongly recommend using a REST API token instead, as it offers finer-grained control and protects your login credentials from misuse. This tutorial assumes you have already stored your REST API token (or username and password) as environment variables.
Tip
When working inside a TileDB Cloud notebook environment, you’re already authenticated and do not need to create a REST API token.
First, import the necessary libraries.
import os
import warnings

import numpy as np
import tiledb
import tiledb.cloud.dag  # needed for the task graph used later in this tutorial
import tiledb.sql

warnings.filterwarnings("ignore")

# Set the appropriate environment variables.
tiledb_token = os.environ["TILEDB_REST_TOKEN"]
tiledb_account = os.environ["TILEDB_ACCOUNT"]

# Set the table URI you will use in this tutorial.
table_uri = "tiledb://TileDB-Inc/nyc_tlc_yellow_trip_data_2016-2022"

# Set the context config.
cfg = tiledb.Config({"vfs.s3.region": "us-east-1", "rest.token": tiledb_token})
ctx = tiledb.Ctx(cfg)
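If you are running outside a TileDB Cloud notebook, you may also need to authenticate the tiledb.cloud client explicitly. A minimal sketch, assuming the tiledb-cloud package is installed and the same token applies:

import tiledb.cloud

# Authenticate the tiledb.cloud client with the same REST API token.
tiledb.cloud.login(token=tiledb_token)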
Now you will query at scale by using a TileDB Cloud task graph.
Define two Python functions: one to combine partial results into the final distributed average, and one to compute the partial averages themselves.
def compute_python_avg(partial_results):
    """Function to perform the average from partials."""
    sum_num = 0
    count = 0
    for result in partial_results:
        sum_num += result["sum"]
        count += result["count"]
    return {"average": sum_num / count, "sum": sum_num, "count": count}
def compute_partial_averages(data, previous_data=None):
    """Function to perform partial averages."""
    sum_num = 0
    count = len(data["fare_amount"])
    for record in data["fare_amount"]:
        sum_num += record
    if previous_data is not None:
        sum_num += previous_data["sum"]
        count += previous_data["count"]
    return {"sum": sum_num, "count": count}
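As a quick sanity check, you can combine the two helpers locally; the fare values below are made up purely for illustration:

# Hypothetical toy partitions to verify the helpers locally.
p1 = compute_partial_averages({"fare_amount": [10.0, 20.0]})
p2 = compute_partial_averages({"fare_amount": [30.0]})
print(compute_python_avg([p1, p2]))
# {'average': 20.0, 'sum': 60.0, 'count': 3}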
To partition the work, you will query by week for an entire year. Select the year 2021 and use NumPy to split it into weekly date ranges.
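A minimal sketch of this step, assuming 7-day boundaries built with np.arange starting on January 1, 2021, and stored in a dates array that the task graph below consumes (the exact boundaries used originally may differ):

# Weekly partition boundaries for 2021 (assumed; built with np.arange).
dates = np.arange(
    np.datetime64("2021-01-01"),
    np.datetime64("2022-01-01"),
    np.timedelta64(7, "D"),
)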
Now you can define the task graph by using the dag API. Using the date ranges defined earlier, loop over each weekly range, call dag.submit_array_udf for each week, and pass in the partial average function, the date range for that week, and an attribute selection that limits the query to only the fare_amount field.
nodes = []
tasks_to_run_in_parallel = len(dates)
dag = tiledb.cloud.dag.DAG(
    name="Distributed Average",
    namespace=tiledb_account,
    max_workers=tasks_to_run_in_parallel,
)

# Loop over every week to compute partitions in parallel.
for index in range(len(dates) - 1):
    # Define start and end boundaries.
    start = dates[index].astype("datetime64[ns]")
    end = dates[index] + np.timedelta64(7, "D") - np.timedelta64(1, "ns")
    # Submit an array UDF accessing only fare_amount.
    node = dag.submit_array_udf(
        table_uri,
        compute_partial_averages,
        name=f'"{start}"',
        ranges=[(start, end), []],
        attrs=["fare_amount"],
    )
    nodes.append(node)

# Accumulate and tabulate the final results.
results = dag.submit_local(compute_python_avg, nodes)
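Before running the graph, you can optionally inspect its structure; DAG objects in the tiledb.cloud client provide a visualize method (assuming a notebook environment, where it renders inline):

# Optional: render the task graph structure inline (notebook only).
dag.visualize()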
Running the graph should take only a few seconds and demonstrates how TileDB Cloud can return millions of records per second when issuing distributed queries.
# Kick off distributed computation.
import time

start_time = time.time()
dag.compute()
average = results.result()
end_time = time.time()

print(f"results: {average}")
duration = end_time - start_time
print(f"Distributed average took {duration}s yielding {average['count'] / duration} records per second")
results: {'average': 13.518565198990611, 'sum': 833644520.4798127, 'count': 61666642}
Distributed average took 5.920797109603882s yielding 10415260.117590092 records per second