import os
import csv
import time
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from urllib.parse import urlparse, quote

import requests
from msal import ConfidentialClientApplication

# Configuration for concurrency
MAX_WORKERS = 5
report_lock = threading.Lock()  # serializes appends to the shared error report


def format_size(size_bytes):
    """Format a byte count as a human-readable string, e.g. '1.50 MB'.

    Args:
        size_bytes: Size in bytes (int or float).

    Returns:
        A string with two decimal places and the appropriate unit.
    """
    for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
        if size_bytes < 1024.0:
            return f"{size_bytes:.2f} {unit}"
        size_bytes /= 1024.0
    # Fix: the original fell off the loop and returned None for >= 1024 TB.
    return f"{size_bytes:.2f} PB"


def load_config(file_path):
    """Parse a simple KEY=VALUE config file into a dict.

    Values may be wrapped in double quotes, which are stripped.
    Lines without '=' are ignored.
    """
    config = {}
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            if '=' in line:
                key, value = line.split('=', 1)
                config[key.strip()] = value.strip().strip('"')
    return config


def create_msal_app(tenant_id, client_id, client_secret):
    """Build an MSAL confidential client app for app-only Graph access."""
    return ConfidentialClientApplication(
        client_id,
        authority=f"https://login.microsoftonline.com/{tenant_id}",
        client_credential=client_secret,
    )


def get_headers(app):
    """Acquire a token from cache or fetch a new one if expired.

    Returns:
        dict with the Authorization header for Graph requests.

    Raises:
        Exception: if MSAL could not acquire a token.
    """
    scopes = ["https://graph.microsoft.com/.default"]
    result = app.acquire_token_for_client(scopes=scopes)
    if "access_token" in result:
        return {'Authorization': f'Bearer {result["access_token"]}'}
    else:
        raise Exception(f"Could not acquire token: {result.get('error_description')}")


def get_site_id(app, site_url):
    """Resolve a SharePoint site URL to its Graph site id.

    Uses the 'hostname:/site/path' addressing form of the /sites endpoint.
    """
    headers = get_headers(app)
    parsed = urlparse(site_url)
    hostname = parsed.netloc
    site_path = parsed.path
    url = f"https://graph.microsoft.com/v1.0/sites/{hostname}:{site_path}"
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    return response.json()['id']


def get_drive_id(app, site_id, drive_name):
    """Find the drive (document library) with the given display name.

    Raises:
        Exception: if no drive in the site matches drive_name.
    """
    headers = get_headers(app)
    url = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives"
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    drives = response.json().get('value', [])
    for drive in drives:
        if drive['name'] == drive_name:
            return drive['id']
    raise Exception(f"Drive '{drive_name}' not found in site.")


def download_single_file(download_url, local_path, expected_size, display_name):
    """Worker function for a single file download.

    Skips the download when a local file already exists with the expected
    size; otherwise streams the file in 1 MB chunks and verifies the size
    afterwards.

    Returns:
        (True, None) on success or skip, (False, error_message) on failure.
    """
    try:
        # Check if file exists and size matches
        if os.path.exists(local_path):
            local_size = os.path.getsize(local_path)
            if int(local_size) == int(expected_size):
                print(f"Skipped (matches local): {display_name}")
                return True, None

        print(f"Starting: {display_name} ({format_size(expected_size)})")
        os.makedirs(os.path.dirname(local_path), exist_ok=True)

        # Using a longer timeout for the initial connection on very large files.
        # Fix: close the streaming response even if an exception interrupts the
        # transfer (the original leaked the connection on mid-stream errors).
        with requests.get(download_url, stream=True, timeout=120) as response:
            response.raise_for_status()
            with open(local_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=1024 * 1024):  # 1MB chunks
                    if chunk:
                        f.write(chunk)

        # Verify size after download
        local_size = os.path.getsize(local_path)
        if int(local_size) == int(expected_size):
            print(f"DONE: {display_name}")
            return True, None
        else:
            return False, f"Size mismatch: Remote={expected_size}, Local={local_size}"
    except Exception as e:
        return False, str(e)


