feat: implement QuickXorHash integrity verification for downloads and add SharePoint site navigation support

This commit is contained in:
Martin Tranberg
2026-04-01 09:42:06 +02:00
parent 1a85f1d963
commit 62725f9be6

View File

@@ -11,8 +11,23 @@ import wx.lib.newevent
import webbrowser
import re
import ctypes
import base64
import logging
from ctypes import wintypes
# --- LOGGING & CONSTANTS ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("SP_Browser")
CHUNK_SIZE = 1 * 1024 * 1024  # 1MB read size used by all streaming file I/O below
ENABLE_HASH_VALIDATION = True  # master switch for download integrity checks
HASH_THRESHOLD_MB = 250  # files larger than this skip hashing (pure-Python hashing is slow)
# Optional C-accelerated QuickXorHash implementation; `quickxorhash()` below
# falls back to a pure-Python version when the package is not installed.
try:
    import quickxorhash as qxh_lib
except ImportError:
    qxh_lib = None
# --- STIHÅNDTERING (Til EXE-brug) ---
if getattr(sys, 'frozen', False):
# RESOURCE_DIR er mapper indeni EXE-filen (ikoner, billeder)
@@ -290,14 +305,68 @@ class UploadDropTarget(wx.FileDropTarget):
daemon=True).start()
return True
def get_file_hash(path):
    """Return the SHA-256 hex digest of the file at *path*.

    Returns None when the file does not exist. Reads the file in 1 MB
    blocks (the original 4 KiB reads were needlessly small and
    inconsistent with the streaming chunk size used elsewhere in this
    module).
    """
    if not os.path.exists(path):
        return None
    sha256_hash = hashlib.sha256()
    with open(path, "rb") as f:
        while chunk := f.read(1024 * 1024):
            sha256_hash.update(chunk)
    return sha256_hash.hexdigest()
CHUNK_SIZE = 1 * 1024 * 1024  # 1MB chunks
ENABLE_HASH_VALIDATION = True
HASH_THRESHOLD_MB = 250  # Upper bound for when we bother checking the hash (pure-Python hashing is slow)
# NOTE(review): these three constants duplicate the definitions near the top of
# the module (same values). Harmless, but consider keeping a single copy.
# --- HJÆLPEFUNKTIONER ---
def get_long_path(path):
    """On Windows, return *path* with the extended-length prefix so Win32
    APIs accept paths longer than 260 characters; elsewhere (or if already
    prefixed) return it unchanged."""
    needs_prefix = os.name == 'nt' and not path.startswith('\\\\?\\')
    return ('\\\\?\\' + os.path.abspath(path)) if needs_prefix else path
def quickxorhash(file_path):
    """Compute Microsoft QuickXorHash for a file. Returns base64-encoded string.

    Uses the C-accelerated `quickxorhash` package when available; otherwise
    falls back to a pure-Python implementation of the same 160-bit rotating
    XOR scheme. Returns None if the file does not exist.
    """
    lp = get_long_path(file_path)
    if not os.path.exists(lp): return None
    if qxh_lib:
        # Fast path: stream the file through the native hasher.
        hasher = qxh_lib.quickxorhash()
        with open(lp, 'rb') as f:
            while True:
                chunk = f.read(CHUNK_SIZE)
                if not chunk: break
                hasher.update(chunk)
        return base64.b64encode(hasher.digest()).decode('ascii')
    # Fallback: manual pure-Python implementation.
    h = 0        # 160-bit accumulator
    length = 0   # total number of bytes processed so far
    mask = (1 << 160) - 1
    with open(lp, 'rb') as f:
        while True:
            chunk = f.read(CHUNK_SIZE)
            if not chunk: break
            for b in chunk:
                # Each byte is rotated left by 11 bits per byte position
                # (mod 160) and XORed into the accumulator.
                shift = (length * 11) % 160
                shifted = b << shift
                # Wrap the bits that overflowed past bit 160 back to the bottom
                # (circular shift within the 160-bit window).
                wrapped = (shifted & mask) | (shifted >> 160)
                h ^= wrapped
                length += 1
    # XOR the total length into the top 64 bits, which land in the last
    # 8 bytes of the little-endian 20-byte digest.
    h ^= (length << (160 - 64))
    result = h.to_bytes(20, byteorder='little')
    return base64.b64encode(result).decode('ascii')
