Spaces:
Sleeping
Sleeping
import glob
import os
import urllib.error
import urllib.parse
import urllib.request

import streamlit as st
from azure.cosmos import CosmosClient, PartitionKey
# Environment Variables
COSMOS_CONNECTION_STRING = os.getenv('COSMOS_CONNECTION_STRING')
if not COSMOS_CONNECTION_STRING:
    # os.getenv returns None when the variable is unset; stop here with an
    # actionable message instead of handing None to the SDK call below.
    raise RuntimeError(
        'COSMOS_CONNECTION_STRING is not set; '
        'define it in the environment before starting the app.'
    )

# Initialize Azure Cosmos DB Client
cosmos_client = CosmosClient.from_connection_string(COSMOS_CONNECTION_STRING)
| # Function to Retrieve, Display, and Manage Cosmos DB Images | |
def manage_cosmos_db_images():
    """Render the stored PNG items and register new local PNG files.

    Works on the first database returned by ``cosmos_client.list_databases()``
    and the first container returned by that database's ``list_containers()``.

    For every item whose ``file_name`` ends in ``.png`` the item id, the image
    and a delete button are rendered. Afterwards each ``*.png`` file in the
    current working directory whose stem is not already an item id is inserted
    as ``{"id": <stem>, "file_name": <file name>}``.

    A warning is shown (and nothing else happens) when no database or no
    container is available.
    """
    st.subheader('Manage Cosmos DB Images')

    db_properties = next(cosmos_client.list_databases(), None)
    if not db_properties:
        st.warning('No Cosmos DB database found.')
        return

    database_client = cosmos_client.get_database_client(db_properties['id'])
    container_properties = next(database_client.list_containers(), None)
    if not container_properties:
        st.warning(f"No container found in database: {db_properties['id']}")
        return

    container_client = database_client.get_container_client(container_properties['id'])
    items = list(container_client.read_all_items())

    # Display existing items with delete option.
    # NOTE(review): the original indentation was lost; the delete button is
    # assumed to belong to the PNG branch, next to the image it removes.
    for item in items:
        file_name = item.get('file_name')
        # Only string names ending in .png are shown; anything else is skipped.
        if not (isinstance(file_name, str) and file_name.endswith('.png')):
            continue

        st.markdown(f"Item: `{item['id']}`")
        st.image(file_name)
        if st.button(f"🗑️ Delete {item['id']}", key=f"delete_{item['id']}"):
            # NOTE(review): the item id is passed as the partition key value,
            # which presumably means the container is partitioned on /id --
            # confirm against the container definition.
            container_client.delete_item(item=item, partition_key=item['id'])
            st.success(f"Deleted Item: {item['id']}")

    # Add PNG files from the working directory that are not stored yet.
    # The id set is built once from the snapshot read above; an item deleted
    # during this run is still in the snapshot, so it is not re-added here.
    existing_ids = {item['id'] for item in items}
    for file_name in glob.glob('*.png'):
        item_id = os.path.splitext(file_name)[0]
        if item_id in existing_ids:
            continue
        container_client.create_item(body={"id": item_id, "file_name": file_name})
        st.write(f"Added Item: {item_id}")
# UI to Manage PNG Images
st.title('Manage PNG Images in Cosmos DB')
# st.button() is True only during the rerun triggered by its own click.
# The delete buttons are rendered inside manage_cosmos_db_images(), so the
# click is remembered in session state; otherwise clicking a delete button
# would rerun the script with this button False and the manager (and the
# delete handling) would never execute.
if st.button('Manage PNG Images'):
    st.session_state['show_image_manager'] = True
if 'show_image_manager' in st.session_state:
    manage_cosmos_db_images()
# Azure Blob Storage - Upload/Download
st.subheader('Azure Blob Storage - Upload/Download')
blob_container = st.text_input('Blob Container')
blob_file = st.file_uploader('Upload file to Blob')
if blob_file is not None and st.button('Upload to Blob'):
    if not blob_container.strip():
        # A blank container name cannot address a blob; ask for it instead of
        # forwarding the empty value to the storage client.
        st.warning('Enter a blob container name before uploading.')
    else:
        # NOTE(review): `blob_service` is not defined in the visible part of
        # this file -- confirm it is created elsewhere (presumably an Azure
        # Blob Storage service client); as written this line raises NameError.
        blob_client = blob_service.get_blob_client(container=blob_container, blob=blob_file.name)
        blob_client.upload_blob(blob_file.getvalue())
        st.success(f"Uploaded {blob_file.name} to {blob_container}")
# Azure Functions - Trigger
def _fetch_text(url, timeout=30):
    """Return the decoded response body of an HTTP(S) GET request to *url*.

    Raises ValueError when *url* is not an http/https URL (this also keeps
    schemes such as file:// away from urlopen) and OSError (including
    urllib.error.URLError and timeouts) when the request cannot be completed.
    Responses with an HTTP error status are not raised; their body is
    returned like any other response.
    """
    if urllib.parse.urlsplit(url).scheme not in ('http', 'https'):
        raise ValueError('Function URL must start with http:// or https://')
    try:
        response = urllib.request.urlopen(url, timeout=timeout)
    except urllib.error.HTTPError as err:
        # An HTTPError is itself a readable response object carrying the body.
        response = err
    with response:
        charset = response.headers.get_content_charset() or 'utf-8'
        return response.read().decode(charset, errors='replace')


st.subheader('Azure Functions - Trigger')
function_url = st.text_input('Function URL')
if st.button('Call Azure Function'):
    try:
        response_text = _fetch_text(function_url.strip())
    except (OSError, ValueError) as err:
        # Report bad URLs and network failures in the UI instead of crashing.
        st.error(f'Function call failed: {err}')
    else:
        st.write('Function Response:', response_text)