Spaces:
Build error
Build error
import gradio as gr | |
import pandas as pd | |
import numpy as np | |
from simple_salesforce import Salesforce | |
import io | |
import traceback | |
from datetime import datetime | |
import os | |
import logging | |
# Set up logging | |
logging.basicConfig(level=logging.INFO) | |
logger = logging.getLogger(__name__) | |
class SalesforceDataLoader:
    """Hold one Salesforce session and expose the actions the Gradio UI calls.

    Public methods never raise to the caller: failures are logged and
    reported back as a status string, paired with ``None`` or an empty
    ``gr.update`` payload.
    """

    def __init__(self):
        # simple_salesforce client; stays None until a login succeeds.
        self.sf = None
        # True only while the most recent connect attempt succeeded.
        self.connected = False
        # Object API names offered in the UI dropdowns.
        self.available_objects = []

    def connect_to_salesforce(self, username, password, security_token, sandbox=False):
        """Log in to Salesforce and refresh the list of usable objects.

        Args:
            username: Salesforce login name.
            password: Salesforce password.
            security_token: Security token paired with the password.
            sandbox: When true, log in through the ``test`` domain.

        Returns:
            A 3-tuple ``(status_message, visibility_update, choices_update)``
            where the last two items are ``gr.update`` payloads.
        """
        try:
            domain = 'test' if sandbox else None
            self.sf = Salesforce(
                username=username,
                password=password,
                security_token=security_token,
                domain=domain,
            )
            self.connected = True
            # Probe which objects this login can actually describe.
            self._get_available_objects()
            return (
                f"β Successfully connected to Salesforce as {username}",
                gr.update(visible=True),
                gr.update(choices=self.available_objects, value=None),
            )
        except Exception as e:
            logger.error("Connection failed: %s", e)
            self.connected = False
            return (
                f"β Connection failed: {str(e)}",
                gr.update(visible=False),
                gr.update(choices=[], value=None),
            )

    def _get_available_objects(self):
        """Populate ``self.available_objects`` with describable common objects."""
        try:
            common_objects = [
                'Account', 'Contact', 'Lead', 'Opportunity',
                'Case', 'Campaign', 'User', 'Product2',
            ]
            self.available_objects = []
            for obj_name in common_objects:
                try:
                    # A successful describe() means the object exists and is accessible.
                    getattr(self.sf, obj_name).describe()
                except Exception:
                    # Best effort: skip objects this login cannot describe.
                    # ``Exception`` (not a bare except) lets KeyboardInterrupt
                    # and SystemExit propagate.
                    continue
                self.available_objects.append(obj_name)
        except Exception as e:
            logger.error("Error getting objects: %s", e)
            self.available_objects = ['Account', 'Contact', 'Lead']  # Fallback

    def get_object_fields(self, object_name):
        """Describe ``object_name`` and list its createable/updateable fields.

        Args:
            object_name: API name of the Salesforce object.

        Returns:
            ``(choices_update, info_markdown)``: a ``gr.update`` carrying the
            field names, and a Markdown summary of at most the first 20 fields.
        """
        if not self.connected or not object_name:
            return gr.update(choices=[], value=None), ""

        try:
            metadata = getattr(self.sf, object_name).describe()
            fields = []
            field_info = []
            for field in metadata['fields']:
                if not (field['createable'] or field['updateable']):
                    continue
                field_name = field['name']
                field_type = field['type']
                # A field counts as required when it is neither nillable nor
                # defaulted on create.
                is_required = not field['nillable'] and not field['defaultedOnCreate']
                required = "Required" if is_required else "Optional"
                fields.append(field_name)
                field_info.append(f"**{field_name}** ({field_type}) - {required}")

            field_info_text = "\n".join(field_info[:20])  # Show first 20 fields
            if len(field_info) > 20:
                field_info_text += f"\n... and {len(field_info) - 20} more fields"
            return gr.update(choices=fields, value=None), field_info_text
        except Exception as e:
            logger.error("Error getting fields: %s", e)
            return gr.update(choices=[], value=None), f"Error: {str(e)}"

    def upload_data_to_salesforce(self, file, object_name, operation, external_id_field=None):
        """Bulk insert/update/upsert the rows of a CSV or Excel file.

        Args:
            file: Uploaded file; either an object exposing ``.name`` (a path)
                or a plain path string.
            object_name: API name of the target Salesforce object.
            operation: ``"Insert"``, ``"Update"`` or ``"Upsert"``.
            external_id_field: Field used to match records; required for
                ``"Upsert"``.

        Returns:
            ``(summary, results_path)``. ``results_path`` names a CSV of the
            per-record results written to the working directory, or is
            ``None`` when the upload did not run.
        """
        if not self.connected:
            return "β Please connect to Salesforce first", None
        if not file or not object_name:
            return "β Please select a file and object", None

        try:
            # Accept both a wrapper with ``.name`` and a bare path string.
            file_path = getattr(file, "name", file)
            # Compare the extension case-insensitively so ``DATA.CSV`` works.
            lowered = str(file_path).lower()
            if lowered.endswith('.csv'):
                df = pd.read_csv(file_path)
            elif lowered.endswith(('.xlsx', '.xls')):
                df = pd.read_excel(file_path)
            else:
                return "β Please upload a CSV or Excel file", None

            if df.empty:
                return "β The uploaded file is empty", None

            # Drop NaN cells from each record. A new list is built because
            # rebinding the loop variable would leave the original dicts
            # (and their NaN values) untouched.
            records = [
                {key: value for key, value in row.items() if pd.notna(value)}
                for row in df.to_dict('records')
            ]

            # simple_salesforce exposes bulk operations as ``sf.bulk.<Object>``,
            # not as an attribute of ``sf.<Object>``.
            bulk_object = getattr(self.sf.bulk, object_name)

            results = []
            errors = []

            if operation == "Insert":
                try:
                    results = bulk_object.insert(records)
                except Exception as e:
                    return f"β Bulk insert failed: {str(e)}", None
            elif operation == "Update":
                try:
                    results = bulk_object.update(records)
                except Exception as e:
                    return f"β Bulk update failed: {str(e)}", None
            elif operation == "Upsert":
                if not external_id_field:
                    return "β External ID field is required for upsert operation", None
                try:
                    results = bulk_object.upsert(records, external_id_field)
                except Exception as e:
                    return f"β Bulk upsert failed: {str(e)}", None

            # Tally per-record outcomes and collect error payloads.
            success_count = 0
            error_count = 0
            for result in results:
                if result.get('success'):
                    success_count += 1
                else:
                    error_count += 1
                    errors.append(result.get('errors', []))

            summary_parts = [
                "β Operation completed!\n",
                f"π Total records: {len(records)}\n",
                f"β Successful: {success_count}\n",
                f"β Failed: {error_count}\n",
            ]
            if errors:
                summary_parts.append("\n**First few errors:**\n")
                summary_parts.extend(
                    f"{i + 1}. {error}\n" for i, error in enumerate(errors[:5])
                )
            summary = "".join(summary_parts)

            # Persist the raw per-record results for download.
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            results_file = f"salesforce_upload_results_{timestamp}.csv"
            pd.DataFrame(results).to_csv(results_file, index=False)

            return summary, results_file
        except Exception as e:
            logger.error("Upload error: %s", e)
            logger.error(traceback.format_exc())
            return f"β Error: {str(e)}", None

    def export_data_from_salesforce(self, object_name, fields, record_limit=1000):
        """Query records of ``object_name`` and write them to a CSV file.

        Args:
            object_name: API name of the object to export.
            fields: Field names to select; when empty, the first 10 fields
                reported by ``describe()`` are used.
            record_limit: Maximum number of records; coerced to ``int`` for
                the SOQL ``LIMIT`` clause.

        Returns:
            ``(summary, export_path)``; ``export_path`` is ``None`` on failure
            or when the query returns no records.
        """
        if not self.connected:
            return "β Please connect to Salesforce first", None
        if not object_name:
            return "β Please select an object", None

        try:
            if not fields:
                # No explicit selection: fall back to the first 10 described fields.
                metadata = getattr(self.sf, object_name).describe()
                fields = [field['name'] for field in metadata['fields'][:10]]

            fields_str = ', '.join(fields)
            # LIMIT must be an integer literal; a float such as 1000.0 would
            # otherwise be interpolated verbatim into the query.
            query = f"SELECT {fields_str} FROM {object_name} LIMIT {int(record_limit)}"

            records = self.sf.query_all(query)['records']
            if not records:
                return "β No records found", None

            df = pd.DataFrame(records)
            # Each record carries an 'attributes' metadata entry; drop that column.
            if 'attributes' in df.columns:
                df = df.drop(columns='attributes')

            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            export_file = f"salesforce_export_{object_name}_{timestamp}.csv"
            df.to_csv(export_file, index=False)

            summary = "".join([
                "β Export completed!\n",
                f"π Records exported: {len(records)}\n",
                f"π Fields: {', '.join(fields)}\n",
                f"π File: {export_file}",
            ])
            return summary, export_file
        except Exception as e:
            logger.error("Export error: %s", e)
            return f"β Error: {str(e)}", None
