File size: 6,959 Bytes
16fea32 771bde2 394f072 16fea32 581c890 7c25703 581c890 283cfad 0ea97e9 283cfad 0ea97e9 283cfad 581c890 5818248 283cfad 581c890 394f072 16fea32 394f072 0e4da50 394f072 4f2457b e44a34d a8700ab 5fce82e 394f072 283cfad beaa004 771bde2 90f1673 771bde2 5818248 771bde2 4f2457b 7c25703 0ea97e9 283cfad 0ea97e9 283cfad 0ea97e9 283cfad 581c890 0ea97e9 581c890 0ea97e9 581c890 0ea97e9 beaa004 4f2457b 0ea97e9 283cfad beaa004 394f072 beaa004 394f072 beaa004 394f072 beaa004 394f072 581c890 3b60e2a 4f2457b beaa004 771bde2 0ea97e9 beaa004 0ea97e9 4f2457b 394f072 4f2457b beaa004 283cfad beaa004 8d76def 0ea97e9 771bde2 0ea97e9 581c890 0ea97e9 581c890 4f2457b ac35f3e 4f2457b 283cfad 0ea97e9 283cfad 4f2457b 394f072 4f2457b 0ea97e9 4f2457b debd5cf 0ea97e9 283cfad 0ea97e9 283cfad debd5cf 0ea97e9 debd5cf 4f2457b beaa004 394f072 ac35f3e 1fdd050 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 |
import gradio as gr
from huggingface_hub import hf_hub_download
from llama_cpp import Llama
import re
import random
import logging
import os
import jwt
from typing import Dict, Any
import autopep8
import textwrap
from datasets import load_dataset
import time
from collections import defaultdict
import threading
import hashlib
# Rate limiting configuration
MAX_TOKEN_USAGE = 10  # calls a single token may make within one reset window
RESET_INTERVAL = 60 * 60 * 24  # 24 hours in seconds

# Rate limiting state: per-token call counters, the time of the last global
# reset, and the lock that check_rate_limit holds while touching either one.
rate_limit_lock = threading.Lock()
token_usage = defaultdict(int)
last_reset_time = time.time()

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# JWT settings
# The secret is validated before any network work so that a misconfigured
# deployment fails immediately rather than after the dataset download.
JWT_SECRET = os.environ.get("JWT_SECRET")
if not JWT_SECRET:
    # Covers both an unset variable (None) and an empty string.
    raise ValueError("JWT_SECRET environment variable is not set")
JWT_ALGORITHM = "HS256"

# Load the dataset; only the "train" split is used (by random_problem).
dataset = load_dataset("sugiv/leetmonkey_python_dataset")
train_dataset = dataset["train"]
# Model settings
#MODEL_NAME = "leetmonkey_peft__q8_0.gguf"
MODEL_NAME = "leetmonkey_peft_super_block_q6.gguf"
REPO_ID = "sugiv/leetmonkey-peft-gguf"

# Load the model: fetch the GGUF file from the Hub into ./models, then open it.
model_path = hf_hub_download(repo_id=REPO_ID, filename=MODEL_NAME, cache_dir="./models")
# Alternative Llama configurations, kept for reference:
#llm = Llama(model_path=model_path, n_ctx=2048, n_threads=16, n_gpu_layers=-1, verbose=False, mlock=True) ## TPU
#llm = Llama(model_path=model_path, n_ctx=1024, n_threads=4, n_gpu_layers=-1, verbose=False, mlock=True) ## Nvidia
llm = Llama(model_path=model_path, n_ctx=1024, n_threads=2, n_gpu_layers=0, verbose=False, mlock=True)  ## CPU only
# Log the configured file name instead of a hard-coded quantization label,
# which went stale when MODEL_NAME changed from the q8_0 file to the q6 file.
logger.info("Model %s loaded successfully", MODEL_NAME)
# User data storage: maps a JWT to the SHA-256 hex digest that
# generate_solution records and generate_explanation later compares against.
token_to_problem_solution = dict()

