260 lines
11 KiB
Python
260 lines
11 KiB
Python
import os
|
|
import csv
|
|
import requests
|
|
import time
|
|
import re
|
|
import sys
|
|
from datetime import datetime
|
|
from msal import ConfidentialClientApplication
|
|
from urllib.parse import urlparse, quote
|
|
|
|
# Module-wide progress counters. Each one starts at zero and is updated in
# place while files are processed; the progress line and the final summary
# both read from this dictionary.
stats = dict.fromkeys(("total_checked", "downloaded", "skipped", "failed"), 0)
|
|
|
|
def print_status(current_item="", action="Processing"):
    """Redraw the single-line progress display on stdout.

    The line always shows the four counters from the module-level ``stats``
    dictionary. When ``current_item`` is non-empty, an ``"<action>: <item>"``
    segment is appended, with the item shortened to its last 37 characters
    behind a leading ``...`` if it is longer than 40 characters.

    Args:
        current_item: Path (or other label) of the item being handled.
        action: Verb shown in front of the item.
    """
    segments = [
        f"Checked: {stats['total_checked']}",
        f"Downloaded: {stats['downloaded']}",
        f"Skipped: {stats['skipped']}",
        f"Failed: {stats['failed']}",
    ]

    if current_item:
        width = 40
        shown = current_item
        if len(shown) > width:
            # Keep the tail of the path: it is the part that changes most.
            shown = "..." + shown[-(width - 3):]
        segments.append(f"{action}: {shown}")

    # The carriage return moves back to column 0; padding to 120 columns
    # overwrites whatever a longer previous status left behind.
    padded = " | ".join(segments).ljust(120)
    sys.stdout.write("\r" + padded)
    sys.stdout.flush()
|
|
|
|
def sanitize_filename(name):
    """Turn a remote item name into one that is safe to create on Windows.

    Steps, in effect:
      * non-breaking spaces become plain spaces; zero-width spaces and
        control characters (code points below 32) are dropped;
      * each of ``< > : " / \\ | ? *`` becomes an underscore;
      * leading/trailing spaces and dots are stripped;
      * a name whose part before the first dot is a reserved DOS device name
        (CON, PRN, AUX, NUL, COM1-9, LPT1-9, case-insensitive) gets a
        leading underscore.

    Args:
        name: Original item name; may be empty or None.

    Returns:
        The cleaned name, or ``"unnamed_item"`` if nothing usable remains.
    """
    if not name:
        return "unnamed_item"

    # One translation table covers every per-character rule. None of the
    # replacement characters is itself remapped, so a single pass gives the
    # same result as applying the rules one after another.
    table = {ord(ch): "_" for ch in '<>:"/\\|?*'}
    table.update({code: None for code in range(32)})
    table[0x00A0] = " "
    table[0x200B] = None
    cleaned = name.translate(table).strip(" .")

    reserved = {"CON", "PRN", "AUX", "NUL"}
    reserved.update(
        f"{device}{number}" for device in ("COM", "LPT") for number in range(1, 10)
    )

    stem = cleaned.partition(".")[0].upper()
    if stem in reserved:
        cleaned = "_" + cleaned

    return cleaned or "unnamed_item"
|
|
|
|
def get_long_path(path):
    """Return ``path`` in Windows extended-length form where applicable.

    On non-Windows systems the argument is returned untouched. On Windows the
    path is made absolute and given the extended-length prefix (the UNC
    variant of the prefix for network paths). If the absolute path already
    carries the prefix, the original argument is returned unchanged.

    Args:
        path: File-system path, relative or absolute.

    Returns:
        The possibly prefixed path string.
    """
    if os.name != 'nt':
        return path

    extended_prefix = '\\\\?\\'
    absolute = os.path.abspath(path)

    if absolute.startswith(extended_prefix):
        # Already in extended-length form; hand back what the caller gave us.
        return path

    if absolute.startswith('\\\\'):
        # UNC path: drop the two leading backslashes and use the UNC prefix.
        return extended_prefix + 'UNC\\' + absolute[2:]

    return extended_prefix + absolute
|
|
|
|
def load_config(file_path):
    """Parse a plain ``KEY=VALUE`` settings file into a dictionary.

    Each line is split on its first ``=``. Keys and values are stripped of
    surrounding whitespace, and values additionally lose surrounding double
    quotes. Blank lines, lines starting with ``#`` and lines without ``=``
    are ignored. When a key appears more than once, the last value wins.

    Args:
        file_path: Path of the settings file.

    Returns:
        dict mapping setting names to their string values.

    Raises:
        OSError: If the file cannot be opened (for example, it is missing).
    """
    config = {}
    # 'utf-8-sig' drops a leading byte-order mark if one is present (and reads
    # BOM-less UTF-8 unchanged). Without it, a BOM written by a Windows editor
    # would become part of the first key and that setting would never be found.
    with open(file_path, 'r', encoding='utf-8-sig') as f:
        for raw_line in f:
            line = raw_line.strip()
            # Skip blanks and comments so "# KEY=value" is not stored as a setting.
            if not line or line.startswith('#'):
                continue
            key, separator, value = line.partition('=')
            if not separator:
                continue
            config[key.strip()] = value.strip().strip('"')
    return config
