"""Streamlit app that scrapes immobiliare.it listings and ranks them by €/m²."""

from collections import namedtuple
from io import BytesIO
import re
import time

from bs4 import BeautifulSoup
import numpy as np
import pandas as pd
import requests
import streamlit as st

# Average price per square metre used as the baseline for the "Vantaggio"
# score. It is rebound by the sidebar input further down and read by
# ``Immobiliare._advantage`` at scraping time.
prezzo_al_mq = 0

# Seconds before an HTTP request is abandoned (requests has no default timeout).
REQUEST_TIMEOUT = 30

# Exclusive upper bound of the result-page counter (pages 2 .. 9_999).
_PAGE_LIMIT = 10_000

# Listing URLs end with a numeric id followed by a slash.
_LISTING_URL_PATTERN = re.compile(r"\d+\/$")
_SPACE_RUN_PATTERN = re.compile(r" {2,}")

_COST_PATTERNS = (
    re.compile(r"€ (\d+\.\d+\.\d+)"),  # prices of 1M € and above
    re.compile(r"€ (\d+\.\d+)"),
)
_FLOOR_PATTERNS = (
    re.compile(r"piano (\d{1,2})"),
    re.compile(r"(\d{1,2}) piano"),
    re.compile(r"(\d{1,2}) piani"),
)
_AREA_PATTERN = re.compile(r"(\d+) m²")
_ENERGY_PATTERNS = (
    re.compile(r"energetica (\D{1,2}) "),
    re.compile(r"energetica(\S{1,2})"),
)
_CAR_PATTERNS = (
    re.compile(r"post\S auto (\d{1,2})"),
)
_CAR_ON_REQUEST_PATTERN = re.compile(r"possibilit\S.{0,10}auto")

