# Spaces status at capture time: Runtime error
# Import Data Science Libraries | |
import gradio as gr | |
import os | |
import gdown | |
import zipfile | |
import pandas as pd | |
from pathlib import Path | |
from PIL import Image, UnidentifiedImageError | |
import numpy as np | |
import tensorflow as tf | |
from sklearn.model_selection import train_test_split | |
import itertools | |
import random | |
# Import visualization libraries | |
import matplotlib.pyplot as plt | |
import matplotlib.cm as cm | |
import cv2 | |
import seaborn as sns | |
# Tensorflow Libraries | |
from tensorflow import keras | |
from tensorflow.keras import layers, models | |
from tensorflow.keras.preprocessing.image import ImageDataGenerator | |
from tensorflow.keras.layers import Dense, Dropout, Flatten, BatchNormalization | |
from tensorflow.keras.callbacks import Callback, EarlyStopping, ModelCheckpoint | |
from tensorflow.keras.optimizers import Adam | |
from tensorflow.keras.applications import MobileNetV2 | |
from tensorflow.keras import Model | |
from tensorflow.keras.layers import Rescaling, RandomFlip, RandomRotation, RandomZoom, RandomContrast, Resizing | |
# System libraries | |
from pathlib import Path | |
import os.path | |
# Metrics | |
from sklearn.metrics import classification_report, confusion_matrix | |
# Use seaborn's dark grid theme for every plot produced by this script.
sns.set(style='darkgrid')


def seed_everything(seed=42):
    """Seed Python, NumPy and TensorFlow and limit TensorFlow to one thread.

    Args:
        seed: Integer seed applied to ``random``, ``numpy.random`` and
            TensorFlow's global generator.
    """
    # Multiple threads are a potential source of non-reproducible results.
    # The TF2 threading config replaces the former TF1 ``Session`` config,
    # which only applies to graph-mode sessions.
    try:
        tf.config.threading.set_intra_op_parallelism_threads(1)
        tf.config.threading.set_inter_op_parallelism_threads(1)
    except RuntimeError:
        # TensorFlow refuses to resize its thread pools once the runtime has
        # been initialised; seeding below is still applied.
        print("TensorFlow already initialised; thread settings left unchanged.")
    # Seed value for Python's random library
    random.seed(seed)
    # Seed value for NumPy
    np.random.seed(seed)
    # Seed value for TensorFlow
    tf.random.set_seed(seed)


seed_everything()
import requests

# NOTE(review): this downloads Python source and imports it below, so the
# script executes whatever the remote repository serves. Consider vendoring
# or pinning the file.
url = "https://raw.githubusercontent.com/mrdbourke/tensorflow-deep-learning/main/extras/helper_functions.py"
helper_path = "helper_functions.py"

try:
    # A timeout keeps a stalled connection from hanging the script forever.
    response = requests.get(url, timeout=30)
    downloaded = response.status_code == 200
except requests.RequestException:
    downloaded = False

if downloaded:
    # Save the content to a file
    with open(helper_path, "wb") as f:
        f.write(response.content)
    print("File downloaded successfully.")
else:
    print("Failed to download the file.")
    # A copy left by an earlier run is still usable; without one the import
    # below cannot succeed, so fail here with an explicit reason.
    if not os.path.exists(helper_path):
        raise RuntimeError(
            f"Could not download {url} and no local {helper_path} exists."
        )

# Import series of helper functions for our notebook
from helper_functions import create_tensorboard_callback, plot_loss_curves, unzip_data, compare_historys, walk_through_dir, pred_and_plot
# Batch size and input resolution for the image pipeline.
BATCH_SIZE = 32
TARGET_SIZE = (224, 224)

# Shareable Google Drive link to the dataset archive.
gdrive_url = 'https://drive.google.com/file/d/1HjHYlQyRz5oWt8kehkt1TiOGRRlKFsv8/view?usp=drive_link'

# The file ID is the path segment between '/d/' and '/view'.
file_id = gdrive_url.partition('/d/')[2].partition('/view')[0]
direct_download_url = f'https://drive.google.com/uc?id={file_id}'

# Fetch the archive into the working directory.
local_zip_file = 'file.zip'
gdown.download(direct_download_url, local_zip_file, quiet=False)

# Unpack the archive; a file that is not a ZIP is reported, not raised.
extracted_path = 'extracted_files'
try:
    with zipfile.ZipFile(local_zip_file, 'r') as zip_ref:
        zip_ref.extractall(extracted_path)
