|
import os
import textwrap
from io import BytesIO

# `spaces` provides the `@spaces.GPU` decorator used further down in this file.
# It is listed ahead of torch/transformers because ZeroGPU expects `spaces` to
# be imported before anything initialises CUDA.
import spaces

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import requests
import torch
import urllib3
from haversine import haversine
from huggingface_hub import InferenceClient
from PIL import Image, UnidentifiedImageError
from transformers import AutoProcessor, LlavaForConditionalGeneration
from transformers import BitsAndBytesConfig
from transformers import pipeline
|
# Flags read from the process environment.
IS_SPACES_ZERO = os.environ.get("SPACES_ZERO_GPU", "0") == "1"
IS_SPACE = os.environ.get("SPACE_ID", None) is not None
LOW_MEMORY = os.getenv("LOW_MEMORY", "0") == "1"

# Resolve the compute device once; formatting a torch.device prints the same
# "cuda"/"cpu" text as the plain string did.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
print(f"low memory: {LOW_MEMORY}")

# 4-bit bitsandbytes quantization with float16 compute.
quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16,
)

model_id = "llava-hf/llava-1.5-7b-hf"
processor = AutoProcessor.from_pretrained(model_id)

# `device_map="auto"` already places the quantized weights on the available
# device(s). The model must not be moved again with `.to(device)`:
# Transformers rejects `.to()` on bitsandbytes-quantized models (always for
# 8-bit, and for 4-bit with older bitsandbytes releases).
model = LlavaForConditionalGeneration.from_pretrained(
    model_id,
    quantization_config=quantization_config,
    device_map="auto",
)
|
|
|
|
|
import os |
|
import requests |
|
|
|
# Fetch the geocoded hotel table once; later code reads it from the working
# directory under `filename`.
url = 'https://github.com/ruslanmv/watsonx-with-multimodal-llava/raw/master/geocoded_hotels.csv'
filename = 'geocoded_hotels.csv'

if os.path.isfile(filename):
    print(f"File {filename} already exists.")
else:
    # A timeout keeps a stalled connection from blocking the import forever.
    response = requests.get(url, timeout=30)

    if response.status_code == 200:
        with open(filename, 'wb') as f:
            f.write(response.content)
        print(f"File {filename} downloaded successfully!")
    else:
        print(f"Error downloading file. Status code: {response.status_code}")
|
|
|
import os |
|
import pandas as pd |
|
from datasets import load_dataset |
|
import pyarrow |
|
|
|
|
|
# Both data files are resolved against the current working directory.
current_directory = os.getcwd()
csv_file_path = os.path.join(current_directory, 'hotel_multimodal.csv')
geocoded_hotels_path = os.path.join(current_directory, 'geocoded_hotels.csv')

# Build the local CSV cache from the Hugging Face dataset on first run.
if not os.path.exists(csv_file_path):
    print("File not found, downloading from Hugging Face...")

    dataset = load_dataset("ruslanmv/hotel-multimodal")
    df_hotels = dataset['train'].to_pandas()

    df_hotels.to_csv(csv_file_path, index=False)
    print("Dataset downloaded and saved as CSV.")

# The CSV on disk is the source of truth for both frames.
df_hotels = pd.read_csv(csv_file_path)
print("DataFrame loaded:")

geocoded_hotels = pd.read_csv(geocoded_hotels_path)
|
|
|
import requests |
|
|
|
def get_current_location():
    """Look up approximate coordinates for this machine's public IP.

    Requests ``https://ipinfo.io/json`` and parses its ``loc`` field, which is
    expected to look like ``"<latitude>,<longitude>"``.

    Returns:
        tuple: ``(latitude, longitude)`` as floats, or ``(None, None)`` when
        the response carries no ``loc`` value or any error occurs. Errors are
        printed, never raised.
    """
    try:
        # Bounded wait: without a timeout a stalled connection blocks forever.
        response = requests.get('https://ipinfo.io/json', timeout=10)
        data = response.json()

        location = data.get('loc', '')
        if not location:
            return None, None

        latitude, longitude = map(float, location.split(','))
        return latitude, longitude
    except Exception as e:
        # Deliberately broad: this is a best-effort lookup and callers rely on
        # the (None, None) fallback instead of handling exceptions.
        print(f"An error occurred: {e}")
        return None, None
