1. Structure
  2. Arrays
  3. Tutorials
  4. Advanced
  5. Distributed Compute
Tutorial: Distributed Compute with Arrays

Tags: arrays, tutorials, python, scalable compute
Task graphs are a powerful feature of TileDB, allowing you to distribute work across numerous machines in a parallel, serverless manner.
How to run this tutorial

You can run this tutorial only on TileDB Cloud. However, TileDB Cloud has a free tier. We strongly recommend that you sign up and run everything there, as that requires no installations or deployment.
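The code in this tutorial reads its credentials from environment variables. If they are not already set in your environment, one way to provide them in a notebook before running the rest of the tutorial is shown below (the values are placeholders; the variable names match the tutorial code):

```python
import os

# Placeholders: replace with your own TileDB Cloud API token, account
# name, and S3 bucket URI before running the tutorial.
os.environ.setdefault("TILEDB_REST_TOKEN", "<your-tiledb-cloud-api-token>")
os.environ.setdefault("TILEDB_ACCOUNT", "<your-account>")
os.environ.setdefault("S3_BUCKET", "s3://<your-bucket>")
```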

This tutorial shows distributed compute on TileDB Cloud using task graphs. It assumes you have already completed the Get Started section. You will learn how to ingest data and perform array computations, all at scale in a distributed (i.e., parallel across multiple machines) fashion. For more information on distributed compute, visit the Key Concepts: Distributed Compute section.

Distributed array ingestion

In this section, you will learn how to write to an array in parallel with a task graph containing four write nodes, each writing to a different partition of the array.

First, import the necessary libraries, set the array URI for TileDB Cloud, and delete any previously created arrays with the same name.

  • Python
# Import necessary libraries
import os.path

import numpy as np
import tiledb
import tiledb.cloud
from tiledb.cloud.compute import Delayed, DelayedArrayUDF

# You should set the appropriate environment variables with your keys.
# Get the keys from the environment variables.
tiledb_token = os.environ["TILEDB_REST_TOKEN"]
tiledb_account = os.environ["TILEDB_ACCOUNT"]

# Get the bucket from an environment variable
s3_bucket = os.environ["S3_BUCKET"]

# Log in with your TileDB Cloud token
tiledb.cloud.login(token=tiledb_token)

# Set array URI
array_name = "distributed_computing"
array_uri = "tiledb://" + tiledb_account + "/" + array_name

# The following context will carry the TileDB Cloud credentials
cloud_ctx = tiledb.cloud.Ctx()

# Delete the array if it already exists
with tiledb.scope_ctx(cloud_ctx):
    if tiledb.array_exists(array_uri):
        tiledb.Array.delete_array(array_uri)

Next, create a dense array by specifying its schema (the sparse case is similar). The only difference between TileDB Cloud and TileDB Open Source when creating and registering arrays is that the TileDB Cloud URI should be of the form tiledb://<account>/s3://<bucket>/<array_name>. TileDB Cloud understands that you are trying to create an array at the S3 URI s3://<bucket>/<array_name> and register it under <account>. After you create and register the array, you can access it as tiledb://<account>/<array_name>.
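To make the two URI forms concrete, here is how they differ for a hypothetical account acme and bucket my-bucket (both names are illustrative only):

```python
# Hypothetical account and bucket names, for illustration only
account = "acme"
bucket = "my-bucket"
array_name = "distributed_computing"

# Form used once, when creating and registering the array:
creation_uri = f"tiledb://{account}/s3://{bucket}/{array_name}"

# Form used for all subsequent access:
access_uri = f"tiledb://{account}/{array_name}"

print(creation_uri)  # tiledb://acme/s3://my-bucket/distributed_computing
print(access_uri)    # tiledb://acme/distributed_computing
```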

  • Python
# Create the two dimensions
d1 = tiledb.Dim(name="d1", domain=(1, 4), tile=2, dtype=np.int32)
d2 = tiledb.Dim(name="d2", domain=(1, 4), tile=2, dtype=np.int32)

# Create a domain using the two dimensions
dom = tiledb.Domain(d1, d2)

# Create an attribute
a = tiledb.Attr(name="a", dtype=np.int32)

# Create the array schema, setting `sparse=False` to indicate a dense array.
sch = tiledb.ArraySchema(domain=dom, sparse=False, attrs=[a])

# Create and register the array on TileDB Cloud
array_uri_reg = "tiledb://" + tiledb_account + "/" + s3_bucket + "/" + array_name
tiledb.Array.create(array_uri_reg, sch, ctx=cloud_ctx)

Subsequently, you need a function that takes as input an array URI, a slice, and some data, and writes the data to the corresponding array slice. This will be called by each of the task graph nodes with different slice and data arguments to perform the entire array ingestion in parallel.

  • Python
# Create a generic UDF that writes the input data to
# the given slices of the array
def write_slice(array_uri, slices, data):
    # `slices` holds one Python slice object per dimension
    with tiledb.open(array_uri, mode="w", ctx=tiledb.cloud.Ctx()) as A:
        A[slices[0], slices[1]] = {"a": data}
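For larger arrays, you may prefer to generate the partition slices programmatically instead of hard-coding them. A minimal sketch (pure Python/NumPy; partition_slices is a helper invented here for illustration, not a TileDB API):

```python
import numpy as np

def partition_slices(dim_min, dim_max, parts):
    """Split the inclusive domain [dim_min, dim_max] into `parts`
    contiguous, half-open Python slices of roughly equal size."""
    bounds = np.linspace(dim_min, dim_max + 1, parts + 1).astype(int)
    return [slice(int(lo), int(hi)) for lo, hi in zip(bounds[:-1], bounds[1:])]

# A 2x2 partitioning of the 4x4 domain [1,4] x [1,4] used in this tutorial
row_slices = partition_slices(1, 4, 2)  # [slice(1, 3), slice(3, 5)]
col_slices = partition_slices(1, 4, 2)
partitions = [[r, c] for r in row_slices for c in col_slices]
print(len(partitions))  # 4
```

Each element of partitions could then be passed to a write node in place of the hard-coded slice lists below.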

You are now ready to create and run the task graph that will perform the parallel ingestion. There will be four nodes that perform the writes in parallel, all connected to a sink node that will run an empty function and complete the graph execution.

  • Python
# Write the following data to the array.
#    [1, 2, 3, 4],
#    [5, 6, 7, 8],
#    [9, 10, 11, 12],
#    [13, 14, 15, 16]
#
# The following code creates a task graph with
# 4 nodes/workers, each writing a different
# partition of the array:

# 1. Write [[1, 2], [5,6]] in subarray [1,2], [1,2]
write1 = Delayed(write_slice, name="write_1")(
    array_uri, [slice(1, 3), slice(1, 3)], np.array([[1, 2], [5, 6]], dtype=np.int32)
)

# 2. Write [[3, 4], [7,8]] in subarray [1,2], [3,4]
write2 = Delayed(write_slice, name="write_2")(
    array_uri, [slice(1, 3), slice(3, 5)], np.array([[3, 4], [7, 8]], dtype=np.int32)
)

# 3. Write [[9, 10], [13,14]] in subarray [3,4], [1,2]
write3 = Delayed(write_slice, name="write_3")(
    array_uri, [slice(3, 5), slice(1, 3)], np.array([[9, 10], [13, 14]], dtype=np.int32)
)

# 4. Write [[11, 12], [15,16]] in subarray [3,4], [3,4]
write4 = Delayed(write_slice, name="write_4")(
    array_uri,
    [slice(3, 5), slice(3, 5)],
    np.array([[11, 12], [15, 16]], dtype=np.int32),
)


# Create a sink node with an empty function to connect all other nodes
def empty_func(write1, write2, write3, write4):
    pass


sink = Delayed(func_exec=empty_func, name="sink")(write1, write2, write3, write4)

# Visualize the graph and see it working in action
sink.visualize()

# Compute the whole task graph
sink.compute()

Read the entire array to verify that the parallel ingestion was successful.

  • Python
# Read the entire array to make sure everything was
# written correctly
with tiledb.open(array_uri, ctx=tiledb.cloud.Ctx()) as A:
    print(A[:])
OrderedDict([('a', array([[ 1,  2,  3,  4],
       [ 5,  6,  7,  8],
       [ 9, 10, 11, 12],
       [13, 14, 15, 16]], dtype=int32))])

Distributed array processing

This section continues the same code example to perform parallel computations over the array. Specifically, you will compute the sum over all elements of the array you ingested. You will again use a task graph with four nodes, each reading a different array slice and computing a partial sum. These four nodes are connected to a sink node that receives the four partial results and adds them together to produce the final sum over the entire array.

First, you need to create a function that will act as an array user-defined function (UDF); visit the Use User-Defined Functions with Array Data tutorial for dedicated examples on array UDFs.

  • Python
# Create an array UDF that sums the values in a given slice of the array
def sum_slice(data):
    return data["a"].sum()

Next, create the task graph. Notice that this time the task graph consists of array UDFs, as opposed to parallel ingestion that used generic UDFs in each graph node.

  • Python
# The following code will compute the sum of all values
# in the array using a task graph with 4 nodes/workers.
# It is similar to ingestion, in that each node will
# focus on a subarray partition of the array.

# 1. Return sum of subarray [1,2], [1,2]
sum1 = DelayedArrayUDF(array_uri, sum_slice, name="sum_1")(
    [(1, 2), (1, 2)], attrs=["a"]
)

# 2. Return sum of subarray [1,2], [3,4]
sum2 = DelayedArrayUDF(array_uri, sum_slice, name="sum_2")(
    [(1, 2), (3, 4)], attrs=["a"]
)

# 3. Return sum of subarray [3,4], [1,2]
sum3 = DelayedArrayUDF(array_uri, sum_slice, name="sum_3")(
    [(3, 4), (1, 2)], attrs=["a"]
)

# 4. Return sum of subarray [3,4], [3,4]
sum4 = DelayedArrayUDF(array_uri, sum_slice, name="sum_4")(
    [(3, 4), (3, 4)], attrs=["a"]
)


# Create a sink node with sum function to connect all other nodes
def sum_all(sum1, sum2, sum3, sum4):
    return sum1 + sum2 + sum3 + sum4


sink = Delayed(func_exec=sum_all, name="sink")(sum1, sum2, sum3, sum4)

# Visualize the graph and see it working in action
sink.visualize()

# Compute the whole task graph and print the final sum
print(sink.compute())
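As a sanity check, the value returned by sink.compute() should match the sum of the 16 ingested values, which you can compute locally:

```python
import numpy as np

# The array holds the values 1 through 16, so the distributed
# sum should equal their total.
expected = int(np.arange(1, 17, dtype=np.int32).sum())
print(expected)  # 136
```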

Clean up by removing the created array.

  • Python
# Clean up: delete the array if it exists
with tiledb.scope_ctx(cloud_ctx):
    if tiledb.array_exists(array_uri):
        tiledb.Array.delete_array(array_uri)