except zipfile.BadZipFile:
    print("Error: The downloaded file is not a valid ZIP file.")
else:
    print("Extraction successful!")

# The archive is no longer needed once extraction has been attempted.
os.remove(local_zip_file)
# Keep the extraction root as a pathlib.Path for later use.
data_dir = Path(extracted_path)

# Dump the extracted tree (debug aid): each nesting level is indented by four
# spaces, directories are suffixed with '/', files are listed beneath them.
for current_dir, _subdirs, filenames in os.walk(extracted_path):
    depth = current_dir.replace(extracted_path, '').count(os.sep)
    dir_indent = ' ' * 4 * depth
    file_indent = ' ' * 4 * (depth + 1)
    print(f"{dir_indent}{os.path.basename(current_dir)}/")
    for filename in filenames:
        print(f"{file_indent}{filename}")
# Function to convert the directory path to a DataFrame | |
def convert_path_to_df(dataset):
    """Index every JPG/PNG image under *dataset* together with its label.

    The label of an image is the name of the directory that directly
    contains it.

    Args:
        dataset: Root directory of the image dataset (str or path-like).

    Returns:
        pandas.DataFrame with the string columns ``Filepath`` and ``Label``,
        one row per image, sorted by path.
    """
    image_dir = Path(dataset)
    image_suffixes = {'.jpg', '.png'}
    # One recursive pass with a case-insensitive suffix test: each file is
    # listed exactly once whatever the casing of its extension, and sorting
    # makes the row order independent of filesystem enumeration order.
    filepaths = sorted(
        path for path in image_dir.rglob('*')
        if path.is_file() and path.suffix.lower() in image_suffixes
    )
    labels = [path.parent.name for path in filepaths]
    filepaths = pd.Series(filepaths, name='Filepath', dtype=object).astype(str)
    labels = pd.Series(labels, name='Label', dtype=object)
    # Concatenate filepaths and labels
    image_df = pd.concat([filepaths, labels], axis=1)
    return image_df
# Path to the dataset directory
data_dir = Path('extracted_files/Pest_Dataset')
image_df = convert_path_to_df(data_dir)

# Check every indexed image for corruption. Image.open() only reads the
# header lazily, so verify() is called to actually validate the file, and the
# context manager closes each file handle instead of leaking it.
for img_p in image_df['Filepath']:
    try:
        with Image.open(img_p) as img:
            img.verify()
    except (UnidentifiedImageError, OSError, SyntaxError):
        print(f"Corrupted image file: {img_p}")

# Save the DataFrame to a CSV for further use
image_df.to_csv('image_dataset.csv', index=False)
print("DataFrame created and saved successfully!")
# Bar chart of how many images each label has.
label_counts = image_df['Label'].value_counts()
plt.figure(figsize=(10, 6))
sns.barplot(
    x=label_counts.index,
    y=label_counts.values,
    alpha=0.8,
    palette='rocket',
)
plt.title('Distribution of Labels in Image Dataset', fontsize=16)
plt.xlabel('Label', fontsize=14)
plt.ylabel('Count', fontsize=14)
plt.xticks(rotation=45)
plt.show()

# 4x4 grid of randomly drawn images (sampled with replacement), each titled
# with its label.
random_index = np.random.randint(0, len(image_df), 16)
fig, axes = plt.subplots(
    nrows=4,
    ncols=4,
    figsize=(10, 10),
    subplot_kw={'xticks': [], 'yticks': []},
)
for ax, row in zip(axes.flat, random_index):
    ax.imshow(plt.imread(image_df.Filepath[row]))
    ax.set_title(image_df.Label[row])
plt.tight_layout()
plt.show()
# Function to return a random image path from a given directory | |
def random_sample(directory):
    """Return the path of one randomly chosen image inside *directory*.

    Only direct children whose name ends in ``.jpg``, ``.jpeg`` or ``.png``
    (any letter case) are candidates.

    Args:
        directory: Directory to pick an image from.

    Returns:
        ``directory`` joined with the chosen file name.

    Raises:
        IndexError: If the directory contains no matching image.
    """
    image_suffixes = ('.jpg', '.jpeg', '.png')
    # os.listdir() order is arbitrary; sorting makes the pick reproducible
    # once Python's ``random`` module has been seeded.
    images = [
        os.path.join(directory, name)
        for name in sorted(os.listdir(directory))
        if name.lower().endswith(image_suffixes)
    ]
    return random.choice(images)
