Guide › Sandbox

Parallel Sandboxes

Run multiple sandboxes concurrently to parallelize agent workloads

Because Sandbox.ephemeral and Sandbox.create are async, you can run many sandboxes at the same time using standard Python async patterns. This is useful for parallel scraping, batch testing, multi-agent workflows, and CI/CD pipelines.

Run N sandboxes concurrently

asyncio.gather is the simplest way to run a fixed set of tasks in parallel:

import asyncio
from cua import Sandbox, Image

async def run_task(task: str) -> str:
    """Run one echo command in a fresh ephemeral sandbox and return its stdout."""
    async with Sandbox.ephemeral(Image.linux()) as sb:
        completed = await sb.shell.run(f"echo '{task}'")
        return completed.stdout.strip()

async def main():
    """Fan out a fixed set of tasks concurrently and print their outputs."""
    labels = ["task-1", "task-2", "task-3", "task-4"]
    # gather preserves input order, so results line up with labels.
    results = await asyncio.gather(*(run_task(label) for label in labels))
    print(results)  # ["task-1", "task-2", "task-3", "task-4"]

asyncio.run(main())

Fan out with a semaphore (rate limiting)

If you have a large batch, use a semaphore to cap the number of sandboxes running at once:

import asyncio
from cua import Sandbox, Image

MAX_CONCURRENT = 5

async def process_item(sem: asyncio.Semaphore, item: str) -> dict:
    """Process one item in an ephemeral sandbox, gated by *sem*."""
    # Acquiring the semaphore caps how many sandboxes run simultaneously.
    async with sem:
        async with Sandbox.ephemeral(Image.linux()) as sb:
            run = await sb.shell.run(f"python /app/process.py '{item}'")
            return {"item": item, "output": run.stdout, "ok": run.success}

async def main():
    """Process a batch of 20 items with bounded concurrency and report successes."""
    sem = asyncio.Semaphore(MAX_CONCURRENT)
    batch = [f"item-{i}" for i in range(20)]
    results = await asyncio.gather(*(process_item(sem, entry) for entry in batch))
    succeeded = sum(r['ok'] for r in results)
    print(f"{succeeded}/{len(results)} succeeded")

asyncio.run(main())

Producer-consumer with a queue

For dynamic workloads where tasks arrive at runtime, use asyncio.Queue:

import asyncio
from cua import Sandbox, Image

async def worker(queue: asyncio.Queue, worker_id: int):
    """Consume shell-command tasks from *queue* until a None sentinel arrives.

    Each task runs in its own ephemeral sandbox. queue.task_done() is called
    exactly once per queue.get() — including for the sentinel and when a task
    raises — so the queue's unfinished-task counter stays balanced and
    queue.join() remains usable.
    """
    while True:
        task = await queue.get()
        if task is None:
            # Acknowledge the sentinel too; otherwise queue.join() would
            # count it as forever-unfinished.
            queue.task_done()
            break
        try:
            async with Sandbox.ephemeral(Image.linux()) as sb:
                result = await sb.shell.run(task)
                print(f"[worker-{worker_id}] {result.stdout.strip()}")
        finally:
            # In a finally: a task that raises must still be marked done,
            # or queue.join() would hang on the lost acknowledgement.
            queue.task_done()

async def main():
    """Start a fixed worker pool, enqueue jobs, then shut the pool down."""
    queue: asyncio.Queue = asyncio.Queue()
    num_workers = 4

    # Start workers
    workers = [asyncio.create_task(worker(queue, i)) for i in range(num_workers)]

    # Enqueue tasks
    for i in range(10):
        await queue.put(f"echo 'job {i}'")

    # Signal workers to stop — one sentinel per worker.
    for _ in range(num_workers):
        await queue.put(None)

    await asyncio.gather(*workers)

asyncio.run(main())

Collecting screenshots in parallel

A common pattern for visual testing — take screenshots across multiple environments simultaneously:

import asyncio
from cua import Sandbox, Image

async def screenshot_env(image: Image, label: str) -> tuple[str, bytes]:
    """Boot *image*, wait for the desktop, and return (label, PNG bytes)."""
    async with Sandbox.ephemeral(image) as sb:
        # Give the desktop a moment to settle before capturing.
        await sb.shell.run("sleep 2")
        shot = await sb.screenshot()
        return label, shot

async def main():
    """Capture a screenshot of every environment concurrently, then save each."""
    envs = [
        (Image.linux(), "linux"),
        (Image.linux(kind="vm"), "linux-vm"),
        (Image.macos(), "macos"),
        (Image.windows(), "windows"),
    ]
    captures = await asyncio.gather(*(screenshot_env(img, name) for img, name in envs))
    for name, png in captures:
        path = f"screenshot-{name}.png"
        with open(path, "wb") as f:
            f.write(png)
        print(f"Saved {path}")

asyncio.run(main())

Parallel agent runs

Run the same task across multiple sandboxes to compare model outputs or increase throughput:

import asyncio
from cua import Sandbox, Image, ComputerAgent

async def run_agent(task: str, run_id: int) -> dict:
    """Run one agent session in a fresh sandbox; return the run id and step count."""
    async with Sandbox.ephemeral(Image.linux()) as sb:
        agent = ComputerAgent(
            model="cua/anthropic/claude-sonnet-4-5",
            tools=[sb],
        )
        conversation = [{"role": "user", "content": task}]
        step_count = 0
        # Each yielded result is one agent step; we only need the count.
        async for _ in agent.run(conversation):
            step_count += 1
        return {"run_id": run_id, "steps": step_count}

async def main():
    """Launch the same task three times in parallel and summarize each run."""
    task = "Open Firefox and navigate to google.com"
    runs = await asyncio.gather(*(run_agent(task, i) for i in range(3)))
    for run in runs:
        print(f"Run {run['run_id']}: {run['steps']} steps")

asyncio.run(main())

Cloud sandboxes are billed per second of runtime. Running many sandboxes in parallel will incur charges proportional to wall-clock time × number of sandboxes. Use local=True during development to avoid costs.

Checking results and handling errors

By default, asyncio.gather propagates the first exception raised by any task (the remaining tasks keep running in the background). Use return_exceptions=True to collect all results, including failures, in order:

# Collect every outcome without aborting on the first failure: exceptions
# are returned in-place in the results list instead of being raised.
results = await asyncio.gather(
    *[run_task(t) for t in tasks],
    return_exceptions=True
)

# gather preserves input order, so zip pairs each task with its own result.
for task, result in zip(tasks, results):
    if isinstance(result, Exception):
        print(f"{task}: FAILED — {result}")
    else:
        print(f"{task}: {result}")

Was this page helpful?