Tables: Scalable Ingestion

This page demonstrates distributed table ingestion with TileDB.
How to run this tutorial

We recommend running this tutorial, as well as the other tutorials in the Tutorials section, inside TileDB Cloud. By using TileDB Cloud, you can experiment while avoiding all the installation, deployment, and configuration hassles. Sign up for the free tier, spin up a TileDB Cloud notebook with a Python kernel, and follow the tutorial instructions. If you wish to learn how to run tutorials locally on your machine, read the Tutorials: Running Locally tutorial.

This tutorial shows distributed CSV ingestion on TileDB Cloud. It assumes you have already completed the Get Started section.

You can interface with TileDB Cloud by using the same TileDB libraries used in the rest of the TileDB Tables tutorials. To authenticate with TileDB Cloud, you need to pass a REST API token into the configuration of a TileDB context object. While it’s possible to pass your username and password to the TileDB Cloud API for authentication, we strongly recommend you use a REST API token instead, as it gives you finer control and prevents misuse of your login credentials. This tutorial assumes you have already stored your API token (or username and password) as environment variables.

Tip

When working inside a TileDB Cloud notebook environment, you’re already authenticated and do not need to create a REST API token.

Note

TileDB’s SQL query support is now provided by TileDB Tables. The TileDB storage engine for MariaDB, imported as tiledb.sql, and also known as MyTile or TileDB-MariaDB, is now deprecated.

To run this tutorial, you need to create the following environment variables:

  • $S3_BUCKET: The name of your S3 bucket, prefixed with the scheme (for example, s3://your_bucket_name).
  • $TILEDB_ACCOUNT: Your TileDB account username.
  • $TILEDB_REST_API_SERVER_ADDRESS*: The TileDB REST API server address to which you’ll connect.
  • $TILEDB_REST_TOKEN*: Your TileDB REST API token.

* Required only if you’re running this tutorial outside a TileDB notebook environment.

You can create these environment variables in Python as follows:

  • Python
import os

os.environ["S3_BUCKET"] = "s3://<your_bucket_name>"
os.environ["TILEDB_ACCOUNT"] = "<your_tiledb_account_username>"
os.environ["TILEDB_REST_API_SERVER_ADDRESS"] = "<rest_api_server_address>"
with open("~/.tiledb_rest_api_token", "r") as f:
    # Store your TileDB REST API token in a file named ".tiledb_rest_api_token" in your home directory.
    os.environ["TILEDB_REST_TOKEN"] = f.read().strip()

First, import the necessary libraries, set the URI you will use throughout this tutorial, and delete any datasets with the same name.

import os
import warnings
from pathlib import Path

import numpy as np
import tiledb
import tiledb.cloud

warnings.filterwarnings("ignore")

# Read the appropriate environment variables.
tiledb_token = os.environ["TILEDB_REST_TOKEN"]
tiledb_account = os.environ["TILEDB_ACCOUNT"]

# Get the bucket from an environment variable.
s3_bucket = os.environ["S3_BUCKET"]

# Set the table URIs you will use in this tutorial.
table_name = "tables_scalable_ingestion"
table_uri = "tiledb://" + tiledb_account + "/" + table_name
table_uri_reg = "tiledb://" + tiledb_account + "/" + s3_bucket + "/" + table_name
csv_file_bucket = (
    "s3://tiledb-inc-demo-data/examples/notebooks/nyc_yellow_tripdata/csv/"
)

# Set the context config.
cfg = tiledb.Config(
    {
        "vfs.s3.no_sign_request": "true",
        "vfs.s3.region": "us-east-1",
        "rest.token": tiledb_token,
    }
)
ctx = tiledb.Ctx(cfg)

# Clean up previously generated data
with tiledb.scope_ctx(ctx):
    if tiledb.array_exists(table_uri):
        tiledb.cloud.asset.delete(table_uri)

You will use a subset of the New York City Taxi and Limousine Commission (TLC) Trip Record Data dataset.

Define some extra setup details. The 2016-format CSVs don’t use the same header as the 2019 format (for example, they lack the congestion_surcharge column), so the following code defines the fields, data types, and column order for the 2016 format explicitly.

header_2016_2020 = [
    "VendorID",
    "tpep_pickup_datetime",
    "tpep_dropoff_datetime",
    "passenger_count",
    "trip_distance",
    "RatecodeID",
    "store_and_fwd_flag",
    "PULocationID",
    "DOLocationID",
    "payment_type",
    "fare_amount",
    "extra",
    "mta_tax",
    "tip_amount",
    "tolls_amount",
    "improvement_surcharge",
    "total_amount",
]

dtypes = {
    "VendorID": "float64",
    "passenger_count": "float64",
    "trip_distance": "float64",
    "RatecodeID": "float64",
    "store_and_fwd_flag": "str",
    "payment_type": "float64",
    "fare_amount": "float64",
    "extra": "float64",
    "mta_tax": "float64",
    "tip_amount": "float64",
    "tolls_amount": "float64",
    "improvement_surcharge": "float64",
    "total_amount": "float64",
}

converters = {
    "PULocationID": lambda x: int(x) if x else None,
    "DOLocationID": lambda x: int(x) if x else None,
}

Create the TileDB table so that you can then ingest into it in parallel.

create_table_sql = f"""
CREATE TABLE `table_scalable_ingestion_cloud` (
  `tpep_pickup_datetime` timestamp(6) NOT NULL `dimension`=1 `lower_bound`='-9223372036854775807' `upper_bound`='9223372036854774807' `tile_extent`="1",
  `PULocationID` bigint(20) NOT NULL `dimension`=1 `lower_bound`='-9223372036854775808' `upper_bound`='9223372036854774807' `tile_extent`="1",
  `congestion_surcharge` double NOT NULL DEFAULT 0 `filters`='ZSTD=-1',
  `improvement_surcharge` double NOT NULL DEFAULT 0 `filters`='ZSTD=-1',
  `VendorID` double NOT NULL DEFAULT 0 `filters`='ZSTD=-1',
  `payment_type` double NOT NULL DEFAULT 0 `filters`='ZSTD=-1',
  `tpep_dropoff_datetime` timestamp(6) NOT NULL DEFAULT '0000-00-00 00:00:00.000000' `filters`='ZSTD=-1',
  `DOLocationID` bigint(20) NOT NULL DEFAULT -9223372036854775808 `filters`='ZSTD=-1',
  `mta_tax` double NOT NULL DEFAULT 0 `filters`='ZSTD=-1',
  `passenger_count` double NOT NULL DEFAULT 0 `filters`='ZSTD=-1',
  `trip_distance` double NOT NULL DEFAULT 0 `filters`='ZSTD=-1',
  `RatecodeID` double NOT NULL DEFAULT 0 `filters`='ZSTD=-1',
  `store_and_fwd_flag` text NOT NULL `filters`='ZSTD=-1',
  `total_amount` double NOT NULL DEFAULT 0 `filters`='ZSTD=-1',
  `fare_amount` double NOT NULL DEFAULT 0 `filters`='ZSTD=-1',
  `extra` double NOT NULL DEFAULT 0 `filters`='ZSTD=-1',
  `tip_amount` double NOT NULL DEFAULT 0 `filters`='ZSTD=-1',
  `tolls_amount` double NOT NULL DEFAULT 0 `filters`='ZSTD=-1'
) ENGINE=MyTile `uri`='{table_uri_reg}' `array_type`='SPARSE' `capacity`=100000 `cell_order`=HILBERT `offset_filters`='ZSTD=-1' `validity_filters`='RLE'

"""

tiledb.cloud.sql.exec(create_table_sql)
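To confirm the table was created and registered, you can optionally load and print the schema of its backing array. This is a minimal sketch using the standard tiledb-py API; the exact output depends on your TileDB version:

# Optional: inspect the schema of the array that backs the new table.
with tiledb.scope_ctx(ctx):
    print(tiledb.ArraySchema.load(table_uri))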

In many real-world scenarios, you will need to manipulate the data before loading it into tables. This applies to this tutorial, because the source data from New York City comes in different formats that have evolved over time. The two format versions on which this tutorial focuses are the 2016 and 2019 formats.

The 2016 format needs several adjustments to make it compatible with the 2019 format before loading. For this, create a Python function that will handle its distributed ingestion.

def ingest_2016(array_uri, csv_file):
    """
    Load CSVs that are in the 2016 format.
    """
    import time

    import numpy as np
    import pandas
    import tiledb

    ctx = tiledb.Ctx({"vfs.s3.no_sign_request": "true"})
    with tiledb.scope_ctx(ctx):
        pandas_start_time = time.time()
        # Open the CSV file directly from S3 through the TileDB virtual filesystem.
        vfs = tiledb.VFS()
        csv_file = tiledb.FileIO(vfs, csv_file, mode="rb")
        df = pandas.read_csv(
            csv_file,
            index_col=["tpep_pickup_datetime", "PULocationID"],
            parse_dates=["tpep_dropoff_datetime", "tpep_pickup_datetime"],
            skiprows=1,
            names=header_2016_2020,
            usecols=[i for i in range(len(header_2016_2020))],
            dtype=dtypes,
        )
        pandas_end_time = time.time()
        # The 2016 format lacks the congestion_surcharge column, so fill it with NaN.
        df["congestion_surcharge"] = np.nan
        print(f"Pandas took {pandas_end_time - pandas_start_time}s")
        tiledb.from_pandas(
            array_uri, df, mode="append", fillna={"store_and_fwd_flag": ""}
        )

You’ll apply the same idea for the 2019 format, but in this case, your function will just call the CSV ingestion directly.

def ingest_2019(*args, **kwargs):
    """
    Load CSVs that are in the 2019 format.
    """
    import tiledb

    ctx = tiledb.Ctx({"vfs.s3.no_sign_request": "true"})
    with tiledb.scope_ctx(ctx):
        # Any extra keyword arguments are passed through to tiledb.from_csv.
        tiledb.from_csv(*args, **kwargs, mode="append")

Now that you’ve defined the functions for parsing and loading the data, the next step is to list the CSV files and load them in parallel. To list the CSVs, create a small function that lists the bucket contents and filters them down to just the files needed for this tutorial.

def list_data(bucket):
    """
    This function lists out CSV files from a bucket location.
    """
    import os
    from pathlib import Path

    import tiledb

    ctx = tiledb.Ctx({"vfs.s3.no_sign_request": True})
    files = tiledb.VFS(ctx=ctx).ls(bucket)

    files_with_schema_version = {}
    for f in files:
        # Find only yellow tripdata
        if not f.startswith(os.path.join(bucket, "yellow_tripdata_")):
            continue
        if not f.endswith(".csv"):
            continue
        # We use stem twice for generic support of double extensions like .csv.gz
        filename = Path(f).stem
        filename = Path(filename).stem

        year_month = filename.split("_")[2]
        year = int(year_month.split("-")[0])
        month = int(year_month.split("-")[1])

        # Determine the format version of the CSV from its year and month.
        if year >= 2019:
            files_with_schema_version[f] = "2019"
        elif (year == 2016 and month >= 7) or year == 2017 or year == 2018:
            files_with_schema_version[f] = "2016"
        elif year == 2015 or (year == 2016 and month < 7):
            files_with_schema_version[f] = "2015"
        elif year < 2015:
            files_with_schema_version[f] = "2009"
    return files_with_schema_version
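As a side note, the listing code takes .stem twice to strip double extensions. A small illustration with a hypothetical file name:

from pathlib import Path

# Taking .stem twice also handles double extensions such as .csv.gz.
name = Path("yellow_tripdata_2019-01.csv.gz")
print(name.stem)  # yellow_tripdata_2019-01.csv
print(Path(name.stem).stem)  # yellow_tripdata_2019-01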

Now call the function to list the CSV files to ingest.

files_with_schema_version = list_data(csv_file_bucket)
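Optionally, perform a quick sanity check on the listing before building the task graph. The exact file names and counts depend on the contents of the demo bucket:

# Optional sanity check: how many files were found, and the format each one maps to.
print(f"Found {len(files_with_schema_version)} CSV files")
for uri, version in list(files_with_schema_version.items())[:5]:
    print(version, uri)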

After you generate the file list, you can build the task graph for ingestion. You will loop over the list of files and create a separate parallel task for each one by using a batch task graph, submitting the appropriate ingestion function based on the format version of each CSV file. In the following code section, no computation takes place yet; you are just preparing the task graph in a “lazy” manner.

dag = tiledb.cloud.dag.DAG(
    name="nyc-yellow-taxi-ingestion",
    mode=tiledb.cloud.dag.Mode.BATCH,
    namespace=tiledb_account,
)

for csv_uri, schema_version in files_with_schema_version.items():
    # Use the file name (with all extensions stripped) as the task name.
    filename = Path(Path(csv_uri).stem).stem
    if schema_version == "2019":
        dag.submit(
            ingest_2019,
            table_uri,
            csv_uri,
            index_col=["tpep_pickup_datetime", "PULocationID"],
            parse_dates=["tpep_dropoff_datetime", "tpep_pickup_datetime"],
            fillna={"store_and_fwd_flag": ""},
            resources={"cpu": "2", "memory": "8Gi"},
            name=filename,
        )
    elif schema_version == "2016":
        dag.submit(
            ingest_2016,
            table_uri,
            csv_uri,
            resources={"cpu": "2", "memory": "8Gi"},
            name=filename,
        )
    else:
        # Ignore files in formats older than 2016.
        pass

Execute the task graph and wait for it to complete. This will take around 5 minutes, with the majority of the time spent by pandas parsing the CSVs.

dag.compute()
dag.wait()
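Once the task graph completes, you can verify that the array behind the table is readable and check its non-empty domain. This is a minimal sketch using the standard tiledb-py API:

# Verify the ingested data by opening the array that backs the table.
with tiledb.scope_ctx(ctx):
    with tiledb.open(table_uri) as A:
        print(A.schema.domain)
        print(A.nonempty_domain())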

Now that you’ve loaded the data, you can query it directly using SQL or Python. Read the first ten records.

tiledb.cloud.sql.exec(f"SELECT * FROM `{table_uri}` LIMIT 10")
tpep_pickup_datetime PULocationID tolls_amount tip_amount extra VendorID payment_type congestion_surcharge improvement_surcharge tpep_dropoff_datetime DOLocationID mta_tax passenger_count trip_distance RatecodeID store_and_fwd_flag total_amount fare_amount
0 2020-04-30 21:47:02+00:00 238 0.00 0.00 3.0 1.0 2.0 2.5 0.3 2020-04-30 22:01:32+00:00 243 0.5 1.0 5.30 1.0 b'N' 20.30 16.50
1 2020-04-30 21:47:31+00:00 161 0.00 2.15 3.0 1.0 1.0 2.5 0.3 2020-04-30 21:53:54+00:00 68 0.5 1.0 1.40 1.0 b'N' 12.95 7.00
2 2020-04-30 21:49:00+00:00 161 6.12 2.75 0.0 0.0 0.0 0.0 0.3 2020-04-30 22:16:00+00:00 129 0.5 0.0 10.09 0.0 b'' 26.65 16.98
3 2020-04-30 21:50:00+00:00 48 0.00 0.00 0.0 0.0 0.0 2.5 0.3 2020-04-30 22:02:00+00:00 74 0.5 0.0 4.65 0.0 b'' 22.01 18.71
4 2020-04-30 21:50:07+00:00 140 0.00 0.00 3.0 1.0 2.0 2.5 0.3 2020-04-30 21:55:15+00:00 236 0.5 1.0 0.90 1.0 b'N' 9.80 6.00
5 2020-04-30 21:50:15+00:00 152 0.00 2.19 0.5 2.0 1.0 0.0 0.3 2020-04-30 21:55:24+00:00 166 0.5 2.0 1.03 1.0 b'N' 9.49 6.00
6 2020-04-30 21:50:17+00:00 162 0.00 1.55 3.0 1.0 1.0 2.5 0.3 2020-04-30 21:52:36+00:00 100 0.5 1.0 0.60 1.0 b'N' 9.35 4.00
7 2020-04-30 21:50:22+00:00 159 6.12 0.00 0.0 0.0 0.0 0.0 0.3 2020-04-30 22:19:00+00:00 76 0.5 0.0 17.91 0.0 b'' 55.99 49.07
8 2020-04-30 21:51:59+00:00 35 0.00 2.75 0.0 0.0 0.0 0.0 0.3 2020-04-30 22:29:22+00:00 48 0.5 0.0 10.44 0.0 b'' 41.02 37.47
9 2020-04-30 21:52:00+00:00 89 0.00 0.00 0.0 0.0 0.0 2.5 0.3 2020-04-30 22:25:00+00:00 75 0.5 0.0 15.14 0.0 b'' 45.36 42.06
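As an additional check, you can run an aggregate query against the table. The exact count depends on which files are present in the demo bucket:

# Count the total number of ingested records.
tiledb.cloud.sql.exec(f"SELECT COUNT(*) AS num_records FROM `{table_uri}`")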

Finally, clean up the data you created.

# Clean up the data generated in this tutorial
with tiledb.scope_ctx(ctx):
    if tiledb.array_exists(table_uri):
        tiledb.cloud.asset.delete(table_uri)