|
|
|
# Probe the IP-based location once at import time and report the outcome.
latitude, longitude = get_current_location()

# Compare against None explicitly: 0.0 is a valid latitude/longitude and must
# not be reported as a failed lookup.
if latitude is not None and longitude is not None:
    print(f"Current location: Latitude = {latitude}, Longitude = {longitude}")
else:
    print("Could not retrieve the current location.")
|
|
|
|
|
from geopy.geocoders import Nominatim |
|
|
|
def get_coordinates(location_name):
    """Resolve a place name to coordinates with geopy's Nominatim geocoder.

    Args:
        location_name (str): Free-form place name, e.g. "Rome, Italy".

    Returns:
        tuple | None: ``(latitude, longitude)`` of the geocoder's match, or
        ``None`` when the geocoder returns no match.
    """
    match = Nominatim(user_agent="coordinate_finder").geocode(location_name)

    # Guard clause: nothing to unpack when the geocoder found no match.
    if not match:
        return None
    return match.latitude, match.longitude
|
|
|
|
|
|
|
def find_nearby(place=None):
    """Return the five hotels in ``geocoded_hotels.csv`` nearest a reference point.

    Args:
        place (str | None): Place name resolved with ``get_coordinates``.
            When ``None``, the reference point comes from
            ``get_current_location()`` instead.

    Returns:
        pandas.DataFrame: Up to five rows of ``geocoded_hotels.csv`` (read
        from the current working directory) with an added ``distance_km``
        column, sorted by ascending distance. When no reference point could
        be determined the frame is empty but keeps the same columns, so
        callers can still select ``hotel_id``.
    """
    if place is not None:
        coordinates = get_coordinates(place)
        if coordinates:
            latitude, longitude = coordinates
            print(f"The coordinates of {place} are: Latitude: {latitude}, Longitude: {longitude}")
        else:
            # Leave the reference point explicitly unset instead of unbound.
            latitude = longitude = None
            print(f"Location not found: {place}")
    else:
        latitude, longitude = get_current_location()
        # Compare against None explicitly: 0.0 is a valid coordinate.
        if latitude is not None and longitude is not None:
            print(f"Current location: Latitude = {latitude}, Longitude = {longitude}")
        else:
            print("Could not retrieve the current location.")

    geocoded_hotels = pd.read_csv(os.path.join(os.getcwd(), 'geocoded_hotels.csv'))

    if latitude is None or longitude is None:
        # Without a reference point no distance can be computed; hand back an
        # empty frame with the usual columns rather than crashing.
        return geocoded_hotels.head(0).assign(distance_km=pd.Series(dtype=float))

    reference_point = (latitude, longitude)
    geocoded_hotels['distance_km'] = geocoded_hotels.apply(
        lambda row: haversine(reference_point, (row['latitude'], row['longitude'])),
        axis=1,
    )

    closest_hotels = geocoded_hotels.sort_values(by='distance_km').head(5)

    print("The 5 closest locations are:\n")
    print(closest_hotels)
    return closest_hotels
