Spaces:
Runtime error
Runtime error
from typing import Iterable, Dict | |
import streamlit as st | |
from os import listdir | |
from uuid import uuid4 | |
from time import sleep, time | |
from threading import Thread | |
from json import loads, dumps | |
from random import getrandbits | |
from websocket import WebSocketApp | |
from requests import Session, get, post | |
class Perplexity: | |
def __init__(self, email: str = None) -> None: | |
self.session: Session = Session() | |
self.user_agent: dict = { "User-Agent": "Ask/2.4.1/224 (iOS; iPhone; Version 17.1) isiOSOnMac/false", "X-Client-Name": "Perplexity-iOS" } | |
self.session.headers.update(self.user_agent) | |
if email and ".perplexity_session" in listdir(): | |
self._recover_session(email) | |
else: | |
self._init_session_without_login() | |
if email: | |
self._login(email) | |
self.email: str = email | |
self.t: str = self._get_t() | |
st.write(self.t) | |
self.sid: str = self._get_sid() | |
st.write(self.sid) | |
self.n: int = 1 | |
self.base: int = 420 | |
self.queue: list = [] | |
self.finished: bool = True | |
self.last_uuid: str = None | |
self.backend_uuid: str = None # unused because we can't yet follow-up questions | |
self.frontend_session_id: str = str(uuid4()) | |
assert self._ask_anonymous_user(), "failed to ask anonymous user" | |
self.ws: WebSocketApp = self._init_websocket() | |
self.ws_thread: Thread = Thread(target=self.ws.run_forever).start() | |
self._auth_session() | |
while not (self.ws.sock and self.ws.sock.connected): | |
sleep(0.01) | |
def _recover_session(self, email: str) -> None: | |
with open(".perplexity_session", "r") as f: | |
perplexity_session: dict = loads(f.read()) | |
if email in perplexity_session: | |
self.session.cookies.update(perplexity_session[email]) | |
else: | |
self._login(email, perplexity_session) | |
def _login(self, email: str, ps: dict = None) -> None: | |
self.session.post(url="https://www.perplexity.ai/api/auth/signin-email", data={"email": email}) | |
email_link: str = str(input("paste the link you received by email: ")) | |
self.session.get(email_link) | |
if ps: | |
ps[email] = self.session.cookies.get_dict() | |
else: | |
ps = {email: self.session.cookies.get_dict()} | |
with open(".perplexity_session", "w") as f: | |
f.write(dumps(ps)) | |
def _init_session_without_login(self) -> None: | |
self.session.get(url=f"https://www.perplexity.ai/search/{str(uuid4())}") | |
self.session.headers.update(self.user_agent) | |
def _auth_session(self) -> None: | |
self.session.get(url="https://www.perplexity.ai/api/auth/session") | |
def _get_t(self) -> str: | |
return format(getrandbits(32), "08x") | |
def _get_sid(self) -> str: | |
st.write(self.session.get(url=f"https://www.perplexity.ai/socket.io/?EIO=4&transport=polling&t={self.t}")) | |
st.write(self.session.get(url=f"https://www.perplexity.ai/socket.io/?EIO=4&transport=polling&t={self.t}").text) | |
return loads(self.session.get( | |
url=f"https://www.perplexity.ai/socket.io/?EIO=4&transport=polling&t={self.t}" | |
).text[1:])["sid"] | |
def _ask_anonymous_user(self) -> bool: | |
response = self.session.post( | |
url=f"https://www.perplexity.ai/socket.io/?EIO=4&transport=polling&t={self.t}&sid={self.sid}", | |
data="40{\"jwt\":\"anonymous-ask-user\"}" | |
).text | |
return response == "OK" | |
def _start_interaction(self) -> None: | |
self.finished = False | |
if self.n == 9: | |
self.n = 0 | |
self.base *= 10 | |
else: | |
self.n += 1 | |
self.queue = [] | |
def _get_cookies_str(self) -> str: | |
cookies = "" | |
for key, value in self.session.cookies.get_dict().items(): | |
cookies += f"{key}={value}; " | |
return cookies[:-2] | |
def _write_file_url(self, filename: str, file_url: str) -> None: | |
if ".perplexity_files_url" in listdir(): | |
with open(".perplexity_files_url", "r") as f: | |
perplexity_files_url: dict = loads(f.read()) | |
else: | |
perplexity_files_url: dict = {} | |
perplexity_files_url[filename] = file_url | |
with open(".perplexity_files_url", "w") as f: | |
f.write(dumps(perplexity_files_url)) | |
def _init_websocket(self) -> WebSocketApp: | |
def on_open(ws: WebSocketApp) -> None: | |
ws.send("2probe") | |
ws.send("5") | |
def on_message(ws: WebSocketApp, message: str) -> None: | |
if message == "2": | |
ws.send("3") | |
elif not self.finished: | |
if message.startswith("42"): | |
message : list = loads(message[2:]) | |
content: dict = message[1] | |
if "mode" in content and content["mode"] == "copilot": | |
content["copilot_answer"] = loads(content["text"]) | |
elif "mode" in content: | |
content.update(loads(content["text"])) | |
content.pop("text") | |
if (not ("final" in content and content["final"])) or ("status" in content and content["status"] == "completed"): | |
self.queue.append(content) | |
if message[0] == "query_answered": | |
self.last_uuid = content["uuid"] | |
self.finished = True | |
elif message.startswith("43"): | |
message: dict = loads(message[3:])[0] | |
if ("uuid" in message and message["uuid"] != self.last_uuid) or "uuid" not in message: | |
self.queue.append(message) | |
self.finished = True | |
return WebSocketApp( | |
url=f"wss://www.perplexity.ai/socket.io/?EIO=4&transport=websocket&sid={self.sid}", | |
header=self.user_agent, | |
cookie=self._get_cookies_str(), | |
on_open=on_open, | |
on_message=on_message, | |
on_error=lambda ws, err: print(f"websocket error: {err}") | |
) | |
def _s(self, query: str, mode: str = "concise", search_focus: str = "internet", attachments: list[str] = [], language: str = "en-GB", in_page: str = None, in_domain: str = None) -> None: | |
assert self.finished, "already searching" | |
assert mode in ["concise", "copilot"], "invalid mode" | |
assert len(attachments) <= 4, "too many attachments: max 4" | |
assert search_focus in ["internet", "scholar", "writing", "wolfram", "youtube", "reddit"], "invalid search focus" | |
if in_page: | |
search_focus = "in_page" | |
if in_domain: | |
search_focus = "in_domain" | |
self._start_interaction() | |
ws_message: str = f"{self.base + self.n}" + dumps([ | |
"perplexity_ask", | |
query, | |
{ | |
"version": "2.1", | |
"source": "default", # "ios" | |
"frontend_session_id": self.frontend_session_id, | |
"language": language, | |
"timezone": "CET", | |
"attachments": attachments, | |
"search_focus": search_focus, | |
"frontend_uuid": str(uuid4()), | |
"mode": mode, | |
# "use_inhouse_model": True | |
"in_page": in_page, | |
"in_domain": in_domain | |
} | |
]) | |
self.ws.send(ws_message) | |
def search(self, query: str, mode: str = "concise", search_focus: str = "internet", attachments: list[str] = [], language: str = "en-GB", timeout: float = 30) -> Iterable[Dict]: | |
self._s(query, mode, search_focus, attachments, language) | |
start_time: float = time() | |
while (not self.finished) or len(self.queue) != 0: | |
if timeout and time() - start_time > timeout: | |
self.finished = True | |
return {"error": "timeout"} | |
if len(self.queue) != 0: | |
yield self.queue.pop(0) | |
def search_sync(self, query: str, mode: str = "concise", search_focus: str = "internet", attachments: list[str] = [], language: str = "en-GB", timeout: float = 30) -> dict: | |
self._s(query, mode, search_focus, attachments, language) | |
start_time: float = time() | |
while not self.finished: | |
if timeout and time() - start_time > timeout: | |
self.finished = True | |
return {"error": "timeout"} | |
return self.queue.pop(-1) | |
def upload(self, filename: str) -> str: | |
assert self.finished, "already searching" | |
assert filename.split(".")[-1] in ["txt", "pdf"], "invalid file format" | |
if filename.startswith("http"): | |
file = get(filename).content | |
else: | |
with open(filename, "rb") as f: | |
file = f.read() | |
self._start_interaction() | |
ws_message: str = f"{self.base + self.n}" + dumps([ | |
"get_upload_url", | |
{ | |
"version": "2.1", | |
"source": "default", | |
"content_type": "text/plain" if filename.split(".")[-1] == "txt" else "application/pdf", | |
} | |
]) | |
self.ws.send(ws_message) | |
while not self.finished or len(self.queue) != 0: | |
if len(self.queue) != 0: | |
upload_data = self.queue.pop(0) | |
assert not upload_data["rate_limited"], "rate limited" | |
post( | |
url=upload_data["url"], | |
files={ | |
"acl": (None, upload_data["fields"]["acl"]), | |
"Content-Type": (None, upload_data["fields"]["Content-Type"]), | |
"key": (None, upload_data["fields"]["key"]), | |
"AWSAccessKeyId": (None, upload_data["fields"]["AWSAccessKeyId"]), | |
"x-amz-security-token": (None, upload_data["fields"]["x-amz-security-token"]), | |
"policy": (None, upload_data["fields"]["policy"]), | |
"signature": (None, upload_data["fields"]["signature"]), | |
"file": (filename, file) | |
} | |
) | |
file_url: str = upload_data["url"] + upload_data["fields"]["key"].split("$")[0] + filename | |
self._write_file_url(filename, file_url) | |
return file_url | |
def threads(self, query: str = None, limit: int = None) -> list[dict]: | |
assert self.email, "not logged in" | |
assert self.finished, "already searching" | |
if not limit: limit = 20 | |
data: dict = {"version": "2.1", "source": "default", "limit": limit, "offset": 0} | |
if query: data["search_term"] = query | |
self._start_interaction() | |
ws_message: str = f"{self.base + self.n}" + dumps([ | |
"list_ask_threads", | |
data | |
]) | |
self.ws.send(ws_message) | |
while not self.finished or len(self.queue) != 0: | |
if len(self.queue) != 0: | |
return self.queue.pop(0) | |
def list_autosuggest(self, query: str = "", search_focus: str = "internet") -> list[dict]: | |
assert self.finished, "already searching" | |
self._start_interaction() | |
ws_message: str = f"{self.base + self.n}" + dumps([ | |
"list_autosuggest", | |
query, | |
{ | |
"has_attachment": False, | |
"search_focus": search_focus, | |
"source": "default", | |
"version": "2.1" | |
} | |
]) | |
self.ws.send(ws_message) | |
while not self.finished or len(self.queue) != 0: | |
if len(self.queue) != 0: | |
return self.queue.pop(0) | |
def close(self) -> None: | |
self.ws.close() | |
if self.email: | |
with open(".perplexity_session", "r") as f: | |
perplexity_session: dict = loads(f.read()) | |
perplexity_session[self.email] = self.session.cookies.get_dict() | |
with open(".perplexity_session", "w") as f: | |
f.write(dumps(perplexity_session)) | |