From 62725f9be642991a3c610187f2d06b768f0d6c07 Mon Sep 17 00:00:00 2001 From: Martin Tranberg Date: Wed, 1 Apr 2026 09:42:06 +0200 Subject: [PATCH] feat: implement QuickXorHash integrity verification for downloads and add SharePoint site navigation support --- sharepoint_browser.py | 246 ++++++++++++++++++++++++++++++++++++++---- 1 file changed, 225 insertions(+), 21 deletions(-) diff --git a/sharepoint_browser.py b/sharepoint_browser.py index aa1127b..5e61571 100644 --- a/sharepoint_browser.py +++ b/sharepoint_browser.py @@ -11,8 +11,23 @@ import wx.lib.newevent import webbrowser import re import ctypes +import base64 +import logging from ctypes import wintypes +# --- LOGGING & KONSTANTER --- +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger("SP_Browser") + +CHUNK_SIZE = 1 * 1024 * 1024 # 1MB +ENABLE_HASH_VALIDATION = True +HASH_THRESHOLD_MB = 250 + +try: + import quickxorhash as qxh_lib +except ImportError: + qxh_lib = None + # --- STIHÅNDTERING (Til EXE-brug) --- if getattr(sys, 'frozen', False): # RESOURCE_DIR er mapper indeni EXE-filen (ikoner, billeder) @@ -290,14 +305,68 @@ class UploadDropTarget(wx.FileDropTarget): daemon=True).start() return True -def get_file_hash(path): - if not os.path.exists(path): - return None - sha256_hash = hashlib.sha256() - with open(path, "rb") as f: - for byte_block in iter(lambda: f.read(4096), b""): - sha256_hash.update(byte_block) - return sha256_hash.hexdigest() +CHUNK_SIZE = 1 * 1024 * 1024 # 1MB chunks +ENABLE_HASH_VALIDATION = True +HASH_THRESHOLD_MB = 250 # Grænse for hvornår vi gider tjekke hash (pga. hastighed i Python) + +# --- HJÆLPEFUNKTIONER --- +def get_long_path(path): + if os.name == 'nt' and not path.startswith('\\\\?\\'): + return '\\\\?\\' + os.path.abspath(path) + return path + +def quickxorhash(file_path): + """Compute Microsoft QuickXorHash for a file. Returns base64-encoded string.""" + lp = get_long_path(file_path) + if not os.path.exists(lp): return None + + if qxh_lib: + hasher = qxh_lib.quickxorhash() + with open(lp, 'rb') as f: + while True: + chunk = f.read(CHUNK_SIZE) + if not chunk: break + hasher.update(chunk) + return base64.b64encode(hasher.digest()).decode('ascii') + + # Fallback til manuel Python implementering + h = 0 + length = 0 + mask = (1 << 160) - 1 + with open(lp, 'rb') as f: + while True: + chunk = f.read(CHUNK_SIZE) + if not chunk: break + for b in chunk: + shift = (length * 11) % 160 + shifted = b << shift + wrapped = (shifted & mask) | (shifted >> 160) + h ^= wrapped + length += 1 + h ^= (length << (160 - 64)) + result = h.to_bytes(20, byteorder='little') + return base64.b64encode(result).decode('ascii') + +def verify_integrity(local_path, remote_hash): + """Verifies file integrity based on global settings.""" + if not remote_hash or not ENABLE_HASH_VALIDATION: + return True + + lp = get_long_path(local_path) + if not os.path.exists(lp): return False + + file_size = os.path.getsize(lp) + threshold_bytes = HASH_THRESHOLD_MB * 1024 * 1024 + + if file_size > threshold_bytes: + logger.info(f"Skipping hash check (size > {HASH_THRESHOLD_MB}MB): {os.path.basename(local_path)}") + return True + + local_hash = quickxorhash(local_path) + if local_hash != remote_hash: + logger.warning(f"Hash mismatch for {local_path}: local={local_hash}, remote={remote_hash}") + return False + return True def format_size(bytes_num): if bytes_num is None: @@ -1186,6 +1255,9 @@ class SharePointApp(wx.Frame): if not self.ensure_valid_token(): return self.set_status(self.get_txt("msg_downloading_to", name=item['name'], path=dest_path)) if self._download_file_sync_call(item['drive_id'], item['id'], dest_path): + remote_hash = item.get('hash') + if remote_hash and not verify_integrity(dest_path, remote_hash): + self.show_info(f"Advarsel: Hash mismatch ved download af '{item['name']}'", wx.ICON_WARNING) self.set_status(self.get_txt("msg_download_done", name=item['name'])) else: self.set_status(self.get_txt("msg_error")) @@ -1630,6 +1702,93 @@ class SharePointApp(wx.Frame): folders.sort(key=lambda x: x['name'].lower()) wx.CallAfter(self._populate_tree_folders, parent_node, folders, data) + def load_sites(self, event=None): + self.set_status(self.get_txt("status_fetching_sites")) + self.tree_ctrl.DeleteAllItems() + self.list_ctrl.DeleteAllItems() + self.current_items = [] + self.tree_item_data = {} + self._pending_tree_selection_id = None + + self.tree_root = self.tree_ctrl.AddRoot("HiddenRoot") + self.current_path = ["SharePoint"] + self.history = [] + self.update_path_display() + + threading.Thread(target=self._fetch_sites_bg, daemon=True).start() + + def _fetch_sites_bg(self): + if not self.ensure_valid_token(): return + url = "https://graph.microsoft.com/v1.0/sites?search=*" + res = requests.get(url, headers=self.headers) + if res.status_code == 200: + sites = res.json().get('value', []) + sites.sort(key=lambda x: x.get('displayName', x.get('name', '')).lower()) + wx.CallAfter(self._populate_sites_tree, sites) + else: + self.set_status(self.get_txt("msg_unknown_error")) + + def _populate_sites_tree(self, sites): + self.set_status(f"{len(sites)} sites.") + for site in sites: + name = site.get('displayName', site.get('name')) + node = self.tree_ctrl.AppendItem(self.tree_root, name, image=self.idx_site) + self.tree_item_data[node] = { + "type": "SITE", "id": site['id'], "name": name, + "drive_id": None, "path": ["SharePoint", name], "loaded": False, + "web_url": site.get('webUrl') + } + self.tree_ctrl.SetItemHasChildren(node, True) + + # Select the first site or just show in list (defaulting to showing root sites in list) + self.list_ctrl.DeleteAllItems() + self.current_items = [] + for i, site in enumerate(sites): + name = site.get('displayName', site.get('name')) + self.list_ctrl.InsertItem(i, name, self.idx_site) + self.list_ctrl.SetItem(i, 1, self.get_txt("type_site")) + self.list_ctrl.SetItem(i, 2, "") # Størrelse + self.list_ctrl.SetItem(i, 3, "") # Sidst ændret + self.current_items.append({ + "type": "SITE", "id": site['id'], "name": name, + "size": None, "modified": "", "web_url": site.get('webUrl') + }) + + def on_tree_expanding(self, event): + item = event.GetItem() + data = self.tree_item_data.get(item) + if not data or data.get("loaded"): + return + + loading_node = self.tree_ctrl.AppendItem(item, self.get_txt("status_loading")) + threading.Thread(target=self._fetch_tree_children_bg, args=(item, data), daemon=True).start() + + def _fetch_tree_children_bg(self, parent_node, data): + if not self.ensure_valid_token(): return + if data['type'] == "SITE": + url = f"https://graph.microsoft.com/v1.0/sites/{data['id']}/drives" + res = requests.get(url, headers=self.headers) + if res.status_code == 200: + drives = res.json().get('value', []) + drives.sort(key=lambda x: x.get('name', '').lower()) + wx.CallAfter(self._populate_tree_drives, parent_node, drives, data) + elif data['type'] == "DRIVE": + url = f"https://graph.microsoft.com/v1.0/drives/{data['id']}/root/children" + res = requests.get(url, headers=self.headers) + if res.status_code == 200: + items = res.json().get('value', []) + folders = [x for x in items if 'folder' in x] + folders.sort(key=lambda x: x['name'].lower()) + wx.CallAfter(self._populate_tree_folders, parent_node, folders, data) + elif data['type'] == "FOLDER": + url = f"https://graph.microsoft.com/v1.0/drives/{data['drive_id']}/items/{data['id']}/children" + res = requests.get(url, headers=self.headers) + if res.status_code == 200: + items = res.json().get('value', []) + folders = [x for x in items if 'folder' in x] + folders.sort(key=lambda x: x['name'].lower()) + wx.CallAfter(self._populate_tree_folders, parent_node, folders, data) + def _populate_tree_drives(self, parent_node, drives, parent_data): self.tree_ctrl.DeleteChildren(parent_node) parent_data["loaded"] = True @@ -1681,14 +1840,18 @@ class SharePointApp(wx.Frame): if not data: return + self._navigate_to_item_data(data, tree_item=item) + + def _navigate_to_item_data(self, data, tree_item=None): try: self.current_path = data["path"] self.update_path_display() - if not self.is_navigating_back: - self.history.append(item) + if tree_item and not self.is_navigating_back: + # Undgå dubletter i historikken hvis vi allerede er der + if not self.history or self.history[-1] != tree_item: + self.history.append(tree_item) - # Check if button still exists if self.back_btn: self.back_btn.Enable(len(self.history) > 1) @@ -1734,7 +1897,8 @@ class SharePointApp(wx.Frame): "id": item['id'], "name": item['name'], "drive_id": drive_id, "modified": modified, "size": item.get('size') if not is_folder else None, - "web_url": item.get('webUrl') + "web_url": item.get('webUrl'), + "hash": item.get('file', {}).get('hashes', {}).get('quickXorHash') if not is_folder else None }) wx.CallAfter(self._populate_list_ctrl, items_data, data) @@ -1909,7 +2073,10 @@ class SharePointApp(wx.Frame): item = self.current_items[item_idx] if item['type'] in ["SITE", "DRIVE", "FOLDER"]: - self._sync_tree_selection(item['id']) + # Prøv at finde og vælge den i træet (hvilket trigger on_tree_selected) + if not self._sync_tree_selection(item['id']): + # Hvis den allerede var valgt eller ikke findes i træet, tvinger vi navigationen + self._navigate_to_item_data(item) elif item['type'] == "FILE": self.open_file(item) @@ -1920,18 +2087,27 @@ class SharePointApp(wx.Frame): if selected.IsOk(): data = self.tree_item_data.get(selected) + + # Hvis vi allerede HAR valgt den rigtige node, så returner False (så on_item_activated tvinger refresh) + if data and data['id'] == target_id: + return False + if data and not data.get("loaded"): self._pending_tree_selection_id = target_id self.tree_ctrl.Expand(selected) - return + return True # Vi har sat en handling i gang child, cookie = self.tree_ctrl.GetFirstChild(selected) while child.IsOk(): cdata = self.tree_item_data.get(child) if cdata and cdata['id'] == target_id: + if self.tree_ctrl.GetSelection() == child: + return False # Allerede valgt self.tree_ctrl.SelectItem(child) - return + return True # Selection changed child, cookie = self.tree_ctrl.GetNextChild(selected, cookie) + + return False def go_back(self, event=None): if len(self.history) > 1: @@ -1954,7 +2130,7 @@ class SharePointApp(wx.Frame): wx.MessageBox("Du kan kun have 10 filer åbne til redigering ad gangen.", "Maksimum grænse nået", wx.OK | wx.ICON_WARNING) return - threading.Thread(target=self.process_file, args=(item_id, file_name, drive_id), daemon=True).start() + threading.Thread(target=self.process_file, args=(item_id, file_name, drive_id, item.get('hash')), daemon=True).start() def update_edit_ui(self): def _do(): @@ -1983,7 +2159,7 @@ class SharePointApp(wx.Frame): pass wx.CallAfter(_do) - def process_file(self, item_id, file_name, drive_id): + def process_file(self, item_id, file_name, drive_id, remote_hash=None): if not self.ensure_valid_token(): return edit_event = threading.Event() @@ -2008,11 +2184,29 @@ class SharePointApp(wx.Frame): if res.status_code != 200: raise Exception(f"{self.get_txt('msg_unknown_error')}: {res.status_code}") - with open(local_path, 'wb') as f: + with open(get_long_path(local_path), 'wb') as f: f.write(res.content) - # Beregn udgangspunkt hash - original_hash = get_file_hash(local_path) + # Verificer integritet og gem hash til senere sammenligning + original_hash = None + if remote_hash and ENABLE_HASH_VALIDATION: + file_size = os.path.getsize(get_long_path(local_path)) + if file_size <= (HASH_THRESHOLD_MB * 1024 * 1024): + original_hash = quickxorhash(local_path) + if original_hash != remote_hash: + logger.warning(f"Hash mismatch for {file_name}: local={original_hash}, remote={remote_hash}") + self.show_info(f"Advarsel: Filens integritet kunne ikke bekræftes (XorHash mismatch)", wx.ICON_WARNING) + + # Hvis vi ikke beregnede hash pga. størrelse eller manglende remote_hash, gør det nu for lokal detektering + if original_hash is None: + # Her bruger vi SHA256 af hastighedsårsager hvis XOR ikke er absolut nødvendig til lokal sammenligning + sha256 = hashlib.sha256() + with open(get_long_path(local_path), 'rb') as f: + while True: + chunk = f.read(CHUNK_SIZE) + if not chunk: break + sha256.update(chunk) + original_hash = "SHA256:" + sha256.hexdigest() # Checkout requests.post(f"{base_url}/checkout", headers=self.headers) @@ -2053,7 +2247,17 @@ class SharePointApp(wx.Frame): self.update_edit_ui() # 4. Tjek om noget er ændret - new_hash = get_file_hash(local_path) + if original_hash.startswith("SHA256:"): + sha256 = hashlib.sha256() + with open(get_long_path(local_path), 'rb') as f: + while True: + chunk = f.read(CHUNK_SIZE) + if not chunk: break + sha256.update(chunk) + new_hash = "SHA256:" + sha256.hexdigest() + else: + new_hash = quickxorhash(local_path) + if original_hash == new_hash: self.set_status(self.get_txt("msg_file_unchanged")) else: