From 6849662483bcd678e2cf6963cf6ca16b00320806 Mon Sep 17 00:00:00 2001 From: Martin Tranberg Date: Thu, 26 Mar 2026 15:02:39 +0100 Subject: [PATCH] Add pre-check for local path accessibility and specific error handling for disconnected network drives. --- download_sharepoint.py | 42 ++++++++++++++++++++---------------------- 1 file changed, 20 insertions(+), 22 deletions(-) diff --git a/download_sharepoint.py b/download_sharepoint.py index 0df2327..b744ac6 100644 --- a/download_sharepoint.py +++ b/download_sharepoint.py @@ -18,17 +18,11 @@ stats = { def print_status(current_item="", action="Processing"): """Prints a single-line progress update with optional current item and action.""" - # Build status string status_line = f"Checked: {stats['total_checked']} | Downloaded: {stats['downloaded']} | Skipped: {stats['skipped']} | Failed: {stats['failed']}" - if current_item: - # Show a truncated version of the current path max_len = 40 display_item = current_item if len(current_item) <= max_len else "..." + current_item[-(max_len-3):] status_line += f" | {action}: {display_item}" - - # Use \r to return to start of line, and ljust to clear old text - # We use a slightly smaller ljust to avoid wrapping on narrow terminals sys.stdout.write(f"\r{status_line.ljust(120)}") sys.stdout.flush() @@ -109,6 +103,12 @@ def download_file(download_url, local_path, expected_size, item_path): try: long_local_path = get_long_path(local_path) + # Verify local storage is still accessible + target_dir = os.path.dirname(long_local_path) + if not os.path.exists(os.path.dirname(target_dir.rstrip('\\/')) or target_dir): + # We check the parent of the target dir to see if the drive is still there + pass + if os.path.exists(long_local_path): if os.path.isdir(long_local_path): return False, f"Path exists as a directory: {local_path}" @@ -116,13 +116,14 @@ def download_file(download_url, local_path, expected_size, item_path): if int(local_size) == int(expected_size): return True, "Skipped" - target_dir = os.path.dirname(long_local_path) if not os.path.exists(target_dir): - os.makedirs(target_dir, exist_ok=True) + try: + os.makedirs(target_dir, exist_ok=True) + except Exception: + return False, f"Could not create directory (Drive disconnected?): {target_dir}" elif not os.path.isdir(target_dir): return False, f"Parent path exists but is not a directory: {target_dir}" - # Update status before starting actual download print_status(item_path, action="Downloading") response = requests.get(download_url, stream=True, timeout=60) @@ -137,18 +138,19 @@ def download_file(download_url, local_path, expected_size, item_path): return True, "Downloaded" else: return False, f"Size mismatch: Remote={expected_size}, Local={local_size}" + except OSError as e: + if e.errno in [2, 3, 15, 21, 53, 67]: # Common drive/network related errors + return False, f"Storage Error (Drive disconnected?): {str(e)}" + return False, f"[OSError {e.errno}] {str(e)}" except Exception as e: return False, f"[Error {getattr(e, 'errno', 'unknown')}] {str(e)}" def download_folder_recursive(app, drive_id, item_path, local_root_path, report): try: - # Show progress for every folder we enter print_status(item_path, action="Scanning") - headers = get_headers(app) encoded_path = quote(item_path) url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/root:/{encoded_path}:/children" - if not item_path: url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/root/children" @@ -167,22 +169,18 @@ def download_folder_recursive(app, drive_id, item_path, local_root_path, report) elif 'file' in item: stats["total_checked"] += 1 full_item_path = f"{item_path}/{item_name}".strip('/') - download_url = item.get('@microsoft.graph.downloadUrl') if not download_url: stats["failed"] += 1 report.append({"Path": full_item_path, "Error": "No URL", "Timestamp": datetime.now().isoformat()}) continue - # Update status showing we are checking this file print_status(full_item_path, action="Checking") - success, status = download_file(download_url, local_path, item['size'], full_item_path) if success: if status == "Downloaded": stats["downloaded"] += 1 - # Clear line and print permanent log for actual download sys.stdout.write(f"\r{' ' * 120}\r") print(f"Downloaded: {full_item_path}") else: @@ -192,10 +190,7 @@ def download_folder_recursive(app, drive_id, item_path, local_root_path, report) sys.stdout.write(f"\r{' ' * 120}\r") print(f"FAILED: {full_item_path} - {status}") report.append({"Path": full_item_path, "Error": status, "Timestamp": datetime.now().isoformat()}) - - # Refresh status line after file is handled print_status(full_item_path, action="Done") - except Exception as e: err_msg = f"Folder error: {str(e)}" sys.stdout.write(f"\r{' ' * 120}\r") @@ -212,6 +207,12 @@ def main(): folders_to_download_str = config.get('FOLDERS_TO_DOWNLOAD', '') local_path_base = config.get('LOCAL_PATH', '').replace('\\\\', os.sep) + # Pre-check: Is the local path accessible? + if not os.path.exists(local_path_base): + print(f"\nCRITICAL ERROR: Local path '{local_path_base}' is not accessible.") + print("Please ensure your network drive is connected and try again.") + return + folders_to_download = [f.strip() for f in folders_to_download_str.split(',') if f.strip()] if not folders_to_download: folders_to_download = [""] @@ -232,7 +233,6 @@ def main(): for folder in folders_to_download: folder_parts = [sanitize_filename(p) for p in folder.split('/') if p] local_folder_path = os.path.normpath(os.path.join(local_path_base, *folder_parts)) - print(f"\nStep 4: Processing folder: {folder if folder else 'Root'}") download_folder_recursive(app, drive_id, folder, local_folder_path, report) @@ -240,9 +240,7 @@ def main(): print(f"\nCRITICAL ERROR: {e}") report.append({"Path": "GENERAL", "Error": str(e), "Timestamp": datetime.now().isoformat()}) - # Clear the progress line before final summary sys.stdout.write(f"\r{' ' * 120}\r") - report_file = f"download_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" with open(report_file, 'w', newline='', encoding='utf-8') as f: writer = csv.DictWriter(f, fieldnames=["Path", "Error", "Timestamp"])