Spaces:
Runtime error
Runtime error
| # Import Data Science Libraries | |
| import gradio as gr | |
| import os | |
| import gdown | |
| import zipfile | |
| import pandas as pd | |
| from pathlib import Path | |
| from PIL import Image, UnidentifiedImageError | |
| import numpy as np | |
| import tensorflow as tf | |
| from sklearn.model_selection import train_test_split | |
| import itertools | |
| import random | |
| # Import visualization libraries | |
| import matplotlib.pyplot as plt | |
| import matplotlib.cm as cm | |
| import cv2 | |
| import seaborn as sns | |
| # Tensorflow Libraries | |
| from tensorflow import keras | |
| from tensorflow.keras import layers, models | |
| from tensorflow.keras.preprocessing.image import ImageDataGenerator | |
| from tensorflow.keras.layers import Dense, Dropout, Flatten, BatchNormalization | |
| from tensorflow.keras.callbacks import Callback, EarlyStopping, ModelCheckpoint | |
| from tensorflow.keras.optimizers import Adam | |
| from tensorflow.keras.applications import MobileNetV2 | |
| from tensorflow.keras import Model | |
| from tensorflow.keras.layers import Rescaling, RandomFlip, RandomRotation, RandomZoom, RandomContrast, Resizing | |
| # System libraries | |
| from pathlib import Path | |
| import os.path | |
| # Metrics | |
| from sklearn.metrics import classification_report, confusion_matrix | |
# Use seaborn's dark-grid theme for every figure drawn by this script.
sns.set(style="darkgrid")
# Seed everything to reproduce results for future use cases
def seed_everything(seed=42):
    """Seed the random number generators used by this script.

    Seeds Python's ``random`` module, NumPy's global generator and
    TensorFlow's global generator, then requests single-threaded op
    execution, because multiple threads are a potential source of
    non-reproducible results.

    Args:
        seed: Integer seed shared by all three generators.
    """
    random.seed(seed)
    np.random.seed(seed)
    tf.random.set_seed(seed)

    # Thread limits are configured through tf.config rather than a
    # tf.compat.v1 Session, which is not what TF2 eager execution runs on.
    try:
        tf.config.threading.set_intra_op_parallelism_threads(1)
        tf.config.threading.set_inter_op_parallelism_threads(1)
    except RuntimeError as err:
        # TensorFlow may refuse to change thread settings once its runtime
        # has been initialised; the seeds above are still in effect.
        print(f"Could not switch TensorFlow to single-threaded mode: {err}")


seed_everything()
import requests

# URL of the helper module.
# NOTE(review): this file is fetched from a branch (not a pinned commit) and
# then imported, i.e. executed. Consider vendoring it or pinning the URL.
url = "https://raw.githubusercontent.com/mrdbourke/tensorflow-deep-learning/main/extras/helper_functions.py"

# Send a GET request to the URL. The timeout keeps a stalled connection from
# blocking start-up forever, and network errors are reported instead of
# aborting with a traceback before the message below can be printed.
try:
    response = requests.get(url, timeout=30)
    download_ok = response.status_code == 200
except requests.RequestException:
    download_ok = False

if download_ok:
    # Save the content to a file
    with open("helper_functions.py", "wb") as f:
        f.write(response.content)
    print("File downloaded successfully.")
else:
    print("Failed to download the file.")

# Import series of helper functions for our notebook.
# If the download failed and no earlier copy of helper_functions.py exists,
# this import raises ModuleNotFoundError.
from helper_functions import create_tensorboard_callback, plot_loss_curves, unzip_data, compare_historys, walk_through_dir, pred_and_plot
# Batch size and image resolution constants
BATCH_SIZE = 32
TARGET_SIZE = (224, 224)

# Define the Google Drive shareable link
gdrive_url = 'https://drive.google.com/file/d/1HjHYlQyRz5oWt8kehkt1TiOGRRlKFsv8/view?usp=drive_link'

# Extract the file ID from the URL
file_id = gdrive_url.split('/d/')[1].split('/view')[0]
direct_download_url = f'https://drive.google.com/uc?id={file_id}'

# Define the local filename to save the ZIP file
local_zip_file = 'file.zip'

# Download the ZIP file
gdown.download(direct_download_url, local_zip_file, quiet=False)

# Fail with a clear message when nothing was written to disk, instead of
# letting zipfile raise FileNotFoundError further down.
if not os.path.exists(local_zip_file):
    raise RuntimeError(
        f"Dataset download failed: {local_zip_file} was not created from {direct_download_url}"
    )

