diff --git a/download_sharepoint.py b/download_sharepoint.py index 5749132..cfc4e61 100644 --- a/download_sharepoint.py +++ b/download_sharepoint.py @@ -8,41 +8,41 @@ from msal import ConfidentialClientApplication from urllib.parse import urlparse, quote def sanitize_filename(name): - \"\"\"Removes invalid characters and handles Unicode whitespace for Windows.\"\"\" + """Removes invalid characters and handles Unicode whitespace for Windows.""" if not name: - return \"unnamed_item\" + return "unnamed_item" # Handle Unicode non-breaking spaces (common in SharePoint names) name = name.replace('\u00A0', ' ').replace('\u200b', '') - # Illegal characters: < > : \" / \ | ? * - invalid_chars = '<>:\"/\\\\|?*' + # Illegal characters: < > : " / \ | ? * + invalid_chars = '<>:"/\\|?*' for char in invalid_chars: name = name.replace(char, '_') # Control characters (0-31) - name = \"\".join(c for c in name if ord(c) >= 32) + name = "".join(c for c in name if ord(c) >= 32) # Windows doesn't like trailing spaces or dots name = name.strip(' .') # Reserved names - reserved_names = {\"CON\", \"PRN\", \"AUX\", \"NUL\", \"COM1\", \"COM2\", \"COM3\", \"COM4\", \"COM5\", \"COM6\", \"COM7\", \"COM8\", \"COM9\", - \"LPT1\", \"LPT2\", \"LPT3\", \"LPT4\", \"LPT5\", \"LPT6\", \"LPT7\", \"LPT8\", \"LPT9\"} + reserved_names = {"CON", "PRN", "AUX", "NUL", "COM1", "COM2", "COM3", "COM4", "COM5", "COM6", "COM7", "COM8", "COM9", + "LPT1", "LPT2", "LPT3", "LPT4", "LPT5", "LPT6", "LPT7", "LPT8", "LPT9"} base_part = name.split('.')[0].upper() if base_part in reserved_names: - name = \"_\" + name + name = "_" + name - return name if name else \"unnamed_item\" + return name if name else "unnamed_item" def get_long_path(path): - \"\"\"Converts a path to a long-path-aware format on Windows.\"\"\" + """Converts a path to a long-path-aware format on Windows.""" if os.name == 'nt': abs_path = os.path.abspath(path) - if not abs_path.startswith('\\\\\\\\?\\\\'): - if abs_path.startswith('\\\\\\\\'): # UNC path - return '\\\\\\\\?\\\\UNC\\\\' + abs_path[2:] - return '\\\\\\\\?\\\\' + abs_path + if not abs_path.startswith('\\\\?\\'): + if abs_path.startswith('\\\\'): # UNC path + return '\\\\?\\UNC\\' + abs_path[2:] + return '\\\\?\\' + abs_path return path def load_config(file_path): @@ -51,45 +51,45 @@ def load_config(file_path): for line in f: if '=' in line: key, value = line.split('=', 1) - config[key.strip()] = value.strip().strip('\"') + config[key.strip()] = value.strip().strip('"') return config def create_msal_app(tenant_id, client_id, client_secret): return ConfidentialClientApplication( client_id, - authority=f\"https://login.microsoftonline.com/{tenant_id}\", + authority=f"https://login.microsoftonline.com/{tenant_id}", client_credential=client_secret, ) def get_headers(app): - \"\"\"Acquires a token from cache or fetches a new one if expired.\"\"\" - scopes = [\"https://graph.microsoft.com/.default\"] + """Acquires a token from cache or fetches a new one if expired.""" + scopes = ["https://graph.microsoft.com/.default"] result = app.acquire_token_for_client(scopes=scopes) - if \"access_token\" in result: - return {'Authorization': f'Bearer {result[\"access_token\"]}'} + if "access_token" in result: + return {'Authorization': f'Bearer {result["access_token"]}'} else: - raise Exception(f\"Could not acquire token: {result.get('error_description')}\") + raise Exception(f"Could not acquire token: {result.get('error_description')}") def get_site_id(app, site_url): headers = get_headers(app) parsed = urlparse(site_url) hostname = parsed.netloc site_path = parsed.path - url = f\"https://graph.microsoft.com/v1.0/sites/{hostname}:{site_path}\" + url = f"https://graph.microsoft.com/v1.0/sites/{hostname}:{site_path}" response = requests.get(url, headers=headers) response.raise_for_status() return response.json()['id'] def get_drive_id(app, site_id, drive_name): headers = get_headers(app) - url = f\"https://graph.microsoft.com/v1.0/sites/{site_id}/drives\" + url = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives" response = requests.get(url, headers=headers) response.raise_for_status() drives = response.json().get('value', []) for drive in drives: if drive['name'] == drive_name: return drive['id'] - raise Exception(f\"Drive '{drive_name}' not found in site.\") + raise Exception(f"Drive '{drive_name}' not found in site.") def download_file(download_url, local_path, expected_size): try: @@ -98,16 +98,16 @@ def download_file(download_url, local_path, expected_size): # Check if file exists and size matches if os.path.exists(long_local_path): if os.path.isdir(long_local_path): - return False, f\"Path exists as a directory: {local_path}\" + return False, f"Path exists as a directory: {local_path}" local_size = os.path.getsize(long_local_path) if int(local_size) == int(expected_size): - return True, \"Skipped\" + return True, "Skipped" target_dir = os.path.dirname(long_local_path) if not os.path.exists(target_dir): os.makedirs(target_dir, exist_ok=True) elif not os.path.isdir(target_dir): - return False, f\"Parent path exists but is not a directory: {target_dir}\" + return False, f"Parent path exists but is not a directory: {target_dir}" # Stream download with timeout and 1MB chunks response = requests.get(download_url, stream=True, timeout=60) @@ -119,20 +119,20 @@ def download_file(download_url, local_path, expected_size): local_size = os.path.getsize(long_local_path) if int(local_size) == int(expected_size): - return True, \"Downloaded\" + return True, "Downloaded" else: - return False, f\"Size mismatch: Remote={expected_size}, Local={local_size}\" + return False, f"Size mismatch: Remote={expected_size}, Local={local_size}" except Exception as e: - return False, f\"[Error {getattr(e, 'errno', 'unknown')}] {str(e)}\" + return False, f"[Error {getattr(e, 'errno', 'unknown')}] {str(e)}" def download_folder_recursive(app, drive_id, item_path, local_root_path, report): try: headers = get_headers(app) encoded_path = quote(item_path) - url = f\"https://graph.microsoft.com/v1.0/drives/{drive_id}/root:/{encoded_path}:/children\" + url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/root:/{encoded_path}:/children" if not item_path: - url = f\"https://graph.microsoft.com/v1.0/drives/{drive_id}/root/children\" + url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/root/children" response = requests.get(url, headers=headers) response.raise_for_status() @@ -144,25 +144,25 @@ def download_folder_recursive(app, drive_id, item_path, local_root_path, report) local_path = os.path.normpath(os.path.join(local_root_path, sanitized_name)) if 'folder' in item: - sub_item_path = f\"{item_path}/{item_name}\".strip('/') + sub_item_path = f"{item_path}/{item_name}".strip('/') download_folder_recursive(app, drive_id, sub_item_path, local_path, report) elif 'file' in item: download_url = item.get('@microsoft.graph.downloadUrl') if not download_url: - report.append({\"Path\": f\"{item_path}/{item_name}\", \"Error\": \"No URL\", \"Timestamp\": datetime.now().isoformat()}) + report.append({"Path": f"{item_path}/{item_name}", "Error": "No URL", "Timestamp": datetime.now().isoformat()}) continue success, status = download_file(download_url, local_path, item['size']) if success: - if status != \"Skipped\": - print(f\"Downloaded: {item_path}/{item_name}\") + if status != "Skipped": + print(f"Downloaded: {item_path}/{item_name}") else: - print(f\"FAILED: {item_path}/{item_name} - {status}\") - report.append({\"Path\": f\"{item_path}/{item_name}\", \"Error\": status, \"Timestamp\": datetime.now().isoformat()}) + print(f"FAILED: {item_path}/{item_name} - {status}") + report.append({"Path": f"{item_path}/{item_name}", "Error": status, "Timestamp": datetime.now().isoformat()}) except Exception as e: - err_msg = f\"Folder error: {str(e)}\" - print(f\"FAILED FOLDER: {item_path} - {err_msg}\") - report.append({\"Path\": item_path, \"Error\": err_msg, \"Timestamp\": datetime.now().isoformat()}) + err_msg = f"Folder error: {str(e)}" + print(f"FAILED FOLDER: {item_path} - {err_msg}") + report.append({"Path": item_path, "Error": err_msg, "Timestamp": datetime.now().isoformat()}) def main(): config = load_config('connection_info.txt') @@ -176,9 +176,9 @@ def main(): folders_to_download = [f.strip() for f in folders_to_download_str.split(',') if f.strip()] if not folders_to_download: - folders_to_download = [\"\"] + folders_to_download = [""] - print(f\"Connecting via Graph API...\") + print(f"Connecting via Graph API...") report = [] try: @@ -191,20 +191,20 @@ def main(): folder_parts = [sanitize_filename(p) for p in folder.split('/') if p] local_folder_path = os.path.normpath(os.path.join(local_path_base, *folder_parts)) - print(f\"\\nProcessing: {folder if folder else 'Root'}\") + print(f"\nProcessing: {folder if folder else 'Root'}") download_folder_recursive(app, drive_id, folder, local_folder_path, report) except Exception as e: - print(f\"Critical error: {e}\") - report.append({\"Path\": \"GENERAL\", \"Error\": str(e), \"Timestamp\": datetime.now().isoformat()}) + print(f"Critical error: {e}") + report.append({"Path": "GENERAL", "Error": str(e), "Timestamp": datetime.now().isoformat()}) - report_file = f\"download_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv\" + report_file = f"download_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" with open(report_file, 'w', newline='', encoding='utf-8') as f: - writer = csv.DictWriter(f, fieldnames=[\"Path\", \"Error\", \"Timestamp\"]) + writer = csv.DictWriter(f, fieldnames=["Path", "Error", "Timestamp"]) writer.writeheader() writer.writerows(report) - print(f\"\\nProcess complete. Errors: {len(report)}\") + print(f"\nProcess complete. Errors: {len(report)}") -if __name__ == \"__main__\": +if __name__ == "__main__": main()