Compare commits

...

2 Commits

3 changed files with 156 additions and 83 deletions

GEMINI.md Normal file
@@ -0,0 +1,51 @@
# SharePoint Download Tool
A Python-based utility designed to recursively download folders and files from a specific SharePoint Online Site using the Microsoft Graph API.
## Project Overview
* **Purpose:** Automates the synchronization of specific SharePoint document library folders to a local directory.
* **Technologies:**
* **Python 3.x**
    * **Microsoft Graph API:** Used for site, drive, and file access.
* **MSAL (Microsoft Authentication Library):** Handles Entra ID (Azure AD) authentication using Client Credentials flow.
* **Requests:** Manages HTTP streaming for large file downloads.
* **Architecture:**
* `download_sharepoint.py`: The core script that orchestrates authentication, site/drive discovery, and recursive folder traversal.
* `connection_info.txt`: Centralized configuration file for credentials and target paths.
* `requirements.txt`: Defines necessary Python dependencies.
## Building and Running
### Prerequisites
* Python 3.x installed.
* A registered application in Microsoft Entra ID with `Sites.Read.All` (or higher) application permissions.
### Setup
1. **Install Dependencies:**
```bash
pip install -r requirements.txt
```
2. **Configure Connection:**
Edit `connection_info.txt` with your specific details:
* `TENANT_ID`, `CLIENT_ID`, `CLIENT_SECRET`
* `SITE_URL`: Full URL to the SharePoint site.
* `DOCUMENT_LIBRARY`: The name of the target library (e.g., "Documents").
* `FOLDERS_TO_DOWNLOAD`: Comma-separated list of folder names to sync.
* `LOCAL_PATH`: The destination path on your local machine.
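A minimal `connection_info.txt` might look like the sketch below. All values are placeholders (the tenant, site, and paths are hypothetical); the `KEY=value` format matches the script's `load_config`, which splits each line on the first `=` and strips surrounding quotes:

```ini
TENANT_ID=00000000-0000-0000-0000-000000000000
CLIENT_ID=11111111-1111-1111-1111-111111111111
CLIENT_SECRET=your-client-secret
SITE_URL=https://contoso.sharepoint.com/sites/ProjectX
DOCUMENT_LIBRARY=Documents
FOLDERS_TO_DOWNLOAD=Reports,Archive/2023
LOCAL_PATH=C:\Data\SharePointSync
```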
### Execution
Run the main download script:
```bash
python download_sharepoint.py
```
### Validation
After execution, a CSV report named `download_report_YYYYMMDD_HHMMSS.csv` is generated, detailing any failed downloads or size mismatches for verification.
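Since the report uses the fixed columns `Path`, `Error`, and `Timestamp`, it can be post-processed with the standard library. A minimal sketch (the report filename and row content here are hypothetical):

```python
import csv
from collections import Counter

def summarize_report(report_path):
    """Count failures per error message in a download report CSV."""
    with open(report_path, newline='', encoding='utf-8') as f:
        rows = list(csv.DictReader(f))
    return len(rows), Counter(row['Error'] for row in rows)

# Write a hypothetical report the same way the script does:
with open('download_report_example.csv', 'w', newline='', encoding='utf-8') as f:
    writer = csv.DictWriter(f, fieldnames=['Path', 'Error', 'Timestamp'])
    writer.writeheader()
    writer.writerow({'Path': 'Reports/a.pdf',
                     'Error': 'Size mismatch: Remote=10, Local=9',
                     'Timestamp': '2024-01-01T00:00:00'})

total, by_error = summarize_report('download_report_example.csv')
print(total)  # 1
```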
## Development Conventions
* **Authentication:** Always use the Graph API with MSAL for app-only authentication.
* **Error Handling:** All file and folder operations should be wrapped in try-except blocks, with errors logged to the generated CSV report.
* **Verification:** Post-download verification is performed by comparing the local file size against the `size` property returned by the Graph API.
* **Security:** Never commit `connection_info.txt` or any file containing secrets. Use the provided `.gitignore`.
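The verification convention above can be sketched as a standalone check. This is an illustrative simplification, not the script's exact function; `expected_size` stands in for the `size` property returned by the Graph API:

```python
import os

def verify_download(local_path, expected_size):
    """Compare the local byte count to the size reported by the Graph API."""
    if not os.path.exists(local_path):
        return False, "Missing"
    local_size = os.path.getsize(local_path)
    if int(local_size) == int(expected_size):
        return True, "OK"
    return False, f"Size mismatch: Remote={expected_size}, Local={local_size}"

# Example with a locally created file:
with open("sample.bin", "wb") as f:
    f.write(b"x" * 128)
print(verify_download("sample.bin", 128))     # (True, 'OK')
print(verify_download("sample.bin", 256)[0])  # False
```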

View File

@@ -5,9 +5,11 @@ This script makes it possible to download specific folders from a SharePoint document library

## Features

* **Recursive Download:** Downloads all subfolders and files in the selected folders.
* **Filename Sanitization:** Handles illegal characters (e.g. `<`, `>`, `:`, `"`, `/`, `\`, `|`, `?`, `*`) and Unicode whitespace so SharePoint files can always be saved on Windows.
* **Long Path Support:** Supports file paths longer than 260 characters on Windows using the `\\?\` prefix.
* **Smart Skip:** Automatically skips files that already exist locally with the correct file size (saves time on restarts).
* **Token Refresh:** Automatically renews the access token so long runs are not interrupted by timeouts.
* **Error Reporting:** Generates a CSV file with details about any errors, including specific error codes (e.g. `[Error 22]`).
* **Data Integrity:** Compares the local file size with the SharePoint size to ensure a correct transfer.
* **Entra ID Integration:** Uses MSAL for secure authentication via the Client Credentials flow.
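The sanitization behavior described above can be illustrated with a trimmed-down version of the script's `sanitize_filename` (reserved-name handling such as `CON`/`LPT1` is omitted here for brevity):

```python
def sanitize_filename(name):
    """Replace Windows-illegal characters with '_', normalize Unicode
    spaces, and trim trailing dots/spaces."""
    if not name:
        return "unnamed_item"
    # Non-breaking and zero-width spaces are common in SharePoint names
    name = name.replace('\u00A0', ' ').replace('\u200b', '')
    for char in '<>:"/\\|?*':
        name = name.replace(char, '_')
    # Drop control characters, then trailing spaces/dots
    name = "".join(c for c in name if ord(c) >= 32)
    name = name.strip(' .')
    return name or "unnamed_item"

print(sanitize_filename('Q4: Report?.xlsx'))   # Q4_ Report_.xlsx
print(sanitize_filename('notes\u00A0v2.txt'))  # notes v2.txt
```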

@@ -2,90 +2,137 @@ import os
import csv
import requests
import time
import re
from datetime import datetime
from msal import ConfidentialClientApplication
from urllib.parse import urlparse, quote
def sanitize_filename(name):
    """Removes invalid characters and handles Unicode whitespace for Windows."""
    if not name:
        return "unnamed_item"
    # Handle Unicode non-breaking spaces (common in SharePoint names)
    name = name.replace('\u00A0', ' ').replace('\u200b', '')
    # Illegal characters: < > : " / \ | ? *
    invalid_chars = '<>:"/\\|?*'
    for char in invalid_chars:
        name = name.replace(char, '_')
    # Control characters (0-31)
    name = "".join(c for c in name if ord(c) >= 32)
    # Windows doesn't like trailing spaces or dots
    name = name.strip(' .')
    # Reserved names
    reserved_names = {"CON", "PRN", "AUX", "NUL", "COM1", "COM2", "COM3", "COM4", "COM5", "COM6", "COM7", "COM8", "COM9",
                      "LPT1", "LPT2", "LPT3", "LPT4", "LPT5", "LPT6", "LPT7", "LPT8", "LPT9"}
    base_part = name.split('.')[0].upper()
    if base_part in reserved_names:
        name = "_" + name
    return name if name else "unnamed_item"
def get_long_path(path):
    """Converts a path to a long-path-aware format on Windows."""
    if os.name == 'nt':
        abs_path = os.path.abspath(path)
        if not abs_path.startswith('\\\\?\\'):
            if abs_path.startswith('\\\\'):  # UNC path
                return '\\\\?\\UNC\\' + abs_path[2:]
            return '\\\\?\\' + abs_path
    return path
def load_config(file_path):
    config = {}
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            if '=' in line:
                key, value = line.split('=', 1)
                config[key.strip()] = value.strip().strip('"')
    return config
def create_msal_app(tenant_id, client_id, client_secret):
    return ConfidentialClientApplication(
        client_id,
        authority=f"https://login.microsoftonline.com/{tenant_id}",
        client_credential=client_secret,
    )

def get_headers(app):
    """Acquires a token from cache or fetches a new one if expired."""
    scopes = ["https://graph.microsoft.com/.default"]
    result = app.acquire_token_for_client(scopes=scopes)
    if "access_token" in result:
        return {'Authorization': f'Bearer {result["access_token"]}'}
    else:
        raise Exception(f"Could not acquire token: {result.get('error_description')}")
def get_site_id(app, site_url):
    headers = get_headers(app)
    parsed = urlparse(site_url)
    hostname = parsed.netloc
    site_path = parsed.path
    url = f"https://graph.microsoft.com/v1.0/sites/{hostname}:{site_path}"
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    return response.json()['id']

def get_drive_id(app, site_id, drive_name):
    headers = get_headers(app)
    url = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives"
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    drives = response.json().get('value', [])
    for drive in drives:
        if drive['name'] == drive_name:
            return drive['id']
    raise Exception(f"Drive '{drive_name}' not found in site.")
def download_file(download_url, local_path, expected_size):
    try:
        long_local_path = get_long_path(local_path)

        # Check if file exists and size matches
        if os.path.exists(long_local_path):
            if os.path.isdir(long_local_path):
                return False, f"Path exists as a directory: {local_path}"
            local_size = os.path.getsize(long_local_path)
            if int(local_size) == int(expected_size):
                return True, "Skipped"

        target_dir = os.path.dirname(long_local_path)
        if not os.path.exists(target_dir):
            os.makedirs(target_dir, exist_ok=True)
        elif not os.path.isdir(target_dir):
            return False, f"Parent path exists but is not a directory: {target_dir}"

        # Stream download with timeout and 1MB chunks
        response = requests.get(download_url, stream=True, timeout=60)
        response.raise_for_status()
        with open(long_local_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=1024*1024):
                if chunk:
                    f.write(chunk)

        local_size = os.path.getsize(long_local_path)
        if int(local_size) == int(expected_size):
            return True, "Downloaded"
        else:
            return False, f"Size mismatch: Remote={expected_size}, Local={local_size}"
    except Exception as e:
        return False, f"[Error {getattr(e, 'errno', 'unknown')}] {str(e)}"

def download_folder_recursive(app, drive_id, item_path, local_root_path, report):
    try:
        # Get fresh headers for this folder batch to ensure token is valid
        headers = get_headers(app)
        encoded_path = quote(item_path)
        url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/root:/{encoded_path}:/children"
        if not item_path:
            url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/root/children"
        response = requests.get(url, headers=headers)
        response.raise_for_status()
@@ -93,40 +140,29 @@ def download_folder_recursive(app, drive_id, item_path, local_root_path, report)
        for item in items:
            item_name = item['name']
            sanitized_name = sanitize_filename(item_name)
            local_path = os.path.normpath(os.path.join(local_root_path, sanitized_name))

            if 'folder' in item:
                sub_item_path = f"{item_path}/{item_name}".strip('/')
                download_folder_recursive(app, drive_id, sub_item_path, local_path, report)
            elif 'file' in item:
                download_url = item.get('@microsoft.graph.downloadUrl')
                if not download_url:
                    report.append({"Path": f"{item_path}/{item_name}", "Error": "No URL", "Timestamp": datetime.now().isoformat()})
                    continue

                success, status = download_file(download_url, local_path, item['size'])
                if success:
                    if status != "Skipped":
                        print(f"Downloaded: {item_path}/{item_name}")
                else:
                    print(f"FAILED: {item_path}/{item_name} - {status}")
                    report.append({"Path": f"{item_path}/{item_name}", "Error": status, "Timestamp": datetime.now().isoformat()})
    except Exception as e:
        err_msg = f"Folder error: {str(e)}"
        print(f"FAILED FOLDER: {item_path} - {err_msg}")
        report.append({"Path": item_path, "Error": err_msg, "Timestamp": datetime.now().isoformat()})
def main():
    config = load_config('connection_info.txt')
@@ -136,55 +172,39 @@ def main():
    site_url = config.get('SITE_URL')
    drive_name = config.get('DOCUMENT_LIBRARY')
    folders_to_download_str = config.get('FOLDERS_TO_DOWNLOAD', '')
    local_path_base = config.get('LOCAL_PATH', '').replace('\\', os.sep)
    folders_to_download = [f.strip() for f in folders_to_download_str.split(',') if f.strip()]

    # If no folders specified, download everything from root
    if not folders_to_download:
        folders_to_download = [""]

    print(f"Connecting via Graph API...")
    report = []

    try:
        app = create_msal_app(tenant_id, client_id, client_secret)
        site_id = get_site_id(app, site_url)
        drive_id = get_drive_id(app, site_id, drive_name)

        for folder in folders_to_download:
            # Clean folder paths from config
            folder_parts = [sanitize_filename(p) for p in folder.split('/') if p]
            local_folder_path = os.path.normpath(os.path.join(local_path_base, *folder_parts))

            print(f"\nProcessing: {folder if folder else 'Root'}")
            download_folder_recursive(app, drive_id, folder, local_folder_path, report)
    except Exception as e:
        print(f"Critical error: {e}")
        report.append({"Path": "GENERAL", "Error": str(e), "Timestamp": datetime.now().isoformat()})

    report_file = f"download_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
    with open(report_file, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=["Path", "Error", "Timestamp"])
        writer.writeheader()
        writer.writerows(report)

    print(f"\nProcess complete. Errors: {len(report)}")

if __name__ == "__main__":
    main()