# SFDataLoader — app.py
# Enhanced version: live object selection, schema reading, Faker test data
# generation, and field selection.
import gradio as gr
import pandas as pd
from simple_salesforce import Salesforce
from datetime import datetime
import logging
import json
from faker import Faker
import random
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Global mutable state shared across Gradio callback invocations.
# sf_connection: live simple_salesforce.Salesforce session (None until a
#   successful login in enhanced_salesforce_operation)
# available_objects: cached list of accessible SObject API names
# object_schemas: per-object describe() cache, keyed by object API name
sf_connection = None
available_objects = []
object_schemas = {}
# Single Faker instance used for all generated test data
fake = Faker()
def get_salesforce_objects():
    """Return the list of Salesforce object API names accessible to the session.

    Probes a fixed set of commonly used standard objects by calling
    ``describe()`` on each; objects that raise (not visible to the connected
    user or org edition) are skipped.  Results are cached in the module-level
    ``available_objects`` list.

    Returns:
        list[str]: accessible object names; ``[]`` when not connected; a
        minimal fallback list if the probe itself fails unexpectedly.
    """
    global sf_connection, available_objects
    if not sf_connection:
        return []
    try:
        # Get commonly used objects and test their accessibility
        common_objects = [
            'Account', 'Contact', 'Lead', 'Opportunity', 'Case',
            'Campaign', 'User', 'Product2', 'Task', 'Event'
        ]
        available_objects = []
        for obj_name in common_objects:
            try:
                obj = getattr(sf_connection, obj_name)
                obj.describe()
                available_objects.append(obj_name)
            except Exception:
                # BUG FIX: was a bare `except:`, which also swallowed
                # SystemExit and KeyboardInterrupt.  An inaccessible object
                # is simply skipped.
                continue
        return available_objects
    except Exception as e:
        logger.error(f"Error getting objects: {str(e)}")
        return ['Account', 'Contact', 'Lead']
def get_object_schema(object_name):
    """Return a cached, simplified schema for one Salesforce object.

    The schema is built from ``describe()`` metadata and keeps only fields
    that a client can write (createable or updateable).  Each field entry
    carries name, label, type, required flag, length, and picklist values.

    Returns:
        dict: ``{'name', 'label', 'fields'}`` on success, ``{}`` when there
        is no connection, no object name, or the describe call fails.
    """
    global sf_connection, object_schemas
    if not sf_connection or not object_name:
        return {}
    try:
        cached = object_schemas.get(object_name)
        if cached is None:
            sobject = getattr(sf_connection, object_name)
            metadata = sobject.describe()
            writable_fields = []
            for fld in metadata['fields']:
                # Skip read-only fields -- they cannot be set by a loader.
                if not (fld['createable'] or fld['updateable']):
                    continue
                writable_fields.append({
                    'name': fld['name'],
                    'label': fld['label'],
                    'type': fld['type'],
                    # Required means: no nulls allowed AND no default on create.
                    'required': not fld['nillable'] and not fld['defaultedOnCreate'],
                    'length': fld.get('length', 0),
                    'picklistValues': [pv['value'] for pv in fld.get('picklistValues', [])],
                })
            cached = {
                'name': object_name,
                'label': metadata.get('label', object_name),
                'fields': writable_fields,
            }
            object_schemas[object_name] = cached
        return cached
    except Exception as e:
        logger.error(f"Error getting schema for {object_name}: {str(e)}")
        return {}