|
|
|
|
def create_msal_app(tenant_id, client_id, client_secret):
    """Build the MSAL confidential-client application for one tenant.

    Args:
        tenant_id: Directory (tenant) identifier, used to form the authority URL.
        client_id: Application (client) identifier.
        client_secret: Credential passed to MSAL as ``client_credential``.

    Returns:
        A ``ConfidentialClientApplication`` bound to the tenant's authority.
    """
    authority_url = f"https://login.microsoftonline.com/{tenant_id}"
    msal_app = ConfidentialClientApplication(
        client_id,
        client_credential=client_secret,
        authority=authority_url,
    )
    return msal_app
|
|
|
|
def get_headers(app):
    """Return the Authorization header for Microsoft Graph requests.

    ``acquire_token_for_client`` is called on ``app`` on every invocation with
    the Graph ``.default`` scope; any token caching is left to that object.

    Args:
        app: Object exposing ``acquire_token_for_client(scopes=...)`` that
            returns a mapping (the MSAL application in this script).

    Returns:
        dict with a single ``Authorization: Bearer <token>`` entry.

    Raises:
        Exception: If the response carries no ``access_token``; the message
            includes the response's ``error_description``.
    """
    token_response = app.acquire_token_for_client(
        scopes=["https://graph.microsoft.com/.default"]
    )

    if "access_token" not in token_response:
        raise Exception(f"Could not acquire token: {token_response.get('error_description')}")

    bearer = token_response["access_token"]
    return {'Authorization': f'Bearer {bearer}'}
|
|
|
|
def get_site_id(app, site_url):
    """Resolve a SharePoint site URL to its Microsoft Graph site id.

    The URL is split into host name and server-relative path. A site with a
    path is addressed as ``sites/{hostname}:{path}``; a URL with no path (the
    root site of the host) is addressed as ``sites/{hostname}``.

    Args:
        app: Token provider passed to ``get_headers``.
        site_url: Full URL of the site, e.g. ``https://host/sites/Team``.

    Returns:
        The ``id`` field of the Graph site resource.

    Raises:
        ValueError: If ``site_url`` is empty or None.
        requests.HTTPError: If Graph answers with an error status.
        requests.RequestException: On connection problems or timeout.
    """
    if not site_url:
        # Fail with a clear message instead of building a nonsense URL.
        raise ValueError("Site URL is missing or empty.")

    headers = get_headers(app)
    parsed = urlparse(site_url)
    hostname = parsed.netloc
    # A trailing slash (easy to copy from a browser) would leave a dangling
    # segment in the path-addressed form, so drop it.
    site_path = parsed.path.rstrip('/')

    if site_path:
        url = f"https://graph.microsoft.com/v1.0/sites/{hostname}:{site_path}"
    else:
        url = f"https://graph.microsoft.com/v1.0/sites/{hostname}"

    # Same timeout as the file downloads, so a stalled connection cannot hang
    # the script indefinitely.
    response = requests.get(url, headers=headers, timeout=60)
    response.raise_for_status()
    return response.json()['id']
|
|
|
|
def get_drive_id(app, site_id, drive_name):
    """Find the id of the document library (drive) with the given name.

    All pages of the site's drive listing are examined: when a response
    contains ``@odata.nextLink`` the next page is fetched, so a library that
    is not on the first page is still found.

    Args:
        app: Token provider passed to ``get_headers``.
        site_id: Graph site id, as returned by ``get_site_id``.
        drive_name: Display name of the library; compared exactly
            (case-sensitive).

    Returns:
        The ``id`` of the first drive whose ``name`` equals ``drive_name``.

    Raises:
        Exception: If no drive with that name exists in the site.
        requests.HTTPError: If Graph answers with an error status.
        requests.RequestException: On connection problems or timeout.
    """
    headers = get_headers(app)
    url = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives"

    while url:
        # Timeout matches the one used for file downloads.
        response = requests.get(url, headers=headers, timeout=60)
        response.raise_for_status()
        payload = response.json()

        for drive in payload.get('value', []):
            # .get(): an entry without a name is skipped rather than raising.
            if drive.get('name') == drive_name:
                return drive['id']

        url = payload.get('@odata.nextLink')

    raise Exception(f"Drive '{drive_name}' not found in site.")
