Spaces:
Runtime error
Runtime error
| """ | |
| ======================================================================== | |
| 第一部分:来自EdgeGPT.py | |
| https://github.com/acheong08/EdgeGPT | |
| ======================================================================== | |
| """ | |
| import argparse | |
| import asyncio | |
| import json | |
| import os | |
| import random | |
| import re | |
| import ssl | |
| import sys | |
| import uuid | |
| from enum import Enum | |
| from typing import Generator | |
| from typing import Literal | |
| from typing import Optional | |
| from typing import Union | |
| import websockets.client as websockets | |
| DELIMITER = "\x1e" | |
| # Generate random IP between range 13.104.0.0/14 | |
| FORWARDED_IP = ( | |
| f"13.{random.randint(104, 107)}.{random.randint(0, 255)}.{random.randint(0, 255)}" | |
| ) | |
| HEADERS = { | |
| "accept": "application/json", | |
| "accept-language": "en-US,en;q=0.9", | |
| "content-type": "application/json", | |
| "sec-ch-ua": '"Not_A Brand";v="99", "Microsoft Edge";v="110", "Chromium";v="110"', | |
| "sec-ch-ua-arch": '"x86"', | |
| "sec-ch-ua-bitness": '"64"', | |
| "sec-ch-ua-full-version": '"109.0.1518.78"', | |
| "sec-ch-ua-full-version-list": '"Chromium";v="110.0.5481.192", "Not A(Brand";v="24.0.0.0", "Microsoft Edge";v="110.0.1587.69"', | |
| "sec-ch-ua-mobile": "?0", | |
| "sec-ch-ua-model": "", | |
| "sec-ch-ua-platform": '"Windows"', | |
| "sec-ch-ua-platform-version": '"15.0.0"', | |
| "sec-fetch-dest": "empty", | |
| "sec-fetch-mode": "cors", | |
| "sec-fetch-site": "same-origin", | |
| "x-ms-client-request-id": str(uuid.uuid4()), | |
| "x-ms-useragent": "azsdk-js-api-client-factory/1.0.0-beta.1 core-rest-pipeline/1.10.0 OS/Win32", | |
| "Referer": "https://www.bing.com/search?q=Bing+AI&showconv=1&FORM=hpcodx", | |
| "Referrer-Policy": "origin-when-cross-origin", | |
| "x-forwarded-for": FORWARDED_IP, | |
| } | |
| HEADERS_INIT_CONVER = { | |
| "authority": "edgeservices.bing.com", | |
| "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7", | |
| "accept-language": "en-US,en;q=0.9", | |
| "cache-control": "max-age=0", | |
| "sec-ch-ua": '"Chromium";v="110", "Not A(Brand";v="24", "Microsoft Edge";v="110"', | |
| "sec-ch-ua-arch": '"x86"', | |
| "sec-ch-ua-bitness": '"64"', | |
| "sec-ch-ua-full-version": '"110.0.1587.69"', | |
| "sec-ch-ua-full-version-list": '"Chromium";v="110.0.5481.192", "Not A(Brand";v="24.0.0.0", "Microsoft Edge";v="110.0.1587.69"', | |
| "sec-ch-ua-mobile": "?0", | |
| "sec-ch-ua-model": '""', | |
| "sec-ch-ua-platform": '"Windows"', | |
| "sec-ch-ua-platform-version": '"15.0.0"', | |
| "sec-fetch-dest": "document", | |
| "sec-fetch-mode": "navigate", | |
| "sec-fetch-site": "none", | |
| "sec-fetch-user": "?1", | |
| "upgrade-insecure-requests": "1", | |
| "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36 Edg/110.0.1587.69", | |
| "x-edge-shopping-flag": "1", | |
| "x-forwarded-for": FORWARDED_IP, | |
| } | |
| def get_ssl_context(): | |
| import certifi | |
| ssl_context = ssl.create_default_context() | |
| ssl_context.load_verify_locations(certifi.where()) | |
| return ssl_context | |
| class NotAllowedToAccess(Exception): | |
| pass | |
| class ConversationStyle(Enum): | |
| creative = "h3imaginative,clgalileo,gencontentv3" | |
| balanced = "galileo" | |
| precise = "h3precise,clgalileo" | |
| CONVERSATION_STYLE_TYPE = Optional[ | |
| Union[ConversationStyle, Literal["creative", "balanced", "precise"]] | |
| ] | |
| def _append_identifier(msg: dict) -> str: | |
| """ | |
| Appends special character to end of message to identify end of message | |
| """ | |
| # Convert dict to json string | |
| return json.dumps(msg) + DELIMITER | |
| def _get_ran_hex(length: int = 32) -> str: | |
| """ | |
| Returns random hex string | |
| """ | |
| return "".join(random.choice("0123456789abcdef") for _ in range(length)) | |
| class _ChatHubRequest: | |
| """ | |
| Request object for ChatHub | |
| """ | |
| def __init__( | |
| self, | |
| conversation_signature: str, | |
| client_id: str, | |
| conversation_id: str, | |
| invocation_id: int = 0, | |
| ) -> None: | |
| self.struct: dict = {} | |
| self.client_id: str = client_id | |
| self.conversation_id: str = conversation_id | |
| self.conversation_signature: str = conversation_signature | |
| self.invocation_id: int = invocation_id | |
| def update( | |
| self, | |
| prompt, | |
| conversation_style, | |
| options, | |
| ) -> None: | |
| """ | |
| Updates request object | |
| """ | |
| if options is None: | |
| options = [ | |
| "deepleo", | |
| "enable_debug_commands", | |
| "disable_emoji_spoken_text", | |
| "enablemm", | |
| ] | |
| if conversation_style: | |
| if not isinstance(conversation_style, ConversationStyle): | |
| conversation_style = getattr(ConversationStyle, conversation_style) | |
| options = [ | |
| "nlu_direct_response_filter", | |
| "deepleo", | |
| "disable_emoji_spoken_text", | |
| "responsible_ai_policy_235", | |
| "enablemm", | |
| conversation_style.value, | |
| "dtappid", | |
| "cricinfo", | |
| "cricinfov2", | |
| "dv3sugg", | |
| ] | |
| self.struct = { | |
| "arguments": [ | |
| { | |
| "source": "cib", | |
| "optionsSets": options, | |
| "sliceIds": [ | |
| "222dtappid", | |
| "225cricinfo", | |
| "224locals0", | |
| ], | |
| "traceId": _get_ran_hex(32), | |
| "isStartOfSession": self.invocation_id == 0, | |
| "message": { | |
| "author": "user", | |
| "inputMethod": "Keyboard", | |
| "text": prompt, | |
| "messageType": "Chat", | |
| }, | |
| "conversationSignature": self.conversation_signature, | |
| "participant": { | |
| "id": self.client_id, | |
| }, | |
| "conversationId": self.conversation_id, | |
| }, | |
| ], | |
| "invocationId": str(self.invocation_id), | |
| "target": "chat", | |
| "type": 4, | |
| } | |
| self.invocation_id += 1 | |
| class _Conversation: | |
| """ | |
| Conversation API | |
| """ | |
| def __init__( | |
| self, | |
| cookies, | |
| proxy, | |
| ) -> None: | |
| self.struct: dict = { | |
| "conversationId": None, | |
| "clientId": None, | |
| "conversationSignature": None, | |
| "result": {"value": "Success", "message": None}, | |
| } | |
| import httpx | |
| self.proxy = proxy | |
| proxy = ( | |
| proxy | |
| or os.environ.get("all_proxy") | |
| or os.environ.get("ALL_PROXY") | |
| or os.environ.get("https_proxy") | |
| or os.environ.get("HTTPS_PROXY") | |
| or None | |
| ) | |
| if proxy is not None and proxy.startswith("socks5h://"): | |
| proxy = "socks5://" + proxy[len("socks5h://") :] | |
| self.session = httpx.Client( | |
| proxies=proxy, | |
| timeout=30, | |
| headers=HEADERS_INIT_CONVER, | |
| ) | |
| for cookie in cookies: | |
| self.session.cookies.set(cookie["name"], cookie["value"]) | |
| # Send GET request | |
| response = self.session.get( | |
| url=os.environ.get("BING_PROXY_URL") | |
| or "https://edgeservices.bing.com/edgesvc/turing/conversation/create", | |
| ) | |
| if response.status_code != 200: | |
| response = self.session.get( | |
| "https://edge.churchless.tech/edgesvc/turing/conversation/create", | |
| ) | |
| if response.status_code != 200: | |
| print(f"Status code: {response.status_code}") | |
| print(response.text) | |
| print(response.url) | |
| raise Exception("Authentication failed") | |
| try: | |
| self.struct = response.json() | |
| except (json.decoder.JSONDecodeError, NotAllowedToAccess) as exc: | |
| raise Exception( | |
| "Authentication failed. You have not been accepted into the beta.", | |
| ) from exc | |
| if self.struct["result"]["value"] == "UnauthorizedRequest": | |
| raise NotAllowedToAccess(self.struct["result"]["message"]) | |
| class _ChatHub: | |
| """ | |
| Chat API | |
| """ | |
| def __init__(self, conversation) -> None: | |
| self.wss = None | |
| self.request: _ChatHubRequest | |
| self.loop: bool | |
| self.task: asyncio.Task | |
| print(conversation.struct) | |
| self.request = _ChatHubRequest( | |
| conversation_signature=conversation.struct["conversationSignature"], | |
| client_id=conversation.struct["clientId"], | |
| conversation_id=conversation.struct["conversationId"], | |
| ) | |
| async def ask_stream( | |
| self, | |
| prompt: str, | |
| wss_link: str, | |
| conversation_style: CONVERSATION_STYLE_TYPE = None, | |
| raw: bool = False, | |
| options: dict = None, | |
| ) -> Generator[str, None, None]: | |
| """ | |
| Ask a question to the bot | |
| """ | |
| if self.wss and not self.wss.closed: | |
| await self.wss.close() | |
| # Check if websocket is closed | |
| self.wss = await websockets.connect( | |
| wss_link, | |
| extra_headers=HEADERS, | |
| max_size=None, | |
| ssl=get_ssl_context() | |
| ) | |
| await self._initial_handshake() | |
| # Construct a ChatHub request | |
| self.request.update( | |
| prompt=prompt, | |
| conversation_style=conversation_style, | |
| options=options, | |
| ) | |
| # Send request | |
| await self.wss.send(_append_identifier(self.request.struct)) | |
| final = False | |
| while not final: | |
| objects = str(await self.wss.recv()).split(DELIMITER) | |
| for obj in objects: | |
| if obj is None or not obj: | |
| continue | |
| response = json.loads(obj) | |
| if response.get("type") != 2 and raw: | |
| yield False, response | |
| elif response.get("type") == 1 and response["arguments"][0].get( | |
| "messages", | |
| ): | |
| resp_txt = response["arguments"][0]["messages"][0]["adaptiveCards"][ | |
| 0 | |
| ]["body"][0].get("text") | |
| yield False, resp_txt | |
| elif response.get("type") == 2: | |
| final = True | |
| yield True, response | |
| async def _initial_handshake(self) -> None: | |
| await self.wss.send(_append_identifier({"protocol": "json", "version": 1})) | |
| await self.wss.recv() | |
| async def close(self) -> None: | |
| """ | |
| Close the connection | |
| """ | |
| if self.wss and not self.wss.closed: | |
| await self.wss.close() | |
| class NewbingChatbot: | |
| """ | |
| Combines everything to make it seamless | |
| """ | |
| def __init__( | |
| self, | |
| cookies, | |
| proxy | |
| ) -> None: | |
| if cookies is None: | |
| cookies = {} | |
| self.cookies = cookies | |
| self.proxy = proxy | |
| self.chat_hub: _ChatHub = _ChatHub( | |
| _Conversation(self.cookies, self.proxy), | |
| ) | |
| async def ask( | |
| self, | |
| prompt: str, | |
| wss_link: str, | |
| conversation_style: CONVERSATION_STYLE_TYPE = None, | |
| options: dict = None, | |
| ) -> dict: | |
| """ | |
| Ask a question to the bot | |
| """ | |
| async for final, response in self.chat_hub.ask_stream( | |
| prompt=prompt, | |
| conversation_style=conversation_style, | |
| wss_link=wss_link, | |
| options=options, | |
| ): | |
| if final: | |
| return response | |
| await self.chat_hub.wss.close() | |
| return None | |
| async def ask_stream( | |
| self, | |
| prompt: str, | |
| wss_link: str, | |
| conversation_style: CONVERSATION_STYLE_TYPE = None, | |
| raw: bool = False, | |
| options: dict = None, | |
| ) -> Generator[str, None, None]: | |
| """ | |
| Ask a question to the bot | |
| """ | |
| async for response in self.chat_hub.ask_stream( | |
| prompt=prompt, | |
| conversation_style=conversation_style, | |
| wss_link=wss_link, | |
| raw=raw, | |
| options=options, | |
| ): | |
| yield response | |
| async def close(self) -> None: | |
| """ | |
| Close the connection | |
| """ | |
| await self.chat_hub.close() | |
| async def reset(self) -> None: | |
| """ | |
| Reset the conversation | |
| """ | |
| await self.close() | |
| self.chat_hub = _ChatHub(_Conversation(self.cookies, self.proxy)) | |