Fix SyntaxError in download_sharepoint.py caused by incorrect escaping during previous update.

This commit is contained in:
Martin Tranberg
2026-03-26 14:41:28 +01:00
parent 81d5a2f822
commit e3a3824a1c

View File

@@ -8,41 +8,41 @@ from msal import ConfidentialClientApplication
from urllib.parse import urlparse, quote
def sanitize_filename(name):
    """Removes invalid characters and handles Unicode whitespace for Windows.

    Returns a name that is safe to use as a Windows file/folder name:
    no illegal or control characters, no trailing dots/spaces, and no
    bare reserved device names (CON, PRN, COM1, ...). Falls back to
    "unnamed_item" when the input is empty or sanitizes to nothing.
    """
    if not name:
        return "unnamed_item"
    # Handle Unicode non-breaking spaces (common in SharePoint names)
    name = name.replace('\u00A0', ' ').replace('\u200b', '')
    # Illegal characters on Windows: < > : " / \ | ? *
    invalid_chars = '<>:"/\\|?*'
    for char in invalid_chars:
        name = name.replace(char, '_')
    # Strip control characters (codepoints 0-31)
    name = "".join(c for c in name if ord(c) >= 32)
    # Windows doesn't like trailing spaces or dots
    name = name.strip(' .')
    # Reserved device names are matched on the part before the first dot
    reserved_names = {"CON", "PRN", "AUX", "NUL", "COM1", "COM2", "COM3", "COM4", "COM5", "COM6", "COM7", "COM8", "COM9",
                      "LPT1", "LPT2", "LPT3", "LPT4", "LPT5", "LPT6", "LPT7", "LPT8", "LPT9"}
    base_part = name.split('.')[0].upper()
    if base_part in reserved_names:
        name = "_" + name
    return name if name else "unnamed_item"
def get_long_path(path):
    """Converts a path to a long-path-aware format on Windows.

    On Windows ('nt'), prefixes the absolute path with '\\\\?\\' (or
    '\\\\?\\UNC\\' for UNC shares) so paths longer than MAX_PATH work.
    On other platforms the path is returned unchanged.
    """
    if os.name == 'nt':
        abs_path = os.path.abspath(path)
        # Skip paths that already carry the long-path prefix
        if not abs_path.startswith('\\\\?\\'):
            if abs_path.startswith('\\\\'):  # UNC path: \\server\share -> \\?\UNC\server\share
                return '\\\\?\\UNC\\' + abs_path[2:]
            return '\\\\?\\' + abs_path
    return path
def load_config(file_path):
@@ -51,45 +51,45 @@ def load_config(file_path):
for line in f:
if '=' in line:
key, value = line.split('=', 1)
config[key.strip()] = value.strip().strip('\"')
config[key.strip()] = value.strip().strip('"')
return config
def create_msal_app(tenant_id, client_id, client_secret):
    """Builds a confidential-client MSAL app for app-only Graph auth.

    The authority URL is derived from the Azure AD tenant id; the client
    secret is used as the app credential.
    """
    return ConfidentialClientApplication(
        client_id,
        authority=f"https://login.microsoftonline.com/{tenant_id}",
        client_credential=client_secret,
    )
def get_headers(app):
    """Acquires a token from cache or fetches a new one if expired.

    Returns a dict of HTTP headers carrying the Graph bearer token.
    Raises Exception with the MSAL error description when no token
    could be acquired.
    """
    scopes = ["https://graph.microsoft.com/.default"]
    # MSAL handles caching/refresh internally for client-credential flow
    result = app.acquire_token_for_client(scopes=scopes)
    if "access_token" in result:
        return {'Authorization': f'Bearer {result["access_token"]}'}
    else:
        raise Exception(f"Could not acquire token: {result.get('error_description')}")
def get_site_id(app, site_url):
    """Resolves a SharePoint site URL to its Graph site id.

    Splits the site URL into hostname and server-relative path and
    queries the Graph `sites/{host}:{path}` endpoint. Raises on any
    non-2xx HTTP response.
    """
    headers = get_headers(app)
    parsed = urlparse(site_url)
    hostname = parsed.netloc
    site_path = parsed.path
    url = f"https://graph.microsoft.com/v1.0/sites/{hostname}:{site_path}"
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    return response.json()['id']
def get_drive_id(app, site_id, drive_name):
    """Finds the id of the drive (document library) with the given name.

    Lists all drives of the site via Graph and matches on the exact
    drive name. Raises Exception when no drive with that name exists.
    """
    headers = get_headers(app)
    url = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives"
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    drives = response.json().get('value', [])
    for drive in drives:
        if drive['name'] == drive_name:
            return drive['id']
    raise Exception(f"Drive '{drive_name}' not found in site.")
