Ufoptg's picture
Upload 93 files
78b07ad verified
raw
history blame
18.9 kB
import io
import os
import time
import urllib.request
from shutil import rmtree
import emoji
import requests
from bs4 import BeautifulSoup
from edge_tts import Communicate
from geopy.geocoders import Nominatim
from geopy.location import Location
from googlesearch import search
from googletrans import LANGCODES, LANGUAGES, Translator
from imdb import Cinemagoer, Movie
from pyrogram import Client
from pyrogram.types import InputMediaPhoto, Message
from wikipedia import exceptions, summary
from Hellbot.functions.driver import Driver
from Hellbot.functions.images import download_images
from Hellbot.functions.paste import post_to_telegraph
from Hellbot.functions.scraping import is_valid_url
from . import Config, HelpMenu, Symbols, db, handler, hellbot, on_message
imdb = Cinemagoer()
mov_titles = [
"localized title",
"canonical title",
"smart canonical title",
"smart long imdb canonical title",
"long imdb canonical title",
"long imdb title",
]
final_msg = """
<b>โœฆ ๐–ณ๐–ฌ๐–ด{}00๐–ฑ๐–ซ ๐–จ๐—‡๐–ฟ๐—ˆ ๐–ฆ๐–พ๐—‡๐—‹๐–พ๐—Œ ๐–ฑ๐–บ๐—๐—‚๐—‡๐—€ ๐–ฃ๐—‚๏ฟฝ๐–พ๐–ผ๏ฟฝ๐—๏ฟฝ๏ฟฝ:</b> <code></code>
<b>โœฆ ๐–จ๐–ฌ๐–ฃ๐–ป ๐–ด๐–ฑ๐–ซ:</b> <a href='https://www.imdb.com/title/tt{1}'>Click here.</a>
<b>โœฆ ๐– ๐—‚๐—‹๐–ฝ๐–บ๐—๐–พ:</b> <code>{2}</code>
<b>โœฆ ๐–ฆ๐–พ๐—‡๐—‹๐–พ๐—Œ:</b> <code>{3}</code>
<b>โœฆ ๐–ฑ๐–บ๐—๐—‚๐—‡๐—€:</b> <code>{4}</code>
<b>โœฆ ๐–ฑ๐—Ž๐—‡๐—๐—‚๐—†๐–พ:</b> <code>{5}</code>
<b>โœฆ ๐–ฃ๐—‚๐—‹๐–พ๐–ผ๐—๐—ˆ๐—‹:</b> <code>{6}</code>
<b><a href='{7}'>๐Ÿ’ซ ๐–ฌ๐—ˆ๐—‹๐–พ ๐–ฝ๐–พ๐—๐–บ๐—‚๐—…๐—Œ ๐—๐–พ๐—‹๐–พ!</a></b>
"""
telegraph_msg = """
<img src='{0}'/>
<b>โœฆ ๐–ณ๐—‚๐—๐—…๐–พ:</b> <code>{1}</code>
<b>โœฆ ๐–จ๐–ฌ๐–ฃ๐–ป ๐–ด๐–ฑ๐–ซ:</b> <a href='https://www.imdb.com/title/tt{2}'>Click here.</a>
<b>โœฆ ๐– ๐—‚๐—‹๐–ฝ๐–บ๐—๐–พ:</b> <code>{3}</code>
<b>โœฆ ๐–ฆ๐–พ๐—‡๐—‹๐–พ๐—Œ:</b> <code>{4}</code>
<b>โœฆ ๐–ฑ๐–บ๐—๐—‚๐—‡๐—€:</b> <code>{5}</code>
<b>โœฆ ๐–ฑ๐—Ž๐—‡๐—๐—‚๐—†๐–พ:</b> <code>{6}</code>
<b>โœฆ ๐–ฃ๐—‚๐—‹๐–พ๐–ผ๐—๐—ˆ๐—‹:</b> <code>{7}</code>
<b>โœฆ ๐–ถ๐—‹๐—‚๐—๐–พ๐—‹:</b> <code>{8}</code>
<b>โœฆ ๐–ข๐—ˆ๐—†๐—‰๐—ˆ๐—Œ๐–พ๐—‹๐—Œ:</b> <code>{9}</code>
<b>โœฆ ๐–ข๐–บ๐—Œ๐—:</b> <code>{10}</code>
<b>โœฆ ๐–ข๐—ˆ๐—Ž๐—‡๐—๐—‹๐—’:</b> <code>{11}</code>
<b>โœฆ ๐–ซ๐–บ๐—‡๐—€๐—Ž๐–บ๐—€๐–พ:</b> <code>{12}</code>
<b>โœฆ ๐–ก๐—ˆ๐—‘ ๐–ฎ๐–ฟ๐–ฟ๐—‚๐–ผ๐–พ:</b> <code>{13}</code>
<b>โœฆ ๐–ฏ๐—…๐—ˆ๐—๐—‹๐—Œ14๐–ก๐—ˆ๐—‘ ๐–ฎ๐–ฟ๐–ฟ๐—‚๐–ผ๐–พ:</b> <
<b>๐Ÿ€ @HellBot_Networks</b>
"""
@on_message("wikipedia", allow_stan=True)
async def google_search(client: Client, message: Message):
if len(message.command) < 2:
return await hellbot.edit(message, "Give some text to search on wikipedia.")
search_query = await hellbot.input(message)
hell = await hellbot.edit(
message, f"Searching for `{search_query}` on wikipedia..."
)
try:
data = summary(search_query, auto_suggest=False)
except exceptions.DisambiguationError as error:
error = str(error).split("\n")
result = "".join(
f"`{i}`\n" if lineno > 1 else f"**{i}**\n"
for lineno, i in enumerate(error, start=1)
)
return await hell.edit(f"**DisambiguationError:**\n\n{result}")
except exceptions.PageError:
return await hellbot.delete(hell, "**Page not found.**")
await hell.edit(
f"**๐–ฒ๐–พ๐–บ๐—‹๐–ผ๐—:** `{search_query}`\n**๐–ฑ๐–พ๐—Œ๐—Ž๐—…๐—:**\n{data}",
disable_web_page_preview=True,
)
@on_message("google", allow_stan=True)
async def googleSearch(_, message: Message):
if len(message.command) < 2:
return await hellbot.edit(message, "Give some text to search on google.")
search_query = await hellbot.input(message)
hell = await hellbot.edit(message, f"Searching for `{search_query}` on google...")
try:
results = search(search_query, 7, advanced=True)
except Exception as error:
return await hellbot.error(hell, f"`{str(error)}`")
outStr = f"**๐Ÿ” ๐–ฒ๐–พ๐–บ๐—‹๐–ผ๐—:** `{search_query}`\n\n"
for result in results:
outStr += f"**๐ŸŒ ๐–ฑ๐–พ๐—Œ๐—Ž๐—…๐—:** [{result.title}]({result.url})\n"
outStr += f"**๐Ÿ“– ๐–ฃ๐–พ๐—Œ๐–ผ:** {str(result.description)[:50]}...\n\n"
await hell.edit(outStr, disable_web_page_preview=True)
@on_message("reverse", allow_stan=True)
async def reverseSearch(_, message: Message):
if not message.reply_to_message:
return await hellbot.edit(
message, "Reply to an image/sticker to reverse search it."
)
hell = await hellbot.edit(message, "Processing...")
if message.reply_to_message.sticker or message.reply_to_message.photo:
dl_path = await message.reply_to_message.download(
Config.DWL_DIR + "reverse.jpg"
)
file = {"encoded_image": (dl_path, open(dl_path, "rb"))}
else:
return await hellbot.error(
hell, "Reply to an image/sticker to reverse search it."
)
await hell.edit("Searching on google...")
resp = requests.post(
"https://www.google.com/searchbyimage/upload", files=file, allow_redirects=False
)
webresp = requests.get(
resp.headers.get("Location"),
headers={
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:58.0) Gecko/20100101 Firefox/58.0",
},
)
os.remove(dl_path)
soup = BeautifulSoup(webresp.text, "html.parser")
div = soup.find("div", {"class": "r5a77d"})
if div:
alls = div.find("a")
link = alls["href"]
text = alls.text
await hell.edit(
f"**๐–ฏ๐—ˆ๐—Œ๐—Œ๐—‚๐–ป๐—…๐–พ ๐–ฑ๐–พ๐—Œ๐—Ž๐—…๐—:** [{text}]({link})", disable_web_page_preview=True
)
else:
return await hell.edit("No results found.")
try:
to_send = []
images = await download_images(text, 3)
for image in images:
to_send.append(InputMediaPhoto(image))
if to_send:
await hell.reply_media_group(to_send)
try:
rmtree("./images")
except:
pass
except:
pass
@on_message("gps", allow_stan=True)
async def gpsLocation(_, message: Message):
if len(message.command) < 2:
return await hellbot.edit(message, "Give some place name to search.")
search_query = await hellbot.input(message)
hell = await hellbot.edit(
message, f"Searching for `{search_query}` on google maps..."
)
geolocator = Nominatim(user_agent="Hellbot")
location: Location = geolocator.geocode(search_query)
if not location:
return await hellbot.delete(hell, "Location not found.")
latitiude = location.latitude
longitude = location.longitude
address = location.address
await hell.reply_location(latitiude, longitude)
await hellbot.delete(hell, f"**๐ŸŒ ๐–ฏ๐—…๐–บ๐–ผ๐–พ:** {address}")
@on_message("webshot", allow_stan=True)
async def webScreenshot(_, message: Message):
if len(message.command) < 2:
return await hellbot.edit(message, "Give some url to take screenshot.")
search_query = await hellbot.input(message)
if not is_valid_url(search_query):
return await hellbot.edit(message, "Invalid url.")
hell = await hellbot.edit(message, f"Taking screenshot of `{search_query}`...")
driver, resp = Driver.get()
if not driver:
return await hellbot.error(hell, resp)
driver.get(search_query)
height = driver.execute_script(
"return Math.max(document.body.scrollHeight, document.body.offsetHeight, document.documentElement.clientHeight, document.documentElement.scrollHeight, document.documentElement.offsetHeight);"
)
width = driver.execute_script(
"return Math.max(document.body.scrollWidth, document.body.offsetWidth, document.documentElement.clientWidth, document.documentElement.scrollWidth, document.documentElement.offsetWidth);"
)
driver.set_window_size(width + 100, height + 100)
image = driver.get_screenshot_as_png()
Driver.close(driver)
with io.BytesIO(image) as result:
result.name = "Hellbot_Webshot.png"
await hell.reply_document(result)
await hell.delete()
try:
os.remove("Hellbot_Webshot.png")
except:
pass
@on_message("cricket", allow_stan=True)
async def cricketScore(_, message: Message):
BASE = "http://static.cricinfo.com/rss/livescores.xml"
page = urllib.request.urlopen(BASE)
soup = BeautifulSoup(page, "html.parser")
result = soup.find_all("description")
final = "**Cricket Live Score:\n\n**"
for match in result:
final += f"{Symbols.bullet} `{match.text}`\n\n"
await hellbot.edit(message, final)
@on_message(["dictionary", "meaning"], allow_stan=True)
async def wordMeaning(_, message: Message):
BASE = "https://api.dictionaryapi.dev/api/v2/entries/en/{0}"
if len(message.command) < 2:
return await hellbot.edit(message, "Give some word to search.")
search_query = await hellbot.input(message)
hell = await hellbot.edit(message, f"Searching for `{search_query}`...")
response = requests.get(BASE.format(search_query))
if response.status_code == 404:
return await hellbot.delete(hell, "Word not found.")
data: dict = response.json()[0]
outStr = ""
outStr += f"**๐Ÿ“–W:ord** `{data.get('word', search_query)}`\n"
outStr += f"**๐Ÿ”Š Phonetic:** `{data.get('phonetic', 'Not Found')}`\n"
for meaning in data.get("meanings", []):
outStr += f"\n**{Symbols.bullet} Part of Speech:** `{meaning.get('partOfSpeech', 'Not Found').title()}`\n"
for definition in meaning.get("definitions", []):
outStr += f" {Symbols.check_mark} `{definition.get('definition', 'Not Found')}`\n"
synonyms = meaning.get("synonyms", [])
outStr += "**๐Ÿ‘ Synonyms:** " + ", ".join(synonyms) if synonyms else "Not Found"
outStr += "\n"
antonyms = meaning.get("antonyms", [])
outStr += "**๐Ÿ‘Ž Antonyms:** " + ", ".join(antonyms) if antonyms else "Not Found"
outStr += "\n"
audio = data.get("phonetics", [])[0].get("audio", "")
if audio:
await hell.reply_audio(audio, caption=outStr)
await hell.delete()
else:
await hell.edit(outStr)
@on_message(["translate", "tr"], allow_stan=True)
async def translateHandler(_, message: Message):
if message.reply_to_message:
if len(message.command) < 2:
return await hellbot.edit(message, "Give some language code to translate.")
toLang = message.command[1]
text = message.reply_to_message.text or message.reply_to_message.caption
elif len(message.command) > 2:
msg = await hellbot.input(message)
toLang = message.command[1]
text = msg.split(" ", 1)[1]
else:
return await hellbot.delete(
message,
f"Either reply to a message with a language code or give input text and language code.\n\n**Example:** `{handler}tr en ใ“ใ‚“ใซใกใฏไธ–็•Œ`",
15,
)
hell = await hellbot.edit(message, f"Translating to `{toLang}`...")
text = emoji.demojize(text.strip())
translator = Translator(http2=False)
try:
translated = translator.translate(text, toLang)
outStr = f"**๐ŸŒ ๐–ณ๐—‹๐–บ๐—‡๐—Œ๐—…๐–บ๐—๐–พ๐–ฝ ๐–ฟ๐—‹๐—ˆ๐—†** __{translated.src}__ **๐—๐—ˆ** __{translated.dest}__**:**"
outStr += f"\n\n`{translated.text}`"
await hell.edit(outStr)
except Exception as e:
return await hellbot.error(hell, f"`{str(e)}`")
@on_message("trcode", allow_stan=True)
async def translateCodes(_, message: Message):
outStr = None
if len(message.command) > 1:
language = message.command[1]
fromCodeToLang = LANGUAGES.get(language.lower(), None)
fromLangToCode = LANGCODES.get(language.lower(), None)
if fromCodeToLang:
outStr = f"**{Symbols.bullet} Language Code:** `{language.lower()}`\n**{Symbols.bullet} Language:** `{fromCodeToLang}`"
elif fromLangToCode:
outStr = f"**{Symbols.bullet} Language:** `{language.lower()}`\n**{Symbols.bullet} Language Code:** `{fromLangToCode}`"
else:
outStr = None
if not outStr:
outStr = "**Language Codes:**\n\n"
for code in LANGUAGES:
outStr += f"**{code}**: {LANGUAGES[code]}\n"
await hellbot.edit(message, outStr)
@on_message(["voice", "tts"], allow_stan=True)
async def textToSpeech(_, message: Message):
if message.reply_to_message:
text = message.reply_to_message.text or message.reply_to_message.caption
elif len(message.command) > 2:
text = await hellbot.input(message)
else:
return await hellbot.delete(
message,
f"Either reply to a message with a language code or give input text and language code.\n\n**Example:** `{handler}tr en ใ“ใ‚“ใซใกใฏไธ–็•Œ`",
15,
)
hell = await hellbot.edit(message, "Processing...")
text = emoji.demojize(text.strip())
try:
comm = Communicate(
text,
"en-IN-NeerjaExpressiveNeural",
rate="+10%",
volume="+50%",
pitch="+5Hz",
)
path = f"{Config.DWL_DIR}tts{int(time.time())}.mp3"
await comm.save(path)
await message.reply_audio(
path,
caption=f"**๐Ÿ”Š ๐–ต๐—ˆ๐—‚๐–ผ๐–พ:** `{text[:100]}...`",
performer="HellyAI",
title="Hellbot TTS",
thumb="./Hellbot/resources/images/hellbot_logo.png",
)
await hell.delete()
except Exception as e:
return await hellbot.error(hell, f"`{str(e)}`")
@on_message("movie", allow_stan=True)
async def movieSearch(_, message: Message):
if len(message.command) < 2:
return await hellbot.edit(message, "Give a movie name to search on IMDb.")
query = await hellbot.input(message)
hell = await hellbot.edit(message, "Searching...")
try:
movieObj: Movie.Movie = imdb.search_movie(query)[0]
movieId = movieObj.movieID
movieObj = imdb.get_movie(movieId)
movieKeys = list(movieObj.keys())
title = "No title found."
for i in mov_titles:
if i in movieKeys:
title = movieObj[i]
break
airdate = "N/A"
if "original air date" in movieKeys:
airdate = movieObj["original air date"]
elif "year" in movieKeys:
airdate = movieObj["year"]
runtime = movieObj.get("runtimes", ["N/A"])[0] + " min"
rating = str(movieObj.get("rating", "N/A"))
rating += " (by " + str(movieObj.get("votes", "N/A")) + " votes)"
genres = ", ".join(movieObj.get("genres", ["N/A"]))
countries = ", ".join(movieObj.get("countries", ["N/A"]))
languages = ", ".join(movieObj.get("languages", ["N/A"]))
plot = movieObj.get("plot outline", "N/A")
cast = ", ".join([str(actor) for actor in movieObj.get("cast", ["N/A"])])
directors = ", ".join(
[str(actor) for actor in movieObj.get("director", ["N/A"])]
)
writers = ", ".join([str(actor) for actor in movieObj.get("writer", ["N/A"])])
composers = ", ".join(
[str(actor) for actor in movieObj.get("composer", ["N/A"])]
)
box_office_info = movieObj.get("box office", {})
box_office = (
"\n".join([f"{key} --> {value}" for key, value in box_office_info.items()])
if box_office_info
else "N/A"
)
image = movieObj.get("full-size cover url", None)
link = post_to_telegraph(
f"IMDb Search: {title}",
telegraph_msg.format(
image,
title,
movieId,
airdate,
genres,
rating,
runtime,
directors,
writers,
composers,
cast,
countries,
languages,
box_office,
plot,
),
)
await message.reply_photo(
image,
caption=final_msg.format(
title, movieId, airdate, genres, rating, runtime, directors, link
),
parse_mode="html",
)
await hell.delete()
except IndexError:
await hellbot.delete(hell, "No results found.")
except Exception as e:
await hellbot.error(hell, str(e))
HelpMenu("google").add(
"wikipedia",
"<query>",
"Searches for the given query on wikipedia.",
"wikipedia keanu reeves",
).add(
"google", "<query>", "Searches for the given query on google.", "google the hellbot"
).add(
"reverse",
"<reply to image/sticker>",
"Reverse searches the given image/sticker.",
"reverse",
).add(
"gps", "<place name>", "Sends the pin location of the given place.", "gps new york"
).add(
"webshot",
"<url>",
"Takes a screenshot of the given url.",
"webshot https://google.com",
).add(
"cricket", None, "Sends the live cricket scores.", "cricket"
).add(
"dictionary",
"<word>",
"Sends the meaning of the given word.",
"dictionary loyalty",
"An alias of 'meaning' can also be used.",
).add(
"translate",
"<lannguage code> <text/reply to message>",
"Translates the given text to the given language code.",
"translate en ใ“ใ‚“ใซใกใฏไธ–็•Œ",
"An alias of 'tr' can also be used.",
).add(
"trcode",
"<language / code>",
"Sends the language code and language name if found.",
"trcode en",
"If no language code is given, sends all the language codes.",
).add(
"voice",
"<text/reply to message>",
"Sends the text as a voice message.",
"voice I'm Helly and this is an Text to Speech Example.",
"An alias of 'tts' can also be used.",
).add(
"movie", # Bugged: to-be-fixed
"<movie name>",
"Sends the details of the given movie.",
"movie the shawshak redemption",
).info(
"Every Google command you need."
).done()