diff --git a/GEMINI.md b/GEMINI.md new file mode 100644 index 0000000..38ab393 --- /dev/null +++ b/GEMINI.md @@ -0,0 +1,51 @@ +# SharePoint Download Tool + +A Python-based utility designed to recursively download folders and files from a specific SharePoint Online Site using the Microsoft Graph API. + +## Project Overview + +* **Purpose:** Automates the synchronization of specific SharePoint document library folders to a local directory. +* **Technologies:** + * **Python 3.x** + * **Microsoft Graph API:** Used for robust data access. + * **MSAL (Microsoft Authentication Library):** Handles Entra ID (Azure AD) authentication using Client Credentials flow. + * **Requests:** Manages HTTP streaming for large file downloads. +* **Architecture:** + * `download_sharepoint.py`: The core script that orchestrates authentication, site/drive discovery, and recursive folder traversal. + * `connection_info.txt`: Centralized configuration file for credentials and target paths. + * `requirements.txt`: Defines necessary Python dependencies. + +## Building and Running + +### Prerequisites +* Python 3.x installed. +* A registered application in Microsoft Entra ID with `Sites.Read.All` (or higher) application permissions. + +### Setup +1. **Install Dependencies:** + ```bash + pip install -r requirements.txt + ``` +2. **Configure Connection:** + Edit `connection_info.txt` with your specific details: + * `TENANT_ID`, `CLIENT_ID`, `CLIENT_SECRET` + * `SITE_URL`: Full URL to the SharePoint site. + * `DOCUMENT_LIBRARY`: The name of the target library (e.g., "Documents"). + * `FOLDERS_TO_DOWNLOAD`: Comma-separated list of folder names to sync. + * `LOCAL_PATH`: The destination path on your local machine. + +### Execution +Run the main download script: +```bash +python download_sharepoint.py +``` + +### Validation +After execution, a CSV report named `download_report_YYYYMMDD_HHMMSS.csv` is generated, detailing any failed downloads or size mismatches for verification. 
+ +## Development Conventions + +* **Authentication:** Always use the Graph API with MSAL for app-only authentication. +* **Error Handling:** All file and folder operations should be wrapped in try-except blocks, with errors logged to the generated CSV report. +* **Verification:** Post-download verification is performed by comparing the local file size against the `size` property returned by the Graph API. +* **Security:** Never commit `connection_info.txt` or any file containing secrets. Use the provided `.gitignore`. diff --git a/download_sharepoint.py b/download_sharepoint.py index 4482fa1..5749132 100644 --- a/download_sharepoint.py +++ b/download_sharepoint.py @@ -2,90 +2,137 @@ import os import csv import requests import time +import re from datetime import datetime from msal import ConfidentialClientApplication from urllib.parse import urlparse, quote +def sanitize_filename(name): + \"\"\"Removes invalid characters and handles Unicode whitespace for Windows.\"\"\" + if not name: + return \"unnamed_item\" + + # Handle Unicode non-breaking spaces (common in SharePoint names) + name = name.replace('\u00A0', ' ').replace('\u200b', '') + + # Illegal characters: < > : \" / \ | ? 
* + invalid_chars = '<>:\"/\\\\|?*' + for char in invalid_chars: + name = name.replace(char, '_') + + # Control characters (0-31) + name = \"\".join(c for c in name if ord(c) >= 32) + + # Windows doesn't like trailing spaces or dots + name = name.strip(' .') + + # Reserved names + reserved_names = {\"CON\", \"PRN\", \"AUX\", \"NUL\", \"COM1\", \"COM2\", \"COM3\", \"COM4\", \"COM5\", \"COM6\", \"COM7\", \"COM8\", \"COM9\", + \"LPT1\", \"LPT2\", \"LPT3\", \"LPT4\", \"LPT5\", \"LPT6\", \"LPT7\", \"LPT8\", \"LPT9\"} + base_part = name.split('.')[0].upper() + if base_part in reserved_names: + name = \"_\" + name + + return name if name else \"unnamed_item\" + +def get_long_path(path): + \"\"\"Converts a path to a long-path-aware format on Windows.\"\"\" + if os.name == 'nt': + abs_path = os.path.abspath(path) + if not abs_path.startswith('\\\\\\\\?\\\\'): + if abs_path.startswith('\\\\\\\\'): # UNC path + return '\\\\\\\\?\\\\UNC\\\\' + abs_path[2:] + return '\\\\\\\\?\\\\' + abs_path + return path + def load_config(file_path): config = {} with open(file_path, 'r', encoding='utf-8') as f: for line in f: if '=' in line: key, value = line.split('=', 1) - config[key.strip()] = value.strip().strip('"') + config[key.strip()] = value.strip().strip('\"') return config def create_msal_app(tenant_id, client_id, client_secret): return ConfidentialClientApplication( client_id, - authority=f"https://login.microsoftonline.com/{tenant_id}", + authority=f\"https://login.microsoftonline.com/{tenant_id}\", client_credential=client_secret, ) def get_headers(app): - """Acquires a token from cache or fetches a new one if expired.""" - scopes = ["https://graph.microsoft.com/.default"] + \"\"\"Acquires a token from cache or fetches a new one if expired.\"\"\" + scopes = [\"https://graph.microsoft.com/.default\"] result = app.acquire_token_for_client(scopes=scopes) - if "access_token" in result: - return {'Authorization': f'Bearer {result["access_token"]}'} + if \"access_token\" in result: + 
return {'Authorization': f'Bearer {result[\"access_token\"]}'} else: - raise Exception(f"Could not acquire token: {result.get('error_description')}") + raise Exception(f\"Could not acquire token: {result.get('error_description')}\") def get_site_id(app, site_url): headers = get_headers(app) parsed = urlparse(site_url) hostname = parsed.netloc site_path = parsed.path - url = f"https://graph.microsoft.com/v1.0/sites/{hostname}:{site_path}" + url = f\"https://graph.microsoft.com/v1.0/sites/{hostname}:{site_path}\" response = requests.get(url, headers=headers) response.raise_for_status() return response.json()['id'] def get_drive_id(app, site_id, drive_name): headers = get_headers(app) - url = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives" + url = f\"https://graph.microsoft.com/v1.0/sites/{site_id}/drives\" response = requests.get(url, headers=headers) response.raise_for_status() drives = response.json().get('value', []) for drive in drives: if drive['name'] == drive_name: return drive['id'] - raise Exception(f"Drive '{drive_name}' not found in site.") + raise Exception(f\"Drive '{drive_name}' not found in site.\") def download_file(download_url, local_path, expected_size): try: - # Check if file exists and size matches - if os.path.exists(local_path): - local_size = os.path.getsize(local_path) - if int(local_size) == int(expected_size): - return True, "Skipped" - - os.makedirs(os.path.dirname(local_path), exist_ok=True) - response = requests.get(download_url, stream=True) - response.raise_for_status() - with open(local_path, 'wb') as f: - for chunk in response.iter_content(chunk_size=8192): - f.write(chunk) + long_local_path = get_long_path(local_path) - # Verify size after download - local_size = os.path.getsize(local_path) + # Check if file exists and size matches + if os.path.exists(long_local_path): + if os.path.isdir(long_local_path): + return False, f\"Path exists as a directory: {local_path}\" + local_size = os.path.getsize(long_local_path) + if 
int(local_size) == int(expected_size): + return True, \"Skipped\" + + target_dir = os.path.dirname(long_local_path) + if not os.path.exists(target_dir): + os.makedirs(target_dir, exist_ok=True) + elif not os.path.isdir(target_dir): + return False, f\"Parent path exists but is not a directory: {target_dir}\" + + # Stream download with timeout and 1MB chunks + response = requests.get(download_url, stream=True, timeout=60) + response.raise_for_status() + with open(long_local_path, 'wb') as f: + for chunk in response.iter_content(chunk_size=1024*1024): + if chunk: + f.write(chunk) + + local_size = os.path.getsize(long_local_path) if int(local_size) == int(expected_size): - return True, "Downloaded" + return True, \"Downloaded\" else: - return False, f"Size mismatch after download: Remote={expected_size}, Local={local_size}" + return False, f\"Size mismatch: Remote={expected_size}, Local={local_size}\" except Exception as e: - return False, str(e) + return False, f\"[Error {getattr(e, 'errno', 'unknown')}] {str(e)}\" def download_folder_recursive(app, drive_id, item_path, local_root_path, report): try: - # Get fresh headers for this folder batch to ensure token is valid headers = get_headers(app) - encoded_path = quote(item_path) - url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/root:/{encoded_path}:/children" + encoded_path = quote(item_path) + url = f\"https://graph.microsoft.com/v1.0/drives/{drive_id}/root:/{encoded_path}:/children\" if not item_path: - url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/root/children" + url = f\"https://graph.microsoft.com/v1.0/drives/{drive_id}/root/children\" response = requests.get(url, headers=headers) response.raise_for_status() @@ -93,40 +140,29 @@ def download_folder_recursive(app, drive_id, item_path, local_root_path, report) for item in items: item_name = item['name'] - local_path = os.path.join(local_root_path, item_name) + sanitized_name = sanitize_filename(item_name) + local_path = os.path.normpath(os.path.join(local_root_path, 
sanitized_name)) if 'folder' in item: - sub_item_path = f"{item_path}/{item_name}".strip('/') + sub_item_path = f\"{item_path}/{item_name}\".strip('/') download_folder_recursive(app, drive_id, sub_item_path, local_path, report) elif 'file' in item: download_url = item.get('@microsoft.graph.downloadUrl') if not download_url: - report.append({ - "Path": f"{item_path}/{item_name}", - "Error": "No download URL available", - "Timestamp": datetime.now().isoformat() - }) + report.append({\"Path\": f\"{item_path}/{item_name}\", \"Error\": \"No URL\", \"Timestamp\": datetime.now().isoformat()}) continue success, status = download_file(download_url, local_path, item['size']) if success: - if status == "Downloaded": - print(f"Downloaded: {item_path}/{item_name}") - else: - print(f"Skipped (matches local): {item_path}/{item_name}") + if status != \"Skipped\": + print(f\"Downloaded: {item_path}/{item_name}\") else: - print(f"FAILED: {item_path}/{item_name} - {status}") - report.append({ - "Path": f"{item_path}/{item_name}", - "Error": status, - "Timestamp": datetime.now().isoformat() - }) + print(f\"FAILED: {item_path}/{item_name} - {status}\") + report.append({\"Path\": f\"{item_path}/{item_name}\", \"Error\": status, \"Timestamp\": datetime.now().isoformat()}) except Exception as e: - report.append({ - "Path": item_path, - "Error": f"Folder processing error: {str(e)}", - "Timestamp": datetime.now().isoformat() - }) + err_msg = f\"Folder error: {str(e)}\" + print(f\"FAILED FOLDER: {item_path} - {err_msg}\") + report.append({\"Path\": item_path, \"Error\": err_msg, \"Timestamp\": datetime.now().isoformat()}) def main(): config = load_config('connection_info.txt') @@ -136,55 +172,39 @@ def main(): site_url = config.get('SITE_URL') drive_name = config.get('DOCUMENT_LIBRARY') folders_to_download_str = config.get('FOLDERS_TO_DOWNLOAD', '') - local_path_base = config.get('LOCAL_PATH', '').replace('\\', os.sep) + local_path_base = config.get('LOCAL_PATH', '').replace('\\\\', os.sep) 
folders_to_download = [f.strip() for f in folders_to_download_str.split(',') if f.strip()] - - # If no folders specified, download everything from root if not folders_to_download: - folders_to_download = [""] # Empty string represents root folder + folders_to_download = [\"\"] - print(f"Connecting via Graph API (with auto-refresh and skip logic)...") - + print(f\"Connecting via Graph API...\") report = [] try: app = create_msal_app(tenant_id, client_id, client_secret) - - print("Getting Site ID...") site_id = get_site_id(app, site_url) - - print(f"Getting Drive ID for '{drive_name}'...") drive_id = get_drive_id(app, site_id, drive_name) for folder in folders_to_download: - if folder == "": - print("\nProcessing entire document library (Root)...") - else: - print(f"\nProcessing folder: {folder}") + # Clean folder paths from config + folder_parts = [sanitize_filename(p) for p in folder.split('/') if p] + local_folder_path = os.path.normpath(os.path.join(local_path_base, *folder_parts)) - local_folder_path = os.path.join(local_path_base, folder) + print(f\"\\nProcessing: {folder if folder else 'Root'}\") download_folder_recursive(app, drive_id, folder, local_folder_path, report) except Exception as e: - print(f"Critical error: {e}") - report.append({ - "Path": "GENERAL", - "Error": str(e), - "Timestamp": datetime.now().isoformat() - }) + print(f\"Critical error: {e}\") + report.append({\"Path\": \"GENERAL\", \"Error\": str(e), \"Timestamp\": datetime.now().isoformat()}) - report_file = f"download_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" + report_file = f\"download_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv\" with open(report_file, 'w', newline='', encoding='utf-8') as f: - writer = csv.DictWriter(f, fieldnames=["Path", "Error", "Timestamp"]) + writer = csv.DictWriter(f, fieldnames=[\"Path\", \"Error\", \"Timestamp\"]) writer.writeheader() writer.writerows(report) - print(f"\nProcess complete.") - if report: - print(f"Errors found: 
{len(report)}. See {report_file} for details.") - else: - print("All downloads successful.") + print(f\"\\nProcess complete. Errors: {len(report)}\") -if __name__ == "__main__": +if __name__ == \"__main__\": main()