-
Notifications
You must be signed in to change notification settings - Fork 3
fix: Replace runtime patches with general solution #64
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
fix: Replace runtime patches with general solution #64
Conversation
📝 WalkthroughWalkthroughThe changes replace a BigQuery-specific monkeypatch approach with a generalized cursor-tracking mechanism for SQL query cancellation. The runtime_patches.py file—which patched BigQuery's internal \_wait_or_cancel function—is entirely removed. New wrapper classes (CursorTrackingDBAPIConnection and CursorTrackingSQLAlchemyConnection) intercept cursor creation and track active cursors during SQL execution. On KeyboardInterrupt or exception, tracked cursors are canceled through a unified \_cancel_cursor utility. Error handling in \_execute_sql_on_engine now triggers cleanup of all tracked cursors. Sequence DiagramsequenceDiagram
participant User as User
participant SQLExec as SQL Executor
participant ConnWrapper as Cursor Tracking<br/>Wrapper
participant Cursor as DB Cursor
participant Job as BigQuery Job<br/>(if applicable)
User->>SQLExec: Execute SQL Query
SQLExec->>ConnWrapper: Get connection<br/>(wrapped)
ConnWrapper->>ConnWrapper: Track cursor in<br/>WeakSet
ConnWrapper->>Cursor: Create cursor
Cursor->>Job: Execute query
rect rgba(220, 53, 69, 0.5)
User->>SQLExec: Cancel (KeyboardInterrupt)
SQLExec->>ConnWrapper: cancel_all_cursors()
ConnWrapper->>Cursor: cursor.cancel()
Cursor->>Job: cancel()
Job-->>Cursor: Cancellation acknowledged
Cursor-->>ConnWrapper: Cancelled
ConnWrapper-->>SQLExec: All cursors cleaned
SQLExec-->>User: Interrupt propagated
end
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Comment |
|
📦 Python package built successfully!
|
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #64 +/- ##
==========================================
+ Coverage 73.28% 73.29% +0.01%
==========================================
Files 93 92 -1
Lines 5199 5220 +21
Branches 757 763 +6
==========================================
+ Hits 3810 3826 +16
- Misses 1147 1148 +1
- Partials 242 246 +4
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
|
🚀 Review App Deployment Started
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
🤖 Fix all issues with AI agents
In `@deepnote_toolkit/sql/sql_execution.py`:
- Around line 525-540: The cursor tracking currently uses a WeakSet and will
raise TypeError for non‑weakrefable cursor objects; update __init__ to also
create a fallback strong set (e.g., self._self_strong_cursor_set = set()),
change cursor(self, *args, **kwargs) to attempt
self._self_cursor_registry.add(cursor) inside a try/except TypeError and on
exception add the cursor to self._self_strong_cursor_set, and modify
cancel_all_cursors() to iterate both self._self_cursor_registry and
self._self_strong_cursor_set to call _cancel_cursor(cursor) while catching
errors, then clear both collections after attempting cancellation.
- Around line 522-597: Add explicit typing to the new wrappers and helpers:
annotate CursorTrackingDBAPIConnection.__init__(self, wrapped: Any,
cursor_registry: Optional[weakref.WeakSet] = None) -> None,
CursorTrackingDBAPIConnection.cursor(self, *args: Any, **kwargs: Any) -> Any,
and CursorTrackingDBAPIConnection.cancel_all_cursors(self) -> None; annotate
CursorTrackingSQLAlchemyConnection.__init__(self, wrapped: Any) -> None,
_install_dbapi_wrapper(self) -> None, and cancel_all_cursors(self) -> None;
annotate _cancel_cursor(cursor: Any) -> None (and update _execute_sql_on_engine
signature similarly if it accepts nullable/Any types). Use Optional[T] for
parameters that can be None and import typing names (Any, Optional) as needed.
Ensure return types use -> None or -> Any where appropriate.
- Around line 555-566: The _install_dbapi_wrapper currently replaces
self.__wrapped__._dbapi_connection unconditionally, which can create chains when
it's already a CursorTrackingDBAPIConnection; update _install_dbapi_wrapper to
check the existing self.__wrapped__._dbapi_connection first, detect if it's an
instance of CursorTrackingDBAPIConnection (or exposes the registry/registry
holder), and if so reuse that wrapper by attaching/merging self._self_cursors
into the existing wrapper's registry instead of rewrapping; only create a new
CursorTrackingDBAPIConnection when the current _dbapi_connection is a raw DBAPI
connection (not already wrapped).
In `@tests/unit/test_sql_execution_internal.py`:
- Around line 12-33: Add explicit type hints to the
_setup_mock_engine_with_cursor helper: annotate the parameter mock_cursor as
unittest.mock.Mock (or typing.Any) and the return type as unittest.mock.Mock (or
sqlalchemy.engine.Connection) to match what the function returns; also type the
inner mock_exec_driver_sql callable (e.g., def mock_exec_driver_sql(sql: str,
*args: Any) -> unittest.mock.Mock) and annotate mock_sa_connection and
mock_dbapi_connection variables where declared to clarify their types. Ensure
you import typing.Any and unittest.mock.Mock (or appropriate sqlalchemy types)
at the top of the test file and update the function signature and nested
function signature accordingly.
| class CursorTrackingDBAPIConnection(wrapt.ObjectProxy): | ||
| """Wraps DBAPI connection to track cursors as they're created.""" | ||
|
|
||
| def __init__(self, wrapped, cursor_registry=None): | ||
| super().__init__(wrapped) | ||
| # Use provided registry or create our own | ||
| self._self_cursor_registry = ( | ||
| cursor_registry if cursor_registry is not None else weakref.WeakSet() | ||
| ) | ||
|
|
||
| def cursor(self, *args, **kwargs): | ||
| cursor = self.__wrapped__.cursor(*args, **kwargs) | ||
| self._self_cursor_registry.add(cursor) | ||
| return cursor | ||
|
|
||
| def cancel_all_cursors(self): | ||
| """Cancel all tracked cursors. Best-effort, ignores errors.""" | ||
| for cursor in list(self._self_cursor_registry): | ||
| _cancel_cursor(cursor) | ||
|
|
||
|
|
||
| class CursorTrackingSQLAlchemyConnection(wrapt.ObjectProxy): | ||
| """A SQLAlchemy connection wrapper that tracks cursors for cancellation. | ||
|
|
||
| This wrapper replaces the internal _dbapi_connection with a tracking proxy, | ||
| so all cursors created (including by exec_driver_sql) are tracked. | ||
| """ | ||
|
|
||
| def __init__(self, wrapped): | ||
| super().__init__(wrapped) | ||
| self._self_cursors = weakref.WeakSet() | ||
| self._install_dbapi_wrapper() | ||
|
|
||
| def _install_dbapi_wrapper(self): | ||
| """Replace SQLAlchemy's internal DBAPI connection with our tracking wrapper.""" | ||
| try: | ||
| # Access the internal DBAPI connection | ||
| dbapi_conn = self.__wrapped__._dbapi_connection | ||
| if dbapi_conn is None: | ||
| logger.warning("DBAPI connection is None, cannot install tracking") | ||
| return | ||
|
|
||
| self.__wrapped__._dbapi_connection = CursorTrackingDBAPIConnection( | ||
| dbapi_conn, self._self_cursors | ||
| ) | ||
| except Exception as e: | ||
| logger.warning(f"Could not install DBAPI wrapper: {e}") | ||
|
|
||
| def cancel_all_cursors(self): | ||
| """Cancel all tracked cursors. Best-effort, ignores errors.""" | ||
| for cursor in list(self._self_cursors): | ||
| _cancel_cursor(cursor) | ||
|
|
||
|
|
||
| def _cancel_cursor(cursor): | ||
| """Best-effort cancel a cursor using available methods.""" | ||
| try: | ||
| # BigQuery: cancel via query_job if available | ||
| query_job = getattr(cursor, "query_job", None) | ||
| if query_job is not None and hasattr(query_job, "cancel"): | ||
| try: | ||
| query_job.cancel() | ||
| except (Exception, KeyboardInterrupt): | ||
| pass | ||
|
|
||
| # Generic DBAPI: try cursor.cancel() if available (Trino, etc.) | ||
| if hasattr(cursor, "cancel"): | ||
| try: | ||
| cursor.cancel() | ||
| except (Exception, KeyboardInterrupt): | ||
| pass | ||
| except (Exception, KeyboardInterrupt): | ||
| pass # Best effort, ignore all errors | ||
|
|
||
|
|
||
| def _execute_sql_on_engine(engine, query, bind_params): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion | 🟠 Major
Add explicit type hints (and Optional) for the new wrappers/helpers.
These new public methods lack parameter/return annotations.
✍️ Suggested fix
-from typing import Any
+from typing import Any, Optional
@@
class CursorTrackingDBAPIConnection(wrapt.ObjectProxy):
@@
- def __init__(self, wrapped, cursor_registry=None):
+ def __init__(
+ self,
+ wrapped: Any,
+ cursor_registry: Optional[weakref.WeakSet[Any]] = None,
+ ) -> None:
@@
- def cursor(self, *args, **kwargs):
+ def cursor(self, *args: Any, **kwargs: Any) -> Any:
@@
- def cancel_all_cursors(self):
+ def cancel_all_cursors(self) -> None:
@@
class CursorTrackingSQLAlchemyConnection(wrapt.ObjectProxy):
@@
- def __init__(self, wrapped):
+ def __init__(self, wrapped: Any) -> None:
@@
- def _install_dbapi_wrapper(self):
+ def _install_dbapi_wrapper(self) -> None:
@@
- def cancel_all_cursors(self):
+ def cancel_all_cursors(self) -> None:
@@
-def _cancel_cursor(cursor):
+def _cancel_cursor(cursor: Any) -> None:
@@
-def _execute_sql_on_engine(engine, query, bind_params):
+def _execute_sql_on_engine(engine: Any, query: str, bind_params: Any) -> Any:🧰 Tools
🪛 Ruff (0.14.14)
[warning] 525-525: Missing return type annotation for special method __init__
Add return type annotation: None
(ANN204)
[warning] 532-532: Missing type annotation for *args
(ANN002)
[warning] 532-532: Missing type annotation for **kwargs
(ANN003)
[warning] 550-550: Missing return type annotation for special method __init__
Add return type annotation: None
(ANN204)
[warning] 555-555: Missing return type annotation for private function _install_dbapi_wrapper
Add return type annotation: None
(ANN202)
[warning] 567-567: Do not catch blind exception: Exception
(BLE001)
[warning] 568-568: Logging statement uses f-string
(G004)
[warning] 576-576: Missing return type annotation for private function _cancel_cursor
Add return type annotation: None
(ANN202)
[error] 584-585: try-except-pass detected, consider logging the exception
(S110)
[warning] 584-584: Do not catch blind exception: Exception
(BLE001)
[error] 591-592: try-except-pass detected, consider logging the exception
(S110)
[warning] 591-591: Do not catch blind exception: Exception
(BLE001)
[error] 593-594: try-except-pass detected, consider logging the exception
(S110)
[warning] 593-593: Do not catch blind exception: Exception
(BLE001)
[warning] 597-597: Missing return type annotation for private function _execute_sql_on_engine
(ANN202)
🤖 Prompt for AI Agents
In `@deepnote_toolkit/sql/sql_execution.py` around lines 522 - 597, Add explicit
typing to the new wrappers and helpers: annotate
CursorTrackingDBAPIConnection.__init__(self, wrapped: Any, cursor_registry:
Optional[weakref.WeakSet] = None) -> None,
CursorTrackingDBAPIConnection.cursor(self, *args: Any, **kwargs: Any) -> Any,
and CursorTrackingDBAPIConnection.cancel_all_cursors(self) -> None; annotate
CursorTrackingSQLAlchemyConnection.__init__(self, wrapped: Any) -> None,
_install_dbapi_wrapper(self) -> None, and cancel_all_cursors(self) -> None;
annotate _cancel_cursor(cursor: Any) -> None (and update _execute_sql_on_engine
signature similarly if it accepts nullable/Any types). Use Optional[T] for
parameters that can be None and import typing names (Any, Optional) as needed.
Ensure return types use -> None or -> Any where appropriate.
| def __init__(self, wrapped, cursor_registry=None): | ||
| super().__init__(wrapped) | ||
| # Use provided registry or create our own | ||
| self._self_cursor_registry = ( | ||
| cursor_registry if cursor_registry is not None else weakref.WeakSet() | ||
| ) | ||
|
|
||
| def cursor(self, *args, **kwargs): | ||
| cursor = self.__wrapped__.cursor(*args, **kwargs) | ||
| self._self_cursor_registry.add(cursor) | ||
| return cursor | ||
|
|
||
| def cancel_all_cursors(self): | ||
| """Cancel all tracked cursors. Best-effort, ignores errors.""" | ||
| for cursor in list(self._self_cursor_registry): | ||
| _cancel_cursor(cursor) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Guard cursor tracking against non‑weakrefable cursor objects.
WeakSet.add() can raise TypeError for cursor types without weakref support, which would abort query execution. Add a strong‑set fallback and clear it after cancellation.
🛠️ Suggested fix
class CursorTrackingDBAPIConnection(wrapt.ObjectProxy):
"""Wraps DBAPI connection to track cursors as they're created."""
def __init__(self, wrapped, cursor_registry=None):
super().__init__(wrapped)
# Use provided registry or create our own
self._self_cursor_registry = (
cursor_registry if cursor_registry is not None else weakref.WeakSet()
)
+ self._self_strong_cursors = set()
def cursor(self, *args, **kwargs):
cursor = self.__wrapped__.cursor(*args, **kwargs)
- self._self_cursor_registry.add(cursor)
+ try:
+ self._self_cursor_registry.add(cursor)
+ except TypeError:
+ # Some DBAPI cursor types don't support weakrefs
+ self._self_strong_cursors.add(cursor)
return cursor
def cancel_all_cursors(self):
"""Cancel all tracked cursors. Best-effort, ignores errors."""
- for cursor in list(self._self_cursor_registry):
+ for cursor in list(self._self_cursor_registry) + list(
+ self._self_strong_cursors
+ ):
_cancel_cursor(cursor)
+ self._self_strong_cursors.clear()🧰 Tools
🪛 Ruff (0.14.14)
[warning] 525-525: Missing return type annotation for special method __init__
Add return type annotation: None
(ANN204)
[warning] 532-532: Missing type annotation for *args
(ANN002)
[warning] 532-532: Missing type annotation for **kwargs
(ANN003)
🤖 Prompt for AI Agents
In `@deepnote_toolkit/sql/sql_execution.py` around lines 525 - 540, The cursor
tracking currently uses a WeakSet and will raise TypeError for non‑weakrefable
cursor objects; update __init__ to also create a fallback strong set (e.g.,
self._self_strong_cursor_set = set()), change cursor(self, *args, **kwargs) to
attempt self._self_cursor_registry.add(cursor) inside a try/except TypeError and
on exception add the cursor to self._self_strong_cursor_set, and modify
cancel_all_cursors() to iterate both self._self_cursor_registry and
self._self_strong_cursor_set to call _cancel_cursor(cursor) while catching
errors, then clear both collections after attempting cancellation.
| def _install_dbapi_wrapper(self): | ||
| """Replace SQLAlchemy's internal DBAPI connection with our tracking wrapper.""" | ||
| try: | ||
| # Access the internal DBAPI connection | ||
| dbapi_conn = self.__wrapped__._dbapi_connection | ||
| if dbapi_conn is None: | ||
| logger.warning("DBAPI connection is None, cannot install tracking") | ||
| return | ||
|
|
||
| self.__wrapped__._dbapi_connection = CursorTrackingDBAPIConnection( | ||
| dbapi_conn, self._self_cursors | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧹 Nitpick | 🔵 Trivial
Avoid stacking DBAPI wrappers on pooled connections.
If _dbapi_connection is already wrapped, rewrapping builds proxy chains and redundant registries. Reuse the existing wrapper/registry instead.
♻️ Suggested fix
try:
# Access the internal DBAPI connection
dbapi_conn = self.__wrapped__._dbapi_connection
if dbapi_conn is None:
logger.warning("DBAPI connection is None, cannot install tracking")
return
- self.__wrapped__._dbapi_connection = CursorTrackingDBAPIConnection(
- dbapi_conn, self._self_cursors
- )
+ if isinstance(dbapi_conn, CursorTrackingDBAPIConnection):
+ self._self_cursors = dbapi_conn._self_cursor_registry
+ return
+ self.__wrapped__._dbapi_connection = CursorTrackingDBAPIConnection(
+ dbapi_conn, self._self_cursors
+ )
except Exception as e:
logger.warning(f"Could not install DBAPI wrapper: {e}")🧰 Tools
🪛 Ruff (0.14.14)
[warning] 555-555: Missing return type annotation for private function _install_dbapi_wrapper
Add return type annotation: None
(ANN202)
🤖 Prompt for AI Agents
In `@deepnote_toolkit/sql/sql_execution.py` around lines 555 - 566, The
_install_dbapi_wrapper currently replaces self.__wrapped__._dbapi_connection
unconditionally, which can create chains when it's already a
CursorTrackingDBAPIConnection; update _install_dbapi_wrapper to check the
existing self.__wrapped__._dbapi_connection first, detect if it's an instance of
CursorTrackingDBAPIConnection (or exposes the registry/registry holder), and if
so reuse that wrapper by attaching/merging self._self_cursors into the existing
wrapper's registry instead of rewrapping; only create a new
CursorTrackingDBAPIConnection when the current _dbapi_connection is a raw DBAPI
connection (not already wrapped).
| def _setup_mock_engine_with_cursor(mock_cursor): | ||
| """Helper to set up mock engine and connection with a custom cursor. | ||
|
|
||
| mock_job = mock.Mock() | ||
| mock_job.result.side_effect = KeyboardInterrupt("User interrupted") | ||
| mock_job.cancel = mock.Mock() | ||
| Returns mock_engine that can be passed to _execute_sql_on_engine. | ||
| """ | ||
| import sqlalchemy | ||
|
|
||
| mock_dbapi_connection = mock.Mock() | ||
| mock_dbapi_connection.cursor.return_value = mock_cursor | ||
|
|
||
| mock_sa_connection = mock.Mock(spec=sqlalchemy.engine.Connection) | ||
| mock_sa_connection._dbapi_connection = mock_dbapi_connection | ||
| mock_sa_connection.connection = mock_dbapi_connection | ||
| mock_sa_connection.in_transaction.return_value = False | ||
|
|
||
| # Mock exec_driver_sql to simulate cursor creation and execute | ||
| def mock_exec_driver_sql(sql, *args): | ||
| cursor = mock_sa_connection._dbapi_connection.cursor() | ||
| cursor.execute(sql, *args) | ||
| return cursor | ||
|
|
||
| mock_sa_connection.exec_driver_sql = mock_exec_driver_sql |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion | 🟠 Major
Add explicit type hints for the new test helper.
This helper (and nested mock) lack parameter/return annotations.
✍️ Suggested fix
import uuid
from unittest import mock
+from typing import Any
@@
-def _setup_mock_engine_with_cursor(mock_cursor):
+def _setup_mock_engine_with_cursor(mock_cursor: Any) -> mock.Mock:
@@
- def mock_exec_driver_sql(sql, *args):
+ def mock_exec_driver_sql(sql: str, *args: Any, **kwargs: Any) -> Any:
cursor = mock_sa_connection._dbapi_connection.cursor()
- cursor.execute(sql, *args)
+ cursor.execute(sql, *args, **kwargs)
return cursor🧰 Tools
🪛 Ruff (0.14.14)
[warning] 12-12: Missing return type annotation for private function _setup_mock_engine_with_cursor
(ANN202)
[warning] 28-28: Missing return type annotation for private function mock_exec_driver_sql
(ANN202)
[warning] 28-28: Missing type annotation for *args
(ANN002)
🤖 Prompt for AI Agents
In `@tests/unit/test_sql_execution_internal.py` around lines 12 - 33, Add explicit
type hints to the _setup_mock_engine_with_cursor helper: annotate the parameter
mock_cursor as unittest.mock.Mock (or typing.Any) and the return type as
unittest.mock.Mock (or sqlalchemy.engine.Connection) to match what the function
returns; also type the inner mock_exec_driver_sql callable (e.g., def
mock_exec_driver_sql(sql: str, *args: Any) -> unittest.mock.Mock) and annotate
mock_sa_connection and mock_dbapi_connection variables where declared to clarify
their types. Ensure you import typing.Any and unittest.mock.Mock (or appropriate
sqlalchemy types) at the top of the test file and update the function signature
and nested function signature accordingly.
This PR replaces runtime patch for query cancellation on
KeyboardInterruptfor BigQuery with more general system. We now wrap SQLAlchemy connection to track produced cursors and if we receiveBaseException, we try to cancel them before reraising.This was tested with Trino and BigQuery. See instructions for testing Trino in #63 and for BigQuery in #34
Summary by CodeRabbit
New Features
Bug Fixes
Refactor
✏️ Tip: You can customize this high-level summary in your review settings.