Skip to content

Shell Executable API Reference

The Shell class is the core component of hands-scaphoid, providing a secure context manager for shell command execution.

Context Manager

The Shell class implements the context manager protocol, ensuring proper cleanup of resources.

from hands_scaphoid import Shell

with Shell() as shell:
    result = shell.run("echo 'Hello World'")
    print(result.stdout)
from hands_scaphoid import Shell

with Shell(shell="bash", timeout=30) as shell:
    result = shell.run("ls -la")
    if result.success:
        print(result.stdout)
    else:
        print(f"Error: {result.stderr}")

Platform Compatibility

Different shell types have varying levels of support across platforms. See the Windows Shells documentation for platform-specific details.

API documentation

hands_scaphoid.objects.ShellExecutable

Shell execution module with security and environment management.

File

name: Shell.py uuid: 52319d84-4784-4df7-8752-64967f3716f8 date: 2025-09-12

Description

Provides secure shell command execution with allowlisting, environment management, and Docker integration capabilities.

Project

name: hands/palm/trapezium uuid: 2945ba3b-2d66-4dff-b898-672c386f03f4

Authors: ["Andreas Häberle"] Projects: ["hands/palm/trapezium"]

Classes

ShellExecutable

Bases: ExecutableFile

A secure shell command executor with environment management.

This class provides a secure way to execute shell commands with features like: - Command allowlisting for security - Environment variable management - Docker container command execution - Working directory management