def _fake_field_value(object_name, field_info):
    """Generate one Faker value for *field_info* on *object_name*.

    Field-name heuristics (FirstName, Email, City, ...) take precedence over
    the Salesforce field type, so that e.g. ``Email`` gets a plausible
    address rather than a generic string; otherwise the value is derived
    from the field type reported by ``describe()``.
    """
    field_name = field_info['name']
    field_type = field_info['type']
    name_lower = field_name.lower()
    if name_lower in ['firstname', 'first_name']:
        return fake.first_name()
    elif name_lower in ['lastname', 'last_name']:
        return fake.last_name()
    elif name_lower in ['name'] and object_name == 'Account':
        return fake.company()
    elif name_lower in ['email']:
        return fake.email()
    elif name_lower in ['phone']:
        return fake.phone_number()
    elif name_lower in ['website']:
        return fake.url()
    elif name_lower in ['street', 'mailingstreet', 'billingstreet']:
        return fake.street_address()
    elif name_lower in ['city', 'mailingcity', 'billingcity']:
        return fake.city()
    elif name_lower in ['state', 'mailingstate', 'billingstate']:
        return fake.state_abbr()
    elif name_lower in ['postalcode', 'mailingpostalcode', 'billingpostalcode']:
        return fake.zipcode()
    elif name_lower in ['country', 'mailingcountry', 'billingcountry']:
        return 'US'
    elif field_type == 'picklist' and field_info['picklistValues']:
        return random.choice(field_info['picklistValues'])
    elif field_type == 'boolean':
        return random.choice([True, False])
    elif field_type in ['int', 'double', 'currency']:
        if 'annual' in name_lower or 'revenue' in name_lower:
            return random.randint(100000, 10000000)
        return random.randint(1, 1000)
    elif field_type == 'date':
        return fake.date_between(start_date='-1y', end_date='today').isoformat()
    elif field_type == 'datetime':
        return fake.date_time_between(start_date='-1y', end_date='now').isoformat()
    elif field_type in ['string', 'textarea']:
        max_len = field_info['length']
        if max_len > 100:
            return fake.text(max_nb_chars=min(max_len, 200))
        # BUG FIX: describe() may report length 0, and slicing with [:0]
        # silently produced an empty string -- only truncate for a
        # positive length.
        sentence = fake.sentence(nb_words=3)
        return sentence[:max_len] if max_len > 0 else sentence
    # Default string value
    return f"Test {field_name}"


def generate_test_data(object_name, fields, num_records=100):
    """Generate Faker test data for *object_name* and save it to a CSV file.

    Args:
        object_name: Salesforce object API name (e.g. ``'Account'``).
        fields: iterable of field API names to populate; names not present
            in the object's writable schema are silently skipped.
        num_records: number of rows to generate.

    Returns:
        tuple[str | None, str]: ``(csv_filename, status_message)`` on
        success, ``(None, error_message)`` on failure.
    """
    try:
        schema = get_object_schema(object_name)
        if not schema:
            return None, "❌ Could not get object schema"
        # Index the schema once instead of a linear scan per field per row.
        field_infos = {f['name']: f for f in schema['fields']}
        records = []
        for _ in range(num_records):
            record = {}
            for field_name in fields:
                field_info = field_infos.get(field_name)
                if not field_info:
                    continue
                record[field_name] = _fake_field_value(object_name, field_info)
            records.append(record)
        # Create DataFrame
        df = pd.DataFrame(records)
        # Save to CSV
        filename = f"test_data_{object_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
        df.to_csv(filename, index=False)
        # BUG FIX: the success message previously printed the literal text
        # "(unknown)" instead of the generated file name.
        return filename, f"βœ… Generated {num_records} test records for {object_name}\nFields: {', '.join(fields)}\nFile: {filename}"
    except Exception as e:
        logger.error(f"Error generating test data: {str(e)}")
        return None, f"❌ Error generating test data: {str(e)}"
def _do_import(object_name, csv_file, connection_msg, objects_json):
    """Bulk-insert rows from an uploaded CSV/Excel file into *object_name*.

    Returns the same 4-tuple shape as enhanced_salesforce_operation:
    (message, download_file, objects_json, fields_json).
    """
    try:
        if csv_file.name.endswith('.csv'):
            df = pd.read_csv(csv_file.name)
        elif csv_file.name.endswith(('.xlsx', '.xls')):
            df = pd.read_excel(csv_file.name)
        else:
            return connection_msg + "❌ Please upload a CSV or Excel file", None, objects_json, "[]"
        if df.empty:
            return connection_msg + "❌ The uploaded file is empty", None, objects_json, "[]"
        # Drop NaN cells so Salesforce does not receive nulls for absent values.
        records = df.to_dict('records')
        cleaned_records = [
            {k: v for k, v in record.items() if pd.notna(v)}
            for record in records
        ]
        # Import using the Bulk API.  (Was sf_connection.bulk.__getattr__(...);
        # getattr() is the idiomatic, equivalent spelling.)
        result = getattr(sf_connection.bulk, object_name).insert(cleaned_records)
        # Summarize per-record results returned by the bulk job.
        success_count = sum(1 for r in result if r.get('success'))
        error_count = len(result) - success_count
        import_msg = f"\nπŸ“€ Import Results:\n"
        import_msg += f"Object: {object_name}\n"
        import_msg += f"Total records: {len(records)}\n"
        import_msg += f"βœ… Successful: {success_count}\n"
        import_msg += f"❌ Failed: {error_count}\n"
        if error_count > 0:
            errors = [r.get('errors', []) for r in result if not r.get('success')]
            import_msg += f"\nFirst few errors: {str(errors[:3])}"
        return connection_msg + import_msg, None, objects_json, "[]"
    except Exception as e:
        return connection_msg + f"❌ Import error: {str(e)}", None, objects_json, "[]"