# Generation parameters, forwarded as keyword arguments to every llm(...) call
generation_kwargs = dict(
    max_tokens=512,
    stop=["```", "### Instruction:", "### Response:"],
    echo=False,
    temperature=0.05,
    top_k=10,
    top_p=0.9,
    repeat_penalty=1.1,
)
def verify_token(token: str) -> bool:
    """Report whether ``token`` decodes as a JWT under the configured secret.

    The token is decoded with ``JWT_SECRET`` and restricted to
    ``JWT_ALGORITHM``. Any ``jwt.PyJWTError`` raised by ``jwt.decode`` is
    treated as "invalid" and turned into ``False``; the decoded payload
    itself is discarded.
    """
    try:
        jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
    except jwt.PyJWTError:
        return False
    else:
        return True
def check_rate_limit(token: str):
    """Count one call against ``token`` and say whether it is allowed.

    Returns a ``(allowed, message)`` pair: ``(True, "")`` when the call fits
    within ``MAX_TOKEN_USAGE``, otherwise ``(False, <explanation>)``. A
    rejected call does not increase the counter.

    All counters are cleared together once ``RESET_INTERVAL`` seconds have
    passed since the previous reset; the whole check runs under
    ``rate_limit_lock``.
    """
    global last_reset_time
    with rate_limit_lock:
        now = time.time()
        window_expired = (now - last_reset_time) >= RESET_INTERVAL
        if window_expired:
            # Start a fresh window for every token at once.
            token_usage.clear()
            last_reset_time = now

        if token_usage[token] < MAX_TOKEN_USAGE:
            token_usage[token] += 1
            return True, ""
        return False, "Rate limit exceeded. Please try again later."
def extract_and_format_code(text):
    """Pull Python code out of model output and normalize its layout.

    Steps:
      1. If ``text`` contains a ```` ```python ... ``` ```` fence, use only
         the fenced part; otherwise use ``text`` as a whole.
      2. Remove common leading whitespace with ``textwrap.dedent``.
      3. Prefix every non-blank line that is not a ``class``/``def`` header
         with one extra space, so body lines sit deeper than the header.
      4. Run the result through ``autopep8.fix_code``.

    Args:
        text: Raw generated text, with or without a fenced code block.

    Returns:
        The formatted source. If autopep8 raises, the failure is logged and
        the output of step 3 is returned unchanged.
    """
    fenced = re.search(r'```python\s*(.*?)\s*```', text, re.DOTALL)
    code = textwrap.dedent(fenced.group(1) if fenced else text)

    # The trailing space matters: a bare 'def'/'class' prefix would also match
    # ordinary identifiers such as `default = 0` or `classes = []`, leaving
    # those body lines un-indented and out of step with their neighbours.
    definition_prefixes = ('class ', 'def ')

    indented_lines = []
    for line in code.split('\n'):
        stripped = line.strip()
        if stripped and not stripped.startswith(definition_prefixes):
            indented_lines.append(' ' + line)
        else:
            # Definition headers and blank lines are kept exactly as they are.
            indented_lines.append(line)
    formatted_code = '\n'.join(indented_lines)

    try:
        return autopep8.fix_code(formatted_code)
    except Exception:
        # Formatting is best-effort: keep serving the unformatted code, but
        # record the failure instead of swallowing it silently (and without
        # trapping KeyboardInterrupt/SystemExit as a bare `except:` would).
        logging.getLogger(__name__).warning(
            "autopep8 formatting failed; returning unformatted code",
            exc_info=True,
        )
        return formatted_code
def generate_explanation(problem: str, solution: str, token: str) -> Dict[str, Any]:
    """Ask the model to explain a solution previously generated for ``token``.

    The request is rejected with an ``{"error": ...}`` dict when the token
    fails ``verify_token``, when ``check_rate_limit`` refuses it, or when the
    SHA-256 of ``problem + solution`` differs from the digest stored for this
    token in ``token_to_problem_solution``.

    On success returns ``{"explanation": <generated text>}``.
    """
    if not verify_token(token):
        return {"error": "Invalid token"}

    allowed, reason = check_rate_limit(token)
    if not allowed:
        return {"error": reason}

    # Only the exact pair recorded by generate_solution may be explained.
    expected_hash = hashlib.sha256(f"{problem}{solution}".encode()).hexdigest()
    if token_to_problem_solution.get(token) != expected_hash:
        return {"error": "No matching problem-solution pair found for this token"}

    system_prompt = "You are a Python coding assistant specialized in explaining LeetCode problem solutions. Provide a clear and concise explanation of the given solution."
    full_prompt = f"""### Instruction:
{system_prompt}
Problem:
{problem}
Solution:
{solution}
Explain this solution step by step.
### Response:
Here's the explanation of the solution:
"""

    # Consume the streamed chunks and stitch their text together.
    pieces = [
        chunk["choices"][0]["text"]
        for chunk in llm(full_prompt, stream=True, **generation_kwargs)
    ]
    return {"explanation": "".join(pieces)}