Source code in src/hands_scaphoid/objects/ShellExecutable.py
class ShellExecutable(ExecutableFile):
    """
    A secure shell command executor with environment management.

    This class provides a secure way to execute shell commands with features like:
    - Command allowlisting for security
    - Environment variable management
    - Docker container command execution
    - Working directory management
    """

    def __init__(
        self,
        cwd: Optional[Union[str, PathLike]] = None,
        env: Optional[dict[str, str]] = None,
        env_file: str = "~/.env",
    ) -> None:
        """
        Initialize the Shell instance.

        Args:
            cwd: Working directory for command execution. Defaults to current directory.
            env: Environment variables dictionary. Defaults to copy of os.environ.
            env_file: Path to environment file to load variables from.

        Raises:
            FileNotFoundError: If the specified working directory doesn't exist.
        """
        super().__init__(name="shell", path=str(cwd or os.getcwd()))

        self.cwd = str(Path(cwd or os.getcwd()).resolve())
        if not os.path.isdir(self.cwd):
            raise FileNotFoundError(f"Working directory does not exist: {self.cwd}")

        self.env = env or os.environ.copy()
        self.env_file = os.path.expanduser(env_file)
        self._load_env_file()
        self.allow_commands: List[str] = []
        self.last_result: Optional[subprocess.CompletedProcess] = None

        # define basic commands
        self.allow_commands.append("which")

    def _load_env_file(self) -> None:
        """Load environment variables from the specified env file."""
        if os.path.exists(self.env_file):
            try:
                with open(self.env_file, "r", encoding="utf-8") as f:
                    for line_num, line in enumerate(f, 1):
                        line = line.strip()
                        if line and not line.startswith("#") and "=" in line:
                            try:
                                key, value = line.split("=", 1)
                                self.env[key.strip()] = value.strip()
                            except ValueError:
                                console.print(
                                    f"[yellow]Warning: Invalid line {line_num} in {self.env_file}: {line}[/yellow]"
                                )
            except IOError as e:
                console.print(
                    f"[yellow]Warning: Could not read env file {self.env_file}: {e}[/yellow]"
                )

    def allow(self, command: Union[str, List[str]]) -> bool:
        """
        Allow a command to be executed.

        Args:
            command: The command or lost of commands to allow (first word will be extracted).

        Returns:
            True if command was successfully allowed, False if command doesn't exist.

        Raises:
            ValueError: If command is empty or invalid.
        """
        do_check: bool = True

        if isinstance(command, str):
            command = [command]

        for cmd in command:
            if not cmd or not cmd.split(" ")[0].strip():
                raise ValueError("Command cannot be empty")

            # check if available
            if do_check:
                # Use a different approach for checking command availability
                # to avoid recursive call to run()
                try:
                    if platform.system() == "Windows":
                        # On Windows, try 'where' command instead of 'which'
                        check_result = subprocess.run(
                            ["where", cmd], capture_output=True, timeout=2, env=self.env
                        )
                    else:
                        # On Unix systems, use 'which'
                        check_result = subprocess.run(
                            ["which", cmd], capture_output=True, timeout=2, env=self.env
                        )
                    add = check_result.returncode == 0
                except (
                    subprocess.CalledProcessError,
                    subprocess.TimeoutExpired,
                    FileNotFoundError,
                ):
                    add = True  # If we can't check, assume it's available
                    console.print(f"⚠️ Could not verify availability of command: {cmd}")
            else:
                add = True
                console.print(f"⚠️ Skipping availability check for command: {cmd}")

            if add:
                # command is available
                if cmd not in self.allow_commands:
                    self.allow_commands.append(cmd)
                else:
                    pass
                result = True
            else:
                console.print(
                    f"❌ Command not found: {cmd} in {self.env.get('PATH', '')}"
                )
                result = False

        return result

    def run(
        self,
        command_with_args: str,
        timeout: Optional[int] = None,
        capture_output: bool = True,
        text: bool = True,
        check: bool = True,
    ) -> subprocess.CompletedProcess:
        """
        Execute a shell command with security checks.

        Args:
            command_with_args: The shell command to execute including arguments.
            timeout: Maximum seconds to wait for command completion.
            capture_output: Whether to capture stdout and stderr.
            text: Whether to return output as text (str) or bytes.
            check: Whether to raise exception on non-zero exit codes.

        Returns:
            CompletedProcess object with execution results.

        Raises:
            PermissionError: If the command is not in the allow list.
            subprocess.CalledProcessError: If check=True and command fails.
            subprocess.TimeoutExpired: If command times out.
            ValueError: If command is empty or invalid.
        """

        if isinstance(command_with_args, str):
            command_parts = command_with_args.strip().split()
        else:
            command_parts = command_with_args

        command_name = command_parts[0]

        console.print(f"[bold]$ {command_with_args}[/bold]")

        if command_name not in self.allow_commands:
            raise PermissionError(
                f"Command '{command_name}' is not allowed. Use allow() first."
            )

        try:
            result = subprocess.run(
                command_parts,
                cwd=self.cwd,
                env=self.env,
                timeout=timeout,
                capture_output=capture_output,
                text=text,
                check=check,
            )

            if result.stderr and capture_output:
                console.print(f"[red]Error: {result.stderr}[/red]")

            self.last_result = result
            return result

        except subprocess.CalledProcessError as e:
            self.last_result = e
            raise
        except subprocess.TimeoutExpired as e:
            console.print(
                f"[red]Command timed out after {timeout} seconds: {command_with_args}[/red]"
            )
            raise

    def run_in(
        self,
        container_name: str,
        command_with_args: list[str] | str,
        timeout: Optional[int] = None,
        capture_output: bool = True,
        text: bool = True,
        check: bool = True,
    ) -> subprocess.CompletedProcess:
        """
        Execute a command inside a Docker container.

        Args:
            container_name: Name of the Docker container.
            command_with_args: Command to execute inside the container.
            timeout: Maximum seconds to wait for command completion.
            capture_output: Whether to capture stdout and stderr.
            text: Whether to return output as text (str) or bytes.
            check: Whether to raise exception on non-zero exit codes.

        Returns:
            CompletedProcess object with execution results.

        Raises:
            PermissionError: If docker command is not allowed.
            subprocess.CalledProcessError: If check=True and command fails.
            ValueError: If container_name or command is empty.
        """

        command_with_args.insert(0, container_name)
        command_with_args.insert(0, "exec")
        command_with_args.insert(0, "docker")

        return self.run(command_with_args, timeout, capture_output, text, check)

    def cd(self, path: str) -> None:
        """
        Change the current working directory.

        Args:
            path: Path to change to (relative or absolute).

        Raises:
            NotADirectoryError: If the path is not a valid directory.
            ValueError: If path is empty.
        """
        if not path or not path.strip():
            raise ValueError("Path cannot be empty")

        if os.name == "nt":
            new_path = Path("C:") / os.path.abspath(
                os.path.join(self.cwd, path.replace("/", "\\"))
            )

        new_path = os.path.abspath(os.path.join(self.cwd, path))

        if not os.path.isdir(new_path):
            raise NotADirectoryError(f"{new_path} is not a valid directory.")

        self.cwd = new_path

    def get_env_var(self, var_name: str) -> Optional[str]:
        """
        Get the value of an environment variable.

        Args:
            var_name: Name of the environment variable.

        Returns:
            Value of the environment variable or None if not found.

        Raises:
            ValueError: If var_name is empty.
        """
        if not var_name or not var_name.strip():
            raise ValueError("Variable name cannot be empty")

        return self.env.get(var_name)

    def set_env_var(self, var_name: str, value: str) -> None:
        """
        Set an environment variable.

        Args:
            var_name: Name of the environment variable.
            value: Value to set.

        Raises:
            ValueError: If var_name is empty.
        """
        if not var_name or not var_name.strip():
            raise ValueError("Variable name cannot be empty")

        self.env[var_name] = value

    def sleep(self, seconds: Union[int, float]) -> None:
        """
        Sleep for the specified number of seconds.

        Args:
            seconds: Number of seconds to sleep.

        Raises:
            ValueError: If seconds is negative.
        """
        if seconds < 0:
            raise ValueError("Sleep duration cannot be negative")

        time.sleep(seconds)

    def depends_on(self, names: Union[str, List[str]]) -> bool:
        """
        Check if Docker containers are running.

        Args:
            names: Container name(s) to check.

        Returns:
            True if all containers are running.

        Raises:
            SystemExit: If any container is not running.
            RuntimeError: If docker command fails.
        """
        if isinstance(names, str):
            names = [names]

        if not names:
            raise ValueError("Container names cannot be empty")

        try:
            # Ensure docker is allowed
            if "docker" not in self.allow_commands:
                if not self.allow("docker"):
                    raise RuntimeError("Docker command is not available")

            result = self.run("docker ps --format '{{.Names}}'", check=True)
            running_containers = [
                line.strip().replace("'", "") for line in result.stdout.splitlines()
            ]

            for name in names:
                if name not in running_containers:
                    console.print(f"[red]Container {name} is not running![/red]")
                    sys.exit(2)

            return True

        except subprocess.CalledProcessError as e:
            raise RuntimeError(f"Failed to check Docker containers: {e}")

    def get_allowed_commands(self) -> List[str]:
        """
        Get the list of currently allowed commands.

        Returns:
            List of allowed command names.
        """
        return self.allow_commands.copy()

    def is_command_allowed(self, command: str) -> bool:
        """
        Check if a command is in the allow list.

        Args:
            command: Command name to check.

        Returns:
            True if command is allowed, False otherwise.
        """
        if not command or not command.strip():
            return False

        command_name = command.strip().split()[0]
        return command_name in self.allow_commands
