import streamlit as st
import os
import tempfile
import io
import zipfile
import shutil


def _read_zip_members(archive_bytes):
    """Return ``{relative_path: content}`` for every file in a ZIP archive.

    The archive is extracted into its own private temporary directory so
    that members of one archive can never be picked up again while another
    archive is being processed.

    Args:
        archive_bytes: Raw bytes of the ZIP archive.

    Returns:
        Dict mapping each member's path (relative to the archive root,
        always ``/``-separated) to its content as ``bytes``.

    Raises:
        zipfile.BadZipFile: If ``archive_bytes`` is not a valid ZIP archive.
    """
    members = {}
    with tempfile.TemporaryDirectory() as extract_dir:
        with zipfile.ZipFile(io.BytesIO(archive_bytes)) as zip_ref:
            zip_ref.extractall(extract_dir)

        for root, _, names in os.walk(extract_dir):
            for name in names:
                file_path = os.path.join(root, name)
                rel_path = os.path.relpath(file_path, extract_dir)
                # Use "/" regardless of the host OS so nested members do not
                # end up with backslash-separated names on Windows.
                rel_path = rel_path.replace(os.sep, "/")
                with open(file_path, "rb") as f:
                    members[rel_path] = f.read()
    return members


def _collect_files_to_upload(uploaded_files, extract_zip):
    """Build the ``{path: content}`` mapping for the selected uploads.

    Args:
        uploaded_files: Iterable of uploaded file objects exposing ``name``
            and ``getvalue()`` (assumes Streamlit's ``UploadedFile``, a
            ``BytesIO`` subclass -- confirm if another type is passed).
        extract_zip: When true, files whose name ends in ``.zip`` are
            expanded into their members instead of being uploaded as-is.

    Returns:
        Dict mapping target path to file content. Later entries overwrite
        earlier ones that share the same path.
    """
    files_to_upload = {}
    for uploaded_file in uploaded_files:
        # getvalue() returns the full buffer irrespective of the current
        # stream position, unlike read().
        file_content = uploaded_file.getvalue()
        file_name = uploaded_file.name

        if extract_zip and file_name.lower().endswith(".zip"):
            files_to_upload.update(_read_zip_members(file_content))
        else:
            files_to_upload[file_name] = file_content
    return files_to_upload


def upload_model_form(model_info):
    """Render the model-file upload form and push the files on submit.

    Args:
        model_info: Object describing the target repository; its
            ``modelId`` attribute is used as the repository id.

    Returns:
        ``True`` only when the form was submitted and the client reported a
        successful upload. ``False`` in every other case (missing model
        info, form not submitted, nothing selected, nothing to upload,
        client-reported failure, or an exception during processing).
    """
    st.subheader("Upload Model Files")

    if not model_info:
        st.error("Model information not found")
        return False

    repo_id = model_info.modelId

    st.info(
        """
        Upload model files to your repository. You can upload:

        - Individual model files (e.g., model weights, configuration files)
        - A ZIP archive containing multiple files (will be extracted automatically)

        **Important**: Large files may take time to upload.
        """
    )

    with st.form("upload_model_form"):
        # File upload
        uploaded_files = st.file_uploader(
            "Upload model files",
            accept_multiple_files=True,
            help="Select one or more files to upload",
        )

        # ZIP handling option
        extract_zip = st.checkbox(
            "Extract ZIP files automatically",
            value=True,
            help="If enabled, ZIP files will be extracted in the repository",
        )

        # Commit message
        commit_message = st.text_input(
            "Commit message",
            value="Upload model files",
            help="Describe the changes you're making",
        )

        # Submit button
        submitted = st.form_submit_button("Upload Files", use_container_width=True)

    if not submitted:
        return False

    if not uploaded_files:
        st.warning("Please select files to upload")
        return False

    # Fall back to the form's default text if the user cleared the field, so
    # the client is never handed a blank commit message.
    commit_message = (commit_message or "").strip() or "Upload model files"

    with st.spinner("Uploading files..."):
        # Broad catch is deliberate: this is the UI boundary, and any failure
        # (bad archive, I/O error, client error) is reported to the user
        # instead of crashing the app.
        try:
            files_to_upload = _collect_files_to_upload(uploaded_files, extract_zip)

            if not files_to_upload:
                st.warning("No files to upload")
                return False

            success, response = st.session_state.client.upload_model_files(
                repo_id=repo_id,
                files=files_to_upload,
                commit_message=commit_message,
            )

            if success:
                st.success(
                    f"Successfully uploaded {len(files_to_upload)} files!"
                )
                return True

            st.error(f"Failed to upload files: {response}")
            return False
        except Exception as e:
            st.error(f"Error uploading files: {str(e)}")
            return False