Production AI agents that execute arbitrary code are only as trustworthy as your ability to audit what they actually ran. CrewAI makes it easy to wire up tool-calling agents, but without a sandbox boundary and trace-level visibility, a misbehaving agent can silently run destructive commands and leave no record.
This tutorial wires three things together: a sandboxed execution layer that intercepts every shell and Python call, a CrewAI agent that uses those sandboxed tools, and an OpenTelemetry pipeline that records each invocation as a structured span you can inspect after the fact.
Why this matters
Alibaba’s OpenSandbox [1] frames the problem precisely: AI agents need a “secure, fast, and extensible” runtime boundary because the alternative is giving the model direct access to the host OS. Without that boundary, a single hallucinated rm -rf or a prompt-injected shell command can cause irreversible damage. Most tutorials, however, show agents calling tools directly, with no isolation and no audit trail. When something goes wrong in production, you cannot distinguish “the agent planned to run X” from “the agent actually ran X and it succeeded.” OpenTelemetry spans solve the audit problem: each tool call becomes a span whose attributes record the input, the exit code, whether the call was blocked, and the wall-clock time. Together, a sandbox boundary and a span-per-invocation trace give you both safety and forensics.
Prerequisites
- Python 3.11 or 3.12
- Familiarity with CrewAI agents and tasks
- An OpenAI-compatible API key (needed only for the live agent run; structural and tracing tests run without one)
- No Docker required for this tutorial: the sandbox layer is implemented in-process
Setup
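Install the required packages. crewai brings the agent framework; opentelemetry-sdk and opentelemetry-exporter-otlp-proto-grpc provide the tracing pipeline; arize-phoenix provides a local span viewer you can run in-process. openinference-instrumentation-crewai is installed alongside them but is not imported by the code in this tutorial, which instruments the tools by hand.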
uv pip install crewai opentelemetry-sdk opentelemetry-exporter-otlp-proto-grpc arize-phoenix openinference-instrumentation-crewai
Verify the key packages installed correctly:
from importlib.metadata import version
for pkg in ["crewai", "opentelemetry-sdk", "arize-phoenix"]:
    try:
        print(f"{pkg}: {version(pkg)}")
    except Exception as e:
        print(f"{pkg}: not found ({e})")
print("imports_ok")
Step 1: Build the sandboxed execution layer
The sandbox wraps subprocess.run behind a policy-checked interface; Python snippets are written to a temporary file and run through the same path. Every call returns a structured ExecutionResult record, whether it ran, was blocked, or timed out. In a production system, this layer would delegate to a container runtime or a gVisor/Firecracker boundary [1]. Here, the policy check is implemented in pure Python so the tutorial runs without Docker.
The key design decision: the sandbox raises SandboxPolicyViolation for blocked commands before any subprocess is spawned. This means the OTel span for a blocked call still records the attempt, giving you a full audit trail of what the agent tried to do, not just what it succeeded at.
# filename: sandbox.py
import os
import shlex
import subprocess
import sys
import tempfile
import textwrap
import time
from dataclasses import dataclass
from typing import Optional


class SandboxPolicyViolation(RuntimeError):
    """Raised when a command violates the sandbox policy."""


# Commands that are always blocked regardless of arguments.
_BLOCKED_COMMANDS = frozenset([
    "rm", "mkfs", "dd", "shutdown", "reboot", "halt",
    "wget", "curl", "nc", "ncat", "netcat",
])

# Maximum wall-clock seconds any single command may run.
_DEFAULT_TIMEOUT = 10


@dataclass
class ExecutionResult:
    command: str
    stdout: str
    stderr: str
    exit_code: int
    duration_ms: float
    blocked: bool = False
    block_reason: Optional[str] = None


class Sandbox:
    """Lightweight in-process sandbox that enforces a command blocklist."""

    def __init__(self, blocked_commands: frozenset = _BLOCKED_COMMANDS,
                 timeout: int = _DEFAULT_TIMEOUT,
                 workdir: str = "/tmp/sandbox_workdir"):
        self.blocked_commands = blocked_commands
        self.timeout = timeout
        self.workdir = workdir
        os.makedirs(workdir, exist_ok=True)

    def _check_policy(self, argv: list[str]) -> None:
        # NOTE: only the first token is checked. Because run_shell uses
        # shell=True, a chained command ("echo ok; rm ...") is not caught.
        if not argv:
            raise SandboxPolicyViolation("Empty command")
        binary = argv[0].split("/")[-1]  # strip path prefix
        if binary in self.blocked_commands:
            raise SandboxPolicyViolation(
                f"Command '{binary}' is blocked by sandbox policy"
            )

    def run_shell(self, command: str) -> ExecutionResult:
        """Run a shell command string inside the sandbox."""
        argv = command.strip().split()
        start = time.perf_counter()
        try:
            # The policy check runs before any subprocess is spawned.
            self._check_policy(argv)
            result = subprocess.run(
                command,
                shell=True,
                capture_output=True,
                text=True,
                timeout=self.timeout,
                cwd=self.workdir,
            )
            duration_ms = (time.perf_counter() - start) * 1000
            return ExecutionResult(
                command=command,
                stdout=result.stdout,
                stderr=result.stderr,
                exit_code=result.returncode,
                duration_ms=duration_ms,
            )
        except SandboxPolicyViolation as exc:
            # Blocked: report it as a result so callers can record the attempt.
            duration_ms = (time.perf_counter() - start) * 1000
            return ExecutionResult(
                command=command,
                stdout="",
                stderr=str(exc),
                exit_code=126,
                duration_ms=duration_ms,
                blocked=True,
                block_reason=str(exc),
            )
        except subprocess.TimeoutExpired:
            duration_ms = (time.perf_counter() - start) * 1000
            return ExecutionResult(
                command=command,
                stdout="",
                stderr="Sandbox timeout",
                exit_code=124,
                duration_ms=duration_ms,
            )

    def run_python(self, code: str) -> ExecutionResult:
        """Execute a Python snippet inside the sandbox."""
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".py", dir=self.workdir, delete=False
        ) as f:
            f.write(textwrap.dedent(code))
            script_path = f.name
        try:
            # Use the current interpreter; a bare "python" may not be on PATH.
            result = self.run_shell(
                f"{shlex.quote(sys.executable)} {shlex.quote(script_path)}"
            )
        finally:
            os.unlink(script_path)
        return result
Quick smoke-test of the sandbox in isolation:
from sandbox import Sandbox, SandboxPolicyViolation
sb = Sandbox()
# Allowed command
result = sb.run_shell("echo hello_sandbox")
assert result.exit_code == 0, f"unexpected exit: {result.exit_code}"
assert "hello_sandbox" in result.stdout
print("shell_ok:", result.stdout.strip())
# Blocked command
result = sb.run_shell("rm -rf /tmp/test")
assert result.blocked is True
assert result.exit_code == 126
print("block_ok:", result.block_reason)
# Python execution
result = sb.run_python("print(2 + 2)")
assert "4" in result.stdout
print("python_ok:", result.stdout.strip())
print("sandbox_smoke_test_passed")
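One limitation of the policy is worth seeing concretely: _check_policy looks only at the first token, while run_shell hands the whole string to a shell, so a chained command such as echo ok; rm -rf /tmp/test passes the check. The following is a minimal sketch of a stricter check and is not part of the tutorial's sandbox.py; find_blocked and BLOCKED are hypothetical names. It assumes that shlex's punctuation-aware tokenizer is good enough to locate command positions, and it deliberately ignores command substitution, variable expansion, and aliases, which only a real container boundary addresses.

```python
import shlex

# Hypothetical copy of the tutorial's blocklist, repeated so the sketch is self-contained.
BLOCKED = frozenset({
    "rm", "mkfs", "dd", "shutdown", "reboot", "halt",
    "wget", "curl", "nc", "ncat", "netcat",
})

# Shell control operators after which a new command may start.
_CONTROL_CHARS = set(";&|()")


def find_blocked(command: str, blocked: frozenset = BLOCKED) -> list[str]:
    """Return blocked binaries found in any command position of `command`."""
    lexer = shlex.shlex(command, posix=True, punctuation_chars=True)
    lexer.whitespace_split = True
    hits = []
    expect_command = True  # the first token is always a command
    for token in lexer:
        if token and set(token) <= _CONTROL_CHARS:
            # ";", "&&", "|", "(" and similar: the next word starts a command.
            expect_command = True
            continue
        if expect_command:
            binary = token.split("/")[-1]  # strip path prefix
            if binary in blocked:
                hits.append(binary)
            expect_command = False
    return hits


print(find_blocked("echo ok; rm -rf /tmp/test"))
print(find_blocked("echo rm"))
```

A check like this could run inside _check_policy in place of the first-token comparison, raising SandboxPolicyViolation when the returned list is non-empty. It narrows the gap but remains a blocklist, so it should be treated as defence in depth rather than isolation.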
Step 2: Wrap the sandbox in OTel-instrumented CrewAI tools
Every tool call becomes a span. The span carries the full command text, the exit code, whether the call was blocked, and the wall-clock duration. This is the audit record you’ll query after a run.
The SandboxedShellTool and SandboxedPythonTool classes inherit from CrewAI’s BaseTool. The _run method is the only required override. The OTel instrumentation lives inside _run, so the span lifecycle is tied to the tool call lifecycle: the span starts before the sandbox call and ends (with status set) after it returns.
# filename: sandboxed_tools.py
from __future__ import annotations

from typing import Type

from crewai.tools import BaseTool
from pydantic import BaseModel, Field
from opentelemetry import trace
from opentelemetry.trace import StatusCode

from sandbox import Sandbox

_tracer = trace.get_tracer("sandboxed_tools", "0.1.0")
_sandbox = Sandbox()


class ShellInput(BaseModel):
    command: str = Field(description="Shell command to execute inside the sandbox")


class PythonInput(BaseModel):
    code: str = Field(description="Python source code to execute inside the sandbox")


class SandboxedShellTool(BaseTool):
    name: str = "sandboxed_shell"
    description: str = (
        "Execute a shell command inside the secure sandbox. "
        "Blocked commands (rm, curl, wget, etc.) will be rejected."
    )
    args_schema: Type[BaseModel] = ShellInput

    def _run(self, command: str) -> str:
        with _tracer.start_as_current_span("sandbox.shell") as span:
            span.set_attribute("sandbox.command", command)
            result = _sandbox.run_shell(command)
            span.set_attribute("sandbox.exit_code", result.exit_code)
            span.set_attribute("sandbox.duration_ms", round(result.duration_ms, 2))
            span.set_attribute("sandbox.blocked", result.blocked)
            if result.block_reason:
                span.set_attribute("sandbox.block_reason", result.block_reason)
            if result.exit_code != 0 and not result.blocked:
                span.set_status(StatusCode.ERROR, result.stderr[:200])
            elif result.blocked:
                span.set_status(StatusCode.ERROR, result.block_reason or "blocked")
            else:
                span.set_status(StatusCode.OK)
            output_parts = []
            if result.stdout:
                output_parts.append(f"stdout: {result.stdout.strip()}")
            if result.stderr:
                output_parts.append(f"stderr: {result.stderr.strip()}")
            output_parts.append(f"exit_code: {result.exit_code}")
            return "\n".join(output_parts)


class SandboxedPythonTool(BaseTool):
    name: str = "sandboxed_python"
    description: str = (
        "Execute a Python code snippet inside the secure sandbox. "
        "Use this for calculations, data processing, or file operations."
    )
    args_schema: Type[BaseModel] = PythonInput

    def _run(self, code: str) -> str:
        with _tracer.start_as_current_span("sandbox.python") as span:
            span.set_attribute("sandbox.code_length", len(code))
            span.set_attribute("sandbox.code_preview", code[:200])
            result = _sandbox.run_python(code)
            span.set_attribute("sandbox.exit_code", result.exit_code)
            span.set_attribute("sandbox.duration_ms", round(result.duration_ms, 2))
            if result.exit_code != 0:
                span.set_status(StatusCode.ERROR, result.stderr[:200])
            else:
                span.set_status(StatusCode.OK)
            output_parts = []
            if result.stdout:
                output_parts.append(f"stdout: {result.stdout.strip()}")
            if result.stderr:
                output_parts.append(f"stderr: {result.stderr.strip()}")
            output_parts.append(f"exit_code: {result.exit_code}")
            return "\n".join(output_parts)
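Both tools map a sandbox result to span attributes and to an agent-facing string in the same way. As a sketch only, that mapping can be pulled into two pure functions that are testable without OpenTelemetry or CrewAI installed. The names span_attributes and format_for_agent are hypothetical, and the ExecutionResult below is a stand-in that mirrors the fields defined in sandbox.py.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ExecutionResult:
    """Stand-in mirroring the fields of sandbox.ExecutionResult."""
    command: str
    stdout: str
    stderr: str
    exit_code: int
    duration_ms: float
    blocked: bool = False
    block_reason: Optional[str] = None


def span_attributes(result: ExecutionResult) -> dict:
    """Build the sandbox.* attributes that the tools attach to each span."""
    attrs = {
        "sandbox.exit_code": result.exit_code,
        "sandbox.duration_ms": round(result.duration_ms, 2),
        "sandbox.blocked": result.blocked,
    }
    if result.block_reason:
        attrs["sandbox.block_reason"] = result.block_reason
    return attrs


def format_for_agent(result: ExecutionResult) -> str:
    """Build the text the tool returns to the agent."""
    parts = []
    if result.stdout:
        parts.append(f"stdout: {result.stdout.strip()}")
    if result.stderr:
        parts.append(f"stderr: {result.stderr.strip()}")
    parts.append(f"exit_code: {result.exit_code}")
    return "\n".join(parts)


blocked = ExecutionResult(
    command="rm -rf /tmp/test", stdout="", stderr="blocked by policy",
    exit_code=126, duration_ms=0.1234, blocked=True,
    block_reason="blocked by policy",
)
print(span_attributes(blocked))
print(format_for_agent(blocked))
```

Inside a tool's _run, the dictionary could be applied in one call with span.set_attributes(...), which keeps the two tools from drifting apart as attributes are added.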
Step 3: Configure the OpenTelemetry pipeline
The pipeline uses a SimpleSpanProcessor backed by a ConsoleSpanExporter so spans flush synchronously and appear in stdout immediately after each tool call. In production you would swap the exporter for an OTLP endpoint pointing at Phoenix, Jaeger, or any OTLP-compatible backend. The same span structure indexes the same way on Datadog or Honeycomb; only the exporter endpoint changes.
# filename: otel_setup.py
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor, ConsoleSpanExporter
from opentelemetry.sdk.resources import Resource


def configure_tracing(service_name: str = "crewai-sandbox-agent") -> TracerProvider:
    resource = Resource.create({"service.name": service_name})
    provider = TracerProvider(resource=resource)
    provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
    trace.set_tracer_provider(provider)
    return provider
Step 4: Assemble the CrewAI agent
The agent is a single Researcher role whose only tools are the two sandboxed variants. The task asks it to do a small calculation and inspect the working directory. Because the task is short and has a single correct answer, the agent should exercise both tools in a predictable order, although the exact sequence is ultimately up to the LLM.
The model client is injected at call time so the structural wiring can be verified without an API key.
# filename: agent_builder.py
from crewai import Agent, Task, Crew, Process

from sandboxed_tools import SandboxedShellTool, SandboxedPythonTool


def build_crew(llm=None):
    shell_tool = SandboxedShellTool()
    python_tool = SandboxedPythonTool()
    researcher = Agent(
        role="Sandbox Researcher",
        goal="Execute tasks safely inside the sandbox and report results accurately.",
        backstory=(
            "You are a careful analyst who always uses the sandboxed tools "
            "to run code and shell commands. You never attempt to run commands "
            "outside the sandbox."
        ),
        tools=[shell_tool, python_tool],
        verbose=True,
        llm=llm,
    )
    task = Task(
        description=(
            "1. Use the sandboxed_shell tool to list files in the current directory.\n"
            "2. Use the sandboxed_python tool to compute the sum of integers from 1 to 100.\n"
            "3. Report both results in your final answer."
        ),
        expected_output=(
            "A summary containing: the list of files found, and the sum 5050."
        ),
        agent=researcher,
    )
    crew = Crew(
        agents=[researcher],
        tasks=[task],
        process=Process.sequential,
        verbose=True,
    )
    return crew
Verify the crew structure compiles without an API key:
from agent_builder import build_crew
from crewai import Crew
crew = build_crew(llm=None)
assert isinstance(crew, Crew)
assert len(crew.agents) == 1
assert len(crew.tasks) == 1
assert crew.agents[0].role == "Sandbox Researcher"
assert len(crew.agents[0].tools) == 2
tool_names = {t.name for t in crew.agents[0].tools}
assert "sandboxed_shell" in tool_names
assert "sandboxed_python" in tool_names
print("crew_structure_ok")
print("tools:", sorted(tool_names))
Step 5: Run the agent with tracing enabled
This block requires a real LLM API key. It configures the OTel pipeline, kicks off the crew, and then forces a flush so all spans are written before the process exits.
import os
from otel_setup import configure_tracing
from agent_builder import build_crew
from crewai import LLM
# Configure tracing before any tool calls happen.
provider = configure_tracing("crewai-sandbox-agent")
llm = LLM(
    model="gpt-4o-mini",
    api_key=os.environ["OPENAI_API_KEY"],
)
crew = build_crew(llm=llm)
result = crew.kickoff()
# Force-flush so all spans are written to console before exit.
provider.force_flush()
print("\n=== AGENT RESULT ===")
print(result)
Verify it works
This block exercises the full tracing pipeline without an LLM. It calls the sandboxed tools directly, then asserts that spans were emitted with the expected attributes.
import io
from otel_setup import configure_tracing
from opentelemetry.sdk.trace.export import SimpleSpanProcessor, ConsoleSpanExporter

# Capture console span output.
buf = io.StringIO()
capturing_exporter = ConsoleSpanExporter(out=buf)
provider = configure_tracing("verify-run")
# Add a second processor that writes to our buffer.
provider.add_span_processor(SimpleSpanProcessor(capturing_exporter))

from sandboxed_tools import SandboxedShellTool, SandboxedPythonTool

shell_tool = SandboxedShellTool()
python_tool = SandboxedPythonTool()

# Run a shell command.
shell_out = shell_tool._run("echo verify_shell_span")
assert "verify_shell_span" in shell_out, f"unexpected output: {shell_out}"

# Run a Python snippet.
python_out = python_tool._run("print(6 * 7)")
assert "42" in python_out, f"unexpected output: {python_out}"

# Run a blocked command and confirm it's recorded.
blocked_out = shell_tool._run("rm -rf /tmp/test")
assert "exit_code: 126" in blocked_out, f"expected block signal: {blocked_out}"

# Force flush and inspect spans.
provider.force_flush()
span_text = buf.getvalue()
assert "sandbox.shell" in span_text, "Expected sandbox.shell span in output"
assert "sandbox.python" in span_text, "Expected sandbox.python span in output"
assert "sandbox.exit_code" in span_text, "Expected exit_code attribute in spans"
print("verify_tracing_passed")

# Count top-level "name" keys rather than the substring "name",
# which would also match "service.name" in the resource block.
span_count = span_text.count('"name":')
print(f"Captured {span_count} span records in output")
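String matching on the captured buffer is coarse. Assuming the console exporter's default formatter, which writes each span as one pretty-printed JSON object followed by a newline, the buffer can instead be parsed into dictionaries and queried by attribute. The sketch below uses only the standard library; parse_console_spans and blocked_commands are hypothetical helpers, and SAMPLE is a hand-written, heavily abbreviated illustration of the format rather than real exporter output.

```python
import json


def parse_console_spans(text: str) -> list[dict]:
    """Split a stream of concatenated JSON objects into a list of dicts."""
    decoder = json.JSONDecoder()
    spans, pos = [], 0
    while pos < len(text):
        # Skip the whitespace and newlines between objects.
        while pos < len(text) and text[pos].isspace():
            pos += 1
        if pos >= len(text):
            break
        obj, pos = decoder.raw_decode(text, pos)
        spans.append(obj)
    return spans


def blocked_commands(spans: list[dict]) -> list[str]:
    """Return the command text of every span marked sandbox.blocked."""
    return [
        s.get("attributes", {}).get("sandbox.command", "")
        for s in spans
        if s.get("attributes", {}).get("sandbox.blocked")
    ]


# Hand-written sample; real spans carry many more fields.
SAMPLE = """{
    "name": "sandbox.shell",
    "attributes": {"sandbox.command": "echo hi", "sandbox.exit_code": 0, "sandbox.blocked": false}
}
{
    "name": "sandbox.shell",
    "attributes": {"sandbox.command": "rm -rf /tmp/test", "sandbox.exit_code": 126, "sandbox.blocked": true}
}
"""

spans = parse_console_spans(SAMPLE)
print(len(spans), blocked_commands(spans))
```

In the verify block, passing buf.getvalue() to parse_console_spans would allow assertions such as "exactly one span is blocked" instead of substring checks.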
Troubleshooting
ModuleNotFoundError: No module named 'crewai' after install. The install block may have timed out. Re-run the setup install command. If the error persists, check that uv resolved without conflicts by running uv pip install crewai --dry-run to see what the resolver would install.
SandboxPolicyViolation raised instead of returning a blocked result. This happens if you call _check_policy directly rather than going through run_shell. The run_shell method catches the exception and converts it to an ExecutionResult with blocked=True. Always use the public run_shell / run_python interface.
Spans appear in stdout but not in the captured buffer. ConsoleSpanExporter writes to sys.stdout unless you pass a different stream through its out parameter. Confirm you passed out=buf explicitly when constructing the exporter in the verify block. The configure_tracing helper uses the default (stdout), which is intentional so that spans are visible during a run; the verify block adds a second processor with the buffer-backed exporter.
CrewAI agent loops indefinitely without calling tools. This usually means the task description is ambiguous or the LLM is not recognizing the tool names. Set verbose=True on the agent (already set in the builder) and inspect the printed reasoning steps. If the model keeps producing text answers instead of tool calls, add an explicit instruction: “You MUST use the sandboxed_shell and sandboxed_python tools. Do not answer from memory.”
Sandbox timeout (exit code 124) during run_python. The default sandbox timeout is 10 seconds; run_shell catches subprocess.TimeoutExpired and returns exit code 124 with stderr set to “Sandbox timeout”. For longer-running snippets, use Sandbox(timeout=30). As written, sandboxed_tools.py creates a module-level Sandbox() singleton, so either change that line or refactor the tools to accept the sandbox as a parameter.
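Phoenix UI not showing spans. This tutorial uses the ConsoleSpanExporter, so no Phoenix server is required and nothing is sent to Phoenix. To forward spans to a local Phoenix instance, start Phoenix with python -m phoenix.server.main serve and replace ConsoleSpanExporter() in otel_setup.py with an OTLPSpanExporter. Note that http://localhost:6006/v1/traces is an OTLP/HTTP endpoint, so it needs the HTTP exporter from the separate opentelemetry-exporter-otlp-proto-http package; the gRPC exporter installed in Setup expects Phoenix's gRPC port instead (4317 by default, which is worth confirming against your Phoenix version).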
Next steps
- Swap in a real container boundary. Replace subprocess.run in Sandbox.run_shell with a call to the OpenSandbox runtime API [1] or a Docker SDK call that spins up a fresh container per invocation. The OTel instrumentation layer stays identical.
- Add a span-based policy enforcer. Write a SpanProcessor that reads sandbox.blocked attributes in on_end and emits an alert (Slack webhook, PagerDuty, etc.) whenever a blocked call appears in a trace. This gives you real-time detection of prompt injection attempts.
- Persist traces to Phoenix. Run python -m phoenix.server.main serve locally and point the OTLP exporter at http://localhost:6006/v1/traces. Phoenix’s span table lets you filter by sandbox.exit_code != 0 to find all failed tool calls across a session.
- Multi-agent sandboxing. Add a second CrewAI agent with a different tool set (file read, HTTP fetch via a proxy) and give each agent its own Sandbox instance with different blocklists. The service.name resource attribute on each tracer provider lets you distinguish agent traces in the same Phoenix session.
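The multi-agent idea above can be sketched as a small table of per-agent policies. This is an illustration under stated assumptions, not part of the tutorial's code: AgentPolicy, POLICIES, the agent names, and the split into base and network blocklists are all hypothetical, and the allows method repeats the first-token check from sandbox.py with the same limitations.

```python
from dataclasses import dataclass

# Hypothetical split of the tutorial's blocklist into two groups.
BASE_BLOCKLIST = frozenset({"rm", "mkfs", "dd", "shutdown", "reboot", "halt"})
NETWORK_BLOCKLIST = frozenset({"wget", "curl", "nc", "ncat", "netcat"})


@dataclass(frozen=True)
class AgentPolicy:
    """Per-agent sandbox settings (illustrative only)."""
    agent_name: str
    blocked_commands: frozenset
    timeout: int = 10

    def allows(self, command: str) -> bool:
        """First-token check, mirroring Sandbox._check_policy."""
        argv = command.strip().split()
        if not argv:
            return False
        return argv[0].split("/")[-1] not in self.blocked_commands


POLICIES = {
    # The researcher gets no network tools at all.
    "researcher": AgentPolicy("researcher", BASE_BLOCKLIST | NETWORK_BLOCKLIST),
    # The fetcher may use network tools and gets a longer timeout.
    "fetcher": AgentPolicy("fetcher", BASE_BLOCKLIST, timeout=30),
}

for name, policy in POLICIES.items():
    print(name, policy.allows("curl http://example.com"))
```

Each policy maps directly onto the Sandbox constructor from Step 1, for example Sandbox(blocked_commands=policy.blocked_commands, timeout=policy.timeout), so every agent's tools can be built around their own instance.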
FAQ
How does the sandbox prevent destructive commands?
The sandbox maintains a blocklist of dangerous commands (rm, mkfs, dd, shutdown, reboot, curl, wget, etc.) and raises SandboxPolicyViolation before any subprocess is spawned. The violation is caught and converted to an ExecutionResult with blocked=True and exit code 126, so the attempt is still recorded in the OpenTelemetry span for audit purposes. The check inspects only the first token of the command, so it is a demonstration policy and not a substitute for a container boundary.
What information does each OpenTelemetry span capture?
Shell spans record the full command text, the exit code, the wall-clock duration in milliseconds, whether the call was blocked, and the block reason if applicable. Python spans record the code length, a preview of the first 200 characters, the exit code, and the duration. In both cases stdout and stderr are returned to the agent but are not attached to the span.
Can this sandbox be used with a real container runtime?
Yes. The tutorial uses in-process subprocess calls for simplicity, but the OTel instrumentation layer is independent of the execution backend. You can replace subprocess.run in Sandbox.run_shell with calls to the Docker SDK, OpenSandbox runtime API, or gVisor/Firecracker without changing the span structure or tool interface.
What happens if a command times out?
The sandbox enforces a default 10-second timeout per command. If a command exceeds the timeout, subprocess.TimeoutExpired is caught and converted to an ExecutionResult with exit code 124 and stderr set to ‘Sandbox timeout’. The timeout is configurable by passing timeout=N when constructing the Sandbox instance.
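To see the timeout path in isolation, the sketch below reproduces the same convention (exit code 124 and a fixed stderr message) with only the standard library. run_with_timeout is a hypothetical helper, not a function from sandbox.py, and it passes an argument list rather than a shell string.

```python
import subprocess
import sys
import time


def run_with_timeout(argv: list[str], timeout: float) -> tuple[int, str, float]:
    """Run argv and map a timeout to exit code 124, as the sandbox does."""
    start = time.perf_counter()
    try:
        proc = subprocess.run(
            argv, capture_output=True, text=True, timeout=timeout
        )
        code, stderr = proc.returncode, proc.stderr
    except subprocess.TimeoutExpired:
        # subprocess.run kills the child before raising.
        code, stderr = 124, "Sandbox timeout"
    duration_ms = (time.perf_counter() - start) * 1000
    return code, stderr, duration_ms


# A child process that sleeps far longer than the allowed half second.
slow = [sys.executable, "-c", "import time; time.sleep(5)"]
code, stderr, duration_ms = run_with_timeout(slow, timeout=0.5)
print(code, stderr)
```

One design note, offered with some caution: when a command runs with shell=True, as in run_shell, the timeout terminates the shell process itself, and processes the shell started may outlive it. A container or process-group boundary is the more reliable way to enforce the limit.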
How do you export spans to a backend like Phoenix or Datadog?
Replace ConsoleSpanExporter in otel_setup.py with an OTLPSpanExporter pointing at your backend endpoint. For Phoenix over HTTP, use OTLPSpanExporter(endpoint="http://localhost:6006/v1/traces") from the opentelemetry-exporter-otlp-proto-http package; the gRPC exporter installed in Setup needs Phoenix's gRPC endpoint instead. The same span attributes (sandbox.exit_code, sandbox.blocked, etc.) index identically on any OTLP-compatible backend.