- Implement explicit threading.Event propagation for robust GUI cancellation. - Optimize file synchronization by skipping hash validation for up-to-date files (matching size and timestamp). - Update Windows long path support to correctly handle UNC network shares. - Refactor configuration management to eliminate global state and improve modularity. - Remove requests.get monkey-patch in GUI. - Delete CLAUDE.md as it is no longer required.
465 lines
19 KiB
Python
465 lines
19 KiB
Python
import os
|
|
import csv
|
|
import requests
|
|
import time
|
|
import threading
|
|
import logging
|
|
import base64
|
|
import struct
|
|
try:
|
|
import quickxorhash as qxh_lib
|
|
except ImportError:
|
|
qxh_lib = None
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from datetime import datetime
|
|
from msal import ConfidentialClientApplication
|
|
from urllib.parse import urlparse, quote
|
|
|
|
# --- Production Configuration ---
|
|
MAX_WORKERS = 5
|
|
MAX_RETRIES = 5
|
|
CHUNK_SIZE = 1024 * 1024 # 1MB Chunks
|
|
MAX_FOLDER_DEPTH = 50
|
|
LOG_FILE = "sharepoint_download.log"
|
|
|
|
# Setup Logging
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s [%(levelname)s] %(threadName)s: %(message)s',
|
|
handlers=[
|
|
logging.FileHandler(LOG_FILE, encoding='utf-8'),
|
|
logging.StreamHandler()
|
|
]
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
report_lock = threading.Lock()
|
|
|
|
def format_size(size_bytes):
|
|
for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB']:
|
|
if size_bytes < 1024.0:
|
|
return f"{size_bytes:.2f} {unit}"
|
|
size_bytes /= 1024.0
|
|
return f"{size_bytes:.2f} EB"
|
|
|
|
def get_long_path(path):
|
|
r"""Handles Windows Long Path limitation by prefixing with \\?\ for absolute paths.
|
|
Correctly handles UNC paths (e.g. \\server\share -> \\?\UNC\server\share)."""
|
|
path = os.path.abspath(path)
|
|
if os.name == 'nt' and not path.startswith("\\\\?\\"):
|
|
if path.startswith("\\\\"):
|
|
return "\\\\?\\UNC\\" + path[2:]
|
|
return "\\\\?\\" + path
|
|
return path
|
|
|
|
def load_config(file_path):
|
|
config = {}
|
|
if not os.path.exists(file_path):
|
|
raise FileNotFoundError(f"Configuration file {file_path} not found.")
|
|
with open(file_path, 'r', encoding='utf-8') as f:
|
|
for line in f:
|
|
if '=' in line:
|
|
key, value = line.split('=', 1)
|
|
config[key.strip()] = value.strip().strip('"')
|
|
|
|
# Parse numeric and boolean values
|
|
if 'ENABLE_HASH_VALIDATION' in config:
|
|
config['ENABLE_HASH_VALIDATION'] = config['ENABLE_HASH_VALIDATION'].lower() == 'true'
|
|
else:
|
|
config['ENABLE_HASH_VALIDATION'] = True
|
|
|
|
if 'HASH_THRESHOLD_MB' in config:
|
|
try:
|
|
config['HASH_THRESHOLD_MB'] = int(config['HASH_THRESHOLD_MB'])
|
|
except ValueError:
|
|
config['HASH_THRESHOLD_MB'] = 30
|
|
else:
|
|
config['HASH_THRESHOLD_MB'] = 30
|
|
|
|
return config
|
|
|
|
# --- Punkt 1: Exponential Backoff & Retry Logic ---
|
|
def retry_request(func):
|
|
def wrapper(*args, **kwargs):
|
|
retries = 0
|
|
while retries < MAX_RETRIES:
|
|
try:
|
|
response = func(*args, **kwargs)
|
|
if response.status_code == 429:
|
|
retry_after = int(response.headers.get("Retry-After", 2 ** retries))
|
|
logger.warning(f"Throttled (429). Waiting {retry_after}s...")
|
|
time.sleep(retry_after)
|
|
retries += 1
|
|
continue
|
|
response.raise_for_status()
|
|
return response
|
|
except requests.exceptions.RequestException as e:
|
|
# Hvis det er 401, skal vi ikke vente/retry her, da token/URL sandsynligvis er udløbet
|
|
if isinstance(e, requests.exceptions.HTTPError) and e.response is not None and e.response.status_code == 401:
|
|
raise e
|
|
|
|
retries += 1
|
|
wait = 2 ** retries
|
|
if retries >= MAX_RETRIES:
|
|
raise e
|
|
logger.error(f"Request failed: {e}. Retrying in {wait}s...")
|
|
time.sleep(wait)
|
|
raise requests.exceptions.RetryError(f"Max retries ({MAX_RETRIES}) exceeded.")
|
|
return wrapper
|
|
|
|
@retry_request
|
|
def safe_get(url, headers, stream=False, timeout=60, params=None):
|
|
return requests.get(url, headers=headers, stream=stream, timeout=timeout, params=params)
|
|
|
|
def safe_graph_get(app, url):
|
|
"""Specialized helper for Graph API calls that handles 401 by refreshing tokens."""
|
|
try:
|
|
return safe_get(url, headers=get_headers(app))
|
|
except requests.exceptions.HTTPError as e:
|
|
if e.response is not None and e.response.status_code == 401:
|
|
logger.info("Access Token expired during Graph call. Forcing refresh...")
|
|
return safe_get(url, headers=get_headers(app, force_refresh=True))
|
|
raise
|
|
|
|
# --- Punkt 4: Integrity Validation (QuickXorHash) ---
|
|
def quickxorhash(file_path):
|
|
"""Compute Microsoft QuickXorHash for a file. Returns base64-encoded string.
|
|
Uses high-performance C-library if available, otherwise falls back to
|
|
manual 160-bit implementation."""
|
|
|
|
# 1. Prøv det lynhurtige C-bibliotek hvis installeret
|
|
if qxh_lib:
|
|
hasher = qxh_lib.quickxorhash()
|
|
with open(get_long_path(file_path), 'rb') as f:
|
|
while True:
|
|
chunk = f.read(CHUNK_SIZE)
|
|
if not chunk: break
|
|
hasher.update(chunk)
|
|
return base64.b64encode(hasher.digest()).decode('ascii')
|
|
|
|
# 2. Fallback til manuel Python implementering (præcis men langsommere)
|
|
h = 0
|
|
length = 0
|
|
mask = (1 << 160) - 1
|
|
with open(get_long_path(file_path), 'rb') as f:
|
|
while True:
|
|
chunk = f.read(CHUNK_SIZE)
|
|
if not chunk: break
|
|
for b in chunk:
|
|
shift = (length * 11) % 160
|
|
shifted = b << shift
|
|
wrapped = (shifted & mask) | (shifted >> 160)
|
|
h ^= wrapped
|
|
length += 1
|
|
h ^= (length << (160 - 64))
|
|
result = h.to_bytes(20, byteorder='little')
|
|
return base64.b64encode(result).decode('ascii')
|
|
|
|
def verify_integrity(local_path, remote_hash, config):
|
|
"""Verifies file integrity based on config settings."""
|
|
if not remote_hash or not config.get('ENABLE_HASH_VALIDATION', True):
|
|
return True
|
|
|
|
file_size = os.path.getsize(get_long_path(local_path))
|
|
threshold_mb = config.get('HASH_THRESHOLD_MB', 30)
|
|
threshold_bytes = threshold_mb * 1024 * 1024
|
|
|
|
if file_size > threshold_bytes:
|
|
logger.info(f"Skipping hash check (size > {threshold_mb}MB): {os.path.basename(local_path)}")
|
|
return True
|
|
|
|
local_hash = quickxorhash(local_path)
|
|
if local_hash != remote_hash:
|
|
logger.warning(f"Hash mismatch for {local_path}: local={local_hash}, remote={remote_hash}")
|
|
return False
|
|
return True
|
|
|
|
def get_headers(app, force_refresh=False):
|
|
scopes = ["https://graph.microsoft.com/.default"]
|
|
# If force_refresh is True, we don't rely on the cache
|
|
result = None
|
|
if not force_refresh:
|
|
result = app.acquire_token_for_client(scopes=scopes)
|
|
|
|
if force_refresh or not result or "access_token" not in result:
|
|
logger.info("Refreshing Access Token...")
|
|
result = app.acquire_token_for_client(scopes=scopes, force_refresh=True)
|
|
|
|
if "access_token" in result:
|
|
return {'Authorization': f'Bearer {result["access_token"]}'}
|
|
raise Exception(f"Auth failed: {result.get('error_description')}")
|
|
|
|
def get_site_id(app, site_url):
|
|
parsed = urlparse(site_url)
|
|
url = f"https://graph.microsoft.com/v1.0/sites/{parsed.netloc}:{parsed.path}"
|
|
response = safe_graph_get(app, url)
|
|
return response.json()['id']
|
|
|
|
def get_drive_id(app, site_id, drive_name):
|
|
url = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives"
|
|
response = safe_graph_get(app, url)
|
|
drives = response.json().get('value', [])
|
|
|
|
# Prøv præcis match
|
|
for drive in drives:
|
|
if drive['name'] == drive_name:
|
|
return drive['id']
|
|
|
|
# Prøv fallback til "Documents" hvis "Delte dokumenter" fejler (SharePoint standard)
|
|
if drive_name == "Delte dokumenter":
|
|
for drive in drives:
|
|
if drive['name'] == "Documents":
|
|
logger.info("Found 'Documents' as fallback for 'Delte dokumenter'")
|
|
return drive['id']
|
|
|
|
# Log tilgængelige navne for at hjælpe brugeren
|
|
available_names = [d['name'] for d in drives]
|
|
logger.error(f"Drive '{drive_name}' not found. Available drives on this site: {available_names}")
|
|
raise Exception(f"Drive {drive_name} not found. Check the log for available drive names.")
|
|
|
|
# --- Punkt 2: Resume / Chunked Download logic ---
|
|
def get_fresh_download_url(app, drive_id, item_id):
|
|
"""Fetches a fresh download URL for a specific item ID with retries and robust error handling."""
|
|
url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/items/{item_id}"
|
|
|
|
for attempt in range(3):
|
|
try:
|
|
headers = get_headers(app)
|
|
response = requests.get(url, headers=headers, timeout=60)
|
|
|
|
if response.status_code == 429:
|
|
retry_after = int(response.headers.get("Retry-After", 2 ** attempt))
|
|
logger.warning(f"Throttled (429) in get_fresh_download_url. Waiting {retry_after}s...")
|
|
time.sleep(retry_after)
|
|
continue
|
|
|
|
if response.status_code == 401:
|
|
logger.info(f"Access Token expired during refresh (Attempt {attempt+1}). Forcing refresh...")
|
|
headers = get_headers(app, force_refresh=True)
|
|
response = requests.get(url, headers=headers, timeout=60)
|
|
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
download_url = data.get('@microsoft.graph.downloadUrl')
|
|
|
|
if download_url:
|
|
return download_url, None
|
|
|
|
# If item exists but URL is missing, it might be a transient SharePoint issue
|
|
logger.warning(f"Attempt {attempt+1}: '@microsoft.graph.downloadUrl' missing for {item_id}. Retrying in {2 ** attempt}s...")
|
|
time.sleep(2 ** attempt)
|
|
|
|
except Exception as e:
|
|
if attempt == 2:
|
|
return None, str(e)
|
|
logger.warning(f"Attempt {attempt+1} failed: {e}. Retrying in {2 ** attempt}s...")
|
|
time.sleep(2 ** attempt)
|
|
|
|
return None, "Item returned but '@microsoft.graph.downloadUrl' was missing after 3 attempts."
|
|
|
|
def download_single_file(app, drive_id, item_id, local_path, expected_size, display_name, config, stop_event=None, remote_hash=None, initial_url=None, remote_mtime_str=None):
|
|
try:
|
|
if stop_event and stop_event.is_set():
|
|
raise InterruptedError("Sync cancelled")
|
|
|
|
file_mode = 'wb'
|
|
resume_header = {}
|
|
existing_size = 0
|
|
download_url = initial_url
|
|
|
|
long_local_path = get_long_path(local_path)
|
|
|
|
if os.path.exists(long_local_path):
|
|
existing_size = os.path.getsize(long_local_path)
|
|
local_mtime = os.path.getmtime(long_local_path)
|
|
|
|
# Konvertér SharePoint ISO8601 UTC tid (f.eks. 2024-03-29T12:00:00Z) til unix timestamp
|
|
remote_mtime = datetime.fromisoformat(remote_mtime_str.replace('Z', '+00:00')).timestamp()
|
|
|
|
# Hvis filen findes, har rigtig størrelse OG lokal er ikke ældre end remote -> SKIP
|
|
if existing_size == expected_size:
|
|
if local_mtime >= (remote_mtime - 1): # Vi tillader 1 sekuds difference pga. filsystem-præcision
|
|
logger.info(f"Skipped (up-to-date): {display_name}")
|
|
return True, None
|
|
else:
|
|
logger.info(f"Update available: {display_name} (Remote is newer)")
|
|
existing_size = 0
|
|
elif existing_size < expected_size:
|
|
# Ved resume tjekker vi også om kilden er ændret siden vi startede
|
|
if local_mtime < (remote_mtime - 1):
|
|
logger.warning(f"Remote file changed during partial download: {display_name}. Restarting.")
|
|
existing_size = 0
|
|
else:
|
|
logger.info(f"Resuming: {display_name} from {format_size(existing_size)}")
|
|
resume_header = {'Range': f'bytes={existing_size}-'}
|
|
file_mode = 'ab'
|
|
else:
|
|
logger.warning(f"Local file larger than remote: {display_name}. Overwriting.")
|
|
existing_size = 0
|
|
|
|
logger.info(f"Starting: {display_name} ({format_size(expected_size)})")
|
|
os.makedirs(os.path.dirname(long_local_path), exist_ok=True)
|
|
|
|
# Initial download attempt
|
|
if not download_url:
|
|
download_url, err = get_fresh_download_url(app, drive_id, item_id)
|
|
if not download_url:
|
|
return False, f"Could not fetch initial URL: {err}"
|
|
|
|
try:
|
|
response = safe_get(download_url, resume_header, stream=True, timeout=120)
|
|
except requests.exceptions.HTTPError as e:
|
|
if e.response is not None and e.response.status_code == 401:
|
|
# Handle 401 Unauthorized from SharePoint (expired download link)
|
|
logger.warning(f"URL expired for {display_name}. Fetching fresh URL...")
|
|
download_url, err = get_fresh_download_url(app, drive_id, item_id)
|
|
if not download_url:
|
|
return False, f"Failed to refresh download URL: {err}"
|
|
response = safe_get(download_url, resume_header, stream=True, timeout=120)
|
|
else:
|
|
raise
|
|
|
|
with open(long_local_path, file_mode) as f:
|
|
for chunk in response.iter_content(chunk_size=CHUNK_SIZE):
|
|
if stop_event and stop_event.is_set():
|
|
raise InterruptedError("Sync cancelled")
|
|
if chunk:
|
|
f.write(chunk)
|
|
|
|
# Post-download check
|
|
final_size = os.path.getsize(long_local_path)
|
|
if final_size == expected_size:
|
|
if verify_integrity(local_path, remote_hash, config):
|
|
logger.info(f"DONE: {display_name}")
|
|
return True, None
|
|
else:
|
|
return False, "Integrity check failed (Hash mismatch)"
|
|
else:
|
|
return False, f"Size mismatch: Remote={expected_size}, Local={final_size}"
|
|
|
|
except InterruptedError:
|
|
raise
|
|
except Exception as e:
|
|
return False, str(e)
|
|
|
|
# --- Main Traversal Logic ---
|
|
def process_item_list(app, drive_id, item_path, local_root_path, report, executor, futures, config, stop_event=None, depth=0):
|
|
if depth >= MAX_FOLDER_DEPTH:
|
|
logger.warning(f"Max folder depth ({MAX_FOLDER_DEPTH}) reached at: {item_path}. Skipping subtree.")
|
|
return
|
|
try:
|
|
if stop_event and stop_event.is_set():
|
|
raise InterruptedError("Sync cancelled")
|
|
|
|
encoded_path = quote(item_path)
|
|
|
|
if not item_path:
|
|
url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/root/children"
|
|
else:
|
|
url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/root:/{encoded_path}:/children"
|
|
|
|
while url:
|
|
response = safe_graph_get(app, url)
|
|
data = response.json()
|
|
items = data.get('value', [])
|
|
|
|
for item in items:
|
|
if stop_event and stop_event.is_set():
|
|
raise InterruptedError("Sync cancelled")
|
|
|
|
item_name = item['name']
|
|
local_path = os.path.join(local_root_path, item_name)
|
|
display_path = f"{item_path}/{item_name}".strip('/')
|
|
|
|
if 'folder' in item:
|
|
process_item_list(app, drive_id, display_path, local_path, report, executor, futures, config, stop_event, depth + 1)
|
|
elif 'file' in item:
|
|
item_id = item['id']
|
|
download_url = item.get('@microsoft.graph.downloadUrl')
|
|
remote_hash = item.get('file', {}).get('hashes', {}).get('quickXorHash')
|
|
remote_mtime = item.get('lastModifiedDateTime')
|
|
|
|
future = executor.submit(
|
|
download_single_file,
|
|
app, drive_id, item_id,
|
|
local_path, item['size'], display_path,
|
|
config, stop_event, remote_hash, download_url, remote_mtime
|
|
)
|
|
futures[future] = display_path
|
|
|
|
url = data.get('@odata.nextLink')
|
|
|
|
except InterruptedError:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error traversing {item_path}: {e}")
|
|
with report_lock:
|
|
report.append({"Path": item_path, "Error": str(e), "Timestamp": datetime.now().isoformat()})
|
|
|
|
def create_msal_app(tenant_id, client_id, client_secret):
|
|
return ConfidentialClientApplication(
|
|
client_id, authority=f"https://login.microsoftonline.com/{tenant_id}", client_credential=client_secret
|
|
)
|
|
|
|
def main(config=None, stop_event=None):
|
|
try:
|
|
if config is None:
|
|
config = load_config('connection_info.txt')
|
|
|
|
tenant_id = config.get('TENANT_ID', '')
|
|
client_id = config.get('CLIENT_ID', '')
|
|
client_secret = config.get('CLIENT_SECRET', '')
|
|
site_url = config.get('SITE_URL', '')
|
|
drive_name = config.get('DOCUMENT_LIBRARY', '')
|
|
folders_str = config.get('FOLDERS_TO_DOWNLOAD', '')
|
|
local_base = config.get('LOCAL_PATH', '').replace('\\', os.sep)
|
|
|
|
folders = [f.strip() for f in folders_str.split(',') if f.strip()] or [""]
|
|
|
|
logger.info("Initializing SharePoint Production Sync Tool...")
|
|
app = create_msal_app(tenant_id, client_id, client_secret)
|
|
site_id = get_site_id(app, site_url)
|
|
drive_id = get_drive_id(app, site_id, drive_name)
|
|
|
|
report = []
|
|
with ThreadPoolExecutor(max_workers=MAX_WORKERS, thread_name_prefix="DL") as executor:
|
|
futures = {}
|
|
for folder in folders:
|
|
if stop_event and stop_event.is_set():
|
|
break
|
|
logger.info(f"Scanning: {folder or 'Root'}")
|
|
process_item_list(app, drive_id, folder, os.path.join(local_base, folder), report, executor, futures, config, stop_event)
|
|
|
|
logger.info(f"Scan complete. Processing {len(futures)} tasks...")
|
|
for future in as_completed(futures):
|
|
if stop_event and stop_event.is_set():
|
|
break
|
|
path = futures[future]
|
|
try:
|
|
success, error = future.result()
|
|
if not success:
|
|
logger.error(f"FAILED: {path} | {error}")
|
|
with report_lock:
|
|
report.append({"Path": path, "Error": error, "Timestamp": datetime.now().isoformat()})
|
|
except InterruptedError:
|
|
continue # The executor will shut down anyway
|
|
|
|
if stop_event and stop_event.is_set():
|
|
logger.warning("Synchronization was stopped by user.")
|
|
return
|
|
|
|
report_file = f"download_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
|
|
with open(report_file, 'w', newline='', encoding='utf-8') as f:
|
|
writer = csv.DictWriter(f, fieldnames=["Path", "Error", "Timestamp"])
|
|
writer.writeheader()
|
|
writer.writerows(report)
|
|
|
|
logger.info(f"Sync complete. Errors: {len(report)}. Report: {report_file}")
|
|
|
|
except InterruptedError:
|
|
logger.warning("Synchronization was stopped by user.")
|
|
except Exception as e:
|
|
logger.critical(f"FATAL ERROR: {e}")
|
|
|
|
if __name__ == "__main__":
|
|
main()
|