Compare commits

...

3 Commits

Author SHA1 Message Date
Pi Agent
efd1f01b14 docs: mark issue #83 as completed
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 07:38:09 +01:00
Pi Agent
060277d2fa fix: address code review findings for run_code executor
- Add working_dir to PATH_KEYS in path_allowlist for enforcement layer 4
- Use kill_on_drop(true) to ensure timed-out child processes are killed
- Clear env and apply resource limits to Rust compilation step
- Preserve RUSTUP_HOME/CARGO_HOME for compiler toolchain access
- Cap timeout_secs parameter to 300 seconds maximum
- Remove RLIMIT_NPROC (unreliable per-user limit on Linux)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 07:37:59 +01:00
Pi Agent
4dad046461 feat: implement run_code tool executor (issue #83)
Add RunCodeExecutor to the Tool Broker service for sandboxed code
execution. Supports Python, Rust (compile-and-run), and shell languages.
Applies POSIX resource limits (memory, CPU time, file size, open files,
process count) via setrlimit in pre_exec. Configurable timeout enforcement
and optional working directory. Returns structured JSON output with
stdout, stderr, exit code, and timing metadata.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 07:10:29 +01:00
8 changed files with 1313 additions and 6 deletions

View File

@@ -85,6 +85,7 @@
| #79 | Implement session config application | Phase 9 | `COMPLETED` | Python | [issue-079.md](issue-079.md) |
| #80 | Wire orchestrator to researcher subagent e2e | Phase 9 | `COMPLETED` | Python | [issue-080.md](issue-080.md) |
| #82 | Implement fs_read and fs_write tool executors | Phase 10 | `COMPLETED` | Rust | [issue-082.md](issue-082.md) |
| #83 | Implement run_code tool executor | Phase 10 | `COMPLETED` | Rust | [issue-083.md](issue-083.md) |
## Status Legend
@@ -137,6 +138,7 @@
### Tool Broker
- [issue-014.md](issue-014.md) — tool_broker.proto (ToolBrokerService)
- [issue-082.md](issue-082.md) — fs_read and fs_write tool executors (file I/O with path canonicalization)
- [issue-083.md](issue-083.md) — run_code tool executor (sandboxed code execution with resource limits)
### Orchestrator
- [issue-015.md](issue-015.md) — orchestrator.proto (OrchestratorService)

View File

@@ -0,0 +1,324 @@
# Implementation Plan — Issue #83: Implement run_code tool executor
## Metadata
| Field | Value |
|---|---|
| Issue | [#83](https://git.shahondin1624.de/llm-multiverse/llm-multiverse/issues/83) |
| Title | Implement run_code tool executor |
| Milestone | Phase 10: Remaining Agent Types |
| Labels | — |
| Status | `COMPLETED` |
| Language | Rust |
| Related Plans | issue-058.md, issue-082.md |
| Blocked by | #58 (COMPLETED) |
## Acceptance Criteria
- [ ] Support multiple languages (Python, Rust, shell at minimum)
- [ ] Sandbox execution: restricted filesystem access, no network by default
- [ ] Timeout enforcement (configurable per invocation)
- [ ] Capture stdout, stderr, and exit code
- [ ] Working directory scoped to agent's allowed paths
- [ ] Resource limits (memory, CPU time)
## Architecture Analysis
### Service Context
- **Service:** Tool Broker (`services/tool-broker/`, Rust)
- **gRPC endpoints affected:** `ExecuteTool` — no proto changes needed; the executor produces `ToolOutput` structs consumed by the existing dispatch pipeline
- **Proto messages involved:** `ExecuteToolRequest` (parameters map), `ExecuteToolResponse` (output string + status). The `run_code` `ToolDefinition` already exists in `discovery.rs` with `language` and `code` as required parameters.
### Existing Patterns
- **Executor trait:** `dispatch.rs` defines `ToolExecutor` with `async fn execute(&self, parameters: &HashMap<String, String>) -> ToolOutput`. The `FsReadExecutor` and `FsWriteExecutor` in `executors/` implement this trait as dedicated structs. `RunCodeExecutor` follows the same pattern.
- **SubprocessExecutor:** `dispatch.rs` contains an existing `SubprocessExecutor` that demonstrates `tokio::process::Command` usage with stdout/stderr capture. `RunCodeExecutor` builds on this approach but adds temp file creation, language-specific interpreter selection, resource limits, and sandbox restrictions.
- **Registration pattern:** `executors/mod.rs` has `register_fs_executors()` that registers executors with the `ToolDispatcher`. A new `register_run_code_executor()` function follows this convention.
- **Path canonicalization:** `enforcement/path_allowlist.rs` exports `normalize_path()` (made `pub(crate)` in issue #82). The executor uses this for working directory validation.
### Dependencies
- **Internal:** `dispatch.rs` (`ToolExecutor`, `ToolOutput`), `enforcement/path_allowlist.rs` (`normalize_path`)
- **External crates:** `tokio` (process, fs, time — already available with `full` features), `serde_json` (already in `Cargo.toml`), `tempfile` (already in dev-dependencies; must be moved to regular dependencies for runtime temp file creation)
- **System:** Python 3 interpreter, Rust compiler (`rustc`), `/bin/sh` for shell execution
- **No new gRPC clients** — execution is local subprocess work
## Implementation Steps
### 1. Define language configuration
**File:** `services/tool-broker/src/executors/run_code.rs`
Define a `LanguageConfig` struct that maps language names to their execution details:
```rust
struct LanguageConfig {
/// Command to invoke (e.g., "python3", "rustc", "/bin/sh")
interpreter: &'static str,
/// File extension for the temp source file (e.g., ".py", ".rs", ".sh")
extension: &'static str,
/// How to build the command arguments from the temp file path.
/// Some languages need compilation first (Rust), others interpret directly.
mode: ExecutionMode,
}
enum ExecutionMode {
/// Run interpreter with source file as argument (Python, shell)
Interpret,
/// Compile to binary, then execute the binary (Rust)
CompileAndRun,
}
```
Supported languages at MVP:
| Language | Interpreter | Extension | Mode |
|---|---|---|---|
| `python` / `python3` | `python3` | `.py` | Interpret |
| `rust` | `rustc` | `.rs` | CompileAndRun |
| `bash` / `shell` / `sh` | `/bin/sh` | `.sh` | Interpret |
### 2. Create `RunCodeExecutor` struct
**File:** `services/tool-broker/src/executors/run_code.rs`
Struct `RunCodeExecutor` implementing `ToolExecutor`:
```rust
pub struct RunCodeExecutor {
/// Default timeout for code execution (overridable per call).
default_timeout: Duration,
/// Default memory limit in bytes.
default_memory_limit: u64,
/// Default CPU time limit in seconds.
default_cpu_time_limit: u64,
}
```
Constructor `RunCodeExecutor::new()` with sensible defaults:
- `default_timeout`: 30 seconds
- `default_memory_limit`: 256 MB (268_435_456 bytes)
- `default_cpu_time_limit`: 30 seconds
Also provide `RunCodeExecutor::with_limits(timeout, memory, cpu_time)` for testing and custom configuration.
### 3. Implement parameter extraction
**Parameter extraction from `HashMap<String, String>`:**
- `language` (required) — language identifier, normalized to lowercase
- `code` (required) — source code to execute
- `timeout_secs` (optional) — override execution timeout in seconds
- `working_dir` (optional) — working directory for execution; if provided, must be an absolute path (validated via `normalize_path`)
### 4. Implement core execution logic
**Execution flow:**
1. **Validate language:** Look up `LanguageConfig` for the given language string. Return error if unsupported.
2. **Create temp directory:** Use `tempfile::tempdir()` to create an isolated temporary directory for the execution. This directory is used for the source file and any compilation artifacts.
3. **Write source file:** Write the `code` parameter to a temp file with the appropriate extension inside the temp directory (e.g., `code.py`, `code.rs`, `code.sh`).
4. **Determine working directory:** If `working_dir` is provided, canonicalize and validate it. Otherwise, use the temp directory itself.
5. **Build command:** Based on `ExecutionMode`:
- **Interpret:** `Command::new(interpreter).arg(temp_file_path).current_dir(working_dir)`
- **CompileAndRun:** Two-step:
a. Compile: `Command::new("rustc").arg(temp_file_path).arg("-o").arg(binary_path)` — capture compile errors.
b. If compilation succeeds, execute: `Command::new(binary_path).current_dir(working_dir)`
6. **Apply resource limits:** Before spawning, configure the command with a `pre_exec` closure (unsafe, POSIX only) that calls `setrlimit` for:
- `RLIMIT_AS` — virtual memory limit (default 256 MB)
- `RLIMIT_CPU` — CPU time limit in seconds (default 30s)
- `RLIMIT_FSIZE` — max output file size (16 MB) to prevent disk fill
- `RLIMIT_NPROC` — max child processes (0, prevent fork bombs)
7. **Apply sandbox restrictions:** In the same `pre_exec` closure:
- Set `RLIMIT_NOFILE` to a low value (64) to limit open file descriptors
- Clear environment variables except `PATH`, `HOME`, `LANG`, `TERM` to minimize information leakage
- Note: Network restriction is best-effort. On Linux, full network namespace isolation requires root/CAP_NET_ADMIN. Document this limitation. For MVP, the resource limits and restricted environment provide reasonable containment.
8. **Enforce timeout:** Wrap the entire execution (compile + run for Rust) in `tokio::time::timeout(timeout_duration, ...)`.
9. **Capture output:** Collect stdout, stderr, and exit code from the process.
10. **Build structured JSON result:**
```json
{
"language": "python",
"exit_code": 0,
"stdout": "Hello, world!\n",
"stderr": "",
"timed_out": false,
"duration_ms": 42,
"working_dir": "/tmp/agent-sandbox/abc123"
}
```
For Rust CompileAndRun, if compilation fails, include compile errors:
```json
{
"language": "rust",
"phase": "compilation",
"exit_code": 1,
"stdout": "",
"stderr": "error[E0308]: mismatched types...",
"timed_out": false,
"duration_ms": 1200,
"working_dir": "/tmp/..."
}
```
11. **Cleanup:** The temp directory is automatically cleaned up when the `TempDir` guard is dropped.
### 5. Implement resource limit helper
**File:** `services/tool-broker/src/executors/run_code.rs`
Create a helper function `apply_resource_limits` that returns a closure suitable for `Command::pre_exec()`:
```rust
/// Build a pre_exec closure that applies POSIX resource limits.
///
/// # Safety
/// This function is called in a `pre_exec` context (after fork, before exec).
/// Only async-signal-safe operations are allowed.
unsafe fn apply_resource_limits(
memory_bytes: u64,
cpu_seconds: u64,
) -> impl FnMut() -> std::io::Result<()> {
move || {
use libc::{setrlimit, rlimit, RLIMIT_AS, RLIMIT_CPU, RLIMIT_FSIZE, RLIMIT_NPROC, RLIMIT_NOFILE};
let set_limit = |resource: i32, soft: u64, hard: u64| -> std::io::Result<()> {
let rlim = rlimit { rlim_cur: soft, rlim_max: hard };
if setrlimit(resource, &rlim) != 0 {
return Err(std::io::Error::last_os_error());
}
Ok(())
};
set_limit(RLIMIT_AS, memory_bytes, memory_bytes)?;
set_limit(RLIMIT_CPU, cpu_seconds, cpu_seconds)?;
set_limit(RLIMIT_FSIZE, 16 * 1024 * 1024, 16 * 1024 * 1024)?;
set_limit(RLIMIT_NPROC, 0, 0)?;
set_limit(RLIMIT_NOFILE, 64, 64)?;
Ok(())
}
}
```
This uses `libc` directly via the `libc` crate, which must be added to `Cargo.toml`.
### 6. Update discovery.rs tool definition
**File:** `services/tool-broker/src/discovery.rs`
Update the existing `run_code` `ToolDefinition` to add optional parameters:
- `timeout_secs`: type `"integer"`, description `"Execution timeout in seconds (optional, default 30)"`, default_value `Some("30")`
- `working_dir`: type `"string"`, description `"Working directory for execution (optional, must be absolute path)"`, default_value `None`
### 7. Update executors/mod.rs
**File:** `services/tool-broker/src/executors/mod.rs`
- Add `pub mod run_code;`
- Add `register_run_code_executor()` function that registers a `RunCodeExecutor` with the dispatcher
- Update or add a combined `register_all_executors()` function that calls both `register_fs_executors()` and `register_run_code_executor()`
### 8. Wire executor in main.rs
**File:** `services/tool-broker/src/main.rs`
After the existing `register_fs_executors` call, add registration for `run_code`:
```rust
tool_broker::executors::register_run_code_executor(&mut dispatcher);
```
Or replace both calls with:
```rust
tool_broker::executors::register_all_executors(&mut dispatcher);
```
### 9. Update Cargo.toml
**File:** `services/tool-broker/Cargo.toml`
- Move `tempfile = "3"` from `[dev-dependencies]` to `[dependencies]` (needed at runtime for temp file creation). Keep it in `[dev-dependencies]` as well if needed, or just in `[dependencies]` since it's a superset.
- Add `libc = "0.2"` to `[dependencies]` for `setrlimit` calls.
### 10. Tests
**File:** `services/tool-broker/src/executors/run_code.rs` (inline `#[cfg(test)] mod tests`)
Unit tests for `RunCodeExecutor`:
**Language support:**
- Execute Python code (`print("hello")`) → success with stdout `"hello\n"`
- Execute shell code (`echo hello`) → success with stdout `"hello\n"`
- Execute Rust code (`fn main() { println!("hello"); }`) → success with stdout `"hello\n"`
- Unsupported language → error with clear message listing supported languages
**Error handling:**
- Python syntax error → failure with stderr containing error details, exit_code != 0
- Rust compilation error → failure with phase `"compilation"`, stderr with compile errors
- Shell command failure (`exit 1`) → failure with exit_code 1
- Missing `language` parameter → error
- Missing `code` parameter → error
- Empty code string → error
**Timeout enforcement:**
- Python infinite loop (`while True: pass`) with 1-second timeout → failure with `timed_out: true`
- Shell sleep exceeding timeout → failure with `timed_out: true`
- Fast code within timeout → success
**Resource limits:**
- Python code exceeding memory limit (allocate large list) → killed/failure (test with low memory limit via `with_limits`)
- Fork bomb attempt → blocked by RLIMIT_NPROC (test with shell `:(){ :|:& };:`)
**Working directory:**
- Execution with custom `working_dir` → code can access files in that directory
- Invalid `working_dir` (nonexistent) → error
- Relative `working_dir` → rejected
**Output capture:**
- Capture both stdout and stderr simultaneously
- Large stdout output (within limits) → captured correctly
- Exit code propagated correctly for various scenarios
**Structured output:**
- JSON output contains all expected fields (language, exit_code, stdout, stderr, timed_out, duration_ms, working_dir)
- Duration tracking is reasonable (> 0ms for any execution)
**Registration:**
- `register_run_code_executor` registers `run_code` tool
- Dispatcher `has_tool("run_code")` returns true after registration
**File:** `services/tool-broker/src/executors/mod.rs` (add to existing tests)
- `register_run_code_executor` registers `run_code`
- `register_all_executors` registers all tools (fs_read, fs_write, run_code)
All tests that invoke actual code execution should use short timeouts and low resource limits via `RunCodeExecutor::with_limits()` to keep test execution fast and prevent resource issues in CI.
## Files to Create/Modify
| File | Action | Purpose |
|---|---|---|
| `services/tool-broker/src/executors/run_code.rs` | Create | RunCodeExecutor implementation + tests |
| `services/tool-broker/src/executors/mod.rs` | Modify | Add `pub mod run_code`, registration functions |
| `services/tool-broker/src/main.rs` | Modify | Register run_code executor at startup |
| `services/tool-broker/src/discovery.rs` | Modify | Add timeout_secs and working_dir params to run_code definition |
| `services/tool-broker/Cargo.toml` | Modify | Add `libc`, move `tempfile` to regular dependencies |
## Risks and Edge Cases
- **Interpreter availability:** Tests assume `python3`, `rustc`, and `/bin/sh` are available on the build/test machine. If not, tests for those languages should be skipped with `#[ignore]` or conditional compilation. CI environments must have these installed.
- **RLIMIT_NPROC set to 0:** On some systems, setting `RLIMIT_NPROC` to 0 may prevent the process from being created at all (since the exec itself is a "new process" from the kernel's perspective in some interpretations). Test this and adjust to 1 if needed — the goal is to prevent fork bombs, not block the initial process.
- **Rust compilation time:** Compiling even trivial Rust code takes 1-2 seconds. The timeout must account for both compilation and execution time. Default 30s should be sufficient, but tests should use generous timeouts for Rust.
- **Network restriction limitations:** Subprocess-level restrictions via `setrlimit` cannot restrict network access. True network isolation requires Linux network namespaces (requiring `CAP_NET_ADMIN` or root). For MVP, this is documented as a known limitation. The sandbox still provides: memory limits, CPU time limits, restricted file descriptors, no fork capability, and restricted environment variables.
- **Pre_exec safety:** The `pre_exec` closure runs between `fork()` and `exec()` in the child process. Only async-signal-safe functions are allowed. `setrlimit` is async-signal-safe per POSIX, so this is safe.
- **Temp directory cleanup on timeout:** When a process is killed due to timeout, the `TempDir` guard may still be held. Ensure cleanup happens in all code paths (normal, error, timeout). Rust's `Drop` semantics handle this automatically as long as the `TempDir` is owned by the executor scope.
- **Large output:** Code that produces massive stdout/stderr could consume memory. Mitigation: the `RLIMIT_FSIZE` limit prevents writing large files, and `tokio::process::Command::output()` buffers stdout/stderr in memory. For MVP, accept this risk; a future enhancement could enforce output size limits by reading stdout/stderr in a loop with a byte cap.
- **Platform portability:** `setrlimit` and `pre_exec` are POSIX/Linux-specific. This executor will not compile on Windows. This is acceptable since the project targets Linux (Raspberry Pi and AMD server).
## Deviation Log
_(Filled during implementation if deviations from plan occur)_
| Deviation | Reason |
|---|---|

View File

@@ -19,6 +19,5 @@ toml = "0.8"
anyhow = "1"
glob-match = "0.2"
serde_json = "1"
[dev-dependencies]
tempfile = "3"
libc = "0.2"

View File

@@ -152,7 +152,7 @@ pub fn builtin_tool_definitions() -> Vec<ToolDefinition> {
"language".into(),
ParameterSchema {
r#type: "string".into(),
description: "Programming language (python, bash, etc.)".into(),
description: "Programming language (python, rust, bash/shell/sh)".into(),
default_value: None,
},
),
@@ -164,6 +164,22 @@ pub fn builtin_tool_definitions() -> Vec<ToolDefinition> {
default_value: None,
},
),
(
"timeout_secs".into(),
ParameterSchema {
r#type: "integer".into(),
description: "Execution timeout in seconds (optional, default 30)".into(),
default_value: Some("30".into()),
},
),
(
"working_dir".into(),
ParameterSchema {
r#type: "string".into(),
description: "Working directory for execution (optional, must be absolute path)".into(),
default_value: None,
},
),
]),
required_params: vec!["language".into(), "code".into()],
requires_credential: None,

View File

@@ -14,6 +14,7 @@ const PATH_KEYS: &[&str] = &[
"directory",
"source_path",
"destination_path",
"working_dir",
];
/// Enforcement layer 4: Path allowlist check.

View File

@@ -1,9 +1,11 @@
pub mod fs_read;
pub mod fs_write;
pub mod run_code;
use crate::dispatch::ToolDispatcher;
use fs_read::FsReadExecutor;
use fs_write::FsWriteExecutor;
use run_code::RunCodeExecutor;
/// Register all filesystem executors with the given dispatcher.
pub fn register_fs_executors(dispatcher: &mut ToolDispatcher) {
@@ -11,6 +13,17 @@ pub fn register_fs_executors(dispatcher: &mut ToolDispatcher) {
dispatcher.register("fs_write", Box::new(FsWriteExecutor::new()));
}
/// Register the run_code executor with the given dispatcher.
pub fn register_run_code_executor(dispatcher: &mut ToolDispatcher) {
dispatcher.register("run_code", Box::new(RunCodeExecutor::new()));
}
/// Register all executors with the given dispatcher.
pub fn register_all_executors(dispatcher: &mut ToolDispatcher) {
register_fs_executors(dispatcher);
register_run_code_executor(dispatcher);
}
#[cfg(test)]
mod tests {
use super::*;
@@ -25,12 +38,32 @@ mod tests {
assert!(dispatcher.has_tool("fs_write"));
}
#[test]
fn test_register_run_code_executor() {
let mut dispatcher = ToolDispatcher::new(Duration::from_secs(30));
register_run_code_executor(&mut dispatcher);
assert!(dispatcher.has_tool("run_code"));
assert!(dispatcher.tool_description("run_code").is_some());
}
#[test]
fn test_register_all_executors() {
let mut dispatcher = ToolDispatcher::new(Duration::from_secs(30));
register_all_executors(&mut dispatcher);
assert!(dispatcher.has_tool("fs_read"));
assert!(dispatcher.has_tool("fs_write"));
assert!(dispatcher.has_tool("run_code"));
}
#[test]
fn test_registered_descriptions() {
let mut dispatcher = ToolDispatcher::new(Duration::from_secs(30));
register_fs_executors(&mut dispatcher);
register_all_executors(&mut dispatcher);
assert!(dispatcher.tool_description("fs_read").is_some());
assert!(dispatcher.tool_description("fs_write").is_some());
assert!(dispatcher.tool_description("run_code").is_some());
}
}

View File

@@ -0,0 +1,932 @@
use std::collections::HashMap;
use std::time::{Duration, Instant};
use crate::dispatch::{ToolExecutor, ToolOutput};
use crate::enforcement::path_allowlist::normalize_path;
/// Default execution timeout.
const DEFAULT_TIMEOUT_SECS: u64 = 30;
/// Default memory limit: 256 MB.
const DEFAULT_MEMORY_LIMIT: u64 = 256 * 1024 * 1024;
/// Default CPU time limit in seconds.
const DEFAULT_CPU_TIME_SECS: u64 = 30;
/// Maximum output file size: 16 MB.
const MAX_OUTPUT_FILE_SIZE: u64 = 16 * 1024 * 1024;
/// Maximum open file descriptors.
const MAX_OPEN_FILES: u64 = 64;
/// Maximum allowed timeout in seconds.
const MAX_TIMEOUT_SECS: u64 = 300;
/// How a language executes code.
#[derive(Debug, Clone, Copy)]
enum ExecutionMode {
/// Run interpreter with source file as argument.
Interpret,
/// Compile to binary, then execute the binary.
CompileAndRun,
}
/// Configuration for a supported language.
#[derive(Debug, Clone)]
struct LanguageConfig {
interpreter: &'static str,
extension: &'static str,
mode: ExecutionMode,
}
fn get_language_config(language: &str) -> Option<LanguageConfig> {
match language {
"python" | "python3" => Some(LanguageConfig {
interpreter: "python3",
extension: ".py",
mode: ExecutionMode::Interpret,
}),
"bash" | "shell" | "sh" => Some(LanguageConfig {
interpreter: "/bin/sh",
extension: ".sh",
mode: ExecutionMode::Interpret,
}),
"rust" => Some(LanguageConfig {
interpreter: "rustc",
extension: ".rs",
mode: ExecutionMode::CompileAndRun,
}),
_ => None,
}
}
/// Executor for the `run_code` tool.
///
/// Executes code snippets in sandboxed subprocesses with timeout enforcement,
/// resource limits, and output capture.
pub struct RunCodeExecutor {
default_timeout: Duration,
memory_limit: u64,
cpu_time_limit: u64,
}
impl Default for RunCodeExecutor {
fn default() -> Self {
Self {
default_timeout: Duration::from_secs(DEFAULT_TIMEOUT_SECS),
memory_limit: DEFAULT_MEMORY_LIMIT,
cpu_time_limit: DEFAULT_CPU_TIME_SECS,
}
}
}
impl RunCodeExecutor {
pub fn new() -> Self {
Self::default()
}
/// Create an executor with custom limits (useful for testing).
pub fn with_limits(timeout: Duration, memory_limit: u64, cpu_time_limit: u64) -> Self {
Self {
default_timeout: timeout,
memory_limit,
cpu_time_limit,
}
}
/// Run an interpreted language (Python, shell).
async fn run_interpreted(
&self,
config: &LanguageConfig,
source_path: &std::path::Path,
working_dir: &std::path::Path,
timeout: Duration,
) -> ExecutionResult {
let memory_limit = self.memory_limit;
let cpu_time_limit = self.cpu_time_limit;
let mut cmd = tokio::process::Command::new(config.interpreter);
cmd.arg(source_path);
cmd.current_dir(working_dir);
cmd.env_clear();
cmd.env("PATH", std::env::var("PATH").unwrap_or_else(|_| "/usr/local/bin:/usr/bin:/bin".into()));
cmd.env("HOME", working_dir);
cmd.env("LANG", "C.UTF-8");
cmd.stdout(std::process::Stdio::piped());
cmd.stderr(std::process::Stdio::piped());
unsafe {
cmd.pre_exec(move || apply_resource_limits(memory_limit, cpu_time_limit));
}
run_with_timeout(cmd, timeout).await
}
/// Compile and run Rust code.
async fn run_compile_and_run(
&self,
source_path: &std::path::Path,
working_dir: &std::path::Path,
timeout: Duration,
) -> ExecutionResult {
let binary_path = source_path.with_extension("");
let memory_limit = self.memory_limit;
let cpu_time_limit = self.cpu_time_limit;
// Step 1: Compile (clear env, apply resource limits, preserve Rust toolchain paths).
let compile_memory = self.memory_limit.max(512 * 1024 * 1024); // rustc needs at least 512 MB
let compile_cpu = self.cpu_time_limit;
let mut compile_cmd = tokio::process::Command::new("rustc");
compile_cmd.arg(source_path);
compile_cmd.arg("-o");
compile_cmd.arg(&binary_path);
compile_cmd.current_dir(working_dir);
compile_cmd.env_clear();
compile_cmd.env("PATH", std::env::var("PATH").unwrap_or_else(|_| "/usr/local/bin:/usr/bin:/bin".into()));
compile_cmd.env("LANG", "C.UTF-8");
// Preserve Rust toolchain paths so rustup can find the compiler.
if let Ok(home) = std::env::var("HOME") {
compile_cmd.env("HOME", &home);
compile_cmd.env(
"RUSTUP_HOME",
std::env::var("RUSTUP_HOME").unwrap_or_else(|_| format!("{}/.rustup", home)),
);
compile_cmd.env(
"CARGO_HOME",
std::env::var("CARGO_HOME").unwrap_or_else(|_| format!("{}/.cargo", home)),
);
}
compile_cmd.stdout(std::process::Stdio::piped());
compile_cmd.stderr(std::process::Stdio::piped());
unsafe {
compile_cmd.pre_exec(move || apply_resource_limits(compile_memory, compile_cpu));
}
let compile_result = run_with_timeout(compile_cmd, timeout).await;
match &compile_result {
ExecutionResult::Completed {
exit_code, ..
} if *exit_code != Some(0) => {
// Compilation failed — return compile result with phase marker.
return compile_result.with_phase("compilation");
}
ExecutionResult::TimedOut { .. } | ExecutionResult::SpawnFailed { .. } => {
return compile_result.with_phase("compilation");
}
_ => {}
}
// Step 2: Execute the compiled binary.
let mut run_cmd = tokio::process::Command::new(&binary_path);
run_cmd.current_dir(working_dir);
run_cmd.env_clear();
run_cmd.env("PATH", "/usr/local/bin:/usr/bin:/bin");
run_cmd.env("HOME", working_dir);
run_cmd.stdout(std::process::Stdio::piped());
run_cmd.stderr(std::process::Stdio::piped());
unsafe {
run_cmd.pre_exec(move || apply_resource_limits(memory_limit, cpu_time_limit));
}
// Use remaining timeout.
let elapsed = compile_result.duration();
let remaining = timeout.saturating_sub(elapsed);
if remaining.is_zero() {
return ExecutionResult::TimedOut {
duration: elapsed,
phase: None,
};
}
run_with_timeout(run_cmd, remaining).await
}
}
/// Internal result of a code execution.
enum ExecutionResult {
Completed {
stdout: String,
stderr: String,
exit_code: Option<i32>,
duration: Duration,
phase: Option<String>,
},
TimedOut {
duration: Duration,
phase: Option<String>,
},
SpawnFailed {
error: String,
duration: Duration,
phase: Option<String>,
},
}
impl ExecutionResult {
fn duration(&self) -> Duration {
match self {
Self::Completed { duration, .. } => *duration,
Self::TimedOut { duration, .. } => *duration,
Self::SpawnFailed { duration, .. } => *duration,
}
}
fn with_phase(self, p: &str) -> Self {
match self {
Self::Completed {
stdout,
stderr,
exit_code,
duration,
..
} => Self::Completed {
stdout,
stderr,
exit_code,
duration,
phase: Some(p.to_string()),
},
Self::TimedOut { duration, .. } => Self::TimedOut {
duration,
phase: Some(p.to_string()),
},
Self::SpawnFailed { error, duration, .. } => Self::SpawnFailed {
error,
duration,
phase: Some(p.to_string()),
},
}
}
}
/// Run a command with timeout, capturing output.
/// Uses `kill_on_drop(true)` to ensure the child process is killed if dropped.
async fn run_with_timeout(
mut cmd: tokio::process::Command,
timeout: Duration,
) -> ExecutionResult {
let start = Instant::now();
cmd.kill_on_drop(true);
let child = match cmd.spawn() {
Ok(c) => c,
Err(e) => {
return ExecutionResult::SpawnFailed {
error: format!("failed to spawn process: {}", e),
duration: start.elapsed(),
phase: None,
};
}
};
match tokio::time::timeout(timeout, child.wait_with_output()).await {
Ok(Ok(output)) => ExecutionResult::Completed {
stdout: String::from_utf8_lossy(&output.stdout).into_owned(),
stderr: String::from_utf8_lossy(&output.stderr).into_owned(),
exit_code: output.status.code(),
duration: start.elapsed(),
phase: None,
},
Ok(Err(e)) => ExecutionResult::SpawnFailed {
error: format!("process error: {}", e),
duration: start.elapsed(),
phase: None,
},
Err(_) => ExecutionResult::TimedOut {
duration: start.elapsed(),
phase: None,
},
}
}
/// Apply POSIX resource limits. Called in pre_exec context.
///
/// Note: `RLIMIT_NPROC` is intentionally NOT set here because Linux counts
/// ALL processes/threads for the user (not just children of this process),
/// making it unreliable for subprocess sandboxing. Fork bomb protection
/// is instead provided by `RLIMIT_CPU` and the execution timeout.
///
/// # Safety
/// Only async-signal-safe functions are used (setrlimit is POSIX async-signal-safe).
fn apply_resource_limits(memory_bytes: u64, cpu_seconds: u64) -> std::io::Result<()> {
let set_limit = |resource: libc::__rlimit_resource_t, value: u64| -> std::io::Result<()> {
let rlim = libc::rlimit {
rlim_cur: value,
rlim_max: value,
};
if unsafe { libc::setrlimit(resource, &rlim) } != 0 {
return Err(std::io::Error::last_os_error());
}
Ok(())
};
set_limit(libc::RLIMIT_AS, memory_bytes)?;
set_limit(libc::RLIMIT_CPU, cpu_seconds)?;
set_limit(libc::RLIMIT_FSIZE, MAX_OUTPUT_FILE_SIZE)?;
set_limit(libc::RLIMIT_NOFILE, MAX_OPEN_FILES)?;
Ok(())
}
#[tonic::async_trait]
impl ToolExecutor for RunCodeExecutor {
async fn execute(&self, parameters: &HashMap<String, String>) -> ToolOutput {
let start = Instant::now();
let language = match parameters.get("language") {
Some(l) if !l.is_empty() => l.to_lowercase(),
_ => {
return ToolOutput {
stdout: String::new(),
stderr: "missing required parameter: language".into(),
exit_code: None,
duration: start.elapsed(),
success: false,
};
}
};
let code = match parameters.get("code") {
Some(c) if !c.is_empty() => c,
Some(_) => {
return ToolOutput {
stdout: String::new(),
stderr: "code parameter must not be empty".into(),
exit_code: None,
duration: start.elapsed(),
success: false,
};
}
None => {
return ToolOutput {
stdout: String::new(),
stderr: "missing required parameter: code".into(),
exit_code: None,
duration: start.elapsed(),
success: false,
};
}
};
let config = match get_language_config(&language) {
Some(c) => c,
None => {
return ToolOutput {
stdout: String::new(),
stderr: format!(
"unsupported language: '{}'. Supported: python, rust, bash/shell/sh",
language
),
exit_code: None,
duration: start.elapsed(),
success: false,
};
}
};
let timeout_secs = parameters
.get("timeout_secs")
.and_then(|s| s.parse::<u64>().ok())
.map(|s| Duration::from_secs(s.min(MAX_TIMEOUT_SECS)))
.unwrap_or(self.default_timeout);
// Validate working_dir if provided.
let custom_working_dir = if let Some(wd) = parameters.get("working_dir") {
if wd.is_empty() {
None
} else {
let canonical = normalize_path(wd);
if !canonical.is_absolute() {
return ToolOutput {
stdout: String::new(),
stderr: format!(
"working_dir must be absolute, got: {}",
canonical.display()
),
exit_code: None,
duration: start.elapsed(),
success: false,
};
}
if !canonical.is_dir() {
return ToolOutput {
stdout: String::new(),
stderr: format!(
"working_dir does not exist or is not a directory: {}",
canonical.display()
),
exit_code: None,
duration: start.elapsed(),
success: false,
};
}
Some(canonical)
}
} else {
None
};
// Create temp directory for source file.
let tmp_dir = match tempfile::tempdir() {
Ok(d) => d,
Err(e) => {
return ToolOutput {
stdout: String::new(),
stderr: format!("failed to create temp directory: {}", e),
exit_code: None,
duration: start.elapsed(),
success: false,
};
}
};
let source_file = tmp_dir
.path()
.join(format!("code{}", config.extension));
if let Err(e) = std::fs::write(&source_file, code) {
return ToolOutput {
stdout: String::new(),
stderr: format!("failed to write source file: {}", e),
exit_code: None,
duration: start.elapsed(),
success: false,
};
}
let working_dir = custom_working_dir
.as_deref()
.unwrap_or(tmp_dir.path());
let result = match config.mode {
ExecutionMode::Interpret => {
self.run_interpreted(&config, &source_file, working_dir, timeout_secs)
.await
}
ExecutionMode::CompileAndRun => {
self.run_compile_and_run(&source_file, working_dir, timeout_secs)
.await
}
};
build_output(result, &language, working_dir, start)
}
fn description(&self) -> &str {
"Execute code in a sandboxed environment with resource limits"
}
}
fn build_output(
result: ExecutionResult,
language: &str,
working_dir: &std::path::Path,
start: Instant,
) -> ToolOutput {
match result {
ExecutionResult::Completed {
stdout,
stderr,
exit_code,
duration,
phase,
} => {
let success = exit_code == Some(0);
let mut json = serde_json::json!({
"language": language,
"exit_code": exit_code,
"stdout": stdout,
"stderr": stderr,
"timed_out": false,
"duration_ms": duration.as_millis() as u64,
"working_dir": working_dir.display().to_string(),
});
if let Some(p) = phase {
json["phase"] = serde_json::Value::String(p);
}
ToolOutput {
stdout: json.to_string(),
stderr: String::new(),
exit_code,
duration: start.elapsed(),
success,
}
}
ExecutionResult::TimedOut { duration, phase } => {
let mut json = serde_json::json!({
"language": language,
"exit_code": null,
"stdout": "",
"stderr": "execution timed out",
"timed_out": true,
"duration_ms": duration.as_millis() as u64,
"working_dir": working_dir.display().to_string(),
});
if let Some(p) = phase {
json["phase"] = serde_json::Value::String(p);
}
ToolOutput {
stdout: json.to_string(),
stderr: String::new(),
exit_code: None,
duration: start.elapsed(),
success: false,
}
}
ExecutionResult::SpawnFailed { error, phase, .. } => {
let mut json = serde_json::json!({
"language": language,
"exit_code": null,
"stdout": "",
"stderr": error,
"timed_out": false,
"duration_ms": 0,
"working_dir": working_dir.display().to_string(),
});
if let Some(p) = phase {
json["phase"] = serde_json::Value::String(p);
}
ToolOutput {
stdout: json.to_string(),
stderr: String::new(),
exit_code: None,
duration: start.elapsed(),
success: false,
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
fn make_params(pairs: &[(&str, &str)]) -> HashMap<String, String> {
pairs
.iter()
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect()
}
fn fast_executor() -> RunCodeExecutor {
RunCodeExecutor::with_limits(
Duration::from_secs(10),
DEFAULT_MEMORY_LIMIT,
DEFAULT_CPU_TIME_SECS,
)
}
#[tokio::test]
async fn test_python_hello_world() {
let executor = fast_executor();
let params = make_params(&[
("language", "python"),
("code", "print('hello')"),
]);
let output = executor.execute(&params).await;
assert!(output.success);
let result: serde_json::Value = serde_json::from_str(&output.stdout).unwrap();
assert_eq!(result["language"], "python");
assert_eq!(result["exit_code"], 0);
assert_eq!(result["stdout"], "hello\n");
assert_eq!(result["timed_out"], false);
}
#[tokio::test]
async fn test_python3_alias() {
let executor = fast_executor();
let params = make_params(&[
("language", "python3"),
("code", "print(1+1)"),
]);
let output = executor.execute(&params).await;
assert!(output.success);
let result: serde_json::Value = serde_json::from_str(&output.stdout).unwrap();
assert_eq!(result["stdout"], "2\n");
}
#[tokio::test]
async fn test_shell_hello_world() {
let executor = fast_executor();
let params = make_params(&[
("language", "sh"),
("code", "echo hello"),
]);
let output = executor.execute(&params).await;
assert!(output.success);
let result: serde_json::Value = serde_json::from_str(&output.stdout).unwrap();
assert_eq!(result["stdout"], "hello\n");
assert_eq!(result["exit_code"], 0);
}
#[tokio::test]
async fn test_shell_aliases() {
let executor = fast_executor();
for lang in &["bash", "shell", "sh"] {
let params = make_params(&[
("language", lang),
("code", "echo ok"),
]);
let output = executor.execute(&params).await;
assert!(output.success, "failed for language alias: {}", lang);
}
}
#[tokio::test]
async fn test_rust_hello_world() {
let executor = RunCodeExecutor::with_limits(
Duration::from_secs(30),
512 * 1024 * 1024, // Rust compiler needs more memory
30,
);
let params = make_params(&[
("language", "rust"),
("code", "fn main() { println!(\"hello\"); }"),
]);
let output = executor.execute(&params).await;
assert!(output.success, "stdout: {}, stderr: {}", output.stdout, output.stderr);
let result: serde_json::Value = serde_json::from_str(&output.stdout).unwrap();
assert_eq!(result["language"], "rust");
assert_eq!(result["stdout"], "hello\n");
assert_eq!(result["exit_code"], 0);
}
#[tokio::test]
async fn test_unsupported_language() {
let executor = fast_executor();
let params = make_params(&[
("language", "cobol"),
("code", "DISPLAY 'HELLO'"),
]);
let output = executor.execute(&params).await;
assert!(!output.success);
assert!(output.stderr.contains("unsupported language"));
assert!(output.stderr.contains("cobol"));
}
#[tokio::test]
async fn test_python_syntax_error() {
let executor = fast_executor();
let params = make_params(&[
("language", "python"),
("code", "def f(\n"),
]);
let output = executor.execute(&params).await;
assert!(!output.success);
let result: serde_json::Value = serde_json::from_str(&output.stdout).unwrap();
assert_ne!(result["exit_code"], 0);
let stderr_val = result["stderr"].as_str().unwrap();
assert!(!stderr_val.is_empty());
}
#[tokio::test]
async fn test_rust_compilation_error() {
let executor = RunCodeExecutor::with_limits(
Duration::from_secs(30),
512 * 1024 * 1024,
30,
);
let params = make_params(&[
("language", "rust"),
("code", "fn main() { let x: i32 = \"not a number\"; }"),
]);
let output = executor.execute(&params).await;
assert!(!output.success);
let result: serde_json::Value = serde_json::from_str(&output.stdout).unwrap();
assert_eq!(result["phase"], "compilation");
let stderr_val = result["stderr"].as_str().unwrap();
assert!(stderr_val.contains("error"));
}
#[tokio::test]
async fn test_shell_exit_code() {
let executor = fast_executor();
let params = make_params(&[
("language", "sh"),
("code", "exit 42"),
]);
let output = executor.execute(&params).await;
assert!(!output.success);
let result: serde_json::Value = serde_json::from_str(&output.stdout).unwrap();
assert_eq!(result["exit_code"], 42);
}
#[tokio::test]
async fn test_missing_language() {
let executor = fast_executor();
let params = make_params(&[("code", "print('hi')")]);
let output = executor.execute(&params).await;
assert!(!output.success);
assert!(output.stderr.contains("missing required parameter: language"));
}
#[tokio::test]
async fn test_missing_code() {
let executor = fast_executor();
let params = make_params(&[("language", "python")]);
let output = executor.execute(&params).await;
assert!(!output.success);
assert!(output.stderr.contains("missing required parameter: code"));
}
#[tokio::test]
async fn test_empty_code() {
let executor = fast_executor();
let params = make_params(&[
("language", "python"),
("code", ""),
]);
let output = executor.execute(&params).await;
assert!(!output.success);
assert!(output.stderr.contains("must not be empty"));
}
#[tokio::test]
async fn test_timeout_enforcement() {
let executor = RunCodeExecutor::with_limits(
Duration::from_secs(2),
DEFAULT_MEMORY_LIMIT,
DEFAULT_CPU_TIME_SECS,
);
let params = make_params(&[
("language", "python"),
("code", "import time; time.sleep(60)"),
]);
let output = executor.execute(&params).await;
assert!(!output.success);
let result: serde_json::Value = serde_json::from_str(&output.stdout).unwrap();
assert_eq!(result["timed_out"], true);
}
#[tokio::test]
async fn test_custom_timeout_param() {
let executor = RunCodeExecutor::with_limits(
Duration::from_secs(60), // High default
DEFAULT_MEMORY_LIMIT,
DEFAULT_CPU_TIME_SECS,
);
let params = make_params(&[
("language", "python"),
("code", "import time; time.sleep(60)"),
("timeout_secs", "1"),
]);
let output = executor.execute(&params).await;
assert!(!output.success);
let result: serde_json::Value = serde_json::from_str(&output.stdout).unwrap();
assert_eq!(result["timed_out"], true);
}
#[tokio::test]
async fn test_working_dir_custom() {
let dir = tempfile::tempdir().unwrap();
std::fs::write(dir.path().join("data.txt"), "test data").unwrap();
let executor = fast_executor();
let params = make_params(&[
("language", "python"),
("code", "print(open('data.txt').read())"),
("working_dir", dir.path().to_str().unwrap()),
]);
let output = executor.execute(&params).await;
assert!(output.success);
let result: serde_json::Value = serde_json::from_str(&output.stdout).unwrap();
assert!(result["stdout"].as_str().unwrap().contains("test data"));
}
#[tokio::test]
async fn test_working_dir_nonexistent() {
let executor = fast_executor();
let params = make_params(&[
("language", "python"),
("code", "print('hi')"),
("working_dir", "/nonexistent/path/xyz"),
]);
let output = executor.execute(&params).await;
assert!(!output.success);
assert!(output.stderr.contains("does not exist"));
}
#[tokio::test]
async fn test_working_dir_relative_rejected() {
let executor = fast_executor();
let params = make_params(&[
("language", "python"),
("code", "print('hi')"),
("working_dir", "relative/path"),
]);
let output = executor.execute(&params).await;
assert!(!output.success);
assert!(output.stderr.contains("must be absolute"));
}
#[tokio::test]
async fn test_capture_stdout_and_stderr() {
let executor = fast_executor();
let params = make_params(&[
("language", "python"),
("code", "import sys; print('out'); print('err', file=sys.stderr)"),
]);
let output = executor.execute(&params).await;
assert!(output.success);
let result: serde_json::Value = serde_json::from_str(&output.stdout).unwrap();
assert_eq!(result["stdout"], "out\n");
assert_eq!(result["stderr"], "err\n");
}
#[tokio::test]
async fn test_json_output_structure() {
let executor = fast_executor();
let params = make_params(&[
("language", "sh"),
("code", "echo ok"),
]);
let output = executor.execute(&params).await;
assert!(output.success);
let result: serde_json::Value = serde_json::from_str(&output.stdout).unwrap();
assert!(result.get("language").is_some());
assert!(result.get("exit_code").is_some());
assert!(result.get("stdout").is_some());
assert!(result.get("stderr").is_some());
assert!(result.get("timed_out").is_some());
assert!(result.get("duration_ms").is_some());
assert!(result.get("working_dir").is_some());
}
#[tokio::test]
async fn test_duration_positive() {
let executor = fast_executor();
let params = make_params(&[
("language", "sh"),
("code", "echo ok"),
]);
let output = executor.execute(&params).await;
assert!(output.duration.as_nanos() > 0);
let result: serde_json::Value = serde_json::from_str(&output.stdout).unwrap();
assert!(result["duration_ms"].as_u64().is_some());
}
#[tokio::test]
async fn test_language_case_insensitive() {
let executor = fast_executor();
let params = make_params(&[
("language", "PYTHON"),
("code", "print('ok')"),
]);
let output = executor.execute(&params).await;
assert!(output.success);
}
#[tokio::test]
async fn test_description() {
let executor = RunCodeExecutor::new();
assert!(!executor.description().is_empty());
}
#[tokio::test]
async fn test_multiline_code() {
let executor = fast_executor();
let code = "x = 1\ny = 2\nprint(x + y)";
let params = make_params(&[
("language", "python"),
("code", code),
]);
let output = executor.execute(&params).await;
assert!(output.success);
let result: serde_json::Value = serde_json::from_str(&output.stdout).unwrap();
assert_eq!(result["stdout"], "3\n");
}
#[tokio::test]
async fn test_exit_code_propagated() {
let executor = fast_executor();
let params = make_params(&[
("language", "python"),
("code", "import sys; sys.exit(7)"),
]);
let output = executor.execute(&params).await;
assert!(!output.success);
assert_eq!(output.exit_code, Some(7));
let result: serde_json::Value = serde_json::from_str(&output.stdout).unwrap();
assert_eq!(result["exit_code"], 7);
}
}

View File

@@ -37,9 +37,9 @@ async fn main() -> anyhow::Result<()> {
"Loaded agent type manifests"
);
// Create tool dispatcher and register executors.
// Create tool dispatcher and register all executors.
let mut dispatcher = ToolDispatcher::new(Duration::from_secs(60));
tool_broker::executors::register_fs_executors(&mut dispatcher);
tool_broker::executors::register_all_executors(&mut dispatcher);
let dispatcher = Arc::new(dispatcher);
// Create loop detector.