def verify_integrity(local_path, remote_hash):
    """Check a downloaded file against the server-reported QuickXorHash.

    Returns True when validation is disabled, no remote hash is available,
    the file exceeds the size threshold, or the hashes match; returns False
    when the file is missing or the hashes differ.
    """
    if not remote_hash or not ENABLE_HASH_VALIDATION:
        return True
    target = get_long_path(local_path)
    if not os.path.exists(target):
        return False
    # Large files are skipped: the pure-Python fallback hasher is too slow.
    size_limit = HASH_THRESHOLD_MB * 1024 * 1024
    if os.path.getsize(target) > size_limit:
        logger.info(f"Skipping hash check (size > {HASH_THRESHOLD_MB}MB): {os.path.basename(local_path)}")
        return True
    computed = quickxorhash(local_path)
    if computed == remote_hash:
        return True
    logger.warning(f"Hash mismatch for {local_path}: local={computed}, remote={remote_hash}")
    return False
def format_size(bytes_num):
if bytes_num is None:
@@ -1186,6 +1255,9 @@ class SharePointApp(wx.Frame):
if not self.ensure_valid_token(): return
self.set_status(self.get_txt("msg_downloading_to", name=item['name'], path=dest_path))
if self._download_file_sync_call(item['drive_id'], item['id'], dest_path):
remote_hash = item.get('hash')
if remote_hash and not verify_integrity(dest_path, remote_hash):
self.show_info(f"Advarsel: Hash mismatch ved download af '{item['name']}'", wx.ICON_WARNING)
self.set_status(self.get_txt("msg_download_done", name=item['name']))
else:
self.set_status(self.get_txt("msg_error"))
@@ -1630,6 +1702,93 @@ class SharePointApp(wx.Frame):
folders.sort(key=lambda x: x['name'].lower())
wx.CallAfter(self._populate_tree_folders, parent_node, folders, data)
def load_sites(self, event=None):
    """Reset all navigation state and reload the SharePoint site list.

    Clears both panes, drops cached item/tree mappings and navigation
    history, then fetches the sites on a background thread.
    """
    self.set_status(self.get_txt("status_fetching_sites"))
    # Clear both UI panes and every cached mapping before refetching.
    self.tree_ctrl.DeleteAllItems()
    self.list_ctrl.DeleteAllItems()
    self.current_items = []
    self.tree_item_data = {}
    self._pending_tree_selection_id = None
    # Hidden root keeps the top-level sites visually at the first level.
    self.tree_root = self.tree_ctrl.AddRoot("HiddenRoot")
    self.current_path = ["SharePoint"]
    self.history = []
    self.update_path_display()
    # Network call runs off the UI thread.
    threading.Thread(target=self._fetch_sites_bg, daemon=True).start()
def _fetch_sites_bg(self):
    """Background worker: fetch every site visible to the user via Graph search.

    Runs on a daemon thread; tree/list population is marshalled back to the
    UI thread with wx.CallAfter.
    """
    if not self.ensure_valid_token(): return
    url = "https://graph.microsoft.com/v1.0/sites?search=*"
    try:
        # Timeout keeps the daemon thread from hanging forever on a dead
        # connection; previously a network error would kill the thread silently.
        res = requests.get(url, headers=self.headers, timeout=30)
    except requests.RequestException as exc:
        logger.warning(f"Site fetch failed: {exc}")
        # NOTE(review): set_status is invoked from this worker thread here,
        # matching the existing non-200 branch — confirm it is thread-safe
        # (e.g. wraps the update in wx.CallAfter internally).
        self.set_status(self.get_txt("msg_unknown_error"))
        return
    if res.status_code == 200:
        sites = res.json().get('value', [])
        sites.sort(key=lambda x: x.get('displayName', x.get('name', '')).lower())
        wx.CallAfter(self._populate_sites_tree, sites)
    else:
        self.set_status(self.get_txt("msg_unknown_error"))
def _populate_sites_tree(self, sites):
    """Fill the tree and the list view with the fetched SharePoint sites (UI thread)."""
    self.set_status(f"{len(sites)} sites.")
    for site in sites:
        label = site.get('displayName', site.get('name'))
        tree_node = self.tree_ctrl.AppendItem(self.tree_root, label, image=self.idx_site)
        self.tree_item_data[tree_node] = {
            "type": "SITE",
            "id": site['id'],
            "name": label,
            "drive_id": None,
            "path": ["SharePoint", label],
            "loaded": False,
            "web_url": site.get('webUrl'),
        }
        self.tree_ctrl.SetItemHasChildren(tree_node, True)
    # Mirror the root-level sites in the list view as well.
    self.list_ctrl.DeleteAllItems()
    self.current_items = []
    for row, site in enumerate(sites):
        label = site.get('displayName', site.get('name'))
        self.list_ctrl.InsertItem(row, label, self.idx_site)
        self.list_ctrl.SetItem(row, 1, self.get_txt("type_site"))
        self.list_ctrl.SetItem(row, 2, "")  # size column (empty for sites)
        self.list_ctrl.SetItem(row, 3, "")  # last-modified column (empty for sites)
        self.current_items.append({
            "type": "SITE",
            "id": site['id'],
            "name": label,
            "size": None,
            "modified": "",
            "web_url": site.get('webUrl'),
        })
