diff --git a/changelog.md b/changelog.md index c8ff509a..3ebe8df4 100644 --- a/changelog.md +++ b/changelog.md @@ -1,3 +1,12 @@ +Upcoming (TBD) +============== + +Features +--------- +* Add `-r` raw mode to `system` command. +* Set timeouts, show exit codes, and better formatting for `system` commands. + + 1.63.0 (2026/03/12) ============== diff --git a/mycli/packages/special/iocommands.py b/mycli/packages/special/iocommands.py index cfcc3433..16011826 100644 --- a/mycli/packages/special/iocommands.py +++ b/mycli/packages/special/iocommands.py @@ -365,31 +365,65 @@ def delete_favorite_query(arg: str, **_) -> list[SQLResult]: return [SQLResult(status=status)] -@special_command("system", "system ", "Execute a system shell commmand.") +@special_command("system", "system [-r] ", "Execute a system shell command (raw mode with -r).") def execute_system_command(arg: str, **_) -> list[SQLResult]: """Execute a system shell command.""" - usage = "Syntax: system [command].\n" + usage = "Syntax: system [-r] [command].\n-r denotes \"raw\" mode, in which output is passed through without formatting." - if not arg: + IMPLICIT_RAW_MODE_COMMANDS = { + 'clear', + 'vim', + 'vi', + 'bash', + 'zsh', + } + + if not arg.strip(): return [SQLResult(status=usage)] try: - command = arg.strip() - if command.startswith("cd"): - ok, error_message = handle_cd_command(arg) - if not ok: - return [SQLResult(status=error_message)] - return [SQLResult(status="")] - - args = arg.split(" ") - process = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - output, error = process.communicate() - response = output if not error else error - - encoding = locale.getpreferredencoding(False) - response_str = response.decode(encoding) - - return [SQLResult(status=response_str)] + command = shlex.split(arg.strip(), posix=not WIN) + except ValueError as e: + return [SQLResult(status=f"Cannot parse system command: {e}")] + + raw = False + if command[0] == '-r': + command.pop(0) + raw = True + elif command[0].lower() in IMPLICIT_RAW_MODE_COMMANDS: + raw = True + + if not command: + return [SQLResult(status=usage)] + + if command[0].lower() == 'cd': + ok, error_message = handle_cd_command(command) + if not ok: + return [SQLResult(status=error_message)] + return [SQLResult()] + + try: + if raw: + completed_process = subprocess.run(command, check=False) + if completed_process.returncode: + return [SQLResult(status=f'Command exited with return code {completed_process.returncode}')] + else: + return [SQLResult()] + else: + process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + try: + output, error = process.communicate(timeout=60) + except subprocess.TimeoutExpired: + process.kill() + output, error = process.communicate() + response = output if not error else error + encoding = locale.getpreferredencoding(False) + response_str = response.decode(encoding) + if process.returncode: + status = f'Command exited with return code {process.returncode}' + else: + status = None + return [SQLResult(preamble=response_str, status=status)] except OSError as e: return [SQLResult(status=f"OSError: {e.strerror}")] diff --git a/mycli/packages/special/utils.py b/mycli/packages/special/utils.py index c6e12ebe..c395c2c9 100644 --- a/mycli/packages/special/utils.py +++ b/mycli/packages/special/utils.py @@ -1,33 +1,22 @@ import logging import os -import shlex import click import pymysql from pymysql.cursors import Cursor -from mycli.compat import WIN - logger = logging.getLogger(__name__) CACHED_SSL_VERSION: dict[tuple, str | None] = {} -def handle_cd_command(arg: str) -> tuple[bool, str | None]: +def handle_cd_command(command: list[str]) -> tuple[bool, str | None]: """Handles a `cd` shell command by calling python's os.chdir.""" - CD_CMD = "cd" - tokens: list[str] = [] - try: - tokens = shlex.split(arg, posix=not WIN) - except ValueError: - return False, 'Cannot parse cd command.' - if not tokens: - return False, 'Not a cd command.' - if not tokens[0].lower() == CD_CMD: + if not command[0].lower() == 'cd': return False, 'Not a cd command.' - if len(tokens) != 2: + if len(command) != 2: return False, 'Exactly one directory name must be provided.' - directory = tokens[1] + directory = command[1] try: os.chdir(directory) click.echo(os.getcwd(), err=True) diff --git a/test/features/fixture_data/help_commands.txt b/test/features/fixture_data/help_commands.txt index 70327aea..0d317eda 100644 --- a/test/features/fixture_data/help_commands.txt +++ b/test/features/fixture_data/help_commands.txt @@ -28,7 +28,7 @@ | rehash | \# | rehash | Refresh auto-completions. | | source | \. | source | Execute queries from a file. | | status | \s | status | Get status information from the server. | -| system | | system | Execute a system shell commmand. | +| system | | system [-r] | Execute a system shell command (raw mode with -r). | | tableformat | \T | tableformat | Change the table format used to output interactive results. | | tee | | tee [-o] | Append all results to an output file (overwrite using -o). | | use | \u | use | Change to a new database. | diff --git a/test/test_sqlexecute.py b/test/test_sqlexecute.py index 469ddaec..3ee2ca42 100644 --- a/test/test_sqlexecute.py +++ b/test/test_sqlexecute.py @@ -290,7 +290,7 @@ def test_cd_command_with_one_nonexistent_folder_name(executor): def test_cd_command_with_one_real_folder_name(executor): results = run(executor, 'system cd screenshots') # todo would be better to capture stderr but there was a problem with capsys - assert results[0]['status_plain'] == '' + assert results[0]['status_plain'] is None @dbtest @@ -304,7 +304,11 @@ def test_cd_command_with_two_folder_names(executor): @dbtest def test_cd_command_unbalanced(executor): results = run(executor, "system cd 'one") - assert_result_equal(results, status='Cannot parse cd command.', status_plain='Cannot parse cd command.') + assert_result_equal( + results, + status='Cannot parse system command: No closing quotation', + status_plain='Cannot parse system command: No closing quotation', + ) @dbtest @@ -322,7 +326,7 @@ def test_system_command_output(executor): test_dir = os.path.abspath(os.path.dirname(__file__)) test_file_path = os.path.join(test_dir, "test.txt") results = run(executor, f"system cat {test_file_path}") - assert_result_equal(results, status=f"mycli rocks!{eol}", status_plain=f"mycli rocks!{eol}") + assert_result_equal(results, preamble=f"mycli rocks!{eol}") @dbtest