# Directory to extract files
extracted_path = 'extracted_files'

# Verify if the downloaded file is a ZIP file and extract it
try:
    with zipfile.ZipFile(local_zip_file, 'r') as zip_ref:
        zip_ref.extractall(extracted_path)
    print("Extraction successful!")
except zipfile.BadZipFile:
    print("Error: The downloaded file is not a valid ZIP file.")
finally:
    # Delete the archive whether or not extraction succeeded.
    if os.path.exists(local_zip_file):
        os.remove(local_zip_file)

# Convert the extracted directory path to a pathlib.Path object
data_dir = Path(extracted_path)
# Debug aid: print the extracted tree, indenting four spaces per level.
for current_dir, _subdirs, filenames in os.walk(extracted_path):
    depth = current_dir.replace(extracted_path, '').count(os.sep)
    dir_pad = ' ' * (4 * depth)
    file_pad = ' ' * (4 * (depth + 1))
    print(f"{dir_pad}{os.path.basename(current_dir)}/")
    for filename in filenames:
        print(f"{file_pad}{filename}")
# Function to convert the directory path to a DataFrame
def convert_path_to_df(dataset):
    """Build a DataFrame of image file paths and their class labels.

    The directory tree under ``dataset`` is searched recursively for files
    whose suffix is ``.jpg``, ``.jpeg`` or ``.png`` (compared
    case-insensitively). Each file is labelled with the name of the directory
    that directly contains it.

    Args:
        dataset: Root directory of the dataset (``str`` or ``Path``).

    Returns:
        A DataFrame with two string columns, ``Filepath`` and ``Label``,
        one row per image, ordered by path.
    """
    image_dir = Path(dataset)
    image_suffixes = {'.jpg', '.jpeg', '.png'}

    # A single pass with a case-insensitive suffix test lists every file
    # exactly once, even on case-insensitive filesystems where separate
    # '*.jpg' and '*.JPG' globs would both match the same file. Sorting makes
    # the row order (and so any seeded split of it) reproducible.
    filepaths = sorted(
        path
        for path in image_dir.rglob('*')
        if path.is_file() and path.suffix.lower() in image_suffixes
    )
    labels = [path.parent.name for path in filepaths]

    filepaths = pd.Series(filepaths, name='Filepath', dtype=object).astype(str)
    labels = pd.Series(labels, name='Label', dtype=object)

    # Concatenate filepaths and labels
    return pd.concat([filepaths, labels], axis=1)
# Path to the dataset directory
data_dir = Path('extracted_files/Pest_Dataset')
image_df = convert_path_to_df(data_dir)

# Check for corrupted images within the dataset. Every file listed in
# image_df is checked (not only '*.jpg'), the file handle is closed after
# each check, and verify() inspects the file contents rather than only the
# header that Image.open() reads.
for img_p in image_df['Filepath']:
    try:
        with Image.open(img_p) as img:
            img.verify()
    except (UnidentifiedImageError, OSError, SyntaxError):
        print(f"Corrupted image file: {img_p}")

# You can save the DataFrame to a CSV for further use
image_df.to_csv('image_dataset.csv', index=False)
print("DataFrame created and saved successfully!")
# Bar chart of the number of images per label.
label_counts = image_df['Label'].value_counts()
plt.figure(figsize=(10, 6))
sns.barplot(
    x=label_counts.index,
    y=label_counts.values,
    palette='rocket',
    alpha=0.8,
)
plt.xlabel('Label', fontsize=14)
plt.ylabel('Count', fontsize=14)
plt.title('Distribution of Labels in Image Dataset', fontsize=16)
plt.xticks(rotation=45)
plt.show()
# Show a 4x4 grid of randomly drawn dataset images, each titled with its label.
random_index = np.random.randint(0, len(image_df), 16)
fig, axes = plt.subplots(
    nrows=4,
    ncols=4,
    figsize=(10, 10),
    subplot_kw={'xticks': [], 'yticks': []},
)
for ax, row in zip(axes.flat, random_index):
    ax.imshow(plt.imread(image_df.Filepath[row]))
    ax.set_title(image_df.Label[row])