_ENERGY_LETTERS = ("A", "B", "C", "D", "E", "F", "G")
_ENERGY_SUFFIXES = ("0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "+")

# One scraped listing; the field names become the DataFrame column names.
House = namedtuple(
    "House",
    [
        "Vantaggio",
        "Prezzo_Mq",
        "Prezzo",
        "Superficie",
        "Locali",
        "Piano",
        "Url",
    ],
)


def _energy_acceptable(label):
    """Return True if *label* looks like an energy class (``A``..``G``, optional digit or ``+``)."""
    if not label.startswith(_ENERGY_LETTERS):
        return False
    return len(label) == 1 or label.endswith(_ENERGY_SUFFIXES)


class Immobiliare:
    """Scrape the listings reachable from an immobiliare.it search URL.

    After :meth:`get_all_urls` the listing URLs are in ``urls_``; after
    :meth:`find_all_houses` the parsed listings are in the DataFrame ``df_``.

    Args:
        url: Search-results URL to start from.
        verbose: Print progress and diagnostics when True.
        min_house_cost: Parsed prices below this value are treated as invalid.
        browse_all_pages: Follow ``&pag=N`` result pages after the first one.
        area_not_found: Value stored when the surface cannot be parsed.
        price_not_found: Value stored when the price is missing or invalid.
        floor_not_found: Value stored when the floor cannot be parsed.
        car_not_found: Value used when no parking information is found.
        energy_not_found: Value used when no energy class is found.
        invalid_price_per_area: Value stored when €/m² cannot be computed.
        wait: Pause between result-page requests, in milliseconds.
    """

    def __init__(self, url, *, verbose=True, min_house_cost=10_000,
                 browse_all_pages=True, area_not_found=0,
                 price_not_found=np.nan, floor_not_found=0, car_not_found=0,
                 energy_not_found="n/a", invalid_price_per_area=0, wait=60):
        self.url = url
        self.verbose = verbose
        self.min_house_cost = min_house_cost
        self.browse_all_pages = browse_all_pages
        self.wait = wait / 1000  # milliseconds -> seconds for time.sleep
        self.area_not_found = area_not_found
        self.price_not_found = price_not_found
        self.floor_not_found = floor_not_found
        self.car_not_found = car_not_found
        self.energy_not_found = energy_not_found
        self.invalid_price_per_area = invalid_price_per_area

    def _say(self, *args, **kwargs):
        """Print only when the instance is verbose."""
        if self.verbose:
            print(*args, **kwargs)

    def get_all_urls(self):
        """Collect the listing URLs of every result page into ``self.urls_``.

        Each result page is downloaded once. Pagination stops when the page
        text reports a missing page or when a page contributes no listing URL
        that has not already been seen, so the loop cannot run up to the page
        limit on a site that keeps answering with the same content.
        """
        urls_ = []
        seen = set()

        def add_new(candidates):
            """Append unseen URLs to ``urls_`` and return how many were added."""
            added = 0
            for candidate in candidates:
                if candidate not in seen:
                    seen.add(candidate)
                    urls_.append(candidate)
                    added += 1
            return added

        self._say("Processing page 1")
        add_new(self._listing_urls(self._get_soup(self.url)))

        if self.browse_all_pages:
            for i in range(2, _PAGE_LIMIT):
                # Throttle between HTTP requests rather than between anchors.
                time.sleep(self.wait)
                self._say(f"Processing page {i}")
                soup = self._get_soup(self.url + f"&pag={i}")
                t = self._soup_text(soup).lower()
                if "404 not found" in t or "non è presente" in t:
                    break
                if not add_new(self._listing_urls(soup)):
                    break

        self.urls_ = urls_
        self._say("All retrieved urls in attribute 'urls_'")
        self._say(f"Found {len(urls_)} houses matching criteria.")

    @staticmethod
    def _listing_urls(soup):
        """Return the hrefs in *soup* that look like listing pages."""
        found = []
        for link in soup.find_all("a"):
            href = link.get("href")
            if href is None:
                continue
            if ("https" in href and "annunci" in href
                    and _LISTING_URL_PATTERN.search(href)):
                found.append(href)
        return found

    @staticmethod
    def _get_page(url):
        """Download *url* (redirects are not followed) into a BytesIO.

        The stream position is left at the end; callers ``seek(0)`` first.
        """
        req = requests.get(url, allow_redirects=False, timeout=REQUEST_TIMEOUT)
        page = BytesIO()
        page.write(req.content)
        return page

    @staticmethod
    def _get_soup(url):
        """Download *url* and parse it with the built-in HTML parser."""
        page = Immobiliare._get_page(url)
        page.seek(0)
        return BeautifulSoup(page, "html.parser")

    @staticmethod
    def _soup_text(soup):
        """Return the text of *soup* without newlines and with space runs collapsed."""
        text = soup.get_text().replace("\n", "")
        # NOTE(review): the original looped 50 times over
        # ``t.replace(" ", " ")``, which is a no-op as written. Collapsing
        # runs of spaces is the presumed intent (the regexes below rely on
        # single spaces) - confirm.
        return _SPACE_RUN_PATTERN.sub(" ", text)

    @staticmethod
    def _get_text(sub_url):
        """Download *sub_url* and return its flattened text."""
        return Immobiliare._soup_text(Immobiliare._get_soup(sub_url))

    def _parse_cost(self, t, sub_url):
        """Return ``(cost, locali)`` parsed from the page text *t*.

        The last digit captured after the price is split off as ``locali`` and
        the remaining digits form the cost, exactly as the original code did.
        NOTE(review): presumably the room count directly follows the price in
        the flattened text - confirm against a live page.
        """
        cost = None
        locali = None
        for pattern in _COST_PATTERNS:
            match = pattern.search(t)
            if match is None:
                continue
            digits = match.group(1).replace(".", "")
            locali = digits[-1]
            cost = digits[:-1]
            break

        if cost is None:
            if "prezzo su richiesta" in t:
                self._say(f"Price available upon request for {sub_url}")
            else:
                self._say(f"Can't get price for {sub_url}")
            return self.price_not_found, locali

        if int(cost) < self.min_house_cost:
            if "prezzo su richiesta" in t:
                self._say(f"Price available upon request for {sub_url}")
            else:
                self._say(f"Too low house price: {int(cost)}? for {sub_url}")
            return self.price_not_found, locali

        return cost, locali

    def _parse_floor(self, t):
        """Return the floor found in *t*, or ``floor_not_found``."""
        floor = None
        for pattern in _FLOOR_PATTERNS:
            match = pattern.search(t)
            if match is not None:
                floor = match.group(1)
                break
        if "piano terra" in t:
            floor = 1
        if floor is None:
            # Previously stayed None, leaving the constructor option unused.
            floor = self.floor_not_found
        return floor

    def _parse_area(self, t, sub_url):
        """Return the surface in m² found in *t*, or ``area_not_found``."""
        match = _AREA_PATTERN.search(t)
        if match is not None:
            return match.group(1)
        if "asta" in t:
            self._say(f"Auction house: no area info {sub_url}")
        else:
            self._say(f"Can't get area info from url {sub_url}")
        return self.area_not_found

    def _parse_energy(self, t, sub_url):
        """Return the energy class found in *t*, or ``energy_not_found``."""
        energy = None
        for pattern in _ENERGY_PATTERNS:
            match = pattern.search(t)
            energy = None if match is None else match.group(1).upper()
            if energy is not None and _energy_acceptable(energy):
                break
        if energy is None or not _energy_acceptable(energy):
            if "in attesa di certificazione" in t:
                self._say(f"Energy efficiency still pending for {sub_url} ")
            else:
                self._say(f"Can't get energy efficiency from {sub_url}")
            energy = self.energy_not_found
        return energy

    def _parse_car(self, t, sub_url):
        """Return the number of parking spots found in *t*, or a fallback."""
        for pattern in _CAR_PATTERNS:
            match = pattern.search(t)
            if match is not None:
                return match.group(1)
        if _CAR_ON_REQUEST_PATTERN.search(t) is not None:
            self._say(f"Car spot/box available upon request for {sub_url}")
            return 0
        return self.car_not_found

    @staticmethod
    def _advantage(price_per_area):
        """Score how far *price_per_area* sits below the ``prezzo_al_mq`` baseline.

        Returns 0 when the baseline is zero or the listing is not cheaper.
        NOTE(review): the 120 scale factor is kept from the original; the UI
        progress column is capped at 100 - confirm this is intended.
        """
        if not prezzo_al_mq:
            return 0
        differenza = prezzo_al_mq - price_per_area
        return int(max(0, (differenza / prezzo_al_mq) * 120))

    def _get_data(self, sub_url):
        """Download one listing page and return its parsed fields as a ``House``."""
        t = self._get_text(sub_url).lower()

        cost, locali = self._parse_cost(t, sub_url)
        floor = self._parse_floor(t)
        area = self._parse_area(t, sub_url)
        # Energy class and parking are not part of ``House`` yet; they are
        # still parsed so their verbose diagnostics keep being reported.
        self._parse_energy(t, sub_url)
        self._parse_car(t, sub_url)

        # €/m²
        try:
            price_per_area = round(int(cost) / int(area), 1)
        except (TypeError, ValueError, OverflowError, ZeroDivisionError):
            # Missing price (NaN/None) or missing/zero area.
            price_per_area = self.invalid_price_per_area
            vantaggio = 0
        else:
            vantaggio = self._advantage(price_per_area)

        return House(
            vantaggio,
            price_per_area,
            cost,
            area,
            locali,
            floor,
            sub_url,
        )

    def find_all_houses(self):
        """Parse every listing in ``urls_`` into the DataFrame ``self.df_``.

        Calls :meth:`get_all_urls` first when ``urls_`` is not set yet. Any
        error raised while parsing a listing is re-raised after printing the
        offending URL.
        """
        if not hasattr(self, "urls_"):
            self.get_all_urls()
        all_results = []
        for url in self.urls_:
            try:
                all_results.append(self._get_data(url))
            except Exception:
                print(f"offending_url='{url}'")
                raise
        self.df_ = pd.DataFrame(all_results)
        self._say("Results stored in attribute 'df_'")


def evidenzia_in_rosso(valore, soglia):
    """Return a red-background CSS style when *valore* is below *soglia*, else ''."""
    if valore < soglia:
        return 'background-color: red; color: white'
    return ''


st.set_page_config(layout="wide")

# Streamlit interface
st.title('🏠 Immobiliare A.I. ')
st.write("##### Il tuo assistente di intelligenza artificiale per la ricerca di occasioni immobiliari")
with st.expander("Informazioni"):
    st.write("Immobiliare A.I. è la webapp che semplifica la ricerca di immobili, grazie a algoritmi avanzati che calcolano il vantaggio di ogni offerta. Trova le migliori occasioni sul mercato con analisi precise e personalizzate. Scopri l’immobile giusto per te con facilità e sicurezza!")

cerca_premuto = False

# Search filters
with st.sidebar:
    st.title("Filtri")
    comune_input = st.text_input("Comune", 'lonato del garda')
    prezzo_al_mq = st.number_input("Prezzo Medio al Mq", 2500)
    # Price sliders are expressed in thousands of euros.
    prezzo_minimo = st.sidebar.slider("Prezzo Minimo", min_value=0, max_value=1000, value=200)
    prezzo_massimo = st.sidebar.slider("Prezzo Massimo", min_value=0, max_value=1000, value=230)

    locali = list(range(1, 21))  # selectable room counts: 1 to 20

    # Single range slider for the number of rooms; defaults to 3-5 rooms.
    locali_range = st.sidebar.select_slider(
        "Locali",
        options=locali,
        value=(locali[2], locali[4]),
    )

    # Split the range into minimum and maximum number of rooms.
    locali_minimo, locali_massimo = locali_range

    prezzo_minimo = prezzo_minimo * 1000
    prezzo_massimo = prezzo_massimo * 1000

    # NOTE(review): the original indentation was lost; the button is assumed
    # to belong to the sidebar together with the filters - confirm.
    cerca_premuto = st.button("Cerca", use_container_width=True, type='primary')

if cerca_premuto:
    # Strip first so a whitespace-only name is rejected instead of searched.
    comune = comune_input.strip().replace(" ", "-")
    if comune:
        url = f"https://www.immobiliare.it/vendita-case/{comune}/?prezzoMinimo={prezzo_minimo}&prezzoMassimo={prezzo_massimo}&localiMinimo={locali_minimo}&localiMassimo={locali_massimo}&random=123456"

        with st.spinner("Ricerca immobiliare in corso..."):
            case = Immobiliare(url)
            case.find_all_houses()

        df = case.df_
        if df.empty:
            # An empty frame has no columns, so sorting it would raise KeyError.
            st.warning("Nessun immobile trovato con i filtri selezionati.")
        else:
            df = df.sort_values(by="Prezzo_Mq", ascending=True)
            st.dataframe(
                df,
                hide_index=True,
                use_container_width=True,
                column_config={
                    "Vantaggio": st.column_config.ProgressColumn(
                        "Vantaggio",
                        help="Vantaggio in %",
                        format='%f',
                        min_value=0,
                        max_value=100,
                    ),
                    "Prezzo_Mq": " €/Mq",
                    "Prezzo": "Prezzo Totale",
                    "Superficie": "Superficie",
                    "Locali": "Locali",
                    "Piano": "Piano",
                    "Url": st.column_config.LinkColumn("App URL"),
                },
            )
            st.success("Elaborazione Completata")
    else:
        st.error("Per favore, inserisci il nome di un comune.")