import hashlib
import logging
import os
import random
import re
import textwrap
import threading
import time
from collections import defaultdict
from typing import Any, Dict, Tuple

import autopep8
import gradio as gr
import jwt
from datasets import load_dataset
from huggingface_hub import hf_hub_download
from llama_cpp import Llama

# Shared rate-limiting state; the lock keeps updates thread-safe under
# concurrent Gradio requests
token_usage = defaultdict(int)
last_reset_time = time.time()
rate_limit_lock = threading.Lock()

MAX_TOKEN_USAGE = 10  # requests allowed per token within each reset window
RESET_INTERVAL = 24 * 60 * 60  # reset window: 24 hours, in seconds

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# LeetCode-style problems; the train split backs the random-problem endpoint
dataset = load_dataset("sugiv/leetmonkey_python_dataset")
train_dataset = dataset["train"]

JWT_SECRET = os.environ.get("JWT_SECRET")
if not JWT_SECRET:
    raise ValueError("JWT_SECRET environment variable is not set")
JWT_ALGORITHM = "HS256"

MODEL_NAME = "leetmonkey_peft_super_block_q6.gguf"
REPO_ID = "sugiv/leetmonkey-peft-gguf"

model_path = hf_hub_download(repo_id=REPO_ID, filename=MODEL_NAME, cache_dir="./models")
llm = Llama(model_path=model_path, n_ctx=1024, n_threads=2, n_gpu_layers=0, verbose=False, mlock=True)
logger.info("6-bit quantized model loaded successfully")

# Maps each token to the hash of the last problem/solution pair it generated,
# so explanations can only be requested for solutions produced by that token
token_to_problem_solution = {}

# Near-greedy sampling: low temperature and a tight top-k keep generated code deterministic
generation_kwargs = {
    "max_tokens": 512,
    "stop": ["```", "### Instruction:", "### Response:"],
    "echo": False,
    "temperature": 0.05,
    "top_k": 10,
    "top_p": 0.9,
    "repeat_penalty": 1.1
}


def verify_token(token: str) -> bool:
    """Return True if the JWT's signature validates against JWT_SECRET."""
    try:
        jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
        return True
    except jwt.PyJWTError:
        return False
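

# For local testing, a compatible token can be minted with PyJWT. A minimal
# sketch, assuming the same JWT_SECRET is available client-side (the payload
# is illustrative; verify_token only checks the signature):
#
#   import jwt
#   test_token = jwt.encode({"sub": "demo-user"}, JWT_SECRET, algorithm=JWT_ALGORITHM)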


def check_rate_limit(token: str) -> Tuple[bool, str]:
    """Allow up to MAX_TOKEN_USAGE calls per token per RESET_INTERVAL."""
    global last_reset_time
    with rate_limit_lock:
        current_time = time.time()
        if current_time - last_reset_time >= RESET_INTERVAL:
            token_usage.clear()
            last_reset_time = current_time
        if token_usage[token] >= MAX_TOKEN_USAGE:
            return False, "Rate limit exceeded. Please try again later."
        token_usage[token] += 1
        return True, ""


def extract_and_format_code(text: str) -> str:
    """Extract Python code from a fenced block (if present) and normalise its indentation."""
    code_match = re.search(r'```python\s*(.*?)\s*```', text, re.DOTALL)
    if code_match:
        code = code_match.group(1)
    else:
        code = text
    code = textwrap.dedent(code)
    # Re-indent: keep class/def headers flush left and indent every other
    # non-empty line one level beneath them
    lines = code.split('\n')
    indented_lines = []
    for line in lines:
        if line.strip().startswith('class') or line.strip().startswith('def'):
            indented_lines.append(line)
        elif line.strip():
            indented_lines.append('    ' + line)
        else:
            indented_lines.append(line)
    formatted_code = '\n'.join(indented_lines)
    try:
        return autopep8.fix_code(formatted_code)
    except Exception:
        return formatted_code
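
# Example: extract_and_format_code("```python\ndef f(x):\nreturn x\n```")
# re-indents the body under the def and runs autopep8 over the result.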


def generate_explanation(problem: str, solution: str, token: str) -> Dict[str, Any]:
    """Stream an explanation for a solution previously generated by this token."""
    if not verify_token(token):
        return {"error": "Invalid token"}

    is_allowed, message = check_rate_limit(token)
    if not is_allowed:
        return {"error": message}

    # Only explain pairs this token actually produced via generate_solution
    problem_solution_hash = hashlib.sha256(f"{problem}{solution}".encode()).hexdigest()
    if token not in token_to_problem_solution or token_to_problem_solution[token] != problem_solution_hash:
        return {"error": "No matching problem-solution pair found for this token"}

    system_prompt = "You are a Python coding assistant specialized in explaining LeetCode problem solutions. Provide a clear and concise explanation of the given solution."
    full_prompt = f"""### Instruction:
{system_prompt}

Problem:
{problem}

Solution:
{solution}

Explain this solution step by step.

### Response:
Here's the explanation of the solution:

"""
    generated_text = ""
    for chunk in llm(full_prompt, stream=True, **generation_kwargs):
        generated_text += chunk["choices"][0]["text"]

    return {"explanation": generated_text}


def generate_solution(instruction: str, token: str) -> Dict[str, Any]:
    """Generate a formatted Python solution for a LeetCode-style problem instruction."""
    if not verify_token(token):
        return {"error": "Invalid token"}

    is_allowed, message = check_rate_limit(token)
    if not is_allowed:
        return {"error": message}

    system_prompt = "You are a Python coding assistant specialized in solving LeetCode problems. Provide only the complete implementation of the given function. Ensure proper indentation and formatting. Do not include any explanations or multiple solutions."
    full_prompt = f"""### Instruction:
{system_prompt}

Implement the following function for the LeetCode problem:

{instruction}

### Response:
Here's the complete Python function implementation:

```python
"""
    generated_text = ""
    for chunk in llm(full_prompt, stream=True, **generation_kwargs):
        generated_text += chunk["choices"][0]["text"]

    formatted_code = extract_and_format_code(generated_text)
    # Remember this pair so generate_explanation can authorise it later
    problem_solution_hash = hashlib.sha256(f"{instruction}{formatted_code}".encode()).hexdigest()
    token_to_problem_solution[token] = problem_solution_hash
    return {"solution": formatted_code}


def random_problem(token: str) -> Dict[str, Any]:
    """Return the instruction text of a randomly chosen problem from the train split."""
    if not verify_token(token):
        return {"error": "Invalid token"}

    is_allowed, message = check_rate_limit(token)
    if not is_allowed:
        return {"error": message}

    random_item = random.choice(train_dataset)
    problem = random_item['instruction']
    return {"problem": problem}


generate_interface = gr.Interface(
    fn=generate_solution,
    inputs=[
        gr.Textbox(label="Problem Instruction"),
        gr.Textbox(label="JWT Token")
    ],
    outputs=gr.JSON(),
    title="Generate Solution API",
    description="Provide a LeetCode problem instruction and a valid JWT token to generate a solution."
)

random_problem_interface = gr.Interface(
    fn=random_problem,
    inputs=[gr.Textbox(label="JWT Token")],
    outputs=gr.JSON(),
    title="Random Problem API",
    description="Provide a valid JWT token to get a random LeetCode problem."
)

explain_interface = gr.Interface(
    fn=generate_explanation,
    inputs=[
        gr.Textbox(label="Problem"),
        gr.Textbox(label="Solution"),
        gr.Textbox(label="JWT Token")
    ],
    outputs=gr.JSON(),
    title="Explain Solution API",
    description="Provide a problem, solution, and valid JWT token to get an explanation of the solution."
)

demo = gr.TabbedInterface(
    [generate_interface, explain_interface, random_problem_interface],
    ["Generate Solution", "Explain Solution", "Random Problem"]
)


if __name__ == "__main__":
    demo.launch()
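
# Client-side usage: a minimal sketch, assuming gradio_client is installed and
# the app is reachable at SPACE_URL (a hypothetical placeholder); the api_name
# value is illustrative and should be checked against the running app's
# "Use via API" page before relying on it:
#
#   from gradio_client import Client
#   client = Client("SPACE_URL")
#   result = client.predict("Implement two_sum(nums, target)", TOKEN, api_name="/predict")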