Spaces:
Build error
Build error
import gradio as gr | |
import pandas as pd | |
from simple_salesforce import Salesforce | |
from datetime import datetime | |
import logging | |
import json | |
from faker import Faker | |
import random | |
# Set up logging | |
logging.basicConfig(level=logging.INFO) | |
logger = logging.getLogger(__name__) | |
# Global connection and state | |
sf_connection = None | |
available_objects = [] | |
object_schemas = {} | |
fake = Faker() | |
def get_salesforce_objects(): | |
"""Get list of available Salesforce objects""" | |
global sf_connection, available_objects | |
if not sf_connection: | |
return [] | |
try: | |
# Get commonly used objects and test their accessibility | |
common_objects = [ | |
'Account', 'Contact', 'Lead', 'Opportunity', 'Case', | |
'Campaign', 'User', 'Product2', 'Task', 'Event' | |
] | |
available_objects = [] | |
for obj_name in common_objects: | |
try: | |
obj = getattr(sf_connection, obj_name) | |
obj.describe() | |
available_objects.append(obj_name) | |
except: | |
continue | |
return available_objects | |
except Exception as e: | |
logger.error(f"Error getting objects: {str(e)}") | |
return ['Account', 'Contact', 'Lead'] | |
def get_object_schema(object_name): | |
"""Get schema for a specific Salesforce object""" | |
global sf_connection, object_schemas | |
if not sf_connection or not object_name: | |
return {} | |
try: | |
if object_name not in object_schemas: | |
obj = getattr(sf_connection, object_name) | |
metadata = obj.describe() | |
schema = { | |
'name': object_name, | |
'label': metadata.get('label', object_name), | |
'fields': [] | |
} | |
for field in metadata['fields']: | |
if field['createable'] or field['updateable']: | |
field_info = { | |
'name': field['name'], | |
'label': field['label'], | |
'type': field['type'], | |
'required': not field['nillable'] and not field['defaultedOnCreate'], | |
'length': field.get('length', 0), | |
'picklistValues': [pv['value'] for pv in field.get('picklistValues', [])] | |
} | |
schema['fields'].append(field_info) | |
object_schemas[object_name] = schema | |
return object_schemas[object_name] | |
except Exception as e: | |
logger.error(f"Error getting schema for {object_name}: {str(e)}") | |
return {} | |
def generate_test_data(object_name, fields, num_records=100): | |
"""Generate test data using Faker for specified object and fields""" | |
try: | |
schema = get_object_schema(object_name) | |
if not schema: | |
return None, "β Could not get object schema" | |
records = [] | |
for _ in range(num_records): | |
record = {} | |
for field_name in fields: | |
field_info = next((f for f in schema['fields'] if f['name'] == field_name), None) | |
if not field_info: | |
continue | |
field_type = field_info['type'] | |
# Generate data based on field type and name | |
if field_name.lower() in ['firstname', 'first_name']: | |
record[field_name] = fake.first_name() | |
elif field_name.lower() in ['lastname', 'last_name']: | |
record[field_name] = fake.last_name() | |
elif field_name.lower() in ['name'] and object_name == 'Account': | |
record[field_name] = fake.company() | |
elif field_name.lower() in ['email']: | |
record[field_name] = fake.email() | |
elif field_name.lower() in ['phone']: | |
record[field_name] = fake.phone_number() | |
elif field_name.lower() in ['website']: | |
record[field_name] = fake.url() | |
elif field_name.lower() in ['street', 'mailingstreet', 'billingstreet']: | |
record[field_name] = fake.street_address() | |
elif field_name.lower() in ['city', 'mailingcity', 'billingcity']: | |
record[field_name] = fake.city() | |
elif field_name.lower() in ['state', 'mailingstate', 'billingstate']: | |
record[field_name] = fake.state_abbr() | |
elif field_name.lower() in ['postalcode', 'mailingpostalcode', 'billingpostalcode']: | |
record[field_name] = fake.zipcode() | |
elif field_name.lower() in ['country', 'mailingcountry', 'billingcountry']: | |
record[field_name] = 'US' | |
elif field_type == 'picklist' and field_info['picklistValues']: | |
record[field_name] = random.choice(field_info['picklistValues']) | |
elif field_type == 'boolean': | |
record[field_name] = random.choice([True, False]) | |
elif field_type in ['int', 'double', 'currency']: | |
if 'annual' in field_name.lower() or 'revenue' in field_name.lower(): | |
record[field_name] = random.randint(100000, 10000000) | |
else: | |
record[field_name] = random.randint(1, 1000) | |
elif field_type == 'date': | |
record[field_name] = fake.date_between(start_date='-1y', end_date='today').isoformat() | |
elif field_type == 'datetime': | |
record[field_name] = fake.date_time_between(start_date='-1y', end_date='now').isoformat() | |
elif field_type in ['string', 'textarea']: | |
if field_info['length'] > 100: | |
record[field_name] = fake.text(max_nb_chars=min(field_info['length'], 200)) | |
else: | |
record[field_name] = fake.sentence(nb_words=3)[:field_info['length']] | |
else: | |
# Default string value | |
record[field_name] = f"Test {field_name}" | |
records.append(record) | |
# Create DataFrame | |
df = pd.DataFrame(records) | |
# Save to CSV | |
filename = f"test_data_{object_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" | |
df.to_csv(filename, index=False) | |
return filename, f"β Generated {num_records} test records for {object_name}\nFields: {', '.join(fields)}\nFile: {filename}" | |
except Exception as e: | |
logger.error(f"Error generating test data: {str(e)}") | |
return None, f"β Error generating test data: {str(e)}" | |
def enhanced_salesforce_operation(username, password, security_token, sandbox, operation, | |
object_name, selected_fields, csv_file, num_records): | |
"""Enhanced Salesforce operations with full functionality""" | |
global sf_connection | |
# Step 1: Connect to Salesforce | |
if not username or not password or not security_token: | |
return "β Please provide username, password, and security token", None, "[]", "[]" | |
try: | |
domain = 'test' if sandbox else None | |
sf_connection = Salesforce( | |
username=username, | |
password=password, | |
security_token=security_token, | |
domain=domain | |
) | |
connection_msg = f"β Connected to Salesforce as {username}\n" | |
# Get available objects | |
objects = get_salesforce_objects() | |
objects_json = json.dumps(objects) | |
# Step 2: Handle different operations | |
if operation == "connect_only": | |
return connection_msg + f"Available objects: {', '.join(objects)}", None, objects_json, "[]" | |
elif operation == "get_schema": | |
if not object_name: | |
return connection_msg + "β Please select an object", None, objects_json, "[]" | |
schema = get_object_schema(object_name) | |
fields = [f"{f['name']} ({f['type']})" for f in schema.get('fields', [])] | |
fields_json = json.dumps([f['name'] for f in schema.get('fields', [])]) | |
return (connection_msg + f"π Schema for {object_name}:\n" + | |
f"Fields: {len(fields)}\n" + "\n".join(fields[:20]) + | |
(f"\n... and {len(fields)-20} more fields" if len(fields) > 20 else "")), None, objects_json, fields_json | |
elif operation == "generate_data": | |
if not object_name or not selected_fields: | |
return connection_msg + "β Please select object and fields", None, objects_json, "[]" | |
fields_list = selected_fields.split(',') if isinstance(selected_fields, str) else selected_fields | |
filename, result = generate_test_data(object_name, fields_list, num_records) | |
return connection_msg + result, filename, objects_json, "[]" | |
elif operation == "import_data": | |
if not csv_file: | |
return connection_msg + "β Please upload a CSV file", None, objects_json, "[]" | |
if not object_name: | |
return connection_msg + "β Please select target object", None, objects_json, "[]" | |
# Read and process file | |
try: | |
if csv_file.name.endswith('.csv'): | |
df = pd.read_csv(csv_file.name) | |
elif csv_file.name.endswith(('.xlsx', '.xls')): | |
df = pd.read_excel(csv_file.name) | |
else: | |
return connection_msg + "β Please upload a CSV or Excel file", None, objects_json, "[]" | |
if df.empty: | |
return connection_msg + "β The uploaded file is empty", None, objects_json, "[]" | |
# Clean data | |
records = df.to_dict('records') | |
cleaned_records = [] | |
for record in records: | |
cleaned_record = {k: v for k, v in record.items() if pd.notna(v)} | |
cleaned_records.append(cleaned_record) | |
# Import using bulk API | |
result = sf_connection.bulk.__getattr__(object_name).insert(cleaned_records) | |
# Process results | |
success_count = sum(1 for r in result if r.get('success')) | |
error_count = len(result) - success_count | |
import_msg = f"\nπ€ Import Results:\n" | |
import_msg += f"Object: {object_name}\n" | |
import_msg += f"Total records: {len(records)}\n" | |
import_msg += f"β Successful: {success_count}\n" | |
import_msg += f"β Failed: {error_count}\n" | |
if error_count > 0: | |
errors = [r.get('errors', []) for r in result if not r.get('success')] | |
import_msg += f"\nFirst few errors: {str(errors[:3])}" | |
return connection_msg + import_msg, None, objects_json, "[]" | |
except Exception as e: | |
return connection_msg + f"β Import error: {str(e)}", None, objects_json, "[]" | |
elif operation == "export_data": | |
if not object_name: | |
return connection_msg + "β Please select an object", None, objects_json, "[]" | |
try: | |
schema = get_object_schema(object_name) | |
# Use selected fields or default fields | |
if selected_fields: | |
fields_list = selected_fields.split(',') if isinstance(selected_fields, str) else selected_fields | |
fields_list = [f.strip() for f in fields_list] | |
else: | |
# Use first 10 fields as default | |
fields_list = [f['name'] for f in schema.get('fields', [])[:10]] | |
fields_str = ', '.join(fields_list) | |
query = f"SELECT {fields_str} FROM {object_name} LIMIT 100" | |
result = sf_connection.query_all(query) | |
records = result['records'] | |
if records: | |
df = pd.DataFrame(records) | |
if 'attributes' in df.columns: | |
df = df.drop('attributes', axis=1) | |
# Save export file | |
export_file = f"export_{object_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" | |
df.to_csv(export_file, index=False) | |
export_msg = f"\nπ₯ Export Results:\n" | |
export_msg += f"Object: {object_name}\n" | |
export_msg += f"Records exported: {len(records)}\n" | |
export_msg += f"Fields: {', '.join(df.columns)}\n" | |
export_msg += f"Sample data:\n{df.head(3).to_string()}" | |
return connection_msg + export_msg, export_file, objects_json, "[]" | |
else: | |
return connection_msg + f"\nβ No {object_name} records found", None, objects_json, "[]" | |
except Exception as e: | |
return connection_msg + f"\nβ Export error: {str(e)}", None, objects_json, "[]" | |
else: | |
return connection_msg + "β Invalid operation", None, objects_json, "[]" | |
except Exception as e: | |
error_msg = str(e) | |
if "INVALID_LOGIN" in error_msg: | |
return "β Invalid credentials. Please check your username, password, and security token.", None, "[]", "[]" | |
elif "API_DISABLED_FOR_ORG" in error_msg: | |
return "β API access is disabled. Contact your Salesforce admin.", None, "[]", "[]" | |
elif "LOGIN_MUST_USE_SECURITY_TOKEN" in error_msg: | |
return "β Security token required. Append it to your password.", None, "[]", "[]" | |
else: | |
return f"β Connection failed: {error_msg}", None, "[]", "[]" | |
# Create the enhanced interface | |
demo = gr.Interface( | |
fn=enhanced_salesforce_operation, | |
inputs=[ | |
gr.Textbox(label="Username", placeholder="[email protected]"), | |
gr.Textbox(label="Password", type="password"), | |
gr.Textbox(label="Security Token", type="password"), | |
gr.Checkbox(label="Sandbox Environment"), | |
gr.Dropdown( | |
label="Operation", | |
choices=[ | |
"connect_only", | |
"get_schema", | |
"generate_data", | |
"import_data", | |
"export_data" | |
], | |
value="connect_only" | |
), | |
gr.Dropdown(label="Salesforce Object", choices=[], allow_custom_value=True), | |
gr.Textbox(label="Selected Fields (comma-separated)", placeholder="Name,Email,Phone"), | |
gr.File(label="CSV/Excel File (for import)", file_types=[".csv", ".xlsx", ".xls"]), | |
gr.Slider(label="Number of Test Records", minimum=10, maximum=1000, value=100, step=10) | |
], | |
outputs=[ | |
gr.Textbox(label="Results", lines=15), | |
gr.File(label="Download File"), | |
gr.Textbox(label="Available Objects (JSON)", visible=False), | |
gr.Textbox(label="Available Fields (JSON)", visible=False) | |
], | |
title="π Enhanced Salesforce Data Loader", | |
description=""" | |
**Advanced Salesforce Data Management Tool** | |
**Workflow:** | |
1. **Connect**: Enter credentials, select 'connect_only' to see available objects | |
2. **Get Schema**: Select object, choose 'get_schema' to see fields | |
3. **Generate Data**: Select fields, choose 'generate_data' to create test data with Faker | |
4. **Import**: Upload CSV/Excel, select target object, choose 'import_data' | |
5. **Export**: Select object and fields, choose 'export_data' | |
**Features:** | |
- β Live object detection from your Salesforce org | |
- β Dynamic schema reading with field types | |
- β Intelligent test data generation using Faker | |
- β Field mapping and validation | |
- β Bulk operations for performance | |
- β Relationship data support (Account + Contact) | |
""", | |
examples=[ | |
["[email protected]", "password123", "token123", False, "connect_only", "", "", None, 100], | |
] | |
) | |
if __name__ == "__main__": | |
demo.launch(server_name="0.0.0.0", server_port=7860) |