def _do_export(object_name, selected_fields, connection_msg, objects_json):
    """Query up to 100 records from *object_name* and save them to a CSV.

    Uses *selected_fields* when provided (string or list), otherwise the
    first 10 fields of the object's writable schema.
    """
    try:
        schema = get_object_schema(object_name)
        # Use selected fields or default fields
        if selected_fields:
            fields_list = selected_fields.split(',') if isinstance(selected_fields, str) else selected_fields
            fields_list = [f.strip() for f in fields_list]
        else:
            # Use first 10 fields as default
            fields_list = [f['name'] for f in schema.get('fields', [])[:10]]
        fields_str = ', '.join(fields_list)
        query = f"SELECT {fields_str} FROM {object_name} LIMIT 100"
        result = sf_connection.query_all(query)
        records = result['records']
        if not records:
            return connection_msg + f"\n❌ No {object_name} records found", None, objects_json, "[]"
        df = pd.DataFrame(records)
        # simple_salesforce returns an 'attributes' metadata column -- drop it.
        if 'attributes' in df.columns:
            df = df.drop('attributes', axis=1)
        # Save export file
        export_file = f"export_{object_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
        df.to_csv(export_file, index=False)
        export_msg = f"\nπŸ“₯ Export Results:\n"
        export_msg += f"Object: {object_name}\n"
        export_msg += f"Records exported: {len(records)}\n"
        export_msg += f"Fields: {', '.join(df.columns)}\n"
        export_msg += f"Sample data:\n{df.head(3).to_string()}"
        return connection_msg + export_msg, export_file, objects_json, "[]"
    except Exception as e:
        return connection_msg + f"\n❌ Export error: {str(e)}", None, objects_json, "[]"


def enhanced_salesforce_operation(username, password, security_token, sandbox, operation,
                                  object_name, selected_fields, csv_file, num_records):
    """Enhanced Salesforce operations with full functionality.

    Logs in with the given credentials, then dispatches on *operation*:
    ``connect_only``, ``get_schema``, ``generate_data``, ``import_data``,
    or ``export_data``.

    Args:
        username / password / security_token: Salesforce login credentials.
        sandbox: when True, log in against the 'test' (sandbox) domain.
        operation: one of the five operation names listed above.
        object_name: target Salesforce object API name.
        selected_fields: comma-separated string (or list) of field names.
        csv_file: uploaded file object with a ``.name`` path (import only).
        num_records: number of test records to generate (generate_data only).

    Returns:
        tuple: (status message, downloadable file path or None,
        JSON list of available objects, JSON list of available fields).
    """
    global sf_connection
    # Step 1: Connect to Salesforce
    if not username or not password or not security_token:
        return "❌ Please provide username, password, and security token", None, "[]", "[]"
    try:
        domain = 'test' if sandbox else None
        sf_connection = Salesforce(
            username=username,
            password=password,
            security_token=security_token,
            domain=domain
        )
        connection_msg = f"βœ… Connected to Salesforce as {username}\n"
        # Get available objects
        objects = get_salesforce_objects()
        objects_json = json.dumps(objects)
        # Step 2: Handle different operations
        if operation == "connect_only":
            return connection_msg + f"Available objects: {', '.join(objects)}", None, objects_json, "[]"
        elif operation == "get_schema":
            if not object_name:
                return connection_msg + "❌ Please select an object", None, objects_json, "[]"
            schema = get_object_schema(object_name)
            fields = [f"{f['name']} ({f['type']})" for f in schema.get('fields', [])]
            fields_json = json.dumps([f['name'] for f in schema.get('fields', [])])
            # Show at most 20 fields inline; summarize the remainder.
            return (connection_msg + f"πŸ“‹ Schema for {object_name}:\n" +
                    f"Fields: {len(fields)}\n" + "\n".join(fields[:20]) +
                    (f"\n... and {len(fields)-20} more fields" if len(fields) > 20 else "")), None, objects_json, fields_json
        elif operation == "generate_data":
            if not object_name or not selected_fields:
                return connection_msg + "❌ Please select object and fields", None, objects_json, "[]"
            fields_list = selected_fields.split(',') if isinstance(selected_fields, str) else selected_fields
            # FIX: strip whitespace so "Name, Email" works -- this branch was
            # inconsistent with export_data, which already stripped.
            fields_list = [f.strip() for f in fields_list]
            filename, result = generate_test_data(object_name, fields_list, num_records)
            return connection_msg + result, filename, objects_json, "[]"
        elif operation == "import_data":
            if not csv_file:
                return connection_msg + "❌ Please upload a CSV file", None, objects_json, "[]"
            if not object_name:
                return connection_msg + "❌ Please select target object", None, objects_json, "[]"
            return _do_import(object_name, csv_file, connection_msg, objects_json)
        elif operation == "export_data":
            if not object_name:
                return connection_msg + "❌ Please select an object", None, objects_json, "[]"
            return _do_export(object_name, selected_fields, connection_msg, objects_json)
        else:
            return connection_msg + "❌ Invalid operation", None, objects_json, "[]"
    except Exception as e:
        # Translate the most common simple_salesforce login failures into
        # actionable messages.
        error_msg = str(e)
        if "INVALID_LOGIN" in error_msg:
            return "❌ Invalid credentials. Please check your username, password, and security token.", None, "[]", "[]"
        elif "API_DISABLED_FOR_ORG" in error_msg:
            return "❌ API access is disabled. Contact your Salesforce admin.", None, "[]", "[]"
        elif "LOGIN_MUST_USE_SECURITY_TOKEN" in error_msg:
            return "❌ Security token required. Append it to your password.", None, "[]", "[]"
        else:
            return f"❌ Connection failed: {error_msg}", None, "[]", "[]"