|
|
|
@spaces.GPU
def search_hotel(place=None):
    """Describe images of the hotels nearest to ``place`` with a LLaVA pipeline.

    Looks up the nearest hotels via ``find_nearby``, selects their rows from
    ``hotel_multimodal.csv`` (read from the current working directory), keeps
    the first two rows per hotel and asks the ``llava-hf/llava-1.5-7b-hf``
    image-to-text pipeline for a short review of each downloadable image.

    Args:
        place (str | None): Forwarded unchanged to ``find_nearby``.

    Returns:
        pandas.DataFrame: Columns ``hotel_name``, ``hotel_id``, ``image`` and
        ``description``, one row per image that was downloaded and described.
        The columns are present even when no image could be processed.
    """
    # NOTE(security): image downloads below use verify=False (TLS certificate
    # checks disabled); the matching urllib3 warning is silenced here.
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    result_columns = ['hotel_name', 'hotel_id', 'image', 'description']

    df_hotels = pd.read_csv(os.path.join(os.getcwd(), 'hotel_multimodal.csv'))

    df_found = find_nearby(place)
    # Keep nearest-first order but drop repeats: pd.Categorical rejects
    # duplicate categories.
    hotel_ids = list(dict.fromkeys(df_found["hotel_id"].values.tolist()))

    # .copy() so the column assignment below writes to an independent frame
    # instead of a slice of df_hotels.
    filtered_df = df_hotels[df_hotels['hotel_id'].isin(hotel_ids)].copy()
    if filtered_df.empty:
        # Nothing to describe: skip loading the model altogether.
        return pd.DataFrame(columns=result_columns)

    # An ordered categorical makes sort_values follow the nearest-first order
    # of hotel_ids rather than the numeric order of the ids.
    filtered_df['hotel_id'] = pd.Categorical(filtered_df['hotel_id'], categories=hotel_ids, ordered=True)
    filtered_df = filtered_df.sort_values('hotel_id').reset_index(drop=True)

    quantization_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_compute_dtype=torch.float16,
    )
    model_id = "llava-hf/llava-1.5-7b-hf"

    # NOTE(review): the pipeline is rebuilt on every call; consider caching it
    # if repeated calls become a bottleneck.
    pipe = pipeline("image-to-text", model=model_id, model_kwargs={"quantization_config": quantization_config})

    # First two rows (images) per hotel.
    grouped_df = filtered_df.groupby('hotel_id', observed=True).head(2)

    prompt = "USER: <image>\nAnalyze this image. Give me feedback on whether this hotel is worth visiting based on the picture. Provide a summary review.\nASSISTANT:"

    description_data = []
    for _, row in grouped_df.iterrows():
        image_url = row['image_url']

        try:
            # Bounded wait so one unresponsive host cannot stall the request.
            response = requests.get(image_url, verify=False, timeout=30)
            response.raise_for_status()
            img = Image.open(BytesIO(response.content))
        except (requests.RequestException, UnidentifiedImageError):
            print(f"Skipping image at URL: {image_url}")
            continue

        outputs = pipe(img, prompt=prompt, generate_kwargs={"max_new_tokens": 200})
        # The generated text echoes the prompt; keep only the assistant part.
        description = outputs[0]["generated_text"].split("\nASSISTANT:")[-1].strip()

        description_data.append({
            'hotel_name': row['hotel_name'],
            'hotel_id': row['hotel_id'],
            'image': img,
            'description': description,
        })

    # Explicit columns keep the schema stable even when every image was skipped.
    description_df = pd.DataFrame(description_data, columns=result_columns)
    return description_df
