Spaces:
Runtime error
Runtime error
import os | |
import gradio as gr | |
import requests | |
import base64 | |
from io import BytesIO | |
from PIL import Image | |
import hashlib | |
import concurrent.futures | |
# Constants | |
FREE_URL = os.environ.get("SERVER_URL_FREE") | |
PREMIUM_URL = os.environ.get("SERVER_URL_PREMIUM") | |
PREMIUM_URL2 = os.environ.get("SERVER_URL_PREMIUM2") | |
TOKEN_SERVER_URL = os.environ.get("TOKEN_URL") | |
API_KEY = os.environ.get("API_KEY") | |
STATUS_MESSAGES = { | |
301: "Too many faces in the photo.", | |
302: "Face is not clear enough.", | |
303: "No matches found.", | |
304: "No face in the photo.", | |
305: "Search blocked for privacy issue.", | |
401: "Invalid image format.", | |
402: "Wrong request.", | |
403: "Please try again later.", | |
404: "Timeout, try again." | |
} | |
FREE_SUFFIX = "*********" | |
def image_to_base64(image): | |
buffered = BytesIO() | |
image.save(buffered, format="JPEG", quality=90) | |
return base64.b64encode(buffered.getvalue()).decode('utf-8') | |
def base64_to_image(base64_str): | |
return base64.b64decode(base64_str + '=' * (-len(base64_str) % 4)) | |
def check_db(img_array, suffix): | |
hashes = [] | |
out_array = [] | |
for item in img_array: | |
try: | |
image_data = base64_to_image(item["image"]) | |
image_hash = hashlib.sha256(image_data).hexdigest() | |
hashes.append(image_hash) | |
out_array.append((Image.open(BytesIO(image_data)), item["url"] + suffix)) | |
except base64.binascii.Error as e: | |
raise ValueError(f"Invalid base64 string: {str(e)}") | |
except Exception as e: | |
raise ValueError(f"Error processing image: {str(e)}") | |
try: | |
response = requests.post(url=TOKEN_SERVER_URL + '/lookup-hashes', json={"hashes": hashes}) | |
out_array = [value for i, value in enumerate(out_array) if i not in response.json().get('res')] | |
except: | |
raise gr.Error("Token Server Error!") | |
return out_array | |
def verify_token(token): | |
try: | |
response = requests.post(url=TOKEN_SERVER_URL + '/verify-token', json={"token": token}) | |
if response.json().get('status') == 'success': | |
return PREMIUM_URL | |
else: | |
raise gr.Error("Invalid token! For free search, use empty string for token") | |
except: | |
raise gr.Error("Invalid token!") | |
def activate_token(token): | |
try: | |
requests.post(url=TOKEN_SERVER_URL + '/activate-token', json={"token": token}) | |
except: | |
raise gr.Error("Invalid token!") | |
def get_image_base64(file): | |
try: | |
image = Image.open(file) | |
return image_to_base64(image) | |
except Exception as e: | |
raise gr.Error("Please select image file!") | |
def send_requests(url, file): | |
with concurrent.futures.ThreadPoolExecutor() as executor: | |
future1 = executor.submit(requests.post, url, headers={"X-RapidAPI-Key": API_KEY}, json={"image": get_image_base64(file)}) | |
if url == PREMIUM_URL: | |
future2 = executor.submit(requests.post, PREMIUM_URL2, files={"image": open(file, 'rb')}) | |
response1 = future1.result() | |
response2 = future2.result() | |
return response1, response2 | |
else: | |
response1 = future1.result() | |
return response1, None | |
def process_response(response, soc_res, url, token): | |
status_code = response.status_code | |
if not soc_res: | |
if status_code in STATUS_MESSAGES: | |
gr.Info(STATUS_MESSAGES[status_code]) | |
if status_code > 300: | |
return [] | |
try: | |
res = response.json().get('img_array') | |
if soc_res: | |
res = soc_res[0] + res + soc_res[1] | |
suffix = "" if url == PREMIUM_URL else FREE_SUFFIX | |
out_array = check_db(res, suffix) | |
if url == PREMIUM_URL: | |
activate_token(token) | |
return out_array | |
except: | |
raise gr.Error("Try again.") | |
def process_response2(response): | |
if response.status_code == 200: | |
part1 = response.json().get('part1') | |
part2 = response.json().get('part2') | |
if not part1 and not part2: | |
return None | |
return (part1, part2) | |
return None | |
def search_face(file, token=None): | |
url = FREE_URL | |
if token: | |
url = verify_token(token) | |
response1, response2 = send_requests(url, file) | |
soc_res = process_response2(response2) if response2 else None | |
return process_response(response1, soc_res, url, token) | |
custom_css = """ | |
caption.caption { | |
user-select: text; | |
cursor: text; | |
} | |
""" | |
js = """ | |
function aff() { | |
const links = document.querySelectorAll('a'); | |
const currentUrl = new URL(window.location.href); | |
const currentParams = currentUrl.searchParams.toString(); | |
links.forEach(link => { | |
const href = link.getAttribute('href'); | |
if (href && (href.startsWith('https://faceonlive.pocketsflow.com') || href.startsWith('https://faceonlive.com'))) { | |
const targetUrl = new URL(href); | |
// Append current page parameters to the link | |
currentParams.split('&').forEach(param => { | |
if (param) { | |
const [key, value] = param.split('='); | |
targetUrl.searchParams.set(key, value); | |
} | |
}); | |
link.setAttribute('href', targetUrl.toString()); | |
console.log(`Updated Link: ${targetUrl.toString()}`); | |
} | |
}); | |
return '' | |
} | |
""" | |
head = """ | |
<!-- Google tag (gtag.js) --> | |
<script async src="https://www.googletagmanager.com/gtag/js?id=G-8YPXF4536P"></script> | |
<script> | |
window.dataLayer = window.dataLayer || []; | |
function gtag(){dataLayer.push(arguments);} | |
gtag('js', new Date()); | |
gtag('config', 'G-8YPXF4536P'); | |
</script> | |
""" | |
with gr.Blocks(css=custom_css, head=head) as demo: | |
gr.Markdown( | |
""" | |
# Free Face Search Online | |
#### [Discover more about our Face Search on our website.](https://faceonlive.com/face-search-online) | |
<br> | |
""" | |
) | |
with gr.Row(): | |
with gr.Column(scale=1): | |
image = gr.Image(type='filepath', height=480) | |
token = gr.Textbox(placeholder="(Optional) Get Premium Token Below.", label="Premium Token") | |
gr.HTML("<a href='https://faceonlive.pocketsflow.com/checkout?productId=676c15b1971244a587ca07cb' target='_blank'>Get Premium Token: Deep Search Including Social Media & Full URL Reveal</a>") | |
gr.HTML("<a href='https://faceonlive.pocketsflow.com/checkout?productId=676d7e7597f8b3b699f84820' target='_blank'>Protect Your Privacy β Start Your Takedown Now</a>") | |
search_face_button = gr.Button("Search Face") | |
with gr.Column(scale=2): | |
output = gr.Gallery(label="Found Images", columns=[4], object_fit="contain", height="auto") | |
gr.Examples(['examples/1.jpg', 'examples/2.jpg'], inputs=image, cache_examples=True, cache_mode='lazy', fn=search_face, outputs=[output]) | |
search_face_button.click(search_face, inputs=[image, token], outputs=[output], api_name=False) | |
gr.HTML('<a href="https://visitorbadge.io/status?path=https%3A%2F%2Fhuggingface.co%2Fspaces%2FFaceOnLive%2FFace-Search-Online"><img src="https://api.visitorbadge.io/api/visitors?path=https%3A%2F%2Fhuggingface.co%2Fspaces%2FFaceOnLive%2FFace-Search-Online&labelColor=%23ff8a65&countColor=%2337d67a&style=flat&labelStyle=upper" /></a>') | |
html = gr.HTML() | |
demo.load(None, inputs=None, outputs=html, js=js) | |
demo.queue(api_open=False, default_concurrency_limit=4).launch(server_name="0.0.0.0", server_port=7860, show_api=False) |