import requests
from flask import Flask, request, Response, abort, redirect, jsonify, stream_with_context
import random, subprocess, string, json
import os
import logging
from logging import NullHandler
from functools import wraps
# Copied from pepsi (thanks <3), made some modifications so it works for me :v hue hue, nah it doesn't use that shitty api thingy :v world sim whatever >.>
def validate_token(token):
    # Compare the caller-supplied key against the PASSWORD environment variable.
    return token == os.environ.get('PASSWORD')


def requires_auth_bearer(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        if 'X-Api-Key' not in request.headers:
            return jsonify({'error': 'You need special Saya password :3c'}), 401
        token = request.headers['X-Api-Key']
        if not validate_token(token):
            return jsonify({'error': 'hue hue hue hue hue hue '}), 401
        return f(*args, **kwargs)
    return decorated
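
# Example client call (a sketch only; the host name and key value below are placeholders,
# not values shipped with this file):
#   requests.post(
#       "https://<SPACE_HOST>/saya",
#       headers={"X-Api-Key": "<PASSWORD>", "Content-Type": "application/json"},
#       json={"messages": [{"role": "user", "content": "hi"}], "stream": True},
#   )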
app = Flask(__name__)
app.config['JSONIFY_PRETTYPRINT_REGULAR'] = True

state = {
    "current_requests": 0,
    "total_prompts_sent": 0,
    "total_tokens": 0,
    "logging_enabled": os.environ.get('LOGGING_ENABLED', 'False').lower() in ['true', '1', 't']
}
## I'll never turn on logging, I just wanted to show it's off on the page (same pepsi same :V)
def configure_logging():
    # Silence Flask's own logger and the 'bloody' logger so nothing gets written out.
    app.logger.addHandler(NullHandler())
    app.logger.propagate = False
    bloody_log = logging.getLogger('bloody')
    bloody_log.setLevel(logging.ERROR)
    bloody_log.addHandler(NullHandler())

configure_logging()
def get_token_count(text):
    # Shell out to the bundled Node tokenizer script, which prints a token count to stdout.
    result = subprocess.run(['node', 'tokenizer.js', text], capture_output=True, text=True)
    return int(result.stdout.strip())


def generate_message_id():
    # Anthropic-style id: "msg_" followed by 16 random alphanumeric characters.
    chars = string.ascii_uppercase + string.digits + string.ascii_lowercase
    return 'msg_' + ''.join(random.choices(chars, k=16))
@app.errorhandler(404)
def page_not_found(e):
    # Any unknown path gets bounced to a random music video.
    return redirect(random.choice([
        'https://youtu.be/0o0y-MNqdQU',
        'https://youtu.be/oAJC1Pn78ZA',
        'https://youtu.be/p_5tTM9D7l0',
        'https://youtu.be/H0gEjUEneBI',
        'https://youtu.be/Hj8icpQldzc',
        'https://youtu.be/-9_sTTYXcwc',
        'https://youtu.be/LmsuxO5rfEU',
        'https://youtu.be/VJkzfV7kNYQ',
        'https://youtu.be/oCikD1xcv0o',
        'https://youtu.be/k9TSVx9gAW0',
        'https://youtu.be/Xiiy8vEWj-g',
        'https://youtu.be/FKLd1YdmIwA',
        'https://youtu.be/RJ4iaQAF6SI',
        'https://youtu.be/KPad2ftEwqc'
    ])), 302
# The status page below advertises /saya as the endpoint and claims password protection,
# so the proxy route is registered there behind the auth decorator.
@app.route('/saya', methods=['POST'])
@requires_auth_bearer
def proxy_message():
    state["total_prompts_sent"] += 1
    state["current_requests"] += 1
    data = request.get_json()
    if not data or 'messages' not in data:
        state["current_requests"] -= 1
        abort(400, 'Bad Request: No messages found in the request.')
    full_text = ' '.join(msg.get('content', '') for msg in data['messages'])
    token_count = get_token_count(full_text)
    data["model"] = os.environ.get('MODEL')
    headers = {
        "Authorization": f"Bearer {os.environ.get('KEY')}",
        'Content-Type': 'application/json',
        'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 12_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) FxiOS/13.2b11866 Mobile/16A366 Safari/605.1.15',
        'Origin': os.environ.get('ORIGIN'),
        'Referer': os.environ.get('REFERER'),
        "Accept": "*/*",
        "Accept-Language": "en-US,en;q=0.5",
        "Sec-Fetch-Dest": "empty",
        "Sec-Fetch-Mode": "cors",
        "Sec-Fetch-Site": "same-origin",
        "Sec-GPC": "1"
    }
    proxied_url = os.environ.get('PROXIED_URL')
    response = requests.post(proxied_url, headers=headers, data=json.dumps(data), stream=True)
    if not data.get('stream', False):
        # Non-streaming: read the whole upstream body and wrap it verbatim in an Anthropic-style message.
        output = ''.join(chunk.decode('utf-8') for chunk in response.iter_content(chunk_size=1024) if chunk)
        response_json = {
            "content": [{"text": output, "type": "text"}],
            "id": generate_message_id(),
            "model": ":^)",
            "role": "assistant",
            "stop_reason": "end_turn",
            "stop_sequence": None,
            "type": "message",
            "usage": {
                "input_tokens": token_count,
                "output_tokens": 25,
            }
        }
        state["current_requests"] -= 1
        return Response(json.dumps(response_json), headers={'Content-Type': 'application/json'}, status=response.status_code)
    def generate_stream():
        tokens_sent = 0
        text_buffer = ''
        try:
            # Emit the Anthropic-style SSE preamble before relaying any upstream text.
            yield 'event: message_start\n'
            yield f'data: {{"type": "message_start", "message": {{"id": "{generate_message_id()}", "type": "message", "role": "assistant", "content": [], "model": "claude-3-opus-20240229", "stop_reason": null, "stop_sequence": null, "usage": {{"input_tokens": 25, "output_tokens": 1}}}}}}\n\n'
            yield 'event: content_block_start\n'
            yield 'data: {"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}}\n\n'
            incomplete_chunk = ""
            for chunk in response.iter_content(chunk_size=256):
                if chunk:
                    # 'replace' keeps a decode error from killing the stream mid-response.
                    decoded_chunk = incomplete_chunk + chunk.decode('utf-8', 'replace')
                    if "data:" in decoded_chunk:
                        if 'null}]}' not in decoded_chunk:
                            # No complete upstream "data:" line yet; keep buffering.
                            incomplete_chunk = decoded_chunk
                        else:
                            for chunkparts in decoded_chunk.split("\n"):
                                if "data:" in chunkparts and "null}]}" in chunkparts:
                                    data = json.loads(chunkparts.replace("data: ", ""))
                                    try:
                                        text_buffer += data["choices"][0]["delta"]["content"]
                                        data_dump = json.dumps({"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": data["choices"][0]["delta"]["content"]}})
                                        yield 'event: content_block_delta\n'
                                        yield f'data: {data_dump}\n\n'
                                        if 'content_block_stop' in chunkparts:
                                            tokens_sent += get_token_count(text_buffer)
                                            text_buffer = ''
                                        incomplete_chunk = ""
                                    except (KeyError, IndexError):
                                        pass
                                else:
                                    incomplete_chunk = chunkparts
                    else:
                        # SSE comment lines start with ":" and can be dropped; anything else is a partial line.
                        if decoded_chunk[0] != ":":
                            incomplete_chunk = decoded_chunk
            yield 'event: content_block_stop\n'
            yield 'data: {"type": "content_block_stop", "index": 0}\n\n'
            yield 'event: message_delta\n'
            yield 'data: {"type": "message_delta", "delta": {"stop_reason": "end_turn", "stop_sequence": null, "usage": {"output_tokens": 15}}}\n\n'
            yield 'event: message_stop\n'
            yield 'data: {"type": "message_stop"}\n\n'
        except GeneratorExit:
            pass
        finally:
            if text_buffer:
                tokens_sent += get_token_count(text_buffer)
            state["current_requests"] -= 1
            state["total_tokens"] += tokens_sent

    return Response(stream_with_context(generate_stream()), headers={'Content-Type': 'text/event-stream'})
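
# The generator above emits Anthropic Messages-style SSE in this order: message_start,
# content_block_start, content_block_delta (one per relayed upstream delta),
# content_block_stop, message_delta, message_stop. A relayed delta looks like this on the
# wire (illustrative text only):
#   event: content_block_delta
#   data: {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "Hello"}}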
@app.route('/')
def index():
    # Status page shown on the Space root.
    space_host = os.environ.get('SPACE_HOST', 'default-space-host')
    endpoint = f"https://{space_host}/saya"
    payload = '''<body style="background-image: url('https://images.dragonetwork.pl/wp12317828.jpg'); background-size: cover; background-repeat: no-repeat;">
    <div style="background-color: rgba(0, 0, 0, 0.5); padding: 20px;">
        <p style="color: white; font-weight: bold; font-size: 200%;">
            <span>Endpoint:</span> <span>''' + endpoint + '''</span><br>
            <span>Prompt Logging:</span> <span>''' + str(state["logging_enabled"]) + '''</span><br>
            <span>Prompters Now:</span> <span>''' + str(state["current_requests"]) + '''</span><br>
            <span>Total Prompts:</span> <span>''' + str(state["total_prompts_sent"]) + '''</span><br>
            <span>Total Tokens:</span> <span>''' + str(state["total_tokens"]) + '''</span><br>
            <span>Password Protected?:</span> <span>yes :3c</span><br>
        </p>
    </div>
</body>'''
    return payload, 200
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=7860, debug=False)
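
# Local smoke test (a sketch; the Space's environment variables -- PASSWORD, KEY, MODEL,
# PROXIED_URL, ORIGIN, REFERER -- are assumed to be configured, though neither call below
# actually reaches the upstream API):
#   with app.test_client() as client:
#       print(client.get('/').status_code)                        # status page -> 200
#       print(client.post('/saya', json={"messages": []}).json)   # no X-Api-Key -> 401 error body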