Fix [Errno 22] Invalid argument by adding filename sanitization and long path support. Improved error reporting and folder path cleaning for Windows compatibility.

This commit is contained in:
Martin Tranberg
2026-03-26 14:36:48 +01:00
parent aec8858362
commit ef3d8fa56d
2 changed files with 153 additions and 82 deletions

51
GEMINI.md Normal file
View File

@@ -0,0 +1,51 @@
# SharePoint Download Tool
A Python-based utility designed to recursively download folders and files from a specific SharePoint Online Site using the Microsoft Graph API.
## Project Overview
* **Purpose:** Automates the synchronization of specific SharePoint document library folders to a local directory.
* **Technologies:**
* **Python 3.x**
* **Microsoft Graph API:** Used for robust data access.
* **MSAL (Microsoft Authentication Library):** Handles Entra ID (Azure AD) authentication using Client Credentials flow.
* **Requests:** Manages HTTP streaming for large file downloads.
* **Architecture:**
* `download_sharepoint.py`: The core script that orchestrates authentication, site/drive discovery, and recursive folder traversal.
* `connection_info.txt`: Centralized configuration file for credentials and target paths.
* `requirements.txt`: Defines necessary Python dependencies.
## Building and Running
### Prerequisites
* Python 3.x installed.
* A registered application in Microsoft Entra ID with `Sites.Read.All` (or higher) application permissions.
### Setup
1. **Install Dependencies:**
```bash
pip install -r requirements.txt
```
2. **Configure Connection:**
Edit `connection_info.txt` with your specific details:
* `TENANT_ID`, `CLIENT_ID`, `CLIENT_SECRET`
* `SITE_URL`: Full URL to the SharePoint site.
* `DOCUMENT_LIBRARY`: The name of the target library (e.g., "Documents").
* `FOLDERS_TO_DOWNLOAD`: Comma-separated list of folder names to sync.
* `LOCAL_PATH`: The destination path on your local machine.
### Execution
Run the main download script:
```bash
python download_sharepoint.py
```
### Validation
After execution, a CSV report named `download_report_YYYYMMDD_HHMMSS.csv` is generated, detailing any failed downloads or size mismatches for verification.
## Development Conventions
* **Authentication:** Always use the Graph API with MSAL for app-only authentication.
* **Error Handling:** All file and folder operations should be wrapped in try-except blocks, with errors logged to the generated CSV report.
* **Verification:** Post-download verification is performed by comparing the local file size against the `size` property returned by the Graph API.
* **Security:** Never commit `connection_info.txt` or any file containing secrets. Use the provided `.gitignore`.

View File

@@ -2,90 +2,137 @@ import os
import csv import csv
import requests import requests
import time import time
import re
from datetime import datetime from datetime import datetime
from msal import ConfidentialClientApplication from msal import ConfidentialClientApplication
from urllib.parse import urlparse, quote from urllib.parse import urlparse, quote
def sanitize_filename(name):
    """Make *name* safe as a single Windows path component.

    Replaces illegal characters with '_', drops control characters and
    Unicode whitespace artifacts, trims trailing spaces/dots (which Windows
    rejects), and prefixes reserved device names (CON, PRN, ...).

    Args:
        name: The raw item name from SharePoint (may be empty or None).

    Returns:
        A non-empty, Windows-safe name; "unnamed_item" when the result
        would otherwise be empty.
    """
    if not name:
        return "unnamed_item"
    # Handle Unicode non-breaking spaces / zero-width spaces (common in SharePoint names)
    name = name.replace('\u00A0', ' ').replace('\u200b', '')
    # Illegal characters on Windows: < > : " / \ | ? *
    for char in '<>:"/\\|?*':
        name = name.replace(char, '_')
    # Drop control characters (0-31), which are invalid in Windows filenames
    name = "".join(c for c in name if ord(c) >= 32)
    # Windows rejects trailing spaces or dots. Strip leading spaces too, but
    # keep leading dots so dotfiles like ".gitignore" survive (a two-sided
    # strip(' .') would silently rename them).
    name = name.lstrip(' ').rstrip(' .')
    # Reserved device names are invalid even with an extension (e.g. "CON.txt").
    reserved_names = {
        "CON", "PRN", "AUX", "NUL",
        "COM1", "COM2", "COM3", "COM4", "COM5", "COM6", "COM7", "COM8", "COM9",
        "LPT1", "LPT2", "LPT3", "LPT4", "LPT5", "LPT6", "LPT7", "LPT8", "LPT9",
    }
    if name.split('.')[0].upper() in reserved_names:
        name = "_" + name
    return name if name else "unnamed_item"
def get_long_path(path):
    """Return a Windows long-path-aware (``\\\\?\\``) form of *path*.

    On non-Windows platforms, or when the path already carries the long-path
    prefix, the input is returned unchanged.
    """
    if os.name != 'nt':
        return path
    abs_path = os.path.abspath(path)
    if abs_path.startswith('\\\\?\\'):
        # Already in long-path form; nothing to do.
        return path
    if abs_path.startswith('\\\\'):
        # UNC share: \\server\share -> \\?\UNC\server\share
        return '\\\\?\\UNC\\' + abs_path[2:]
    return '\\\\?\\' + abs_path
