Compare commits

...

4 Commits

Author SHA1 Message Date
Martin Tranberg
7fd69a9c3f fix: task 10 + review fix #1 — thread safety and sleep bug
Fix #1: Don't sleep after the final exhausted retry attempt in
_graph_request(). The sleep on the last iteration was pure overhead —
the loop exits immediately after, so the caller just waited an extra
backoff period for no reason. Guard with attempt < _MAX_RETRIES - 1.

Task 10: Add threading.Lock (_edits_lock) for all compound operations
on active_edits, which is accessed from both the UI thread and
background edit threads:
- __init__: declare self._edits_lock = threading.Lock()
- open_file: snapshot already_editing and at_limit under the lock,
  then release before showing blocking UI dialogs
- process_file: wrap initial dict assignment, waiting=True, the
  in-check+waiting=False, and in-check+del under the lock
- on_done_editing_clicked: lock the items() snapshot used to build
  waiting_files, preventing iteration over a dict being mutated

Add 8 new unit tests (1 for Fix #1 sleep count, 7 for Task 10 lock).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-12 10:36:29 +02:00
Martin Tranberg
0dfef3e611 feat: task 9 — add _graph_request helper with retry on 429/503
Add module-level _graph_request() that wraps requests.request() with:
- Up to 3 retries on HTTP 429 (rate limited) and 503 (unavailable)
- Exponential backoff capped at 60 s, honouring Retry-After header
- Default timeout=30 s injected via setdefault (caller can override)

Wire all 13 retry-eligible API calls through _graph_request(). The 3
file-upload requests.put(data=f) calls are kept direct since an open
stream cannot be re-read after the first attempt.

Add 9 unit tests covering: success path, 429/503 retry, Retry-After
header, max-retry exhaustion, timeout injection and override.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-12 10:29:39 +02:00
Martin Tranberg
707645ab36 fix: task 8 — add timeout to all requests calls
Add timeout= to every requests.get/post/put/patch/delete call so that
background threads cannot hang indefinitely when the network is stalled:

- timeout=30 on all API calls (delete, post, patch, get — 13 locations)
- timeout=120 on file upload calls (requests.put with data= — 3 locations)
  to allow sufficient time for large file transfers

Add 1 new unit test that scans the source file and fails if any
requests.* call is missing a timeout= parameter.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-12 10:26:04 +02:00
Martin Tranberg
df55660291 fix: task 7 — production hardening quick fixes
- Replace 4 bare `except:` with `except Exception:` (load_settings,
  get_txt, get_icon_idx_for_file, process_file cleanup block) so
  SystemExit and KeyboardInterrupt are no longer swallowed
- Replace 2 print() calls with logger.error() (__init__ MSAL init,
  ensure_valid_token) so errors appear in the configurable log output
- Sanitize item['name'] with os.path.basename() in on_download_clicked
  and _download_folder_recursive_sync to prevent path traversal from
  server-controlled filenames
- Add 8 new unit tests covering all Task 7 changes

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-12 10:22:54 +02:00
2 changed files with 347 additions and 38 deletions

View File

@@ -67,13 +67,36 @@ def load_settings():
with open(SETTINGS_FILE, 'r', encoding='utf-8') as f:
try:
return json.load(f)
except:
except Exception:
return default_settings
def save_settings(new_settings):
with open(SETTINGS_FILE, 'w', encoding='utf-8') as f:
json.dump(new_settings, f, indent=4)
# --- GRAPH API REQUEST HELPER ---
_RETRY_STATUSES = {429, 503}
_MAX_RETRIES = 3
def _graph_request(method, url, **kwargs):
"""Thin wrapper around requests.request with retry for 429/503.
Retries up to _MAX_RETRIES times when Graph API signals rate limiting
(429) or transient unavailability (503), honouring the Retry-After header
when present. A default timeout of 30 s is injected if the caller does
not supply one. File-upload calls that pass an open stream as data=
should use requests.put() directly, since a stream cannot be re-read.
"""
kwargs.setdefault("timeout", 30)
for attempt in range(_MAX_RETRIES):
res = requests.request(method, url, **kwargs)
if res.status_code not in _RETRY_STATUSES:
return res
if attempt < _MAX_RETRIES - 1: # Don't sleep after the final attempt
retry_after = int(res.headers.get("Retry-After", 2 ** attempt))
time.sleep(min(retry_after, 60))
return res # Return last response after exhausting retries
settings = load_settings()
CLIENT_ID = settings.get("client_id")
TENANT_ID = settings.get("tenant_id")
@@ -675,6 +698,7 @@ class SharePointApp(wx.Frame):
self.tree_root = None
self.is_navigating_back = False
self.active_edits = {} # item_id -> { "name": name, "event": Event, "waiting": bool }
self._edits_lock = threading.Lock()
self.favorites = settings.get("favorites", [])
self.fav_visible = settings.get("fav_visible", True)
self.sort_col = 0 # Default (Navn)
@@ -708,7 +732,7 @@ class SharePointApp(wx.Frame):
try:
self.msal_app = msal.PublicClientApplication(CLIENT_ID, authority=AUTHORITY)
except Exception as e:
print(f"MSAL Init Error: {e}")
logger.error(f"MSAL Init Error: {e}")
self.InitUI()
self.Centre()
@@ -737,7 +761,7 @@ class SharePointApp(wx.Frame):
if kwargs:
try:
return text.format(**kwargs)
except:
except Exception:
pass
return text
@@ -1170,7 +1194,7 @@ class SharePointApp(wx.Frame):
status_text = "Sletter" if self.lang == "da" else "Deleting"
self.set_status(f"{status_text} {count+1}/{total}: '{item['name']}'...")
url = f"https://graph.microsoft.com/v1.0/drives/{item['drive_id']}/items/{item['id']}"
res = requests.delete(url, headers=self.headers)
res = _graph_request("DELETE", url, headers=self.headers, timeout=30)
if res.status_code in [204, 200]:
count += 1
else:
@@ -1197,7 +1221,7 @@ class SharePointApp(wx.Frame):
url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/items/{parent_id}:/{filename}:/content"
try:
with open(local_path, 'rb') as f:
res = requests.put(url, headers=self.headers, data=f)
res = requests.put(url, headers=self.headers, data=f, timeout=120)
if res.status_code in [200, 201]:
self.set_status(self.get_txt("msg_upload_success", name=filename))
self._refresh_current_view()
@@ -1238,7 +1262,7 @@ class SharePointApp(wx.Frame):
def _create_folder_sync(self, name, drive_id, parent_id):
url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/items/{parent_id}/children"
body = {"name": name, "folder": {}, "@microsoft.graph.conflictBehavior": "rename"}
res = requests.post(url, headers=self.headers, json=body)
res = _graph_request("POST", url, headers=self.headers, json=body, timeout=30)
if res.status_code in [200, 201]:
return res.json().get('id')
return None
@@ -1248,7 +1272,7 @@ class SharePointApp(wx.Frame):
filename = os.path.basename(local_path)
url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/items/{parent_id}:/{filename}:/content"
with open(local_path, 'rb') as f:
requests.put(url, headers=self.headers, data=f)
requests.put(url, headers=self.headers, data=f, timeout=120)
def on_new_folder_clicked(self, event):
if not self.current_drive_id: return
@@ -1282,7 +1306,7 @@ class SharePointApp(wx.Frame):
self.set_status(f"{self.get_txt('msg_rename')}...")
url = f"https://graph.microsoft.com/v1.0/drives/{item['drive_id']}/items/{item['id']}"
body = {"name": new_name}
res = requests.patch(url, headers=self.headers, json=body)
res = _graph_request("PATCH", url, headers=self.headers, json=body, timeout=30)
if res.status_code in [200, 201]:
self.set_status(self.get_txt("msg_success"))
self._refresh_current_view()
@@ -1303,7 +1327,7 @@ class SharePointApp(wx.Frame):
with wx.DirDialog(self, self.get_txt("msg_select_folder"), style=wx.DD_DEFAULT_STYLE | wx.DD_DIR_MUST_EXIST) as dd:
if dd.ShowModal() == wx.ID_OK:
parent_path = dd.GetPath()
dest_path = os.path.join(parent_path, item['name'])
dest_path = os.path.join(parent_path, os.path.basename(item['name']))
threading.Thread(target=self._download_folder_bg_task, args=(item, dest_path), daemon=True).start()
def _download_file_bg_task(self, item, dest_path):
@@ -1319,7 +1343,7 @@ class SharePointApp(wx.Frame):
def _download_file_sync_call(self, drive_id, item_id, dest_path):
url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/items/{item_id}/content"
res = requests.get(url, headers=self.headers)
res = _graph_request("GET", url, headers=self.headers, timeout=30)
if res.status_code == 200:
with open(dest_path, 'wb') as f:
f.write(res.content)
@@ -1338,12 +1362,12 @@ class SharePointApp(wx.Frame):
url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/items/{folder_id}/children"
while url:
res = requests.get(url, headers=self.headers)
res = _graph_request("GET", url, headers=self.headers, timeout=30)
if res.status_code == 200:
res_data = res.json()
items = res_data.get('value', [])
for item in items:
item_path = os.path.join(local_dir, item['name'])
item_path = os.path.join(local_dir, os.path.basename(item['name']))
if 'folder' in item:
self._download_folder_recursive_sync(drive_id, item['id'], item_path)
else:
@@ -1382,7 +1406,8 @@ class SharePointApp(wx.Frame):
wx.CallAfter(_do)
def on_done_editing_clicked(self, event):
waiting_files = [fid for fid, d in self.active_edits.items() if d.get("waiting")]
with self._edits_lock:
waiting_files = [fid for fid, d in self.active_edits.items() if d.get("waiting")]
if not waiting_files:
return
@@ -1666,7 +1691,7 @@ class SharePointApp(wx.Frame):
self.headers = {'Authorization': f'Bearer {self.access_token}'}
return True
except Exception as e:
print(f"Token refresh error: {e}")
logger.error(f"Token refresh error: {e}")
self.set_status(self.get_txt("status_login_needed"))
return False
@@ -1715,9 +1740,9 @@ class SharePointApp(wx.Frame):
self.pulse_gauge(True)
all_sites = []
url = "https://graph.microsoft.com/v1.0/sites?search=*"
while url:
res = requests.get(url, headers=self.headers)
res = _graph_request("GET", url, headers=self.headers, timeout=30)
if res.status_code == 200:
data = res.json()
all_sites.extend(data.get('value', []))
@@ -1783,7 +1808,7 @@ class SharePointApp(wx.Frame):
url = f"https://graph.microsoft.com/v1.0/drives/{data['drive_id']}/items/{data['id']}/children"
while url:
res = requests.get(url, headers=self.headers)
res = _graph_request("GET", url, headers=self.headers, timeout=30)
if res.status_code == 200:
res_data = res.json()
all_children.extend(res_data.get('value', []))
@@ -1906,7 +1931,7 @@ class SharePointApp(wx.Frame):
first_chunk = True
while url:
res = requests.get(url, headers=self.headers)
res = _graph_request("GET", url, headers=self.headers, timeout=30)
if res.status_code != 200: break
res_data = res.json()
@@ -2073,7 +2098,7 @@ class SharePointApp(wx.Frame):
idx = self.image_list.Add(bmp)
self.ext_icons[ext] = idx
return idx
except:
except Exception:
pass
self.ext_icons[ext] = self.idx_file
@@ -2231,12 +2256,16 @@ class SharePointApp(wx.Frame):
item_id = item['id']
file_name = item['name']
drive_id = item['drive_id']
if item_id in self.active_edits:
with self._edits_lock:
already_editing = item_id in self.active_edits
at_limit = len(self.active_edits) >= 10
# UI dialogs are called outside the lock to avoid holding it during blocking calls
if already_editing:
self.show_info(f"'{file_name}' er allerede ved at blive redigeret.", wx.ICON_INFORMATION)
return
if len(self.active_edits) >= 10:
if at_limit:
wx.MessageBox("Du kan kun have 10 filer åbne til redigering ad gangen.", "Maksimum grænse nået", wx.OK | wx.ICON_WARNING)
return
@@ -2273,7 +2302,8 @@ class SharePointApp(wx.Frame):
if not self.ensure_valid_token(): return
edit_event = threading.Event()
self.active_edits[item_id] = {"name": file_name, "event": edit_event, "waiting": False}
with self._edits_lock:
self.active_edits[item_id] = {"name": file_name, "event": edit_event, "waiting": False}
self.update_edit_ui()
try:
@@ -2290,7 +2320,7 @@ class SharePointApp(wx.Frame):
# 2. Download
self.set_status(self.get_txt("msg_fetching_file", name=file_name))
res = requests.get(f"{base_url}/content", headers=self.headers)
res = _graph_request("GET", f"{base_url}/content", headers=self.headers, timeout=30)
if res.status_code != 200:
raise Exception(f"{self.get_txt('msg_unknown_error')}: {res.status_code}")
@@ -2327,7 +2357,7 @@ class SharePointApp(wx.Frame):
# Checkout
is_checked_out = False
checkout_res = requests.post(f"{base_url}/checkout", headers=self.headers)
checkout_res = _graph_request("POST", f"{base_url}/checkout", headers=self.headers, timeout=30)
if checkout_res.status_code in [200, 201, 204]:
is_checked_out = True
logger.info(f"Fil {file_name} udtjekket succesfuldt.")
@@ -2360,13 +2390,15 @@ class SharePointApp(wx.Frame):
else:
self.set_status(self.get_txt("msg_waiting_for_file", name=file_name))
edit_event.clear()
self.active_edits[item_id]["waiting"] = True
with self._edits_lock:
self.active_edits[item_id]["waiting"] = True
self.update_edit_ui()
edit_event.wait()
if item_id in self.active_edits:
self.active_edits[item_id]["waiting"] = False
with self._edits_lock:
if item_id in self.active_edits:
self.active_edits[item_id]["waiting"] = False
self.update_edit_ui()
# 4. Tjek om noget er ændret
@@ -2387,7 +2419,7 @@ class SharePointApp(wx.Frame):
if is_checked_out:
logger.info(f"Annullerer udtjekning (discardCheckout) for {file_name}...")
res = requests.post(f"{base_url}/discardCheckout", headers=self.headers)
res = _graph_request("POST", f"{base_url}/discardCheckout", headers=self.headers, timeout=30)
if res.status_code in [200, 204]:
is_checked_out = False
else:
@@ -2395,14 +2427,14 @@ class SharePointApp(wx.Frame):
logger.info(f"Ændring fundet! Uploader {file_name}...")
self.set_status(self.get_txt("msg_updating_changes"))
with open(local_path, 'rb') as f:
upload_res = requests.put(f"{base_url}/content", headers=self.headers, data=f)
upload_res = requests.put(f"{base_url}/content", headers=self.headers, data=f, timeout=120)
if upload_res.status_code not in [200, 201]:
raise Exception(f"{self.get_txt('msg_update_failed_code', code=upload_res.status_code)}")
# 6. Checkin (Kun hvis vi faktisk uploadede noget)
if is_checked_out:
self.set_status(self.get_txt("msg_checking_in", name=file_name))
res = requests.post(f"{base_url}/checkin", headers=self.headers, json={"comment": "SP Explorer Edit"})
res = _graph_request("POST", f"{base_url}/checkin", headers=self.headers, json={"comment": "SP Explorer Edit"}, timeout=30)
if res.status_code in [200, 201, 204]:
is_checked_out = False
@@ -2410,7 +2442,7 @@ class SharePointApp(wx.Frame):
try:
os.remove(local_path)
os.rmdir(working_dir)
except:
except Exception:
pass
self.set_status(self.get_txt("msg_update_success", name=file_name))
@@ -2424,10 +2456,11 @@ class SharePointApp(wx.Frame):
if is_checked_out:
# Emergency cleanup hvis vi stadig har fat i filen (f.eks. ved crash eller afbrydelse)
logger.info(f"Rydder op: Kalder discardCheckout for {file_name}...")
requests.post(f"{base_url}/discardCheckout", headers=self.headers)
_graph_request("POST", f"{base_url}/discardCheckout", headers=self.headers, timeout=30)
if item_id in self.active_edits:
del self.active_edits[item_id]
with self._edits_lock:
if item_id in self.active_edits:
del self.active_edits[item_id]
self.update_edit_ui()
if __name__ == "__main__":

View File

@@ -221,5 +221,281 @@ class TestIsBreakcrumbRemoved(unittest.TestCase):
self.assertIn("tree_item", sig.parameters)
# ---------------------------------------------------------------------------
# Task 7: bare except → except Exception, print → logger, basename sanitization
# ---------------------------------------------------------------------------
class TestTask7BareExcept(unittest.TestCase):
"""Verify that bare except: clauses have been replaced with except Exception:."""
def _get_source_lines(self):
return inspect.getsource(sb).splitlines()
def test_no_bare_except_in_load_settings(self):
"""7a: load_settings() uses except Exception, not bare except."""
source = inspect.getsource(sb.load_settings)
self.assertNotIn("except:", source,
"load_settings still has a bare except:")
def test_no_bare_except_in_get_txt(self):
"""7a: get_txt() uses except Exception, not bare except."""
source = inspect.getsource(sb.SharePointApp.get_txt)
self.assertNotIn("except:", source,
"get_txt still has a bare except:")
def test_no_bare_except_in_get_icon_idx(self):
"""7a: get_icon_idx_for_file() uses except Exception, not bare except."""
source = inspect.getsource(sb.SharePointApp.get_icon_idx_for_file)
self.assertNotIn("except:", source,
"get_icon_idx_for_file still has a bare except:")
def test_no_bare_except_in_process_file(self):
"""7a: process_file() cleanup block uses except Exception, not bare except."""
source = inspect.getsource(sb.SharePointApp.process_file)
self.assertNotIn("except:", source,
"process_file still has a bare except: in cleanup block")
def test_no_print_in_msal_init(self):
"""7b: MSAL init error uses logger, not print()."""
source = inspect.getsource(sb.SharePointApp.__init__)
self.assertNotIn('print(f"MSAL Init Error', source,
"__init__ still uses print() for MSAL Init Error")
def test_no_print_in_ensure_valid_token(self):
"""7b: ensure_valid_token() uses logger, not print()."""
source = inspect.getsource(sb.SharePointApp.ensure_valid_token)
self.assertNotIn('print(f"Token refresh error', source,
"ensure_valid_token still uses print() for token error")
def test_basename_in_download_folder_recursive(self):
"""7c: _download_folder_recursive_sync uses os.path.basename on item name."""
source = inspect.getsource(sb.SharePointApp._download_folder_recursive_sync)
self.assertIn("os.path.basename", source,
"_download_folder_recursive_sync does not sanitize item['name'] with basename")
def test_basename_in_on_download_clicked(self):
"""7c: on_download_clicked uses os.path.basename when building dest_path for folders."""
source = inspect.getsource(sb.SharePointApp.on_download_clicked)
self.assertIn("os.path.basename", source,
"on_download_clicked does not sanitize item['name'] with basename for folder downloads")
# ---------------------------------------------------------------------------
# Task 8: all requests.* calls must carry a timeout= parameter
# ---------------------------------------------------------------------------
class TestNetworkTimeouts(unittest.TestCase):
"""
Every requests.get/post/put/patch/delete call must include timeout=
so that background threads cannot hang indefinitely on a stalled network.
"""
def _calls_missing_timeout(self):
"""
Return a list of (line_number, line_text) for every requests.*()
call in the source file that is missing a timeout= argument.
Strategy: single-line calls are the norm in this codebase. We look
for lines that contain 'requests.METHOD(' and do NOT also contain
'timeout='. Lines that contain 'timeout=' anywhere on the same line
are considered compliant.
"""
import re
src_path = os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
"sharepoint_browser.py"
)
# Matches real call sites: assignment or standalone call (not docstrings)
pattern = re.compile(
r'(=\s*|^\s*)requests\.(get|post|put|patch|delete)\('
)
missing = []
with open(src_path, encoding='utf-8') as fh:
for lineno, line in enumerate(fh, 1):
stripped = line.lstrip()
# Skip comment lines and docstring prose (lines that are plain text)
if stripped.startswith('#'):
continue
if pattern.search(line) and 'timeout=' not in line:
missing.append((lineno, line.rstrip()))
return missing
def test_all_requests_calls_have_timeout(self):
"""Task 8: No requests.* call is missing a timeout= parameter."""
missing = self._calls_missing_timeout()
if missing:
details = "\n".join(f" line {n}: {txt}" for n, txt in missing)
self.fail(
f"{len(missing)} requests call(s) are missing timeout=:\n{details}"
)
# ---------------------------------------------------------------------------
# Task 9: _graph_request helper — retry on 429/503 with Retry-After support
# ---------------------------------------------------------------------------
class TestGraphRequest(unittest.TestCase):
"""Unit tests for the _graph_request() module-level helper."""
def test_helper_exists(self):
"""Task 9: _graph_request function is defined at module level."""
self.assertTrue(
callable(getattr(sb, "_graph_request", None)),
"_graph_request is not defined in sharepoint_browser"
)
def test_success_on_first_attempt(self):
"""Task 9: Returns immediately when first response is 200."""
mock_resp = MagicMock()
mock_resp.status_code = 200
with patch("sharepoint_browser.requests.request", return_value=mock_resp) as mock_req:
result = sb._graph_request("GET", "https://example.com/", headers={})
self.assertEqual(result.status_code, 200)
self.assertEqual(mock_req.call_count, 1)
def test_retries_on_429(self):
"""Task 9: Retries when response is 429 (rate limited)."""
responses = [
MagicMock(status_code=429, headers={"Retry-After": "0"}),
MagicMock(status_code=429, headers={"Retry-After": "0"}),
MagicMock(status_code=200, headers={}),
]
with patch("sharepoint_browser.requests.request", side_effect=responses) as mock_req:
with patch("sharepoint_browser.time.sleep"):
result = sb._graph_request("GET", "https://example.com/", headers={})
self.assertEqual(result.status_code, 200)
self.assertEqual(mock_req.call_count, 3)
def test_retries_on_503(self):
"""Task 9: Retries when response is 503 (service unavailable)."""
responses = [
MagicMock(status_code=503, headers={}),
MagicMock(status_code=200, headers={}),
]
with patch("sharepoint_browser.requests.request", side_effect=responses) as mock_req:
with patch("sharepoint_browser.time.sleep"):
result = sb._graph_request("POST", "https://example.com/", headers={})
self.assertEqual(result.status_code, 200)
self.assertEqual(mock_req.call_count, 2)
def test_returns_last_response_after_max_retries(self):
"""Task 9: Returns last 429 response when all retries are exhausted."""
resp_429 = MagicMock(status_code=429, headers={"Retry-After": "0"})
responses = [resp_429] * sb._MAX_RETRIES
with patch("sharepoint_browser.requests.request", side_effect=responses):
with patch("sharepoint_browser.time.sleep"):
result = sb._graph_request("GET", "https://example.com/", headers={})
self.assertEqual(result.status_code, 429)
def test_respects_retry_after_header(self):
"""Task 9: sleep() is called with the Retry-After value from the response."""
responses = [
MagicMock(status_code=429, headers={"Retry-After": "5"}),
MagicMock(status_code=200, headers={}),
]
with patch("sharepoint_browser.requests.request", side_effect=responses):
with patch("sharepoint_browser.time.sleep") as mock_sleep:
sb._graph_request("GET", "https://example.com/", headers={})
mock_sleep.assert_called_once_with(5)
def test_default_timeout_injected(self):
"""Task 9: timeout=30 is injected when caller does not provide one."""
mock_resp = MagicMock(status_code=200, headers={})
with patch("sharepoint_browser.requests.request", return_value=mock_resp) as mock_req:
sb._graph_request("GET", "https://example.com/", headers={})
_, kwargs = mock_req.call_args
self.assertEqual(kwargs.get("timeout"), 30)
def test_caller_timeout_not_overridden(self):
"""Task 9: Explicit timeout from caller is not overwritten by the helper."""
mock_resp = MagicMock(status_code=200, headers={})
with patch("sharepoint_browser.requests.request", return_value=mock_resp) as mock_req:
sb._graph_request("GET", "https://example.com/", headers={}, timeout=60)
_, kwargs = mock_req.call_args
self.assertEqual(kwargs.get("timeout"), 60)
def test_max_retries_constant_exists(self):
"""Task 9: _MAX_RETRIES constant is defined."""
self.assertTrue(
hasattr(sb, "_MAX_RETRIES"),
"_MAX_RETRIES constant not found in sharepoint_browser"
)
def test_no_sleep_after_final_retry(self):
"""Fix #1: sleep() is NOT called after the last exhausted attempt."""
resp_429 = MagicMock(status_code=429, headers={"Retry-After": "0"})
responses = [resp_429] * sb._MAX_RETRIES
with patch("sharepoint_browser.requests.request", side_effect=responses):
with patch("sharepoint_browser.time.sleep") as mock_sleep:
sb._graph_request("GET", "https://example.com/", headers={})
# sleep should be called for the first N-1 failures, NOT the last one
self.assertEqual(
mock_sleep.call_count,
sb._MAX_RETRIES - 1,
f"sleep() called {mock_sleep.call_count} times; expected {sb._MAX_RETRIES - 1} "
f"(no sleep after the final failed attempt)"
)
# ---------------------------------------------------------------------------
# Task 10: threading.Lock for active_edits compound operations
# ---------------------------------------------------------------------------
class TestActiveEditsLock(unittest.TestCase):
"""Verify that _edits_lock exists and guards all compound active_edits operations."""
def test_edits_lock_declared_in_init(self):
"""Task 10: SharePointApp.__init__ creates self._edits_lock."""
source = inspect.getsource(sb.SharePointApp.__init__)
self.assertIn("_edits_lock", source,
"__init__ does not declare _edits_lock")
def test_edits_lock_is_threading_lock(self):
"""Task 10: _edits_lock initialisation uses threading.Lock()."""
source = inspect.getsource(sb.SharePointApp.__init__)
self.assertIn("threading.Lock()", source,
"__init__ does not initialise _edits_lock with threading.Lock()")
def test_open_file_uses_lock(self):
"""Task 10: open_file() acquires _edits_lock before checking active_edits."""
source = inspect.getsource(sb.SharePointApp.open_file)
self.assertIn("_edits_lock", source,
"open_file does not use _edits_lock")
def test_process_file_uses_lock(self):
"""Task 10: process_file() acquires _edits_lock when writing active_edits."""
source = inspect.getsource(sb.SharePointApp.process_file)
self.assertIn("_edits_lock", source,
"process_file does not use _edits_lock")
def test_process_file_lock_on_initial_assign(self):
"""Task 10: active_edits[item_id] = ... assignment is inside a lock block."""
source = inspect.getsource(sb.SharePointApp.process_file)
# Check lock wraps the initial dict assignment
lock_idx = source.find("_edits_lock")
assign_idx = source.find('self.active_edits[item_id] = ')
self.assertGreater(assign_idx, 0,
"active_edits assignment not found in process_file")
# The lock must appear before the assignment
self.assertLess(lock_idx, assign_idx,
"_edits_lock must appear before the active_edits assignment in process_file")
def test_process_file_lock_on_delete(self):
"""Task 10: del active_edits[item_id] is inside a lock block in process_file."""
source = inspect.getsource(sb.SharePointApp.process_file)
self.assertIn("del self.active_edits[item_id]", source,
"del active_edits[item_id] not found in process_file")
# Count lock usages — there should be at least 2
lock_count = source.count("_edits_lock")
self.assertGreaterEqual(lock_count, 2,
f"Expected at least 2 uses of _edits_lock in process_file, found {lock_count}")
def test_on_done_editing_uses_lock(self):
"""Task 10: on_done_editing_clicked acquires lock for active_edits iteration."""
source = inspect.getsource(sb.SharePointApp.on_done_editing_clicked)
self.assertIn("_edits_lock", source,
"on_done_editing_clicked does not use _edits_lock")
if __name__ == "__main__":
unittest.main(verbosity=2)