Functions
__init__(cwd=None, env=None, env_file='~/.env')

Initialize the Shell instance.

Parameters:

Name Type Description Default
cwd Optional[Union[str, PathLike]]

Working directory for command execution. Defaults to current directory.

None
env Optional[dict[str, str]]

Environment variables dictionary. Defaults to copy of os.environ.

None
env_file str

Path to environment file to load variables from.

'~/.env'

Raises:

Type Description
FileNotFoundError

If the specified working directory doesn't exist.

Source code in src/hands_scaphoid/objects/ShellExecutable.py
def __init__(
    self,
    cwd: Optional[Union[str, PathLike]] = None,
    env: Optional[dict[str, str]] = None,
    env_file: str = "~/.env",
) -> None:
    """
    Initialize the Shell instance.

    Args:
        cwd: Working directory for command execution. Defaults to current directory.
        env: Environment variables dictionary. Defaults to copy of os.environ.
        env_file: Path to environment file to load variables from.

    Raises:
        FileNotFoundError: If the specified working directory doesn't exist.
    """
    super().__init__(name="shell", path=str(cwd or os.getcwd()))

    self.cwd = str(Path(cwd or os.getcwd()).resolve())
    if not os.path.isdir(self.cwd):
        raise FileNotFoundError(f"Working directory does not exist: {self.cwd}")

    self.env = env or os.environ.copy()
    self.env_file = os.path.expanduser(env_file)
    self._load_env_file()
    self.allow_commands: List[str] = []
    self.last_result: Optional[subprocess.CompletedProcess] = None

    # define basic commands
    self.allow_commands.append("which")
