Files
sharepoint-browser/tests/test_review_fixes.py
Martin Tranberg 7fd69a9c3f fix: task 10 + review fix #1 — thread safety and sleep bug
Fix #1: Don't sleep after the final exhausted retry attempt in
_graph_request(). The sleep on the last iteration was pure overhead —
the loop exits immediately after, so the caller just waited an extra
backoff period for no reason. Guard with attempt < _MAX_RETRIES - 1.

Task 10: Add threading.Lock (_edits_lock) for all compound operations
on active_edits, which is accessed from both the UI thread and
background edit threads:
- __init__: declare self._edits_lock = threading.Lock()
- open_file: snapshot already_editing and at_limit under the lock,
  then release before showing blocking UI dialogs
- process_file: wrap initial dict assignment, waiting=True, the
  in-check+waiting=False, and in-check+del under the lock
- on_done_editing_clicked: lock the items() snapshot used to build
  waiting_files, preventing iteration over a dict being mutated

Add 8 new unit tests (1 for Fix #1 sleep count, 7 for Task 10 lock).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-12 10:36:29 +02:00

502 lines
23 KiB
Python

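"""
Unit tests for the code-review bug fixes applied to sharepoint_browser.py.

Covers:
    C1      - url=None prevents UnboundLocalError in _fetch_tree_children_bg
    I1      - nav_gen guard in _finalize_list_loading discards stale results
    I2      - System tab label derivation from STRINGS key
    I3      - status_loading_items translation key present and formattable
    C-1     - Refresh calls (nav_gen=None default) always apply their results
    S-2     - nav_gen=None sentinel is safer than 0
    S-1     - is_breadcrumb parameter removed from _navigate_to_item_data
    S2      - Dead SITE branch removed from _append_list_items
    Task 7  - bare except replaced by except Exception, print() replaced by
              logger, basename sanitization of item names
    Task 8  - every requests.* call carries a timeout= argument
    Task 9  - _graph_request helper retries on 429/503 and honours Retry-After
    Task 10 - _edits_lock (threading.Lock) guards compound active_edits operations
    Fix #1  - no sleep after the final exhausted retry in _graph_request

All tests run without a live display; wx is imported but no widgets are
instantiated.
"""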
import sys
import os
import inspect
import unittest
from unittest.mock import MagicMock, patch
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import sharepoint_browser as sb
# ---------------------------------------------------------------------------
# I2 + I3: STRINGS dictionary
# ---------------------------------------------------------------------------
class TestStrings(unittest.TestCase):
    """Checks on the ``sb.STRINGS`` translation table (review items I2 and I3)."""

    @staticmethod
    def _system_label(lang):
        """Return the part of ``settings_logging_group`` before the first '/', stripped."""
        group_text = sb.STRINGS[lang]["settings_logging_group"]
        first_part = group_text.split("/")[0]
        return first_part.strip()

    @staticmethod
    def _loading_template(lang):
        """Return the raw ``status_loading_items`` template for *lang*."""
        return sb.STRINGS[lang]["status_loading_items"]

    # --- I2: tab label derived from the logging group string ---

    def test_system_tab_label_da(self):
        """I2: the Danish settings_logging_group reduces to 'System'."""
        self.assertEqual(self._system_label("da"), "System")

    def test_system_tab_label_en(self):
        """I2: the English settings_logging_group reduces to 'System'."""
        self.assertEqual(self._system_label("en"), "System")

    # --- I3: status_loading_items key exists and formats with count= ---

    def test_status_loading_items_present_da(self):
        """I3: Danish STRINGS contains the status_loading_items key."""
        danish = sb.STRINGS["da"]
        self.assertIn("status_loading_items", danish)

    def test_status_loading_items_present_en(self):
        """I3: English STRINGS contains the status_loading_items key."""
        english = sb.STRINGS["en"]
        self.assertIn("status_loading_items", english)

    def test_status_loading_items_format_da(self):
        """I3: the Danish template accepts a named count argument."""
        rendered = self._loading_template("da").format(count=42)
        self.assertIn("42", rendered)

    def test_status_loading_items_format_en(self):
        """I3: the English template accepts a named count argument."""
        rendered = self._loading_template("en").format(count=99)
        self.assertIn("99", rendered)

    def test_status_loading_items_da_uses_count_kwarg(self):
        """I3: the Danish template spells the placeholder as {count}."""
        self.assertIn("{count}", self._loading_template("da"))

    def test_status_loading_items_en_uses_count_kwarg(self):
        """I3: the English template spells the placeholder as {count}."""
        self.assertIn("{count}", self._loading_template("en"))
# ---------------------------------------------------------------------------
# I1 + C-1 + S-2: nav_gen guard in _finalize_list_loading
# ---------------------------------------------------------------------------
class TestNavGenGuard(unittest.TestCase):
    """
    Exercises the staleness guard of
    ``_finalize_list_loading(self, items_data, nav_gen=None)``.

    Behaviour asserted by the tests below:
        nav_gen is None           -> results applied (refresh / unconstrained calls)
        nav_gen == self._nav_gen  -> results applied (matches current navigation)
        nav_gen != self._nav_gen  -> results discarded (stale; user navigated away)
    """

    def _make_app(self, current_gen: int):
        """Return a MagicMock standing in for the app, with ``_nav_gen`` preset."""
        # A MagicMock is truthy by default, so an `if not self` check passes.
        fake_app = MagicMock()
        fake_app._nav_gen = current_gen
        return fake_app

    @staticmethod
    def _finalize(app, payload, **kwargs):
        """Invoke the unbound method under test against the fake app."""
        sb.SharePointApp._finalize_list_loading(app, payload, **kwargs)

    def _check_applied(self, current_gen, payload, expect_sort=True, **kwargs):
        """Run the method and assert that *payload* became ``current_items``."""
        app = self._make_app(current_gen)
        self._finalize(app, payload, **kwargs)
        self.assertEqual(app.current_items, payload)
        if expect_sort:
            app.apply_sorting.assert_called_once()

    def _check_discarded(self, current_gen, incoming_gen, payload, msg=None):
        """Run the method and assert that ``current_items`` was left untouched."""
        app = self._make_app(current_gen)
        marker = object()
        app.current_items = marker
        self._finalize(app, payload, nav_gen=incoming_gen)
        self.assertIs(app.current_items, marker, msg)
        app.apply_sorting.assert_not_called()

    # --- nav_gen=None cases (C-1 / S-2) ---

    def test_none_gen_applies_when_nav_gen_is_1(self):
        """C-1/S-2: an explicit nav_gen=None applies whatever _nav_gen is."""
        self._check_applied(1, [{"name": "a"}], nav_gen=None)

    def test_none_gen_applies_when_nav_gen_is_high(self):
        """C-1: nav_gen=None also applies when _nav_gen is a large number."""
        self._check_applied(99, [{"name": "b"}], expect_sort=False, nav_gen=None)

    def test_default_gen_arg_applies(self):
        """C-1: leaving nav_gen out (default None) applies the results."""
        # Deliberately no nav_gen keyword here.
        self._check_applied(5, [{"name": "refresh"}])

    # --- matching gen (I1, happy path) ---

    def test_matching_gen_applies(self):
        """I1: results are applied when nav_gen equals _nav_gen."""
        self._check_applied(3, [{"name": "c"}], nav_gen=3)

    # --- stale gen cases (I1) ---

    def test_stale_gen_discards_results(self):
        """I1: nav_gen below _nav_gen (user navigated away) is discarded."""
        self._check_discarded(
            5, 2, [{"name": "old"}],
            "current_items was overwritten by stale result",
        )

    def test_future_gen_discards_results(self):
        """I1: nav_gen above _nav_gen (not expected in practice) is discarded too."""
        self._check_discarded(3, 99, [{"name": "future"}])

    def test_gen_zero_still_discarded_when_nav_gen_nonzero(self):
        """S-2: nav_gen=0 (the old default) counts as stale, not as 'no constraint'."""
        self._check_discarded(
            1, 0, [{"name": "zero"}],
            "nav_gen=0 should be treated as stale, not as 'no constraint'",
        )
# ---------------------------------------------------------------------------
# C1: url=None initialization in _fetch_tree_children_bg
# ---------------------------------------------------------------------------
class TestUrlInitialization(unittest.TestCase):
    """Source-level check for review item C1 on ``_fetch_tree_children_bg``."""

    @staticmethod
    def _first_index(lines, matches):
        """Return the index of the first line satisfying *matches*, or None."""
        for position, text in enumerate(lines):
            if matches(text):
                return position
        return None

    def test_url_initialized_to_none_before_conditional(self):
        """C1: 'url = None' appears before the SITE type conditional."""
        body_lines = inspect.getsource(
            sb.SharePointApp._fetch_tree_children_bg
        ).splitlines()

        # The SITE comparison is accepted with either quoting of the key.
        site_forms = ('data[\'type\'] == "SITE"', 'data["type"] == "SITE"')

        url_none_idx = self._first_index(
            body_lines, lambda text: "url = None" in text
        )
        site_if_idx = self._first_index(
            body_lines, lambda text: site_forms[0] in text or site_forms[1] in text
        )

        self.assertIsNotNone(
            url_none_idx,
            "_fetch_tree_children_bg has no 'url = None' initializer",
        )
        self.assertIsNotNone(
            site_if_idx,
            "_fetch_tree_children_bg has no SITE type conditional",
        )
        self.assertLess(
            url_none_idx,
            site_if_idx,
            "'url = None' must appear before the if data['type'] block",
        )
# ---------------------------------------------------------------------------
# S2: Dead SITE branch removed from _append_list_items
# ---------------------------------------------------------------------------
class TestDeadSiteBranch(unittest.TestCase):
    """Source-level check for review item S2 on ``_append_list_items``."""

    def test_append_list_items_has_no_site_img_branch(self):
        """S2: the SITE comparison on item['type'] is gone from _append_list_items."""
        dead_branch_marker = 'item[\'type\'] == "SITE"'
        method_source = inspect.getsource(sb.SharePointApp._append_list_items)
        self.assertNotIn(
            dead_branch_marker,
            method_source,
            "_append_list_items still contains dead SITE branch",
        )
# ---------------------------------------------------------------------------
# S-1: is_breadcrumb parameter removed from _navigate_to_item_data
# ---------------------------------------------------------------------------
class TestIsBreakcrumbRemoved(unittest.TestCase):
    """Signature checks for review item S-1 on ``_navigate_to_item_data``."""

    @staticmethod
    def _parameters():
        """Return the parameter mapping of ``_navigate_to_item_data``."""
        return inspect.signature(sb.SharePointApp._navigate_to_item_data).parameters

    def test_no_is_breadcrumb_param(self):
        """S-1: is_breadcrumb is not among the method's parameters."""
        self.assertNotIn(
            "is_breadcrumb",
            self._parameters(),
            "_navigate_to_item_data still declares is_breadcrumb",
        )

    def test_tree_item_param_still_present(self):
        """Regression: the tree_item parameter is still declared."""
        self.assertIn("tree_item", self._parameters())
# ---------------------------------------------------------------------------
# Task 7: bare except → except Exception, print → logger, basename sanitization
# ---------------------------------------------------------------------------
class TestTask7BareExcept(unittest.TestCase):
    """Source-text checks for Task 7: no bare ``except:``, no print(), basename use."""

    def _get_source_lines(self):
        """Return the source of the whole ``sb`` module as a list of lines."""
        module_source = inspect.getsource(sb)
        return module_source.splitlines()

    def _assert_source_lacks(self, target, fragment, msg):
        """Fail with *msg* if *fragment* occurs in the source of *target*."""
        self.assertNotIn(fragment, inspect.getsource(target), msg)

    def _assert_source_has(self, target, fragment, msg):
        """Fail with *msg* unless *fragment* occurs in the source of *target*."""
        self.assertIn(fragment, inspect.getsource(target), msg)

    # --- 7a: bare except ---

    def test_no_bare_except_in_load_settings(self):
        """7a: load_settings() contains no bare 'except:'."""
        self._assert_source_lacks(
            sb.load_settings, "except:",
            "load_settings still has a bare except:",
        )

    def test_no_bare_except_in_get_txt(self):
        """7a: get_txt() contains no bare 'except:'."""
        self._assert_source_lacks(
            sb.SharePointApp.get_txt, "except:",
            "get_txt still has a bare except:",
        )

    def test_no_bare_except_in_get_icon_idx(self):
        """7a: get_icon_idx_for_file() contains no bare 'except:'."""
        self._assert_source_lacks(
            sb.SharePointApp.get_icon_idx_for_file, "except:",
            "get_icon_idx_for_file still has a bare except:",
        )

    def test_no_bare_except_in_process_file(self):
        """7a: process_file() contains no bare 'except:'."""
        self._assert_source_lacks(
            sb.SharePointApp.process_file, "except:",
            "process_file still has a bare except: in cleanup block",
        )

    # --- 7b: print() replaced by logger ---

    def test_no_print_in_msal_init(self):
        """7b: __init__ no longer prints the MSAL init error."""
        self._assert_source_lacks(
            sb.SharePointApp.__init__, 'print(f"MSAL Init Error',
            "__init__ still uses print() for MSAL Init Error",
        )

    def test_no_print_in_ensure_valid_token(self):
        """7b: ensure_valid_token() no longer prints the token refresh error."""
        self._assert_source_lacks(
            sb.SharePointApp.ensure_valid_token, 'print(f"Token refresh error',
            "ensure_valid_token still uses print() for token error",
        )

    # --- 7c: basename sanitization ---

    def test_basename_in_download_folder_recursive(self):
        """7c: _download_folder_recursive_sync mentions os.path.basename."""
        self._assert_source_has(
            sb.SharePointApp._download_folder_recursive_sync, "os.path.basename",
            "_download_folder_recursive_sync does not sanitize item['name'] with basename",
        )

    def test_basename_in_on_download_clicked(self):
        """7c: on_download_clicked mentions os.path.basename."""
        self._assert_source_has(
            sb.SharePointApp.on_download_clicked, "os.path.basename",
            "on_download_clicked does not sanitize item['name'] with basename for folder downloads",
        )
# ---------------------------------------------------------------------------
# Task 8: all requests.* calls must carry a timeout= parameter
# ---------------------------------------------------------------------------
class TestNetworkTimeouts(unittest.TestCase):
    """
    Every requests.get/post/put/patch/delete call must include timeout=
    so that background threads cannot hang indefinitely on a stalled network.
    """

    # Function names on the ``requests`` module that this check covers.
    _CHECKED_METHODS = frozenset({"get", "post", "put", "patch", "delete"})

    @classmethod
    def _find_calls_missing_timeout(cls, source_text):
        """
        Return ``[(line_number, line_text), ...]`` for every
        ``requests.<method>(...)`` call in *source_text* that has no explicit
        ``timeout=`` keyword argument, ordered by line number.

        The source is parsed with ``ast`` instead of being matched line by
        line, so that:
          * calls in any position are found (``return requests.get(...)``,
            ``with requests.get(...) as r:``, calls nested in other calls);
          * a ``timeout=`` placed on a continuation line of a multi-line call
            is recognised;
          * text inside comments and docstrings is never mistaken for a call.

        Only a literal ``timeout=`` keyword counts; a ``**mapping`` argument is
        not assumed to supply one. ``line_text`` is the right-stripped first
        line of the call.

        Raises:
            SyntaxError: if *source_text* is not valid Python.
        """
        import ast  # local import: only this helper needs it

        tree = ast.parse(source_text)
        # Split on "\n" only so indices line up with ast's line numbering.
        source_lines = source_text.split("\n")

        missing = []
        for node in ast.walk(tree):
            if not isinstance(node, ast.Call):
                continue
            func = node.func
            is_requests_call = (
                isinstance(func, ast.Attribute)
                and func.attr in cls._CHECKED_METHODS
                and isinstance(func.value, ast.Name)
                and func.value.id == "requests"
            )
            if not is_requests_call:
                continue
            if any(keyword.arg == "timeout" for keyword in node.keywords):
                continue
            missing.append((node.lineno, source_lines[node.lineno - 1].rstrip()))

        # ast.walk() does not promise source order; report top to bottom.
        missing.sort(key=lambda entry: entry[0])
        return missing

    def _calls_missing_timeout(self):
        """
        Return a list of (line_number, line_text) for every requests.*()
        call in sharepoint_browser.py that is missing a timeout= argument.

        The file is located one directory above the directory containing
        this test module and is read as UTF-8.
        """
        src_path = os.path.join(
            os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
            "sharepoint_browser.py"
        )
        with open(src_path, encoding='utf-8') as fh:
            source_text = fh.read()
        return self._find_calls_missing_timeout(source_text)

    def test_all_requests_calls_have_timeout(self):
        """Task 8: No requests.* call is missing a timeout= parameter."""
        missing = self._calls_missing_timeout()
        if missing:
            details = "\n".join(f" line {n}: {txt}" for n, txt in missing)
            self.fail(
                f"{len(missing)} requests call(s) are missing timeout=:\n{details}"
            )
# ---------------------------------------------------------------------------
# Task 9: _graph_request helper — retry on 429/503 with Retry-After support
# ---------------------------------------------------------------------------
class TestGraphRequest(unittest.TestCase):
    """Tests for the module-level ``_graph_request()`` helper (Task 9, Fix #1)."""

    # Patch targets and the dummy endpoint shared by every test.
    _REQUEST_TARGET = "sharepoint_browser.requests.request"
    _SLEEP_TARGET = "sharepoint_browser.time.sleep"
    _URL = "https://example.com/"

    def test_helper_exists(self):
        """Task 9: the module exposes a callable named _graph_request."""
        helper = getattr(sb, "_graph_request", None)
        self.assertTrue(
            callable(helper),
            "_graph_request is not defined in sharepoint_browser"
        )

    def test_success_on_first_attempt(self):
        """Task 9: a 200 on the first try is returned after a single request."""
        ok_response = MagicMock()
        ok_response.status_code = 200
        with patch(self._REQUEST_TARGET, return_value=ok_response) as mock_req:
            result = sb._graph_request("GET", self._URL, headers={})
        self.assertEqual(result.status_code, 200)
        self.assertEqual(mock_req.call_count, 1)

    def test_retries_on_429(self):
        """Task 9: two 429 responses followed by a 200 take three requests."""
        sequence = [
            MagicMock(status_code=429, headers={"Retry-After": "0"}),
            MagicMock(status_code=429, headers={"Retry-After": "0"}),
            MagicMock(status_code=200, headers={}),
        ]
        with patch(self._REQUEST_TARGET, side_effect=sequence) as mock_req, \
                patch(self._SLEEP_TARGET):
            result = sb._graph_request("GET", self._URL, headers={})
        self.assertEqual(result.status_code, 200)
        self.assertEqual(mock_req.call_count, 3)

    def test_retries_on_503(self):
        """Task 9: a 503 followed by a 200 takes two requests."""
        sequence = [
            MagicMock(status_code=503, headers={}),
            MagicMock(status_code=200, headers={}),
        ]
        with patch(self._REQUEST_TARGET, side_effect=sequence) as mock_req, \
                patch(self._SLEEP_TARGET):
            result = sb._graph_request("POST", self._URL, headers={})
        self.assertEqual(result.status_code, 200)
        self.assertEqual(mock_req.call_count, 2)

    def test_returns_last_response_after_max_retries(self):
        """Task 9: when every attempt yields 429, the 429 response is returned."""
        throttled = MagicMock(status_code=429, headers={"Retry-After": "0"})
        sequence = [throttled] * sb._MAX_RETRIES
        with patch(self._REQUEST_TARGET, side_effect=sequence), \
                patch(self._SLEEP_TARGET):
            result = sb._graph_request("GET", self._URL, headers={})
        self.assertEqual(result.status_code, 429)

    def test_respects_retry_after_header(self):
        """Task 9: sleep() receives the Retry-After value of the 429 response."""
        sequence = [
            MagicMock(status_code=429, headers={"Retry-After": "5"}),
            MagicMock(status_code=200, headers={}),
        ]
        with patch(self._REQUEST_TARGET, side_effect=sequence), \
                patch(self._SLEEP_TARGET) as mock_sleep:
            sb._graph_request("GET", self._URL, headers={})
        mock_sleep.assert_called_once_with(5)

    def test_default_timeout_injected(self):
        """Task 9: without a caller-supplied timeout, timeout=30 is passed on."""
        ok_response = MagicMock(status_code=200, headers={})
        with patch(self._REQUEST_TARGET, return_value=ok_response) as mock_req:
            sb._graph_request("GET", self._URL, headers={})
        passed_kwargs = mock_req.call_args[1]
        self.assertEqual(passed_kwargs.get("timeout"), 30)

    def test_caller_timeout_not_overridden(self):
        """Task 9: a timeout given by the caller is passed on unchanged."""
        ok_response = MagicMock(status_code=200, headers={})
        with patch(self._REQUEST_TARGET, return_value=ok_response) as mock_req:
            sb._graph_request("GET", self._URL, headers={}, timeout=60)
        passed_kwargs = mock_req.call_args[1]
        self.assertEqual(passed_kwargs.get("timeout"), 60)

    def test_max_retries_constant_exists(self):
        """Task 9: the module defines _MAX_RETRIES."""
        self.assertTrue(
            hasattr(sb, "_MAX_RETRIES"),
            "_MAX_RETRIES constant not found in sharepoint_browser"
        )

    def test_no_sleep_after_final_retry(self):
        """Fix #1: sleep() is not called after the last failed attempt."""
        throttled = MagicMock(status_code=429, headers={"Retry-After": "0"})
        sequence = [throttled] * sb._MAX_RETRIES
        with patch(self._REQUEST_TARGET, side_effect=sequence), \
                patch(self._SLEEP_TARGET) as mock_sleep:
            sb._graph_request("GET", self._URL, headers={})
        # One sleep per failed attempt except the last one.
        self.assertEqual(
            mock_sleep.call_count,
            sb._MAX_RETRIES - 1,
            f"sleep() called {mock_sleep.call_count} times; expected {sb._MAX_RETRIES - 1} "
            f"(no sleep after the final failed attempt)"
        )
# ---------------------------------------------------------------------------
# Task 10: threading.Lock for active_edits compound operations
# ---------------------------------------------------------------------------
class TestActiveEditsLock(unittest.TestCase):
    """
    Source-text checks that ``_edits_lock`` is declared and referenced by the
    methods that perform compound operations on ``active_edits`` (Task 10).

    These tests inspect source code only; they verify that the expected names
    and statements are present, not the runtime locking behaviour.
    """

    @staticmethod
    def _source_of(method_name):
        """Return the source text of ``sb.SharePointApp.<method_name>``."""
        return inspect.getsource(getattr(sb.SharePointApp, method_name))

    def test_edits_lock_declared_in_init(self):
        """Task 10: SharePointApp.__init__ creates self._edits_lock."""
        self.assertIn("_edits_lock", self._source_of("__init__"),
                      "__init__ does not declare _edits_lock")

    def test_edits_lock_is_threading_lock(self):
        """Task 10: _edits_lock initialisation uses threading.Lock()."""
        self.assertIn("threading.Lock()", self._source_of("__init__"),
                      "__init__ does not initialise _edits_lock with threading.Lock()")

    def test_open_file_uses_lock(self):
        """Task 10: open_file() references _edits_lock."""
        self.assertIn("_edits_lock", self._source_of("open_file"),
                      "open_file does not use _edits_lock")

    def test_process_file_uses_lock(self):
        """Task 10: process_file() references _edits_lock."""
        self.assertIn("_edits_lock", self._source_of("process_file"),
                      "process_file does not use _edits_lock")

    def test_process_file_lock_on_initial_assign(self):
        """Task 10: _edits_lock first appears before the active_edits[item_id] assignment."""
        source = self._source_of("process_file")
        lock_idx = source.find("_edits_lock")
        assign_idx = source.find('self.active_edits[item_id] = ')
        # str.find() returns -1 when the text is absent. Both positions must be
        # validated: a missing lock would otherwise satisfy "-1 < assign_idx"
        # and let the ordering check pass vacuously.
        self.assertNotEqual(lock_idx, -1,
                            "_edits_lock not found in process_file")
        self.assertNotEqual(assign_idx, -1,
                            "active_edits assignment not found in process_file")
        self.assertLess(lock_idx, assign_idx,
                        "_edits_lock must appear before the active_edits assignment in process_file")

    def test_process_file_lock_on_delete(self):
        """Task 10: process_file deletes from active_edits and uses _edits_lock at least twice."""
        source = self._source_of("process_file")
        self.assertIn("del self.active_edits[item_id]", source,
                      "del active_edits[item_id] not found in process_file")
        # At least two mentions of the lock are expected in this method.
        lock_count = source.count("_edits_lock")
        self.assertGreaterEqual(lock_count, 2,
                                f"Expected at least 2 uses of _edits_lock in process_file, found {lock_count}")

    def test_on_done_editing_uses_lock(self):
        """Task 10: on_done_editing_clicked references _edits_lock."""
        self.assertIn("_edits_lock", self._source_of("on_done_editing_clicked"),
                      "on_done_editing_clicked does not use _edits_lock")
if __name__ == "__main__":
    # Direct execution of this file runs the suite; verbosity=2 prints one
    # line per test with its name and outcome.
    unittest.main(verbosity=2)