{ "cells": [ { "cell_type": "markdown", "id": "53e9feaa-53de-4377-8d45-aa1f7264ae3a", "metadata": {}, "source": [ "### First Neccesary libararies needs to be loaded." ] }, { "cell_type": "code", "execution_count": 32, "id": "8864f53f-15d2-403c-a905-3da509cfb050", "metadata": {}, "outputs": [], "source": [ "import gradio as gr\n", "import transformers\n", "from transformers import pipeline\n", "import tf_keras as keras\n", "import pandas as pd\n", "import os" ] }, { "cell_type": "markdown", "id": "5bb130f0-d8c8-459d-918f-84025c93bc05", "metadata": {}, "source": [ "### Now we import our already pre-trained model from " ] }, { "cell_type": "code", "execution_count": 13, "id": "1c4e29a8-9b24-47d6-b14b-0e2a7e4d66a1", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "Some weights of the PyTorch model were not used when initializing the TF 2.0 model TFBertForSequenceClassification: ['bert.embeddings.position_ids']\n", "- This IS expected if you are initializing TFBertForSequenceClassification from a PyTorch model trained on another task or with another architecture (e.g. initializing a TFBertForSequenceClassification model from a BertForPreTraining model).\n", "- This IS NOT expected if you are initializing TFBertForSequenceClassification from a PyTorch model that you expect to be exactly identical (e.g. initializing a TFBertForSequenceClassification model from a BertForSequenceClassification model).\n", "All the weights of TFBertForSequenceClassification were initialized from the PyTorch model.\n", "If your task is similar to the task the model of the checkpoint was trained on, you can already use TFBertForSequenceClassification for predictions without further training.\n", "Device set to use 0\n" ] } ], "source": [ "# Load pre-trained spam classifier\n", "spam_classifier = pipeline(\n", " \"text-classification\",\n", " model=\"mrm8488/bert-tiny-finetuned-sms-spam-detection\"\n", ")" ] }, { "cell_type": "code", "execution_count": 4, "id": "8b27be80-1a6c-4c6c-a3ac-fe3b6f1378ae", "metadata": {}, "outputs": [], "source": [ "import tempfile" ] }, { "cell_type": "markdown", "id": "9cb4ab66-2833-40bd-87cb-4d712398e431", "metadata": {}, "source": [ "### Since single email check is hassle we will make a function for batch classication\n", "### we should assume certain file template or format so our program knows what to expect" ] }, { "cell_type": "code", "execution_count": 32, "id": "215fd411-623f-43ce-8775-1bbd2c130b56", "metadata": { "jupyter": { "source_hidden": true } }, "outputs": [], "source": [ "def classify_batch(file):\n", " \"\"\"Process uploaded CSV/TXT file with multiple emails\"\"\"\n", " results = []\n", " if file.name.endswith('.csv'): # Handling the emails in CSV files format\n", " df = pd.read_csv(file)\n", " emails = df['email'].tolist() # we assume there's a column named 'email'\n", " for idx, email in enumerate(emails):\n", " prediction = spam_classifier(email)[0]\n", " results.append({\n", " \"email\": email[:50] + \"...\", # Truncate for display\n", " \"label\": \"SPAM\" if prediction[\"label\"] == \"LABEL_1\" else \"HAM\",\n", " \"confidence\": prediction[\"score\"]\n", " })\n", "\n", " ### Now we almost do the same thing but for text files (one email per line)\n", " elif file.name.endswith('.txt'):\n", " with open(file.name, 'r') as f:\n", " emails = f.readlines()\n", " for email in emails:\n", " prediction = spam_classifier(email.strip())[0]\n", " results.append({\n", " \"email\": email.strip()[:50] + \"...\",\n", " \"label\": \"SPAM\" if prediction[\"label\"] == \"LABEL_1\" else \"HAM\",\n", " \"confidence\": f\"{prediction['score']:.4f}\"\n", " })\n", " ### --------------- Here we implemnt some condition for our uploaded files __________\n", " try:\n", " results = []\n", " if not file.name:\n", " raise gr.Error(\"No file uploaded\")\n", " # Handle CSV files\n", " if file.name.endswith('.csv'):\n", " df = pd.read_csv(file)\n", " if 'email' not in df.columns:\n", " raise gr.Error(\"CSV file must contain 'email' column\")\n", " emails = df['email'].tolist()\n", " \n", " # Handle text files\n", " elif file.name.endswith('.txt'):\n", " with open(file.name, 'r') as f:\n", " emails = f.readlines()\n", " else:\n", " raise gr.Error(\"Unsupported file format. Only CSV/TXT accepted\")\n", " \n", " # Limit to 100 emails max for demo\n", " emails = emails[:100]\n", "\n", "\n", " except gr.Error as e:\n", " raise e # Re-raise Gradio errors to show pop-up\n", " except Exception as e:\n", " raise gr.Error(f\"An unexpected error occurred: {str(e)}\")\n", "\n", "\n", " return pd.DataFrame(results)" ] }, { "cell_type": "code", "execution_count": 27, "id": "b8ae7b3b-5273-4242-85db-b5cb622a4046", "metadata": {}, "outputs": [], "source": [ "def classify_batch(file):\n", " \"\"\"Process uploaded CSV/TXT file with multiple emails\"\"\"\n", " try:\n", " results = []\n", " \n", " # Check if file exists\n", " if not file.name:\n", " raise gr.Error(\"No file uploaded\")\n", "\n", " # --- CSV File Handling ---\n", " if file.name.endswith('.csv'):\n", " df = pd.read_csv(file)\n", " \n", " # Check for required email column\n", " if 'email' not in df.columns:\n", " raise gr.Error(\"CSV file must contain a column named 'email'\")\n", " \n", " emails = df['email'].tolist()\n", "\n", " # --- Text File Handling ---\n", " elif file.name.endswith('.txt'):\n", " with open(file.name, 'r') as f:\n", " emails = f.readlines()\n", " \n", " # --- Unsupported Format ---\n", " else:\n", " raise gr.Error(\"Unsupported file format. Only CSV/TXT accepted\")\n", "\n", " # Process emails (common for both formats)\n", " emails = emails[:100] # Limit to 100 emails\n", " for email in emails:\n", " # Handle empty lines in text files\n", " if not email.strip():\n", " continue\n", " \n", " prediction = spam_classifier(email.strip())[0]\n", " results.append({\n", " \"email\": email.strip()[:50] + \"...\",\n", " \"label\": \"SPAM\" if prediction[\"label\"] == \"LABEL_1\" else \"HAM\",\n", " \"confidence\": f\"{prediction['score']:.4f}\"\n", " })\n", "\n", " return pd.DataFrame(results)\n", "\n", " except gr.Error as e:\n", " raise e # Show pop-up for expected errors\n", " except Exception as e:\n", " raise gr.Error(f\"Processing error: {str(e)}\")" ] }, { "cell_type": "markdown", "id": "6ccb5108-a5d4-4f61-b363-dc4c9d25b4fb", "metadata": {}, "source": [ "### We define simple function for classification" ] }, { "cell_type": "code", "execution_count": 28, "id": "1336344b-54c3-431d-8d89-c351b0c24f80", "metadata": {}, "outputs": [], "source": [ "def classify_text(text):\n", " result = spam_classifier(text)[0]\n", " return {\n", " \"Spam\": result[\"score\"] if result[\"label\"] == \"LABEL_1\" else 1 - result[\"score\"],\n", " \"Ham\": result[\"score\"] if result[\"label\"] == \"LABEL_0\" else 1 - result[\"score\"]\n", " }" ] }, { "cell_type": "code", "execution_count": 33, "id": "c428e83a-dbe6-4c91-8a05-5b550652c181", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...\n", "To disable this warning, you can either:\n", "\t- Avoid using `tokenizers` before the fork if possible\n", "\t- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "* Running on local URL: http://127.0.0.1:7867\n", "Caching examples at: '/Users/techgarage/Projects/spamedar/.gradio/cached_examples/318'\n", "\n", "To create a public link, set `share=True` in `launch()`.\n" ] }, { "data": { "text/html": [ "
" ], "text/plain": [ "