def on_tree_expanding(self, event):
    """Lazy-load a tree node's children the first time it is expanded.

    Nodes already marked "loaded" are ignored; otherwise a placeholder
    child is shown while a background thread fetches the real children.
    """
    item = event.GetItem()
    data = self.tree_item_data.get(item)
    if not data or data.get("loaded"):
        return
    # Placeholder child displayed until the background fetch repopulates the
    # node (the original bound this to an unused local variable).
    self.tree_ctrl.AppendItem(item, self.get_txt("status_loading"))
    threading.Thread(target=self._fetch_tree_children_bg, args=(item, data), daemon=True).start()
def _fetch_tree_children_bg(self, parent_node, data):
    """Background worker: fetch the children of a SITE/DRIVE/FOLDER tree node.

    SITE nodes expand to their document libraries (drives); DRIVE and FOLDER
    nodes expand to their sub-folders only. Results are pushed back to the
    UI thread with wx.CallAfter. The DRIVE and FOLDER branches were
    previously duplicated verbatim except for the URL; they now share one
    code path. Non-200 responses are silently ignored, as before.
    """
    if not self.ensure_valid_token(): return
    node_type = data['type']
    if node_type == "SITE":
        url = f"https://graph.microsoft.com/v1.0/sites/{data['id']}/drives"
    elif node_type == "DRIVE":
        url = f"https://graph.microsoft.com/v1.0/drives/{data['id']}/root/children"
    elif node_type == "FOLDER":
        url = f"https://graph.microsoft.com/v1.0/drives/{data['drive_id']}/items/{data['id']}/children"
    else:
        return
    try:
        # Timeout prevents the daemon thread from hanging on a dead connection.
        res = requests.get(url, headers=self.headers, timeout=30)
    except requests.RequestException as exc:
        logger.warning(f"Tree children fetch failed: {exc}")
        return
    if res.status_code != 200:
        return
    if node_type == "SITE":
        drives = res.json().get('value', [])
        drives.sort(key=lambda x: x.get('name', '').lower())
        wx.CallAfter(self._populate_tree_drives, parent_node, drives, data)
    else:
        # DRIVE and FOLDER nodes both display only their sub-folders.
        items = res.json().get('value', [])
        folders = [x for x in items if 'folder' in x]
        folders.sort(key=lambda x: x['name'].lower())
        wx.CallAfter(self._populate_tree_folders, parent_node, folders, data)
