# Source: Sharepoint-Download-Tool/download_sharepoint.py
# (186 lines, 7.0 KiB, Python)
import os
import csv
import requests
import time
from datetime import datetime
from msal import ConfidentialClientApplication
from urllib.parse import urlparse, quote
def load_config(file_path):
    """Parse a simple KEY=VALUE config file into a dict.

    Lines without an '=' are ignored. Whitespace around keys and values
    is stripped, as are double quotes surrounding values.
    """
    settings = {}
    with open(file_path, 'r', encoding='utf-8') as handle:
        for raw_line in handle:
            if '=' not in raw_line:
                continue
            name, _, raw_value = raw_line.partition('=')
            settings[name.strip()] = raw_value.strip().strip('"')
    return settings
def create_msal_app(tenant_id, client_id, client_secret):
    """Build a confidential-client MSAL application for the given tenant."""
    authority_url = f"https://login.microsoftonline.com/{tenant_id}"
    return ConfidentialClientApplication(
        client_id,
        authority=authority_url,
        client_credential=client_secret,
    )
def get_headers(app):
    """Return an Authorization header dict for Graph API calls.

    MSAL serves the token from its cache while still valid and fetches
    a fresh one once it has expired.

    Raises:
        Exception: if no access token could be acquired.
    """
    result = app.acquire_token_for_client(
        scopes=["https://graph.microsoft.com/.default"]
    )
    if "access_token" not in result:
        raise Exception(f"Could not acquire token: {result.get('error_description')}")
    return {'Authorization': f'Bearer {result["access_token"]}'}
def get_site_id(app, site_url):
    """Resolve a SharePoint site URL to its Graph site id.

    The hostname and server-relative path from the URL are combined into
    the Graph 'sites/{hostname}:{path}' addressing form.
    """
    parts = urlparse(site_url)
    endpoint = f"https://graph.microsoft.com/v1.0/sites/{parts.netloc}:{parts.path}"
    resp = requests.get(endpoint, headers=get_headers(app))
    resp.raise_for_status()
    return resp.json()['id']
def get_drive_id(app, site_id, drive_name):
    """Return the id of the document library named drive_name in the site.

    Raises:
        Exception: if no drive with that name exists in the site.
    """
    endpoint = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives"
    resp = requests.get(endpoint, headers=get_headers(app))
    resp.raise_for_status()
    for entry in resp.json().get('value', []):
        if entry['name'] == drive_name:
            return entry['id']
    raise Exception(f"Drive '{drive_name}' not found in site.")