# Create the enhanced interface.
# Input order must match the signature of enhanced_salesforce_operation:
# (username, password, security_token, sandbox, operation, object_name,
#  selected_fields, csv_file, num_records).
demo = gr.Interface(
    fn=enhanced_salesforce_operation,
    inputs=[
        gr.Textbox(label="Username", placeholder="[email protected]"),
        gr.Textbox(label="Password", type="password"),
        gr.Textbox(label="Security Token", type="password"),
        gr.Checkbox(label="Sandbox Environment"),
        gr.Dropdown(
            label="Operation",
            choices=[
                "connect_only",
                "get_schema",
                "generate_data",
                "import_data",
                "export_data"
            ],
            value="connect_only"
        ),
        # Choices start empty; allow_custom_value lets the user type an
        # object name before the live object list is known.
        gr.Dropdown(label="Salesforce Object", choices=[], allow_custom_value=True),
        gr.Textbox(label="Selected Fields (comma-separated)", placeholder="Name,Email,Phone"),
        gr.File(label="CSV/Excel File (for import)", file_types=[".csv", ".xlsx", ".xls"]),
        gr.Slider(label="Number of Test Records", minimum=10, maximum=1000, value=100, step=10)
    ],
    outputs=[
        gr.Textbox(label="Results", lines=15),
        gr.File(label="Download File"),
        # The two hidden JSON outputs carry state for the UI; they mirror
        # the last two elements of the callback's return tuple.
        gr.Textbox(label="Available Objects (JSON)", visible=False),
        gr.Textbox(label="Available Fields (JSON)", visible=False)
    ],
    title="πŸš€ Enhanced Salesforce Data Loader",
    description="""
    **Advanced Salesforce Data Management Tool**
    **Workflow:**
    1. **Connect**: Enter credentials, select 'connect_only' to see available objects
    2. **Get Schema**: Select object, choose 'get_schema' to see fields
    3. **Generate Data**: Select fields, choose 'generate_data' to create test data with Faker
    4. **Import**: Upload CSV/Excel, select target object, choose 'import_data'
    5. **Export**: Select object and fields, choose 'export_data'
    **Features:**
    - βœ… Live object detection from your Salesforce org
    - βœ… Dynamic schema reading with field types
    - βœ… Intelligent test data generation using Faker
    - βœ… Field mapping and validation
    - βœ… Bulk operations for performance
    - βœ… Relationship data support (Account + Contact)
    """,
    examples=[
        ["[email protected]", "password123", "token123", False, "connect_only", "", "", None, 100],
    ]
)
# Bind to all interfaces on port 7860 (standard Hugging Face Spaces setup).
if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860)