Spaces:
Runtime error
Runtime error
| import io | |
| import re | |
| import subprocess | |
| import sys | |
| import traceback | |
| import bs4 | |
| import requests | |
| from pyrogram import Client | |
| from pyrogram.errors import MessageTooLong | |
| from pyrogram.types import Message | |
| from speedtest import Speedtest | |
| from . import HelpMenu, hellbot, on_message | |
| async def aexec(code, client, message): | |
| exec( | |
| "async def __aexec(client, message): " | |
| + "".join(f"\n {l_}" for l_ in code.split("\n")) | |
| ) | |
| return await locals()["__aexec"](client, message) | |
| async def runeval(client: Client, message: Message): | |
| if len(message.command) < 2: | |
| return await hellbot.delete(message, "No python code provided!") | |
| reply_to = message.reply_to_message or message | |
| code = await hellbot.input(message) | |
| hell = await hellbot.edit(message, "`running...`") | |
| old_stderr = sys.stderr | |
| old_stdout = sys.stdout | |
| redirected_output = sys.stdout = io.StringIO() | |
| redirected_error = sys.stderr = io.StringIO() | |
| stdout, stderr, exc = None, None, None | |
| try: | |
| await aexec(code, client, message) | |
| except Exception: | |
| exc = traceback.format_exc() | |
| evaluation = "" | |
| stdout = redirected_output.getvalue() | |
| stderr = redirected_error.getvalue() | |
| sys.stdout = old_stdout | |
| sys.stderr = old_stderr | |
| if exc: | |
| evaluation = exc | |
| elif stderr: | |
| evaluation = stderr | |
| elif stdout: | |
| evaluation = stdout | |
| else: | |
| evaluation = "Success" | |
| heading = f"**๐ค๐๐บ๐ :**\n```python\n{code}```\n\n" | |
| output = f"**๐ฎ๐๐๐๐๐:**\n`{evaluation.strip()}`" | |
| final_output = heading + output | |
| try: | |
| await reply_to.reply_text(final_output, disable_web_page_preview=True) | |
| except MessageTooLong: | |
| with io.BytesIO(str.encode(output)) as out_file: | |
| out_file.name = "eval.txt" | |
| await reply_to.reply_document(out_file, caption=heading) | |
| await hell.delete() | |
| async def runterm(client: Client, message: Message): | |
| if len(message.command) < 2: | |
| return await hellbot.delete(message, "No shell code provided!") | |
| reply_to = message.reply_to_message or message | |
| cmd = await hellbot.input(message) | |
| hell = await hellbot.edit(message, "`running...`") | |
| if "\n" in cmd: | |
| code = cmd.split("\n") | |
| output = "" | |
| for x in code: | |
| shell = re.split(""" (?=(?:[^'"]|'[^']*'|"[^"]*")*$)""", x) | |
| try: | |
| process = subprocess.Popen( | |
| shell, stdout=subprocess.PIPE, stderr=subprocess.PIPE | |
| ) | |
| except Exception as err: | |
| print(err) | |
| await hell.edit(f"**Error:** \n`{err}`") | |
| output += f"**{code}**\n" | |
| output += process.stdout.read()[:-1].decode("utf-8") | |
| output += "\n" | |
| else: | |
| shell = re.split(""" (?=(?:[^'"]|'[^']*'|"[^"]*")*$)""", cmd) | |
| for a in range(len(shell)): | |
| shell[a] = shell[a].replace('"', "") | |
| try: | |
| process = subprocess.Popen( | |
| shell, stdout=subprocess.PIPE, stderr=subprocess.PIPE | |
| ) | |
| except Exception as err: | |
| exc_type, exc_obj, exc_tb = sys.exc_info() | |
| errors = traceback.format_exception(exc_type, exc_obj, exc_tb) | |
| await hell.edit("**Error:**\n`{}`".format("".join(errors))) | |
| return | |
| output = process.stdout.read()[:-1].decode("utf-8") | |
| if str(output) == "\n": | |
| return await hell.edit(f"**๐ฎ๐๐๐๐๐:** __๐ญ๐ ๐๐๐๐๐๐!__") | |
| else: | |
| try: | |
| await reply_to.reply_text( | |
| f"**{client.me.id}@hellbot:~$** `{cmd}`\n\n**๐ฎ๐๐๐๐๐:**\n```\n{output}```" | |
| ) | |
| except MessageTooLong: | |
| with io.BytesIO(str.encode(output)) as out_file: | |
| out_file.name = "exec.txt" | |
| await reply_to.reply_document( | |
| out_file, | |
| caption=f"**{client.me.id}@hellbot:~$** `{cmd}`", | |
| ) | |
| await hell.delete() | |
| async def runshell(_, message: Message): | |
| if len(message.command) < 2: | |
| return await hellbot.delete(message, "No shell code provided!") | |
| code = await hellbot.input(message) | |
| hell = await hellbot.edit(message, "`executing...`") | |
| result = subprocess.run(code, shell=True, capture_output=True, text=True) | |
| output = result.stdout + result.stderr | |
| heading = f"**๐ฒ๐๐พ๐ ๐ :**\n```sh\n{code}```\n\n" | |
| output = f"**๐ฎ๐๐๐๐๐:**\n`{output.strip()}`" | |
| final_output = heading + output | |
| try: | |
| await message.reply_text(final_output, disable_web_page_preview=True) | |
| except MessageTooLong: | |
| with io.BytesIO(str.encode(output)) as out_file: | |
| out_file.name = "shell.txt" | |
| await message.reply_document(out_file, caption=heading) | |
| await hell.delete() | |
| async def file_extention(_, message: Message): | |
| BASE_URL = "https://www.fileext.com/file-extension/{0}.html" | |
| if len(message.command) < 2: | |
| return await hellbot.delete(message, "No file extention provided!") | |
| extention = message.command[1] | |
| hell = await hellbot.edit(message, "`getting information...`") | |
| response = requests.get(BASE_URL.format(extention)) | |
| if response.status_code == 200: | |
| soup = bs4.BeautifulSoup(response.content, "html.parser") | |
| details = soup.find_all("td", {"colspan": "3"})[-1].text | |
| await hell.edit(f"**๐ฅ๐๐ ๐พ ๐ค๐๐๐พ๐๐๐๐๐:** `{extention}`\n\n**๐ฃ๐พ๐๐บ๐๐ ๐:**\n`{details}`") | |
| else: | |
| await hell.edit(f"**๐ฅ๐๐ ๐พ ๐ค๐๐๐พ๐๐๐๐๐:** `{extention}`\n\n**๐ฃ๐พ๐๐บ๐๐ ๐:**\n`Not Found`") | |
| async def pypi(_, message: Message): | |
| BASE_URL = "https://pypi.org/pypi/{0}/json" | |
| if len(message.command) < 2: | |
| return await hellbot.delete(message, "No package name provided!") | |
| package = message.command[1] | |
| hell = await hellbot.edit(message, "`getting information...`") | |
| response = requests.get(BASE_URL.format(package)) | |
| if response.status_code == 200: | |
| data = response.json() | |
| info = data["info"] | |
| name = info["name"] | |
| url = info["package_url"] | |
| version = info["version"] | |
| summary = info["summary"] | |
| await hell.edit(f"**๐ฏ๐บ๐ผ๐๐บ๐๐พ:** [{name}]({url}) (`{version}`)\n\n**๐ฃ๐พ๐๐บ๐๐ ๐:** `{summary}`") | |
| else: | |
| await hell.edit(f"**๐ฏ๐บ๐ผ๐๐บ๐๐พ:** `{package}`\n\n**๐ฃ๐พ๐๐บ๐๐ ๐:** `Not Found`") | |
| async def speed_test(_, message: Message): | |
| hell = await hellbot.edit(message, "`testing speed...`") | |
| speed = Speedtest() | |
| speed.get_best_server() | |
| await hell.edit("`calculating download speed...`") | |
| speed.download() | |
| await hell.edit("`calculating upload speed...`") | |
| speed.upload() | |
| await hell.edit("`finising up...`") | |
| speed.results.share() | |
| result = speed.results.dict() | |
| form = """**๐ฒ๐๐พ๐พ๐ฝ๐ณ๐พ๐๐ ๐ฑ๐พ๐๐๐ ๐๐ ๐** | |
| **โง ๐จ๐ฒ๐ฏ:** `{0}, {1}` | |
| **โง ๐ฏ๐๐๐:** `{2}` | |
| **โง ๐ฒ๐พ๐๐๐พ๐:** `{3}, {4}` | |
| **โง ๐ฒ๐๐๐๐๐๐:** `{5}` | |
| """ | |
| await message.reply_photo( | |
| result["share"], | |
| caption=form.format( | |
| result["client"]["isp"], | |
| result["client"]["country"], | |
| result["ping"], | |
| result["server"]["name"], | |
| result["server"]["country"], | |
| result["server"]["sponsor"], | |
| ) | |
| ) | |
| await hell.delete() | |
| HelpMenu("eval").add( | |
| "eval", | |
| "<python code>", | |
| "Execute the python code and get results.", | |
| "eval print('hello world')", | |
| "Use this command with caution! Using this command senselessly and getting yourself in trouble is not Hellbot's responsibility!" | |
| ).add( | |
| "exec", | |
| "<linux command>", | |
| "Execute the linux command and get results.", | |
| "exec ls -a", | |
| "Use this command with caution! Using this command senselessly and getting yourself in trouble is not Hellbot's responsibility!" | |
| ).add( | |
| "shell", | |
| "<shell script>", | |
| "Execute the shell script and get results.", | |
| "shell echo hello world", | |
| "Use this command with caution! Using this command senselessly and getting yourself in trouble is not Hellbot's responsibility!" | |
| ).add( | |
| "fext", | |
| "<file extention>", | |
| "Get the details of the file extention.", | |
| "fext py", | |
| ).add( | |
| "pypi", | |
| "<package name>", | |
| "Get the details of the package from pypi.", | |
| "pypi pyrogram", | |
| ).add( | |
| "speedtest", | |
| None, | |
| "Test the speed of server and client.", | |
| "speedtest", | |
| ).info( | |
| "Execute python, linux and shell scripts." | |
| ).done() | |