-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_blob_access.py
More file actions
96 lines (83 loc) · 4.27 KB
/
Copy pathtest_blob_access.py
File metadata and controls
96 lines (83 loc) · 4.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import os
import sys
import json
from azure.storage.blob import BlobServiceClient
from azure.core.exceptions import ResourceNotFoundError, ClientAuthenticationError
# Helper that loads the Azure Storage connection string from local.settings.json.
def get_connection_string_from_settings():
    """Return the Azure Storage connection string from local.settings.json.

    The settings file is looked up next to this script and is expected to be
    a JSON object with the shape
    ``{"Values": {"AZURE_STORAGE_CONNECTION_STRING": "..."}}``.

    Returns:
        The non-empty connection string stored under
        ``Values.AZURE_STORAGE_CONNECTION_STRING``.

    Raises:
        SystemExit: With exit code 1 (after printing a diagnostic) when the
            file is missing, unreadable, not valid JSON, or does not contain
            a non-empty connection string.
    """
    settings_path = os.path.join(os.path.dirname(__file__), "local.settings.json")
    if not os.path.exists(settings_path):
        print(f"Error: local.settings.json not found at {settings_path}")
        sys.exit(1)

    try:
        # "utf-8-sig" reads UTF-8 with or without a byte-order mark. Editors on
        # Windows often write a BOM, which json.load rejects when the file is
        # decoded as plain UTF-8 (or mis-decodes under a locale code page).
        with open(settings_path, "r", encoding="utf-8-sig") as f:
            settings = json.load(f)
    except json.JSONDecodeError:
        print("Error: Could not parse local.settings.json. Invalid JSON.")
        sys.exit(1)
    except Exception as e:
        print(f"An unexpected error occurred while reading local.settings.json: {e}")
        sys.exit(1)

    # Tolerate a non-object root or a null / non-object "Values" entry: both
    # simply mean the connection string is not configured.
    values = settings.get("Values") if isinstance(settings, dict) else None
    connection_string = (
        values.get("AZURE_STORAGE_CONNECTION_STRING") if isinstance(values, dict) else None
    )
    if not connection_string:
        print("Error: AZURE_STORAGE_CONNECTION_STRING not found in local.settings.json -> Values.")
        sys.exit(1)
    return connection_string
def test_azure_blob_access():
    """Run an upload/list/download/delete round trip against Azure Blob Storage.

    Obtains the connection string from ``get_connection_string_from_settings``
    and exercises the ``scaniq`` container with a small text blob, printing
    the outcome of every step. The container is not created here, so it must
    already exist.

    Exceptions raised during the round trip are caught and printed rather
    than propagated. The local scratch download is removed whether or not the
    run succeeds. The final banner reports success only when both the list
    check and the content comparison passed.
    """
    connection_string = get_connection_string_from_settings()
    container_name = "scaniq"
    test_blob_name = "test_access_blob.txt"
    test_content = "This is a test file to check Azure Blob Storage access."
    local_download_path = "downloaded_test_access_blob.txt"
    all_checks_passed = True

    print("--- Starting Azure Blob Access Test ---")
    try:
        blob_service_client = BlobServiceClient.from_connection_string(connection_string)
        container_client = blob_service_client.get_container_client(container_name)
        # Container creation is deliberately not attempted, so do not claim
        # that it was: a missing container surfaces as ResourceNotFoundError
        # on the first real operation below.
        print(f"Using existing container '{container_name}' (container creation is skipped).")

        # --- Test Upload ---
        print(f"Attempting to upload blob '{test_blob_name}' to '{container_name}'...")
        blob_client = container_client.get_blob_client(test_blob_name)
        blob_client.upload_blob(test_content, overwrite=True)
        print(f"Successfully uploaded '{test_blob_name}'.")

        # --- Test List Blobs ---
        print(f"Attempting to list blobs in '{container_name}'...")
        found_test_blob = False
        # Iterate the full listing (no early exit) so every blob name is shown.
        for blob in container_client.list_blobs():
            print(f" - {blob.name}")
            if blob.name == test_blob_name:
                found_test_blob = True
        if found_test_blob:
            print(f"Successfully found '{test_blob_name}' in the list.")
        else:
            all_checks_passed = False
            print(f"Warning: '{test_blob_name}' was not found in the list after upload.")

        # --- Test Download ---
        print(f"Attempting to download blob '{test_blob_name}'...")
        with open(local_download_path, "wb") as download_file:
            download_file.write(blob_client.download_blob().readall())
        print(f"Successfully downloaded '{test_blob_name}' to '{local_download_path}'.")

        # Decode explicitly instead of relying on the platform default encoding.
        with open(local_download_path, "r", encoding="utf-8") as f:
            downloaded_content = f.read()
        if downloaded_content == test_content:
            print("Downloaded content matches original content.")
        else:
            all_checks_passed = False
            print("Downloaded content DOES NOT match original content.")

        # --- Test Delete (Optional) ---
        print(f"Attempting to delete blob '{test_blob_name}'...")
        blob_client.delete_blob()
        print(f"Successfully deleted '{test_blob_name}'.")

        # Only claim success when every verification step actually passed.
        if all_checks_passed:
            print("--- Azure Blob Access Test Completed Successfully ---")
        else:
            print("--- Azure Blob Access Test Completed With Failures ---")
    except ClientAuthenticationError as e:
        print(f"Authentication Error: Please check your AZURE_STORAGE_CONNECTION_STRING. Details: {e}")
    except ResourceNotFoundError as e:
        print(f"Resource Not Found Error: The container '{container_name}' might not exist or is inaccessible. Details: {e}")
    except Exception as e:
        print(f"An unexpected error occurred during blob access test: {e}")
        print("This could indicate network issues, DNS problems, or other connectivity challenges.")
    finally:
        # Remove the scratch download even when a step above failed, so a
        # broken run does not leave stray files in the working directory.
        if os.path.exists(local_download_path):
            try:
                os.remove(local_download_path)
                print(f"Removed local file '{local_download_path}'.")
            except OSError as e:
                print(f"Warning: could not remove local file '{local_download_path}': {e}")
# Script entry point: run the blob access check only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    test_azure_blob_access()