import sys
from typing import List
import traceback
import os
# needs to be imported *before* transformers
if os.path.exists('use_normal_tokenizers'):
    import tokenizers
    BIG_MODEL = False
else:
    import tokenizers_patch
    BIG_MODEL = True
from transformers import AutoModelForCausalLM, AutoTokenizer
import json
# from flask import Flask, request, render_template
# from flask_cors import CORS
# app = Flask(__name__, static_folder='static')
# app.config['TEMPLATES_AUTO_RELOAD'] = True
# origins = [f"http://localhost:{PORT}", "https://huggingface.co", "https://hf.space"]
# CORS(app, resources={
#     r"/generate": {"origins": origins},
#     r"/infill": {"origins": origins},
# })
PORT = 7860
VERBOSE = False
if BIG_MODEL:
    CUDA = True
    model_name = "facebook/incoder-6B"
else:
    CUDA = False
    model_name = "facebook/incoder-1B"
from fastapi import FastAPI, Request
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse, StreamingResponse
app = FastAPI(docs_url=None, redoc_url=None)
app.mount("/static", StaticFiles(directory="static"), name="static")
print("loading model")
model = AutoModelForCausalLM.from_pretrained(model_name)
print("loading tokenizer")
tokenizer = AutoTokenizer.from_pretrained(model_name)
print("loading complete")
if CUDA:
    # fp16 halves memory use; the 6B model needs a GPU to run at usable speed
    model = model.half().cuda()
BOS = "<|endoftext|>"
EOM = "<|endofmask|>"
def make_sentinel(i):
    # signals (1) a location to infill and (2) the start of the infill
    return f"<|mask:{i}|>"
SPECIAL_TOKENS = [make_sentinel(i) for i in range(256)] + [EOM]
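# InCoder's infilling format: each gap in a document is marked with a numbered
# sentinel <|mask:k|>; the model then generates the text for gap k after a
# repeated <|mask:k|>, terminating the span with <|endofmask|>.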
def generate(input, length_limit=None, temperature=None):
    input_ids = tokenizer(input, return_tensors="pt").input_ids
    if CUDA:
        input_ids = input_ids.cuda()
    # generation budget: requested new tokens on top of the prompt, capped at
    # 256 total tokens to bound latency
    max_length = length_limit + input_ids.flatten().size(0)
    if max_length > 256:
        max_length = 256
    output = model.generate(input_ids=input_ids, do_sample=True, top_p=0.95,
                            temperature=temperature, max_length=max_length)
    # the decoded output includes the prompt; strip the leading BOS if present
    detok_hypo_str = tokenizer.decode(output.flatten())
    if detok_hypo_str.startswith(BOS):
        detok_hypo_str = detok_hypo_str[len(BOS):]
    return detok_hypo_str
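# infill: given N document parts, generate the N-1 missing spans between them.
# The prompt is the parts joined by sentinels; each gap is then filled left to
# right by appending its sentinel and sampling until <|endofmask|> (EOM). If a
# gap never produces EOM within the budget, the whole attempt is retried.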
def infill(parts: List[str], length_limit=None, temperature=None, extra_sentinel=False, max_retries=1):
    assert isinstance(parts, list)
    retries_attempted = 0
    done = False
    while (not done) and (retries_attempted < max_retries):
        retries_attempted += 1
        if VERBOSE:
            print(f"retry {retries_attempted}")
        # (1) build the prompt
        if len(parts) == 1:
            prompt = parts[0]
        else:
            prompt = ""
            # encode parts separated by sentinel
            for sentinel_ix, part in enumerate(parts):
                prompt += part
                if extra_sentinel or (sentinel_ix < len(parts) - 1):
                    prompt += make_sentinel(sentinel_ix)
        infills = []
        complete = []
        done = True
        # (2) generate each infill left to right
        for sentinel_ix, part in enumerate(parts[:-1]):
            complete.append(part)
            prompt += make_sentinel(sentinel_ix)
            completion = generate(prompt, length_limit, temperature)
            # keep only the newly generated text
            completion = completion[len(prompt):]
            if EOM not in completion:
                if VERBOSE:
                    print(f"warning: {EOM} not found")
                completion += EOM
                # TODO: break inner loop here
                done = False
            completion = completion[:completion.index(EOM) + len(EOM)]
            infilled = completion[:-len(EOM)]
            infills.append(infilled)
            complete.append(infilled)
            prompt += completion
        complete.append(parts[-1])
        text = ''.join(complete)
    if VERBOSE:
        print("generated text:")
        print(prompt)
        print()
        print("parts:")
        print(parts)
        print()
        print("infills:")
        print(infills)
        print()
        print("restitched text:")
        print(text)
        print()
    return {
        'text': text,            # the restitched document
        'parts': parts,          # the original parts
        'infills': infills,      # just the generated spans
        'retries_attempted': retries_attempted,
    }
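# Example (hypothetical inputs; assumes the model above is loaded):
#   result = infill(["def count_lines(filename):\n", "\n    return n\n"],
#                   length_limit=64, temperature=0.7, extra_sentinel=True)
#   result['text']    -> prefix + generated body + suffix
#   result['infills'] -> just the generated spans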
@app.head("/")
@app.get("/")
def index() -> FileResponse:
    return FileResponse(path="static/index.html", media_type="text/html")
@app.get('/generate')
async def generate_maybe(info: str):
    # the request payload arrives JSON-encoded in the `info` query parameter
    form = json.loads(info)
    prompt = form['prompt']
    length_limit = int(form['length'])
    temperature = float(form['temperature'])
    if VERBOSE:
        print(prompt)
    try:
        generation = generate(prompt, length_limit, temperature)
        return {'result': 'success', 'type': 'generate', 'prompt': prompt, 'text': generation}
    except Exception as e:
        traceback.print_exception(*sys.exc_info())
        return {'result': 'error', 'type': 'generate', 'prompt': prompt, 'text': f'There was an error: {e}. Tell Daniel.'}
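# Example request (hypothetical; the JSON must be URL-encoded in practice):
#   GET /generate?info={"prompt": "def hello():", "length": 64, "temperature": 0.7}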
@app.get('/infill')
async def infill_maybe(info: str):
    form = json.loads(info)
    length_limit = int(form['length'])
    temperature = float(form['temperature'])
    max_retries = 1
    extra_sentinel = True
    try:
        generation = infill(form['parts'], length_limit, temperature, extra_sentinel=extra_sentinel, max_retries=max_retries)
        generation['result'] = 'success'
        generation['type'] = 'infill'
        return generation
    except Exception as e:
        traceback.print_exception(*sys.exc_info())
        print(e)
        return {'result': 'error', 'type': 'infill', 'text': f'There was an error: {e}.'}
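# Example request (hypothetical; `parts` is the document split at the gaps):
#   GET /infill?info={"parts": ["def f(x):\n", "\n    return y\n"], "length": 64, "temperature": 0.7}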
if __name__ == "__main__":
    # FastAPI apps are served by an ASGI server such as uvicorn;
    # `app.run(..., threaded=False)` is Flask's API and does not exist on FastAPI
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=PORT)