Spaces:
Sleeping
Sleeping
File size: 5,781 Bytes
d3242ce bc2ba25 376049c bc2ba25 376049c d3242ce 376049c bc2ba25 376049c bc2ba25 376049c bc2ba25 376049c bc2ba25 376049c bc2ba25 376049c bc2ba25 376049c bc2ba25 376049c bc2ba25 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 |
import streamlit as st
import zipfile
import tempfile
import requests
import pdfplumber
import torch
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline
import os
import warnings
from reportlab.lib.pagesizes import letter
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
# Suppress warnings
warnings.filterwarnings("ignore")
# Setup models
device = "cuda:0" if torch.cuda.is_available() else "cpu"
whisper_model_id = "openai/whisper-medium"
# Load Whisper model and processor
whisper_model = AutoModelForSpeechSeq2Seq.from_pretrained(whisper_model_id)
whisper_processor = AutoProcessor.from_pretrained(whisper_model_id)
# Create Whisper pipeline
whisper_pipe = pipeline(
"automatic-speech-recognition",
model=whisper_model,
tokenizer=whisper_processor.tokenizer,
feature_extractor=whisper_processor.feature_extractor,
device=device
)
# IBM Granite API URL and Headers
granite_url = "https://us-south.ml.cloud.ibm.com/ml/v1/text/generation?version=2023-05-29"
granite_headers = {
"Accept": "application/json",
"Content-Type": "application/json",
"Authorization": "Bearer YOUR_API_KEY_HERE" # Replace with your actual API key
}
# Function to transcribe audio files
def transcribe_audio(file_path):
result = whisper_pipe(file_path)
return result['text']
# Function to extract text and questions from PDF
def extract_text_from_pdf(pdf_path):
text = ""
questions = []
with pdfplumber.open(pdf_path) as pdf:
for page in pdf.pages:
page_text = page.extract_text()
if page_text:
text += page_text
questions += [line.strip() for line in page_text.split("\n") if line.strip()]
return text, questions
# Function to generate form data with Granite
def generate_form_data(text, questions):
question_list = "\n".join(f"- {question}" for question in questions)
body = {
"input": f"""The following text is a transcript from an audio recording. Read the text and extract the information needed to fill out the following form.\n\nText: {text}\n\nForm Questions:\n{question_list}\n\nExtracted Form Data:""",
"parameters": {
"decoding_method": "sample",
"max_new_tokens": 900,
"temperature": 0.7,
"top_k": 50,
"top_p": 1,
"repetition_penalty": 1.05
},
"model_id": "ibm/granite-13b-chat-v2",
"project_id": "YOUR_PROJECT_ID", # Replace with your actual project ID
"moderations": {
"hap": {
"input": {
"enabled": True,
"threshold": 0.5,
"mask": {"remove_entity_value": True}
},
"output": {
"enabled": True,
"threshold": 0.5,
"mask": {"remove_entity_value": True}
}
}
}
}
response = requests.post(granite_url, headers=granite_headers, json=body)
if response.status_code != 200:
raise Exception("Non-200 response: " + str(response.text))
data = response.json()
return data['results'][0]['generated_text'].strip()
# Function to save responses to PDF
def save_responses_to_pdf(responses, output_pdf_path):
document = SimpleDocTemplate(output_pdf_path, pagesize=letter)
styles = getSampleStyleSheet()
# Custom style for numbered responses
number_style = ParagraphStyle(
name='NumberedStyle',
parent=styles['BodyText'],
fontSize=10,
spaceAfter=12
)
content = []
for index, response in enumerate(responses, start=1):
# Add the response number and content
heading = Paragraph(f"<b>File {index}:</b>", styles['Heading2'])
response_text = Paragraph(response.replace("\n", "<br/>"), number_style)
content.append(heading)
content.append(Spacer(1, 6)) # Space between heading and response
content.append(response_text)
content.append(Spacer(1, 18)) # Space between responses
document.build(content)
# Streamlit Interface
st.title("Audio to Form Filling")
zip_file = st.file_uploader("Upload ZIP File with Audio Files", type="zip")
pdf_file = st.file_uploader("Upload PDF Form", type="pdf")
if zip_file and pdf_file:
with tempfile.TemporaryDirectory() as tmp_dir:
with zipfile.ZipFile(zip_file, 'r') as zip_ref:
zip_ref.extractall(tmp_dir)
responses = []
for filename in os.listdir(tmp_dir):
if filename.endswith((".wav", ".mp3")):
file_path = os.path.join(tmp_dir, filename)
# Transcribe audio
transcribed_text = transcribe_audio(file_path)
# Extract text and form fields from PDF
pdf_text, pdf_questions = extract_text_from_pdf(pdf_file)
# Generate form data
form_data = generate_form_data(transcribed_text, pdf_questions)
responses.append(form_data)
st.write(f"File {len(responses)}:\n{form_data}\n") # Display the extracted form data with numbering
# Save all responses to a PDF
output_pdf_path = tempfile.NamedTemporaryFile(delete=False, suffix=".pdf").name
save_responses_to_pdf(responses, output_pdf_path)
# Provide a download button for the generated PDF
with open(output_pdf_path, "rb") as f:
st.download_button(
label="Download Processed PDF",
data=f,
file_name="processed_output.pdf",
mime="application/pdf"
)
|