# Import data science libraries
import os
import random
import zipfile
from pathlib import Path

import requests
import gdown
import numpy as np
import pandas as pd
from PIL import Image, UnidentifiedImageError
from sklearn.model_selection import train_test_split

# Import visualization libraries
import matplotlib.pyplot as plt
import cv2
import seaborn as sns

# TensorFlow / Keras libraries
import tensorflow as tf
from tensorflow.keras import Model, layers
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.layers import Dense, Dropout, Flatten, BatchNormalization
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.optimizers import Adam

# Gradio for the web interface
import gradio as gr

# Metrics
from sklearn.metrics import classification_report, confusion_matrix

sns.set(style='darkgrid')
# Seed everything to make results reproducible across runs
def seed_everything(seed=42):
    # Seed value for TensorFlow
    tf.random.set_seed(seed)
    # Seed value for NumPy
    np.random.seed(seed)
    # Seed value for Python's random library
    random.seed(seed)
    # Force TensorFlow to use a single thread;
    # multiple threads are a potential source of non-reproducible results.
    session_conf = tf.compat.v1.ConfigProto(
        intra_op_parallelism_threads=1,
        inter_op_parallelism_threads=1
    )
    # Make sure that TensorFlow uses deterministic operations wherever possible
    tf.compat.v1.set_random_seed(seed)
    sess = tf.compat.v1.Session(graph=tf.compat.v1.get_default_graph(), config=session_conf)
    tf.compat.v1.keras.backend.set_session(sess)

seed_everything()
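# Note: on newer TensorFlow versions (2.9+), a simpler supported route is
# tf.config.experimental.enable_op_determinism(), which makes ops
# deterministic without the TF1 compat session used above.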
# URL of the helper-functions file to download
url = "https://raw.githubusercontent.com/mrdbourke/tensorflow-deep-learning/main/extras/helper_functions.py"

# Send a GET request to the URL
response = requests.get(url)

# Check if the request was successful (status code 200)
if response.status_code == 200:
    # Save the content of the response (the file) to a local file
    with open("helper_functions.py", "wb") as f:
        f.write(response.content)
    print("File downloaded successfully!")
else:
    print(f"Failed to download file (status code {response.status_code})")

# Import a series of helper functions for this notebook
from helper_functions import create_tensorboard_callback, plot_loss_curves, unzip_data, compare_historys, walk_through_dir, pred_and_plot
BATCH_SIZE = 32
TARGET_SIZE = (224, 224)
# Define the Google Drive shareable link
gdrive_url = 'https://drive.google.com/file/d/1HjHYlQyRz5oWt8kehkt1TiOGRRlKFsv8/view?usp=drive_link'

# Extract the file ID from the URL
file_id = gdrive_url.split('/d/')[1].split('/view')[0]
direct_download_url = f'https://drive.google.com/uc?id={file_id}'

# Define the local filename to save the ZIP file
local_zip_file = 'file.zip'

# Download the ZIP file
gdown.download(direct_download_url, local_zip_file, quiet=False)

# Directory to extract files into
extracted_path = 'extracted_files'

# Verify that the downloaded file is a ZIP file and extract it
try:
    with zipfile.ZipFile(local_zip_file, 'r') as zip_ref:
        zip_ref.extractall(extracted_path)
    print("Extraction successful!")
except zipfile.BadZipFile:
    print("Error: The downloaded file is not a valid ZIP file.")

# Delete the ZIP file after extraction
os.remove(local_zip_file)
# Convert the extracted directory path to a pathlib.Path object
data_dir = Path(extracted_path)

# Print the directory structure to debug
for root, dirs, files in os.walk(extracted_path):
    level = root.replace(extracted_path, '').count(os.sep)
    indent = ' ' * 4 * level
    print(f"{indent}{os.path.basename(root)}/")
    subindent = ' ' * 4 * (level + 1)
    for f in files:
        print(f"{subindent}{f}")
# Function to build a DataFrame of image filepaths and labels from a directory
def convert_path_to_df(dataset):
    image_dir = Path(dataset)

    # Get filepaths and labels (label = name of the parent directory)
    filepaths = list(image_dir.glob(r'**/*.JPG')) + list(image_dir.glob(r'**/*.jpg')) + list(image_dir.glob(r'**/*.png')) + list(image_dir.glob(r'**/*.PNG'))
    labels = list(map(lambda x: os.path.split(os.path.split(x)[0])[1], filepaths))

    filepaths = pd.Series(filepaths, name='Filepath').astype(str)
    labels = pd.Series(labels, name='Label')

    # Concatenate filepaths and labels
    image_df = pd.concat([filepaths, labels], axis=1)
    return image_df
# Path to the dataset directory
data_dir = Path('extracted_files/Pest_Dataset')
image_df = convert_path_to_df(data_dir)

# Check for corrupted images within the dataset (cover all globbed extensions,
# not just lowercase .jpg)
for img_p in data_dir.rglob("*"):
    if img_p.suffix.lower() not in {'.jpg', '.png'}:
        continue
    try:
        with Image.open(img_p) as img:
            img.verify()  # raises if the file is truncated or not an image
    except (UnidentifiedImageError, OSError):
        print(f"Corrupted image file: {img_p}")

# Save the DataFrame to a CSV for further use
image_df.to_csv('image_dataset.csv', index=False)
print("DataFrame created and saved successfully!")
label_counts = image_df['Label'].value_counts()

plt.figure(figsize=(10, 6))
sns.barplot(x=label_counts.index, y=label_counts.values, alpha=0.8, palette='rocket')
plt.title('Distribution of Labels in Image Dataset', fontsize=16)
plt.xlabel('Label', fontsize=14)
plt.ylabel('Count', fontsize=14)
plt.xticks(rotation=45)
plt.show()
# Display 16 pictures from the dataset with their labels
random_index = np.random.randint(0, len(image_df), 16)
fig, axes = plt.subplots(nrows=4, ncols=4, figsize=(10, 10),
                         subplot_kw={'xticks': [], 'yticks': []})

for i, ax in enumerate(axes.flat):
    ax.imshow(plt.imread(image_df.Filepath[random_index[i]]))
    ax.set_title(image_df.Label[random_index[i]])

plt.tight_layout()
plt.show()
# Function to return a random image path from a given directory
def random_sample(directory):
    images = [os.path.join(directory, img) for img in os.listdir(directory) if img.endswith(('.jpg', '.jpeg', '.png'))]
    return random.choice(images)

# Function to compute the Error Level Analysis (ELA) of an image
def compute_ela_cv(path, quality):
    temp_filename = 'temp.jpg'
    orig = cv2.imread(path)
    cv2.imwrite(temp_filename, orig, [int(cv2.IMWRITE_JPEG_QUALITY), quality])
    compressed = cv2.imread(temp_filename)
    os.remove(temp_filename)  # clean up the temporary file

    # Amplify the difference; cast up first so the x10 doesn't overflow uint8
    ela_image = cv2.absdiff(orig, compressed).astype(np.int32)
    ela_image = np.clip(ela_image * 10, 0, 255).astype(np.uint8)
    return cv2.cvtColor(ela_image, cv2.COLOR_BGR2RGB)  # RGB for matplotlib display
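# ELA works by re-saving the image at a known JPEG quality and amplifying the
# pixel-wise difference against the original: regions that recompress
# differently stand out, giving a quick visual check for compression artifacts.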
# View ELA at decreasing JPEG qualities for a random sample from the dataset
p = random_sample('extracted_files/Pest_Dataset/beetle')
orig = cv2.imread(p)
orig = cv2.cvtColor(orig, cv2.COLOR_BGR2RGB) / 255.0

init_val = 100
columns = 3
rows = 3

fig = plt.figure(figsize=(15, 10))
for i in range(1, columns * rows + 1):
    quality = init_val - (i - 1) * 8
    img = compute_ela_cv(path=p, quality=quality)
    if i == 1:
        img = orig.copy()  # show the original image in the first panel
    ax = fig.add_subplot(rows, columns, i)
    ax.title.set_text('original' if i == 1 else f'q: {quality}')
    plt.imshow(img)
plt.show()
# Split the data into train and test sets
train_df, test_df = train_test_split(image_df, test_size=0.2, shuffle=True, random_state=42)

# Note: efficientnet_v2.preprocess_input is a pass-through; Keras
# EfficientNetV2 models include their own input preprocessing.
train_generator = ImageDataGenerator(
    preprocessing_function=tf.keras.applications.efficientnet_v2.preprocess_input,
    validation_split=0.2
)
test_generator = ImageDataGenerator(
    preprocessing_function=tf.keras.applications.efficientnet_v2.preprocess_input
)
# Create the train, validation, and test generators
train_images = train_generator.flow_from_dataframe(
    dataframe=train_df,
    x_col='Filepath',
    y_col='Label',
    target_size=TARGET_SIZE,
    color_mode='rgb',
    class_mode='categorical',
    batch_size=BATCH_SIZE,
    shuffle=True,
    seed=42,
    subset='training'
)

val_images = train_generator.flow_from_dataframe(
    dataframe=train_df,
    x_col='Filepath',
    y_col='Label',
    target_size=TARGET_SIZE,
    color_mode='rgb',
    class_mode='categorical',
    batch_size=BATCH_SIZE,
    shuffle=True,
    seed=42,
    subset='validation'
)

test_images = test_generator.flow_from_dataframe(
    dataframe=test_df,
    x_col='Filepath',
    y_col='Label',
    target_size=TARGET_SIZE,
    color_mode='rgb',
    class_mode='categorical',
    batch_size=BATCH_SIZE,
    shuffle=False
)
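# Optional sanity check: one batch should be (BATCH_SIZE, 224, 224, 3) images
# and (BATCH_SIZE, 12) one-hot labels.
# batch_x, batch_y = next(train_images)
# print(batch_x.shape, batch_y.shape)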
# Data augmentation step. No Rescaling layer here: Keras EfficientNetV2
# models handle input scaling internally, so rescaling by 1/255 on top of
# that would feed the network doubly-scaled images.
augment = tf.keras.Sequential([
    tf.keras.layers.Resizing(224, 224),
    tf.keras.layers.RandomFlip("horizontal"),
    tf.keras.layers.RandomRotation(0.1),
    tf.keras.layers.RandomZoom(0.1),
    tf.keras.layers.RandomContrast(0.1),
])
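# The random layers above are active only when called with training=True;
# to preview the augmentations on a raw batch:
# batch_x, _ = next(train_images)
# augmented = augment(batch_x, training=True)
# plt.imshow(np.clip(augmented[0].numpy(), 0, 255).astype("uint8")); plt.show()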
# Load the pretrained model as a frozen feature extractor
pretrained_model = tf.keras.applications.efficientnet_v2.EfficientNetV2L(
    input_shape=(224, 224, 3),
    include_top=False,
    weights='imagenet',
    pooling='max'
)
pretrained_model.trainable = False
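# A later fine-tuning pass could unfreeze the top of the base model and
# re-compile with a lower learning rate, for example:
# pretrained_model.trainable = True
# for layer in pretrained_model.layers[:-20]:
#     layer.trainable = False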
# Create checkpoint callback to save the best weights seen so far
checkpoint_path = "pests_cats_classification_model_checkpoint"
checkpoint_callback = ModelCheckpoint(checkpoint_path,
                                      save_weights_only=True,
                                      monitor="val_accuracy",
                                      save_best_only=True)

# Set up EarlyStopping to stop training if val_loss doesn't improve for 5 epochs
early_stopping = EarlyStopping(monitor="val_loss",
                               patience=5,
                               restore_best_weights=True)  # roll back to the best epoch
# Build the model: fresh input -> augmentation -> frozen base -> new head
inputs = layers.Input(shape=(224, 224, 3))
x = augment(inputs)
x = pretrained_model(x, training=False)

# Add new classification layers
x = Flatten()(x)
x = Dense(256, activation='relu')(x)
x = Dropout(0.5)(x)
x = BatchNormalization()(x)
x = Dense(128, activation='relu')(x)
x = Dropout(0.5)(x)
outputs = Dense(12, activation='softmax')(x)

model = Model(inputs=inputs, outputs=outputs)
model.compile(
    optimizer=Adam(learning_rate=1e-5),
    loss='categorical_crossentropy',
    metrics=['accuracy']
)
history = model.fit(
    train_images,
    steps_per_epoch=len(train_images),
    validation_data=val_images,
    validation_steps=len(val_images),
    epochs=60,  # early stopping will usually halt training well before this
    callbacks=[
        early_stopping,
        create_tensorboard_callback("training_logs",
                                    "pests_cats_classification"),
        checkpoint_callback,
    ]
)
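# Visualize the training run with the helper imported earlier
plot_loss_curves(history)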
results = model.evaluate(test_images, verbose=0)

print("    Test Loss: {:.5f}".format(results[0]))
print("Test Accuracy: {:.2f}%".format(results[1] * 100))

# Map class indices back to class names
class_names = train_images.class_indices
class_names = {v: k for k, v in class_names.items()}
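# Per-class metrics on the held-out test set; test_images uses shuffle=False,
# so prediction order lines up with test_images.classes.
y_pred = np.argmax(model.predict(test_images), axis=1)
print(classification_report(test_images.classes, y_pred,
                            target_names=list(class_names.values())))
print(confusion_matrix(test_images.classes, y_pred))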
# Gradio interface for prediction
def predict_image(img):
    img = np.array(img)
    img_resized = tf.image.resize(img, (TARGET_SIZE[0], TARGET_SIZE[1]))
    img_4d = tf.expand_dims(img_resized, axis=0)
    prediction = model.predict(img_4d)[0]
    return {class_names[i]: float(prediction[i]) for i in range(len(class_names))}
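# Optional local smoke test before launching the UI:
# sample = Image.open(random_sample('extracted_files/Pest_Dataset/beetle'))
# print(predict_image(sample))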
# Launch the Gradio interface
image = gr.Image()
label = gr.Label(num_top_classes=12)

gr.Interface(
    fn=predict_image,
    inputs=image,
    outputs=label,
    title="Pest Classification",
    description="Upload an image of a pest to classify it into one of the predefined categories.",
).launch(debug=True)