# SFDataLoader / app.py
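"""Salesforce Data Loader.

Gradio app for connecting to Salesforce, uploading CSV/Excel data
(insert / update / upsert via the Bulk API), and exporting object records to CSV.

Assumed (not pinned here) dependencies: gradio, pandas, simple-salesforce,
and openpyxl for reading .xlsx uploads.
"""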
import gradio as gr
import pandas as pd
import numpy as np
from simple_salesforce import Salesforce
import io
import traceback
from datetime import datetime
import os
import logging
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class SalesforceDataLoader:
def __init__(self):
self.sf = None
self.connected = False
self.available_objects = []
def connect_to_salesforce(self, username, password, security_token, sandbox=False):
"""Connect to Salesforce using credentials"""
try:
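            # simple_salesforce logs in through test.salesforce.com when domain='test'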
domain = 'test' if sandbox else None
self.sf = Salesforce(
username=username,
password=password,
security_token=security_token,
domain=domain
)
self.connected = True
# Get available objects
self._get_available_objects()
return f"βœ… Successfully connected to Salesforce as {username}", gr.update(visible=True), gr.update(choices=self.available_objects, value=None)
except Exception as e:
logger.error(f"Connection failed: {str(e)}")
self.connected = False
return f"❌ Connection failed: {str(e)}", gr.update(visible=False), gr.update(choices=[], value=None)
def _get_available_objects(self):
"""Get list of available Salesforce objects"""
try:
# Get commonly used objects
common_objects = ['Account', 'Contact', 'Lead', 'Opportunity', 'Case', 'Campaign', 'User', 'Product2']
self.available_objects = []
for obj_name in common_objects:
try:
# Test if object exists and is accessible
getattr(self.sf, obj_name).describe()
self.available_objects.append(obj_name)
                except Exception:
                    # Object is missing or not accessible to this user; skip it
                    continue
except Exception as e:
logger.error(f"Error getting objects: {str(e)}")
self.available_objects = ['Account', 'Contact', 'Lead'] # Fallback
def get_object_fields(self, object_name):
"""Get fields for selected Salesforce object"""
if not self.connected or not object_name:
return gr.update(choices=[], value=None), ""
try:
obj = getattr(self.sf, object_name)
metadata = obj.describe()
fields = []
field_info = []
for field in metadata['fields']:
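                # Expose only fields a user can write (createable or updateable)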
if field['createable'] or field['updateable']:
field_name = field['name']
field_type = field['type']
required = "Required" if not field['nillable'] and not field['defaultedOnCreate'] else "Optional"
fields.append(field_name)
field_info.append(f"**{field_name}** ({field_type}) - {required}")
field_info_text = "\n".join(field_info[:20]) # Show first 20 fields
if len(field_info) > 20:
field_info_text += f"\n... and {len(field_info) - 20} more fields"
return gr.update(choices=fields, value=None), field_info_text
except Exception as e:
logger.error(f"Error getting fields: {str(e)}")
return gr.update(choices=[], value=None), f"Error: {str(e)}"
def upload_data_to_salesforce(self, file, object_name, operation, external_id_field=None):
"""Upload data to Salesforce"""
if not self.connected:
return "❌ Please connect to Salesforce first", None
if not file or not object_name:
return "❌ Please select a file and object", None
try:
# Read the uploaded file
if file.name.endswith('.csv'):
df = pd.read_csv(file.name)
elif file.name.endswith(('.xlsx', '.xls')):
df = pd.read_excel(file.name)
else:
return "❌ Please upload a CSV or Excel file", None
if df.empty:
return "❌ The uploaded file is empty", None
            # Get the Bulk API handler for the selected object (sf.bulk.<ObjectName>)
            sf_object = getattr(self.sf.bulk, object_name)
# Prepare data for upload
records = df.to_dict('records')
            # Clean data: drop NaN cells so they are not sent as field values
            records = [{k: v for k, v in record.items() if pd.notna(v)} for record in records]
results = []
errors = []
# Perform bulk operation
if operation == "Insert":
try:
                    results = sf_object.insert(records)
except Exception as e:
return f"❌ Bulk insert failed: {str(e)}", None
elif operation == "Update":
try:
                    results = sf_object.update(records)
except Exception as e:
return f"❌ Bulk update failed: {str(e)}", None
elif operation == "Upsert":
if not external_id_field:
return "❌ External ID field is required for upsert operation", None
try:
                    results = sf_object.upsert(records, external_id_field)
except Exception as e:
return f"❌ Bulk upsert failed: {str(e)}", None
# Process results
success_count = 0
error_count = 0
for result in results:
if result.get('success'):
success_count += 1
else:
error_count += 1
errors.append(result.get('errors', []))
# Create results summary
summary = f"βœ… Operation completed!\n"
summary += f"πŸ“Š Total records: {len(records)}\n"
summary += f"βœ… Successful: {success_count}\n"
summary += f"❌ Failed: {error_count}\n"
if errors:
summary += f"\n**First few errors:**\n"
for i, error in enumerate(errors[:5]):
summary += f"{i+1}. {error}\n"
            # Write per-record results to a CSV the user can download
            results_df = pd.DataFrame(results)
            results_file = f"salesforce_upload_results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
            results_df.to_csv(results_file, index=False)
return summary, results_file
except Exception as e:
logger.error(f"Upload error: {str(e)}")
logger.error(traceback.format_exc())
return f"❌ Error: {str(e)}", None
def export_data_from_salesforce(self, object_name, fields, record_limit=1000):
"""Export data from Salesforce"""
if not self.connected:
return "❌ Please connect to Salesforce first", None
if not object_name:
return "❌ Please select an object", None
try:
# Build SOQL query
if not fields:
# Get some default fields
obj = getattr(self.sf, object_name)
metadata = obj.describe()
fields = [field['name'] for field in metadata['fields'][:10]] # First 10 fields
fields_str = ', '.join(fields)
query = f"SELECT {fields_str} FROM {object_name} LIMIT {record_limit}"
# Execute query
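            # query_all follows queryMore pagination, so all rows up to the SOQL LIMIT are returned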
result = self.sf.query_all(query)
records = result['records']
if not records:
return "❌ No records found", None
# Convert to DataFrame
df = pd.DataFrame(records)
# Remove Salesforce metadata columns
if 'attributes' in df.columns:
df = df.drop('attributes', axis=1)
# Create downloadable file
export_file = f"salesforce_export_{object_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
df.to_csv(export_file, index=False)
summary = f"βœ… Export completed!\n"
summary += f"πŸ“Š Records exported: {len(records)}\n"
summary += f"πŸ“‹ Fields: {', '.join(fields)}\n"
summary += f"πŸ“ File: {export_file}"
return summary, export_file
except Exception as e:
logger.error(f"Export error: {str(e)}")
return f"❌ Error: {str(e)}", None
# Initialize the Salesforce Data Loader
sf_loader = SalesforceDataLoader()
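# Module-level instance: the connection and object list are shared by every session of this app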
# Create the Gradio interface
def create_interface():
with gr.Blocks(title="Salesforce Data Loader", theme=gr.themes.Soft()) as interface:
gr.Markdown("""
        # 🚀 Salesforce Data Loader
A powerful tool to upload and download data from Salesforce. Connect with your credentials and start managing your data!
""")
with gr.Tab("πŸ” Connection"):
gr.Markdown("### Connect to Salesforce")
with gr.Row():
with gr.Column():
username = gr.Textbox(label="Username", placeholder="[email protected]")
password = gr.Textbox(label="Password", type="password")
security_token = gr.Textbox(label="Security Token", type="password",
info="Get this from Salesforce Setup β†’ Personal Information β†’ Reset Security Token")
with gr.Column():
sandbox = gr.Checkbox(label="Sandbox Environment", info="Check if connecting to a sandbox")
                    connect_btn = gr.Button("🔗 Connect to Salesforce", variant="primary")
connection_status = gr.Markdown("")
with gr.Tab("πŸ“€ Upload Data", visible=False) as upload_tab:
gr.Markdown("### Upload CSV/Excel data to Salesforce")
with gr.Row():
with gr.Column():
file_upload = gr.File(label="Upload CSV or Excel file", file_types=[".csv", ".xlsx", ".xls"])
object_dropdown = gr.Dropdown(label="Select Salesforce Object", choices=[])
operation_dropdown = gr.Dropdown(
label="Operation Type",
choices=["Insert", "Update", "Upsert"],
value="Insert"
)
external_id_field = gr.Dropdown(label="External ID Field (for Upsert)", choices=[], visible=False)
with gr.Column():
object_fields_info = gr.Markdown("Select an object to see available fields")
            upload_btn = gr.Button("📤 Upload Data", variant="primary")
upload_results = gr.Markdown("")
download_results = gr.File(label="Download Results", visible=False)
with gr.Tab("πŸ“₯ Export Data", visible=False) as export_tab:
gr.Markdown("### Export data from Salesforce")
with gr.Row():
with gr.Column():
export_object = gr.Dropdown(label="Select Object to Export", choices=[])
export_fields = gr.CheckboxGroup(label="Select Fields to Export", choices=[])
record_limit = gr.Slider(minimum=100, maximum=10000, value=1000, step=100,
label="Record Limit")
with gr.Column():
export_fields_info = gr.Markdown("Select an object to see available fields")
            export_btn = gr.Button("📥 Export Data", variant="primary")
export_results = gr.Markdown("")
download_export = gr.File(label="Download Export", visible=False)
# Event handlers
connect_btn.click(
fn=sf_loader.connect_to_salesforce,
inputs=[username, password, security_token, sandbox],
outputs=[connection_status, upload_tab, object_dropdown]
)
        # After connecting, populate the export dropdown and reveal the export tab
        connect_btn.click(
            fn=lambda: (gr.update(choices=sf_loader.available_objects),
                        gr.update(visible=sf_loader.connected)),
            inputs=[],
            outputs=[export_object, export_tab]
        )
object_dropdown.change(
fn=sf_loader.get_object_fields,
inputs=[object_dropdown],
outputs=[external_id_field, object_fields_info]
)
export_object.change(
fn=sf_loader.get_object_fields,
inputs=[export_object],
outputs=[export_fields, export_fields_info]
)
operation_dropdown.change(
fn=lambda op: gr.update(visible=(op == "Upsert")),
inputs=[operation_dropdown],
outputs=[external_id_field]
)
upload_btn.click(
fn=sf_loader.upload_data_to_salesforce,
inputs=[file_upload, object_dropdown, operation_dropdown, external_id_field],
outputs=[upload_results, download_results]
)
download_results.change(
fn=lambda file: gr.update(visible=file is not None),
inputs=[download_results],
outputs=[download_results]
)
export_btn.click(
fn=sf_loader.export_data_from_salesforce,
inputs=[export_object, export_fields, record_limit],
outputs=[export_results, download_export]
)
download_export.change(
fn=lambda file: gr.update(visible=file is not None),
inputs=[download_export],
outputs=[download_export]
)
return interface
# Launch the interface
if __name__ == "__main__":
interface = create_interface()
interface.launch(server_name="0.0.0.0", server_port=7860)