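# NOTE: "Spaces: Sleeping" page-header text captured when this file was scraped from its hosting page; not part of the program.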
import os | |
os.environ['KMP_DUPLICATE_LIB_OK']='TRUE' | |
import streamlit as st | |
import pandas as pd | |
import torch | |
import random | |
from transformers import ( | |
T5ForConditionalGeneration, | |
T5Tokenizer, | |
Trainer, | |
TrainingArguments, | |
DataCollatorForSeq2Seq | |
) | |
from torch.utils.data import Dataset | |
from datetime import datetime | |
import numpy as np | |
from random import choice | |
import re | |
class TravelDataset(Dataset):
    """Torch dataset that turns travel-plan rows into tokenized seq2seq examples.

    Each item pairs a prompt built from a row's trip parameters with that
    row's ``travel_plan`` text as the generation target.
    """

    def __init__(self, data, tokenizer, max_length=512):
        """
        data: DataFrame with columns ['destination', 'days', 'budget', 'interests', 'travel_plan']
        tokenizer: callable tokenizer applied to both the prompt and the target text
        max_length: length every encoded sequence is padded/truncated to
        """
        self.tokenizer = tokenizer
        self.data = data
        self.max_length = max_length

    def __len__(self):
        """Return the number of rows (examples) in the dataset."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the ``input_ids``, ``attention_mask`` and ``labels`` tensors for row ``idx``."""
        row = self.data.iloc[idx]
        input_text = self.format_input_text(row)
        target_text = row['travel_plan']

        input_encodings = self._encode(input_text)
        target_encodings = self._encode(target_text)

        # squeeze(0) drops only the batch dimension added by return_tensors='pt'.
        labels = target_encodings['input_ids'].squeeze(0).clone()

        # Targets are padded to max_length; mark padding positions with -100
        # (the ignore index used by the seq2seq loss) so the model is not
        # trained to predict padding.
        pad_token_id = getattr(self.tokenizer, 'pad_token_id', None)
        if pad_token_id is not None:
            labels[labels == pad_token_id] = -100

        return {
            'input_ids': input_encodings['input_ids'].squeeze(0),
            'attention_mask': input_encodings['attention_mask'].squeeze(0),
            'labels': labels
        }

    def _encode(self, text):
        """Tokenize ``text`` to a fixed-length, padded and truncated PyTorch encoding."""
        return self.tokenizer(
            text,
            max_length=self.max_length,
            padding='max_length',
            truncation=True,
            return_tensors='pt'
        )

    @staticmethod
    def format_input_text(row):
        """Build the model prompt from a row's destination, days, budget and interests.

        Declared static because it takes only ``row``; without the decorator
        the ``self.format_input_text(row)`` call in ``__getitem__`` would pass
        the instance as an extra positional argument and raise ``TypeError``.
        """
        return f"Plan a trip to {row['destination']} for {row['days']} days with a {row['budget']} budget. Include activities related to: {row['interests']}"
def create_sample_data():
    """Build the synthetic training table of travel plans (trips of 1 to 14 days).

    Every combination of destination, trip length, budget level and interest
    pair yields one row whose ``travel_plan`` holds one randomly templated
    line per day, joined with newlines.

    Returns:
        DataFrame with columns 'destination', 'days', 'budget', 'interests'
        and 'travel_plan'.
    """
    destinations = ['Paris', 'Tokyo', 'New York', 'London', 'Rome']
    budgets = ['Budget', 'Moderate', 'Luxury']
    interests_list = [
        'Culture, History',
        'Food, Shopping',
        'Art, Museums',
        'Nature, Adventure',
        'Relaxation, Food'
    ]

    # Candidate activity phrases, keyed by a single interest name.
    activities = {
        'Culture': ['Visit historical sites', 'Explore local traditions', 'Attend cultural events',
                    'Visit ancient monuments', 'Experience local festivals'],
        'History': ['Tour ancient ruins', 'Visit museums', 'Explore historic districts',
                    'Join guided history walks', 'Visit heritage sites'],
        'Food': ['Try local cuisine', 'Join cooking classes', 'Visit food markets',
                 'Dine at famous restaurants', 'Food tasting tours'],
        'Shopping': ['Browse local markets', 'Visit shopping districts', 'Shop at boutiques',
                     'Explore artisan shops', 'Visit shopping centers'],
        'Art': ['Visit art galleries', 'Attend art exhibitions', 'Join art workshops',
                'Visit artist studios', 'Explore street art'],
        'Museums': ['Tour famous museums', 'Visit specialty museums', 'Join museum tours',
                    'Explore art collections', 'Visit cultural institutes'],
        'Nature': ['Visit parks', 'Nature walks', 'Explore gardens', 'Visit natural landmarks',
                   'Outdoor activities'],
        'Adventure': ['Join adventure tours', 'Try outdoor sports', 'Explore hidden spots',
                      'Take scenic hikes', 'Adventure activities'],
        'Relaxation': ['Spa treatments', 'Visit peaceful gardens', 'Leisure activities',
                       'Relaxing sightseeing', 'Peaceful excursions']
    }

    def generate_daily_plan(day, total_days, interests, budget_level, destination):
        """Return the one-line plan for ``day``, worded according to trip length."""
        first_interest, second_interest = (part.strip() for part in interests.split(','))
        # Draw the first interest's activity before the second so the
        # sequence of random draws stays the same.
        activity1 = choice(activities[first_interest])
        activity2 = choice(activities[second_interest])

        if total_days > 7:
            # Long trips: relaxed pace with an optional second activity.
            opener = 'Start with' if day == 1 else 'Continue with'
            return f"Day {day}: {opener} {activity1}. Optional: {activity2}."
        if total_days > 3:
            # Medium trips: moderate pace.
            return f"Day {day}: Focus on {activity1}. Later, enjoy {activity2}."
        # Short trips: two activities packed into each day.
        return f"Day {day}: {activity1} in the morning. {activity2} in the afternoon/evening. Experience {destination}'s {budget_level.lower()} offerings."

    rows = []
    for dest in destinations:
        for days in range(1, 15):  # 1 to 14 days
            for budget in budgets:
                for interests in interests_list:
                    full_plan = "\n".join(
                        generate_daily_plan(day, days, interests, budget, dest)
                        for day in range(1, days + 1)
                    )
                    rows.append({
                        'destination': dest,
                        'days': days,
                        'budget': budget,
                        'interests': interests,
                        'travel_plan': full_plan
                    })
    return pd.DataFrame(rows)
def load_or_train_model():
    """Return ``(model, tokenizer)`` from the saved model directory, training if needed.

    When the ``trained_travel_planner`` directory is missing, or loading from
    it raises, the result of ``train_model()`` is returned instead.
    """
    model_path = "trained_travel_planner"

    # Nothing saved yet: go straight to training.
    if not os.path.exists(model_path):
        return train_model()

    try:
        model = T5ForConditionalGeneration.from_pretrained(model_path)
        tokenizer = T5Tokenizer.from_pretrained(model_path)
        if torch.cuda.is_available():
            model = model.cuda()
        st.success("β Loaded existing model")
        return model, tokenizer
    except Exception as e:
        # Report the failure in the UI, then fall through to a fresh training run.
        st.warning("Could not load existing model, will train new one")
        st.error(f"Error loading trained model: {str(e)}")

    return train_model()
def train_model():
    """Fine-tune ``t5-base`` on the travel-plan data and save the result.

    Training data is read from ``travel_data.csv`` when that file exists;
    otherwise it is generated with ``create_sample_data()`` and written to
    that path. Rows are shuffled with a fixed seed before the 80/20
    train/validation split. The trained model and tokenizer are saved to
    ``./trained_travel_planner``.

    Returns:
        ``(model, tokenizer)`` on success, or ``(None, None)`` if any step
        raises (the error is reported through ``st.error``).
    """
    try:
        # Initialize model and tokenizer
        tokenizer = T5Tokenizer.from_pretrained('t5-base', legacy=False)
        model = T5ForConditionalGeneration.from_pretrained('t5-base')

        # Create or load training data
        if os.path.exists('travel_data.csv'):
            data = pd.read_csv('travel_data.csv')
        else:
            data = create_sample_data()
            data.to_csv('travel_data.csv', index=False)

        # Shuffle before splitting. create_sample_data() emits rows grouped by
        # destination, so an unshuffled tail split would hold out one entire
        # destination and the model would never train on it. The fixed seed
        # keeps the split reproducible between runs.
        data = data.sample(frac=1, random_state=42).reset_index(drop=True)

        # Split data into train and validation
        train_size = int(0.8 * len(data))
        train_data = data.iloc[:train_size]
        val_data = data.iloc[train_size:]

        # Create datasets
        train_dataset = TravelDataset(train_data, tokenizer)
        val_dataset = TravelDataset(val_data, tokenizer)

        # Training arguments
        # NOTE(review): warmup_steps=500 looks large relative to the step
        # count the generated sample data would produce - confirm it is intended.
        training_args = TrainingArguments(
            output_dir=f"./travel_planner_model_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
            num_train_epochs=3,
            per_device_train_batch_size=4,
            per_device_eval_batch_size=4,
            warmup_steps=500,
            weight_decay=0.01,
            logging_dir="./logs",
            logging_steps=10,
            evaluation_strategy="steps",
            eval_steps=50,
            save_steps=100,
            load_best_model_at_end=True,
        )

        # Data collator
        data_collator = DataCollatorForSeq2Seq(
            tokenizer=tokenizer,
            model=model,
            padding=True
        )

        # Initialize trainer
        trainer = Trainer(
            model=model,
            args=training_args,
            train_dataset=train_dataset,
            eval_dataset=val_dataset,
            data_collator=data_collator,
        )

        # Train the model
        trainer.train()

        # Save the model and tokenizer where load_or_train_model() looks for them
        model_path = "./trained_travel_planner"
        model.save_pretrained(model_path)
        tokenizer.save_pretrained(model_path)
        return model, tokenizer
    except Exception as e:
        # Surface the failure in the UI; callers check for the (None, None) result.
        st.error(f"Error during model training: {str(e)}")
        return None, None
def generate_travel_plan(destination, days, interests, budget, model, tokenizer):
    """Produce a formatted travel plan from the model, falling back to a template plan.

    The prompt mirrors the training prompt format. Any exception raised while
    preparing the prompt, generating or formatting is printed, and the result
    of ``generate_fallback_plan`` is returned instead.
    """
    try:
        # Only the first three interests are used in the prompt.
        interests = interests[:3]
        prompt = (
            f"Plan a trip to {destination} for {days} days with a {budget} budget. "
            f"Include activities related to: {', '.join(interests)}"
        )

        encoded = tokenizer(
            prompt,
            return_tensors="pt",
            max_length=512,
            padding="max_length",
            truncation=True
        )

        # Run on the GPU when one is available.
        if torch.cuda.is_available():
            encoded = {name: tensor.cuda() for name, tensor in encoded.items()}
            model = model.cuda()

        generation_options = {
            'max_length': 512,
            'min_length': 100,
            'num_beams': 4,
            'no_repeat_ngram_size': 3,
            'length_penalty': 1.2,
            'early_stopping': True,
            'temperature': 0.8,
            'top_k': 50,
            'top_p': 0.9,
            'do_sample': True,
            'repetition_penalty': 1.2,
        }
        generated = model.generate(**encoded, **generation_options)

        decoded = tokenizer.decode(generated[0], skip_special_tokens=True)
        if not decoded.strip():
            raise ValueError("Generated plan is empty")

        return format_travel_plan(decoded, days)
    except Exception as e:
        # Log the problem and return a template-based plan instead.
        print(f"Error generating travel plan: {str(e)}")
        return generate_fallback_plan(destination, days, interests, budget)
def generate_fallback_plan(destination, days, interests, budget):
    """Assemble a template-based plan for when model generation fails.

    For each day, one random activity is drawn for each of the first two
    interests that have templates. The assembled text is passed through
    ``format_travel_plan`` before being returned.
    """
    # Activity templates keyed by interest name.
    basic_activities = {
        'Culture': ['Visit museums', 'Explore historical sites', 'Attend local events'],
        'History': ['Tour historic landmarks', 'Visit ancient sites', 'Join history walks'],
        'Food': ['Try local cuisine', 'Visit food markets', 'Take cooking classes'],
        'Nature': ['Visit parks', 'Go hiking', 'Explore gardens'],
        'Shopping': ['Visit markets', 'Shop at local stores', 'Explore shopping districts'],
        'Adventure': ['Join tours', 'Try outdoor activities', 'Explore surroundings'],
        'Relaxation': ['Visit spa', 'Relax in parks', 'Enjoy scenic views'],
        'Art': ['Visit galleries', 'See street art', 'Attend exhibitions'],
        'Museums': ['Visit main museums', 'Join guided tours', 'See special exhibits']
    }
    # Note text per budget level.
    budget_notes = {
        'Budget': 'Focus on free and affordable activities',
        'Moderate': 'Mix of affordable and premium experiences',
        'Luxury': 'Premium experiences and exclusive access'
    }

    # Collect the pieces and join once at the end.
    pieces = [f"# Emergency Travel Plan for {destination}\n\n"]
    for day in range(1, days + 1):
        pieces.append(f"\nDay {day}:\n")

        # At most two interests contribute an activity each day.
        picks = [
            random.choice(basic_activities[interest])
            for interest in interests[:2]
            if interest in basic_activities
        ]
        budget_text = budget_notes.get(budget, '')

        morning = picks[0] if picks else 'Explore the area'
        pieces.append(f"Morning: {morning}\n")
        if len(picks) > 1:
            pieces.append(f"Afternoon/Evening: {picks[1]}\n")
        pieces.append(f"Note: {budget_text}\n")

    # Run the text through the same formatter used for model output.
    return format_travel_plan("".join(pieces), days)
def format_travel_plan(plan, days):
    """Format a raw travel plan as Markdown with one section per day.

    The raw text is scanned line by line. A line beginning with ``Day <n>``
    (case-insensitive, with or without a space before the number) starts day
    ``n``; any text after the header on that same line is kept as activities
    for that day, as are the lines that follow. Lines are split on ``.``,
    ``;`` and ``,``; fragments of five characters or fewer are dropped and
    each day keeps at most four activities. Days with no activities get a
    placeholder entry.

    Args:
        plan: Raw plan text; ``None`` or empty yields placeholder-only output.
        days: Requested trip length, clamped to the range 1-14.

    Returns:
        Markdown text with a ``### Day N`` heading followed by ``- activity``
        bullet lines for each day.
    """
    # Validate days input
    days = max(1, min(days, 14))

    day_activities = {day: [] for day in range(1, days + 1)}

    # "Day 1:", "day 2 -", "Day3." ... group 1 is the day number, group 2 is
    # whatever follows the header on the same line.
    header_pattern = re.compile(r'^day\s*(\d+)\s*[:.\-]?\s*(.*)$', re.IGNORECASE)

    current_day = None
    for raw_line in (plan or "").split('\n'):
        line = raw_line.strip()
        if not line:
            continue

        header = header_pattern.match(line)
        if header:
            day_num = int(header.group(1))
            # A header for a day outside the requested range closes the
            # current day, so its content is not attributed to the previous one.
            current_day = day_num if 1 <= day_num <= days else None
            # Keep activities written on the header line itself
            # (e.g. "Day 1: Visit museums. Try local cuisine.").
            line = header.group(2)
            if not line:
                continue

        if current_day is None:
            continue

        # Split by multiple delimiters and keep only meaningful fragments.
        for fragment in re.split(r'[.;,]', line):
            activity = fragment.strip()
            if len(activity) > 5 and len(day_activities[current_day]) < 4:
                day_activities[current_day].append(activity)

    # Ensure each day has activities
    for day in range(1, days + 1):
        if not day_activities[day]:
            placeholder = (
                "Explore city highlights" if day == 1
                else "Continue exploring local attractions"
            )
            day_activities[day].append(placeholder)

    # Generate formatted plan
    formatted_plan = []
    for day in range(1, days + 1):
        formatted_plan.append(f"### Day {day}\n")
        formatted_plan.extend(f"- {activity}" for activity in day_activities[day])
        formatted_plan.append("\n")
    return "\n".join(formatted_plan)
def main() -> None:
    """Render the Streamlit travel-planner page.

    Configures the page, shows a sidebar with a "Retrain Model" button and
    model information, makes sure a model and tokenizer are stored in
    ``st.session_state`` (via ``load_or_train_model``), and renders the trip
    input form. When the generate button is pressed, the inputs are validated,
    ``generate_travel_plan`` is called, and the result is shown in an overview
    expander plus "Detailed Itinerary" and "Trip Summary" tabs, with a
    download button for the plan text.

    NOTE(review): block nesting below was reconstructed from syntax because
    the source had no indentation; the contents of the triple-quoted strings
    are kept at column 0 exactly as they appeared. Several UI labels contain
    what looks like mis-decoded emoji - confirm the file's encoding.
    """
    st.set_page_config(
        page_title="AI Travel Planner",
        page_icon="βοΈ",
        layout="wide"
    )
    st.title("βοΈ AI Travel Planner")
    st.markdown("### Plan your perfect trip with AI assistance!")
    # Add training button in sidebar only
    with st.sidebar:
        st.header("Model Management")
        if st.button("Retrain Model"):
            with st.spinner("Training new model... This will take a while..."):
                model, tokenizer = train_model()
                # train_model() returns (None, None) on failure; keep the
                # previously stored model in that case.
                if model is not None:
                    st.session_state['model'] = model
                    st.session_state['tokenizer'] = tokenizer
                    st.success("Model training completed!")
        # Add model information
        st.markdown("### Model Information")
        if 'model' in st.session_state:
            st.success("β Model loaded")
            st.info("""
This model was trained on travel plans for:
- 5 destinations
- 1-14 days duration
- 3 budget levels
- 5 interest combinations
""")
    # Load or train model
    # Only done when no model is stored in the session state yet.
    if 'model' not in st.session_state:
        with st.spinner("Loading AI model... Please wait..."):
            model, tokenizer = load_or_train_model()
            if model is None or tokenizer is None:
                st.error("Failed to load/train the AI model. Please try again.")
                return
            st.session_state.model = model
            st.session_state.tokenizer = tokenizer
    # Create two columns for input form
    col1, col2 = st.columns([2, 1])
    with col1:
        # Input form in a card-like container
        with st.container():
            st.markdown("### π― Plan Your Trip")
            # Destination and Duration row
            dest_col, days_col = st.columns(2)
            with dest_col:
                destination = st.text_input(
                    "π Destination",
                    placeholder="e.g., Paris, Tokyo, New York...",
                    help="Enter the city you want to visit"
                )
            with days_col:
                days = st.slider(
                    "π Number of days",
                    min_value=1,
                    max_value=14,
                    value=3,
                    help="Select the duration of your trip"
                )
            # Budget and Interests row
            budget_col, interests_col = st.columns(2)
            with budget_col:
                budget = st.selectbox(
                    "π° Budget Level",
                    ["Budget", "Moderate", "Luxury"],
                    help="Select your preferred budget level"
                )
            with interests_col:
                interests = st.multiselect(
                    "π― Interests",
                    ["Culture", "History", "Food", "Nature", "Shopping",
                     "Adventure", "Relaxation", "Art", "Museums"],
                    ["Culture", "Food"],
                    help="Select up to three interests to personalize your plan"
                )
    with col2:
        # Tips and information
        st.markdown("### π‘ Travel Tips")
        st.info("""
- Choose up to 3 interests for best results
- Consider your travel season
- Budget levels affect activity suggestions
- Plans are customizable after generation
""")
    # Generate button centered
    # (the middle column of a 1:2:1 layout holds the button)
    col1, col2, col3 = st.columns([1, 2, 1])
    with col2:
        generate_button = st.button(
            "π¨ Generate Travel Plan",
            type="primary",
            use_container_width=True
        )
    if generate_button:
        # Validate the form before invoking the model.
        if not destination:
            st.error("Please enter a destination!")
            return
        if not interests:
            st.error("Please select at least one interest!")
            return
        # More than three interests only triggers a warning; generation still
        # runs (generate_travel_plan itself uses the first three).
        if len(interests) > 3:
            st.warning("For best results, please select up to 3 interests.")
        with st.spinner("π€ Creating your personalized travel plan..."):
            travel_plan = generate_travel_plan(
                destination,
                days,
                interests,
                budget,
                st.session_state.model,
                st.session_state.tokenizer
            )
        # Create an expander for the success message with trip overview
        with st.expander("β¨ Your travel plan is ready! Click to see trip overview", expanded=True):
            col1, col2, col3 = st.columns(3)
            with col1:
                st.metric("Destination", destination)
            with col2:
                # Singular/plural wording for the duration.
                if days == 1:
                    st.metric("Duration", f"{days} day")
                else:
                    st.metric("Duration", f"{days} days")
            with col3:
                st.metric("Budget", budget)
            st.write("**Selected Interests:**", ", ".join(interests))
        # Display the plan in tabs with improved styling
        plan_tab, summary_tab = st.tabs(["π Detailed Itinerary", "βΉοΈ Trip Summary"])
        with plan_tab:
            # Add a container for better spacing
            with st.container():
                # Add trip title
                st.markdown(f"## π {days}-Day Trip to {destination}")
                st.markdown("---")
                # Display the formatted plan
                st.markdown(travel_plan)
            # Add export options in a nice container
            with st.container():
                st.markdown("---")
                col1, col2 = st.columns([1, 4])
                with col1:
                    # File name is derived from the destination: lower-cased,
                    # spaces replaced by underscores.
                    st.download_button(
                        label="π₯ Download Plan",
                        data=travel_plan,
                        file_name=f"travel_plan_{destination.lower().replace(' ', '_')}.md",
                        mime="text/markdown",
                        use_container_width=True
                    )
        with summary_tab:
            # Create three columns for summary information with cards
            with st.container():
                st.markdown("## Trip Overview")
                sum_col1, sum_col2, sum_col3 = st.columns(3)
                with sum_col1:
                    with st.container():
                        st.markdown("### π Destination Details")
                        st.markdown(f"**Location:** {destination}")
                        if days == 1:
                            st.markdown(f"**Duration:** {days} day")
                        else:
                            st.markdown(f"**Duration:** {days} days")
                        st.markdown(f"**Budget Level:** {budget}")
                with sum_col2:
                    with st.container():
                        st.markdown("### π― Trip Focus")
                        st.markdown("**Selected Interests:**")
                        # Lists every selected interest, including any beyond
                        # the first three.
                        for interest in interests:
                            st.markdown(f"- {interest}")
                with sum_col3:
                    with st.container():
                        st.markdown("### β οΈ Travel Tips")
                        st.info(
                            "β’ Verify opening hours\n"
                            "β’ Check current prices\n"
                            "β’ Confirm availability\n"
                            "β’ Consider seasonal factors"
                        )
# Script entry point: build the page only when this file is run as the main module.
if __name__ == "__main__":
    main()