from fastapi import FastAPI
import os
import subprocess
import gdown
import h5py

app = FastAPI()


@app.get("/")
def greet_json():
    """Answer the root route with a fixed JSON greeting."""
    return {"Hello": "World!"}


# Quieten TensorFlow's C++ logging and point the matplotlib / fontconfig /
# Hugging Face caches at /tmp. This is done before the torchaudio import
# below, as in the original statement order.
os.environ.update(
    {
        "TF_CPP_MIN_LOG_LEVEL": "2",
        "MPLCONFIGDIR": "/tmp/matplotlib",
        "FONTCONFIG_PATH": "/tmp/fontconfig",
        "HF_HOME": "/tmp/huggingface_cache",
    }
)
for _cache_dir in ("/tmp/matplotlib", "/tmp/fontconfig", "/tmp/huggingface_cache"):
    os.makedirs(_cache_dir, exist_ok=True)

from torchaudio.pipelines import WAV2VEC2_BASE

# Instantiate the wav2vec2 base model once, at import time.
bundle = WAV2VEC2_BASE
model = bundle.get_model()
print("Model downloaded successfully!")


def reencode_audio(input_path, output_path):
    """Re-encode an audio file with ffmpeg as 16 kHz, mono, 16-bit PCM.

    Args:
        input_path: Path of the audio file handed to ffmpeg via ``-i``.
        output_path: Path ffmpeg is asked to write the converted audio to.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    ffmpeg_args = ['ffmpeg', '-i', input_path]
    ffmpeg_args += ['-acodec', 'pcm_s16le']  # 16-bit little-endian PCM
    ffmpeg_args += ['-ar', '16000']          # 16 kHz sample rate
    ffmpeg_args += ['-ac', '1']              # single (mono) channel
    ffmpeg_args.append(output_path)
    subprocess.run(ffmpeg_args, check=True)


# -----------------------------------------------------------------------------
# Disabled draft of a Google Drive authentication helper, kept for reference.
# The credential values that used to appear in this comment are redacted:
# secrets must not live in source control, not even in commented-out code.
#
# import os
# from dotenv import load_dotenv
# from googleapiclient.discovery import build
# from google.auth.transport.requests import Request
# from google.oauth2.credentials import Credentials
# from google.oauth2 import service_account
#
# SCOPES = ['https://www.googleapis.com/auth/drive']
#
# details = {
#     "refresh_token": "<redacted>",
#     "token": "<redacted>",
#     "token_uri": "https://oauth2.googleapis.com/token",
#     "client_id": "<redacted>",
#     "client_secret": "<redacted>",
#     "scopes": ["https://www.googleapis.com/auth/drive"],
#     "universe_domain": "googleapis.com",
#     "account": "",
# }
#
# def authenticate_with_env_vars(details):
#     creds = Credentials.from_authorized_user_info(details, SCOPES)
#     if not creds or not creds.valid:
#         if creds and creds.expired and creds.refresh_token:
#             creds.refresh(Request())
#         else:
#             raise ValueError("Credentials are invalid and cannot be refreshed.")
#     return creds
# -----------------------------------------------------------------------------
from fastapi import UploadFile
from googleapiclient.http import MediaIoBaseUpload
import io
import PyPDF2

# Name of the Drive folder (directly under "My Drive") that stores uploads.
Folder_Name = "Document_DB"

# Metadata used to create the storage folder when it does not exist yet.
# The name must equal Folder_Name: with any other name the lookup in
# check_folder() never finds the folder and a new one is created on every call.
file_metadata = {
    "name": Folder_Name,
    "mimeType": "application/vnd.google-apps.folder",
}


def check_folder(service):
    """Return the id of the ``Folder_Name`` folder, creating it if needed.

    Args:
        service: Google Drive v3 service object (as returned by ``main()``).

    Returns:
        ``(folder_id, "success")`` on success, or ``(None, error_message)``
        if any Drive call fails.
    """
    try:
        # Filter by name server-side so the folder is found even when the
        # Drive root holds more folders than fit on one result page, and skip
        # folders that are sitting in the trash.
        query = (
            "mimeType = 'application/vnd.google-apps.folder' "
            f"and name = '{Folder_Name}' "
            "and 'root' in parents and trashed = false"
        )
        result = service.files().list(
            q=query,
            fields="nextPageToken, files(id, name)",
        ).execute()

        # Keep the exact-name comparison as a second check on the results.
        for folder in result.get("files", []):
            if folder["name"] == Folder_Name:
                return folder["id"], "success"

        created = service.files().create(body=file_metadata, fields="id").execute()
        return created["id"], "success"
    except Exception as e:
        print(f"Error occurred while checking folder in DB: {e}")
        return None, str(e)


def extract_text_from_pdf(pdf_file_content):
    """Return the text of the first PDF page that contains ``"ABSTRACT"``.

    Args:
        pdf_file_content: Raw bytes of the PDF document.

    Returns:
        The matching page's text followed by a newline, ``""`` when no page
        contains ``"ABSTRACT"``, or ``None`` if the PDF could not be read.
    """
    try:
        pdf_reader = PyPDF2.PdfReader(io.BytesIO(pdf_file_content))
        for page in pdf_reader.pages:
            # A page without extractable text must not abort the whole scan.
            page_text = page.extract_text() or ""
            if "ABSTRACT" in page_text:
                return page_text + "\n"
        return ""
    except Exception as e:
        print("An error occurred:", e)
        return None


async def extract_text_url(file: UploadFile):
    """Read an uploaded PDF and extract its abstract page.

    Args:
        file: The uploaded PDF.

    Returns:
        ``(extracted_text, "success")``, where ``extracted_text`` is whatever
        ``extract_text_from_pdf`` returned, or ``(None, error_message)`` if
        reading the upload failed.
    """
    try:
        file_content = await file.read()
        return extract_text_from_pdf(file_content), "success"
    except Exception as e:
        print(f"Error occurred while extracting text from file: {e}")
        return None, str(e)


async def push_file_db(service, file: UploadFile):
    """Upload a PDF to the Drive storage folder and make it publicly readable.

    Args:
        service: Google Drive v3 service object.
        file: The uploaded PDF.

    Returns:
        ``(file_id, extracted_text, "success")`` on success, or
        ``(None, None, error_message)`` on failure. Every path returns a
        3-tuple so callers can always unpack the result the same way.
    """
    try:
        folder_id, status = check_folder(service)
        if not folder_id:
            return None, None, status

        file_content = await file.read()
        # Distinct name so the module-level ``file_metadata`` is not shadowed.
        upload_metadata = {"name": file.filename, "parents": [folder_id]}
        media = MediaIoBaseUpload(io.BytesIO(file_content), mimetype="application/pdf")
        new_file = (
            service.files()
            .create(body=upload_metadata, media_body=media, fields="id")
            .execute()
        )

        # Grant read access to anyone.
        service.permissions().create(
            fileId=new_file["id"],
            body={"role": "reader", "type": "anyone"},
            fields="id",
        ).execute()

        extracted_text = extract_text_from_pdf(file_content)
        return new_file.get("id"), extracted_text, "success"
    except Exception as e:
        print(f"Error occurred while pushing file to DB: {e}")
        return None, None, str(e)


# -----------------------------------------------------------------------------
import os
import gdown


def _download_from_drive(drive_file_id, directory, filename):
    """Download one Google Drive file with gdown; return ``(url, local_path)``.

    Failures are reported on stdout and never raised, so importing this module
    keeps going even when a download is unavailable.
    """
    os.makedirs(directory, exist_ok=True)
    destination = os.path.join(directory, filename)
    drive_url = f"https://drive.google.com/uc?id={drive_file_id}"
    try:
        saved_to = gdown.download(drive_url, destination, quiet=False)
        # NOTE(review): some gdown versions signal failure by returning None
        # instead of raising; treat that as an error too. Confirm against the
        # pinned gdown version.
        if saved_to is None:
            print(f"Error downloading file: no data received from {drive_url}")
        else:
            print(f"File downloaded successfully to: {destination}")
    except Exception as e:
        print(f"Error downloading file: {e}")
    return drive_url, destination


output_dir = "./downloads"

# First .h5 file.
file_id = "1zhisRgRi2qBFX73VFhzh-Ho93MORQqVa"
output_file = "file.h5"
url, output_path = _download_from_drive(file_id, output_dir, output_file)
file_path = os.path.join(output_dir, output_file)

# -----------------------------------------------------------------------------
# Second .h5 file. As before, the module-level names (file_id, output_file,
# url, output_path, file_path) end up describing this second download.
file_id = "1wIaycDFGTF3e0PpAHKk-GLnxk4cMehOU"
output_file = "file2.h5"
url, output_path = _download_from_drive(file_id, output_dir, output_file)
file_path = os.path.join(output_dir, output_file)

if os.path.exists(file_path):
    print(f"The file '{output_file}' exists at '{file_path}'.")
else:
    print(f"The file '{output_file}' does not exist at '{file_path}'.")
# -----------------------------------------------------------------------------
import os
from dotenv import load_dotenv
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google.oauth2 import service_account

# Load variables from a local .env file (if one exists) into os.environ.
load_dotenv()

SCOPES = ['https://www.googleapis.com/auth/drive']

# OAuth user credentials are read from the environment, never from source.
# SECURITY: the refresh token, access token and client secret that were
# previously hard-coded here have been exposed and must be revoked/rotated.
#
# Expected environment variables:
#   GOOGLE_DRIVE_REFRESH_TOKEN   (required)
#   GOOGLE_DRIVE_CLIENT_ID       (required)
#   GOOGLE_DRIVE_CLIENT_SECRET   (required)
#   GOOGLE_DRIVE_ACCESS_TOKEN    (optional; obtained by refreshing if absent)
details = {
    "refresh_token": os.environ.get("GOOGLE_DRIVE_REFRESH_TOKEN"),
    "token": os.environ.get("GOOGLE_DRIVE_ACCESS_TOKEN"),
    "token_uri": "https://oauth2.googleapis.com/token",
    "client_id": os.environ.get("GOOGLE_DRIVE_CLIENT_ID"),
    "client_secret": os.environ.get("GOOGLE_DRIVE_CLIENT_SECRET"),
    "scopes": ["https://www.googleapis.com/auth/drive"],
    "universe_domain": "googleapis.com",
    "account": "",
}

# Keys of ``details`` that must be non-empty to build refreshable credentials.
_REQUIRED_CREDENTIAL_KEYS = ("refresh_token", "client_id", "client_secret")


def main():
    """Build an authenticated Google Drive v3 service.

    Credentials come from ``details`` (populated from environment variables).
    If they are not currently valid they are refreshed with the refresh token.
    When no refreshable credentials are configured, the interactive
    installed-app flow based on ``credentials.json`` is used instead.

    Returns:
        The Drive service object, or ``None`` if authentication or service
        construction failed (the error is printed).
    """
    try:
        creds = None
        if all(details.get(key) for key in _REQUIRED_CREDENTIAL_KEYS):
            creds = Credentials.from_authorized_user_info(details, SCOPES)

        if creds is None or not creds.valid:
            if creds is not None and creds.refresh_token:
                # Refresh whenever a refresh token exists. Requiring
                # ``creds.expired`` as well would skip the refresh when no
                # access token/expiry was supplied and wrongly fall through to
                # the interactive browser flow.
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(
                    'credentials.json', SCOPES)
                creds = flow.run_local_server(port=0)

        service = build('drive', 'v3', credentials=creds)
        return service
    except Exception as error:
        # Never print ``details`` or ``creds`` here: they contain secrets.
        print(f'An error occurred: {error}')
        return None