Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions castervoice/rules/core/navigation_rules/window_mgmt_rule.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,23 @@
from dragonfly import MappingRule, Function, Repeat, ShortIntegerRef
from dragonfly import DictListRef, Function, MappingRule, Repeat, Repetition, ShortIntegerRef

from castervoice.lib import utilities
from castervoice.lib import virtual_desktops
from castervoice.lib.actions import Key
from castervoice.lib.ctrl.mgr.rule_details import RuleDetails
from castervoice.lib.merge.state.short import R

try: # Try first loading from caster user directory
from navigation_rules.window_mgmt_rule_support import ( # pylint: disable=import-error
debug_window_switching, open_windows_dictlist, switch_window, timerinstance,
)
except ImportError:
from castervoice.rules.core.navigation_rules.window_mgmt_rule_support import (
debug_window_switching,
open_windows_dictlist,
switch_window,
timerinstance,
)


class WindowManagementRule(MappingRule):
mapping = {
Expand All @@ -17,6 +29,10 @@ class WindowManagementRule(MappingRule):
R(Function(utilities.restore_window)),
'window close':
R(Function(utilities.close_window)),
"window switch <windows>":
R(Function(switch_window), rdescript=""),
"window switch show":
R(Function(debug_window_switching)),

# Workspace management
"show work [spaces]":
Expand All @@ -42,9 +58,11 @@ class WindowManagementRule(MappingRule):

extras = [
ShortIntegerRef("n", 1, 20, default=1),
Repetition(name="windows", min=1, max=5,
child=DictListRef("window_by_keyword", open_windows_dictlist)),
]


def get_rule():
timerinstance.set()
details = RuleDetails(name="window management rule")
return WindowManagementRule, details
157 changes: 157 additions & 0 deletions castervoice/rules/core/navigation_rules/window_mgmt_rule_support.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
# All credit goes to caspark
# This is adapted from caspark's grammar at https://gist.github.com/caspark/9c2c5e2853a14b6e28e9aa4f121164a6

from __future__ import print_function

import re
import time

import six
from dragonfly import DictList, Window, get_current_engine, get_engine

from castervoice.lib.util import recognition_history

_history = recognition_history.get_and_register_history(1)

open_windows_dictlist = DictList("open_windows")

WORD_SPLITTER = re.compile('[^a-zA-Z0-9]+')


def get_caster_messaging_window():
if get_current_engine().name == 'natlink':
from natlinkcore import natlinkstatus # pylint: disable=import-error
status = natlinkstatus.NatlinkStatus()
if status.NatlinkIsEnabled() == 1:
return "Messages from Natlink"
return "Caster: Status Window"
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicated messaging window title logic across files

Low Severity

get_caster_messaging_window() duplicates the engine-name and natlink-status title-determination logic already present in show_window() in show_window_on_error_hook.py. Both check get_current_engine().name == 'natlink', import natlinkstatus, test NatlinkIsEnabled() == 1, and return the same two title strings. The PR description mentions this is meant to be "a shared messaging-window helper used by the status-window code," but the existing show_window() was not updated to consume it, so the logic is duplicated rather than shared.

Additional Locations (1)
Fix in Cursor Fix in Web



def lower_if_not_abbreviation(s):
if len(s) <= 4 and s.upper() == s:
return s
else:
return s.lower()


def find_window(window_matcher_func, timeout_ms=3000):
"""
Returns a Window matching the given matcher function, or raises an error otherwise
"""
steps = int(timeout_ms / 100)
for i in range(steps):
for win in Window.get_all_windows():
if window_matcher_func(win):
return win
time.sleep(0.1)
raise ValueError(
"no matching window found within {} ms".format(timeout_ms))


def refresh_open_windows_dictlist():
"""
Refreshes `open_windows_dictlist`
"""
window_options = {}
for window in (x for x in Window.get_all_windows() if
x.is_valid and
x.is_enabled and
x.is_visible and
not x.executable.startswith("C:\\Windows") and
x.classname != "DgnResultsBoxWindow"):
for word in {lower_if_not_abbreviation(word)
for word
in WORD_SPLITTER.split(window.title)
if len(word)}:
if word in window_options:
window_options[word] += [window]
else:
window_options[word] = [window]

open_windows_dictlist.set(window_options)


def debug_window_switching():
"""
Prints out contents of `open_windows_dictlist`
"""
options = open_windows_dictlist.copy()
print("*** Windows known:\n",
"\n".join(sorted({w.title for list_of_windows in six.itervalues(options)
for w in list_of_windows})))

print("*** Single word switching options:\n", "\n".join(
"{}: '{}'".format(
k.ljust(20), "', '".join(window.title for window in options[k])
) for k in sorted(six.iterkeys(options)) if len(options[k]) == 1))
print("*** Ambiguous switching options:\n", "\n".join(
"{}: '{}'".format(
k.ljust(20), "', '".join(window.title for window in options[k])
) for k in sorted(six.iterkeys(options)) if len(options[k]) > 1))


def switch_window(windows):
"""
Matches keywords to window titles stored in `open_windows_dictlist`
"""
matched_window_handles = {w.handle: w for w in windows[0]}
for window_options in windows[1:]:
matched_window_handles = {
w.handle: w for w in window_options if w.handle in matched_window_handles}
if six.PY2:
matched_windows = matched_window_handles.values()
else:
matched_windows = list(matched_window_handles.values())
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dead Python 2 compatibility code via six

Low Severity

This new file introduces six.PY2, six.itervalues, and six.iterkeys — all Python 2 compatibility shims. The six.PY2 branch in switch_window is dead code on Python 3. six.itervalues(options) and six.iterkeys(options) in debug_window_switching are unnecessary wrappers equivalent to options.values() and options.keys() on Python 3. Plain Python 3 idioms would be simpler and avoid the six dependency in new code.

Additional Locations (1)
Fix in Cursor Fix in Web

if len(matched_windows) == 1:
window = matched_windows[0]
print("Window Management: Switching to", window.title)
window.set_foreground()
else:
try:
messaging_title = get_caster_messaging_window()
messaging_window = find_window(
lambda w: messaging_title in w.title, timeout_ms=100)
if messaging_window.is_minimized:
messaging_window.restore()
else:
messaging_window.set_foreground()
except ValueError:
pass
if len(matched_windows) >= 2:
print("Ambiguous window switch command:\n", "\n".join(
"'{}' from {} (handle: {})".format(w.title, w.executable, w.handle)
for w in matched_windows))
else:
spec_n_word = 2
words = list(map(str, _history[0]))
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing bounds check on _history[0] causes IndexError

Medium Severity

_history[0] is accessed without checking if _history is empty, which will raise an IndexError. The existing codebase consistently guards against this: again.py checks if len(_history) == 0: return before accessing history, and dragon_support.py wraps access in a try/except. The new switch_window function lacks any such guard. This path is reachable when multiple keywords intersect to zero matching windows.

Fix in Cursor Fix in Web

del words[:spec_n_word]
print("Window Management: No matching window title containing keywords: `{}`".
format(' '.join(map(str, words))))


class Timer:
"""
Dragonfly timer runs every 2 seconds updating open_windows_dictlist
"""
timer = None

def __init__(self):
pass

def set(self):
if self.timer is None:
self.timer = get_engine().create_timer(refresh_open_windows_dictlist, 2)
self.timer.start()

def stop(self):
if self.timer is not None:
self.timer.stop()
self.timer = None


_previous_timerinstance = globals().get("timerinstance")
if _previous_timerinstance is not None:
_previous_timer = getattr(_previous_timerinstance, "timer", None)
if _previous_timer is not None:
_previous_timer.stop()
timerinstance = Timer()
Loading