|
|
|
|
def download_file(download_url, local_path, expected_size, item_path):
    """Download one file unless an identical-size copy already exists.

    Args:
        download_url: URL the file content is streamed from.
        local_path: Destination path on disk.
        expected_size: Size in bytes reported for the remote file; used both
            for the skip check and for verifying the finished download.
        item_path: Remote path of the item, shown in the progress line.

    Returns:
        A ``(success, status)`` tuple. ``status`` is ``"Skipped"`` when a
        local file of the expected size already exists, ``"Downloaded"``
        after a verified download, and otherwise a text describing the
        failure. No exception escapes this function; any error is turned
        into a ``(False, message)`` result.
    """
    try:
        long_local_path = get_long_path(local_path)

        if os.path.exists(long_local_path):
            if os.path.isdir(long_local_path):
                return False, f"Path exists as a directory: {local_path}"
            local_size = os.path.getsize(long_local_path)
            if int(local_size) == int(expected_size):
                return True, "Skipped"

        target_dir = os.path.dirname(long_local_path)
        # An empty dirname means "current directory" (bare relative filename);
        # os.makedirs('') would raise, and there is nothing to create anyway.
        if target_dir:
            if not os.path.exists(target_dir):
                os.makedirs(target_dir, exist_ok=True)
            elif not os.path.isdir(target_dir):
                return False, f"Parent path exists but is not a directory: {target_dir}"

        # Update status before starting actual download
        print_status(item_path, action="Downloading")

        # The context manager closes the streamed response (releasing its
        # connection) even when the status check or a write fails part-way.
        with requests.get(download_url, stream=True, timeout=60) as response:
            response.raise_for_status()
            with open(long_local_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=1024 * 1024):
                    if chunk:
                        f.write(chunk)

        local_size = os.path.getsize(long_local_path)
        if int(local_size) == int(expected_size):
            return True, "Downloaded"
        # The file is left in place on a mismatch; it is reported, not deleted.
        return False, f"Size mismatch: Remote={expected_size}, Local={local_size}"
    except Exception as e:
        # OSError carries an errno; other exception types report 'unknown'.
        return False, f"[Error {getattr(e, 'errno', 'unknown')}] {str(e)}"
|
|
|
|
def _list_folder_children(app, drive_id, item_path):
    """Return all child items of a drive folder, following Graph paging links."""
    if item_path:
        encoded_path = quote(item_path)
        url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/root:/{encoded_path}:/children"
    else:
        # The drive root has no path-addressed form.
        url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/root/children"

    children = []
    while url:
        # Headers are requested per page so each request carries whatever
        # token get_headers currently hands out.
        response = requests.get(url, headers=get_headers(app), timeout=60)
        response.raise_for_status()
        payload = response.json()
        children.extend(payload.get('value', []))
        # Present only when more results remain; absent on the last page.
        url = payload.get('@odata.nextLink')
    return children


def download_folder_recursive(app, drive_id, item_path, local_root_path, report):
    """Mirror one drive folder, and everything below it, to a local directory.

    Sub-folders are processed recursively; files are passed to
    ``download_file``. The module-level ``stats`` counters and the progress
    line are updated as items are handled. Failures are printed and appended
    to ``report``; they do not raise.

    Args:
        app: Token provider passed to ``get_headers``.
        drive_id: Graph id of the document library.
        item_path: Folder path relative to the drive root; empty for the root.
        local_root_path: Local directory that receives this folder's content.
        report: List that failure records (dicts with ``Path``, ``Error`` and
            ``Timestamp`` keys) are appended to.
    """
    try:
        # Show progress for every folder we enter
        print_status(item_path, action="Scanning")

        # The listing is fetched page by page; reading only the first response
        # would silently drop the rest of a large folder.
        items = _list_folder_children(app, drive_id, item_path)

        for item in items:
            item_name = item['name']
            sanitized_name = sanitize_filename(item_name)
            local_path = os.path.normpath(os.path.join(local_root_path, sanitized_name))
            # Remote paths keep the original name; only the local path is sanitized.
            child_path = f"{item_path}/{item_name}".strip('/')

            if 'folder' in item:
                download_folder_recursive(app, drive_id, child_path, local_path, report)
            elif 'file' in item:
                stats["total_checked"] += 1
                full_item_path = child_path

                download_url = item.get('@microsoft.graph.downloadUrl')
                if not download_url:
                    stats["failed"] += 1
                    report.append({"Path": full_item_path, "Error": "No URL", "Timestamp": datetime.now().isoformat()})
                    continue

                # Update status showing we are checking this file
                print_status(full_item_path, action="Checking")

                success, status = download_file(download_url, local_path, item['size'], full_item_path)

                if success:
                    if status == "Downloaded":
                        stats["downloaded"] += 1
                        # Clear line and print permanent log for actual download
                        sys.stdout.write(f"\r{' ' * 120}\r")
                        print(f"Downloaded: {full_item_path}")
                    else:
                        stats["skipped"] += 1
                else:
                    stats["failed"] += 1
                    sys.stdout.write(f"\r{' ' * 120}\r")
                    print(f"FAILED: {full_item_path} - {status}")
                    report.append({"Path": full_item_path, "Error": status, "Timestamp": datetime.now().isoformat()})

                # Refresh status line after file is handled
                print_status(full_item_path, action="Done")

    except Exception as e:
        # A failure here (listing error, malformed item, ...) abandons the rest
        # of this folder only; the caller continues with its remaining items.
        err_msg = f"Folder error: {str(e)}"
        sys.stdout.write(f"\r{' ' * 120}\r")
        print(f"FAILED FOLDER: {item_path} - {err_msg}")
        report.append({"Path": item_path, "Error": err_msg, "Timestamp": datetime.now().isoformat()})