# Function to compute the Error Level Analysis (ELA) of an image | |
def compute_ela_cv(path, quality):
    """Compute the Error Level Analysis (ELA) image of the file at *path*.

    The image is re-saved as a JPEG at the given quality, and the absolute
    per-pixel difference to the original is amplified by a factor of 10.

    Args:
        path: Path of the image to analyse.
        quality: JPEG quality passed to OpenCV's ``IMWRITE_JPEG_QUALITY``.

    Returns:
        ``uint8`` array of the amplified difference, saturated at 255, in the
        channel order produced by ``cv2.imread``.

    Raises:
        FileNotFoundError: If OpenCV cannot read the image at *path*.
    """
    temp_filename = 'temp.jpg'
    orig = cv2.imread(path)
    if orig is None:
        # cv2.imread signals failure with None rather than raising.
        raise FileNotFoundError(f"Could not read image: {path}")
    try:
        cv2.imwrite(temp_filename, orig, [int(cv2.IMWRITE_JPEG_QUALITY), quality])
        compressed = cv2.imread(temp_filename)
    finally:
        # Do not leave the scratch file behind in the working directory.
        if os.path.exists(temp_filename):
            os.remove(temp_filename)
    ela_image = cv2.absdiff(orig, compressed)
    # Widen before scaling: multiplying a uint8 array by 10 wraps around
    # modulo 256, which would make the clip below meaningless.
    ela_image = np.clip(ela_image.astype(np.int32) * 10, 0, 255).astype(np.uint8)
    return ela_image
# View a random sample from the dataset next to its ELA images at decreasing
# JPEG quality.
p = random_sample('extracted_files/Pest_Dataset/beetle')
orig = cv2.imread(p)
orig = cv2.cvtColor(orig, cv2.COLOR_BGR2RGB) / 255.0
init_val = 100
columns = 3
rows = 3
fig = plt.figure(figsize=(15, 10))
for i in range(1, columns * rows + 1):
    quality = init_val - (i - 1) * 8
    if i == 1:
        # The first tile shows the untouched original, so no ELA is computed.
        img = orig.copy()
    else:
        # OpenCV returns BGR while matplotlib expects RGB; convert so the ELA
        # tiles use the same channel order as the original tile.
        img = cv2.cvtColor(compute_ela_cv(path=p, quality=quality), cv2.COLOR_BGR2RGB)
    ax = fig.add_subplot(rows, columns, i)
    ax.title.set_text(f'q: {quality}')
    ax.imshow(img)
plt.show()
# Hold out 20% of the indexed images for testing.
train_df, test_df = train_test_split(
    image_df,
    test_size=0.2,
    shuffle=True,
    random_state=42,
)

# The training generator additionally reserves 20% of its data for validation.
train_generator = ImageDataGenerator(
    preprocessing_function=tf.keras.applications.efficientnet_v2.preprocess_input,
    validation_split=0.2,
)
test_generator = ImageDataGenerator(
    preprocessing_function=tf.keras.applications.efficientnet_v2.preprocess_input,
)

# Settings shared by the training, validation and test iterators.
_flow_options = dict(
    x_col='Filepath',
    y_col='Label',
    target_size=(224, 224),
    color_mode='rgb',
    class_mode='categorical',
    batch_size=32,
)

# Training and validation iterators are two subsets of the same dataframe.
train_images = train_generator.flow_from_dataframe(
    dataframe=train_df,
    shuffle=True,
    seed=42,
    subset='training',
    **_flow_options,
)
val_images = train_generator.flow_from_dataframe(
    dataframe=train_df,
    shuffle=True,
    seed=42,
    subset='validation',
    **_flow_options,
)
# The test iterator is not shuffled, so predictions stay in test_df row order.
test_images = test_generator.flow_from_dataframe(
    dataframe=test_df,
    shuffle=False,
    **_flow_options,
)
# Data augmentation pipeline: resize to 224x224, rescale pixel values by
# 1/255, then apply a random horizontal flip, rotation, zoom and contrast.
# The layer classes are used directly (they are already imported from
# tensorflow.keras.layers at the top of the file) rather than through the
# legacy `layers.experimental.preprocessing` namespace.
augment = tf.keras.Sequential([
    Resizing(224, 224),
    Rescaling(1./255),
    RandomFlip("horizontal"),
    RandomRotation(0.1),
    RandomZoom(0.1),
    RandomContrast(0.1),
])
# Load the pretrained EfficientNetV2-L backbone without its classification
# head, with ImageNet weights and global max pooling, and freeze it.
pretrained_model = tf.keras.applications.efficientnet_v2.EfficientNetV2L(
    input_shape=(224, 224, 3),
    include_top=False,
    weights='imagenet',
    pooling='max'
)
pretrained_model.trainable = False