def process_item_list(app, drive_id, item_path, local_root_path, report, executor, futures):
    """Traverse folders and submit file downloads to the executor with pagination support.

    Recurses into subfolders; each file child is submitted to the thread
    pool and recorded in `futures` (future -> display path). Items without
    a download URL and folder-level errors are appended to `report` under
    `report_lock`.
    """
    try:
        headers = get_headers(app)
        encoded_path = quote(item_path)

        # Initial URL for the folder children (root has a different form).
        if not item_path:
            url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/root/children"
        else:
            url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/root:/{encoded_path}:/children"

        while url:
            response = requests.get(url, headers=headers)
            response.raise_for_status()
            data = response.json()
            items = data.get('value', [])

            for item in items:
                item_name = item['name']
                local_path = os.path.join(local_root_path, item_name)
                display_path = f"{item_path}/{item_name}".strip('/')

                if 'folder' in item:
                    process_item_list(app, drive_id, display_path, local_path,
                                      report, executor, futures)
                elif 'file' in item:
                    download_url = item.get('@microsoft.graph.downloadUrl')
                    if not download_url:
                        with report_lock:
                            report.append({"Path": display_path,
                                           "Error": "No download URL",
                                           "Timestamp": datetime.now().isoformat()})
                        continue
                    # Submit download to thread pool
                    future = executor.submit(download_single_file, download_url,
                                             local_path, item['size'], display_path)
                    futures[future] = display_path

            # Check for next page of items
            url = data.get('@odata.nextLink')
            if url:
                # Refresh token if needed for the next page request
                headers = get_headers(app)
    except Exception as e:
        with report_lock:
            report.append({"Path": item_path,
                           "Error": f"Folder error: {str(e)}",
                           "Timestamp": datetime.now().isoformat()})


def main():
    """Entry point: read config, scan the drive, download files in parallel.

    Reads connection settings from 'connection_info.txt', walks the
    configured folders (or the whole library when none are given), and
    writes a timestamped CSV report of all errors encountered.
    """
    config = load_config('connection_info.txt')
    tenant_id = config.get('TENANT_ID', '')
    client_id = config.get('CLIENT_ID', '')
    client_secret = config.get('CLIENT_SECRET', '')
    site_url = config.get('SITE_URL', '')
    drive_name = config.get('DOCUMENT_LIBRARY', '')
    folders_to_download_str = config.get('FOLDERS_TO_DOWNLOAD', '')
    local_path_base = config.get('LOCAL_PATH', '').replace('\\', os.sep)

    folders_to_download = [f.strip() for f in folders_to_download_str.split(',') if f.strip()]
    if not folders_to_download:
        # Empty string means "scan the library root".
        folders_to_download = [""]

    print(f"Connecting via Graph API (Parallel Download, Workers={MAX_WORKERS})...")
    report = []

    try:
        app = create_msal_app(tenant_id, client_id, client_secret)
        site_id = get_site_id(app, site_url)
        drive_id = get_drive_id(app, site_id, drive_name)

        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            futures = {}
            for folder in folders_to_download:
                if folder == "":
                    print("\nScanning entire document library (Root)...")
                else:
                    print(f"\nScanning folder: {folder}")
                local_folder_path = os.path.join(local_path_base, folder)
                process_item_list(app, drive_id, folder, local_folder_path,
                                  report, executor, futures)

            print(f"\n--- Scanning complete. Active downloads: {len(futures)} ---\n")

            # Wait for all downloads to complete and collect errors
            for future in as_completed(futures):
                path = futures[future]
                success, error_msg = future.result()
                if not success:
                    print(f"FAILED: {path} - {error_msg}")
                    with report_lock:
                        report.append({"Path": path,
                                       "Error": error_msg,
                                       "Timestamp": datetime.now().isoformat()})
    except Exception as e:
        print(f"Critical error: {e}")
        report.append({"Path": "GENERAL",
                       "Error": str(e),
                       "Timestamp": datetime.now().isoformat()})

    report_file = f"download_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
    with open(report_file, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=["Path", "Error", "Timestamp"])
        writer.writeheader()
        writer.writerows(report)

    print(f"\nProcess complete. Errors logged: {len(report)}")
    print(f"Report file: {report_file}")


if __name__ == "__main__":
    main()