allow(command)

Allow a command to be executed.

Parameters:

Name Type Description Default
command Union[str, List[str]]

The command or lost of commands to allow (first word will be extracted).

required

Returns:

Type Description
bool

True if command was successfully allowed, False if command doesn't exist.

Raises:

Type Description
ValueError

If command is empty or invalid.

Source code in src/hands_scaphoid/objects/ShellExecutable.py
def allow(self, command: Union[str, List[str]]) -> bool:
    """
    Allow a command to be executed.

    Args:
        command: The command or lost of commands to allow (first word will be extracted).

    Returns:
        True if command was successfully allowed, False if command doesn't exist.

    Raises:
        ValueError: If command is empty or invalid.
    """
    do_check: bool = True

    if isinstance(command, str):
        command = [command]

    for cmd in command:
        if not cmd or not cmd.split(" ")[0].strip():
            raise ValueError("Command cannot be empty")

        # check if available
        if do_check:
            # Use a different approach for checking command availability
            # to avoid recursive call to run()
            try:
                if platform.system() == "Windows":
                    # On Windows, try 'where' command instead of 'which'
                    check_result = subprocess.run(
                        ["where", cmd], capture_output=True, timeout=2, env=self.env
                    )
                else:
                    # On Unix systems, use 'which'
                    check_result = subprocess.run(
                        ["which", cmd], capture_output=True, timeout=2, env=self.env
                    )
                add = check_result.returncode == 0
            except (
                subprocess.CalledProcessError,
                subprocess.TimeoutExpired,
                FileNotFoundError,
            ):
                add = True  # If we can't check, assume it's available
                console.print(f"⚠️ Could not verify availability of command: {cmd}")
        else:
            add = True
            console.print(f"⚠️ Skipping availability check for command: {cmd}")

        if add:
            # command is available
            if cmd not in self.allow_commands:
                self.allow_commands.append(cmd)
            else:
                pass
            result = True
        else:
            console.print(
                f"❌ Command not found: {cmd} in {self.env.get('PATH', '')}"
            )
            result = False

    return result
run(command_with_args, timeout=None, capture_output=True, text=True, check=True)

Execute a shell command with security checks.

Parameters:

Name Type Description Default
command_with_args str

The shell command to execute including arguments.

required
timeout Optional[int]

Maximum seconds to wait for command completion.

None
capture_output bool

Whether to capture stdout and stderr.

True
text bool

Whether to return output as text (str) or bytes.

True
check bool

Whether to raise exception on non-zero exit codes.

True

Returns:

Type Description
CompletedProcess

CompletedProcess object with execution results.

Raises:

Type Description
PermissionError

If the command is not in the allow list.

CalledProcessError

If check=True and command fails.

TimeoutExpired

If command times out.

ValueError

If command is empty or invalid.