def download_file(download_url, local_path, expected_size):
try:
@@ -98,16 +98,16 @@ def download_file(download_url, local_path, expected_size):
# Check if file exists and size matches
if os.path.exists(long_local_path):
if os.path.isdir(long_local_path):
return False, f\"Path exists as a directory: {local_path}\"
return False, f"Path exists as a directory: {local_path}"
local_size = os.path.getsize(long_local_path)
if int(local_size) == int(expected_size):
return True, \"Skipped\"
return True, "Skipped"
target_dir = os.path.dirname(long_local_path)
if not os.path.exists(target_dir):
os.makedirs(target_dir, exist_ok=True)
elif not os.path.isdir(target_dir):
return False, f\"Parent path exists but is not a directory: {target_dir}\"
return False, f"Parent path exists but is not a directory: {target_dir}"
# Stream download with timeout and 1MB chunks
response = requests.get(download_url, stream=True, timeout=60)
@@ -119,20 +119,20 @@ def download_file(download_url, local_path, expected_size):
local_size = os.path.getsize(long_local_path)
if int(local_size) == int(expected_size):
return True, \"Downloaded\"
return True, "Downloaded"
else:
return False, f\"Size mismatch: Remote={expected_size}, Local={local_size}\"
return False, f"Size mismatch: Remote={expected_size}, Local={local_size}"
except Exception as e:
return False, f\"[Error {getattr(e, 'errno', 'unknown')}] {str(e)}\"
return False, f"[Error {getattr(e, 'errno', 'unknown')}] {str(e)}"
def download_folder_recursive(app, drive_id, item_path, local_root_path, report):
try:
headers = get_headers(app)
encoded_path = quote(item_path)
url = f\"https://graph.microsoft.com/v1.0/drives/{drive_id}/root:/{encoded_path}:/children\"
url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/root:/{encoded_path}:/children"
if not item_path:
url = f\"https://graph.microsoft.com/v1.0/drives/{drive_id}/root/children\"
url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/root/children"
response = requests.get(url, headers=headers)
response.raise_for_status()
@@ -144,25 +144,25 @@ def download_folder_recursive(app, drive_id, item_path, local_root_path, report)
local_path = os.path.normpath(os.path.join(local_root_path, sanitized_name))
if 'folder' in item:
sub_item_path = f\"{item_path}/{item_name}\".strip('/')
sub_item_path = f"{item_path}/{item_name}".strip('/')
download_folder_recursive(app, drive_id, sub_item_path, local_path, report)
elif 'file' in item:
download_url = item.get('@microsoft.graph.downloadUrl')
if not download_url:
report.append({\"Path\": f\"{item_path}/{item_name}\", \"Error\": \"No URL\", \"Timestamp\": datetime.now().isoformat()})
report.append({"Path": f"{item_path}/{item_name}", "Error": "No URL", "Timestamp": datetime.now().isoformat()})
continue
success, status = download_file(download_url, local_path, item['size'])
if success:
if status != \"Skipped\":
print(f\"Downloaded: {item_path}/{item_name}\")
if status != "Skipped":
print(f"Downloaded: {item_path}/{item_name}")
else:
print(f\"FAILED: {item_path}/{item_name} - {status}\")
report.append({\"Path\": f\"{item_path}/{item_name}\", \"Error\": status, \"Timestamp\": datetime.now().isoformat()})
print(f"FAILED: {item_path}/{item_name} - {status}")
report.append({"Path": f"{item_path}/{item_name}", "Error": status, "Timestamp": datetime.now().isoformat()})
except Exception as e:
err_msg = f\"Folder error: {str(e)}\"
print(f\"FAILED FOLDER: {item_path} - {err_msg}\")
report.append({\"Path\": item_path, \"Error\": err_msg, \"Timestamp\": datetime.now().isoformat()})
err_msg = f"Folder error: {str(e)}"
print(f"FAILED FOLDER: {item_path} - {err_msg}")
report.append({"Path": item_path, "Error": err_msg, "Timestamp": datetime.now().isoformat()})
def main():
config = load_config('connection_info.txt')
@@ -176,9 +176,9 @@ def main():
folders_to_download = [f.strip() for f in folders_to_download_str.split(',') if f.strip()]
if not folders_to_download:
folders_to_download = [\"\"]
folders_to_download = [""]
print(f\"Connecting via Graph API...\")
print(f"Connecting via Graph API...")
report = []
try:
@@ -191,20 +191,20 @@ def main():
folder_parts = [sanitize_filename(p) for p in folder.split('/') if p]
local_folder_path = os.path.normpath(os.path.join(local_path_base, *folder_parts))
print(f\"\\nProcessing: {folder if folder else 'Root'}\")
print(f"\nProcessing: {folder if folder else 'Root'}")
download_folder_recursive(app, drive_id, folder, local_folder_path, report)
except Exception as e:
print(f\"Critical error: {e}\")
report.append({\"Path\": \"GENERAL\", \"Error\": str(e), \"Timestamp\": datetime.now().isoformat()})
print(f"Critical error: {e}")
report.append({"Path": "GENERAL", "Error": str(e), "Timestamp": datetime.now().isoformat()})
report_file = f\"download_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv\"
report_file = f"download_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
with open(report_file, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=[\"Path\", \"Error\", \"Timestamp\"])
writer = csv.DictWriter(f, fieldnames=["Path", "Error", "Timestamp"])
writer.writeheader()
writer.writerows(report)
print(f\"\\nProcess complete. Errors: {len(report)}\")
print(f"\nProcess complete. Errors: {len(report)}")
if __name__ == \"__main__\":
if __name__ == "__main__":
main()