|
|
|
|
def main():
    """Run the download job described by ``connection_info.txt``.

    Reads the settings, authenticates, resolves the site and document
    library, downloads every configured folder (the whole library when
    ``FOLDERS_TO_DOWNLOAD`` is empty), writes a timestamped CSV report of all
    recorded failures to the current directory and prints a summary.

    Errors raised during setup or processing (including missing required
    settings and failures while creating the MSAL application) are printed
    as a critical error and recorded in the report under the path
    ``GENERAL`` instead of terminating the script with a traceback.
    """
    config = load_config('connection_info.txt')
    tenant_id = config.get('TENANT_ID')
    client_id = config.get('CLIENT_ID')
    client_secret = config.get('CLIENT_SECRET')
    site_url = config.get('SITE_URL')
    drive_name = config.get('DOCUMENT_LIBRARY')
    folders_to_download_str = config.get('FOLDERS_TO_DOWNLOAD', '')
    # Doubled backslashes in the configured path are collapsed to the
    # platform's separator.
    local_path_base = config.get('LOCAL_PATH', '').replace('\\\\', os.sep)

    folders_to_download = [f.strip() for f in folders_to_download_str.split(',') if f.strip()]
    if not folders_to_download:
        # A single empty entry stands for the root of the library.
        folders_to_download = [""]

    report = []
    try:
        # Check the settings every later step depends on, so a typo in the
        # settings file gives one clear message instead of an obscure
        # authentication or HTTP failure.
        required = ('TENANT_ID', 'CLIENT_ID', 'CLIENT_SECRET', 'SITE_URL', 'DOCUMENT_LIBRARY')
        missing = [key for key in required if not config.get(key)]
        if missing:
            raise ValueError(f"Missing required settings in connection_info.txt: {', '.join(missing)}")

        print("Step 1: Authenticating with Microsoft Entra ID...")
        # Inside the try block: errors raised while constructing the MSAL
        # application are reported like every other critical error.
        app = create_msal_app(tenant_id, client_id, client_secret)

        print("Step 2: Connecting to SharePoint Site...")
        site_id = get_site_id(app, site_url)
        print(f" - Site found (ID: {site_id[:10]}...)")

        print(f"Step 3: Accessing Document Library '{drive_name}'...")
        drive_id = get_drive_id(app, site_id, drive_name)
        print(f" - Library found (ID: {drive_id[:10]}...)")

        for folder in folders_to_download:
            # Each path segment is sanitized separately so the local layout
            # follows the remote one.
            folder_parts = [sanitize_filename(p) for p in folder.split('/') if p]
            local_folder_path = os.path.normpath(os.path.join(local_path_base, *folder_parts))

            print(f"\nStep 4: Processing folder: {folder if folder else 'Root'}")
            download_folder_recursive(app, drive_id, folder, local_folder_path, report)

    except Exception as e:
        print(f"\nCRITICAL ERROR: {e}")
        report.append({"Path": "GENERAL", "Error": str(e), "Timestamp": datetime.now().isoformat()})

    # Clear the progress line before final summary
    sys.stdout.write(f"\r{' ' * 120}\r")

    report_file = f"download_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
    with open(report_file, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=["Path", "Error", "Timestamp"])
        writer.writeheader()
        writer.writerows(report)

    print("\nProcess complete.")
    print(f"Summary: {stats['total_checked']} items checked.")
    print(f" - {stats['downloaded']} new files downloaded.")
    print(f" - {stats['skipped']} existing files skipped.")
    print(f" - {stats['failed']} errors encountered (see {report_file}).")
|
|
|
|
# Start the download job only when this file is executed as a script, not
# when it is imported as a module.
if __name__ == "__main__":
    main()
|