Source code in src/hands_scaphoid/objects/ShellExecutable.py
def run(
    self,
    command_with_args: str,
    timeout: Optional[int] = None,
    capture_output: bool = True,
    text: bool = True,
    check: bool = True,
) -> subprocess.CompletedProcess:
    """
    Execute a shell command with security checks.

    Args:
        command_with_args: The shell command to execute including arguments.
        timeout: Maximum seconds to wait for command completion.
        capture_output: Whether to capture stdout and stderr.
        text: Whether to return output as text (str) or bytes.
        check: Whether to raise exception on non-zero exit codes.

    Returns:
        CompletedProcess object with execution results.

    Raises:
        PermissionError: If the command is not in the allow list.
        subprocess.CalledProcessError: If check=True and command fails.
        subprocess.TimeoutExpired: If command times out.
        ValueError: If command is empty or invalid.
    """

    if isinstance(command_with_args, str):
        command_parts = command_with_args.strip().split()
    else:
        command_parts = command_with_args

    command_name = command_parts[0]

    console.print(f"[bold]$ {command_with_args}[/bold]")

    if command_name not in self.allow_commands:
        raise PermissionError(
            f"Command '{command_name}' is not allowed. Use allow() first."
        )

    try:
        result = subprocess.run(
            command_parts,
            cwd=self.cwd,
            env=self.env,
            timeout=timeout,
            capture_output=capture_output,
            text=text,
            check=check,
        )

        if result.stderr and capture_output:
            console.print(f"[red]Error: {result.stderr}[/red]")

        self.last_result = result
        return result

    except subprocess.CalledProcessError as e:
        self.last_result = e
        raise
    except subprocess.TimeoutExpired as e:
        console.print(
            f"[red]Command timed out after {timeout} seconds: {command_with_args}[/red]"
        )
        raise
run_in(container_name, command_with_args, timeout=None, capture_output=True, text=True, check=True)

Execute a command inside a Docker container.

Parameters:

Name Type Description Default
container_name str

Name of the Docker container.

required
command_with_args list[str] | str

Command to execute inside the container.

required
timeout Optional[int]

Maximum seconds to wait for command completion.

None
capture_output bool

Whether to capture stdout and stderr.

True
text bool

Whether to return output as text (str) or bytes.

True
check bool

Whether to raise exception on non-zero exit codes.

True

Returns:

Type Description
CompletedProcess

CompletedProcess object with execution results.

Raises:

Type Description
PermissionError

If docker command is not allowed.

CalledProcessError

If check=True and command fails.

ValueError

If container_name or command is empty.

Source code in src/hands_scaphoid/objects/ShellExecutable.py
def run_in(
    self,
    container_name: str,
    command_with_args: list[str] | str,
    timeout: Optional[int] = None,
    capture_output: bool = True,
    text: bool = True,
    check: bool = True,
) -> subprocess.CompletedProcess:
    """
    Execute a command inside a Docker container.

    Args:
        container_name: Name of the Docker container.
        command_with_args: Command to execute inside the container.
        timeout: Maximum seconds to wait for command completion.
        capture_output: Whether to capture stdout and stderr.
        text: Whether to return output as text (str) or bytes.
        check: Whether to raise exception on non-zero exit codes.

    Returns:
        CompletedProcess object with execution results.

    Raises:
        PermissionError: If docker command is not allowed.
        subprocess.CalledProcessError: If check=True and command fails.
        ValueError: If container_name or command is empty.
    """

    command_with_args.insert(0, container_name)
    command_with_args.insert(0, "exec")
    command_with_args.insert(0, "docker")

    return self.run(command_with_args, timeout, capture_output, text, check)
cd(path)

Change the current working directory.

Parameters:

Name Type Description Default
path str

Path to change to (relative or absolute).

required

Raises:

Type Description
NotADirectoryError

If the path is not a valid directory.

ValueError

If path is empty.

Source code in src/hands_scaphoid/objects/ShellExecutable.py
def cd(self, path: str) -> None:
    """
    Change the current working directory.

    Args:
        path: Path to change to (relative or absolute).

    Raises:
        NotADirectoryError: If the path is not a valid directory.
        ValueError: If path is empty.
    """
    if not path or not path.strip():
        raise ValueError("Path cannot be empty")

    if os.name == "nt":
        new_path = Path("C:") / os.path.abspath(
            os.path.join(self.cwd, path.replace("/", "\\"))
        )

    new_path = os.path.abspath(os.path.join(self.cwd, path))

    if not os.path.isdir(new_path):
        raise NotADirectoryError(f"{new_path} is not a valid directory.")

    self.cwd = new_path
get_env_var(var_name)

Get the value of an environment variable.

Parameters:

Name Type Description Default
var_name str

Name of the environment variable.

required

Returns:

Type Description
Optional[str]

Value of the environment variable or None if not found.

Raises:

Type Description
ValueError

If var_name is empty.

Source code in src/hands_scaphoid/objects/ShellExecutable.py
def get_env_var(self, var_name: str) -> Optional[str]:
    """
    Get the value of an environment variable.

    Args:
        var_name: Name of the environment variable.

    Returns:
        Value of the environment variable or None if not found.

    Raises:
        ValueError: If var_name is empty.
    """
    if not var_name or not var_name.strip():
        raise ValueError("Variable name cannot be empty")

    return self.env.get(var_name)
set_env_var(var_name, value)

Set an environment variable.

Parameters:

Name Type Description Default
var_name str

Name of the environment variable.

required
value str

Value to set.

required

Raises:

Type Description
ValueError

If var_name is empty.

Source code in src/hands_scaphoid/objects/ShellExecutable.py
def set_env_var(self, var_name: str, value: str) -> None:
    """
    Set an environment variable.

    Args:
        var_name: Name of the environment variable.
        value: Value to set.

    Raises:
        ValueError: If var_name is empty.
    """
    if not var_name or not var_name.strip():
        raise ValueError("Variable name cannot be empty")

    self.env[var_name] = value
sleep(seconds)

Sleep for the specified number of seconds.

Parameters:

Name Type Description Default
seconds Union[int, float]

Number of seconds to sleep.

required

Raises:

Type Description
ValueError

If seconds is negative.

Source code in src/hands_scaphoid/objects/ShellExecutable.py
def sleep(self, seconds: Union[int, float]) -> None:
    """
    Sleep for the specified number of seconds.

    Args:
        seconds: Number of seconds to sleep.

    Raises:
        ValueError: If seconds is negative.
    """
    if seconds < 0:
        raise ValueError("Sleep duration cannot be negative")

    time.sleep(seconds)
depends_on(names)

Check if Docker containers are running.

Parameters:

Name Type Description Default
names Union[str, List[str]]

Container name(s) to check.

required

Returns:

Type Description
bool

True if all containers are running.

Raises:

Type Description
SystemExit

If any container is not running.

RuntimeError

If docker command fails.

Source code in src/hands_scaphoid/objects/ShellExecutable.py
def depends_on(self, names: Union[str, List[str]]) -> bool:
    """
    Check if Docker containers are running.

    Args:
        names: Container name(s) to check.

    Returns:
        True if all containers are running.

    Raises:
        SystemExit: If any container is not running.
        RuntimeError: If docker command fails.
    """
    if isinstance(names, str):
        names = [names]

    if not names:
        raise ValueError("Container names cannot be empty")

    try:
        # Ensure docker is allowed
        if "docker" not in self.allow_commands:
            if not self.allow("docker"):
                raise RuntimeError("Docker command is not available")

        result = self.run("docker ps --format '{{.Names}}'", check=True)
        running_containers = [
            line.strip().replace("'", "") for line in result.stdout.splitlines()
        ]

        for name in names:
            if name not in running_containers:
                console.print(f"[red]Container {name} is not running![/red]")
                sys.exit(2)

        return True

    except subprocess.CalledProcessError as e:
        raise RuntimeError(f"Failed to check Docker containers: {e}")
get_allowed_commands()

Get the list of currently allowed commands.

Returns:

Type Description
List[str]

List of allowed command names.

Source code in src/hands_scaphoid/objects/ShellExecutable.py
def get_allowed_commands(self) -> List[str]:
    """
    Get the list of currently allowed commands.

    Returns:
        List of allowed command names.
    """
    return self.allow_commands.copy()
is_command_allowed(command)

Check if a command is in the allow list.

Parameters:

Name Type Description Default
command str

Command name to check.

required

Returns:

Type Description
bool

True if command is allowed, False otherwise.

Source code in src/hands_scaphoid/objects/ShellExecutable.py
def is_command_allowed(self, command: str) -> bool:
    """
    Check if a command is in the allow list.

    Args:
        command: Command name to check.

    Returns:
        True if command is allowed, False otherwise.
    """
    if not command or not command.strip():
        return False

    command_name = command.strip().split()[0]
    return command_name in self.allow_commands