# Checkpoint callback: keep only the weights of the epoch with the best
# validation accuracy. The ".weights.h5" suffix is what newer Keras releases
# require for weights-only checkpoints; older releases accept it too and
# write HDF5.
checkpoint_path = "pests_cats_classification_model_checkpoint.weights.h5"
checkpoint_callback = ModelCheckpoint(checkpoint_path,
                                      save_weights_only=True,
                                      monitor="val_accuracy",
                                      save_best_only=True)

# EarlyStopping callback: stop training once val_loss has not improved for
# 5 consecutive epochs, and restore the weights of the best epoch.
early_stopping = EarlyStopping(monitor="val_loss",
                               patience=5,
                               restore_best_weights=True)
inputs = pretrained_model.input
# NOTE(review): the augmented tensor is never fed into the network. The head
# below is attached to pretrained_model.output, so augmentation is currently
# inactive. It is left unwired on purpose: wiring it in would change the model
# topology, and the pipeline's 1/255 rescaling presumably conflicts with the
# backbone's own input preprocessing -- confirm before enabling.
x = augment(inputs)

# Classification head on top of the frozen backbone.
x = Flatten()(pretrained_model.output)
x = Dense(256, activation='relu')(x)
x = Dropout(0.5)(x)
x = BatchNormalization()(x)
x = Dense(128, activation='relu')(x)
x = Dropout(0.5)(x)
# One output unit per class found by the training generator, so the head
# always matches the one-hot labels it is trained on.
num_classes = len(train_images.class_indices)
outputs = Dense(num_classes, activation='softmax')(x)

model = Model(inputs=inputs, outputs=outputs)
model.compile(
    optimizer=Adam(0.00001),
    loss='categorical_crossentropy',
    metrics=['accuracy']
)
# Train for up to 50 epochs with early stopping, TensorBoard logging and
# best-weights checkpointing.
training_callbacks = [
    early_stopping,
    create_tensorboard_callback("training_logs",
                                "pests_cats_classification"),
    checkpoint_callback,
]
history = model.fit(
    train_images,
    steps_per_epoch=len(train_images),
    validation_data=val_images,
    validation_steps=len(val_images),
    epochs=50,
    callbacks=training_callbacks,
)

# Report loss and accuracy on the held-out test set.
results = model.evaluate(test_images, verbose=0)
print(" Test Loss: {:.5f}".format(results[0]))
print("Test Accuracy: {:.2f}%".format(results[1] * 100))

# Pull the per-epoch curves out of the training history.
metrics_by_epoch = history.history
accuracy = metrics_by_epoch['accuracy']
val_accuracy = metrics_by_epoch['val_accuracy']
loss = metrics_by_epoch['loss']
val_loss = metrics_by_epoch['val_loss']
epochs = range(len(accuracy))

# Accuracy curves on the current figure.
plt.plot(epochs, accuracy, 'b', label='Training accuracy')
plt.plot(epochs, val_accuracy, 'r', label='Validation accuracy')
plt.title('Training and validation accuracy')
plt.legend()

# Loss curves on a second figure.
plt.figure()
plt.plot(epochs, loss, 'b', label='Training loss')
plt.plot(epochs, val_loss, 'r', label='Validation loss')
plt.title('Training and validation loss')
plt.legend()
plt.show()
# Predict the label of the test_images
pred = model.predict(test_images)
pred = np.argmax(pred, axis=1)

# Map class indices back to label names
labels = (train_images.class_indices)
labels = dict((v, k) for k, v in labels.items())
pred = [labels[k] for k in pred]

# Display the result
print(f'The first 5 predictions: {pred[:5]}')

# Display 15 random pictures from the test set with true and predicted labels.
# randint's upper bound is exclusive, so len(test_df) makes every row,
# including the last one, eligible.
random_index = np.random.randint(0, len(test_df), 15)
fig, axes = plt.subplots(nrows=3, ncols=5, figsize=(25, 15),
                         subplot_kw={'xticks': [], 'yticks': []})
