import gradio as gr import pandas as pd import numpy as np from simple_salesforce import Salesforce import io import traceback from datetime import datetime import os import logging # Set up logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class SalesforceDataLoader: def __init__(self): self.sf = None self.connected = False self.available_objects = [] def connect_to_salesforce(self, username, password, security_token, sandbox=False): """Connect to Salesforce using credentials""" try: domain = 'test' if sandbox else None self.sf = Salesforce( username=username, password=password, security_token=security_token, domain=domain ) self.connected = True # Get available objects self._get_available_objects() return f"✅ Successfully connected to Salesforce as {username}", gr.update(visible=True), gr.update(choices=self.available_objects, value=None) except Exception as e: logger.error(f"Connection failed: {str(e)}") self.connected = False return f"❌ Connection failed: {str(e)}", gr.update(visible=False), gr.update(choices=[], value=None) def _get_available_objects(self): """Get list of available Salesforce objects""" try: # Get commonly used objects common_objects = ['Account', 'Contact', 'Lead', 'Opportunity', 'Case', 'Campaign', 'User', 'Product2'] self.available_objects = [] for obj_name in common_objects: try: # Test if object exists and is accessible getattr(self.sf, obj_name).describe() self.available_objects.append(obj_name) except: continue except Exception as e: logger.error(f"Error getting objects: {str(e)}") self.available_objects = ['Account', 'Contact', 'Lead'] # Fallback def get_object_fields(self, object_name): """Get fields for selected Salesforce object""" if not self.connected or not object_name: return gr.update(choices=[], value=None), "" try: obj = getattr(self.sf, object_name) metadata = obj.describe() fields = [] field_info = [] for field in metadata['fields']: if field['createable'] or field['updateable']: field_name = field['name'] field_type = field['type'] required = "Required" if not field['nillable'] and not field['defaultedOnCreate'] else "Optional" fields.append(field_name) field_info.append(f"**{field_name}** ({field_type}) - {required}") field_info_text = "\n".join(field_info[:20]) # Show first 20 fields if len(field_info) > 20: field_info_text += f"\n... and {len(field_info) - 20} more fields" return gr.update(choices=fields, value=None), field_info_text except Exception as e: logger.error(f"Error getting fields: {str(e)}") return gr.update(choices=[], value=None), f"Error: {str(e)}" def upload_data_to_salesforce(self, file, object_name, operation, external_id_field=None): """Upload data to Salesforce""" if not self.connected: return "❌ Please connect to Salesforce first", None if not file or not object_name: return "❌ Please select a file and object", None try: # Read the uploaded file if file.name.endswith('.csv'): df = pd.read_csv(file.name) elif file.name.endswith(('.xlsx', '.xls')): df = pd.read_excel(file.name) else: return "❌ Please upload a CSV or Excel file", None if df.empty: return "❌ The uploaded file is empty", None # Get Salesforce object sf_object = getattr(self.sf, object_name) # Prepare data for upload records = df.to_dict('records') # Clean data - remove NaN values for record in records: record = {k: v for k, v in record.items() if pd.notna(v)} results = [] errors = [] # Perform bulk operation if operation == "Insert": try: result = sf_object.bulk.insert(records) results = result except Exception as e: return f"❌ Bulk insert failed: {str(e)}", None elif operation == "Update": try: result = sf_object.bulk.update(records) results = result except Exception as e: return f"❌ Bulk update failed: {str(e)}", None elif operation == "Upsert": if not external_id_field: return "❌ External ID field is required for upsert operation", None try: result = sf_object.bulk.upsert(records, external_id_field) results = result except Exception as e: return f"❌ Bulk upsert failed: {str(e)}", None # Process results success_count = 0 error_count = 0 for result in results: if result.get('success'): success_count += 1 else: error_count += 1 errors.append(result.get('errors', [])) # Create results summary summary = f"✅ Operation completed!\n" summary += f"📊 Total records: {len(records)}\n" summary += f"✅ Successful: {success_count}\n" summary += f"❌ Failed: {error_count}\n" if errors: summary += f"\n**First few errors:**\n" for i, error in enumerate(errors[:5]): summary += f"{i+1}. {error}\n" # Create downloadable results file results_df = pd.DataFrame(results) results_csv = results_df.to_csv(index=False) results_file = f"salesforce_upload_results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" with open(results_file, 'w') as f: f.write(results_csv) return summary, results_file except Exception as e: logger.error(f"Upload error: {str(e)}") logger.error(traceback.format_exc()) return f"❌ Error: {str(e)}", None def export_data_from_salesforce(self, object_name, fields, record_limit=1000): """Export data from Salesforce""" if not self.connected: return "❌ Please connect to Salesforce first", None if not object_name: return "❌ Please select an object", None try: # Build SOQL query if not fields: # Get some default fields obj = getattr(self.sf, object_name) metadata = obj.describe() fields = [field['name'] for field in metadata['fields'][:10]] # First 10 fields fields_str = ', '.join(fields) query = f"SELECT {fields_str} FROM {object_name} LIMIT {record_limit}" # Execute query result = self.sf.query_all(query) records = result['records'] if not records: return "❌ No records found", None # Convert to DataFrame df = pd.DataFrame(records) # Remove Salesforce metadata columns if 'attributes' in df.columns: df = df.drop('attributes', axis=1) # Create downloadable file export_file = f"salesforce_export_{object_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" df.to_csv(export_file, index=False) summary = f"✅ Export completed!\n" summary += f"📊 Records exported: {len(records)}\n" summary += f"📋 Fields: {', '.join(fields)}\n" summary += f"📁 File: {export_file}" return summary, export_file except Exception as e: logger.error(f"Export error: {str(e)}") return f"❌ Error: {str(e)}", None # Initialize the Salesforce Data Loader sf_loader = SalesforceDataLoader() # Create the Gradio interface def create_interface(): with gr.Blocks(title="Salesforce Data Loader", theme=gr.themes.Soft()) as interface: gr.Markdown(""" # 🚀 Salesforce Data Loader A powerful tool to upload and download data from Salesforce. Connect with your credentials and start managing your data! """) with gr.Tab("🔐 Connection"): gr.Markdown("### Connect to Salesforce") with gr.Row(): with gr.Column(): username = gr.Textbox(label="Username", placeholder="your.email@company.com") password = gr.Textbox(label="Password", type="password") security_token = gr.Textbox(label="Security Token", type="password", info="Get this from Salesforce Setup → Personal Information → Reset Security Token") with gr.Column(): sandbox = gr.Checkbox(label="Sandbox Environment", info="Check if connecting to a sandbox") connect_btn = gr.Button("🔗 Connect to Salesforce", variant="primary") connection_status = gr.Markdown("") with gr.Tab("📤 Upload Data", visible=False) as upload_tab: gr.Markdown("### Upload CSV/Excel data to Salesforce") with gr.Row(): with gr.Column(): file_upload = gr.File(label="Upload CSV or Excel file", file_types=[".csv", ".xlsx", ".xls"]) object_dropdown = gr.Dropdown(label="Select Salesforce Object", choices=[]) operation_dropdown = gr.Dropdown( label="Operation Type", choices=["Insert", "Update", "Upsert"], value="Insert" ) external_id_field = gr.Dropdown(label="External ID Field (for Upsert)", choices=[], visible=False) with gr.Column(): object_fields_info = gr.Markdown("Select an object to see available fields") upload_btn = gr.Button("📤 Upload Data", variant="primary") upload_results = gr.Markdown("") download_results = gr.File(label="Download Results", visible=False) with gr.Tab("📥 Export Data", visible=False) as export_tab: gr.Markdown("### Export data from Salesforce") with gr.Row(): with gr.Column(): export_object = gr.Dropdown(label="Select Object to Export", choices=[]) export_fields = gr.CheckboxGroup(label="Select Fields to Export", choices=[]) record_limit = gr.Slider(minimum=100, maximum=10000, value=1000, step=100, label="Record Limit") with gr.Column(): export_fields_info = gr.Markdown("Select an object to see available fields") export_btn = gr.Button("📥 Export Data", variant="primary") export_results = gr.Markdown("") download_export = gr.File(label="Download Export", visible=False) # Event handlers connect_btn.click( fn=sf_loader.connect_to_salesforce, inputs=[username, password, security_token, sandbox], outputs=[connection_status, upload_tab, object_dropdown] ) # Update export object dropdown when connection is made connect_btn.click( fn=lambda status, visible, choices: gr.update(choices=choices), inputs=[connection_status, upload_tab, object_dropdown], outputs=[export_object] ) object_dropdown.change( fn=sf_loader.get_object_fields, inputs=[object_dropdown], outputs=[external_id_field, object_fields_info] ) export_object.change( fn=sf_loader.get_object_fields, inputs=[export_object], outputs=[export_fields, export_fields_info] ) operation_dropdown.change( fn=lambda op: gr.update(visible=(op == "Upsert")), inputs=[operation_dropdown], outputs=[external_id_field] ) upload_btn.click( fn=sf_loader.upload_data_to_salesforce, inputs=[file_upload, object_dropdown, operation_dropdown, external_id_field], outputs=[upload_results, download_results] ) download_results.change( fn=lambda file: gr.update(visible=file is not None), inputs=[download_results], outputs=[download_results] ) export_btn.click( fn=sf_loader.export_data_from_salesforce, inputs=[export_object, export_fields, record_limit], outputs=[export_results, download_export] ) download_export.change( fn=lambda file: gr.update(visible=file is not None), inputs=[download_export], outputs=[download_export] ) return interface # Launch the interface if __name__ == "__main__": interface = create_interface() interface.launch(server_name="0.0.0.0", server_port=7860)