Spaces:
Sleeping
Sleeping
from fastapi import FastAPI | |
import os | |
import subprocess | |
import gdown | |
import h5py | |
# FastAPI application instance for this module.
app = FastAPI()
def greet_json():
    """Return the fixed greeting payload ``{"Hello": "World!"}``."""
    greeting = {"Hello": "World!"}
    return greeting
# NOTE(review): TF_CPP_MIN_LOG_LEVEL presumably limits TensorFlow's native
# log output -- confirm against the TensorFlow version in use.
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"

# Point each library's config/cache location at a directory under /tmp and
# make sure that directory exists.
for _env_name, _cache_dir in (
    ("MPLCONFIGDIR", "/tmp/matplotlib"),
    ("FONTCONFIG_PATH", "/tmp/fontconfig"),
    ("HF_HOME", "/tmp/huggingface_cache"),
):
    os.environ[_env_name] = _cache_dir
    os.makedirs(_cache_dir, exist_ok=True)
del _env_name, _cache_dir
from torchaudio.pipelines import WAV2VEC2_BASE | |
# Build the model from the torchaudio WAV2VEC2_BASE pipeline bundle at import
# time. ``bundle`` and ``model`` remain available as module-level globals.
bundle = WAV2VEC2_BASE
model = bundle.get_model()
print("Model downloaded successfully!")
def reencode_audio(input_path, output_path, *, sample_rate=16000, channels=1,
                   overwrite=True):
    """Re-encode an audio file to signed 16-bit little-endian PCM via ffmpeg.

    Args:
        input_path: Path of the audio file to read.
        output_path: Path the re-encoded audio is written to.
        sample_rate: Output sample rate in Hz (default 16000).
        channels: Number of output channels (default 1).
        overwrite: When True (default) an existing ``output_path`` is
            replaced; when False ffmpeg exits with an error instead.

    Raises:
        subprocess.CalledProcessError: ffmpeg exited with a non-zero status.
        FileNotFoundError: the ``ffmpeg`` executable is not on PATH.
    """
    command = [
        "ffmpeg",
        # Never read from stdin: without this ffmpeg may sit waiting on an
        # interactive "overwrite?" prompt when run from a server process.
        "-nostdin",
        "-y" if overwrite else "-n",
        "-i", input_path,
        "-acodec", "pcm_s16le",
        "-ar", str(sample_rate),
        "-ac", str(channels),
        output_path,
    ]
    subprocess.run(command, check=True)
#----------------------------------------------------------------------------------------- | |
# import os | |
# from dotenv import load_dotenv | |
# from googleapiclient.discovery import build | |
# from google.auth.transport.requests import Request | |
# from google.oauth2.credentials import Credentials | |
# from google.oauth2 import service_account | |
# SCOPES = ['https://www.googleapis.com/auth/drive'] | |
# details = { | |
# "refresh_token": "1//0gYLCF5OE4fTmCgYIARAAGBASNwF-L9Irp3Ik0q5OtsQClcLwW7sxPZSuMboe7wyjteuSuOD_WvavEHfhuTvkSjkLHitkh76XaD4", | |
# "token": "ya29.a0ARW5m753vyDgN_C7kUnnYTkeCfknSnDDj8tuVCe99dL2ieN3IzvCPVoN5kVg49CAYDz-pS5AgpjH7whiy7dr7QhwX4EiGQreJCzu109nlH6kxultrNup5q-_W2dNepbOa5YV8iH7OwP28RjQVR7fs9IlMO7BfnA9hw-WQqXNaCgYKAXMSARMSFQHGX2MieHrC7CpySZFYpoZWln6vxA0175", | |
# "token_uri": "https://oauth2.googleapis.com/token", | |
# "client_id": "573421158717-a2tulr4s7gg6or7sd76336busnmk22vu.apps.googleusercontent.com", | |
# "client_secret": "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", | |
# "scopes": ["https://www.googleapis.com/auth/drive"], | |
# "universe_domain": "googleapis.com", | |
# "account": "", | |
# } | |
# def authenticate_with_env_vars(details): | |
# creds = Credentials.from_authorized_user_info(details, SCOPES) | |
# if not creds or not creds.valid: | |
# if creds and creds.expired and creds.refresh_token: | |
# creds.refresh(Request()) | |
# else: | |
# raise ValueError("Credentials are invalid and cannot be refreshed.") | |
# return creds | |
#----------------------------------------------------------------------------------------- | |
from fastapi import UploadFile | |
from googleapiclient.http import MediaIoBaseUpload | |
import io | |
import PyPDF2 | |
# Name of the Google Drive folder that check_folder() looks up and that
# uploaded files are placed in.
Folder_Name = "Document_DB"

