Cua Docs

Run sandboxes in parallel

Run many sandbox jobs at once, limit concurrency, and handle failures.

Because Sandbox.ephemeral and Sandbox.create are async, you can run many sandboxes at the same time using asyncio.

Cloud sandboxes are billed per second of runtime. Running many sandboxes in parallel incurs charges proportional to wall-clock time times number of sandboxes. Use local=True during development to avoid costs.

Run a fixed set concurrently

Use asyncio.gather when you already have the full list of sandbox jobs.

import asyncio
from cua import Sandbox, Image
 
async def run_task(task: str) -> str:
    """Echo ``task`` inside an ephemeral Linux sandbox and return the output.

    Args:
        task: Text to echo. It is shell-quoted before being placed on the
            command line, so quotes and other shell metacharacters in it are
            printed literally instead of being interpreted by the shell.

    Returns:
        The command's stdout with leading/trailing whitespace stripped.
    """
    import shlex  # local import keeps the snippet self-contained

    async with Sandbox.ephemeral(Image.linux()) as sb:
        # Hand-written '...' quoting breaks as soon as task contains a quote.
        result = await sb.shell.run(f"echo {shlex.quote(task)}")
        return result.stdout.strip()
 
async def main():
    """Run every task in its own sandbox concurrently and print the outputs."""
    tasks = ['task-1', 'task-2', 'task-3', 'task-4']
    # One coroutine per task; gather awaits them all and keeps input order.
    jobs = map(run_task, tasks)
    results = await asyncio.gather(*jobs)
    print(results)

asyncio.run(main())

Cap the number of concurrent sandboxes

Use an asyncio.Semaphore when you need to cap how many sandboxes run at the same time.

import asyncio
from cua import Sandbox, Image
 
MAX_CONCURRENT = 5
 
async def process_item(sem: asyncio.Semaphore, item: str) -> dict:
    """Run ``/app/process.py`` on ``item`` in an ephemeral Linux sandbox.

    Args:
        sem: Semaphore shared by all jobs. It is held for the whole lifetime
            of the sandbox, so the semaphore's initial value bounds how many
            sandboxes this function has open at once.
        item: Argument passed to the script. It is shell-quoted, so quotes
            and other shell metacharacters reach the script verbatim.

    Returns:
        A dict with the keys ``item`` (the input), ``output`` (the command's
        stdout) and ``ok`` (the command result's ``success`` flag).
    """
    import shlex  # local import keeps the snippet self-contained

    # Acquire the semaphore first so a waiting job does not create a sandbox.
    async with sem:
        async with Sandbox.ephemeral(Image.linux()) as sb:
            command = f"python /app/process.py {shlex.quote(item)}"
            result = await sb.shell.run(command)
            return {'item': item, 'output': result.stdout, 'ok': result.success}
 
async def main():
    """Process 20 items under the MAX_CONCURRENT cap and print a success tally."""
    sem = asyncio.Semaphore(MAX_CONCURRENT)
    items = [f'item-{i}' for i in range(20)]
    # All jobs are scheduled at once; the shared semaphore throttles them.
    jobs = [process_item(sem, item) for item in items]
    results = await asyncio.gather(*jobs)
    print(f"{sum(r['ok'] for r in results)}/{len(results)} succeeded")

asyncio.run(main())

Process a dynamic queue of tasks

Use asyncio.Queue when tasks arrive at runtime.

import asyncio
from cua import Sandbox, Image
 
async def worker(queue: asyncio.Queue, worker_id: int):
    """Consume shell commands from ``queue`` until a ``None`` sentinel arrives.

    Each command is run in its own ephemeral Linux sandbox and its stripped
    stdout is printed, prefixed with ``[worker-<worker_id>]``.

    Every item taken from the queue is acknowledged with ``task_done()``,
    including the ``None`` sentinel and commands whose sandbox raised, so a
    caller that waits on ``queue.join()`` is never left hanging.

    Args:
        queue: Queue of shell command strings; ``None`` tells the worker to stop.
        worker_id: Identifier used only in the printed prefix.
    """
    while True:
        task = await queue.get()
        try:
            if task is None:
                break
            async with Sandbox.ephemeral(Image.linux()) as sb:
                result = await sb.shell.run(task)
                print(f'[worker-{worker_id}] {result.stdout.strip()}')
        finally:
            # Runs on success, on error and on the sentinel's break, keeping
            # the queue's unfinished-task counter balanced with get().
            queue.task_done()
 
async def main():
    """Start four workers, enqueue ten echo jobs, then stop the workers."""
    queue: asyncio.Queue = asyncio.Queue()
    num_workers = 4

    workers = []
    for worker_id in range(num_workers):
        workers.append(asyncio.create_task(worker(queue, worker_id)))

    for i in range(10):
        await queue.put(f"echo 'job {i}'")

    # One None sentinel per worker so each of them leaves its loop.
    for _ in workers:
        await queue.put(None)

    await asyncio.gather(*workers)

asyncio.run(main())

Handle failures

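Set return_exceptions=True when one failed sandbox job should not abort the whole batch: each exception is returned in the results list, in place of that job's result, instead of being raised.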

results = await asyncio.gather(
    *[run_task(t) for t in tasks],
    return_exceptions=True
)
for task, result in zip(tasks, results):
    if isinstance(result, Exception):
        print(f'{task}: FAILED — {result}')
    else:
        print(f'{task}: {result}')