import os import pandas as pd import requests from PIL import Image, UnidentifiedImageError from io import BytesIO import matplotlib.pyplot as plt import urllib3 from transformers import pipeline, BitsAndBytesConfig import torch import textwrap from haversine import haversine from geopy.geocoders import Nominatim from huggingface_hub import InferenceClient import os IS_SPACES_ZERO = os.environ.get("SPACES_ZERO_GPU", "0") == "1" IS_SPACE = os.environ.get("SPACE_ID", None) is not None # Constants DEVICE = "cuda" if torch.cuda.is_available() else "cpu" LOW_MEMORY = os.getenv("LOW_MEMORY", "0") == "1" MODEL_ID = "llava-hf/llava-1.5-7b-hf" TEXT_MODEL_ID = "mistralai/Mistral-7B-Instruct-v0.2" # Print device and memory info print(f"Using device: {DEVICE}") print(f"Low memory: {LOW_MEMORY}") # Quantization configuration for efficient model loading quantization_config = BitsAndBytesConfig( load_in_4bit=True, bnb_4bit_compute_dtype=torch.float16 ) # Load models only once processor = AutoProcessor.from_pretrained(MODEL_ID) model = LlavaForConditionalGeneration.from_pretrained(MODEL_ID, quantization_config=quantization_config, device_map="auto").to(DEVICE) pipe_image_to_text = pipeline("image-to-text", model=model, model_kwargs={"quantization_config": quantization_config}) # Initialize the text generation pipeline pipe_text = pipeline("text-generation", model=TEXT_MODEL_ID, model_kwargs={"quantization_config": quantization_config}) # Ensure data files are available current_directory = os.getcwd() geocoded_hotels_path = os.path.join(current_directory, 'geocoded_hotels.csv') csv_file_path = os.path.join(current_directory, 'hotel_multimodal.csv') # Load geocoded hotels data if not os.path.isfile(geocoded_hotels_path): url = 'https://github.com/ruslanmv/watsonx-with-multimodal-llava/raw/master/geocoded_hotels.csv' response = requests.get(url) if response.status_code == 200: with open(geocoded_hotels_path, 'wb') as f: f.write(response.content) print(f"File {geocoded_hotels_path} downloaded successfully!") else: print(f"Error downloading file. Status code: {response.status_code}") else: print(f"File {geocoded_hotels_path} already exists.") geocoded_hotels = pd.read_csv(geocoded_hotels_path) # Load hotel dataset if not os.path.exists(csv_file_path): dataset = load_dataset("ruslanmv/hotel-multimodal") df_hotels = dataset['train'].to_pandas() df_hotels.to_csv(csv_file_path, index=False) print("Dataset downloaded and saved as CSV.") else: df_hotels = pd.read_csv(csv_file_path) def get_current_location(): try: response = requests.get('https://ipinfo.io/json') data = response.json() location = data.get('loc', '') if location: return map(float, location.split(',')) else: return None, None except Exception as e: print(f"An error occurred: {e}") return None, None def get_coordinates(location_name): geolocator = Nominatim(user_agent="coordinate_finder") location = geolocator.geocode(location_name) if location: return location.latitude, location.longitude else: return None def find_nearby(place=None): if place: coordinates = get_coordinates(place) if coordinates: latitude, longitude = coordinates print(f"The coordinates of {place} are: Latitude: {latitude}, Longitude: {longitude}") else: print(f"Location not found: {place}") return None else: latitude, longitude = get_current_location() if not latitude or not longitude: print("Could not retrieve the current location.") return None geocoded_hotels['distance_km'] = geocoded_hotels.apply( lambda row: haversine((latitude, longitude), (row['latitude'], row['longitude'])), axis=1 ) closest_hotels = geocoded_hotels.sort_values(by='distance_km').head(5) print("The 5 closest locations are:\n") print(closest_hotels) return closest_hotels @spaces.GPU # Define the respond function def search_hotel(place=None): df_found = find_nearby(place) if df_found is None: return pd.DataFrame() hotel_ids = df_found["hotel_id"].values.tolist() filtered_df = df_hotels[df_hotels['hotel_id'].isin(hotel_ids)] filtered_df['hotel_id'] = pd.Categorical(filtered_df['hotel_id'], categories=hotel_ids, ordered=True) filtered_df = filtered_df.sort_values('hotel_id').reset_index(drop=True) grouped_df = filtered_df.groupby('hotel_id', observed=True).head(2) description_data = [] for index, row in grouped_df.iterrows(): hotel_id = row['hotel_id'] hotel_name = row['hotel_name'] image_url = row['image_url'] try: response = requests.get(image_url, verify=False) response.raise_for_status() img = Image.open(BytesIO(response.content)) prompt = "USER: \nAnalyze this image. Give me feedback on whether this hotel is worth visiting based on the picture. Provide a summary review.\nASSISTANT:" outputs = pipe_image_to_text(img, prompt=prompt, generate_kwargs={"max_new_tokens": 200}) description = outputs[0]["generated_text"].split("\nASSISTANT:")[-1].strip() description_data.append({'hotel_name': hotel_name, 'hotel_id': hotel_id, 'image': img, 'description': description}) except (requests.RequestException, UnidentifiedImageError): print(f"Skipping image at URL: {image_url}") return pd.DataFrame(description_data) def show_hotels(place=None): description_df = search_hotel(place) if description_df.empty: print("No hotels found.") return num_images = len(description_df) num_rows = (num_images + 1) // 2 fig, axs = plt.subplots(num_rows * 2, 2, figsize=(20, 10 * num_rows)) current_index = 0 for _, row in description_df.iterrows(): img = row['image'] description = row['description'] if img is None: continue row_idx = (current_index // 2) * 2 col_idx = current_index % 2 axs[row_idx, col_idx].imshow(img) axs[row_idx, col_idx].axis('off') axs[row_idx, col_idx].set_title(f"{row['hotel_name']}\nHotel ID: {row['hotel_id']} Image {current_index + 1}", fontsize=16) wrapped_description = "\n".join(textwrap.wrap(description, width=50)) axs[row_idx + 1, col_idx].text(0.5, 0.5, wrapped_description, ha='center', va='center', wrap=True, fontsize=14) axs[row_idx + 1, col_idx].axis('off') current_index += 1 plt.tight_layout() plt.show() def grouped_description(description_df): grouped_descriptions = description_df.groupby('hotel_id')['description'].apply(lambda x: ' '.join(x.astype(str))).reset_index() result_df = pd.merge(grouped_descriptions, description_df[['hotel_id', 'hotel_name']], on='hotel_id', how='left') result_df = result_df.drop_duplicates(subset='hotel_id', keep='first') result_df = result_df[['hotel_name', 'hotel_id', 'description']] return result_df def create_prompt_result(result_df): prompt = "" for _, row in result_df.iterrows(): hotel_name = row['hotel_name'] hotel_id = row['hotel_id'] description = row['description'] prompt += f"Hotel Name: {hotel_name}\nHotel ID: {hotel_id}\nDescription: {description}\n\n" return prompt def build_prompt(context_result): hotel_recommendation_template = """ [INST] <> You are a helpful and informative chatbot assistant. <> Based on the following hotel descriptions, recommend the best hotel: {context_result} [/INST] """ return hotel_recommendation_template.format(context_result=context_result) @spaces.GPU # Define the respond function def generate_text_response(prompt): outputs = pipe_text(prompt, max_new_tokens=500) response = outputs[0]['generated_text'].split("[/INST]")[-1].strip() return response