# Request body for creating that folder. The name must be the same one the
# lookup searches for; with a different name the folder is never found again
# and a new one is created on every call.
file_metadata = {
    "name": Folder_Name,
    "mimeType": "application/vnd.google-apps.folder",
}
def check_folder(service):
    """Return the ID of the ``Folder_Name`` Drive folder, creating it if absent.

    Args:
        service: Google Drive API client exposing ``files().list(...)`` and
            ``files().create(...)``.

    Returns:
        ``(folder_id, "success")`` on success, or ``(None, error_message)``
        when any Drive call fails.
    """
    folder_mime = "application/vnd.google-apps.folder"
    try:
        # Escape backslashes and single quotes as required inside a Drive
        # query string literal.
        escaped_name = Folder_Name.replace("\\", "\\\\").replace("'", "\\'")
        # Filter by name on the server so the result does not depend on how
        # many folders fit in the first page, and skip trashed folders.
        query = (
            f"mimeType = '{folder_mime}' and name = '{escaped_name}' "
            "and 'root' in parents and trashed = false"
        )
        result = (
            service.files()
            .list(q=query, fields="files(id, name)", pageSize=1)
            .execute()
        )
        matches = result.get("files") or []
        if matches:
            return matches[0]["id"], "success"

        # Create the folder under the same name the lookup searches for, so
        # the next call finds it instead of creating another one.
        created = (
            service.files()
            .create(
                body={"name": Folder_Name, "mimeType": folder_mime},
                fields="id",
            )
            .execute()
        )
        return created["id"], "success"
    except Exception as e:
        print(f"Error occurred while checking folder in DB: {e}")
        return None, str(e)
def extract_text_from_pdf(pdf_file_content):
    """Return the text of the first PDF page that contains "ABSTRACT".

    Args:
        pdf_file_content: Raw bytes of a PDF document.

    Returns:
        The matching page's text followed by a newline, an empty string when
        no page contains "ABSTRACT", or ``None`` when the content cannot be
        read as a PDF (the error is printed).
    """
    try:
        pdf_reader = PyPDF2.PdfReader(io.BytesIO(pdf_file_content))
        for page in pdf_reader.pages:
            # Treat a missing extraction result as empty text so one
            # unreadable page does not abort the whole search.
            page_text = page.extract_text() or ""
            if "ABSTRACT" in page_text:
                return page_text + "\n"
        return ""
    except Exception as e:
        print("An error occurred:", e)
        return None
async def extract_text_url(file: UploadFile):
    """Read an uploaded PDF and extract its "ABSTRACT" page text.

    Args:
        file: Uploaded file whose full content is read with ``await file.read()``.

    Returns:
        ``(text, "success")`` when extraction ran, or ``(None, error_message)``
        when reading the upload or parsing the PDF failed.
    """
    try:
        file_content = await file.read()
        extracted_text = extract_text_from_pdf(file_content)
    except Exception as e:
        print(f"Error occurred while extracting text from file: {e}")
        return None, str(e)

    # extract_text_from_pdf signals a parse failure with None; do not report
    # that as a success to the caller.
    if extracted_text is None:
        return None, "Failed to extract text from PDF"
    return extracted_text, "success"
async def push_file_db(service, file: UploadFile):
    """Upload a PDF into the Drive document folder and extract its text.

    Args:
        service: Google Drive API client exposing ``files()`` and
            ``permissions()``.
        file: Uploaded file; its ``filename`` becomes the Drive file name.

    Returns:
        ``(file_id, extracted_text, "success")`` when the upload succeeded
        (``extracted_text`` is whatever ``extract_text_from_pdf`` returned),
        or ``(None, None, error_message)`` on failure.
    """
    try:
        folder_id, status = check_folder(service)
        if not folder_id:
            # Same tuple shape as every other return path.
            return None, None, status

        file_content = await file.read()
        upload_metadata = {"name": file.filename, "parents": [folder_id]}
        media = MediaIoBaseUpload(
            io.BytesIO(file_content), mimetype="application/pdf"
        )
        new_file = (
            service.files()
            .create(body=upload_metadata, media_body=media, fields="id")
            .execute()
        )

        # Grant the "reader" role to type "anyone" on the uploaded file.
        service.permissions().create(
            fileId=new_file["id"],
            body={"role": "reader", "type": "anyone"},
            fields="id",
        ).execute()

        extracted_text = extract_text_from_pdf(file_content)
        return new_file.get("id"), extracted_text, "success"
    except Exception as e:
        print(f"Error occurred while pushing file to DB: {e}")
        return None, None, str(e)
#----------------------------------------------------------------------------------------- | |
import os | |
import gdown | |
# Google Drive ID of the file fetched at import time, and where it is stored.
file_id = "1zhisRgRi2qBFX73VFhzh-Ho93MORQqVa"
output_dir = "./downloads"
output_file = "file.h5"

# exist_ok avoids the check-then-create race of testing os.path.exists first.
os.makedirs(output_dir, exist_ok=True)

output_path = os.path.join(output_dir, output_file)
url = f"https://drive.google.com/uc?id={file_id}"

try:
    # NOTE(review): gdown.download is expected to return the output path on
    # success and may return None instead of raising on failure -- confirm
    # against the installed gdown version. Only report success when a path
    # actually came back.
    downloaded_to = gdown.download(url, output_path, quiet=False)
    if downloaded_to:
        print(f"File downloaded successfully to: {output_path}")
    else:
        print(f"Error downloading file: nothing was written to {output_path}")
except Exception as e:
    print(f"Error downloading file: {e}")