def _populate_tree_drives(self, parent_node, drives, parent_data):
self.tree_ctrl.DeleteChildren(parent_node)
parent_data["loaded"] = True
@@ -1681,14 +1840,18 @@ class SharePointApp(wx.Frame):
if not data:
return
self._navigate_to_item_data(data, tree_item=item)
def _navigate_to_item_data(self, data, tree_item=None):
try:
self.current_path = data["path"]
self.update_path_display()
if not self.is_navigating_back:
self.history.append(item)
if tree_item and not self.is_navigating_back:
# Undgå dubletter i historikken hvis vi allerede er der
if not self.history or self.history[-1] != tree_item:
self.history.append(tree_item)
# Check if button still exists
if self.back_btn:
self.back_btn.Enable(len(self.history) > 1)
@@ -1734,7 +1897,8 @@ class SharePointApp(wx.Frame):
"id": item['id'], "name": item['name'],
"drive_id": drive_id, "modified": modified,
"size": item.get('size') if not is_folder else None,
"web_url": item.get('webUrl')
"web_url": item.get('webUrl'),
"hash": item.get('file', {}).get('hashes', {}).get('quickXorHash') if not is_folder else None
})
wx.CallAfter(self._populate_list_ctrl, items_data, data)
@@ -1909,7 +2073,10 @@ class SharePointApp(wx.Frame):
item = self.current_items[item_idx]
if item['type'] in ["SITE", "DRIVE", "FOLDER"]:
self._sync_tree_selection(item['id'])
# Prøv at finde og vælge den i træet (hvilket trigger on_tree_selected)
if not self._sync_tree_selection(item['id']):
# Hvis den allerede var valgt eller ikke findes i træet, tvinger vi navigationen
self._navigate_to_item_data(item)
elif item['type'] == "FILE":
self.open_file(item)
@@ -1920,18 +2087,27 @@ class SharePointApp(wx.Frame):
if selected.IsOk():
data = self.tree_item_data.get(selected)
# Hvis vi allerede HAR valgt den rigtige node, så returner False (så on_item_activated tvinger refresh)
if data and data['id'] == target_id:
return False
if data and not data.get("loaded"):
self._pending_tree_selection_id = target_id
self.tree_ctrl.Expand(selected)
return
return True # Vi har sat en handling i gang
child, cookie = self.tree_ctrl.GetFirstChild(selected)
while child.IsOk():
cdata = self.tree_item_data.get(child)
if cdata and cdata['id'] == target_id:
if self.tree_ctrl.GetSelection() == child:
return False # Allerede valgt
self.tree_ctrl.SelectItem(child)
return
return True # Selection changed
child, cookie = self.tree_ctrl.GetNextChild(selected, cookie)
return False
def go_back(self, event=None):
if len(self.history) > 1:
@@ -1954,7 +2130,7 @@ class SharePointApp(wx.Frame):
wx.MessageBox("Du kan kun have 10 filer åbne til redigering ad gangen.", "Maksimum grænse nået", wx.OK | wx.ICON_WARNING)
return
threading.Thread(target=self.process_file, args=(item_id, file_name, drive_id), daemon=True).start()
threading.Thread(target=self.process_file, args=(item_id, file_name, drive_id, item.get('hash')), daemon=True).start()
def update_edit_ui(self):
def _do():
@@ -1983,7 +2159,7 @@ class SharePointApp(wx.Frame):
pass
wx.CallAfter(_do)
def process_file(self, item_id, file_name, drive_id):
def process_file(self, item_id, file_name, drive_id, remote_hash=None):
if not self.ensure_valid_token(): return
edit_event = threading.Event()
@@ -2008,11 +2184,29 @@ class SharePointApp(wx.Frame):
if res.status_code != 200:
raise Exception(f"{self.get_txt('msg_unknown_error')}: {res.status_code}")
with open(local_path, 'wb') as f:
with open(get_long_path(local_path), 'wb') as f:
f.write(res.content)
# Beregn udgangspunkt hash
original_hash = get_file_hash(local_path)
# Verificer integritet og gem hash til senere sammenligning
original_hash = None
if remote_hash and ENABLE_HASH_VALIDATION:
file_size = os.path.getsize(get_long_path(local_path))
if file_size <= (HASH_THRESHOLD_MB * 1024 * 1024):
original_hash = quickxorhash(local_path)
if original_hash != remote_hash:
logger.warning(f"Hash mismatch for {file_name}: local={original_hash}, remote={remote_hash}")
self.show_info(f"Advarsel: Filens integritet kunne ikke bekræftes (XorHash mismatch)", wx.ICON_WARNING)
# Hvis vi ikke beregnede hash pga. størrelse eller manglende remote_hash, gør det nu for lokal detektering
if original_hash is None:
# Her bruger vi SHA256 af hastighedsårsager hvis XOR ikke er absolut nødvendig til lokal sammenligning
sha256 = hashlib.sha256()
with open(get_long_path(local_path), 'rb') as f:
while True:
chunk = f.read(CHUNK_SIZE)
if not chunk: break
sha256.update(chunk)
original_hash = "SHA256:" + sha256.hexdigest()
# Checkout
requests.post(f"{base_url}/checkout", headers=self.headers)
@@ -2053,7 +2247,17 @@ class SharePointApp(wx.Frame):
self.update_edit_ui()
# 4. Tjek om noget er ændret
new_hash = get_file_hash(local_path)
if original_hash.startswith("SHA256:"):
sha256 = hashlib.sha256()
with open(get_long_path(local_path), 'rb') as f:
while True:
chunk = f.read(CHUNK_SIZE)
if not chunk: break
sha256.update(chunk)
new_hash = "SHA256:" + sha256.hexdigest()
else:
new_hash = quickxorhash(local_path)
if original_hash == new_hash:
self.set_status(self.get_txt("msg_file_unchanged"))
else: