Spaces:
Sleeping
Sleeping
import streamlit as st | |
import pandas as pd | |
import re | |
from openai import OpenAI | |
import concurrent.futures | |
import json | |
import os | |
def extract_and_parse_json_from_markdown(markdown_text: str) -> dict: | |
code_block_pattern = r"``````" | |
code_block_match = re.search(code_block_pattern, markdown_text) | |
if code_block_match: | |
json_str = code_block_match.group(1).strip() | |
else: | |
json_str = markdown_text.strip() | |
try: | |
return json.loads(json_str) | |
except json.JSONDecodeError as e: | |
raise ValueError(f"Invalid JSON format: {e}") | |
def process_event(event): | |
openai = OpenAI( | |
api_key=os.environ.get('DEEP_API_KEY'), | |
base_url="https://api.deepinfra.com/v1/openai", | |
) | |
llm_prompt = f""" | |
You are a digital marketing campaign analyst designed to analyze and report digital marketing campaign data for Rod Wave concerts, Your job is to convert the given text into JSON | |
{{ | |
"market": "str", | |
"total_spend": "float", | |
"impressions": "float", | |
"clicks": "float", | |
"metrics_cpc": "float", | |
"metrics_cpm": "float", | |
"metrics_ctr": "float", | |
"metrics_cpa": "float", | |
"platform_spend_meta_total": "float", | |
"platform_spend_meta_instagram": "float", | |
"platform_spend_meta_facebook": "float", | |
"platform_spend_google_total": "float", | |
"platform_spend_google_youtube": "float", | |
"platform_spend_google_search_display": "float", | |
"platform_spend_programmatic": "float", | |
"revenue_average_ticket_price": "float", | |
"revenue_total_revenue": "float", | |
"revenue_roi": "float" | |
}} | |
Here is Text for it: | |
{event} | |
Return in only JSON adhering to Above Schema | |
""" | |
chat_completion = openai.chat.completions.create( | |
model="meta-llama/Llama-3.3-70B-Instruct-Turbo", | |
messages=[{"role": "user", "content": llm_prompt}], | |
) | |
return chat_completion.choices[0].message.content | |
def process_all_events(events): | |
json_all = [] | |
progress_bar = st.progress(0) | |
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: | |
futures = [executor.submit(process_event, event) for event in events] | |
for i, future in enumerate(concurrent.futures.as_completed(futures)): | |
progress = (i + 1) / len(events) | |
progress_bar.progress(progress) | |
st.write(f"Processed event {i + 1}/{len(events)}") | |
json_all.append(future.result()) | |
return json_all | |
def main(): | |
st.title("Rod Wave Concert Marketing Data Processor") | |
st.write("Upload a text file containing concert marketing data to convert it to CSV format") | |
uploaded_file = st.file_uploader("Choose a text file", type="txt") | |
if uploaded_file is not None: | |
text = uploaded_file.read().decode("utf-8") | |
events = re.split(r'\n(?=Rod Wave Concert)', text) | |
events = [event for event in events if event.strip()] | |
st.write(f"Found {len(events)} events to process") | |
if st.button("Process Data"): | |
with st.spinner("Processing events..."): | |
json_all = process_all_events(events) | |
json_sanity = [] | |
for ele in json_all: | |
json_sanity.append(extract_and_parse_json_from_markdown(ele)) | |
df = pd.DataFrame(json_sanity) | |
st.success("Processing complete!") | |
st.write("Preview of processed data:") | |
st.dataframe(df.head()) | |
csv = df.to_csv(index=False) | |
st.download_button( | |
label="Download CSV", | |
data=csv, | |
file_name="processed_concert_data.csv", | |
mime="text/csv" | |
) | |
if __name__ == "__main__": | |
main() | |