import os import csv import requests from datetime import datetime from msal import ConfidentialClientApplication from urllib.parse import urlparse, quote def load_config(file_path): config = {} with open(file_path, 'r', encoding='utf-8') as f: for line in f: if '=' in line: key, value = line.split('=', 1) config[key.strip()] = value.strip().strip('"') return config def get_access_token(tenant_id, client_id, client_secret): app = ConfidentialClientApplication( client_id, authority=f"https://login.microsoftonline.com/{tenant_id}", client_credential=client_secret, ) result = app.acquire_token_for_client(scopes=["https://graph.microsoft.com/.default"]) if "access_token" in result: return result["access_token"] else: raise Exception(f"Could not acquire token: {result.get('error_description')}") def get_site_id(headers, site_url): parsed = urlparse(site_url) hostname = parsed.netloc site_path = parsed.path url = f"https://graph.microsoft.com/v1.0/sites/{hostname}:{site_path}" response = requests.get(url, headers=headers) response.raise_for_status() return response.json()['id'] def get_drive_id(headers, site_id, drive_name): url = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives" response = requests.get(url, headers=headers) response.raise_for_status() drives = response.json().get('value', []) for drive in drives: if drive['name'] == drive_name: return drive['id'] raise Exception(f"Drive '{drive_name}' not found in site.") def download_file(headers, download_url, local_path, expected_size): try: os.makedirs(os.path.dirname(local_path), exist_ok=True) response = requests.get(download_url, stream=True) response.raise_for_status() with open(local_path, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): f.write(chunk) # Verify size local_size = os.path.getsize(local_path) if int(local_size) == int(expected_size): return True, None else: return False, f"Size mismatch: Remote={expected_size}, Local={local_size}" except Exception as e: return False, str(e) def download_folder_recursive(headers, drive_id, item_path, local_root_path, report): try: # Get children of the folder # Path must be encoded correctly encoded_path = quote(item_path) url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/root:/{encoded_path}:/children" # If item_path is empty, use root if not item_path: url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/root/children" response = requests.get(url, headers=headers) response.raise_for_status() items = response.json().get('value', []) for item in items: item_name = item['name'] local_path = os.path.join(local_root_path, item_name) if 'folder' in item: # Recursive call sub_item_path = f"{item_path}/{item_name}".strip('/') download_folder_recursive(headers, drive_id, sub_item_path, local_path, report) elif 'file' in item: download_url = item.get('@microsoft.graph.downloadUrl') if not download_url: # Fallback or error report.append({ "Path": f"{item_path}/{item_name}", "Error": "No download URL available", "Timestamp": datetime.now().isoformat() }) continue print(f"Downloading: {item_path}/{item_name}...") success, error_msg = download_file(headers, download_url, local_path, item['size']) if not success: report.append({ "Path": f"{item_path}/{item_name}", "Error": error_msg, "Timestamp": datetime.now().isoformat() }) except Exception as e: report.append({ "Path": item_path, "Error": f"Folder processing error: {str(e)}", "Timestamp": datetime.now().isoformat() }) def main(): config = load_config('connection_info.txt') tenant_id = config.get('TENANT_ID') client_id = config.get('CLIENT_ID') client_secret = config.get('CLIENT_SECRET') site_url = config.get('SITE_URL') drive_name = config.get('DOCUMENT_LIBRARY') folders_to_download_str = config.get('FOLDERS_TO_DOWNLOAD') local_path_base = config.get('LOCAL_PATH').replace('\\', os.sep) folders_to_download = [f.strip() for f in folders_to_download_str.split(',')] print(f"Connecting via Graph API...") report = [] try: token = get_access_token(tenant_id, client_id, client_secret) headers = {'Authorization': f'Bearer {token}'} print("Getting Site ID...") site_id = get_site_id(headers, site_url) print(f"Getting Drive ID for '{drive_name}'...") drive_id = get_drive_id(headers, site_id, drive_name) for folder in folders_to_download: print(f"\nProcessing folder: {folder}") local_folder_path = os.path.join(local_path_base, folder) download_folder_recursive(headers, drive_id, folder, local_folder_path, report) except Exception as e: print(f"Critical error: {e}") report.append({ "Path": "GENERAL", "Error": str(e), "Timestamp": datetime.now().isoformat() }) # Generate Report report_file = f"download_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" with open(report_file, 'w', newline='', encoding='utf-8') as f: writer = csv.DictWriter(f, fieldnames=["Path", "Error", "Timestamp"]) writer.writeheader() writer.writerows(report) print(f"\nProcess complete.") if report: print(f"Errors found: {len(report)}. See {report_file} for details.") else: print("All downloads successful.") if __name__ == "__main__": main()