import json
import logging
import random
from datetime import datetime

import gradio as gr
import pandas as pd
from faker import Faker
from simple_salesforce import Salesforce

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Global connection and state
sf_connection = None
available_objects = []
object_schemas = {}
fake = Faker()


def get_salesforce_objects():
    """Get the list of available Salesforce objects."""
    global sf_connection, available_objects
    if not sf_connection:
        return []
    try:
        # Probe commonly used objects and keep only the accessible ones
        common_objects = [
            'Account', 'Contact', 'Lead', 'Opportunity', 'Case',
            'Campaign', 'User', 'Product2', 'Task', 'Event'
        ]
        available_objects = []
        for obj_name in common_objects:
            try:
                obj = getattr(sf_connection, obj_name)
                obj.describe()
                available_objects.append(obj_name)
            except Exception:
                continue
        return available_objects
    except Exception as e:
        logger.error(f"Error getting objects: {str(e)}")
        return ['Account', 'Contact', 'Lead']


def get_object_schema(object_name):
    """Get the schema for a specific Salesforce object, cached per object."""
    global sf_connection, object_schemas
    if not sf_connection or not object_name:
        return {}
    try:
        if object_name not in object_schemas:
            obj = getattr(sf_connection, object_name)
            metadata = obj.describe()
            schema = {
                'name': object_name,
                'label': metadata.get('label', object_name),
                'fields': []
            }
            for field in metadata['fields']:
                if field['createable'] or field['updateable']:
                    field_info = {
                        'name': field['name'],
                        'label': field['label'],
                        'type': field['type'],
                        'required': not field['nillable'] and not field['defaultedOnCreate'],
                        'length': field.get('length', 0),
                        'picklistValues': [pv['value'] for pv in field.get('picklistValues', [])]
                    }
                    schema['fields'].append(field_info)
            object_schemas[object_name] = schema
        return object_schemas[object_name]
    except Exception as e:
        logger.error(f"Error getting schema for {object_name}: {str(e)}")
        return {}


def generate_test_data(object_name, fields, num_records=100):
    """Generate test data using Faker for the specified object and fields."""
    try:
        schema = get_object_schema(object_name)
        if not schema:
            return None, "❌ Could not get object schema"

        records = []
        for _ in range(num_records):
            record = {}
            for field_name in fields:
                field_info = next((f for f in schema['fields'] if f['name'] == field_name), None)
                if not field_info:
                    continue
                field_type = field_info['type']

                # Generate data based on field name first, then fall back to field type
                if field_name.lower() in ['firstname', 'first_name']:
                    record[field_name] = fake.first_name()
                elif field_name.lower() in ['lastname', 'last_name']:
                    record[field_name] = fake.last_name()
                elif field_name.lower() == 'name' and object_name == 'Account':
                    record[field_name] = fake.company()
                elif field_name.lower() == 'email':
                    record[field_name] = fake.email()
                elif field_name.lower() == 'phone':
                    record[field_name] = fake.phone_number()
                elif field_name.lower() == 'website':
                    record[field_name] = fake.url()
                elif field_name.lower() in ['street', 'mailingstreet', 'billingstreet']:
                    record[field_name] = fake.street_address()
                elif field_name.lower() in ['city', 'mailingcity', 'billingcity']:
                    record[field_name] = fake.city()
                elif field_name.lower() in ['state', 'mailingstate', 'billingstate']:
                    record[field_name] = fake.state_abbr()
                elif field_name.lower() in ['postalcode', 'mailingpostalcode', 'billingpostalcode']:
                    record[field_name] = fake.zipcode()
                elif field_name.lower() in ['country', 'mailingcountry', 'billingcountry']:
                    record[field_name] = 'US'
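                # Picklist fields draw from the org's actual picklist values
                # (collected in get_object_schema), so generated records should
                # pass restricted-picklist validation.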
                elif field_type == 'picklist' and field_info['picklistValues']:
                    record[field_name] = random.choice(field_info['picklistValues'])
                elif field_type == 'boolean':
                    record[field_name] = random.choice([True, False])
                elif field_type in ['int', 'double', 'currency']:
                    if 'annual' in field_name.lower() or 'revenue' in field_name.lower():
                        record[field_name] = random.randint(100000, 10000000)
                    else:
                        record[field_name] = random.randint(1, 1000)
                elif field_type == 'date':
                    record[field_name] = fake.date_between(start_date='-1y', end_date='today').isoformat()
                elif field_type == 'datetime':
                    record[field_name] = fake.date_time_between(start_date='-1y', end_date='now').isoformat()
                elif field_type in ['string', 'textarea']:
                    if field_info['length'] > 100:
                        record[field_name] = fake.text(max_nb_chars=min(field_info['length'], 200))
                    else:
                        record[field_name] = fake.sentence(nb_words=3)[:field_info['length']]
                else:
                    # Default string value
                    record[field_name] = f"Test {field_name}"
            records.append(record)

        # Create a DataFrame and save it to CSV
        df = pd.DataFrame(records)
        filename = f"test_data_{object_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
        df.to_csv(filename, index=False)
        return filename, (
            f"✅ Generated {num_records} test records for {object_name}\n"
            f"Fields: {', '.join(fields)}\n"
            f"File: {filename}"
        )
    except Exception as e:
        logger.error(f"Error generating test data: {str(e)}")
        return None, f"❌ Error generating test data: {str(e)}"
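# A quick standalone check of the generator (hypothetical values; assumes the
# module-level sf_connection has already been set up):
#
#   sf_connection = Salesforce(username="you@example.com", password="...",
#                              security_token="...")
#   filename, msg = generate_test_data('Contact', ['FirstName', 'LastName', 'Email'], 25)
#   print(msg)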
def enhanced_salesforce_operation(username, password, security_token, sandbox,
                                  operation, object_name, selected_fields,
                                  csv_file, num_records):
    """Enhanced Salesforce operations with full functionality."""
    global sf_connection

    # Step 1: Connect to Salesforce
    if not username or not password or not security_token:
        return "❌ Please provide username, password, and security token", None, "[]", "[]"

    try:
        domain = 'test' if sandbox else None
        sf_connection = Salesforce(
            username=username,
            password=password,
            security_token=security_token,
            domain=domain
        )
        connection_msg = f"✅ Connected to Salesforce as {username}\n"

        # Get available objects
        objects = get_salesforce_objects()
        objects_json = json.dumps(objects)

        # Step 2: Handle the requested operation
        if operation == "connect_only":
            return connection_msg + f"Available objects: {', '.join(objects)}", None, objects_json, "[]"

        elif operation == "get_schema":
            if not object_name:
                return connection_msg + "❌ Please select an object", None, objects_json, "[]"
            schema = get_object_schema(object_name)
            fields = [f"{f['name']} ({f['type']})" for f in schema.get('fields', [])]
            fields_json = json.dumps([f['name'] for f in schema.get('fields', [])])
            msg = (connection_msg + f"📋 Schema for {object_name}:\n"
                   + f"Fields: {len(fields)}\n"
                   + "\n".join(fields[:20])
                   + (f"\n... and {len(fields) - 20} more fields" if len(fields) > 20 else ""))
            return msg, None, objects_json, fields_json

        elif operation == "generate_data":
            if not object_name or not selected_fields:
                return connection_msg + "❌ Please select object and fields", None, objects_json, "[]"
            fields_list = selected_fields.split(',') if isinstance(selected_fields, str) else selected_fields
            fields_list = [f.strip() for f in fields_list]
            filename, result = generate_test_data(object_name, fields_list, num_records)
            return connection_msg + result, filename, objects_json, "[]"

        elif operation == "import_data":
            if not csv_file:
                return connection_msg + "❌ Please upload a CSV file", None, objects_json, "[]"
            if not object_name:
                return connection_msg + "❌ Please select target object", None, objects_json, "[]"

            # Read and process the uploaded file
            try:
                if csv_file.name.endswith('.csv'):
                    df = pd.read_csv(csv_file.name)
                elif csv_file.name.endswith(('.xlsx', '.xls')):
                    df = pd.read_excel(csv_file.name)
                else:
                    return connection_msg + "❌ Please upload a CSV or Excel file", None, objects_json, "[]"

                if df.empty:
                    return connection_msg + "❌ The uploaded file is empty", None, objects_json, "[]"

                # Drop NaN values so empty cells are not sent to Salesforce
                records = df.to_dict('records')
                cleaned_records = [
                    {k: v for k, v in record.items() if pd.notna(v)}
                    for record in records
                ]

                # Import using the Bulk API
                result = getattr(sf_connection.bulk, object_name).insert(cleaned_records)

                # Summarize the results
                success_count = sum(1 for r in result if r.get('success'))
                error_count = len(result) - success_count

                import_msg = "\n📤 Import Results:\n"
                import_msg += f"Object: {object_name}\n"
                import_msg += f"Total records: {len(records)}\n"
                import_msg += f"✅ Successful: {success_count}\n"
                import_msg += f"❌ Failed: {error_count}\n"
                if error_count > 0:
                    errors = [r.get('errors', []) for r in result if not r.get('success')]
                    import_msg += f"\nFirst few errors: {str(errors[:3])}"

                return connection_msg + import_msg, None, objects_json, "[]"
            except Exception as e:
                return connection_msg + f"❌ Import error: {str(e)}", None, objects_json, "[]"

        elif operation == "export_data":
            if not object_name:
                return connection_msg + "❌ Please select an object", None, objects_json, "[]"
            try:
                schema = get_object_schema(object_name)
                # Use the selected fields, or fall back to the first 10 fields
                if selected_fields:
                    fields_list = selected_fields.split(',') if isinstance(selected_fields, str) else selected_fields
                    fields_list = [f.strip() for f in fields_list]
                else:
                    fields_list = [f['name'] for f in schema.get('fields', [])[:10]]

                fields_str = ', '.join(fields_list)
                query = f"SELECT {fields_str} FROM {object_name} LIMIT 100"
                result = sf_connection.query_all(query)
                records = result['records']

                if records:
                    df = pd.DataFrame(records)
                    if 'attributes' in df.columns:
                        df = df.drop('attributes', axis=1)

                    # Save the export file
                    export_file = f"export_{object_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
                    df.to_csv(export_file, index=False)

                    export_msg = "\n📥 Export Results:\n"
                    export_msg += f"Object: {object_name}\n"
                    export_msg += f"Records exported: {len(records)}\n"
                    export_msg += f"Fields: {', '.join(df.columns)}\n"
                    export_msg += f"Sample data:\n{df.head(3).to_string()}"

                    return connection_msg + export_msg, export_file, objects_json, "[]"
                else:
                    return connection_msg + f"\n❌ No {object_name} records found", None, objects_json, "[]"
            except Exception as e:
                return connection_msg + f"\n❌ Export error: {str(e)}", None, objects_json, "[]"

        else:
            return connection_msg + "❌ Invalid operation", None, objects_json, "[]"

    except Exception as e:
        error_msg = str(e)
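        # The simple_salesforce exception text typically includes the fault
        # code (e.g. INVALID_LOGIN), so the substring checks below map common
        # login failures to actionable messages.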
if "INVALID_LOGIN" in error_msg: return "❌ Invalid credentials. Please check your username, password, and security token.", None, "[]", "[]" elif "API_DISABLED_FOR_ORG" in error_msg: return "❌ API access is disabled. Contact your Salesforce admin.", None, "[]", "[]" elif "LOGIN_MUST_USE_SECURITY_TOKEN" in error_msg: return "❌ Security token required. Append it to your password.", None, "[]", "[]" else: return f"❌ Connection failed: {error_msg}", None, "[]", "[]" # Create the enhanced interface demo = gr.Interface( fn=enhanced_salesforce_operation, inputs=[ gr.Textbox(label="Username", placeholder="your.email@company.com"), gr.Textbox(label="Password", type="password"), gr.Textbox(label="Security Token", type="password"), gr.Checkbox(label="Sandbox Environment"), gr.Dropdown( label="Operation", choices=[ "connect_only", "get_schema", "generate_data", "import_data", "export_data" ], value="connect_only" ), gr.Dropdown(label="Salesforce Object", choices=[], allow_custom_value=True), gr.Textbox(label="Selected Fields (comma-separated)", placeholder="Name,Email,Phone"), gr.File(label="CSV/Excel File (for import)", file_types=[".csv", ".xlsx", ".xls"]), gr.Slider(label="Number of Test Records", minimum=10, maximum=1000, value=100, step=10) ], outputs=[ gr.Textbox(label="Results", lines=15), gr.File(label="Download File"), gr.Textbox(label="Available Objects (JSON)", visible=False), gr.Textbox(label="Available Fields (JSON)", visible=False) ], title="🚀 Enhanced Salesforce Data Loader", description=""" **Advanced Salesforce Data Management Tool** **Workflow:** 1. **Connect**: Enter credentials, select 'connect_only' to see available objects 2. **Get Schema**: Select object, choose 'get_schema' to see fields 3. **Generate Data**: Select fields, choose 'generate_data' to create test data with Faker 4. **Import**: Upload CSV/Excel, select target object, choose 'import_data' 5. **Export**: Select object and fields, choose 'export_data' **Features:** - ✅ Live object detection from your Salesforce org - ✅ Dynamic schema reading with field types - ✅ Intelligent test data generation using Faker - ✅ Field mapping and validation - ✅ Bulk operations for performance - ✅ Relationship data support (Account + Contact) """, examples=[ ["user@company.com", "password123", "token123", False, "connect_only", "", "", None, 100], ] ) if __name__ == "__main__": demo.launch(server_name="0.0.0.0", server_port=7860)