def load_config(file_path):
    """Parse ``KEY = value`` pairs from a config file into a dict.

    Blank lines, lines without '=', and '#' comment lines are ignored, so
    the config file can be annotated and entries can be commented out.
    Surrounding double quotes on values are stripped.

    Args:
        file_path: Path to the config file (UTF-8 text).

    Returns:
        dict mapping stripped keys to stripped, unquoted values.
    """
    config = {}
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            stripped = line.strip()
            # Skip blanks and comments so commented-out KEY=... lines
            # do not leak into the config dict.
            if not stripped or stripped.startswith('#'):
                continue
            if '=' in stripped:
                key, value = stripped.split('=', 1)
                config[key.strip()] = value.strip().strip('"')
    return config
def create_msal_app(tenant_id, client_id, client_secret):
    """Build an MSAL confidential client for app-only (client credentials) auth.

    Args:
        tenant_id: Entra ID (Azure AD) tenant GUID or domain.
        client_id: Application (client) ID of the registered app.
        client_secret: Client secret for the registered app.

    Returns:
        A ConfidentialClientApplication bound to the tenant's authority.
    """
    authority_url = f"https://login.microsoftonline.com/{tenant_id}"
    return ConfidentialClientApplication(
        client_id,
        authority=authority_url,
        client_credential=client_secret,
    )
def get_headers(app):
    """Build Graph API authorization headers, acquiring a token via MSAL.

    MSAL serves a cached token while it is still valid, so calling this per
    folder batch is cheap and transparently refreshes expired tokens.

    Args:
        app: An MSAL ConfidentialClientApplication (anything exposing
            ``acquire_token_for_client``).

    Returns:
        dict with a single 'Authorization' bearer header.

    Raises:
        RuntimeError: if MSAL could not return an access token.  RuntimeError
            subclasses Exception, so existing ``except Exception`` callers
            keep working.
    """
    scopes = ["https://graph.microsoft.com/.default"]
    result = app.acquire_token_for_client(scopes=scopes)
    if "access_token" in result:
        return {'Authorization': f'Bearer {result["access_token"]}'}
    raise RuntimeError(f"Could not acquire token: {result.get('error_description')}")
def get_site_id(app, site_url):
    """Resolve a SharePoint site URL to its Graph site id.

    Args:
        app: MSAL app used to obtain auth headers.
        site_url: Full URL of the SharePoint site.

    Returns:
        The Graph 'id' of the site.
    """
    headers = get_headers(app)
    parsed = urlparse(site_url)
    # Graph addresses sites as {hostname}:{server-relative-path}
    lookup_url = (
        f"https://graph.microsoft.com/v1.0/sites/"
        f"{parsed.netloc}:{parsed.path}"
    )
    response = requests.get(lookup_url, headers=headers)
    response.raise_for_status()
    return response.json()['id']
def get_drive_id(app, site_id, drive_name):
    """Find the drive (document library) named *drive_name* within a site.

    Args:
        app: MSAL app used to obtain auth headers.
        site_id: Graph site id (from get_site_id).
        drive_name: Display name of the document library.

    Returns:
        The Graph 'id' of the matching drive.

    Raises:
        Exception: if no drive with that name exists in the site.
    """
    headers = get_headers(app)
    url = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives"
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    drives = response.json().get('value', [])
    matching_id = next(
        (drive['id'] for drive in drives if drive['name'] == drive_name),
        None,
    )
    if matching_id is None:
        raise Exception(f"Drive '{drive_name}' not found in site.")
    return matching_id
def download_file(download_url, local_path, expected_size):
    """Stream a file to *local_path*, skipping it when an identical-size copy exists.

    Args:
        download_url: Pre-authenticated Graph download URL.
        local_path: Destination path on the local disk.
        expected_size: Remote size in bytes (Graph ``size`` property).

    Returns:
        (success, status) tuple; status is "Downloaded", "Skipped", or an
        error description string.
    """
    try:
        # Long-path form avoids [Errno 22] / MAX_PATH issues on Windows.
        long_local_path = get_long_path(local_path)

        # Skip re-downloading when the local copy already matches the remote size.
        if os.path.exists(long_local_path):
            if os.path.isdir(long_local_path):
                return False, f"Path exists as a directory: {local_path}"
            local_size = os.path.getsize(long_local_path)
            if int(local_size) == int(expected_size):
                return True, "Skipped"

        target_dir = os.path.dirname(long_local_path)
        # Guard the bare-filename case: dirname is '' and makedirs('') raises.
        if target_dir:
            if not os.path.exists(target_dir):
                os.makedirs(target_dir, exist_ok=True)
            elif not os.path.isdir(target_dir):
                return False, f"Parent path exists but is not a directory: {target_dir}"

        # Stream download with timeout and 1MB chunks to bound memory use.
        response = requests.get(download_url, stream=True, timeout=60)
        response.raise_for_status()
        with open(long_local_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=1024 * 1024):
                if chunk:  # filter out keep-alive chunks
                    f.write(chunk)

        # Verify the byte count against the Graph-reported size.
        local_size = os.path.getsize(long_local_path)
        if int(local_size) == int(expected_size):
            return True, "Downloaded"
        return False, f"Size mismatch: Remote={expected_size}, Local={local_size}"
    except Exception as e:
        # Surface errno (e.g. 22 Invalid argument) to aid Windows path debugging.
        return False, f"[Error {getattr(e, 'errno', 'unknown')}] {str(e)}"
