Implementer parallel download med ThreadPoolExecutor (max 5 tråde)

This commit is contained in:
Martin Tranberg
2026-03-26 15:15:05 +01:00
parent 6849662483
commit 6333f27037

View File

@@ -2,56 +2,15 @@ import os
import csv
import requests
import time
import re
import sys
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from msal import ConfidentialClientApplication
from urllib.parse import urlparse, quote
# Global stats to track progress
stats = {
"total_checked": 0,
"downloaded": 0,
"skipped": 0,
"failed": 0
}
def print_status(current_item="", action="Processing"):
"""Prints a single-line progress update with optional current item and action."""
status_line = f"Checked: {stats['total_checked']} | Downloaded: {stats['downloaded']} | Skipped: {stats['skipped']} | Failed: {stats['failed']}"
if current_item:
max_len = 40
display_item = current_item if len(current_item) <= max_len else "..." + current_item[-(max_len-3):]
status_line += f" | {action}: {display_item}"
sys.stdout.write(f"\r{status_line.ljust(120)}")
sys.stdout.flush()
def sanitize_filename(name):
"""Removes invalid characters and handles Unicode whitespace for Windows."""
if not name:
return "unnamed_item"
name = name.replace('\u00A0', ' ').replace('\u200b', '')
invalid_chars = '<>:"/\\|?*'
for char in invalid_chars:
name = name.replace(char, '_')
name = "".join(c for c in name if ord(c) >= 32)
name = name.strip(' .')
reserved_names = {"CON", "PRN", "AUX", "NUL", "COM1", "COM2", "COM3", "COM4", "COM5", "COM6", "COM7", "COM8", "COM9",
"LPT1", "LPT2", "LPT3", "LPT4", "LPT5", "LPT6", "LPT7", "LPT8", "LPT9"}
base_part = name.split('.')[0].upper()
if base_part in reserved_names:
name = "_" + name
return name if name else "unnamed_item"
def get_long_path(path):
"""Converts a path to a long-path-aware format on Windows."""
if os.name == 'nt':
abs_path = os.path.abspath(path)
if not abs_path.startswith('\\\\?\\'):
if abs_path.startswith('\\\\'): # UNC path
return '\\\\?\\UNC\\' + abs_path[2:]
return '\\\\?\\' + abs_path
return path
# Configuration for concurrency
MAX_WORKERS = 5
report_lock = threading.Lock()
def load_config(file_path):
config = {}
@@ -99,58 +58,43 @@ def get_drive_id(app, site_id, drive_name):
return drive['id']
raise Exception(f"Drive '{drive_name}' not found in site.")
def download_file(download_url, local_path, expected_size, item_path):
def download_single_file(download_url, local_path, expected_size, display_name):
"""Worker function for a single file download."""
try:
long_local_path = get_long_path(local_path)
# Verify local storage is still accessible
target_dir = os.path.dirname(long_local_path)
if not os.path.exists(os.path.dirname(target_dir.rstrip('\\/')) or target_dir):
# We check the parent of the target dir to see if the drive is still there
pass
if os.path.exists(long_local_path):
if os.path.isdir(long_local_path):
return False, f"Path exists as a directory: {local_path}"
local_size = os.path.getsize(long_local_path)
# Check if file exists and size matches
if os.path.exists(local_path):
local_size = os.path.getsize(local_path)
if int(local_size) == int(expected_size):
return True, "Skipped"
if not os.path.exists(target_dir):
try:
os.makedirs(target_dir, exist_ok=True)
except Exception:
return False, f"Could not create directory (Drive disconnected?): {target_dir}"
elif not os.path.isdir(target_dir):
return False, f"Parent path exists but is not a directory: {target_dir}"
print_status(item_path, action="Downloading")
print(f"Skipped (matches local): {display_name}")
return True, None
os.makedirs(os.path.dirname(local_path), exist_ok=True)
# Using a timeout for the request to prevent hanging indefinitely
response = requests.get(download_url, stream=True, timeout=60)
response.raise_for_status()
with open(long_local_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=1024*1024):
with open(local_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=1024*1024): # 1MB chunks
if chunk:
f.write(chunk)
local_size = os.path.getsize(long_local_path)
# Verify size after download
local_size = os.path.getsize(local_path)
if int(local_size) == int(expected_size):
return True, "Downloaded"
print(f"Downloaded: {display_name}")
return True, None
else:
return False, f"Size mismatch: Remote={expected_size}, Local={local_size}"
except OSError as e:
if e.errno in [2, 3, 15, 21, 53, 67]: # Common drive/network related errors
return False, f"Storage Error (Drive disconnected?): {str(e)}"
return False, f"[OSError {e.errno}] {str(e)}"
except Exception as e:
return False, f"[Error {getattr(e, 'errno', 'unknown')}] {str(e)}"
return False, str(e)
def download_folder_recursive(app, drive_id, item_path, local_root_path, report):
def process_item_list(app, drive_id, item_path, local_root_path, report, executor, futures):
"""Traverses folders and submits file downloads to the executor."""
try:
print_status(item_path, action="Scanning")
headers = get_headers(app)
encoded_path = quote(item_path)
url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/root:/{encoded_path}:/children"
if not item_path:
url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/root/children"
@@ -160,42 +104,25 @@ def download_folder_recursive(app, drive_id, item_path, local_root_path, report)
for item in items:
item_name = item['name']
sanitized_name = sanitize_filename(item_name)
local_path = os.path.normpath(os.path.join(local_root_path, sanitized_name))
local_path = os.path.join(local_root_path, item_name)
display_path = f"{item_path}/{item_name}".strip('/')
if 'folder' in item:
sub_item_path = f"{item_path}/{item_name}".strip('/')
download_folder_recursive(app, drive_id, sub_item_path, local_path, report)
process_item_list(app, drive_id, display_path, local_path, report, executor, futures)
elif 'file' in item:
stats["total_checked"] += 1
full_item_path = f"{item_path}/{item_name}".strip('/')
download_url = item.get('@microsoft.graph.downloadUrl')
if not download_url:
stats["failed"] += 1
report.append({"Path": full_item_path, "Error": "No URL", "Timestamp": datetime.now().isoformat()})
with report_lock:
report.append({"Path": display_path, "Error": "No download URL", "Timestamp": datetime.now().isoformat()})
continue
print_status(full_item_path, action="Checking")
success, status = download_file(download_url, local_path, item['size'], full_item_path)
# Submit download to thread pool
future = executor.submit(download_single_file, download_url, local_path, item['size'], display_path)
futures[future] = display_path
if success:
if status == "Downloaded":
stats["downloaded"] += 1
sys.stdout.write(f"\r{' ' * 120}\r")
print(f"Downloaded: {full_item_path}")
else:
stats["skipped"] += 1
else:
stats["failed"] += 1
sys.stdout.write(f"\r{' ' * 120}\r")
print(f"FAILED: {full_item_path} - {status}")
report.append({"Path": full_item_path, "Error": status, "Timestamp": datetime.now().isoformat()})
print_status(full_item_path, action="Done")
except Exception as e:
err_msg = f"Folder error: {str(e)}"
sys.stdout.write(f"\r{' ' * 120}\r")
print(f"FAILED FOLDER: {item_path} - {err_msg}")
report.append({"Path": item_path, "Error": err_msg, "Timestamp": datetime.now().isoformat()})
with report_lock:
report.append({"Path": item_path, "Error": f"Folder error: {str(e)}", "Timestamp": datetime.now().isoformat()})
def main():
config = load_config('connection_info.txt')
@@ -205,53 +132,52 @@ def main():
site_url = config.get('SITE_URL')
drive_name = config.get('DOCUMENT_LIBRARY')
folders_to_download_str = config.get('FOLDERS_TO_DOWNLOAD', '')
local_path_base = config.get('LOCAL_PATH', '').replace('\\\\', os.sep)
local_path_base = config.get('LOCAL_PATH', '').replace('\\', os.sep)
# Pre-check: Is the local path accessible?
if not os.path.exists(local_path_base):
print(f"\nCRITICAL ERROR: Local path '{local_path_base}' is not accessible.")
print("Please ensure your network drive is connected and try again.")
return
folders_to_download = [f.strip() for f in folders_to_download_str.split(',') if f.strip()]
if not folders_to_download:
folders_to_download = [""]
print("Step 1: Authenticating with Microsoft Entra ID...")
app = create_msal_app(tenant_id, client_id, client_secret)
print(f"Connecting via Graph API (Parallel Download, Workers={MAX_WORKERS})...")
report = []
try:
print("Step 2: Connecting to SharePoint Site...")
app = create_msal_app(tenant_id, client_id, client_secret)
site_id = get_site_id(app, site_url)
print(f" - Site found (ID: {site_id[:10]}...)")
print(f"Step 3: Accessing Document Library '{drive_name}'...")
drive_id = get_drive_id(app, site_id, drive_name)
print(f" - Library found (ID: {drive_id[:10]}...)")
for folder in folders_to_download:
folder_parts = [sanitize_filename(p) for p in folder.split('/') if p]
local_folder_path = os.path.normpath(os.path.join(local_path_base, *folder_parts))
print(f"\nStep 4: Processing folder: {folder if folder else 'Root'}")
download_folder_recursive(app, drive_id, folder, local_folder_path, report)
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
futures = {}
for folder in folders_to_download:
if folder == "":
print("\nProcessing entire document library (Root)...")
else:
print(f"\nProcessing folder: {folder}")
local_folder_path = os.path.join(local_path_base, folder)
process_item_list(app, drive_id, folder, local_folder_path, report, executor, futures)
# Wait for all downloads to complete and collect errors
for future in as_completed(futures):
path = futures[future]
success, error_msg = future.result()
if not success:
print(f"FAILED: {path} - {error_msg}")
with report_lock:
report.append({"Path": path, "Error": error_msg, "Timestamp": datetime.now().isoformat()})
except Exception as e:
print(f"\nCRITICAL ERROR: {e}")
print(f"Critical error: {e}")
report.append({"Path": "GENERAL", "Error": str(e), "Timestamp": datetime.now().isoformat()})
sys.stdout.write(f"\r{' ' * 120}\r")
report_file = f"download_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
with open(report_file, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=["Path", "Error", "Timestamp"])
writer.writeheader()
writer.writerows(report)
print(f"\nProcess complete.")
print(f"Summary: {stats['total_checked']} items checked.")
print(f" - {stats['downloaded']} new files downloaded.")
print(f" - {stats['skipped']} existing files skipped.")
print(f" - {stats['failed']} errors encountered (see {report_file}).")
print(f"\nProcess complete. Errors: {len(report)}")
if __name__ == "__main__":
main()