"""Production SharePoint -> local sync tool using Microsoft Graph.

Reads connection settings from connection_info.txt, walks one or more
document-library folders, and downloads files concurrently with retry,
throttling (429) handling, and QuickXorHash integrity validation.
"""

import base64
import csv
import functools
import logging
import os
import struct
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from urllib.parse import quote, urlparse

import requests
from msal import ConfidentialClientApplication

# --- Production Configuration ---
MAX_WORKERS = 5
MAX_RETRIES = 5
CHUNK_SIZE = 1024 * 1024  # 1 MB chunks for streaming downloads and hashing
MAX_FOLDER_DEPTH = 50
LOG_FILE = "sharepoint_download.log"

# Setup logging: file + console; thread name identifies concurrent workers.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(threadName)s: %(message)s',
    handlers=[
        logging.FileHandler(LOG_FILE, encoding='utf-8'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# Guards mutations of the shared `report` list across threads.
report_lock = threading.Lock()


def format_size(size_bytes):
    """Return a human-readable size string (e.g. '1.50 MB') for a byte count."""
    for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB']:
        if size_bytes < 1024.0:
            return f"{size_bytes:.2f} {unit}"
        size_bytes /= 1024.0
    return f"{size_bytes:.2f} EB"


def load_config(file_path):
    """Parse a simple KEY=VALUE configuration file into a dict.

    Blank lines and '#' comment lines are skipped (the original parsed any
    line containing '='); surrounding double quotes are stripped from values.

    Raises:
        FileNotFoundError: if *file_path* does not exist.
    """
    config = {}
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"Configuration file {file_path} not found.")
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith('#'):
                continue  # fix: do not treat comment lines as config entries
            if '=' in line:
                key, value = line.split('=', 1)
                config[key.strip()] = value.strip().strip('"')
    return config


# --- Punkt 1: Exponential Backoff & Retry Logic ---
def retry_request(func):
    """Decorator: retry an HTTP call with exponential backoff.

    Honors the Retry-After header on HTTP 429 (falling back to exponential
    backoff when the header is absent or not an integer -- servers may send
    an HTTP-date, which would otherwise crash int()) and retries transient
    request exceptions up to MAX_RETRIES times.
    """
    @functools.wraps(func)  # fix: preserve wrapped function metadata
    def wrapper(*args, **kwargs):
        retries = 0
        while retries < MAX_RETRIES:
            try:
                response = func(*args, **kwargs)
                if response.status_code == 429:
                    try:
                        retry_after = int(response.headers.get("Retry-After", 2 ** retries))
                    except ValueError:
                        # Retry-After was an HTTP-date; use backoff instead.
                        retry_after = 2 ** retries
                    logger.warning(f"Throttled (429). Waiting {retry_after}s...")
                    time.sleep(retry_after)
                    retries += 1
                    continue
                response.raise_for_status()
                return response
            except requests.exceptions.RequestException as e:
                retries += 1
                wait = 2 ** retries
                if retries >= MAX_RETRIES:
                    raise e
                logger.error(f"Request failed: {e}. Retrying in {wait}s...")
                time.sleep(wait)
        raise requests.exceptions.RetryError(f"Max retries ({MAX_RETRIES}) exceeded.")
    return wrapper


@retry_request
def safe_get(url, headers, stream=False, timeout=60, params=None):
    """GET with retry/backoff applied via the retry_request decorator."""
    return requests.get(url, headers=headers, stream=stream, timeout=timeout, params=params)


# --- Punkt 4: Integrity Validation (QuickXorHash) ---
def quickxorhash(file_path):
    """Compute Microsoft QuickXorHash for a file.

    Returns base64-encoded string. Uses 3 x uint64 cells matching Microsoft's
    C# reference implementation.

    NOTE(review): the tail of this function was lost in extraction and is
    reconstructed from the reference algorithm (serialize the 160-bit state
    little-endian, XOR the total byte length into the trailing 8 bytes,
    base64-encode). Verify against a known Graph quickXorHash value.
    """
    SHIFT = 11
    WIDTH = 160
    data = [0, 0, 0]  # 3 x 64-bit unsigned integer cells (192 bits held; 160 used)
    i = 0  # total number of bytes processed
    with open(file_path, 'rb') as f:
        while True:
            chunk = f.read(CHUNK_SIZE)
            if not chunk:
                break
            for byte in chunk:
                bit_idx = (i * SHIFT) % WIDTH
                cell = bit_idx // 64
                shift = bit_idx % 64
                data[cell] = (data[cell] ^ (byte << shift)) & 0xFFFFFFFFFFFFFFFF
                i += 1
    # Serialize the state and fold the file length into the last 8 bytes.
    digest = bytearray(struct.pack('<QQQ', data[0], data[1], data[2])[:WIDTH // 8])
    length_bytes = struct.pack('<Q', i)
    for j in range(8):
        digest[(WIDTH // 8) - 8 + j] ^= length_bytes[j]
    return base64.b64encode(bytes(digest)).decode('ascii')


def get_headers(app):
    """Return Graph Authorization headers from an MSAL client-credentials token.

    NOTE(review): reconstructed -- the original implementation was lost in
    extraction; confirm scope and error handling against the original.
    """
    result = app.acquire_token_for_client(scopes=["https://graph.microsoft.com/.default"])
    if "access_token" not in result:
        raise RuntimeError(f"Token acquisition failed: {result.get('error_description', result)}")
    return {"Authorization": f"Bearer {result['access_token']}"}


def get_site_id(app, site_url):
    """Resolve a SharePoint site URL to its Graph site id.

    NOTE(review): reconstructed from the call site in main(); confirm
    against the original implementation.
    """
    parsed = urlparse(site_url)
    url = f"https://graph.microsoft.com/v1.0/sites/{parsed.netloc}:{parsed.path}"
    response = safe_get(url, headers=get_headers(app))
    return response.json()['id']


def get_drive_id(app, site_id, drive_name):
    """Resolve a document-library display name to its Graph drive id.

    NOTE(review): reconstructed from the call site in main(); confirm
    against the original implementation.

    Raises:
        ValueError: if no drive with the given name exists on the site.
    """
    url = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives"
    response = safe_get(url, headers=get_headers(app))
    for drive in response.json().get('value', []):
        if drive.get('name') == drive_name:
            return drive['id']
    raise ValueError(f"Document library '{drive_name}' not found.")


def download_single_file(app, drive_id, item_id, local_path, size, display_path,
                         remote_hash=None, download_url=None):
    """Download one file in CHUNK_SIZE pieces and validate size and hash.

    Prefers the pre-authenticated @microsoft.graph.downloadUrl when given
    (no bearer token required); otherwise fetches the item content endpoint.

    Returns:
        (True, None) on success; (False, error_message) on any failure.

    NOTE(review): reconstructed from the executor.submit() call in
    process_item_list; confirm against the original implementation.
    """
    try:
        os.makedirs(os.path.dirname(local_path) or '.', exist_ok=True)
        if download_url:
            url, headers = download_url, {}
        else:
            url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/items/{item_id}/content"
            headers = get_headers(app)
        response = safe_get(url, headers=headers, stream=True, timeout=300)
        with open(local_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=CHUNK_SIZE):
                if chunk:
                    f.write(chunk)
        local_size = os.path.getsize(local_path)
        if local_size != size:
            return False, f"Size mismatch: expected {size}, got {local_size}"
        if remote_hash:
            local_hash = quickxorhash(local_path)
            if local_hash != remote_hash:
                return False, f"Hash mismatch: expected {remote_hash}, got {local_hash}"
        logger.info(f"OK: {display_path} ({format_size(size)})")
        return True, None
    except Exception as e:
        return False, str(e)


def process_item_list(app, drive_id, item_path, local_root_path, report,
                      executor, futures, depth=0):
    """Recursively walk a drive folder, submitting file downloads to *executor*.

    Folders recurse (bounded by MAX_FOLDER_DEPTH); files are submitted as
    download tasks and recorded in *futures* (future -> display path).
    Traversal errors are logged and appended to *report*.
    """
    if depth >= MAX_FOLDER_DEPTH:
        logger.warning(f"Max folder depth ({MAX_FOLDER_DEPTH}) reached at: {item_path}. Skipping subtree.")
        return
    try:
        auth_headers = get_headers(app)
        encoded_path = quote(item_path)
        if not item_path:
            url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/root/children"
        else:
            url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/root:/{encoded_path}:/children"
        while url:
            response = safe_get(url, headers=auth_headers)
            data = response.json()
            for item in data.get('value', []):
                item_name = item['name']
                local_path = os.path.join(local_root_path, item_name)
                display_path = f"{item_path}/{item_name}".strip('/')
                if 'folder' in item:
                    process_item_list(app, drive_id, display_path, local_path,
                                      report, executor, futures, depth + 1)
                elif 'file' in item:
                    item_id = item['id']
                    download_url = item.get('@microsoft.graph.downloadUrl')
                    remote_hash = item.get('file', {}).get('hashes', {}).get('quickXorHash')
                    future = executor.submit(
                        download_single_file, app, drive_id, item_id,
                        local_path, item['size'], display_path,
                        remote_hash, download_url
                    )
                    futures[future] = display_path
            url = data.get('@odata.nextLink')
            if url:
                # Token may expire during long paginated scans; refresh headers.
                auth_headers = get_headers(app)
    except Exception as e:
        logger.error(f"Error traversing {item_path}: {e}")
        with report_lock:
            report.append({"Path": item_path, "Error": str(e),
                           "Timestamp": datetime.now().isoformat()})


def create_msal_app(tenant_id, client_id, client_secret):
    """Build a confidential-client MSAL application for the given tenant."""
    return ConfidentialClientApplication(
        client_id,
        authority=f"https://login.microsoftonline.com/{tenant_id}",
        client_credential=client_secret
    )


def main():
    """Entry point: load config, scan configured folders, download, write report."""
    try:
        config = load_config('connection_info.txt')
        tenant_id = config.get('TENANT_ID', '')
        client_id = config.get('CLIENT_ID', '')
        client_secret = config.get('CLIENT_SECRET', '')
        site_url = config.get('SITE_URL', '')
        drive_name = config.get('DOCUMENT_LIBRARY', '')
        folders_str = config.get('FOLDERS_TO_DOWNLOAD', '')
        local_base = config.get('LOCAL_PATH', '').replace('\\', os.sep)
        # Empty FOLDERS_TO_DOWNLOAD means "sync the library root".
        folders = [f.strip() for f in folders_str.split(',') if f.strip()] or [""]

        logger.info("Initializing SharePoint Production Sync Tool...")
        app = create_msal_app(tenant_id, client_id, client_secret)
        site_id = get_site_id(app, site_url)
        drive_id = get_drive_id(app, site_id, drive_name)

        report = []
        with ThreadPoolExecutor(max_workers=MAX_WORKERS, thread_name_prefix="DL") as executor:
            futures = {}
            for folder in folders:
                logger.info(f"Scanning: {folder or 'Root'}")
                process_item_list(app, drive_id, folder,
                                  os.path.join(local_base, folder),
                                  report, executor, futures)

            logger.info(f"Scan complete. Processing {len(futures)} tasks...")
            for future in as_completed(futures):
                path = futures[future]
                try:
                    success, error = future.result()
                except Exception as e:
                    # fix: a raising worker must not abort the whole sync loop.
                    success, error = False, str(e)
                if not success:
                    logger.error(f"FAILED: {path} | {error}")
                    with report_lock:
                        report.append({"Path": path, "Error": error,
                                       "Timestamp": datetime.now().isoformat()})

        report_file = f"download_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
        with open(report_file, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=["Path", "Error", "Timestamp"])
            writer.writeheader()
            writer.writerows(report)
        logger.info(f"Sync complete. Errors: {len(report)}. Report: {report_file}")
    except Exception as e:
        logger.critical(f"FATAL ERROR: {e}")


if __name__ == "__main__":
    main()