plt.tight_layout()
plt.show()
# Function to return a random image path from a given directory
def random_sample(directory):
    """Return the path of a randomly chosen image in ``directory``.

    Only direct children of ``directory`` are considered. A file counts as an
    image when its name ends in ``.jpg``, ``.jpeg`` or ``.png``, compared
    case-insensitively so that files such as ``IMG_01.JPG`` are included.

    Args:
        directory: Directory to pick an image from.

    Returns:
        ``directory`` joined with the chosen file name.

    Raises:
        IndexError: If the directory contains no matching file.
    """
    # Sorting makes the pick reproducible under a fixed random seed, because
    # os.listdir returns entries in arbitrary order.
    images = [
        os.path.join(directory, name)
        for name in sorted(os.listdir(directory))
        if name.lower().endswith(('.jpg', '.jpeg', '.png'))
    ]
    return random.choice(images)
# Function to compute the Error Level Analysis (ELA) of an image
def compute_ela_cv(path, quality):
    """Compute an Error Level Analysis (ELA) image.

    The image is re-encoded as JPEG at the given quality, and the absolute
    per-pixel difference between the original and the re-encoded version is
    amplified by a factor of 10.

    Args:
        path: Path of the image to analyse.
        quality: JPEG quality used for the re-encoding.

    Returns:
        A ``uint8`` array in OpenCV's channel order holding the amplified
        difference, saturated at 255.

    Raises:
        FileNotFoundError: If OpenCV cannot read ``path``.
        ValueError: If the JPEG re-encoding fails.
    """
    orig = cv2.imread(path)
    if orig is None:
        # cv2.imread signals failure by returning None rather than raising.
        raise FileNotFoundError(f"Could not read image: {path}")

    # Re-encode in memory so no shared 'temp.jpg' is left in (or clobbered
    # in) the working directory.
    ok, encoded = cv2.imencode('.jpg', orig, [int(cv2.IMWRITE_JPEG_QUALITY), quality])
    if not ok:
        raise ValueError(f"JPEG re-encoding failed for: {path}")
    compressed = cv2.imdecode(encoded, cv2.IMREAD_COLOR)

    ela_image = cv2.absdiff(orig, compressed)
    # absdiff returns uint8; widen before multiplying so values above 25
    # saturate at 255 instead of wrapping around before the clip.
    ela_image = np.clip(ela_image.astype(np.int32) * 10, 0, 255).astype(np.uint8)
    return ela_image
# View a random sample from the dataset: the first panel shows the original
# image, the remaining panels show its ELA at decreasing JPEG quality.
p = random_sample('extracted_files/Pest_Dataset/beetle')
orig = cv2.imread(p)
orig = cv2.cvtColor(orig, cv2.COLOR_BGR2RGB) / 255.0
init_val = 100
columns = 3
rows = 3
fig = plt.figure(figsize=(15, 10))
for i in range(1, columns * rows + 1):
    quality = init_val - (i - 1) * 8
    if i == 1:
        # No ELA is computed for the first panel; it shows the source image.
        img = orig.copy()
    else:
        # compute_ela_cv returns OpenCV's BGR channel order, while imshow
        # expects RGB, so convert before plotting.
        img = cv2.cvtColor(compute_ela_cv(path=p, quality=quality), cv2.COLOR_BGR2RGB)
    ax = fig.add_subplot(rows, columns, i)
    ax.title.set_text(f'q: {quality}')
    ax.imshow(img)
plt.show()
# Hold out 20% of the images as the test set.
train_df, test_df = train_test_split(
    image_df,
    test_size=0.2,
    random_state=42,
    shuffle=True,
)

# Both generators apply the EfficientNetV2 preprocessing function; the
# training generator additionally reserves 20% of its data for validation.
_efficientnet_preprocess = tf.keras.applications.efficientnet_v2.preprocess_input
train_generator = ImageDataGenerator(
    validation_split=0.2,
    preprocessing_function=_efficientnet_preprocess,
)
test_generator = ImageDataGenerator(
    preprocessing_function=_efficientnet_preprocess,
)
# Split the data into three categories: training, validation and test.
# Settings shared by all three iterators live in one place, and the image
# size and batch size come from the module constants instead of repeated
# literals.
_flow_kwargs = dict(
    x_col='Filepath',
    y_col='Label',
    target_size=TARGET_SIZE,
    color_mode='rgb',
    class_mode='categorical',
    batch_size=BATCH_SIZE,
)

train_images = train_generator.flow_from_dataframe(
    dataframe=train_df,
    shuffle=True,
    seed=42,
    subset='training',
    **_flow_kwargs,
)

val_images = train_generator.flow_from_dataframe(
    dataframe=train_df,
    shuffle=True,
    seed=42,
    subset='validation',
    **_flow_kwargs,
)

