From 6333f27037c0e4f6b1e556fa3d5dc18f32a6b336 Mon Sep 17 00:00:00 2001 From: Martin Tranberg Date: Thu, 26 Mar 2026 15:15:05 +0100 Subject: [PATCH] =?UTF-8?q?Implementer=20parallel=20download=20med=20Threa?= =?UTF-8?q?dPoolExecutor=20(max=205=20tr=C3=A5de)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- download_sharepoint.py | 194 +++++++++++++---------------------------- 1 file changed, 60 insertions(+), 134 deletions(-) diff --git a/download_sharepoint.py b/download_sharepoint.py index b744ac6..172a235 100644 --- a/download_sharepoint.py +++ b/download_sharepoint.py @@ -2,56 +2,15 @@ import os import csv import requests import time -import re -import sys +import threading +from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime from msal import ConfidentialClientApplication from urllib.parse import urlparse, quote -# Global stats to track progress -stats = { - "total_checked": 0, - "downloaded": 0, - "skipped": 0, - "failed": 0 -} - -def print_status(current_item="", action="Processing"): - """Prints a single-line progress update with optional current item and action.""" - status_line = f"Checked: {stats['total_checked']} | Downloaded: {stats['downloaded']} | Skipped: {stats['skipped']} | Failed: {stats['failed']}" - if current_item: - max_len = 40 - display_item = current_item if len(current_item) <= max_len else "..." + current_item[-(max_len-3):] - status_line += f" | {action}: {display_item}" - sys.stdout.write(f"\r{status_line.ljust(120)}") - sys.stdout.flush() - -def sanitize_filename(name): - """Removes invalid characters and handles Unicode whitespace for Windows.""" - if not name: - return "unnamed_item" - name = name.replace('\u00A0', ' ').replace('\u200b', '') - invalid_chars = '<>:"/\\|?*' - for char in invalid_chars: - name = name.replace(char, '_') - name = "".join(c for c in name if ord(c) >= 32) - name = name.strip(' .') - reserved_names = {"CON", "PRN", "AUX", "NUL", "COM1", "COM2", "COM3", "COM4", "COM5", "COM6", "COM7", "COM8", "COM9", - "LPT1", "LPT2", "LPT3", "LPT4", "LPT5", "LPT6", "LPT7", "LPT8", "LPT9"} - base_part = name.split('.')[0].upper() - if base_part in reserved_names: - name = "_" + name - return name if name else "unnamed_item" - -def get_long_path(path): - """Converts a path to a long-path-aware format on Windows.""" - if os.name == 'nt': - abs_path = os.path.abspath(path) - if not abs_path.startswith('\\\\?\\'): - if abs_path.startswith('\\\\'): # UNC path - return '\\\\?\\UNC\\' + abs_path[2:] - return '\\\\?\\' + abs_path - return path +# Configuration for concurrency +MAX_WORKERS = 5 +report_lock = threading.Lock() def load_config(file_path): config = {} @@ -99,58 +58,43 @@ def get_drive_id(app, site_id, drive_name): return drive['id'] raise Exception(f"Drive '{drive_name}' not found in site.") -def download_file(download_url, local_path, expected_size, item_path): +def download_single_file(download_url, local_path, expected_size, display_name): + """Worker function for a single file download.""" try: - long_local_path = get_long_path(local_path) - - # Verify local storage is still accessible - target_dir = os.path.dirname(long_local_path) - if not os.path.exists(os.path.dirname(target_dir.rstrip('\\/')) or target_dir): - # We check the parent of the target dir to see if the drive is still there - pass - - if os.path.exists(long_local_path): - if os.path.isdir(long_local_path): - return False, f"Path exists as a directory: {local_path}" - local_size = os.path.getsize(long_local_path) + # Check if file exists and size matches + if os.path.exists(local_path): + local_size = os.path.getsize(local_path) if int(local_size) == int(expected_size): - return True, "Skipped" - - if not os.path.exists(target_dir): - try: - os.makedirs(target_dir, exist_ok=True) - except Exception: - return False, f"Could not create directory (Drive disconnected?): {target_dir}" - elif not os.path.isdir(target_dir): - return False, f"Parent path exists but is not a directory: {target_dir}" - - print_status(item_path, action="Downloading") + print(f"Skipped (matches local): {display_name}") + return True, None + os.makedirs(os.path.dirname(local_path), exist_ok=True) + # Using a timeout for the request to prevent hanging indefinitely response = requests.get(download_url, stream=True, timeout=60) response.raise_for_status() - with open(long_local_path, 'wb') as f: - for chunk in response.iter_content(chunk_size=1024*1024): + + with open(local_path, 'wb') as f: + for chunk in response.iter_content(chunk_size=1024*1024): # 1MB chunks if chunk: f.write(chunk) - local_size = os.path.getsize(long_local_path) + # Verify size after download + local_size = os.path.getsize(local_path) if int(local_size) == int(expected_size): - return True, "Downloaded" + print(f"Downloaded: {display_name}") + return True, None else: return False, f"Size mismatch: Remote={expected_size}, Local={local_size}" - except OSError as e: - if e.errno in [2, 3, 15, 21, 53, 67]: # Common drive/network related errors - return False, f"Storage Error (Drive disconnected?): {str(e)}" - return False, f"[OSError {e.errno}] {str(e)}" except Exception as e: - return False, f"[Error {getattr(e, 'errno', 'unknown')}] {str(e)}" + return False, str(e) -def download_folder_recursive(app, drive_id, item_path, local_root_path, report): +def process_item_list(app, drive_id, item_path, local_root_path, report, executor, futures): + """Traverses folders and submits file downloads to the executor.""" try: - print_status(item_path, action="Scanning") headers = get_headers(app) encoded_path = quote(item_path) url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/root:/{encoded_path}:/children" + if not item_path: url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/root/children" @@ -160,42 +104,25 @@ def download_folder_recursive(app, drive_id, item_path, local_root_path, report) for item in items: item_name = item['name'] - sanitized_name = sanitize_filename(item_name) - local_path = os.path.normpath(os.path.join(local_root_path, sanitized_name)) + local_path = os.path.join(local_root_path, item_name) + display_path = f"{item_path}/{item_name}".strip('/') if 'folder' in item: - sub_item_path = f"{item_path}/{item_name}".strip('/') - download_folder_recursive(app, drive_id, sub_item_path, local_path, report) + process_item_list(app, drive_id, display_path, local_path, report, executor, futures) elif 'file' in item: - stats["total_checked"] += 1 - full_item_path = f"{item_path}/{item_name}".strip('/') download_url = item.get('@microsoft.graph.downloadUrl') if not download_url: - stats["failed"] += 1 - report.append({"Path": full_item_path, "Error": "No URL", "Timestamp": datetime.now().isoformat()}) + with report_lock: + report.append({"Path": display_path, "Error": "No download URL", "Timestamp": datetime.now().isoformat()}) continue - print_status(full_item_path, action="Checking") - success, status = download_file(download_url, local_path, item['size'], full_item_path) + # Submit download to thread pool + future = executor.submit(download_single_file, download_url, local_path, item['size'], display_path) + futures[future] = display_path - if success: - if status == "Downloaded": - stats["downloaded"] += 1 - sys.stdout.write(f"\r{' ' * 120}\r") - print(f"Downloaded: {full_item_path}") - else: - stats["skipped"] += 1 - else: - stats["failed"] += 1 - sys.stdout.write(f"\r{' ' * 120}\r") - print(f"FAILED: {full_item_path} - {status}") - report.append({"Path": full_item_path, "Error": status, "Timestamp": datetime.now().isoformat()}) - print_status(full_item_path, action="Done") except Exception as e: - err_msg = f"Folder error: {str(e)}" - sys.stdout.write(f"\r{' ' * 120}\r") - print(f"FAILED FOLDER: {item_path} - {err_msg}") - report.append({"Path": item_path, "Error": err_msg, "Timestamp": datetime.now().isoformat()}) + with report_lock: + report.append({"Path": item_path, "Error": f"Folder error: {str(e)}", "Timestamp": datetime.now().isoformat()}) def main(): config = load_config('connection_info.txt') @@ -205,53 +132,52 @@ def main(): site_url = config.get('SITE_URL') drive_name = config.get('DOCUMENT_LIBRARY') folders_to_download_str = config.get('FOLDERS_TO_DOWNLOAD', '') - local_path_base = config.get('LOCAL_PATH', '').replace('\\\\', os.sep) + local_path_base = config.get('LOCAL_PATH', '').replace('\\', os.sep) - # Pre-check: Is the local path accessible? - if not os.path.exists(local_path_base): - print(f"\nCRITICAL ERROR: Local path '{local_path_base}' is not accessible.") - print("Please ensure your network drive is connected and try again.") - return - folders_to_download = [f.strip() for f in folders_to_download_str.split(',') if f.strip()] if not folders_to_download: folders_to_download = [""] - print("Step 1: Authenticating with Microsoft Entra ID...") - app = create_msal_app(tenant_id, client_id, client_secret) + print(f"Connecting via Graph API (Parallel Download, Workers={MAX_WORKERS})...") report = [] + try: - print("Step 2: Connecting to SharePoint Site...") + app = create_msal_app(tenant_id, client_id, client_secret) site_id = get_site_id(app, site_url) - print(f" - Site found (ID: {site_id[:10]}...)") - - print(f"Step 3: Accessing Document Library '{drive_name}'...") drive_id = get_drive_id(app, site_id, drive_name) - print(f" - Library found (ID: {drive_id[:10]}...)") - for folder in folders_to_download: - folder_parts = [sanitize_filename(p) for p in folder.split('/') if p] - local_folder_path = os.path.normpath(os.path.join(local_path_base, *folder_parts)) - print(f"\nStep 4: Processing folder: {folder if folder else 'Root'}") - download_folder_recursive(app, drive_id, folder, local_folder_path, report) + with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: + futures = {} + for folder in folders_to_download: + if folder == "": + print("\nProcessing entire document library (Root)...") + else: + print(f"\nProcessing folder: {folder}") + + local_folder_path = os.path.join(local_path_base, folder) + process_item_list(app, drive_id, folder, local_folder_path, report, executor, futures) + + # Wait for all downloads to complete and collect errors + for future in as_completed(futures): + path = futures[future] + success, error_msg = future.result() + if not success: + print(f"FAILED: {path} - {error_msg}") + with report_lock: + report.append({"Path": path, "Error": error_msg, "Timestamp": datetime.now().isoformat()}) except Exception as e: - print(f"\nCRITICAL ERROR: {e}") + print(f"Critical error: {e}") report.append({"Path": "GENERAL", "Error": str(e), "Timestamp": datetime.now().isoformat()}) - sys.stdout.write(f"\r{' ' * 120}\r") report_file = f"download_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" with open(report_file, 'w', newline='', encoding='utf-8') as f: writer = csv.DictWriter(f, fieldnames=["Path", "Error", "Timestamp"]) writer.writeheader() writer.writerows(report) - print(f"\nProcess complete.") - print(f"Summary: {stats['total_checked']} items checked.") - print(f" - {stats['downloaded']} new files downloaded.") - print(f" - {stats['skipped']} existing files skipped.") - print(f" - {stats['failed']} errors encountered (see {report_file}).") + print(f"\nProcess complete. Errors: {len(report)}") if __name__ == "__main__": main()