for i, ax in enumerate(axes.flat):
    idx = random_index[i]
    true_label = test_df.Label.iloc[idx]
    ax.imshow(plt.imread(test_df.Filepath.iloc[idx]))
    # Green title for a correct prediction, red for a wrong one.
    color = "green" if true_label == pred[idx] else "red"
    ax.set_title(f"True: {true_label}\nPredicted: {pred[idx]}", color=color)
# Adjust the layout before showing; after show() it no longer has any effect.
plt.tight_layout()
plt.show()
# Per-class precision / recall / F1 on the test set.
y_test = list(test_df.Label)
print(classification_report(y_test, pred))

# Same report as a DataFrame. It is printed explicitly because a bare
# expression statement shows nothing when run as a script.
report = classification_report(y_test, pred, output_dict=True)
df = pd.DataFrame(report).transpose()
print(df)

from sklearn.metrics import confusion_matrix

# y_test contains the true labels and pred the predicted labels. The result
# is not named `cm`, because that name is the alias of the `matplotlib.cm`
# import used by the Grad-CAM code.
conf_matrix = confusion_matrix(y_test, pred)
print(conf_matrix)
import numpy as np | |
import matplotlib.pyplot as plt | |
from tensorflow.keras.applications.efficientnet_v2 import preprocess_input | |
from tensorflow.keras.preprocessing import image | |
import tensorflow as tf | |
import cv2 | |
def get_img_array(img_path, size):
    """Load an image from disk as a single-image batch.

    Args:
        img_path: Path of the image file.
        size: Target size passed to ``image.load_img`` as ``target_size``.

    Returns:
        The image converted by ``image.img_to_array`` with a new leading
        batch axis of length 1.
    """
    pil_img = image.load_img(img_path, target_size=size)
    pixels = image.img_to_array(pil_img)
    # Prepend the batch dimension expected by model calls.
    return pixels[np.newaxis, ...]
def make_gradcam_heatmap(img_array, model, last_conv_layer_name, pred_index=None):
    """Compute a Grad-CAM heatmap for one image.

    Args:
        img_array: Preprocessed input batch fed to *model*; the heatmap is
            computed for its first element.
        model: Keras model that contains a layer named *last_conv_layer_name*.
        last_conv_layer_name: Name of the layer whose output feature map is
            weighted by the gradients.
        pred_index: Class index to explain. Defaults to the top predicted
            class of the first batch element.

    Returns:
        NumPy array with values in [0, 1]. It is all zeros when the class
        score has no positive contribution anywhere in the feature map.
    """
    # Model mapping the input image to the activations of the chosen layer
    # and to the final predictions. model.inputs is already a list, so it is
    # passed as-is instead of being nested in another list.
    grad_model = tf.keras.models.Model(
        model.inputs, [model.get_layer(last_conv_layer_name).output, model.output]
    )
    # Record the forward pass so the class score can be differentiated.
    with tf.GradientTape() as tape:
        last_conv_layer_output, preds = grad_model(img_array)
        if pred_index is None:
            pred_index = tf.argmax(preds[0])
        class_channel = preds[:, pred_index]
    # Gradient of the class score with respect to the layer's feature map.
    grads = tape.gradient(class_channel, last_conv_layer_output)
    # Mean gradient per feature-map channel: the channel's "importance".
    pooled_grads = tf.reduce_mean(grads, axis=(0, 1, 2))
    # Weight each channel of the first image's feature map by its importance
    # and sum over channels.
    last_conv_layer_output = last_conv_layer_output[0]
    heatmap = last_conv_layer_output @ pooled_grads[..., tf.newaxis]
    heatmap = tf.squeeze(heatmap)
    # Keep positive evidence only and normalise to [0, 1]. The division is
    # skipped when the maximum is 0 so the result never contains NaN.
    heatmap = tf.maximum(heatmap, 0)
    peak = tf.math.reduce_max(heatmap)
    if peak > 0:
        heatmap = heatmap / peak
    return heatmap.numpy()