# The test iterator is not shuffled so its predictions stay aligned with the
# row order of test_df.
test_images = test_generator.flow_from_dataframe(
    dataframe=test_df,
    shuffle=False,
    **_flow_kwargs,
)
# Data Augmentation Step
# Built from the layer classes this file already imports from
# tensorflow.keras.layers; the layers.experimental.preprocessing namespace is
# not available in newer Keras releases.
augment = tf.keras.Sequential([
    Resizing(224, 224),
    Rescaling(1./255),
    RandomFlip("horizontal"),
    RandomRotation(0.1),
    RandomZoom(0.1),
    RandomContrast(0.1),
])
# Load the pretrained EfficientNetV2-L backbone with ImageNet weights, without
# its classification top and with global max pooling, then freeze it so only
# the layers added on top of it are trained.
pretrained_model = tf.keras.applications.efficientnet_v2.EfficientNetV2L(
    include_top=False,
    weights='imagenet',
    input_shape=(224, 224, 3),
    pooling='max',
)
pretrained_model.trainable = False
# Checkpoint callback: store only the weights, and only when the validation
# accuracy improves.
checkpoint_path = "pests_cats_classification_model_checkpoint"
checkpoint_callback = ModelCheckpoint(
    filepath=checkpoint_path,
    monitor="val_accuracy",
    save_best_only=True,
    save_weights_only=True,
)

# EarlyStopping callback: stop training when the validation loss has not
# improved for 5 consecutive epochs, then restore the best weights seen.
early_stopping = EarlyStopping(
    monitor="val_loss",
    patience=5,
    restore_best_weights=True,
)
# Assemble the classifier: frozen backbone followed by a small dense head.
inputs = pretrained_model.input

# NOTE(review): the `augment` pipeline is intentionally not called here. The
# previous `x = augment(inputs)` result was overwritten on the next statement,
# so augmentation never reached the network. Wiring it in needs a design
# decision: its Rescaling(1./255) step conflicts with EfficientNetV2's
# built-in input preprocessing, and wrapping the backbone as a nested model
# would hide the 'top_conv' layer that the Grad-CAM code looks up on `model`.

# Add new classification layers. The backbone is built with pooling='max',
# so its output is already 2-D and Flatten leaves it unchanged.
x = Flatten()(pretrained_model.output)
x = Dense(256, activation='relu')(x)
x = Dropout(0.5)(x)
x = BatchNormalization()(x)
x = Dense(128, activation='relu')(x)
x = Dropout(0.5)(x)

# Size the output layer from the labels found in the training data rather
# than a hard-coded class count.
num_classes = len(train_images.class_indices)
outputs = Dense(num_classes, activation='softmax')(x)

model = Model(inputs=inputs, outputs=outputs)
# Compile with a small learning rate and categorical cross-entropy loss.
model.compile(
    optimizer=Adam(learning_rate=0.00001),
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)

# Train for up to 50 epochs with early stopping, TensorBoard logging and
# best-weights checkpointing.
training_callbacks = [
    early_stopping,
    create_tensorboard_callback("training_logs", "pests_cats_classification"),
    checkpoint_callback,
]
history = model.fit(
    train_images,
    epochs=50,
    steps_per_epoch=len(train_images),
    validation_data=val_images,
    validation_steps=len(val_images),
    callbacks=training_callbacks,
)
# Evaluate on the held-out test set and report loss and accuracy.
results = model.evaluate(test_images, verbose=0)
print(f" Test Loss: {results[0]:.5f}")
print(f"Test Accuracy: {results[1] * 100:.2f}%")

# Plot the per-epoch training and validation curves: accuracy first, then
# loss on a second figure.
accuracy = history.history['accuracy']
val_accuracy = history.history['val_accuracy']
loss = history.history['loss']
val_loss = history.history['val_loss']
epochs = range(len(accuracy))

for train_curve, val_curve, metric_name, new_figure in (
    (accuracy, val_accuracy, 'accuracy', False),
    (loss, val_loss, 'loss', True),
):
    if new_figure:
        plt.figure()
    plt.plot(epochs, train_curve, 'b', label=f'Training {metric_name}')
    plt.plot(epochs, val_curve, 'r', label=f'Validation {metric_name}')
    plt.title(f'Training and validation {metric_name}')
    plt.legend()
plt.show()
# Predict class probabilities for the test images and keep the index of the
# most probable class for each one.
class_probabilities = model.predict(test_images)
predicted_indices = np.argmax(class_probabilities, axis=1)

