Parallel Sandboxes
Run multiple sandboxes concurrently to parallelize agent workloads
Because Sandbox.ephemeral and Sandbox.create are async, you can run many sandboxes at the same time using standard Python async patterns. This is useful for parallel scraping, batch testing, multi-agent workflows, and CI/CD pipelines.
Run N sandboxes concurrently
asyncio.gather is the simplest way to run a fixed set of tasks in parallel:
import asyncio
from cua import Sandbox, Image


async def run_task(task: str) -> str:
    """Run a single shell command in its own ephemeral sandbox and return its stdout."""
    # Each task gets a fresh sandbox; the context manager tears it down on exit.
    async with Sandbox.ephemeral(Image.linux()) as sb:
        result = await sb.shell.run(f"echo '{task}'")
        return result.stdout.strip()


async def main():
    tasks = ["task-1", "task-2", "task-3", "task-4"]
    # gather schedules every coroutine concurrently and returns results in input order.
    results = await asyncio.gather(*[run_task(t) for t in tasks])
    print(results)  # ["task-1", "task-2", "task-3", "task-4"]
asyncio.run(main())

Fan out with a semaphore (rate limiting)
If you have a large batch, use a semaphore to cap the number of sandboxes running at once:
import asyncio
from cua import Sandbox, Image

MAX_CONCURRENT = 5


async def process_item(sem: asyncio.Semaphore, item: str) -> dict:
    """Process one item in an ephemeral sandbox, gated by a concurrency semaphore."""
    # Acquire a slot before creating the sandbox, so at most MAX_CONCURRENT
    # sandboxes exist at any moment even though all coroutines start at once.
    async with sem:
        async with Sandbox.ephemeral(Image.linux()) as sb:
            result = await sb.shell.run(f"python /app/process.py '{item}'")
            return {"item": item, "output": result.stdout, "ok": result.success}


async def main():
    items = [f"item-{i}" for i in range(20)]
    sem = asyncio.Semaphore(MAX_CONCURRENT)
    results = await asyncio.gather(*[process_item(sem, item) for item in items])
    print(f"{sum(r['ok'] for r in results)}/{len(results)} succeeded")
asyncio.run(main())

Producer-consumer with a queue
For dynamic workloads where tasks arrive at runtime, use asyncio.Queue:
import asyncio
from cua import Sandbox, Image


async def worker(queue: asyncio.Queue, worker_id: int):
    """Pull shell tasks off the queue until a ``None`` sentinel arrives."""
    while True:
        task = await queue.get()
        if task is None:
            # NOTE(review): the sentinel is never marked with task_done(); fine
            # here because main() awaits the workers rather than queue.join().
            break
        async with Sandbox.ephemeral(Image.linux()) as sb:
            result = await sb.shell.run(task)
            print(f"[worker-{worker_id}] {result.stdout.strip()}")
        queue.task_done()


async def main():
    queue: asyncio.Queue = asyncio.Queue()
    num_workers = 4
    # Start workers
    workers = [asyncio.create_task(worker(queue, i)) for i in range(num_workers)]
    # Enqueue tasks
    for i in range(10):
        await queue.put(f"echo 'job {i}'")
    # Signal workers to stop
    for _ in range(num_workers):
        await queue.put(None)
    await asyncio.gather(*workers)
asyncio.run(main())

Collecting screenshots in parallel
A common pattern for visual testing — take screenshots across multiple environments simultaneously:
import asyncio
from cua import Sandbox, Image


async def screenshot_env(image: Image, label: str) -> tuple[str, bytes]:
    """Boot a sandbox for *image*, grab one screenshot, and return it as (label, png)."""
    async with Sandbox.ephemeral(image) as sb:
        await sb.shell.run("sleep 2")  # wait for desktop to settle
        png = await sb.screenshot()
        return label, png


async def main():
    envs = [
        (Image.linux(), "linux"),
        (Image.linux(kind="vm"), "linux-vm"),
        (Image.macos(), "macos"),
        (Image.windows(), "windows"),
    ]
    # All environments boot and screenshot concurrently; results keep input order.
    results = await asyncio.gather(*[screenshot_env(img, label) for img, label in envs])
    for label, png in results:
        with open(f"screenshot-{label}.png", "wb") as f:
            f.write(png)
        print(f"Saved screenshot-{label}.png")
asyncio.run(main())

Parallel agent runs
Run the same task across multiple sandboxes to compare model outputs or increase throughput:
import asyncio
from cua import Sandbox, Image, ComputerAgent


async def run_agent(task: str, run_id: int) -> dict:
    """Run one agent against its own sandbox and report how many steps it took."""
    async with Sandbox.ephemeral(Image.linux()) as sb:
        agent = ComputerAgent(
            model="cua/anthropic/claude-sonnet-4-5",
            tools=[sb],
        )
        messages = [{"role": "user", "content": task}]
        outputs = []
        # Each yielded result is one agent step.
        async for result in agent.run(messages):
            outputs.append(result)
        return {"run_id": run_id, "steps": len(outputs)}


async def main():
    task = "Open Firefox and navigate to google.com"
    # Three independent sandboxes run the same task concurrently.
    runs = await asyncio.gather(*[run_agent(task, i) for i in range(3)])
    for run in runs:
        print(f"Run {run['run_id']}: {run['steps']} steps")
asyncio.run(main())

Cloud sandboxes are billed per second of runtime. Running many sandboxes in parallel will incur charges proportional to wall-clock time × number of sandboxes. Use local=True during development to avoid costs.
Checking results and handling errors
asyncio.gather raises the first exception by default. Use return_exceptions=True to collect all results including failures:
results = await asyncio.gather(
*[run_task(t) for t in tasks],
return_exceptions=True
)
for task, result in zip(tasks, results):
if isinstance(result, Exception):
print(f"{task}: FAILED — {result}")
else:
        print(f"{task}: {result}")

Was this page helpful?