# Initialize the Salesforce Data Loader.
# Module-level instance: create_interface() binds its event handlers to this
# object's methods, so the connection state is held on this one object.
sf_loader = SalesforceDataLoader()
# Create the Gradio interface | |
def create_interface():
    """Build the Gradio Blocks UI and wire it to the module-level ``sf_loader``.

    The UI has three tabs: Connection, Upload Data and Export Data. The last
    two start hidden and are revealed once a connection succeeds.

    Returns:
        The ``gr.Blocks`` instance, not yet launched.
    """
    with gr.Blocks(title="Salesforce Data Loader", theme=gr.themes.Soft()) as interface:
        gr.Markdown("""
        # π Salesforce Data Loader
        A powerful tool to upload and download data from Salesforce. Connect with your credentials and start managing your data!
        """)

        with gr.Tab("π Connection"):
            gr.Markdown("### Connect to Salesforce")
            with gr.Row():
                with gr.Column():
                    username = gr.Textbox(label="Username", placeholder="[email protected]")
                    password = gr.Textbox(label="Password", type="password")
                    security_token = gr.Textbox(
                        label="Security Token",
                        type="password",
                        info="Get this from Salesforce Setup β Personal Information β Reset Security Token",
                    )
                with gr.Column():
                    sandbox = gr.Checkbox(label="Sandbox Environment", info="Check if connecting to a sandbox")
                    connect_btn = gr.Button("π Connect to Salesforce", variant="primary")
            connection_status = gr.Markdown("")

        with gr.Tab("π€ Upload Data", visible=False) as upload_tab:
            gr.Markdown("### Upload CSV/Excel data to Salesforce")
            with gr.Row():
                with gr.Column():
                    file_upload = gr.File(label="Upload CSV or Excel file", file_types=[".csv", ".xlsx", ".xls"])
                    object_dropdown = gr.Dropdown(label="Select Salesforce Object", choices=[])
                    operation_dropdown = gr.Dropdown(
                        label="Operation Type",
                        choices=["Insert", "Update", "Upsert"],
                        value="Insert",
                    )
                    external_id_field = gr.Dropdown(label="External ID Field (for Upsert)", choices=[], visible=False)
                with gr.Column():
                    object_fields_info = gr.Markdown("Select an object to see available fields")
            upload_btn = gr.Button("π€ Upload Data", variant="primary")
            upload_results = gr.Markdown("")
            download_results = gr.File(label="Download Results", visible=False)

        with gr.Tab("π₯ Export Data", visible=False) as export_tab:
            gr.Markdown("### Export data from Salesforce")
            with gr.Row():
                with gr.Column():
                    export_object = gr.Dropdown(label="Select Object to Export", choices=[])
                    export_fields = gr.CheckboxGroup(label="Select Fields to Export", choices=[])
                    record_limit = gr.Slider(
                        minimum=100, maximum=10000, value=1000, step=100,
                        label="Record Limit",
                    )
                with gr.Column():
                    export_fields_info = gr.Markdown("Select an object to see available fields")
            export_btn = gr.Button("π₯ Export Data", variant="primary")
            export_results = gr.Markdown("")
            download_export = gr.File(label="Download Export", visible=False)

        def _sync_export_controls():
            # Read the loader's state directly: a Dropdown used as an input
            # yields its selected value, not its list of choices.
            return (
                gr.update(visible=sf_loader.connected),
                gr.update(choices=sf_loader.available_objects, value=None),
            )

        # Event handlers
        connect_event = connect_btn.click(
            fn=sf_loader.connect_to_salesforce,
            inputs=[username, password, security_token, sandbox],
            outputs=[connection_status, upload_tab, object_dropdown],
        )
        # Chained with .then() so it runs only after the login attempt has
        # finished; it reveals the export tab and fills its object dropdown.
        connect_event.then(
            fn=_sync_export_controls,
            inputs=None,
            outputs=[export_tab, export_object],
        )

        object_dropdown.change(
            fn=sf_loader.get_object_fields,
            inputs=[object_dropdown],
            outputs=[external_id_field, object_fields_info],
        )
        export_object.change(
            fn=sf_loader.get_object_fields,
            inputs=[export_object],
            outputs=[export_fields, export_fields_info],
        )
        # The external-ID selector is only shown for Upsert.
        operation_dropdown.change(
            fn=lambda op: gr.update(visible=(op == "Upsert")),
            inputs=[operation_dropdown],
            outputs=[external_id_field],
        )

        upload_btn.click(
            fn=sf_loader.upload_data_to_salesforce,
            inputs=[file_upload, object_dropdown, operation_dropdown, external_id_field],
            outputs=[upload_results, download_results],
        )
        # Show each download widget only while it holds a file.
        download_results.change(
            fn=lambda file: gr.update(visible=file is not None),
            inputs=[download_results],
            outputs=[download_results],
        )

        export_btn.click(
            fn=sf_loader.export_data_from_salesforce,
            inputs=[export_object, export_fields, record_limit],
            outputs=[export_results, download_export],
        )
        download_export.change(
            fn=lambda file: gr.update(visible=file is not None),
            inputs=[download_export],
            outputs=[download_export],
        )

    return interface
# Script entry point: build the UI and serve it.
if __name__ == "__main__":
    app = create_interface()
    # Listen on all network interfaces, port 7860.
    app.launch(server_port=7860, server_name="0.0.0.0")