Add progress indicator and detailed status updates to the download process.

This commit is contained in:
Martin Tranberg
2026-03-26 14:44:12 +01:00
parent e3a3824a1c
commit 95622ee1a8

View File

@@ -3,36 +3,40 @@ import csv
import requests import requests
import time import time
import re import re
import sys
from datetime import datetime from datetime import datetime
from msal import ConfidentialClientApplication from msal import ConfidentialClientApplication
from urllib.parse import urlparse, quote from urllib.parse import urlparse, quote
# Module-level counters shared by the download helpers to track run progress.
stats = dict.fromkeys(("total_checked", "downloaded", "skipped", "failed"), 0)
def print_progress(force=False):
    """Write an in-place, single-line status summary to stdout.

    Emits on every 50th checked item (based on the module-level ``stats``
    counters), or unconditionally when *force* is True.  Uses a carriage
    return so successive updates overwrite the same terminal line.
    """
    if not (force or stats["total_checked"] % 50 == 0):
        return
    line = (
        f"\rStatus: {stats['total_checked']} checked, "
        f"{stats['downloaded']} downloaded, "
        f"{stats['skipped']} skipped, "
        f"{stats['failed']} failed... "
    )
    sys.stdout.write(line)
    sys.stdout.flush()
def sanitize_filename(name):
    """Return *name* made safe for use as a Windows file or directory name.

    Handles SharePoint-style Unicode whitespace, strips illegal and control
    characters, trims trailing spaces/dots, and prefixes names that collide
    with Windows reserved device names.  Falls back to ``"unnamed_item"``
    when the input is empty or sanitizes down to nothing.
    """
    if not name:
        return "unnamed_item"
    # Normalize Unicode whitespace common in SharePoint item names:
    # non-breaking space becomes a regular space, zero-width space is dropped.
    cleaned = name.replace('\u00A0', ' ').replace('\u200b', '')
    # Map every illegal Windows character to an underscore in a single pass.
    cleaned = cleaned.translate(str.maketrans('<>:"/\\|?*', '_' * 9))
    # Remove ASCII control characters (0-31), which Windows rejects.
    cleaned = "".join(ch for ch in cleaned if ord(ch) >= 32)
    # Windows does not allow trailing spaces or dots.
    cleaned = cleaned.strip(' .')
    # Reserved device names (case-insensitive, checked against the part
    # before the first dot, matching how Windows treats e.g. "CON.txt").
    reserved = (
        {"CON", "PRN", "AUX", "NUL"}
        | {f"COM{i}" for i in range(1, 10)}
        | {f"LPT{i}" for i in range(1, 10)}
    )
    if cleaned.split('.')[0].upper() in reserved:
        cleaned = "_" + cleaned
    return cleaned or "unnamed_item"