# Invert the generator's {label: index} mapping and translate the indices
# into label names.
labels = {index: name for name, index in train_images.class_indices.items()}
pred = [labels[k] for k in predicted_indices]

# Display the result
print(f'The first 5 predictions: {pred[:5]}')
# Display 15 random pictures from the test set with their true and predicted
# labels (title in green when they match, red otherwise).
# np.random.randint excludes the upper bound, so len(test_df) is needed for
# the last row to be selectable.
random_index = np.random.randint(0, len(test_df), 15)
fig, axes = plt.subplots(nrows=3, ncols=5, figsize=(25, 15),
                         subplot_kw={'xticks': [], 'yticks': []})
for i, ax in enumerate(axes.flat):
    row = random_index[i]
    true_label = test_df.Label.iloc[row]
    predicted_label = pred[row]
    ax.imshow(plt.imread(test_df.Filepath.iloc[row]))
    color = "green" if true_label == predicted_label else "red"
    ax.set_title(f"True: {true_label}\nPredicted: {predicted_label}", color=color)
# tight_layout has to run before show(); afterwards it no longer affects the
# displayed figure.
plt.tight_layout()
plt.show()
# Per-class precision/recall/F1, printed as text and kept as a DataFrame.
y_test = list(test_df.Label)
print(classification_report(y_test, pred))
report = classification_report(y_test, pred, output_dict=True)
df = pd.DataFrame(report).transpose()
# A bare `df` expression only renders in a notebook; print it so the table
# also appears when this runs as a script.
print(df)

from sklearn.metrics import confusion_matrix

# y_test contains the true labels and pred contains the predicted labels.
# The result is not stored as `cm`, because that name is the matplotlib.cm
# module alias used by the Grad-CAM code.
conf_matrix = confusion_matrix(y_test, pred)
print(conf_matrix)
| import numpy as np | |
| import matplotlib.pyplot as plt | |
| from tensorflow.keras.applications.efficientnet_v2 import preprocess_input | |
| from tensorflow.keras.preprocessing import image | |
| import tensorflow as tf | |
| import cv2 | |
def get_img_array(img_path, size):
    """Load an image file as a batch containing a single image array.

    Args:
        img_path: Path of the image file.
        size: Target size passed to ``image.load_img``.

    Returns:
        The image array with a new leading axis of length 1.
    """
    pil_img = image.load_img(img_path, target_size=size)
    return np.expand_dims(image.img_to_array(pil_img), axis=0)
def make_gradcam_heatmap(img_array, model, last_conv_layer_name, pred_index=None):
    """Compute a Grad-CAM heatmap for one input image.

    Args:
        img_array: Model input for a single image, including the batch axis.
        model: Keras model containing a layer named ``last_conv_layer_name``.
        last_conv_layer_name: Name of the layer whose activations are used.
        pred_index: Index of the class to explain. Defaults to the class with
            the highest predicted score.

    Returns:
        A NumPy array holding the heatmap, scaled to the range [0, 1]. It is
        all zeros when no activation contributes positively.
    """
    # Model mapping the input image to both the chosen layer's activations
    # and the final predictions.
    grad_model = tf.keras.models.Model(
        model.inputs, [model.get_layer(last_conv_layer_name).output, model.output]
    )

    # Record the forward pass so the class score can be differentiated with
    # respect to the layer's activations.
    with tf.GradientTape() as tape:
        last_conv_layer_output, preds = grad_model(img_array)
        if pred_index is None:
            pred_index = tf.argmax(preds[0])
        class_channel = preds[:, pred_index]

    # Gradient of the class score with respect to the feature map.
    grads = tape.gradient(class_channel, last_conv_layer_output)

    # Mean gradient per feature-map channel: the channel "importance".
    pooled_grads = tf.reduce_mean(grads, axis=(0, 1, 2))

    # Weight each channel by its importance and sum over channels.
    last_conv_layer_output = last_conv_layer_output[0]
    heatmap = last_conv_layer_output @ pooled_grads[..., tf.newaxis]
    heatmap = tf.squeeze(heatmap)

    # Normalise to [0, 1] for visualisation. The divisor is floored at a tiny
    # positive value so an all-zero map stays zero instead of becoming NaN.
    heatmap = tf.maximum(heatmap, 0)
    heatmap = heatmap / tf.maximum(tf.math.reduce_max(heatmap), 1e-8)
    return heatmap.numpy()
