import io
import os
import tempfile
from io import BytesIO

import cv2
import fitz  # PyMuPDF for working with PDFs
import moviepy.editor as mp
import numpy as np
import requests
import speech_recognition as sr
from flask import Flask, request, jsonify
from flask_caching import Cache
from flask_cors import CORS

from utils.audioEmbedding.index import extract_audio_embeddings
from utils.videoEmbedding.index import get_video_embedding
from utils.imageToText.index import extract_text
from utils.sentanceEmbedding.index import get_text_vector, get_text_discription_vector
from utils.imageEmbedding.index import get_image_embedding
from utils.similarityScore import get_all_similarities
from utils.objectDetection.index import detect_objects

app = Flask(__name__)
cache = Cache(app, config={'CACHE_TYPE': 'simple'})  # Choose a caching type based on your requirements
CORS(app)

def get_face_locations(binary_data):
    """Detect faces in an image and return their bounding boxes."""
    # Convert binary image data to a numpy array and decode it
    nparr = np.frombuffer(binary_data, np.uint8)
    image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
    # Load the pre-trained Haar cascade face detection model
    face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
    # Convert the image to grayscale
    gray_image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    # Detect faces in the image
    faces = face_cascade.detectMultiScale(gray_image, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))
    # Extract face locations (cast to int so the values are JSON-serializable)
    face_locations = []
    for (x, y, w, h) in faces:
        face_locations.append({"top": int(y), "right": int(x + w), "bottom": int(y + h), "left": int(x)})
    return face_locations

def seperate_image_text_from_pdf(pdf_url):
    """Download a PDF and return per-page text and raw image bytes."""
    try:
        # List to store page information
        pages_info = []
        # Fetch the PDF from the URL
        response = requests.get(pdf_url)
        if response.status_code == 200:
            # Save the PDF data to a temporary file
            with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
                tmp_file.write(response.content)
                tmp_file_path = tmp_file.name
            # Open the PDF
            pdf = fitz.open(tmp_file_path)
            # Iterate through each page
            for page_num in range(len(pdf)):
                page = pdf.load_page(page_num)
                # Extract text
                text = page.get_text()
                # Collect the images on the page
                image_list = page.get_images(full=True)
                # Extract the raw bytes of each image
                images_bytes = []
                for img_info in image_list:
                    xref = img_info[0]
                    base_image = pdf.extract_image(xref)
                    images_bytes.append(base_image["image"])
                # Store page information in a dictionary
                page_info = {
                    "pgno": page_num + 1,
                    "images": images_bytes,
                    "text": text
                }
                pages_info.append(page_info)
            # Close the PDF and clean up the temporary file
            pdf.close()
            os.unlink(tmp_file_path)
        else:
            print("Failed to fetch the PDF from the URL.")
            return "Error"
    except Exception as e:
        print("An error occurred:", e)
        return "Error"
    return pages_info

def pdf_image_text_embedding_and_text_embedding(pages_info):
    """For each page, embed every image and pair the results with the page text."""
    try:
        # List to store page embeddings
        page_embeddings = []
        # Iterate through each page
        for page in pages_info:
            text = page["text"]
            images = page["images"]
            # Embed each image and OCR its text
            image_embeddings = []
            for image in images:
                image_embedding = get_image_embedding(image)
                extracted_text = extract_text(image)
                image_embeddings.append({"image_embedding": image_embedding.tolist(), "extracted_text": extracted_text})
            # Store the page text alongside its image embeddings
            page_embedding = {
                "images": image_embeddings,
                "text": text,
            }
            page_embeddings.append(page_embedding)
        return page_embeddings
    except Exception as e:
        print("An error occurred:", e)
        return "Error"

def separate_audio_from_video(video_url):
    """Extract the audio track of a video as WAV bytes."""
    try:
        # Load the video file
        video = mp.VideoFileClip(video_url)
        # Extract the audio track
        audio = video.audio
        # Write the audio to a temporary WAV file, then read it back as bytes
        try:
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_audio_file:
                temp_audio_filename = temp_audio_file.name
                audio.write_audiofile(temp_audio_filename)
            with open(temp_audio_filename, "rb") as f:
                audio_bytes = f.read()
            os.unlink(temp_audio_filename)
        except Exception as e:
            print("An error occurred:", e)
            return "Error"
        return audio_bytes
    except Exception as e:
        print("An error occurred:", e)
        return "Error"

# Route to get text embeddings.
# NOTE: route paths on the decorators below are assumed from the function names.
@app.route("/get_text_embedding", methods=["POST"])
def get_text_embedding_route():
    try:
        text = request.json.get("text")
        text_embedding = get_text_vector(text)
        return jsonify({"text_embedding": text_embedding}), 200
    except Exception as e:
        return jsonify({"error": str(e)}), 500

# Route to get audio embeddings and transcribed text
@app.route("/get_audio_embedding", methods=["POST"])
def get_audio_embedding_route():
    try:
        audio_url = request.json.get('audio_url')
        # Download the audio file
        response = requests.get(audio_url)
        audio_data = response.content
        # Embed the raw audio
        audio_embedding = extract_audio_embeddings(audio_data)
        audio_embedding_list = audio_embedding
        # Transcribe the audio with Google Speech Recognition
        audio_file = BytesIO(audio_data)
        r = sr.Recognizer()
        with sr.AudioFile(audio_file) as source:
            audio_data = r.record(source)
        extracted_text = ""
        try:
            extracted_text = r.recognize_google(audio_data)
        except Exception as e:
            print(e)
        return jsonify({"extracted_text": extracted_text, "audio_embedding": audio_embedding_list}), 200
    except Exception as e:
        return jsonify({"error": str(e)}), 500

# Route to get image embeddings
@app.route("/get_image_embedding", methods=["POST"])
def get_image_embedding_route():
    try:
        image_url = request.json.get("imageUrl")
        # Download the image
        response = requests.get(image_url)
        if response.status_code != 200:
            return jsonify({"error": "Failed to download image"}), 500
        binary_data = response.content
        # OCR the image and compute its embedding
        extracted_text = extract_text(binary_data)
        image_embedding = get_image_embedding(binary_data)
        image_embedding_list = image_embedding.tolist()
        return jsonify({"image_embedding": image_embedding_list, "extracted_text": extracted_text}), 200
    except Exception as e:
        return jsonify({"error": str(e)}), 500

# Route to get video embeddings
@app.route("/get_video_embedding", methods=["POST"])
def get_video_embedding_route():
    try:
        video_url = request.json.get("videoUrl")
        # Extract the audio track from the video
        try:
            audio_data = separate_audio_from_video(video_url)
        except Exception as e:
            return jsonify({"error": "Failed to extract audio from video"}), 500
        if audio_data == "Error":
            return jsonify({"error": "Failed to extract audio from video"}), 500
        # Embed the audio
        try:
            audio_embedding = extract_audio_embeddings(audio_data)
        except Exception as e:
            return jsonify({"error": "Failed to extract audio embeddings: " + str(e)}), 500
        audio_embedding_list = audio_embedding
        # Transcribe the audio with Google Speech Recognition
        try:
            audio_file = io.BytesIO(audio_data)
            r = sr.Recognizer()
            with sr.AudioFile(audio_file) as source:
                audio_data = r.record(source)
        except Exception as e:
            return jsonify({"error": "Failed to read the extracted audio"}), 500
        extracted_text = ""
        try:
            extracted_text = r.recognize_google(audio_data)
        except Exception as e:
            print(e)
        # Embed the video frames
        video_embedding = get_video_embedding(video_url)
        return jsonify({"video_embedding": video_embedding, "extracted_audio_text": extracted_text, "audio_embedding": audio_embedding_list}), 200
    except Exception as e:
        print(e)
        return jsonify({"error": str(e)}), 500

# Route to extract text and image embeddings from a PDF
@app.route("/extract_pdf_text_and_embedding", methods=["POST"])
def extract_pdf_text_and_embedding():
    try:
        pdf_url = request.json.get("pdfUrl")
        # Split the PDF into per-page text and images
        try:
            pages_info = seperate_image_text_from_pdf(pdf_url)
        except Exception as e:
            print(e)
            return jsonify({"error": "Failed to fetch the PDF from the URL"}), 500
        if pages_info == "Error":
            return jsonify({"error": "Failed to fetch the PDF from the URL"}), 500
        # Embed the images and pair them with the page text
        content = pdf_image_text_embedding_and_text_embedding(pages_info)
        if content == "Error":
            return jsonify({"error": "An error occurred while processing the PDF"}), 500
        return jsonify({"content": content}), 200
    except Exception as e:
        print(e)
        return jsonify({"error": str(e)}), 500

# Route to get text description embeddings
@app.route("/get_text_description_embedding", methods=["POST"])
def get_text_description_embedding_route():
    try:
        text = request.json.get("text")
        text_description_embedding = get_text_discription_vector(text)
        return jsonify({"text_description_embedding": text_description_embedding.tolist()}), 200
    except Exception as e:
        return jsonify({"error": str(e)}), 500

# Route to get object detection results
@app.route("/detect_objects", methods=["POST"])
def detect_objects_route():
    try:
        image_url = request.json.get("imageUrl")
        # Download the image
        response = requests.get(image_url)
        if response.status_code != 200:
            return jsonify({"error": "Failed to download image"}), 500
        binary_data = response.content
        object_detection_results = detect_objects(binary_data)
        return jsonify({"object_detection_results": object_detection_results}), 200
    except Exception as e:
        return jsonify({"error": str(e)}), 500

# Route to get face locations
@app.route("/get_face_locations", methods=["POST"])
def get_face_locations_route():
    try:
        image_url = request.json.get("imageUrl")
        # Download the image
        response = requests.get(image_url)
        if response.status_code != 200:
            return jsonify({"error": "Failed to download image"}), 500
        binary_data = response.content
        face_locations = get_face_locations(binary_data)
        return jsonify({"face_locations": str(face_locations)}), 200
    except Exception as e:
        print(e)
        return jsonify({"error": str(e)}), 500

# Route to get similarity score between two embeddings
@app.route("/get_similarity_score", methods=["POST"])
def get_similarity_score_route():
    try:
        embedding1 = request.json.get("embedding1")
        embedding2 = request.json.get("embedding2")
        # Embeddings are expected as lists of floats
        similarity_score = get_all_similarities(embedding1, embedding2)
        return jsonify({"similarity_score": similarity_score}), 200
    except Exception as e:
        return jsonify({"error": str(e)}), 500
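
# Minimal sketch of a local entry point. The host and port are assumptions
# (Hugging Face Spaces commonly serves on 0.0.0.0:7860); adjust to match the
# actual deployment.
#
# Example request against the assumed route path above:
#   curl -X POST http://localhost:7860/get_text_embedding \
#        -H "Content-Type: application/json" \
#        -d '{"text": "hello world"}'
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=7860)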