def generate_solution(instruction: str, token: str) -> Dict[str, Any]:
    """Generate a Python solution for ``instruction`` on behalf of ``token``.

    Returns ``{"error": ...}`` when the token fails ``verify_token`` or is
    refused by ``check_rate_limit``. Otherwise the model output is passed
    through ``extract_and_format_code``, the SHA-256 of
    ``instruction + formatted code`` is stored under the token in
    ``token_to_problem_solution`` (replacing any earlier entry), and
    ``{"solution": <formatted code>}`` is returned.
    """
    if not verify_token(token):
        return {"error": "Invalid token"}

    allowed, reason = check_rate_limit(token)
    if not allowed:
        return {"error": reason}

    system_prompt = "You are a Python coding assistant specialized in solving LeetCode problems. Provide only the complete implementation of the given function. Ensure proper indentation and formatting. Do not include any explanations or multiple solutions."
    # The prompt ends with an opened code fence so the model continues with code.
    full_prompt = f"""### Instruction:
{system_prompt}
Implement the following function for the LeetCode problem:
{instruction}
### Response:
Here's the complete Python function implementation:
```python
"""

    raw_output = "".join(
        chunk["choices"][0]["text"]
        for chunk in llm(full_prompt, stream=True, **generation_kwargs)
    )
    formatted_code = extract_and_format_code(raw_output)

    # Remember which problem/solution pair this token is allowed to have explained.
    digest = hashlib.sha256(f"{instruction}{formatted_code}".encode()).hexdigest()
    token_to_problem_solution[token] = digest
    return {"solution": formatted_code}
def random_problem(token: str) -> Dict[str, Any]:
    """Return the ``instruction`` field of a randomly chosen training item.

    Returns ``{"error": ...}`` when the token fails ``verify_token`` or is
    refused by ``check_rate_limit``; otherwise ``{"problem": <instruction>}``.
    """
    if not verify_token(token):
        return {"error": "Invalid token"}

    allowed, reason = check_rate_limit(token)
    if not allowed:
        return {"error": reason}

    chosen = random.choice(train_dataset)
    return {"problem": chosen['instruction']}
# Create Gradio interfaces
# Front end for generate_solution: two text inputs (problem text, JWT),
# JSON output.
generate_interface = gr.Interface(
    title="Generate Solution API",
    description="Provide a LeetCode problem instruction and a valid JWT token to generate a solution.",
    fn=generate_solution,
    inputs=[gr.Textbox(label="Problem Instruction"), gr.Textbox(label="JWT Token")],
    outputs=gr.JSON(),
)
# Front end for random_problem: a single JWT text input, JSON output.
random_problem_interface = gr.Interface(
    title="Random Problem API",
    description="Provide a valid JWT token to get a random LeetCode problem.",
    fn=random_problem,
    inputs=[gr.Textbox(label="JWT Token")],
    outputs=gr.JSON(),
)
# Front end for generate_explanation: three text inputs in the function's
# argument order (problem, solution, JWT), JSON output.
explain_interface = gr.Interface(
    title="Explain Solution API",
    description="Provide a problem, solution, and valid JWT token to get an explanation of the solution.",
    fn=generate_explanation,
    inputs=[
        gr.Textbox(label="Problem"),
        gr.Textbox(label="Solution"),
        gr.Textbox(label="JWT Token"),
    ],
    outputs=gr.JSON(),
)
# Combine the three interfaces into one tabbed app; the tab titles are listed
# in the same order as the interfaces they label.
demo = gr.TabbedInterface(
    [generate_interface, explain_interface, random_problem_interface],
    ["Generate Solution", "Explain Solution", "Random Problem"],
)

# Launch the Gradio app
# (a stray trailing `|` after the call was removed: it left a binary operator
# without a right-hand operand, which is a SyntaxError)
demo.launch()