Spaces:
Runtime error
Runtime error
| import os | |
| from dotenv import load_dotenv | |
| import os.path | |
| from google.auth.transport.requests import Request | |
| from google.oauth2.credentials import Credentials | |
| from google_auth_oauthlib.flow import InstalledAppFlow | |
| from googleapiclient.discovery import build | |
| from googleapiclient.errors import HttpError | |
| import base64 | |
| from email.message import EmailMessage | |
| from bs4 import BeautifulSoup | |
| import webbrowser | |
| import datetime | |
| import streamlit as st | |
| SCOPES = ["https://www.googleapis.com/auth/gmail.compose", | |
| "https://www.googleapis.com/auth/gmail.modify", | |
| "https://mail.google.com/", | |
| "https://www.googleapis.com/auth/calendar.readonly", | |
| "https://www.googleapis.com/auth/documents.readonly", | |
| "https://www.googleapis.com/auth/forms.body", | |
| "https://www.googleapis.com/auth/spreadsheets.readonly"] | |
| #---------------------------------------------------------- LETTURA EMAIL --------------------------------------------------------- | |
| def converti_email_txt(body): | |
| try: | |
| soup = BeautifulSoup(body, 'html.parser') | |
| body_content = soup.find('body').get_text() if soup.find('body') else body | |
| body = body_content | |
| except: | |
| body = body | |
| return body | |
| def leggi_gmail(max_results=10): | |
| creds=connetti_google() | |
| links = [] | |
| service = build("gmail", "v1", credentials=creds) | |
| results = service.users().messages().list(userId="me", labelIds=["INBOX"], q="is:unread", maxResults=max_results).execute() | |
| messages = results.get("messages", []) | |
| testo_email = '' | |
| if not messages: | |
| print("You have no New Messages.") | |
| else: | |
| message_count = 0 | |
| for message in messages: | |
| msg = service.users().messages().get(userId="me", id=message["id"]).execute() | |
| message_count = message_count + 1 | |
| email_data = msg["payload"]["headers"] | |
| for values in email_data: | |
| name = values["name"] | |
| if name == "From": | |
| from_name = values["value"] | |
| subject = [j["value"] for j in email_data if j["name"] == "Subject"] | |
| elif name == "Date": | |
| received_date = values["value"] | |
| body = "" | |
| if "parts" in msg["payload"]: | |
| for part in msg["payload"]["parts"]: | |
| if part["mimeType"] == "text/plain": | |
| body = base64.urlsafe_b64decode(part["body"]["data"]).decode("utf-8") | |
| body = converti_email_txt(body) | |
| break | |
| elif part["mimeType"] == "multipart/alternative": | |
| for subpart in part["parts"]: | |
| if subpart["mimeType"] == "text/plain": | |
| body = base64.urlsafe_b64decode(subpart["body"]["data"]).decode("utf-8") | |
| body = converti_email_txt(body) | |
| break | |
| else: | |
| body = base64.urlsafe_b64decode(msg["payload"]["body"]["data"]).decode("utf-8") | |
| body = converti_email_txt(body) | |
| testo_email += 'Data Ricezione: ' + received_date[5:] + '\nMittente: ' + from_name + '\nOggetto: ' + ', '.join(subject) + '\nTesto Email: ' + body + '\n---------------------------------------------------\n' | |
| links.append(('Mittente: ' + from_name, 'Data: ' + received_date[5:] + '\n\nOggetto: ' + ', '.join(subject))) | |
| return testo_email, links | |
| #---------------------------------------------------------- SCRITTURA BOZZA EMAIL --------------------------------------------------------- | |
| def scrivi_bozza_gmail(testo): | |
| draft_url = '' | |
| creds=connetti_google() | |
| try: | |
| service = build("gmail", "v1", credentials=creds) | |
| message = EmailMessage() | |
| message.set_content(testo) | |
| message["To"] = "[email protected]" | |
| message["From"] = "[email protected]" | |
| message["Subject"] = "Automated draft" | |
| encoded_message = base64.urlsafe_b64encode(message.as_bytes()).decode() | |
| create_message = {"message": {"raw": encoded_message}} | |
| draft = (service.users().drafts().create(userId="me", body=create_message).execute()) | |
| print(f'Draft id: {draft["id"]}\nDraft message: {draft["message"]}') | |
| draft_id = draft["id"] | |
| draft_details = service.users().drafts().get(userId="me", id=draft_id).execute() | |
| print(draft_details) | |
| except HttpError as error: | |
| print(f"An error occurred: {error}") | |
| return draft_url | |
| #---------------------------------------------------------- LEGGI GOOGLE CALENDAR --------------------------------------------------------- | |
| def leggi_calendario_google(max_results=10): | |
| creds=connetti_google() | |
| try: | |
| service = build("calendar", "v3", credentials=creds) | |
| calendar_list_result = service.calendarList().list().execute() | |
| calendars = calendar_list_result.get('items', []) | |
| descrizione_eventi = '' | |
| links = [] | |
| for calendar in calendars: | |
| events_result = (service.events().list(calendarId=calendar['id'], timeMin=datetime.datetime.now().isoformat() + 'Z', maxResults=max_results, singleEvents=True, orderBy="startTime", ).execute()) | |
| events = events_result.get("items", []) | |
| for event in events: | |
| start = event["start"].get("dateTime", event["start"].get("date")) | |
| end = event["end"].get("dateTime", event["end"].get("date")) | |
| descrizione = '' | |
| calendario = '' | |
| if 'description' in event: | |
| descrizione = event['description'].replace('\n', '. ') | |
| if 'displayName' in event['organizer']: | |
| calendario = event['organizer']['displayName'] | |
| else: | |
| calendario = 'Principale' | |
| descrizione_eventi += f'Calendario: {calendario} --- ' | |
| descrizione_eventi += f'Data Inizio: {start.replace("T", " ").replace("Z", " ")} - Data Fine: {end.replace("T", " ").replace("Z", " ")}: ' | |
| descrizione_link = f'Data: {start.replace("T", " ").replace("Z", " ")} \n\n Titolo: {event["summary"]}' | |
| if descrizione != '': | |
| descrizione_eventi += f'{event["summary"]} ({descrizione})' | |
| descrizione_link += f'\n\nDescrizione: {event["summary"]} ({descrizione})' | |
| else: | |
| descrizione_eventi += f'{event["summary"]}' | |
| descrizione_eventi += '\n' | |
| links.append((f'Calendario: {calendario}', descrizione_link)) | |
| except HttpError as error: | |
| print(f"An error occurred: {error}") | |
| return descrizione_eventi, links | |
| #---------------------------------------------------------- CONNESSIONE ACCOUNT GOOGLE --------------------------------------------------------- | |
| def connetti_google(): | |
| load_dotenv() | |
| json_variable_str = os.getenv("JSON_GOOGLE") | |
| print(json_variable_str) | |
| with open("./credentials.json", "w") as f: | |
| f.write(json_variable_str) | |
| if os.path.exists("./credentials.json"): | |
| print('ESISTE') | |
| else: | |
| print('NON ESISTE') | |
| creds = None | |
| st.write("Welcome to My App!") | |
| auth_code = st.query_params.get("code") | |
| flow = InstalledAppFlow.from_client_secrets_file("./credentials.json", SCOPES, redirect_uri='https://matteoscript-streamlitchat-new.hf.space') | |
| if auth_code: | |
| flow.fetch_token(code=auth_code) | |
| creds = flow.credentials | |
| st.write("Login Done") | |
| ) | |
| else: | |
| authorization_url, state = flow.authorization_url(include_granted_scopes="true",) | |
| print(authorization_url) | |
| st.markdown(f'<a href="{authorization_url}">Click here to sign in with Google</a>', unsafe_allow_html=True) | |
| return creds | |
| def connetti_google_2(): | |
| load_dotenv() | |
| json_variable_str = os.getenv("JSON_GOOGLE") | |
| print(json_variable_str) | |
| with open("./credentials.json", "w") as f: | |
| f.write(json_variable_str) | |
| if os.path.exists("./credentials.json"): | |
| print('ESISTE') | |
| else: | |
| print('NON ESISTE') | |
| creds = None | |
| if os.path.exists("./token.json"): | |
| creds = Credentials.from_authorized_user_file("token.json", SCOPES) | |
| if not creds or not creds.valid: | |
| if creds and creds.expired and creds.refresh_token: | |
| creds.refresh(Request()) | |
| else: | |
| flow = InstalledAppFlow.from_client_secrets_file("./credentials.json", SCOPES, redirect_uri='https://matteoscript-streamlitchat-new.hf.space') | |
| #creds = flow.run_local_server(port=8501, open_browser=False) | |
| creds = flow.run_console() | |
| with open("token.json", "w") as token: | |
| token.write(creds.to_json()) | |
| return creds |