Run sandboxes in parallel
Run many sandbox jobs at once, limit concurrency, and handle failures.
Because Sandbox.ephemeral and Sandbox.create are async, you can run many sandboxes at the same time using asyncio.
Cloud sandboxes are billed per second of runtime. Running many sandboxes in parallel incurs charges proportional to wall-clock time times number of sandboxes. Use local=True during development to avoid costs.
Run a fixed set concurrently
Use asyncio.gather when you already have the full list of sandbox jobs.
import asyncio
from cua import Sandbox, Image
async def run_task(task: str) -> str:
async with Sandbox.ephemeral(Image.linux()) as sb:
result = await sb.shell.run(f"echo '{task}'")
return result.stdout.strip()
async def main():
tasks = ['task-1', 'task-2', 'task-3', 'task-4']
results = await asyncio.gather(*[run_task(t) for t in tasks])
print(results)
asyncio.run(main())Cap the number of concurrent sandboxes
Use an asyncio.Semaphore when you need rate limiting.
import asyncio
from cua import Sandbox, Image
MAX_CONCURRENT = 5
async def process_item(sem: asyncio.Semaphore, item: str) -> dict:
async with sem:
async with Sandbox.ephemeral(Image.linux()) as sb:
result = await sb.shell.run(f"python /app/process.py '{item}'")
return {'item': item, 'output': result.stdout, 'ok': result.success}
async def main():
items = [f'item-{i}' for i in range(20)]
sem = asyncio.Semaphore(MAX_CONCURRENT)
results = await asyncio.gather(*[process_item(sem, item) for item in items])
print(f"{sum(r['ok'] for r in results)}/{len(results)} succeeded")
asyncio.run(main())Process a dynamic queue of tasks
Use asyncio.Queue when tasks arrive at runtime.
import asyncio
from cua import Sandbox, Image
async def worker(queue: asyncio.Queue, worker_id: int):
while True:
task = await queue.get()
if task is None:
break
async with Sandbox.ephemeral(Image.linux()) as sb:
result = await sb.shell.run(task)
print(f'[worker-{worker_id}] {result.stdout.strip()}')
queue.task_done()
async def main():
queue: asyncio.Queue = asyncio.Queue()
num_workers = 4
workers = [asyncio.create_task(worker(queue, i)) for i in range(num_workers)]
for i in range(10):
await queue.put(f"echo 'job {i}'")
for _ in range(num_workers):
await queue.put(None)
await asyncio.gather(*workers)
asyncio.run(main())Handle failures
Set return_exceptions=True when one failed sandbox job should not cancel the rest.
results = await asyncio.gather(
*[run_task(t) for t in tasks],
return_exceptions=True
)
for task, result in zip(tasks, results):
if isinstance(result, Exception):
print(f'{task}: FAILED — {result}')
else:
print(f'{task}: {result}')