def get_long_path(path): def get_long_path(path):
@@ -95,7 +99,6 @@ def download_file(download_url, local_path, expected_size):
try: try:
long_local_path = get_long_path(local_path) long_local_path = get_long_path(local_path)
# Check if file exists and size matches
if os.path.exists(long_local_path): if os.path.exists(long_local_path):
if os.path.isdir(long_local_path): if os.path.isdir(long_local_path):
return False, f"Path exists as a directory: {local_path}" return False, f"Path exists as a directory: {local_path}"
@@ -109,7 +112,6 @@ def download_file(download_url, local_path, expected_size):
elif not os.path.isdir(target_dir): elif not os.path.isdir(target_dir):
return False, f"Parent path exists but is not a directory: {target_dir}" return False, f"Parent path exists but is not a directory: {target_dir}"
# Stream download with timeout and 1MB chunks
response = requests.get(download_url, stream=True, timeout=60) response = requests.get(download_url, stream=True, timeout=60)
response.raise_for_status() response.raise_for_status()
with open(long_local_path, 'wb') as f: with open(long_local_path, 'wb') as f:
@@ -147,21 +149,30 @@ def download_folder_recursive(app, drive_id, item_path, local_root_path, report)
sub_item_path = f"{item_path}/{item_name}".strip('/') sub_item_path = f"{item_path}/{item_name}".strip('/')
download_folder_recursive(app, drive_id, sub_item_path, local_path, report) download_folder_recursive(app, drive_id, sub_item_path, local_path, report)
elif 'file' in item: elif 'file' in item:
stats["total_checked"] += 1
download_url = item.get('@microsoft.graph.downloadUrl') download_url = item.get('@microsoft.graph.downloadUrl')
if not download_url: if not download_url:
stats["failed"] += 1
report.append({"Path": f"{item_path}/{item_name}", "Error": "No URL", "Timestamp": datetime.now().isoformat()}) report.append({"Path": f"{item_path}/{item_name}", "Error": "No URL", "Timestamp": datetime.now().isoformat()})
continue continue
success, status = download_file(download_url, local_path, item['size']) success, status = download_file(download_url, local_path, item['size'])
if success: if success:
if status != "Skipped": if status == "Downloaded":
print(f"Downloaded: {item_path}/{item_name}") stats["downloaded"] += 1
sys.stdout.write(f"\nDownloaded: {item_path}/{item_name}\n")
else: else:
print(f"FAILED: {item_path}/{item_name} - {status}") stats["skipped"] += 1
else:
stats["failed"] += 1
sys.stdout.write(f"\nFAILED: {item_path}/{item_name} - {status}\n")
report.append({"Path": f"{item_path}/{item_name}", "Error": status, "Timestamp": datetime.now().isoformat()}) report.append({"Path": f"{item_path}/{item_name}", "Error": status, "Timestamp": datetime.now().isoformat()})
print_progress()
except Exception as e: except Exception as e:
err_msg = f"Folder error: {str(e)}" err_msg = f"Folder error: {str(e)}"
print(f"FAILED FOLDER: {item_path} - {err_msg}") sys.stdout.write(f"\nFAILED FOLDER: {item_path} - {err_msg}\n")
report.append({"Path": item_path, "Error": err_msg, "Timestamp": datetime.now().isoformat()}) report.append({"Path": item_path, "Error": err_msg, "Timestamp": datetime.now().isoformat()})
def main(): def main():
@@ -178,25 +189,30 @@ def main():
if not folders_to_download: if not folders_to_download:
folders_to_download = [""] folders_to_download = [""]
print(f"Connecting via Graph API...") print("Step 1: Authenticating with Microsoft Entra ID...")
report = [] app = create_msal_app(tenant_id, client_id, client_secret)
try: try:
app = create_msal_app(tenant_id, client_id, client_secret) print("Step 2: Connecting to SharePoint Site...")
site_id = get_site_id(app, site_url) site_id = get_site_id(app, site_url)
print(f" - Site found (ID: {site_id[:10]}...)")
print(f"Step 3: Accessing Document Library '{drive_name}'...")
drive_id = get_drive_id(app, site_id, drive_name) drive_id = get_drive_id(app, site_id, drive_name)
print(f" - Library found (ID: {drive_id[:10]}...)")
for folder in folders_to_download: for folder in folders_to_download:
# Clean folder paths from config
folder_parts = [sanitize_filename(p) for p in folder.split('/') if p] folder_parts = [sanitize_filename(p) for p in folder.split('/') if p]
local_folder_path = os.path.normpath(os.path.join(local_path_base, *folder_parts)) local_folder_path = os.path.normpath(os.path.join(local_path_base, *folder_parts))
print(f"\nProcessing: {folder if folder else 'Root'}") print(f"\nStep 4: Processing folder: {folder if folder else 'Root'}")
download_folder_recursive(app, drive_id, folder, local_folder_path, report) download_folder_recursive(app, drive_id, folder, local_folder_path, report := [])
except Exception as e: except Exception as e:
print(f"Critical error: {e}") print(f"\nCRITICAL ERROR: {e}")
report.append({"Path": "GENERAL", "Error": str(e), "Timestamp": datetime.now().isoformat()}) report = [{"Path": "GENERAL", "Error": str(e), "Timestamp": datetime.now().isoformat()}]
print_progress(force=True)
report_file = f"download_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" report_file = f"download_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
with open(report_file, 'w', newline='', encoding='utf-8') as f: with open(report_file, 'w', newline='', encoding='utf-8') as f:
@@ -204,7 +220,11 @@ def main():
writer.writeheader() writer.writeheader()
writer.writerows(report) writer.writerows(report)
print(f"\nProcess complete. Errors: {len(report)}") print(f"\n\nProcess complete.")
print(f"Summary: {stats['total_checked']} items checked.")
print(f" - {stats['downloaded']} new files downloaded.")
print(f" - {stats['skipped']} existing files skipped.")
print(f" - {stats['failed']} errors encountered (see {report_file}).")
# Script entry point: run the downloader only when executed directly,
# not when imported as a module.
if __name__ == "__main__":
    main()