Demonstration of distributed table ingestion on TileDB Cloud.
How to run this tutorial
We recommend running this tutorial, as well as the other tutorials in the Tutorials section, inside TileDB Cloud. By using TileDB Cloud, you can experiment while avoiding all the installation, deployment, and configuration hassles. Sign up for the free tier, spin up a TileDB Cloud notebook with a Python kernel, and follow the tutorial instructions. If you wish to learn how to run tutorials locally on your machine, read the Tutorials: Running Locally tutorial.
This tutorial shows distributed CSV ingestion on TileDB Cloud. It assumes you have already completed the Get Started section.
You can interface with TileDB Cloud using the same TileDB libraries used in the rest of the TileDB Tables tutorials. To authenticate with TileDB Cloud, you pass an API token into the configuration of a TileDB context object. While it's possible to pass your username and password to the TileDB Cloud API for authentication, we strongly recommend using a REST API token instead, as it gives you finer-grained control and protects your login credentials from misuse. This tutorial assumes you have already stored your API token (or username and password) as environment variables.
Tip
When working inside a TileDB Cloud notebook environment, you’re already authenticated and do not need to create a REST API token.
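For example, a minimal sketch of token-based authentication might look like the following. It assumes the token is stored in the TILEDB_REST_TOKEN environment variable, as in the setup code later in this tutorial.

import os

import tiledb
import tiledb.cloud

# Read the REST API token from an environment variable (name assumed here).
token = os.environ["TILEDB_REST_TOKEN"]

# Pass the token to a TileDB context for REST-backed array access...
ctx = tiledb.Ctx(tiledb.Config({"rest.token": token}))

# ...and log in the TileDB Cloud client for cloud-side operations
# such as serverless SQL and task graphs.
tiledb.cloud.login(token=token)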
First, import the necessary libraries, set the URI you will use throughout this tutorial, and delete any datasets with the same name.
import os
import tiledb
import tiledb.cloud  # needed for tiledb.cloud.sql and tiledb.cloud.asset below
import warnings

warnings.filterwarnings("ignore")

import tiledb.sql
import numpy as np
from pathlib import Path

# Set the appropriate environment variables.
tiledb_token = os.environ["TILEDB_REST_TOKEN"]
tiledb_account = os.environ["TILEDB_ACCOUNT"]

# Get the bucket from an environment variable.
s3_bucket = os.environ["S3_BUCKET"]

# Set the table URIs you will use in this tutorial.
table_name = "tables_scalable_ingestion"
table_uri = "tiledb://" + tiledb_account + "/" + table_name
table_uri_reg = "tiledb://" + tiledb_account + "/" + s3_bucket + "/" + table_name
csv_file_bucket = (
    "s3://tiledb-inc-demo-data/examples/notebooks/nyc_yellow_tripdata/csv/"
)

# Set the context config.
cfg = tiledb.Config(
    {
        "vfs.s3.no_sign_request": "true",
        "vfs.s3.region": "us-east-1",
        "rest.token": tiledb_token,
    }
)
flatten_config = ",".join(["=".join(k) for k in cfg.items()])
ctx = tiledb.Ctx(cfg)

# Clean up any previously generated data.
with tiledb.scope_ctx(ctx):
    if tiledb.array_exists(table_uri):
        tiledb.cloud.asset.delete(table_uri)
Define some extra setup details. Note that the 2016-format CSVs don't include a header row, so the following code defines the field names, data types, and column order for them.
header_2016_2020 = [
    "VendorID",
    "tpep_pickup_datetime",
    "tpep_dropoff_datetime",
    "passenger_count",
    "trip_distance",
    "RatecodeID",
    "store_and_fwd_flag",
    "PULocationID",
    "DOLocationID",
    "payment_type",
    "fare_amount",
    "extra",
    "mta_tax",
    "tip_amount",
    "tolls_amount",
    "improvement_surcharge",
    "total_amount",
]
dtypes = {
    "VendorID": "float64",
    "passenger_count": "float64",
    "trip_distance": "float64",
    "RatecodeID": "float64",
    "store_and_fwd_flag": "str",
    "payment_type": "float64",
    "fare_amount": "float64",
    "extra": "float64",
    "mta_tax": "float64",
    "tip_amount": "float64",
    "tolls_amount": "float64",
    "improvement_surcharge": "float64",
    "total_amount": "float64",
}
converters = {
    "PULocationID": lambda x: int(x) if x else None,
    "DOLocationID": lambda x: int(x) if x else None,
}
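To see how these definitions behave, the short, self-contained sketch below parses two fabricated 2016-style rows from an in-memory string. It is only an illustration of the names, dtypes, and converters, not part of the ingestion itself.

import io

import pandas

# Two fabricated 2016-format rows (no header). The second row has an empty
# PULocationID, which the converter maps to None instead of failing on int("").
sample = io.StringIO(
    "2,2016-07-01 00:00:01,2016-07-01 00:10:00,1,2.5,1,N,142,236,1,10.0,0.5,0.5,2.0,0.0,0.3,13.8\n"
    "2,2016-07-01 00:00:05,2016-07-01 00:12:00,1,1.1,1,N,,236,1,6.5,0.5,0.5,1.0,0.0,0.3,8.8\n"
)

df = pandas.read_csv(
    sample,
    names=header_2016_2020,  # supply the column order, since there is no header row
    dtype=dtypes,            # force the numeric columns to float64
    converters=converters,   # turn empty location IDs into None
    parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
)
print(df.dtypes)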
Create the TileDB table so that you can then ingest into it in parallel.
create_table_sql = f"""CREATE TABLE `table_scalable_ingestion_cloud` (
    `tpep_pickup_datetime` timestamp(6) NOT NULL `dimension`=1 `lower_bound`='-9223372036854775807' `upper_bound`='9223372036854774807' `tile_extent`="1",
    `PULocationID` bigint(20) NOT NULL `dimension`=1 `lower_bound`='-9223372036854775808' `upper_bound`='9223372036854774807' `tile_extent`="1",
    `congestion_surcharge` double NOT NULL DEFAULT 0 `filters`='ZSTD=-1',
    `improvement_surcharge` double NOT NULL DEFAULT 0 `filters`='ZSTD=-1',
    `VendorID` double NOT NULL DEFAULT 0 `filters`='ZSTD=-1',
    `payment_type` double NOT NULL DEFAULT 0 `filters`='ZSTD=-1',
    `tpep_dropoff_datetime` timestamp(6) NOT NULL DEFAULT '0000-00-00 00:00:00.000000' `filters`='ZSTD=-1',
    `DOLocationID` bigint(20) NOT NULL DEFAULT -9223372036854775808 `filters`='ZSTD=-1',
    `mta_tax` double NOT NULL DEFAULT 0 `filters`='ZSTD=-1',
    `passenger_count` double NOT NULL DEFAULT 0 `filters`='ZSTD=-1',
    `trip_distance` double NOT NULL DEFAULT 0 `filters`='ZSTD=-1',
    `RatecodeID` double NOT NULL DEFAULT 0 `filters`='ZSTD=-1',
    `store_and_fwd_flag` text NOT NULL `filters`='ZSTD=-1',
    `total_amount` double NOT NULL DEFAULT 0 `filters`='ZSTD=-1',
    `fare_amount` double NOT NULL DEFAULT 0 `filters`='ZSTD=-1',
    `extra` double NOT NULL DEFAULT 0 `filters`='ZSTD=-1',
    `tip_amount` double NOT NULL DEFAULT 0 `filters`='ZSTD=-1',
    `tolls_amount` double NOT NULL DEFAULT 0 `filters`='ZSTD=-1'
) ENGINE=MyTile `uri`='{table_uri_reg}' `array_type`='SPARSE' `capacity`=100000 `cell_order`=HILBERT `offset_filters`='ZSTD=-1' `validity_filters`='RLE'"""

tiledb.cloud.sql.exec(create_table_sql)
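If you'd like a quick, optional sanity check that the table was created (this is not part of the tutorial's required steps), you can load its schema and run a serverless SQL query against it; at this point the table is still empty.

# Optional check: the registered table should be reachable and empty.
print(tiledb.ArraySchema.load(table_uri, ctx=ctx))
print(tiledb.cloud.sql.exec(f"SELECT COUNT(*) AS num_rows FROM `{table_uri}`"))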
In many real-world scenarios, you will need to manipulate the data before loading it into tables. That is the case in this tutorial, because the source data from New York City comes in different formats that have evolved over time. The two format versions this tutorial focuses on are the 2016 and 2019 formats.
The 2016 format requires several adjustments before loading so that it's compatible with the 2019 format. For this, create a Python function that handles the ingestion of a single 2016-format CSV file; this is the unit of work that will later run in parallel.
def ingest_2016(array_uri, csv_file):
    """
    This function will load CSVs that are in the 2016 format.
    """
    import tiledb
    import pandas
    import numpy as np
    import time

    ctx = tiledb.Ctx({"vfs.s3.no_sign_request": True})
    with tiledb.scope_ctx(ctx):
        pandas_start_time = time.time()

        # Open the CSV through the TileDB VFS so it can be read directly from S3.
        vfs = tiledb.VFS()
        csv_file = tiledb.FileIO(vfs, csv_file, mode="rb")
        df = pandas.read_csv(
            csv_file,
            index_col=["tpep_pickup_datetime", "PULocationID"],
            parse_dates=["tpep_dropoff_datetime", "tpep_pickup_datetime"],
            skiprows=1,
            names=header_2016_2020,
            usecols=[i for i in range(len(header_2016_2020))],
            dtype=dtypes,
        )
        pandas_end_time = time.time()

        # The 2016 format lacks the congestion_surcharge column, so fill it with NaN.
        df["congestion_surcharge"] = np.float64(None)
        print(f"Pandas took {pandas_end_time - pandas_start_time}s")

        # Append the DataFrame to the existing TileDB table.
        tiledb.from_pandas(
            array_uri, df, mode="append", fillna={"store_and_fwd_flag": ""}
        )
You'll apply the same idea to the 2019 format, but in this case, your function will just call the CSV ingestion directly.
def ingest_2019(*args, **kwargs):
    """
    This function will load CSVs that are in the 2019 format.
    """
    import tiledb

    ctx = tiledb.Ctx({"vfs.s3.no_sign_request": True})
    with tiledb.scope_ctx(ctx):
        # The 2019 format matches the table schema, so ingest the CSV directly.
        tiledb.from_csv(*args, **kwargs, mode="append")
    return
Now that you've defined the functions for parsing and loading the data, the next step is to list the CSV files and then load them in parallel. To list the CSVs, create a small function that lists the contents of the bucket and filters them down to just the files needed for this tutorial.
def list_data(bucket):
    """
    This function lists out CSV files from a bucket location.
    """
    import tiledb
    import os
    from pathlib import Path

    ctx = tiledb.Ctx({"vfs.s3.no_sign_request": True})
    files = tiledb.VFS(ctx=ctx).ls(bucket)
    files_with_schema_version = {}
    for f in files:
        # Find only yellow tripdata
        if not f.startswith(os.path.join(bucket, "yellow_tripdata_")):
            continue
        if not f.endswith(".csv"):
            continue
        # We use stem twice for generic support of double extensions like .csv.gz
        filename = Path(f).stem
        filename = Path(filename).stem
        year_month = filename.split("_")[2]
        year = int(year_month.split("-")[0])
        month = int(year_month.split("-")[1])
        # Determine the year format of the csv
        if year >= 2019:
            files_with_schema_version[f] = "2019"
        elif (year == 2016 and month >= 7) or year == 2017 or year == 2018:
            files_with_schema_version[f] = "2016"
        elif year == 2015 or (year == 2016 and month < 7):
            files_with_schema_version[f] = "2015"
        elif year < 2015:
            files_with_schema_version[f] = "2009"
    return files_with_schema_version
Now call the function to list the CSV files to ingest.
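The call might look like the following sketch; the files_to_ingest variable name is illustrative and is reused in the task-graph sketch in the next step.

# Illustrative call: maps each CSV URI to its schema version ("2016", "2019", ...).
files_to_ingest = list_data(csv_file_bucket)
print(f"Found {len(files_to_ingest)} CSV files to ingest")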
After you generate the file list, you can build the task graph for ingestion. You will loop over the list of files and create a separate parallel task for each file using a batch task graph, passing the appropriate ingestion function based on the date in each CSV file's name. In the following code, no computation takes place yet; you are just preparing the task graph in a "lazy" manner.
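A sketch of what building such a graph might look like follows. It assumes the files_to_ingest dictionary from the previous step and uses the tiledb.cloud.dag batch task graph API; the graph name, node names, and the decision to skip the pre-2016 formats are illustrative, not the tutorial's exact code.

from tiledb.cloud import dag

# Build a batch task graph with one independent node per CSV file.
# Nothing executes yet; the graph is only being described.
graph = dag.DAG(mode=dag.Mode.BATCH, name="tables-scalable-ingestion")

for csv_file, schema_version in files_to_ingest.items():
    if schema_version == "2016":
        # 2016-format files go through the custom pandas-based loader.
        graph.submit(ingest_2016, table_uri, csv_file, name=f"ingest 2016: {csv_file}")
    elif schema_version == "2019":
        # 2019-format files can be ingested directly with tiledb.from_csv.
        graph.submit(ingest_2019, table_uri, csv_file, name=f"ingest 2019: {csv_file}")
    # Other format versions (2015 and earlier) are out of scope for this tutorial.

# When you are ready to run the ingestion, launch the graph and wait for it:
# graph.compute()
# graph.wait()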