def download_folder_recursive(app, drive_id, item_path, local_root_path, report):
    """Recursively download the children of *item_path* into *local_root_path*.

    Errors are appended to *report* instead of being raised, so a single bad
    item or folder does not abort the whole sync.

    Args:
        app: MSAL app used to refresh auth headers per folder batch.
        drive_id: Graph drive id of the document library.
        item_path: Drive-relative folder path ('' means the drive root).
        local_root_path: Local directory that mirrors *item_path*.
        report: list of dicts; failures are appended as
            {"Path", "Error", "Timestamp"} rows.
    """
    try:
        # Fresh headers per folder batch so the token is refreshed when expired.
        headers = get_headers(app)
        encoded_path = quote(item_path)
        url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/root:/{encoded_path}:/children"
        if not item_path:
            url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/root/children"

        # Follow @odata.nextLink pagination so folders with more children
        # than one Graph page (~200 items) are fully listed.
        items = []
        while url:
            response = requests.get(url, headers=headers)
            response.raise_for_status()
            payload = response.json()
            items.extend(payload.get('value', []))
            url = payload.get('@odata.nextLink')

        for item in items:
            item_name = item['name']
            # Sanitize only the local name; the remote path keeps the original.
            sanitized_name = sanitize_filename(item_name)
            local_path = os.path.normpath(os.path.join(local_root_path, sanitized_name))
            if 'folder' in item:
                sub_item_path = f"{item_path}/{item_name}".strip('/')
                download_folder_recursive(app, drive_id, sub_item_path, local_path, report)
            elif 'file' in item:
                download_url = item.get('@microsoft.graph.downloadUrl')
                if not download_url:
                    report.append({"Path": f"{item_path}/{item_name}", "Error": "No URL", "Timestamp": datetime.now().isoformat()})
                    continue
                success, status = download_file(download_url, local_path, item['size'])
                if success:
                    if status != "Skipped":
                        print(f"Downloaded: {item_path}/{item_name}")
                else:
                    print(f"FAILED: {item_path}/{item_name} - {status}")
                    report.append({"Path": f"{item_path}/{item_name}", "Error": status, "Timestamp": datetime.now().isoformat()})
    except Exception as e:
        err_msg = f"Folder error: {str(e)}"
        print(f"FAILED FOLDER: {item_path} - {err_msg}")
        report.append({"Path": item_path, "Error": err_msg, "Timestamp": datetime.now().isoformat()})
def main():
    """Entry point: read config, resolve the site/drive, sync folders, write a CSV report."""
    config = load_config('connection_info.txt')
    # NOTE(review): credential keys reconstructed from their use below — confirm
    # exact key names against connection_info.txt.
    tenant_id = config.get('TENANT_ID')
    client_id = config.get('CLIENT_ID')
    client_secret = config.get('CLIENT_SECRET')
    site_url = config.get('SITE_URL')
    drive_name = config.get('DOCUMENT_LIBRARY')
    folders_to_download_str = config.get('FOLDERS_TO_DOWNLOAD', '')
    local_path_base = config.get('LOCAL_PATH', '').replace('\\', os.sep)

    folders_to_download = [f.strip() for f in folders_to_download_str.split(',') if f.strip()]
    # An empty list means "sync the entire library"; '' addresses the drive root.
    if not folders_to_download:
        folders_to_download = [""]

    print(f"Connecting via Graph API...")
    report = []
    try:
        app = create_msal_app(tenant_id, client_id, client_secret)
        site_id = get_site_id(app, site_url)
        drive_id = get_drive_id(app, site_id, drive_name)
        for folder in folders_to_download:
            # Sanitize each configured path segment so the local tree is Windows-safe.
            folder_parts = [sanitize_filename(p) for p in folder.split('/') if p]
            local_folder_path = os.path.normpath(os.path.join(local_path_base, *folder_parts))
            print(f"\nProcessing: {folder if folder else 'Root'}")
            download_folder_recursive(app, drive_id, folder, local_folder_path, report)
    except Exception as e:
        print(f"Critical error: {e}")
        report.append({"Path": "GENERAL", "Error": str(e), "Timestamp": datetime.now().isoformat()})

    # Always write the report (even when empty) for post-run validation.
    report_file = f"download_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
    with open(report_file, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=["Path", "Error", "Timestamp"])
        writer.writeheader()
        writer.writerows(report)
    print(f"\nProcess complete. Errors: {len(report)}")


if __name__ == "__main__":
    main()