Spaces:
Build error
Build error
import gradio as gr | |
import pandas as pd | |
from simple_salesforce import Salesforce | |
import io | |
from datetime import datetime | |
import logging | |
# Set up logging | |
logging.basicConfig(level=logging.INFO) | |
logger = logging.getLogger(__name__) | |
# Global variables to store connection | |
sf_connection = None | |
available_objects = [] | |
def connect_to_salesforce(username, password, security_token, sandbox): | |
"""Connect to Salesforce""" | |
global sf_connection, available_objects | |
try: | |
domain = 'test' if sandbox else None | |
sf_connection = Salesforce( | |
username=username, | |
password=password, | |
security_token=security_token, | |
domain=domain | |
) | |
# Test connection and get objects | |
common_objects = ['Account', 'Contact', 'Lead', 'Opportunity', 'Case'] | |
available_objects = [] | |
for obj_name in common_objects: | |
try: | |
obj = getattr(sf_connection, obj_name) | |
obj.describe() | |
available_objects.append(obj_name) | |
except: | |
continue | |
if not available_objects: | |
available_objects = ['Account', 'Contact', 'Lead'] | |
return f"β Successfully connected to Salesforce as {username}\nAvailable objects: {', '.join(available_objects)}" | |
except Exception as e: | |
sf_connection = None | |
available_objects = [] | |
error_msg = str(e) | |
if "INVALID_LOGIN" in error_msg: | |
return "β Invalid credentials. Please check your username, password, and security token." | |
elif "API_DISABLED_FOR_ORG" in error_msg: | |
return "β API access is disabled for your organization. Please contact your Salesforce admin." | |
elif "LOGIN_MUST_USE_SECURITY_TOKEN" in error_msg: | |
return "β Security token required. Please append your security token to your password." | |
else: | |
return f"β Connection failed: {error_msg}" | |
def upload_data_to_salesforce(file, object_name, operation): | |
"""Upload data to Salesforce""" | |
global sf_connection | |
if not sf_connection: | |
return "β Please connect to Salesforce first", None | |
if not file or not object_name: | |
return "β Please select a file and object", None | |
try: | |
# Read the uploaded file | |
if file.name.endswith('.csv'): | |
df = pd.read_csv(file.name) | |
elif file.name.endswith(('.xlsx', '.xls')): | |
df = pd.read_excel(file.name) | |
else: | |
return "β Please upload a CSV or Excel file", None | |
if df.empty: | |
return "β The uploaded file is empty", None | |
# Get Salesforce object | |
sf_object = getattr(sf_connection, object_name) | |
# Prepare data for upload | |
records = df.to_dict('records') | |
# Clean data - remove NaN values | |
cleaned_records = [] | |
for record in records: | |
cleaned_record = {k: v for k, v in record.items() if pd.notna(v)} | |
cleaned_records.append(cleaned_record) | |
# Perform operation | |
if operation == "Insert": | |
result = sf_object.bulk.insert(cleaned_records) | |
elif operation == "Update": | |
result = sf_object.bulk.update(cleaned_records) | |
else: # Upsert | |
return "β Upsert operation requires additional configuration", None | |
# Process results | |
success_count = sum(1 for r in result if r.get('success')) | |
error_count = len(result) - success_count | |
summary = f"β Operation completed!\n" | |
summary += f"π Total records: {len(records)}\n" | |
summary += f"β Successful: {success_count}\n" | |
summary += f"β Failed: {error_count}\n" | |
# Create results file | |
results_df = pd.DataFrame(result) | |
results_file = f"salesforce_upload_results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" | |
results_df.to_csv(results_file, index=False) | |
return summary, results_file | |
except Exception as e: | |
logger.error(f"Upload error: {str(e)}") | |
return f"β Error: {str(e)}", None | |
def export_data_from_salesforce(object_name, record_limit): | |
"""Export data from Salesforce""" | |
global sf_connection | |
if not sf_connection: | |
return "β Please connect to Salesforce first", None | |
if not object_name: | |
return "β Please select an object", None | |
try: | |
# Get object metadata to find some fields | |
obj = getattr(sf_connection, object_name) | |
metadata = obj.describe() | |
# Get first 10 fields | |
fields = [field['name'] for field in metadata['fields'][:10]] | |
fields_str = ', '.join(fields) | |
# Build and execute query | |
query = f"SELECT {fields_str} FROM {object_name} LIMIT {record_limit}" | |
result = sf_connection.query_all(query) | |
records = result['records'] | |
if not records: | |
return "β No records found", None | |
# Convert to DataFrame and clean | |
df = pd.DataFrame(records) | |
if 'attributes' in df.columns: | |
df = df.drop('attributes', axis=1) | |
# Create export file | |
export_file = f"salesforce_export_{object_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" | |
df.to_csv(export_file, index=False) | |
summary = f"β Export completed!\n" | |
summary += f"π Records exported: {len(records)}\n" | |
summary += f"π Fields: {', '.join(fields)}\n" | |
return summary, export_file | |
except Exception as e: | |
logger.error(f"Export error: {str(e)}") | |
return f"β Error: {str(e)}", None | |
# Create the Gradio interface | |
with gr.Blocks(title="Salesforce Data Loader", theme=gr.themes.Default()) as demo: | |
gr.Markdown(""" | |
# π Salesforce Data Loader | |
A simple tool to upload and download data from Salesforce. | |
""") | |
with gr.Tab("π Connect"): | |
gr.Markdown("### Connect to Salesforce") | |
username = gr.Textbox(label="Username", placeholder="[email protected]") | |
password = gr.Textbox(label="Password", type="password") | |
security_token = gr.Textbox(label="Security Token", type="password") | |
sandbox = gr.Checkbox(label="Sandbox Environment") | |
connect_btn = gr.Button("π Connect", variant="primary") | |
connection_status = gr.Textbox(label="Connection Status", interactive=False) | |
connect_btn.click( | |
fn=connect_to_salesforce, | |
inputs=[username, password, security_token, sandbox], | |
outputs=[connection_status] | |
) | |
with gr.Tab("π€ Upload"): | |
gr.Markdown("### Upload CSV/Excel data to Salesforce") | |
file_upload = gr.File(label="Upload CSV or Excel file") | |
upload_object = gr.Dropdown( | |
label="Salesforce Object", | |
choices=["Account", "Contact", "Lead", "Opportunity", "Case"], | |
value="Contact" | |
) | |
upload_operation = gr.Dropdown( | |
label="Operation", | |
choices=["Insert", "Update"], | |
value="Insert" | |
) | |
upload_btn = gr.Button("π€ Upload Data", variant="primary") | |
upload_results = gr.Textbox(label="Upload Results", interactive=False) | |
download_results = gr.File(label="Download Results") | |
upload_btn.click( | |
fn=upload_data_to_salesforce, | |
inputs=[file_upload, upload_object, upload_operation], | |
outputs=[upload_results, download_results] | |
) | |
with gr.Tab("π₯ Export"): | |
gr.Markdown("### Export data from Salesforce") | |
export_object = gr.Dropdown( | |
label="Salesforce Object", | |
choices=["Account", "Contact", "Lead", "Opportunity", "Case"], | |
value="Account" | |
) | |
export_limit = gr.Slider( | |
label="Record Limit", | |
minimum=100, | |
maximum=10000, | |
value=1000, | |
step=100 | |
) | |
export_btn = gr.Button("π₯ Export Data", variant="primary") | |
export_results = gr.Textbox(label="Export Results", interactive=False) | |
download_export = gr.File(label="Download Export") | |
export_btn.click( | |
fn=export_data_from_salesforce, | |
inputs=[export_object, export_limit], | |
outputs=[export_results, download_export] | |
) | |
if __name__ == "__main__": | |
demo.launch(server_name="0.0.0.0", server_port=7860) |