Håndter Access Token udløb ved automatisk at forny token på 401-fejl fra Graph API

This commit is contained in:
Martin Tranberg
2026-03-27 11:03:14 +01:00
parent 931fd0dd05
commit 18158d52b2

View File

@@ -76,21 +76,58 @@ def safe_get(url, headers, stream=False, timeout=60, params=None):
return requests.get(url, headers=headers, stream=stream, timeout=timeout, params=params)
# --- Punkt 4: Integrity Validation (QuickXorHash - Placeholder for full logic) ---
# Note: Full QuickXorHash calculation is complex. We'll log the hash for audit.
def verify_integrity(local_path, remote_hash):
    """Placeholder for QuickXorHash verification.

    Currently always returns True: full QuickXorHash computation is not yet
    implemented, so callers fall back to size-based checks. The merged diff
    had left two consecutive docstrings here; the stale one is removed.

    Args:
        local_path: Path of the downloaded file (unused for now).
        remote_hash: QuickXorHash reported by Graph, or None/empty if absent.

    Returns:
        True in all cases (verification is not yet performed).
    """
    if not remote_hash:
        return True  # No remote hash supplied — fallback to size check.
    # Future implementation would calculate the local QuickXorHash here
    # and compare it against remote_hash.
    return True
def get_headers(app, force_refresh=False):
    """Build the Authorization header for Graph calls from an app-only token.

    MSAL caches the client-credential token, so simply calling
    ``acquire_token_for_client`` again returns the SAME cached token — the
    original ``force_refresh`` branch was therefore a no-op. To truly force a
    refresh we drop the cached tokens first (``remove_tokens_for_client``,
    MSAL >= 1.19; guarded with getattr for older versions).

    Args:
        app: MSAL ConfidentialClientApplication.
        force_refresh: When True, discard any cached token and acquire a new one.

    Returns:
        dict with a Bearer ``Authorization`` header.

    Raises:
        Exception: If no access token could be acquired.
    """
    scopes = ["https://graph.microsoft.com/.default"]
    result = None
    if not force_refresh:
        result = app.acquire_token_for_client(scopes=scopes)
    if force_refresh or not result or "access_token" not in result:
        logger.info("Refreshing Access Token...")
        if force_refresh:
            # Purge the MSAL token cache so the next acquire hits the STS
            # instead of returning the same (possibly rejected) cached token.
            remove = getattr(app, "remove_tokens_for_client", None)
            if callable(remove):
                remove()
        result = app.acquire_token_for_client(scopes=scopes)
    if "access_token" in result:
        return {'Authorization': f'Bearer {result["access_token"]}'}
    raise Exception(f"Auth failed: {result.get('error_description')}")
def get_site_id(app, site_url):
    """Resolve a SharePoint site URL to its Microsoft Graph site id."""
    parts = urlparse(site_url)
    endpoint = f"https://graph.microsoft.com/v1.0/sites/{parts.netloc}:{parts.path}"
    payload = safe_get(endpoint, headers=get_headers(app)).json()
    return payload['id']
def get_drive_id(app, site_id, drive_name):
    """Return the id of the drive named `drive_name` on the site, or raise."""
    url = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives"
    drives = safe_get(url, headers=get_headers(app)).json().get('value', [])
    # First drive whose name matches exactly, if any.
    found = next((d['id'] for d in drives if d['name'] == drive_name), None)
    if found is None:
        raise Exception(f"Drive {drive_name} not found")
    return found
# --- Punkt 2: Resume / Chunked Download logic ---
def get_fresh_download_url(app, drive_id, item_id):
    """Fetches a fresh download URL for a specific item ID with token refresh support.

    The merged diff had left the pre-change body (headers/safe_get/return) above
    the new try-block, so the function returned early and the 401-refresh logic
    was dead code. This is the intended post-change implementation only.

    Args:
        app: MSAL ConfidentialClientApplication used to mint auth headers.
        drive_id: Graph drive id containing the item.
        item_id: Graph item id to fetch the download URL for.

    Returns:
        (download_url, None) on success — download_url may be None if Graph
        omits @microsoft.graph.downloadUrl — or (None, error_message) on failure.
    """
    url = f"https://graph.microsoft.com/v1.0/drives/{drive_id}/items/{item_id}?$select=id,@microsoft.graph.downloadUrl"
    try:
        headers = get_headers(app)
        response = requests.get(url, headers=headers, timeout=60)
        if response.status_code == 401:
            # App access token expired mid-run: force a token refresh, retry once.
            logger.info("Access Token expired. Forcing refresh...")
            headers = get_headers(app, force_refresh=True)
            response = requests.get(url, headers=headers, timeout=60)
        response.raise_for_status()
        return response.json().get('@microsoft.graph.downloadUrl'), None
    except Exception as e:
        # Callers inspect the error string; never let this helper raise.
        return None, str(e)
def download_single_file(app, drive_id, item_id, local_path, expected_size, display_name, remote_hash=None, initial_url=None):
try:
@@ -117,16 +154,18 @@ def download_single_file(app, drive_id, item_id, local_path, expected_size, disp
# Initial download attempt
if not download_url:
download_url = get_fresh_download_url(app, drive_id, item_id)
download_url, err = get_fresh_download_url(app, drive_id, item_id)
if not download_url:
return False, f"Could not fetch initial URL: {err}"
response = requests.get(download_url, headers=resume_header, stream=True, timeout=120)
# Handle 401 Unauthorized by refreshing the URL
# Handle 401 Unauthorized from SharePoint (expired download link)
if response.status_code == 401:
logger.warning(f"URL expired for {display_name}. Fetching fresh URL...")
download_url = get_fresh_download_url(app, drive_id, item_id)
download_url, err = get_fresh_download_url(app, drive_id, item_id)
if not download_url:
return False, "Failed to refresh download URL (401)"
return False, f"Failed to refresh download URL: {err}"
# Retry download with new URL
response = requests.get(download_url, headers=resume_header, stream=True, timeout=120)
@@ -201,26 +240,6 @@ def create_msal_app(tenant_id, client_id, client_secret):
client_id, authority=f"https://login.microsoftonline.com/{tenant_id}", client_credential=client_secret
)
def get_headers(app):
    """Return a Bearer Authorization header from an app-only client-credential token.

    NOTE(review): this is the pre-change variant being removed by this commit
    (superseded by the force_refresh-aware version added above in the diff).
    Raises if MSAL returns no access token.
    """
    scopes = ["https://graph.microsoft.com/.default"]
    result = app.acquire_token_for_client(scopes=scopes)
    if "access_token" in result:
        return {'Authorization': f'Bearer {result["access_token"]}'}
    raise Exception(f"Auth failed: {result.get('error_description')}")
def get_site_id(app, site_url):
    """Resolve a SharePoint site URL to its Graph site id via hostname:/path lookup."""
    parsed = urlparse(site_url)
    url = f"https://graph.microsoft.com/v1.0/sites/{parsed.netloc}:{parsed.path}"
    response = safe_get(url, headers=get_headers(app))
    return response.json()['id']
def get_drive_id(app, site_id, drive_name):
    """Return the id of the drive whose name matches drive_name; raise if absent."""
    url = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives"
    response = safe_get(url, headers=get_headers(app))
    for drive in response.json().get('value', []):
        # Exact, case-sensitive name match against the site's drive list.
        if drive['name'] == drive_name: return drive['id']
    raise Exception(f"Drive {drive_name} not found")
def main():
try:
config = load_config('connection_info.txt')