import os
import csv
import functools
import requests
import time
import threading
import logging
import base64
import struct
try:
    import quickxorhash as qxh_lib
except ImportError:
    qxh_lib = None
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from msal import ConfidentialClientApplication
from urllib.parse import urlparse, quote

# --- Production Configuration ---
MAX_WORKERS = 5
MAX_RETRIES = 5
CHUNK_SIZE = 1024 * 1024  # 1MB Chunks
MAX_FOLDER_DEPTH = 50
LOG_FILE = "sharepoint_download.log"

# Setup Logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(threadName)s: %(message)s',
    handlers=[
        logging.FileHandler(LOG_FILE, encoding='utf-8'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# Guards appends to the shared error report list.
report_lock = threading.Lock()


def format_size(size_bytes):
    """Return *size_bytes* as a human-readable string with two decimals (B .. EB)."""
    for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB']:
        if size_bytes < 1024.0:
            return f"{size_bytes:.2f} {unit}"
        size_bytes /= 1024.0
    return f"{size_bytes:.2f} EB"


def get_long_path(path):
    r"""Handles Windows Long Path limitation by prefixing with \\?\ for absolute paths.

    Correctly handles UNC paths (e.g. \\server\share -> \\?\UNC\server\share).
    On non-Windows systems the absolute path is returned unchanged.
    """
    path = os.path.abspath(path)
    if os.name == 'nt' and not path.startswith("\\\\?\\"):
        if path.startswith("\\\\"):
            return "\\\\?\\UNC\\" + path[2:]
        return "\\\\?\\" + path
    return path


def load_config(file_path):
    """Load ``KEY=value`` pairs from *file_path* into a dict.

    Blank lines and lines starting with ``#`` are ignored. Values are stripped
    of surrounding whitespace and double quotes. ``ENABLE_HASH_VALIDATION`` is
    converted to a bool (default ``True``) and ``HASH_THRESHOLD_MB`` to an int
    (default ``30``, also used when the value is not a valid integer).

    Raises:
        FileNotFoundError: if *file_path* does not exist.
    """
    config = {}
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"Configuration file {file_path} not found.")

    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            stripped = line.strip()
            # Skip blanks and comments so "# KEY=old" does not create a junk key.
            if not stripped or stripped.startswith('#'):
                continue
            if '=' in stripped:
                key, value = stripped.split('=', 1)
                config[key.strip()] = value.strip().strip('"')

    # Parse numeric and boolean values
    if 'ENABLE_HASH_VALIDATION' in config:
        config['ENABLE_HASH_VALIDATION'] = config['ENABLE_HASH_VALIDATION'].lower() == 'true'
    else:
        config['ENABLE_HASH_VALIDATION'] = True

    if 'HASH_THRESHOLD_MB' in config:
        try:
            config['HASH_THRESHOLD_MB'] = int(config['HASH_THRESHOLD_MB'])
        except ValueError:
            config['HASH_THRESHOLD_MB'] = 30
    else:
        config['HASH_THRESHOLD_MB'] = 30

    return config


# --- Part 1: Exponential Backoff & Retry Logic ---
def _retry_after_seconds(response, default):
    """Return the wait (in seconds) requested by the ``Retry-After`` header.

    Falls back to *default* when the header is absent or is not a plain
    integer (the header may also carry an HTTP-date, which ``int()`` rejects).
    """
    raw = response.headers.get("Retry-After")
    if raw is None:
        return default
    try:
        # time.sleep() rejects negative values, so clamp at zero.
        return max(0, int(raw))
    except (TypeError, ValueError):
        return default


def retry_request(func):
    """Decorator adding throttling handling and exponential backoff to *func*.

    *func* must return a ``requests.Response``. Behaviour of the wrapper:

    * HTTP 429: wait for ``Retry-After`` (or ``2 ** retries`` seconds) and retry.
    * HTTP 401: re-raised immediately, since the token or URL has most likely
      expired and waiting will not help.
    * Any other ``requests`` error: retried with ``2 ** retries`` seconds of
      backoff; the last error is re-raised once ``MAX_RETRIES`` is reached.

    Raises:
        requests.exceptions.RetryError: if throttling exhausts all retries.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        retries = 0
        while retries < MAX_RETRIES:
            try:
                response = func(*args, **kwargs)
                if response.status_code == 429:
                    retry_after = _retry_after_seconds(response, 2 ** retries)
                    # Release the connection before sleeping (matters for stream=True).
                    response.close()
                    logger.warning(f"Throttled (429). Waiting {retry_after}s...")
                    time.sleep(retry_after)
                    retries += 1
                    continue
                response.raise_for_status()
                return response
            except requests.exceptions.RequestException as e:
                # On 401 we must not wait/retry here: the token/URL has probably expired.
                if (isinstance(e, requests.exceptions.HTTPError)
                        and e.response is not None
                        and e.response.status_code == 401):
                    raise
                retries += 1
                wait = 2 ** retries
                if retries >= MAX_RETRIES:
                    raise
                if e.response is not None:
                    # The failed response will not be read; free its connection.
                    e.response.close()
                logger.error(f"Request failed: {e}. Retrying in {wait}s...")
                time.sleep(wait)
        raise requests.exceptions.RetryError(f"Max retries ({MAX_RETRIES}) exceeded.")
    return wrapper


@retry_request
def safe_get(url, headers, stream=False, timeout=60, params=None):
    """``requests.get`` wrapped in the retry/backoff logic of ``retry_request``."""
    return requests.get(url, headers=headers, stream=stream, timeout=timeout, params=params)


def safe_graph_get(app, url):
    """Specialized helper for Graph API calls that handles 401 by refreshing tokens."""
    try:
        return safe_get(url, headers=get_headers(app))
    except requests.exceptions.HTTPError as e:
        if e.response is not None and e.response.status_code == 401:
            logger.info("Access Token expired during Graph call. Forcing refresh...")
            return safe_get(url, headers=get_headers(app, force_refresh=True))
        raise


# --- Part 4: Integrity Validation (QuickXorHash) ---
def quickxorhash(file_path):
    """Compute Microsoft QuickXorHash for a file. Returns base64-encoded string.

    Uses the C library ``quickxorhash`` if it could be imported, otherwise
    falls back to a pure-Python 160-bit implementation.
    """
    # 1. Try the fast C library if it is installed
    if qxh_lib:
        hasher = qxh_lib.quickxorhash()
        with open(get_long_path(file_path), 'rb') as f:
            while True:
                chunk = f.read(CHUNK_SIZE)
                if not chunk:
                    break
                hasher.update(chunk)
        return base64.b64encode(hasher.digest()).decode('ascii')

    # 2. Fallback to the manual Python implementation (exact but slower)
    h = 0
    length = 0
    mask = (1 << 160) - 1
    with open(get_long_path(file_path), 'rb') as f:
        while True:
            chunk = f.read(CHUNK_SIZE)
            if not chunk:
                break
            for b in chunk:
                # Each byte is XORed in at a bit offset advancing by 11 per byte,
                # rotating within the 160-bit state.
                shift = (length * 11) % 160
                shifted = b << shift
                wrapped = (shifted & mask) | (shifted >> 160)
                h ^= wrapped
                length += 1

    # The total length is XORed into the top 64 bits of the 160-bit state.
    h ^= (length << (160 - 64))
    result = h.to_bytes(20, byteorder='little')
    return base64.b64encode(result).decode('ascii')


def verify_integrity(local_path, remote_hash, config):
    """Verifies file integrity based on config settings.

    Returns ``True`` when validation is disabled, no remote hash is available,
    the file is larger than ``HASH_THRESHOLD_MB``, or the hashes match;
    ``False`` on a hash mismatch.
    """
    if not remote_hash or not config.get('ENABLE_HASH_VALIDATION', True):
        return True

    file_size = os.path.getsize(get_long_path(local_path))
    threshold_mb = config.get('HASH_THRESHOLD_MB', 30)
    threshold_bytes = threshold_mb * 1024 * 1024

    if file_size > threshold_bytes:
        logger.info(f"Skipping hash check (size > {threshold_mb}MB): {os.path.basename(local_path)}")
        return True

    local_hash = quickxorhash(local_path)
    if local_hash != remote_hash:
        logger.warning(f"Hash mismatch for {local_path}: local={local_hash}, remote={remote_hash}")
        return False
    return True


def _drop_cached_client_tokens(app):
    """Best-effort removal of cached app-only tokens so the next acquire is fresh.

    NOTE(review): relies on ``remove_tokens_for_client()`` being offered by the
    installed MSAL version; when the method is absent this is a no-op.
    """
    remover = getattr(app, "remove_tokens_for_client", None)
    if callable(remover):
        remover()


def get_headers(app, force_refresh=False):
    """Return an ``Authorization: Bearer`` header dict for Microsoft Graph.

    Args:
        app: MSAL confidential client application.
        force_refresh: when true, skip the normal (possibly cached) token
            acquisition and request a new token.

    Raises:
        Exception: if no access token could be obtained.
    """
    scopes = ["https://graph.microsoft.com/.default"]

    # If force_refresh is True, we don't rely on the cache
    result = None
    if not force_refresh:
        result = app.acquire_token_for_client(scopes=scopes)

    if force_refresh or not result or "access_token" not in result:
        logger.info("Refreshing Access Token...")
        # NOTE(review): acquire_token_for_client() is not given a force_refresh
        # keyword here because MSAL is understood to reject it; clearing the
        # cached app tokens first achieves the same effect - confirm against
        # the MSAL version in use.
        _drop_cached_client_tokens(app)
        result = app.acquire_token_for_client(scopes=scopes)

    if result and "access_token" in result:
        return {'Authorization': f'Bearer {result["access_token"]}'}
    raise Exception(f"Auth failed: {(result or {}).get('error_description')}")


def get_site_id(app, site_url):
    """Resolve a SharePoint site URL to its Graph site id."""
    parsed = urlparse(site_url)
    url = f"https://graph.microsoft.com/v1.0/sites/{parsed.netloc}:{parsed.path}"
    response = safe_graph_get(app, url)
    return response.json()['id']


def get_drive_id(app, site_id, drive_name):
    """Return the id of the drive (document library) named *drive_name*.

    If *drive_name* is "Delte dokumenter" and no exact match exists, a drive
    named "Documents" is used as a fallback.

    Raises:
        Exception: if no matching drive is found; available names are logged.
    """
    url = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives"
    response = safe_graph_get(app, url)
    drives = response.json().get('value', [])

    # Try an exact match first
    for drive in drives:
        if drive['name'] == drive_name:
            return drive['id']

    # Fall back to "Documents" if "Delte dokumenter" fails (SharePoint default)
    if drive_name == "Delte dokumenter":
        for drive in drives:
            if drive['name'] == "Documents":
                logger.info("Found 'Documents' as fallback for 'Delte dokumenter'")
                return drive['id']

    # Log the available names to help the user
    available_names = [d['name'] for d in drives]
    logger.error(f"Drive '{drive_name}' not found. Available drives on this site: {available_names}")
    raise Exception(f"Drive {drive_name} not found. Check the log for available drive names.")


# --- Part 2: Resume / Chunked Download logic ---
def get_fresh_download_url(app, drive_id, item_id):
    """Fetches a fresh download URL for a specific item ID with retries and robust error handling.

    Returns:
        ``(download_url, None)`` on success, otherwise ``(None, reason)``
        after three attempts.
    """
    url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/items/{item_id}"
    missing_url_reason = "Item returned but '@microsoft.graph.downloadUrl' was missing after 3 attempts."
    # Reason reported when the loop ends without an exception and without a URL.
    failure_reason = missing_url_reason

    for attempt in range(3):
        try:
            headers = get_headers(app)
            response = requests.get(url, headers=headers, timeout=60)

            if response.status_code == 429:
                retry_after = _retry_after_seconds(response, 2 ** attempt)
                failure_reason = "Throttled (429) while fetching download URL."
                logger.warning(f"Throttled (429) in get_fresh_download_url. Waiting {retry_after}s...")
                time.sleep(retry_after)
                continue

            if response.status_code == 401:
                logger.info(f"Access Token expired during refresh (Attempt {attempt+1}). Forcing refresh...")
                headers = get_headers(app, force_refresh=True)
                response = requests.get(url, headers=headers, timeout=60)

            response.raise_for_status()
            data = response.json()
            download_url = data.get('@microsoft.graph.downloadUrl')
            if download_url:
                return download_url, None

            # If item exists but URL is missing, it might be a transient SharePoint issue
            failure_reason = missing_url_reason
            logger.warning(f"Attempt {attempt+1}: '@microsoft.graph.downloadUrl' missing for {item_id}. Retrying in {2 ** attempt}s...")
            time.sleep(2 ** attempt)
        except Exception as e:
            if attempt == 2:
                return None, str(e)
            logger.warning(f"Attempt {attempt+1} failed: {e}. Retrying in {2 ** attempt}s...")
            time.sleep(2 ** attempt)

    return None, failure_reason


def _parse_remote_mtime(remote_mtime_str):
    """Convert an ISO 8601 UTC string (e.g. 2024-03-29T12:00:00Z) to a Unix timestamp.

    Returns ``None`` when the value is missing or cannot be parsed, so callers
    can fall back to a size-only comparison instead of failing.
    """
    if not remote_mtime_str:
        return None
    try:
        return datetime.fromisoformat(remote_mtime_str.replace('Z', '+00:00')).timestamp()
    except (AttributeError, ValueError):
        logger.warning(f"Could not parse remote timestamp: {remote_mtime_str!r}")
        return None


def download_single_file(app, drive_id, item_id, local_path, expected_size, display_name, config, stop_event=None, remote_hash=None, initial_url=None, remote_mtime_str=None):
    """Download one drive item to *local_path*, skipping or resuming when possible.

    An existing local file with the expected size that is not older than the
    remote is skipped. A smaller local file is resumed with an HTTP ``Range``
    request unless the remote changed in the meantime. After the transfer the
    size and (depending on *config*) the QuickXorHash are checked.

    Returns:
        ``(True, None)`` on success or skip, ``(False, reason)`` on failure.

    Raises:
        InterruptedError: if *stop_event* is set before or during the download.
    """
    try:
        if stop_event and stop_event.is_set():
            raise InterruptedError("Sync cancelled")

        file_mode = 'wb'
        resume_header = {}
        existing_size = 0
        download_url = initial_url
        long_local_path = get_long_path(local_path)

        if os.path.exists(long_local_path):
            existing_size = os.path.getsize(long_local_path)
            local_mtime = os.path.getmtime(long_local_path)
            remote_mtime = _parse_remote_mtime(remote_mtime_str)
            # One second of tolerance for file-system timestamp precision. Without
            # a usable remote timestamp we cannot show the remote is newer, so
            # the decision falls back to the size comparison alone.
            remote_is_newer = remote_mtime is not None and local_mtime < (remote_mtime - 1)

            # File exists, has the right size AND local is not older than remote -> SKIP
            if existing_size == expected_size:
                if not remote_is_newer:
                    logger.info(f"Skipped (up-to-date): {display_name}")
                    return True, None
                logger.info(f"Update available: {display_name} (Remote is newer)")
                existing_size = 0
            elif existing_size < expected_size:
                # When resuming, also check whether the source changed since we started
                if remote_is_newer:
                    logger.warning(f"Remote file changed during partial download: {display_name}. Restarting.")
                    existing_size = 0
                else:
                    logger.info(f"Resuming: {display_name} from {format_size(existing_size)}")
                    resume_header = {'Range': f'bytes={existing_size}-'}
                    file_mode = 'ab'
            else:
                logger.warning(f"Local file larger than remote: {display_name}. Overwriting.")
                existing_size = 0

        logger.info(f"Starting: {display_name} ({format_size(expected_size)})")
        os.makedirs(os.path.dirname(long_local_path), exist_ok=True)

        # Initial download attempt
        if not download_url:
            download_url, err = get_fresh_download_url(app, drive_id, item_id)
            if not download_url:
                return False, f"Could not fetch initial URL: {err}"

        try:
            response = safe_get(download_url, resume_header, stream=True, timeout=120)
        except requests.exceptions.HTTPError as e:
            if e.response is not None and e.response.status_code == 401:
                # Handle 401 Unauthorized from SharePoint (expired download link)
                logger.warning(f"URL expired for {display_name}. Fetching fresh URL...")
                download_url, err = get_fresh_download_url(app, drive_id, item_id)
                if not download_url:
                    return False, f"Failed to refresh download URL: {err}"
                response = safe_get(download_url, resume_header, stream=True, timeout=120)
            else:
                raise

        try:
            # A server that ignores the Range header answers 200 with the whole
            # file; appending that to the partial file would corrupt it.
            if file_mode == 'ab' and response.status_code != 206:
                logger.warning(f"Server ignored Range request for {display_name}. Restarting from scratch.")
                file_mode = 'wb'

            with open(long_local_path, file_mode) as f:
                for chunk in response.iter_content(chunk_size=CHUNK_SIZE):
                    if stop_event and stop_event.is_set():
                        raise InterruptedError("Sync cancelled")
                    if chunk:
                        f.write(chunk)
        finally:
            # Streamed responses hold their connection until closed.
            response.close()

        # Post-download check
        final_size = os.path.getsize(long_local_path)
        if final_size != expected_size:
            return False, f"Size mismatch: Remote={expected_size}, Local={final_size}"
        if not verify_integrity(local_path, remote_hash, config):
            return False, "Integrity check failed (Hash mismatch)"
        logger.info(f"DONE: {display_name}")
        return True, None

    except InterruptedError:
        raise
    except Exception as e:
        return False, str(e)


# --- Main Traversal Logic ---
def process_item_list(app, drive_id, item_path, local_root_path, report, executor, futures, config, stop_event=None, depth=0):
    """Recursively walk a drive folder and submit a download task for every file.

    Folders are traversed depth-first up to ``MAX_FOLDER_DEPTH``; paged
    listings are followed through ``@odata.nextLink``. Each submitted future is
    stored in *futures* keyed to its display path. Traversal errors are logged
    and appended to *report* rather than raised.

    Raises:
        InterruptedError: if *stop_event* is set during traversal.
    """
    if depth >= MAX_FOLDER_DEPTH:
        logger.warning(f"Max folder depth ({MAX_FOLDER_DEPTH}) reached at: {item_path}. Skipping subtree.")
        return

    try:
        if stop_event and stop_event.is_set():
            raise InterruptedError("Sync cancelled")

        encoded_path = quote(item_path)
        if not item_path:
            url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/root/children"
        else:
            url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/root:/{encoded_path}:/children"

        while url:
            response = safe_graph_get(app, url)
            data = response.json()
            items = data.get('value', [])

            for item in items:
                if stop_event and stop_event.is_set():
                    raise InterruptedError("Sync cancelled")

                item_name = item['name']
                local_path = os.path.join(local_root_path, item_name)
                display_path = f"{item_path}/{item_name}".strip('/')

                if 'folder' in item:
                    process_item_list(app, drive_id, display_path, local_path, report, executor, futures, config, stop_event, depth + 1)
                elif 'file' in item:
                    item_id = item['id']
                    download_url = item.get('@microsoft.graph.downloadUrl')
                    remote_hash = item.get('file', {}).get('hashes', {}).get('quickXorHash')
                    remote_mtime = item.get('lastModifiedDateTime')

                    future = executor.submit(
                        download_single_file,
                        app, drive_id, item_id, local_path, item['size'], display_path,
                        config, stop_event, remote_hash, download_url, remote_mtime
                    )
                    futures[future] = display_path

            url = data.get('@odata.nextLink')

    except InterruptedError:
        raise
    except Exception as e:
        logger.error(f"Error traversing {item_path}: {e}")
        with report_lock:
            report.append({"Path": item_path, "Error": str(e), "Timestamp": datetime.now().isoformat()})


def create_msal_app(tenant_id, client_id, client_secret):
    """Build the MSAL confidential client for the given tenant and app registration."""
    return ConfidentialClientApplication(
        client_id,
        authority=f"https://login.microsoftonline.com/{tenant_id}",
        client_credential=client_secret
    )


def main(config=None, stop_event=None):
    """Run a full sync: scan the configured folders, download files, write a CSV report.

    Args:
        config: settings dict; when ``None`` it is loaded from
            ``connection_info.txt``.
        stop_event: optional event object; when set, the sync stops early and
            no report is written.

    All exceptions are caught and logged; nothing is raised to the caller.
    """
    try:
        if config is None:
            config = load_config('connection_info.txt')

        tenant_id = config.get('TENANT_ID', '')
        client_id = config.get('CLIENT_ID', '')
        client_secret = config.get('CLIENT_SECRET', '')
        site_url = config.get('SITE_URL', '')
        drive_name = config.get('DOCUMENT_LIBRARY', '')
        folders_str = config.get('FOLDERS_TO_DOWNLOAD', '')
        local_base = config.get('LOCAL_PATH', '').replace('\\', os.sep)

        # An empty folder list means "download from the drive root".
        folders = [f.strip() for f in folders_str.split(',') if f.strip()] or [""]

        logger.info("Initializing SharePoint Production Sync Tool...")
        app = create_msal_app(tenant_id, client_id, client_secret)

        site_id = get_site_id(app, site_url)
        drive_id = get_drive_id(app, site_id, drive_name)

        report = []
        with ThreadPoolExecutor(max_workers=MAX_WORKERS, thread_name_prefix="DL") as executor:
            futures = {}
            for folder in folders:
                if stop_event and stop_event.is_set():
                    break
                logger.info(f"Scanning: {folder or 'Root'}")
                process_item_list(app, drive_id, folder, os.path.join(local_base, folder), report, executor, futures, config, stop_event)

            logger.info(f"Scan complete. Processing {len(futures)} tasks...")
            for future in as_completed(futures):
                if stop_event and stop_event.is_set():
                    break

                path = futures[future]
                try:
                    success, error = future.result()
                    if not success:
                        logger.error(f"FAILED: {path} | {error}")
                        with report_lock:
                            report.append({"Path": path, "Error": error, "Timestamp": datetime.now().isoformat()})
                except InterruptedError:
                    continue  # The executor will shut down anyway

        if stop_event and stop_event.is_set():
            logger.warning("Synchronization was stopped by user.")
            return

        report_file = f"download_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
        with open(report_file, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=["Path", "Error", "Timestamp"])
            writer.writeheader()
            writer.writerows(report)

        logger.info(f"Sync complete. Errors: {len(report)}. Report: {report_file}")

    except InterruptedError:
        logger.warning("Synchronization was stopped by user.")
    except Exception as e:
        logger.critical(f"FATAL ERROR: {e}")


if __name__ == "__main__":
    main()