def save_and_display_gradcam(img_path, heatmap, alpha=0.4):
    """Overlay a Grad-CAM heatmap on an image and save it as ``cam.jpg``.

    Args:
        img_path: Path of the original image.
        heatmap: 2-D array of values in [0, 1]; NaN entries are treated as 0.
        alpha: Weight of the colourised heatmap in the overlay.

    Returns:
        The path of the saved overlay image (``"cam.jpg"``).

    Raises:
        FileNotFoundError: If OpenCV cannot read the image at *img_path*.
    """
    # Load the original image; cv2.imread signals failure with None.
    img = cv2.imread(img_path)
    if img is None:
        raise FileNotFoundError(f"Could not read image: {img_path}")
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    # Rescale heatmap to the integer range 0-255. NaNs are zeroed first
    # because casting NaN to uint8 is undefined.
    heatmap = np.uint8(255 * np.nan_to_num(heatmap))
    # Use the jet colormap to colourise the heatmap. pyplot.get_cmap is used
    # because matplotlib.cm.get_cmap is no longer available in recent
    # Matplotlib releases.
    jet = plt.get_cmap("jet")
    # RGB values of the colormap (alpha channel dropped)
    jet_colors = jet(np.arange(256))[:, :3]
    jet_heatmap = jet_colors[heatmap]
    # Resize the colourised heatmap to the size of the original image.
    jet_heatmap = tf.keras.preprocessing.image.array_to_img(jet_heatmap)
    jet_heatmap = jet_heatmap.resize((img.shape[1], img.shape[0]))
    jet_heatmap = tf.keras.preprocessing.image.img_to_array(jet_heatmap)
    # Superimpose the heatmap on the original image
    superimposed_img = jet_heatmap * alpha + img
    superimposed_img = tf.keras.preprocessing.image.array_to_img(superimposed_img)
    # Save the superimposed image
    cam_path = "cam.jpg"
    superimposed_img.save(cam_path)
    return cam_path
import matplotlib.cm as cm | |
import pandas as pd | |
# Grad-CAM overlays for 15 randomly drawn test images (sampled with
# replacement); relies on test_df, model and pred defined earlier.
random_index = np.random.randint(0, len(test_df), 15)
img_size = (224, 224)
last_conv_layer_name = 'top_conv'
fig, axes = plt.subplots(
    nrows=3,
    ncols=5,
    figsize=(15, 10),
    subplot_kw={'xticks': [], 'yticks': []},
)
for ax, sample in zip(axes.flat, random_index):
    img_path = test_df.Filepath.iloc[sample]
    true_label = test_df.Label.iloc[sample]
    # Preprocess the image, compute its heatmap, then show the saved overlay.
    img_array = preprocess_input(get_img_array(img_path, size=img_size))
    heatmap = make_gradcam_heatmap(img_array, model, last_conv_layer_name)
    cam_path = save_and_display_gradcam(img_path, heatmap)
    ax.imshow(plt.imread(cam_path))
    ax.set_title(f"True: {true_label}\nPredicted: {pred[sample]}")
plt.tight_layout()
plt.show()
# Map class index -> class name (the inverse of the generator's mapping).
class_names = train_images.class_indices
class_names = {v: k for k, v in class_names.items()}


# Gradio Interface for Prediction
def predict_image(img):
    """Classify one image and return the probability of every class.

    Args:
        img: Image as an array-like of shape (H, W), (H, W, 3) or (H, W, 4),
            or ``None`` when no image was supplied.

    Returns:
        Dict mapping each class name to its predicted probability, or
        ``None`` when *img* is ``None``.
    """
    if img is None:
        # No image was supplied (for example the input was cleared).
        return None
    img = np.asarray(img)
    if img.ndim == 2:
        # Grayscale: replicate the single channel to get three channels.
        img = np.stack([img] * 3, axis=-1)
    # Drop an alpha channel if present; the model takes 3-channel input.
    img = img[..., :3]
    img_resized = tf.image.resize(img, (TARGET_SIZE[0], TARGET_SIZE[1]))
    img_4d = tf.expand_dims(img_resized, axis=0)
    prediction = model.predict(img_4d)[0]
    return {class_names[i]: float(prediction[i]) for i in range(len(class_names))}
# Launch Gradio interface. The components are not named `image` / `label`:
# `image` is the Keras preprocessing module imported earlier in this file and
# used by get_img_array, so rebinding it would break that function.
image_input = gr.Image()
label_output = gr.Label(num_top_classes=1)
gr.Interface(
    fn=predict_image,
    inputs=image_input,
    outputs=label_output,
    title="Welcome to Agricultural Pest Image Classification",
    description="The image data set used was obtained from Kaggle and has a collection of 12 different types of agricultural pests: Ants, Bees, Beetles, Caterpillars, Earthworms, Earwigs, Grasshoppers, Moths, Slugs, Snails, Wasps, and Weevils",
).launch(debug=True)