def download_file(download_url, local_path, expected_size):
    """Download a file to local_path, skipping identical-size local copies.

    Args:
        download_url: Pre-authenticated download URL for the file.
        local_path: Destination path; parent directories are created.
        expected_size: Remote size in bytes, used for skip and verify checks.

    Returns:
        (True, "Skipped") when a same-size local copy already exists,
        (True, "Downloaded") on a size-verified download, or
        (False, reason) on any failure — exceptions are captured, not
        propagated, so one bad file cannot abort the whole sync.
    """
    try:
        # Skip re-downloading when the local copy already matches in size.
        if os.path.exists(local_path):
            if int(os.path.getsize(local_path)) == int(expected_size):
                return True, "Skipped"
        # 'or .' guards against a bare filename, where dirname() is ''
        # and os.makedirs('') would raise.
        os.makedirs(os.path.dirname(local_path) or '.', exist_ok=True)
        # Bugfix: add a timeout so a stalled connection cannot hang the
        # run forever, and close the streamed response deterministically
        # via the context manager (streamed responses otherwise leak the
        # underlying connection).
        with requests.get(download_url, stream=True, timeout=60) as response:
            response.raise_for_status()
            with open(local_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:  # filter out keep-alive chunks
                        f.write(chunk)
        # Verify the byte count so truncated downloads are reported.
        local_size = os.path.getsize(local_path)
        if int(local_size) == int(expected_size):
            return True, "Downloaded"
        return False, f"Size mismatch after download: Remote={expected_size}, Local={local_size}"
    except Exception as e:
        return False, str(e)
def download_folder_recursive(app, drive_id, item_path, local_root_path, report):
    """Recursively download the contents of a drive folder.

    Args:
        app: MSAL app used to mint/refresh tokens per folder batch.
        drive_id: Graph drive id containing the folder.
        item_path: Drive-relative folder path ('' means the drive root).
        local_root_path: Local directory mirroring item_path.
        report: List of error dicts; appended to in place on failures.

    Folder-level failures are recorded in the report rather than raised,
    so one broken folder does not stop the rest of the sync.
    """
    try:
        # Get fresh headers per folder batch so long runs survive token expiry.
        headers = get_headers(app)
        if item_path:
            url = (f"https://graph.microsoft.com/v1.0/drives/{drive_id}"
                   f"/root:/{quote(item_path)}:/children")
        else:
            url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/root/children"
        # Bugfix: follow @odata.nextLink. Graph paginates children listings
        # (default 200 items per page); the original read only the first
        # page, silently skipping the rest of large folders.
        items = []
        while url:
            response = requests.get(url, headers=headers)
            response.raise_for_status()
            payload = response.json()
            items.extend(payload.get('value', []))
            url = payload.get('@odata.nextLink')
        for item in items:
            item_name = item['name']
            local_path = os.path.join(local_root_path, item_name)
            if 'folder' in item:
                sub_item_path = f"{item_path}/{item_name}".strip('/')
                download_folder_recursive(app, drive_id, sub_item_path, local_path, report)
            elif 'file' in item:
                download_url = item.get('@microsoft.graph.downloadUrl')
                if not download_url:
                    report.append({
                        "Path": f"{item_path}/{item_name}",
                        "Error": "No download URL available",
                        "Timestamp": datetime.now().isoformat()
                    })
                    continue
                success, status = download_file(download_url, local_path, item['size'])
                if success:
                    if status == "Downloaded":
                        print(f"Downloaded: {item_path}/{item_name}")
                    else:
                        print(f"Skipped (matches local): {item_path}/{item_name}")
                else:
                    print(f"FAILED: {item_path}/{item_name} - {status}")
                    report.append({
                        "Path": f"{item_path}/{item_name}",
                        "Error": status,
                        "Timestamp": datetime.now().isoformat()
                    })
    except Exception as e:
        report.append({
            "Path": item_path,
            "Error": f"Folder processing error: {str(e)}",
            "Timestamp": datetime.now().isoformat()
        })
def main():
    """Entry point: read config, resolve site/drive, sync folders, write a CSV report."""
    config = load_config('connection_info.txt')
    tenant_id = config.get('TENANT_ID')
    client_id = config.get('CLIENT_ID')
    client_secret = config.get('CLIENT_SECRET')
    site_url = config.get('SITE_URL')
    drive_name = config.get('DOCUMENT_LIBRARY')
    folders_to_download_str = config.get('FOLDERS_TO_DOWNLOAD', '')
    # Normalize Windows-style separators from the config file; default to ''
    # so a missing LOCAL_PATH does not crash with AttributeError.
    local_path_base = config.get('LOCAL_PATH', '').replace('\\', os.sep)
    # Bugfix: the original referenced an undefined name ('folders_str'),
    # raising NameError whenever FOLDERS_TO_DOWNLOAD was present, and its
    # "safeguard" branch crashed on None when the key was absent. Empty
    # entries (e.g. from a trailing comma) are dropped.
    folders_to_download = []
    if folders_to_download_str:
        folders_to_download = [
            f.strip() for f in folders_to_download_str.split(',') if f.strip()
        ]
    print(f"Connecting via Graph API (with auto-refresh and skip logic)...")
    report = []
    try:
        app = create_msal_app(tenant_id, client_id, client_secret)
        print("Getting Site ID...")
        site_id = get_site_id(app, site_url)
        print(f"Getting Drive ID for '{drive_name}'...")
        drive_id = get_drive_id(app, site_id, drive_name)
        for folder in folders_to_download:
            print(f"\nProcessing folder: {folder}")
            local_folder_path = os.path.join(local_path_base, folder)
            download_folder_recursive(app, drive_id, folder, local_folder_path, report)
    except Exception as e:
        # Top-level boundary: record the failure so it appears in the report.
        print(f"Critical error: {e}")
        report.append({
            "Path": "GENERAL",
            "Error": str(e),
            "Timestamp": datetime.now().isoformat()
        })
    # Always write the report, even when empty, so each run leaves a record.
    report_file = f"download_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
    with open(report_file, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=["Path", "Error", "Timestamp"])
        writer.writeheader()
        writer.writerows(report)
    print(f"\nProcess complete.")
    if report:
        print(f"Errors found: {len(report)}. See {report_file} for details.")
    else:
        print("All downloads successful.")
# Run the downloader only when executed as a script, not on import.
if __name__ == "__main__":
    main()