def save_and_display_gradcam(img_path, heatmap, alpha=0.4):
    """Overlay a Grad-CAM heatmap on its image and save the result.

    Args:
        img_path: Path of the original image.
        heatmap: 2-D array of heatmap values expected in the range [0, 1].
        alpha: Weight of the colourised heatmap when it is added to the image.

    Returns:
        The path of the saved overlay, always ``"cam.jpg"``.

    Raises:
        FileNotFoundError: If OpenCV cannot read ``img_path``.
    """
    # Load the original image and convert it from OpenCV's BGR order to RGB.
    img = cv2.imread(img_path)
    if img is None:
        # cv2.imread signals failure by returning None rather than raising.
        raise FileNotFoundError(f"Could not read image: {img_path}")
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

    # Rescale the heatmap to 0-255. NaNs and out-of-range values are removed
    # first so the result is always a valid colour-table index.
    heatmap = np.uint8(255 * np.clip(np.nan_to_num(heatmap), 0.0, 1.0))

    # Use the jet colormap to colourise the heatmap. pyplot.get_cmap is used
    # because matplotlib.cm.get_cmap is gone from recent Matplotlib releases
    # and the `cm` name is easy to shadow at module level.
    jet = plt.get_cmap("jet")

    # Keep only the RGB channels of the 256 colormap entries.
    jet_colors = jet(np.arange(256))[:, :3]
    jet_heatmap = jet_colors[heatmap]

    # Resize the colourised heatmap to the size of the original image.
    jet_heatmap = tf.keras.preprocessing.image.array_to_img(jet_heatmap)
    jet_heatmap = jet_heatmap.resize((img.shape[1], img.shape[0]))
    jet_heatmap = tf.keras.preprocessing.image.img_to_array(jet_heatmap)

    # Superimpose the heatmap on the original image.
    superimposed_img = jet_heatmap * alpha + img
    superimposed_img = tf.keras.preprocessing.image.array_to_img(superimposed_img)

    # Save the superimposed image.
    cam_path = "cam.jpg"
    superimposed_img.save(cam_path)
    return cam_path
| import matplotlib.cm as cm | |
| import pandas as pd | |
# Grad-CAM overlays for 15 randomly chosen test images, titled with the true
# and predicted labels. Relies on test_df, model and pred defined earlier.
random_index = np.random.randint(0, len(test_df), 15)
img_size = (224, 224)
last_conv_layer_name = 'top_conv'
fig, axes = plt.subplots(
    nrows=3,
    ncols=5,
    figsize=(15, 10),
    subplot_kw={'xticks': [], 'yticks': []},
)
for ax, row in zip(axes.flat, random_index):
    img_path = test_df.Filepath.iloc[row]
    img_array = preprocess_input(get_img_array(img_path, size=img_size))
    heatmap = make_gradcam_heatmap(img_array, model, last_conv_layer_name)
    cam_path = save_and_display_gradcam(img_path, heatmap)
    ax.imshow(plt.imread(cam_path))
    ax.set_title(f"True: {test_df.Label.iloc[row]}\nPredicted: {pred[row]}")
plt.tight_layout()
plt.show()
# Map each class index back to its label name.
class_names = {index: name for name, index in train_images.class_indices.items()}


# Gradio Interface for Prediction
def predict_image(img):
    """Classify one image for the Gradio interface.

    Args:
        img: Image supplied by the Gradio image component, or ``None`` when
            the form is submitted without an image.

    Returns:
        A dict mapping each class name to its predicted probability, or
        ``None`` when no image was supplied.
    """
    if img is None:
        # Nothing to classify; avoids a crash inside np.array / tf.image.
        return None
    img = np.array(img)
    # Resize to the resolution used for training.
    img_resized = tf.image.resize(img, TARGET_SIZE)
    img_4d = tf.expand_dims(img_resized, axis=0)
    prediction = model.predict(img_4d)[0]
    return {class_names[i]: float(prediction[i]) for i in range(len(class_names))}
# Launch Gradio interface.
# The components are not named `image` / `label`: `image` is the Keras
# preprocessing module that get_img_array() depends on, and rebinding it here
# would break any later call to that function.
image_input = gr.Image()
label_output = gr.Label(num_top_classes=1)
gr.Interface(
    fn=predict_image,
    inputs=image_input,
    outputs=label_output,
    title="Welcome to Agricultural Pest Image Classification",
    description="The image data set used was obtained from Kaggle and has a collection of 12 different types of agricultural pests: Ants, Bees, Beetles, Caterpillars, Earthworms, Earwigs, Grasshoppers, Moths, Slugs, Snails, Wasps, and Weevils",
).launch(debug=True)