|
|
|
|
|
def show_hotels(place=None):
    """Plot each hotel image returned by ``search_hotel`` above its description.

    Images are laid out two per row; each image occupies one axes and its
    wrapped description the axes directly below it.

    Args:
        place (str | None): Forwarded unchanged to ``search_hotel``.

    Returns:
        None. The figure is displayed with ``plt.show()``; when there is
        nothing to plot a message is printed instead.
    """
    description_df = search_hotel(place)

    # plt.subplots() rejects a grid with zero rows, so bail out early.
    if description_df.empty:
        print("No hotel images to display.")
        return

    num_images = len(description_df)
    num_rows = (num_images + 1) // 2

    # Two axes rows per image row: the picture, then its text.
    _fig, axs = plt.subplots(num_rows * 2, 2, figsize=(20, 10 * num_rows))

    current_index = 0
    for _, row in description_df.iterrows():
        img = row['image']
        if img is None:
            continue

        row_idx = (current_index // 2) * 2
        col_idx = current_index % 2
        image_ax = axs[row_idx, col_idx]
        text_ax = axs[row_idx + 1, col_idx]

        image_ax.imshow(img)
        image_ax.axis('off')
        image_ax.set_title(f"{row['hotel_name']}\nHotel ID: {row['hotel_id']} Image {current_index + 1}", fontsize=16)

        wrapped_description = "\n".join(textwrap.wrap(row['description'], width=50))
        text_ax.text(0.5, 0.5, wrapped_description, ha='center', va='center', wrap=True, fontsize=14)
        text_ax.axis('off')

        current_index += 1

    # Blank every slot that received no image. The grid holds num_rows * 2
    # slots, so iterate up to that bound to cover all leftovers.
    for slot in range(current_index, num_rows * 2):
        row_idx = (slot // 2) * 2
        col_idx = slot % 2
        axs[row_idx, col_idx].axis('off')
        axs[row_idx + 1, col_idx].axis('off')

    plt.tight_layout()
    plt.show()
|
|
|
def grouped_description(description_df):
    """Collapse per-image descriptions into a single row per hotel.

    Args:
        description_df (pandas.DataFrame): Must contain ``hotel_id``,
            ``hotel_name`` and ``description`` columns.

    Returns:
        pandas.DataFrame: Columns ``hotel_name``, ``hotel_id`` and
        ``description``; one row per distinct ``hotel_id`` with that hotel's
        descriptions joined by single spaces.
    """
    # One space-joined description string per hotel.
    per_hotel = (
        description_df.groupby('hotel_id')['description']
        .apply(lambda texts: ' '.join(texts.astype(str)))
        .reset_index()
    )

    # Re-attach hotel names; the left join repeats a hotel once per source row,
    # so only the first occurrence of each id is kept.
    id_to_name = description_df[['hotel_id', 'hotel_name']]
    with_names = per_hotel.merge(id_to_name, on='hotel_id', how='left')
    one_per_hotel = with_names.drop_duplicates(subset='hotel_id', keep='first')

    return one_per_hotel[['hotel_name', 'hotel_id', 'description']]
|
|
|
|
|
|
|
def create_prompt_result(result_df):
    """Render every hotel row as a text block and concatenate the blocks.

    Args:
        result_df (pandas.DataFrame): Must contain ``hotel_name``,
            ``hotel_id`` and ``description`` columns.

    Returns:
        str: One "Hotel Name / Hotel ID / Description" block per row, each
        followed by a blank line; an empty string for an empty frame.
    """
    return "".join(
        f"Hotel Name: {entry['hotel_name']}\nHotel ID: {entry['hotel_id']}\nDescription: {entry['description']}\n\n"
        for _, entry in result_df.iterrows()
    )
|
from transformers import pipeline, BitsAndBytesConfig |
|
import torch |
|
from langchain import PromptTemplate |
|
|
|
|
|
# Instruction-style prompt wrapper. `{context_result}` is the only placeholder;
# build_prompt() fills it from its `context_result` argument.
# NOTE(review): the `<<SYS>>` markers look like the Llama-2 chat convention,
# while the text pipeline in this file loads a Mistral instruct model —
# confirm the format is intended.
hotel_recommendation_template = """
<s>[INST] <<SYS>>
You are a helpful and informative chatbot assistant.
<</SYS>>
Based on the following hotel descriptions, recommend the best hotel:
{context_result}
[/INST]
"""
|
# Not wrapped in a GPU decorator: this function only formats a string, so
# reserving a GPU for each call would be pure overhead.
def build_prompt(context_result):
    """Fill ``hotel_recommendation_template`` with the given hotel context.

    Args:
        context_result (str): Text substituted for the template's
            ``{context_result}`` placeholder.

    Returns:
        str: The fully formatted prompt.
    """
    # from_template() derives the input variables from the template text, so
    # this also works on LangChain releases whose PromptTemplate constructor
    # requires an explicit `input_variables` argument.
    prompt_template = PromptTemplate.from_template(hotel_recommendation_template)
    return prompt_template.format(context_result=context_result)
|
|
|
|
|
# 4-bit bitsandbytes quantization with float16 compute for the text model.
quantization_config = BitsAndBytesConfig(load_in_4bit=True, bnb_4bit_compute_dtype=torch.float16)

# Text-generation pipeline that generate_text_response() calls.
pipe_text = pipeline(
    task="text-generation",
    model="mistralai/Mistral-7B-Instruct-v0.2",
    model_kwargs=dict(quantization_config=quantization_config),
)
|
|
|
def generate_text_response(prompt):
    """Run ``pipe_text`` on ``prompt`` and return only the model's answer.

    Args:
        prompt (str): Prompt passed to the text-generation pipeline.

    Returns:
        str: The generated text after the last ``[/INST]`` marker, stripped of
        surrounding whitespace. If the marker is absent, the whole generated
        text is returned (stripped).
    """
    generated = pipe_text(prompt, max_new_tokens=500)[0]['generated_text']

    # rpartition yields everything after the final marker, or the entire
    # string when the marker does not occur.
    _, _, answer = generated.rpartition("[/INST]")
    return answer.strip()
|
|
|
|