Contributor

@OlegWock OlegWock commented Jan 30, 2026

This PR replaces the BigQuery-specific runtime patch for query cancellation on KeyboardInterrupt with a more general system. We now wrap the SQLAlchemy connection to track the cursors it produces, and if we receive a BaseException, we try to cancel those cursors before re-raising.

This was tested with Trino and BigQuery. See the instructions for testing Trino in #63 and for BigQuery in #34.
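
As a rough illustration of the flow described above (not the code in this PR), the following stdlib-only sketch tracks cursors in a `weakref.WeakSet` and cancels them when any `BaseException` escapes. `FakeCursor` and `run_with_cancellation` are hypothetical names invented for this example; the real change wraps the SQLAlchemy connection instead.

```python
import weakref


class FakeCursor:
    """Hypothetical stand-in for a DBAPI cursor; not a real driver class."""

    def __init__(self):
        self.cancelled = False

    def execute(self, sql):
        # Simulate the user interrupting the kernel while the query runs.
        raise KeyboardInterrupt("User interrupted")

    def cancel(self):
        self.cancelled = True


def run_with_cancellation(make_cursor, sql):
    """Execute sql and cancel every tracked cursor if anything is raised."""
    tracked = weakref.WeakSet()
    try:
        cursor = make_cursor()
        tracked.add(cursor)
        cursor.execute(sql)
        return cursor
    except BaseException:
        # Best-effort cancellation; a failing cancel must not mask the interrupt.
        for tracked_cursor in list(tracked):
            try:
                tracked_cursor.cancel()
            except Exception:
                pass
        raise
```

Catching `BaseException` matters here because `KeyboardInterrupt` does not derive from `Exception`, so a plain `except Exception` would skip the cleanup on Ctrl+C.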

Summary by CodeRabbit

  • New Features

    • Enhanced query and cursor lifecycle management for improved cancellation handling in SQL execution.
  • Bug Fixes

    • Better resource cleanup when SQL operations are interrupted or encounter errors.
  • Refactor

    • Simplified runtime initialization process.


linear bot commented Jan 30, 2026

Contributor

coderabbitai bot commented Jan 30, 2026

📝 Walkthrough


The changes replace a BigQuery-specific monkeypatch with a generalized cursor-tracking mechanism for SQL query cancellation. The runtime_patches.py file, which patched BigQuery's internal _wait_or_cancel function, is removed entirely. New wrapper classes (CursorTrackingDBAPIConnection and CursorTrackingSQLAlchemyConnection) intercept cursor creation and track active cursors during SQL execution. On KeyboardInterrupt or any other exception, tracked cursors are canceled through a unified _cancel_cursor utility. Error handling in _execute_sql_on_engine now triggers cleanup of all tracked cursors.

Sequence Diagram

sequenceDiagram
    participant User as User
    participant SQLExec as SQL Executor
    participant ConnWrapper as Cursor Tracking<br/>Wrapper
    participant Cursor as DB Cursor
    participant Job as BigQuery Job<br/>(if applicable)

    User->>SQLExec: Execute SQL Query
    SQLExec->>ConnWrapper: Get connection<br/>(wrapped)
    ConnWrapper->>ConnWrapper: Track cursor in<br/>WeakSet
    ConnWrapper->>Cursor: Create cursor
    Cursor->>Job: Execute query
    
    rect rgba(220, 53, 69, 0.5)
    User->>SQLExec: Cancel (KeyboardInterrupt)
    SQLExec->>ConnWrapper: cancel_all_cursors()
    ConnWrapper->>Cursor: cursor.cancel()
    Cursor->>Job: cancel()
    Job-->>Cursor: Cancellation acknowledged
    Cursor-->>ConnWrapper: Cancelled
    ConnWrapper-->>SQLExec: All cursors cleaned
    SQLExec-->>User: Interrupt propagated
    end
🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 56.25% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit's high-level summary is enabled. |
| Title check | ✅ Passed | Title accurately describes the main change: replacing a BigQuery-specific runtime patch with a general cursor-tracking solution for all database backends. |
| Linked Issues check | ✅ Passed | Changes implement general query cancellation via cursor tracking, addressing the Trino cancellation issue [BLU-5525] by replacing ad-hoc patches with a universally-applicable mechanism. |
| Out of Scope Changes check | ✅ Passed | All changes are in-scope: removal of BigQuery patch, addition of cursor-tracking wrappers, and test updates directly support the cancellation fix objective. |



github-actions bot commented Jan 30, 2026

📦 Python package built successfully!

  • Version: 2.1.1.dev4+a94f530
  • Wheel: deepnote_toolkit-2.1.1.dev4+a94f530-py3-none-any.whl
  • Install:
    pip install "deepnote-toolkit @ https://deepnote-staging-runtime-artifactory.s3.amazonaws.com/deepnote-toolkit-packages/2.1.1.dev4%2Ba94f530/deepnote_toolkit-2.1.1.dev4%2Ba94f530-py3-none-any.whl"


codecov bot commented Jan 30, 2026

Codecov Report

❌ Patch coverage is 74.50980% with 13 lines in your changes missing coverage. Please review.
✅ Project coverage is 73.29%. Comparing base (a3ac920) to head (1ad219e).
✅ All tests successful. No failed tests found.

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| deepnote_toolkit/sql/sql_execution.py | 74.50% | 9 Missing and 4 partials ⚠️ |

Additional details and impacted files
@@            Coverage Diff             @@
##             main      #64      +/-   ##
==========================================
+ Coverage   73.28%   73.29%   +0.01%     
==========================================
  Files          93       92       -1     
  Lines        5199     5220      +21     
  Branches      757      763       +6     
==========================================
+ Hits         3810     3826      +16     
- Misses       1147     1148       +1     
- Partials      242      246       +4     

| Flag | Coverage Δ |
| --- | --- |
| combined | 73.29% <74.50%> (+0.01%) ⬆️ |


@deepnote-bot

🚀 Review App Deployment Started

| 📝 Description | 🌐 Link / Info |
| --- | --- |
| 🌍 Review application | ra-64 |
| 🔑 Sign-in URL | Click to sign-in |
| 📊 Application logs | View logs |
| 🔄 Actions | Click to redeploy |
| 🚀 ArgoCD deployment | View deployment |
| Last deployed | 2026-01-30 13:06:43 (UTC) |
| 📜 Deployed commit | 4a36ae9b489fe40d9e41e663e3e234334d348c9d |
| 🛠️ Toolkit version | a94f530 |

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

🤖 Fix all issues with AI agents
In `@deepnote_toolkit/sql/sql_execution.py`:
- Around line 525-540: The cursor tracking currently uses a WeakSet and will
raise TypeError for non‑weakrefable cursor objects; update __init__ to also
create a fallback strong set (e.g., self._self_strong_cursor_set = set()),
change cursor(self, *args, **kwargs) to attempt
self._self_cursor_registry.add(cursor) inside a try/except TypeError and on
exception add the cursor to self._self_strong_cursor_set, and modify
cancel_all_cursors() to iterate both self._self_cursor_registry and
self._self_strong_cursor_set to call _cancel_cursor(cursor) while catching
errors, then clear both collections after attempting cancellation.
- Around line 522-597: Add explicit typing to the new wrappers and helpers:
annotate CursorTrackingDBAPIConnection.__init__(self, wrapped: Any,
cursor_registry: Optional[weakref.WeakSet] = None) -> None,
CursorTrackingDBAPIConnection.cursor(self, *args: Any, **kwargs: Any) -> Any,
and CursorTrackingDBAPIConnection.cancel_all_cursors(self) -> None; annotate
CursorTrackingSQLAlchemyConnection.__init__(self, wrapped: Any) -> None,
_install_dbapi_wrapper(self) -> None, and cancel_all_cursors(self) -> None;
annotate _cancel_cursor(cursor: Any) -> None (and update _execute_sql_on_engine
signature similarly if it accepts nullable/Any types). Use Optional[T] for
parameters that can be None and import typing names (Any, Optional) as needed.
Ensure return types use -> None or -> Any where appropriate.
- Around line 555-566: The _install_dbapi_wrapper currently replaces
self.__wrapped__._dbapi_connection unconditionally, which can create chains when
it's already a CursorTrackingDBAPIConnection; update _install_dbapi_wrapper to
check the existing self.__wrapped__._dbapi_connection first, detect if it's an
instance of CursorTrackingDBAPIConnection (or exposes the registry/registry
holder), and if so reuse that wrapper by attaching/merging self._self_cursors
into the existing wrapper's registry instead of rewrapping; only create a new
CursorTrackingDBAPIConnection when the current _dbapi_connection is a raw DBAPI
connection (not already wrapped).

In `@tests/unit/test_sql_execution_internal.py`:
- Around line 12-33: Add explicit type hints to the
_setup_mock_engine_with_cursor helper: annotate the parameter mock_cursor as
unittest.mock.Mock (or typing.Any) and the return type as unittest.mock.Mock (or
sqlalchemy.engine.Connection) to match what the function returns; also type the
inner mock_exec_driver_sql callable (e.g., def mock_exec_driver_sql(sql: str,
*args: Any) -> unittest.mock.Mock) and annotate mock_sa_connection and
mock_dbapi_connection variables where declared to clarify their types. Ensure
you import typing.Any and unittest.mock.Mock (or appropriate sqlalchemy types)
at the top of the test file and update the function signature and nested
function signature accordingly.

Comment on lines +522 to 597
class CursorTrackingDBAPIConnection(wrapt.ObjectProxy):
    """Wraps DBAPI connection to track cursors as they're created."""

    def __init__(self, wrapped, cursor_registry=None):
        super().__init__(wrapped)
        # Use provided registry or create our own
        self._self_cursor_registry = (
            cursor_registry if cursor_registry is not None else weakref.WeakSet()
        )

    def cursor(self, *args, **kwargs):
        cursor = self.__wrapped__.cursor(*args, **kwargs)
        self._self_cursor_registry.add(cursor)
        return cursor

    def cancel_all_cursors(self):
        """Cancel all tracked cursors. Best-effort, ignores errors."""
        for cursor in list(self._self_cursor_registry):
            _cancel_cursor(cursor)


class CursorTrackingSQLAlchemyConnection(wrapt.ObjectProxy):
    """A SQLAlchemy connection wrapper that tracks cursors for cancellation.

    This wrapper replaces the internal _dbapi_connection with a tracking proxy,
    so all cursors created (including by exec_driver_sql) are tracked.
    """

    def __init__(self, wrapped):
        super().__init__(wrapped)
        self._self_cursors = weakref.WeakSet()
        self._install_dbapi_wrapper()

    def _install_dbapi_wrapper(self):
        """Replace SQLAlchemy's internal DBAPI connection with our tracking wrapper."""
        try:
            # Access the internal DBAPI connection
            dbapi_conn = self.__wrapped__._dbapi_connection
            if dbapi_conn is None:
                logger.warning("DBAPI connection is None, cannot install tracking")
                return

            self.__wrapped__._dbapi_connection = CursorTrackingDBAPIConnection(
                dbapi_conn, self._self_cursors
            )
        except Exception as e:
            logger.warning(f"Could not install DBAPI wrapper: {e}")

    def cancel_all_cursors(self):
        """Cancel all tracked cursors. Best-effort, ignores errors."""
        for cursor in list(self._self_cursors):
            _cancel_cursor(cursor)


def _cancel_cursor(cursor):
    """Best-effort cancel a cursor using available methods."""
    try:
        # BigQuery: cancel via query_job if available
        query_job = getattr(cursor, "query_job", None)
        if query_job is not None and hasattr(query_job, "cancel"):
            try:
                query_job.cancel()
            except (Exception, KeyboardInterrupt):
                pass

        # Generic DBAPI: try cursor.cancel() if available (Trino, etc.)
        if hasattr(cursor, "cancel"):
            try:
                cursor.cancel()
            except (Exception, KeyboardInterrupt):
                pass
    except (Exception, KeyboardInterrupt):
        pass  # Best effort, ignore all errors


def _execute_sql_on_engine(engine, query, bind_params):

🛠️ Refactor suggestion | 🟠 Major

Add explicit type hints (and Optional) for the new wrappers/helpers.
These new public methods lack parameter/return annotations.

✍️ Suggested fix
-from typing import Any
+from typing import Any, Optional
@@
 class CursorTrackingDBAPIConnection(wrapt.ObjectProxy):
@@
-    def __init__(self, wrapped, cursor_registry=None):
+    def __init__(
+        self,
+        wrapped: Any,
+        cursor_registry: Optional[weakref.WeakSet[Any]] = None,
+    ) -> None:
@@
-    def cursor(self, *args, **kwargs):
+    def cursor(self, *args: Any, **kwargs: Any) -> Any:
@@
-    def cancel_all_cursors(self):
+    def cancel_all_cursors(self) -> None:
@@
 class CursorTrackingSQLAlchemyConnection(wrapt.ObjectProxy):
@@
-    def __init__(self, wrapped):
+    def __init__(self, wrapped: Any) -> None:
@@
-    def _install_dbapi_wrapper(self):
+    def _install_dbapi_wrapper(self) -> None:
@@
-    def cancel_all_cursors(self):
+    def cancel_all_cursors(self) -> None:
@@
-def _cancel_cursor(cursor):
+def _cancel_cursor(cursor: Any) -> None:
@@
-def _execute_sql_on_engine(engine, query, bind_params):
+def _execute_sql_on_engine(engine: Any, query: str, bind_params: Any) -> Any:
As per coding guidelines: Use explicit type hints for function parameters and return values; Always use Optional[T] for parameters that can be None.
🧰 Tools
🪛 Ruff (0.14.14)

[warning] 525-525: Missing return type annotation for special method __init__. Add return type annotation: None (ANN204)
[warning] 532-532: Missing type annotation for *args (ANN002)
[warning] 532-532: Missing type annotation for **kwargs (ANN003)
[warning] 550-550: Missing return type annotation for special method __init__. Add return type annotation: None (ANN204)
[warning] 555-555: Missing return type annotation for private function _install_dbapi_wrapper. Add return type annotation: None (ANN202)
[warning] 567-567: Do not catch blind exception: Exception (BLE001)
[warning] 568-568: Logging statement uses f-string (G004)
[warning] 576-576: Missing return type annotation for private function _cancel_cursor. Add return type annotation: None (ANN202)
[error] 584-585: try-except-pass detected, consider logging the exception (S110)
[warning] 584-584: Do not catch blind exception: Exception (BLE001)
[error] 591-592: try-except-pass detected, consider logging the exception (S110)
[warning] 591-591: Do not catch blind exception: Exception (BLE001)
[error] 593-594: try-except-pass detected, consider logging the exception (S110)
[warning] 593-593: Do not catch blind exception: Exception (BLE001)
[warning] 597-597: Missing return type annotation for private function _execute_sql_on_engine (ANN202)

🤖 Prompt for AI Agents
In `@deepnote_toolkit/sql/sql_execution.py` around lines 522 - 597, Add explicit
typing to the new wrappers and helpers: annotate
CursorTrackingDBAPIConnection.__init__(self, wrapped: Any, cursor_registry:
Optional[weakref.WeakSet] = None) -> None,
CursorTrackingDBAPIConnection.cursor(self, *args: Any, **kwargs: Any) -> Any,
and CursorTrackingDBAPIConnection.cancel_all_cursors(self) -> None; annotate
CursorTrackingSQLAlchemyConnection.__init__(self, wrapped: Any) -> None,
_install_dbapi_wrapper(self) -> None, and cancel_all_cursors(self) -> None;
annotate _cancel_cursor(cursor: Any) -> None (and update _execute_sql_on_engine
signature similarly if it accepts nullable/Any types). Use Optional[T] for
parameters that can be None and import typing names (Any, Optional) as needed.
Ensure return types use -> None or -> Any where appropriate.

Comment on lines +525 to +540
    def __init__(self, wrapped, cursor_registry=None):
        super().__init__(wrapped)
        # Use provided registry or create our own
        self._self_cursor_registry = (
            cursor_registry if cursor_registry is not None else weakref.WeakSet()
        )

    def cursor(self, *args, **kwargs):
        cursor = self.__wrapped__.cursor(*args, **kwargs)
        self._self_cursor_registry.add(cursor)
        return cursor

    def cancel_all_cursors(self):
        """Cancel all tracked cursors. Best-effort, ignores errors."""
        for cursor in list(self._self_cursor_registry):
            _cancel_cursor(cursor)

⚠️ Potential issue | 🟠 Major

Guard cursor tracking against non‑weakrefable cursor objects.
WeakSet.add() can raise TypeError for cursor types without weakref support, which would abort query execution. Add a strong‑set fallback and clear it after cancellation.

🛠️ Suggested fix
 class CursorTrackingDBAPIConnection(wrapt.ObjectProxy):
     """Wraps DBAPI connection to track cursors as they're created."""
 
     def __init__(self, wrapped, cursor_registry=None):
         super().__init__(wrapped)
         # Use provided registry or create our own
         self._self_cursor_registry = (
             cursor_registry if cursor_registry is not None else weakref.WeakSet()
         )
+        self._self_strong_cursors = set()
 
     def cursor(self, *args, **kwargs):
         cursor = self.__wrapped__.cursor(*args, **kwargs)
-        self._self_cursor_registry.add(cursor)
+        try:
+            self._self_cursor_registry.add(cursor)
+        except TypeError:
+            # Some DBAPI cursor types don't support weakrefs
+            self._self_strong_cursors.add(cursor)
         return cursor
 
     def cancel_all_cursors(self):
         """Cancel all tracked cursors. Best-effort, ignores errors."""
-        for cursor in list(self._self_cursor_registry):
+        for cursor in list(self._self_cursor_registry) + list(
+            self._self_strong_cursors
+        ):
             _cancel_cursor(cursor)
+        self._self_strong_cursors.clear()
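
The failure mode this comment describes can be reproduced without any database driver. The sketch below is illustrative only: `SlottedCursor`, `PlainCursor`, and `track` are hypothetical names, and whether a given real DBAPI cursor type supports weak references depends on the driver.

```python
import weakref


class SlottedCursor:
    """Hypothetical cursor type: __slots__ without __weakref__ disables weak references."""

    __slots__ = ("name",)

    def __init__(self, name):
        self.name = name


class PlainCursor:
    """Hypothetical cursor type: ordinary classes support weak references."""


def track(cursor, weak_registry, strong_registry):
    """Track a cursor weakly when possible, otherwise keep a strong reference."""
    try:
        weak_registry.add(cursor)
        return "weak"
    except TypeError:
        # WeakSet.add() cannot create a weak reference to this object.
        strong_registry.add(cursor)
        return "strong"
```

With a fallback like this, the strong set has to be cleared explicitly after cancellation, because it keeps cursors alive until then.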
🧰 Tools
🪛 Ruff (0.14.14)

[warning] 525-525: Missing return type annotation for special method __init__. Add return type annotation: None (ANN204)
[warning] 532-532: Missing type annotation for *args (ANN002)
[warning] 532-532: Missing type annotation for **kwargs (ANN003)

🤖 Prompt for AI Agents
In `@deepnote_toolkit/sql/sql_execution.py` around lines 525 - 540, The cursor
tracking currently uses a WeakSet and will raise TypeError for non‑weakrefable
cursor objects; update __init__ to also create a fallback strong set (e.g.,
self._self_strong_cursor_set = set()), change cursor(self, *args, **kwargs) to
attempt self._self_cursor_registry.add(cursor) inside a try/except TypeError and
on exception add the cursor to self._self_strong_cursor_set, and modify
cancel_all_cursors() to iterate both self._self_cursor_registry and
self._self_strong_cursor_set to call _cancel_cursor(cursor) while catching
errors, then clear both collections after attempting cancellation.

Comment on lines +555 to +566
    def _install_dbapi_wrapper(self):
        """Replace SQLAlchemy's internal DBAPI connection with our tracking wrapper."""
        try:
            # Access the internal DBAPI connection
            dbapi_conn = self.__wrapped__._dbapi_connection
            if dbapi_conn is None:
                logger.warning("DBAPI connection is None, cannot install tracking")
                return

            self.__wrapped__._dbapi_connection = CursorTrackingDBAPIConnection(
                dbapi_conn, self._self_cursors
            )

🧹 Nitpick | 🔵 Trivial

Avoid stacking DBAPI wrappers on pooled connections.
If _dbapi_connection is already wrapped, rewrapping builds proxy chains and redundant registries. Reuse the existing wrapper/registry instead.

♻️ Suggested fix
         try:
             # Access the internal DBAPI connection
             dbapi_conn = self.__wrapped__._dbapi_connection
             if dbapi_conn is None:
                 logger.warning("DBAPI connection is None, cannot install tracking")
                 return
 
-            self.__wrapped__._dbapi_connection = CursorTrackingDBAPIConnection(
-                dbapi_conn, self._self_cursors
-            )
+            if isinstance(dbapi_conn, CursorTrackingDBAPIConnection):
+                self._self_cursors = dbapi_conn._self_cursor_registry
+                return
+            self.__wrapped__._dbapi_connection = CursorTrackingDBAPIConnection(
+                dbapi_conn, self._self_cursors
+            )
         except Exception as e:
             logger.warning(f"Could not install DBAPI wrapper: {e}")
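
The reuse-instead-of-rewrap idea can be shown in isolation. This is a minimal sketch under simplifying assumptions: `TrackingProxy` and `install_tracking` are hypothetical names, the proxy is a plain class rather than `wrapt.ObjectProxy`, and `holder` stands in for the SQLAlchemy connection object.

```python
from types import SimpleNamespace


class TrackingProxy:
    """Hypothetical minimal proxy; the wrapper in the PR is built on wrapt.ObjectProxy."""

    def __init__(self, wrapped, registry):
        self.wrapped = wrapped
        self.registry = registry


def install_tracking(holder, registry):
    """Wrap holder.dbapi_connection once and return the registry actually in use."""
    current = holder.dbapi_connection
    if isinstance(current, TrackingProxy):
        # Already wrapped: share the existing registry instead of stacking proxies.
        return current.registry
    holder.dbapi_connection = TrackingProxy(current, registry)
    return registry


raw_connection = object()
holder = SimpleNamespace(dbapi_connection=raw_connection)
first_registry = install_tracking(holder, set())
second_registry = install_tracking(holder, set())
```

After the second call the holder still has a single proxy around the raw connection, and both callers see the same registry.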
🧰 Tools
🪛 Ruff (0.14.14)

[warning] 555-555: Missing return type annotation for private function _install_dbapi_wrapper. Add return type annotation: None (ANN202)

🤖 Prompt for AI Agents
In `@deepnote_toolkit/sql/sql_execution.py` around lines 555 - 566, The
_install_dbapi_wrapper currently replaces self.__wrapped__._dbapi_connection
unconditionally, which can create chains when it's already a
CursorTrackingDBAPIConnection; update _install_dbapi_wrapper to check the
existing self.__wrapped__._dbapi_connection first, detect if it's an instance of
CursorTrackingDBAPIConnection (or exposes the registry/registry holder), and if
so reuse that wrapper by attaching/merging self._self_cursors into the existing
wrapper's registry instead of rewrapping; only create a new
CursorTrackingDBAPIConnection when the current _dbapi_connection is a raw DBAPI
connection (not already wrapped).

Comment on lines +12 to +33
+def _setup_mock_engine_with_cursor(mock_cursor):
+    """Helper to set up mock engine and connection with a custom cursor.
+
-    mock_job = mock.Mock()
-    mock_job.result.side_effect = KeyboardInterrupt("User interrupted")
-    mock_job.cancel = mock.Mock()
+    Returns mock_engine that can be passed to _execute_sql_on_engine.
+    """
+    import sqlalchemy
+
+    mock_dbapi_connection = mock.Mock()
+    mock_dbapi_connection.cursor.return_value = mock_cursor
+
+    mock_sa_connection = mock.Mock(spec=sqlalchemy.engine.Connection)
+    mock_sa_connection._dbapi_connection = mock_dbapi_connection
+    mock_sa_connection.connection = mock_dbapi_connection
+    mock_sa_connection.in_transaction.return_value = False
+
+    # Mock exec_driver_sql to simulate cursor creation and execute
+    def mock_exec_driver_sql(sql, *args):
+        cursor = mock_sa_connection._dbapi_connection.cursor()
+        cursor.execute(sql, *args)
+        return cursor
+
+    mock_sa_connection.exec_driver_sql = mock_exec_driver_sql

🛠️ Refactor suggestion | 🟠 Major

Add explicit type hints for the new test helper.
This helper (and nested mock) lack parameter/return annotations.

✍️ Suggested fix
 import uuid
 from unittest import mock
+from typing import Any
@@
-def _setup_mock_engine_with_cursor(mock_cursor):
+def _setup_mock_engine_with_cursor(mock_cursor: Any) -> mock.Mock:
@@
-    def mock_exec_driver_sql(sql, *args):
+    def mock_exec_driver_sql(sql: str, *args: Any, **kwargs: Any) -> Any:
         cursor = mock_sa_connection._dbapi_connection.cursor()
-        cursor.execute(sql, *args)
+        cursor.execute(sql, *args, **kwargs)
         return cursor
As per coding guidelines: Use explicit type hints for function parameters and return values.
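
For context on how a mock cursor like the one in this helper can drive a cancellation test, here is a small self-contained sketch using only `unittest.mock`. `execute_with_cancel` is a hypothetical stand-in for the function under test, not `_execute_sql_on_engine` itself, and `check_cancel_on_interrupt` is likewise an invented name.

```python
from typing import Any
from unittest import mock


def execute_with_cancel(connection: Any, sql: str) -> Any:
    """Hypothetical stand-in for the code under test: cancel the cursor on any error."""
    cursor = connection.cursor()
    try:
        cursor.execute(sql)
    except BaseException:
        cursor.cancel()
        raise
    return cursor


def check_cancel_on_interrupt() -> bool:
    """Return True if an interrupted execute() leads to exactly one cancel() call."""
    mock_cursor = mock.Mock()
    # A Mock raises its side_effect when it is an exception instance.
    mock_cursor.execute.side_effect = KeyboardInterrupt("User interrupted")
    mock_connection = mock.Mock()
    mock_connection.cursor.return_value = mock_cursor

    try:
        execute_with_cancel(mock_connection, "SELECT 1")
    except KeyboardInterrupt:
        pass
    else:
        raise AssertionError("expected KeyboardInterrupt to propagate")

    mock_cursor.cancel.assert_called_once_with()
    return True
```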
🧰 Tools
🪛 Ruff (0.14.14)

[warning] 12-12: Missing return type annotation for private function _setup_mock_engine_with_cursor (ANN202)
[warning] 28-28: Missing return type annotation for private function mock_exec_driver_sql (ANN202)
[warning] 28-28: Missing type annotation for *args (ANN002)

🤖 Prompt for AI Agents
In `@tests/unit/test_sql_execution_internal.py` around lines 12 - 33, Add explicit
type hints to the _setup_mock_engine_with_cursor helper: annotate the parameter
mock_cursor as unittest.mock.Mock (or typing.Any) and the return type as
unittest.mock.Mock (or sqlalchemy.engine.Connection) to match what the function
returns; also type the inner mock_exec_driver_sql callable (e.g., def
mock_exec_driver_sql(sql: str, *args: Any) -> unittest.mock.Mock) and annotate
mock_sa_connection and mock_dbapi_connection variables where declared to clarify
their types. Ensure you import typing.Any and unittest.mock.Mock (or appropriate
sqlalchemy types) at the top of the test file and update the function signature
and nested function signature accordingly.
