Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
Upcoming (TBD)
==============

Features
---------
* Add `-r` raw mode to `system` command.
* Set timeouts, show exit codes, and better formatting for `system` commands.


1.63.0 (2026/03/12)
==============

Expand Down
72 changes: 53 additions & 19 deletions mycli/packages/special/iocommands.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,31 +365,65 @@ def delete_favorite_query(arg: str, **_) -> list[SQLResult]:
return [SQLResult(status=status)]


@special_command("system", "system <command>", "Execute a system shell commmand.")
@special_command("system", "system [-r] <command>", "Execute a system shell command (raw mode with -r).")
def execute_system_command(arg: str, **_) -> list[SQLResult]:
"""Execute a system shell command."""
usage = "Syntax: system [command].\n"
usage = "Syntax: system [-r] [command].\n-r denotes \"raw\" mode, in which output is passed through without formatting."

if not arg:
IMPLICIT_RAW_MODE_COMMANDS = {
'clear',
'vim',
'vi',
'bash',
'zsh',
}

if not arg.strip():
return [SQLResult(status=usage)]

try:
command = arg.strip()
if command.startswith("cd"):
ok, error_message = handle_cd_command(arg)
if not ok:
return [SQLResult(status=error_message)]
return [SQLResult(status="")]

args = arg.split(" ")
process = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, error = process.communicate()
response = output if not error else error

encoding = locale.getpreferredencoding(False)
response_str = response.decode(encoding)

return [SQLResult(status=response_str)]
command = shlex.split(arg.strip(), posix=not WIN)
except ValueError as e:
return [SQLResult(status=f"Cannot parse system command: {e}")]

raw = False
if command[0] == '-r':
command.pop(0)
raw = True
elif command[0].lower() in IMPLICIT_RAW_MODE_COMMANDS:
raw = True

if not command:
return [SQLResult(status=usage)]

if command[0].lower() == 'cd':
ok, error_message = handle_cd_command(command)
if not ok:
return [SQLResult(status=error_message)]
return [SQLResult()]

try:
if raw:
completed_process = subprocess.run(command, check=False)
if completed_process.returncode:
return [SQLResult(status=f'Command exited with return code {completed_process.returncode}')]
else:
return [SQLResult()]
else:
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
try:
output, error = process.communicate(timeout=60)
except subprocess.TimeoutExpired:
process.kill()
output, error = process.communicate()
response = output if not error else error
encoding = locale.getpreferredencoding(False)
response_str = response.decode(encoding)
if process.returncode:
status = f'Command exited with return code {process.returncode}'
else:
status = None
return [SQLResult(preamble=response_str, status=status)]
except OSError as e:
return [SQLResult(status=f"OSError: {e.strerror}")]

Expand Down
19 changes: 4 additions & 15 deletions mycli/packages/special/utils.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,22 @@
import logging
import os
import shlex

import click
import pymysql
from pymysql.cursors import Cursor

from mycli.compat import WIN

logger = logging.getLogger(__name__)

CACHED_SSL_VERSION: dict[tuple, str | None] = {}


def handle_cd_command(arg: str) -> tuple[bool, str | None]:
def handle_cd_command(command: list[str]) -> tuple[bool, str | None]:
"""Handles a `cd` shell command by calling python's os.chdir."""
CD_CMD = "cd"
tokens: list[str] = []
try:
tokens = shlex.split(arg, posix=not WIN)
except ValueError:
return False, 'Cannot parse cd command.'
if not tokens:
return False, 'Not a cd command.'
if not tokens[0].lower() == CD_CMD:
if not command[0].lower() == 'cd':
return False, 'Not a cd command.'
if len(tokens) != 2:
if len(command) != 2:
return False, 'Exactly one directory name must be provided.'
directory = tokens[1]
directory = command[1]
try:
os.chdir(directory)
click.echo(os.getcwd(), err=True)
Expand Down
2 changes: 1 addition & 1 deletion test/features/fixture_data/help_commands.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
| rehash | \# | rehash | Refresh auto-completions. |
| source | \. | source <filename> | Execute queries from a file. |
| status | \s | status | Get status information from the server. |
| system | <null> | system <command> | Execute a system shell commmand. |
| system | <null> | system [-r] <command> | Execute a system shell command (raw mode with -r). |
| tableformat | \T | tableformat <format> | Change the table format used to output interactive results. |
| tee | <null> | tee [-o] <filename> | Append all results to an output file (overwrite using -o). |
| use | \u | use <database> | Change to a new database. |
Expand Down
10 changes: 7 additions & 3 deletions test/test_sqlexecute.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ def test_cd_command_with_one_nonexistent_folder_name(executor):
def test_cd_command_with_one_real_folder_name(executor):
results = run(executor, 'system cd screenshots')
# todo would be better to capture stderr but there was a problem with capsys
assert results[0]['status_plain'] == ''
assert results[0]['status_plain'] is None


@dbtest
Expand All @@ -304,7 +304,11 @@ def test_cd_command_with_two_folder_names(executor):
@dbtest
def test_cd_command_unbalanced(executor):
results = run(executor, "system cd 'one")
assert_result_equal(results, status='Cannot parse cd command.', status_plain='Cannot parse cd command.')
assert_result_equal(
results,
status='Cannot parse system command: No closing quotation',
status_plain='Cannot parse system command: No closing quotation',
)


@dbtest
Expand All @@ -322,7 +326,7 @@ def test_system_command_output(executor):
test_dir = os.path.abspath(os.path.dirname(__file__))
test_file_path = os.path.join(test_dir, "test.txt")
results = run(executor, f"system cat {test_file_path}")
assert_result_equal(results, status=f"mycli rocks!{eol}", status_plain=f"mycli rocks!{eol}")
assert_result_equal(results, preamble=f"mycli rocks!{eol}")


@dbtest
Expand Down
Loading