import os
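# JoyCaption Alpha Two demo app (Gradio). Images are encoded with a SigLIP
# vision tower, projected into the text model's embedding space by a small
# ImageAdapter, and captioned by a Llama 3.1 8B (or Pixtral 12B) text model.

# Minimal stand-in for the Hugging Face Spaces ZeroGPU `spaces` module:
# outside a Space, the GPU decorator is a plain pass-through.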
class spaces:
    @staticmethod
    def GPU(func):
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)
        return wrapper

import gradio as gr
from huggingface_hub import InferenceClient, HfApi
from torch import nn
from transformers import (AutoModel, AutoProcessor, AutoTokenizer, PreTrainedTokenizer,
                          PreTrainedTokenizerFast, AutoModelForCausalLM, LlavaForConditionalGeneration)
from pathlib import Path
import torch
import torch.amp.autocast_mode
from PIL import Image
import torchvision.transforms.functional as TVF
import gc
from peft import PeftModel
from typing import Union

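# Load the text model in 4-bit NF4 quantization (bitsandbytes) by default.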
LOAD_IN_NF4 = True

BASE_DIR = Path(__file__).resolve().parent
device = "cuda" if torch.cuda.is_available() else "cpu"
HF_TOKEN = os.environ.get("HF_TOKEN", None)
use_inference_client = False
PIXTRAL_PATHS = ["SeanScripts/pixtral-12b-nf4", "mistral-community/pixtral-12b"]

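# Candidate text models (HF repo id -> optional GGUF filename); the first key
# is the default. change_text_model() appends user-selected repos at runtime.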
llm_models = {
    "Orenguteng/Llama-3.1-8B-Lexi-Uncensored-V2": None,
    "mlabonne/NeuralDaredevil-8B-abliterated": None,
    "bunnycore/LLama-3.1-8B-Matrix": None,
    "Sao10K/Llama-3.1-8B-Stheno-v3.4": None,
    "unsloth/Meta-Llama-3.1-8B-bnb-4bit": None,
    "DevQuasar/HermesNova-Llama-3.1-8B": None,
    "mergekit-community/L3.1-Boshima-b-FIX": None,
    "unsloth/Meta-Llama-3.1-8B-Instruct": None,
}

CLIP_PATH = "google/siglip-so400m-patch14-384"
MODEL_PATH = list(llm_models.keys())[0]
CHECKPOINT_PATH = BASE_DIR / Path("cgrkzexw-599808")
LORA_PATH = CHECKPOINT_PATH / "text_model"
TITLE = "<h1><center>JoyCaption Alpha Two (2024-09-26a)</center></h1>"

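# Prompt templates per caption type. Index 0: no length constraint,
# index 1: numeric {word_count} limit, index 2: named {length} (e.g. "short").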
CAPTION_TYPE_MAP = {
    "Descriptive": [
        "Write a descriptive caption for this image in a formal tone.",
        "Write a descriptive caption for this image in a formal tone within {word_count} words.",
        "Write a {length} descriptive caption for this image in a formal tone.",
    ],
    "Descriptive (Informal)": [
        "Write a descriptive caption for this image in a casual tone.",
        "Write a descriptive caption for this image in a casual tone within {word_count} words.",
        "Write a {length} descriptive caption for this image in a casual tone.",
    ],
    "Training Prompt": [
        "Write a Flux prompt for this image in a formal tone.",
        "Write a Flux prompt for this image in a formal tone within {word_count} words.",
        "Write a {length} Flux prompt for this image in a formal tone.",
    ],
    "Training Prompt (Informal)": [
"Write a Flux prompt for this image in a casual tone..", |
|
"Write a Flux prompt for this image in a casual tone within {word_count} words.", |
|
"Write a {length} Flux prompt for this image in a casual tone.", |
|
], |
|
"MidJourney": [ |
|
"Write a MidJourney prompt for this image.", |
|
"Write a MidJourney prompt for this image within {word_count} words.", |
|
"Write a {length} MidJourney prompt for this image.", |
|
], |
|
"Booru tag list": [ |
|
"Write a list of Booru tags for this image.", |
|
"Write a list of Booru tags for this image within {word_count} words.", |
|
"Write a {length} list of Booru tags for this image.", |
|
], |
|
"Booru-like tag list": [ |
|
"Write a list of Booru-like tags for this image.", |
|
"Write a list of Booru-like tags for this image within {word_count} words.", |
|
"Write a {length} list of Booru-like tags for this image.", |
|
], |
|
"Art Critic": [ |
|
"Analyze this image like an art critic would with information about its composition, style, symbolism, the use of color, light, any artistic movement it might belong to, etc.", |
|
"Analyze this image like an art critic would with information about its composition, style, symbolism, the use of color, light, any artistic movement it might belong to, etc. Keep it within {word_count} words.", |
|
"Analyze this image like an art critic would with information about its composition, style, symbolism, the use of color, light, any artistic movement it might belong to, etc. Keep it {length}.", |
|
], |
|
"Product Listing": [ |
|
"Write a caption for this image as though it were a product listing.", |
|
"Write a caption for this image as though it were a product listing. Keep it under {word_count} words.", |
|
"Write a {length} caption for this image as though it were a product listing.", |
|
], |
|
"Social Media Post": [ |
|
"Write a caption for this image as if it were being used for a social media post.", |
|
"Write a caption for this image as if it were being used for a social media post. Limit the caption to {word_count} words.", |
|
"Write a {length} caption for this image as if it were being used for a social media post.", |
|
], |
|
} |
|
|
|
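# Projects SigLIP hidden states into the text model's embedding space.
# With deep_extract=True, five hidden layers are concatenated feature-wise.
# other_tokens holds three learned embeddings: indices 0/1 bracket the image
# sequence and index 2 is the end-of-turn (EOT) embedding.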
class ImageAdapter(nn.Module):
    def __init__(self, input_features: int, output_features: int, ln1: bool, pos_emb: bool, num_image_tokens: int, deep_extract: bool):
        super().__init__()
        self.deep_extract = deep_extract

        if self.deep_extract:
            input_features = input_features * 5

        self.linear1 = nn.Linear(input_features, output_features)
        self.activation = nn.GELU()
        self.linear2 = nn.Linear(output_features, output_features)
        self.ln1 = nn.Identity() if not ln1 else nn.LayerNorm(input_features)
        self.pos_emb = None if not pos_emb else nn.Parameter(torch.zeros(num_image_tokens, input_features))

        self.other_tokens = nn.Embedding(3, output_features)
        self.other_tokens.weight.data.normal_(mean=0.0, std=0.02)

    def forward(self, vision_outputs: torch.Tensor):
        if self.deep_extract:
            x = torch.concat((
                vision_outputs[-2],
                vision_outputs[3],
                vision_outputs[7],
                vision_outputs[13],
                vision_outputs[20],
            ), dim=-1)
            assert len(x.shape) == 3, f"Expected 3, got {len(x.shape)}"
            assert x.shape[-1] == vision_outputs[-2].shape[-1] * 5, f"Expected {vision_outputs[-2].shape[-1] * 5}, got {x.shape[-1]}"
        else:
            x = vision_outputs[-2]

        x = self.ln1(x)

        if self.pos_emb is not None:
            assert x.shape[-2:] == self.pos_emb.shape, f"Expected {self.pos_emb.shape}, got {x.shape[-2:]}"
            x = x + self.pos_emb

        x = self.linear1(x)
        x = self.activation(x)
        x = self.linear2(x)
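        # Bracket the projected image sequence with the two learned marker
        # embeddings (other_tokens indices 0 and 1).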
        other_tokens = self.other_tokens(torch.tensor([0, 1], device=self.other_tokens.weight.device).expand(x.shape[0], -1))
        assert other_tokens.shape == (x.shape[0], 2, x.shape[2]), f"Expected {(x.shape[0], 2, x.shape[2])}, got {other_tokens.shape}"
        x = torch.cat((other_tokens[:, 0:1], x, other_tokens[:, 1:2]), dim=1)

        return x

    def get_eot_embedding(self):
        return self.other_tokens(torch.tensor([2], device=self.other_tokens.weight.device)).squeeze(0)

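# Globals (re)populated by load_text_model(); shared across the Gradio callbacks.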
tokenizer = None
text_model_client = None
text_model = None
image_adapter = None
pixtral_model = None
pixtral_processor = None

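# Load (or swap in) the active text model. Handles Pixtral (LLaVA-style) repos,
# GGUF checkpoints, and plain HF causal LMs; optionally NF4-quantized and/or
# merged with the JoyCaption LoRA from CHECKPOINT_PATH.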
def load_text_model(model_name: str=MODEL_PATH, gguf_file: Union[str, None]=None, is_nf4: bool=True, is_lora: bool=True):
    global tokenizer, text_model, image_adapter, pixtral_model, pixtral_processor, text_model_client, use_inference_client
    try:
        tokenizer = None
        text_model_client = None
        text_model = None
        image_adapter = None
        pixtral_model = None
        pixtral_processor = None
        torch.cuda.empty_cache()
        gc.collect()
        lora_device = "auto"

        from transformers import BitsAndBytesConfig
        nf4_config = BitsAndBytesConfig(load_in_4bit=True, bnb_4bit_quant_type="nf4",
                                        bnb_4bit_use_double_quant=True, bnb_4bit_compute_dtype=torch.bfloat16)

        if model_name in PIXTRAL_PATHS:
            print(f"Loading LLM: {model_name}")
            if is_nf4:
                pixtral_model = LlavaForConditionalGeneration.from_pretrained(model_name, quantization_config=nf4_config, device_map=device, torch_dtype=torch.bfloat16).eval()
            else:
                pixtral_model = LlavaForConditionalGeneration.from_pretrained(model_name, device_map=device, torch_dtype=torch.bfloat16).eval()
            pixtral_processor = AutoProcessor.from_pretrained(model_name)
            print(f"pixtral_model: {type(pixtral_model)}")
            print(f"pixtral_processor: {type(pixtral_processor)}")
            return

        print("Loading tokenizer")
        tokenizer = AutoTokenizer.from_pretrained(CHECKPOINT_PATH / "text_model", use_fast=True)
        assert isinstance(tokenizer, (PreTrainedTokenizer, PreTrainedTokenizerFast)), f"Tokenizer is of type {type(tokenizer)}"

        print(f"Loading LLM: {model_name}")
        if gguf_file:
            if device == "cpu":
                text_model = AutoModelForCausalLM.from_pretrained(model_name, gguf_file=gguf_file, device_map=device, torch_dtype=torch.bfloat16).eval()
            elif is_nf4:
                text_model = AutoModelForCausalLM.from_pretrained(model_name, gguf_file=gguf_file, quantization_config=nf4_config, device_map=device, torch_dtype=torch.bfloat16).eval()
            else:
                text_model = AutoModelForCausalLM.from_pretrained(model_name, gguf_file=gguf_file, device_map=lora_device, torch_dtype=torch.bfloat16).eval()
        else:
            if device == "cpu":
                text_model = AutoModelForCausalLM.from_pretrained(model_name, device_map=device, torch_dtype=torch.bfloat16).eval()
            elif is_nf4:
                text_model = AutoModelForCausalLM.from_pretrained(model_name, quantization_config=nf4_config, device_map=device, torch_dtype=torch.bfloat16).eval()
            else:
                text_model = AutoModelForCausalLM.from_pretrained(model_name, device_map=lora_device, torch_dtype=torch.bfloat16).eval()

        # The LoRA is only merged into unquantized models; merge_and_unload is
        # skipped for NF4-quantized weights.
        if is_lora and LORA_PATH.exists() and not is_nf4:
            print("Loading VLM's custom text model")
            text_model = PeftModel.from_pretrained(model=text_model, model_id=LORA_PATH, device_map=device)
            text_model = text_model.merge_and_unload(safe_merge=True)
        else:
            print("VLM's custom text model is not loaded")

        print("Loading image adapter")
        image_adapter = ImageAdapter(clip_model.config.hidden_size, text_model.config.hidden_size, False, False, 38, False).eval().to("cpu")
        image_adapter.load_state_dict(torch.load(CHECKPOINT_PATH / "image_adapter.pt", map_location="cpu", weights_only=False))
        image_adapter.eval().to(device)
    except Exception as e:
        print(f"LLM load error: {e}")
        raise Exception(f"LLM load error: {e}") from e
    finally:
        torch.cuda.empty_cache()
        gc.collect()

load_text_model.zerogpu = True

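# Build the vision tower once at import time; the JoyCaption checkpoint's
# fine-tuned weights (clip_model.pt) replace the stock SigLIP weights.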
print("Loading CLIP") |
|
clip_processor = AutoProcessor.from_pretrained(CLIP_PATH) |
|
clip_model = AutoModel.from_pretrained(CLIP_PATH).vision_model |
|
assert (CHECKPOINT_PATH / "clip_model.pt").exists() |
|
if (CHECKPOINT_PATH / "clip_model.pt").exists(): |
|
print("Loading VLM's custom vision model") |
|
checkpoint = torch.load(CHECKPOINT_PATH / "clip_model.pt", map_location='cpu', weights_only=False) |
|
checkpoint = {k.replace("_orig_mod.module.", ""): v for k, v in checkpoint.items()} |
|
clip_model.load_state_dict(checkpoint) |
|
del checkpoint |
|
clip_model.eval().requires_grad_(False).to(device) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
load_text_model(MODEL_PATH, None, LOAD_IN_NF4, True)

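# Main captioning entry point for the Gradio UI. Builds the prompt from
# caption_type/length/extra options, runs the vision tower + adapter, splices
# the image embeddings into the chat, and returns (prompt, caption).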
@torch.inference_mode()
def stream_chat_mod(input_image: Image.Image, caption_type: str, caption_length: Union[str, int], extra_options: list[str], name_input: str, custom_prompt: str,
                    max_new_tokens: int=300, top_p: float=0.9, temperature: float=0.6, model_name: str=MODEL_PATH, progress=gr.Progress(track_tqdm=True)) -> tuple[str, str]:
    try:
        global tokenizer, text_model, image_adapter, pixtral_model, pixtral_processor, text_model_client, use_inference_client
        torch.cuda.empty_cache()
        gc.collect()

        length = None if caption_length == "any" else caption_length

        if isinstance(length, str):
            try:
                length = int(length)
            except ValueError:
                pass

        # Pick the prompt template: 0 = unconstrained, 1 = word count, 2 = named length.
        if length is None:
            map_idx = 0
        elif isinstance(length, int):
            map_idx = 1
        elif isinstance(length, str):
            map_idx = 2
        else:
            raise ValueError(f"Invalid caption length: {length}")

        prompt_str = CAPTION_TYPE_MAP[caption_type][map_idx]

        if len(extra_options) > 0:
            prompt_str += " " + " ".join(extra_options)

        prompt_str = prompt_str.format(name=name_input, length=caption_length, word_count=caption_length)

        if custom_prompt.strip() != "":
            prompt_str = custom_prompt.strip()

        print(f"Prompt: {prompt_str}")

        # Pixtral repos bypass the SigLIP + adapter pipeline entirely.
        if model_name in PIXTRAL_PATHS:
            print(f"pixtral_model: {type(pixtral_model)}")
            print(f"pixtral_processor: {type(pixtral_processor)}")
            input_images = [input_image.convert("RGB")]
            input_prompt = "[INST]Caption this image:\n[IMG][/INST]"
            inputs = pixtral_processor(images=input_images, text=input_prompt, return_tensors="pt").to(device)
            generate_ids = pixtral_model.generate(**inputs, max_new_tokens=max_new_tokens)
            output = pixtral_processor.batch_decode(generate_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False)[0]
            return input_prompt, output.strip()
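        # Preprocess the image the way SigLIP so400m/14@384 expects:
        # 384x384, scaled to [0, 1], then normalized to [-1, 1].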
        image = input_image.resize((384, 384), Image.LANCZOS)
        pixel_values = TVF.pil_to_tensor(image).unsqueeze(0) / 255.0
        pixel_values = TVF.normalize(pixel_values, [0.5], [0.5])
        pixel_values = pixel_values.to(device)

        with torch.amp.autocast_mode.autocast(device, enabled=True):
            vision_outputs = clip_model(pixel_values=pixel_values, output_hidden_states=True)
            image_features = vision_outputs.hidden_states
            embedded_images = image_adapter(image_features)
            embedded_images = embedded_images.to(device)
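        # Build the chat the captioner sees; prompt_str becomes the user turn.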
        convo = [
            {
                "role": "system",
                "content": "You are a helpful image captioner.",
            },
            {
                "role": "user",
                "content": prompt_str,
            },
        ]

        convo_string = tokenizer.apply_chat_template(convo, tokenize=False, add_generation_prompt=True)
        assert isinstance(convo_string, str)

        convo_tokens = tokenizer.encode(convo_string, return_tensors="pt", add_special_tokens=False, truncation=False)
        prompt_tokens = tokenizer.encode(prompt_str, return_tensors="pt", add_special_tokens=False, truncation=False)
        assert isinstance(convo_tokens, torch.Tensor) and isinstance(prompt_tokens, torch.Tensor)
        convo_tokens = convo_tokens.squeeze(0)
        prompt_tokens = prompt_tokens.squeeze(0)

        # The second <|eot_id|> closes the user turn; everything before the prompt
        # tokens is the "preamble" (system message plus the user-turn header).
        eot_id_indices = (convo_tokens == tokenizer.convert_tokens_to_ids("<|eot_id|>")).nonzero(as_tuple=True)[0].tolist()
        assert len(eot_id_indices) == 2, f"Expected 2 <|eot_id|> tokens, got {len(eot_id_indices)}"

        preamble_len = eot_id_indices[1] - prompt_tokens.shape[0]

        convo_embeds = text_model.model.embed_tokens(convo_tokens.unsqueeze(0).to(device))
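        # Splice the image embeddings into the sequence right after the preamble,
        # i.e. immediately before the prompt tokens.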
        input_embeds = torch.cat([
            convo_embeds[:, :preamble_len],
            embedded_images.to(dtype=convo_embeds.dtype),
            convo_embeds[:, preamble_len:],
        ], dim=1).to(device)

        # Dummy token ids (zeros) stand in for the image positions so that
        # input_ids, input_embeds and the attention mask stay aligned.
        input_ids = torch.cat([
            convo_tokens[:preamble_len].unsqueeze(0),
            torch.zeros((1, embedded_images.shape[1]), dtype=torch.long),
            convo_tokens[preamble_len:].unsqueeze(0),
        ], dim=1).to(device)
        attention_mask = torch.ones_like(input_ids)

        text_model.to(device)
        generate_ids = text_model.generate(input_ids, inputs_embeds=input_embeds, attention_mask=attention_mask, max_new_tokens=max_new_tokens,
                                           do_sample=True, suppress_tokens=None, top_p=top_p, temperature=temperature)

        # Trim the prompt and a trailing EOS/<|eot_id|> token before decoding.
        generate_ids = generate_ids[:, input_ids.shape[1]:]
        if generate_ids[0][-1] == tokenizer.eos_token_id or generate_ids[0][-1] == tokenizer.convert_tokens_to_ids("<|eot_id|>"):
            generate_ids = generate_ids[:, :-1]

        caption = tokenizer.batch_decode(generate_ids, skip_special_tokens=False, clean_up_tokenization_spaces=False)[0]

        return prompt_str, caption.strip()

    except Exception as e:
        print(f"Caption generation error: {e}")
        raise gr.Error(f"Caption generation error: {e}") from e

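# Helpers for validating Hugging Face repo ids and listing GGUF files.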
def is_repo_name(s):
    import re
    return re.fullmatch(r'^[^/,\s\"\']+/[^/,\s\"\']+$', s)


def is_repo_exists(repo_id):
    try:
        api = HfApi(token=HF_TOKEN)
        return api.repo_exists(repo_id=repo_id)
    except Exception as e:
        print(f"Error: Failed to connect to {repo_id}. {e}")
        return True  # assume the repo exists if the check itself failed

def is_valid_repo(repo_id):
    import re
    try:
        if not re.fullmatch(r'^[^/,\s\"\']+/[^/,\s\"\']+$', repo_id): return False
        api = HfApi()
        return api.repo_exists(repo_id=repo_id)
    except Exception as e:
        print(f"Failed to connect to {repo_id}. {e}")
        return False


def get_text_model():
    return list(llm_models.keys())

def is_gguf_repo(repo_id: str):
    try:
        api = HfApi(token=HF_TOKEN)
        if not is_repo_name(repo_id) or not is_repo_exists(repo_id): return False
        files = api.list_repo_files(repo_id=repo_id)
    except Exception as e:
        print(f"Error: Failed to get {repo_id}'s info. {e}")
        gr.Warning(f"Error: Failed to get {repo_id}'s info. {e}")
        return False
    files = [f for f in files if f.endswith(".gguf")]
    return len(files) > 0

def get_repo_gguf(repo_id: str):
    try:
        api = HfApi(token=HF_TOKEN)
        if not is_repo_name(repo_id) or not is_repo_exists(repo_id): return gr.update(value="", choices=[])
        files = api.list_repo_files(repo_id=repo_id)
    except Exception as e:
        print(f"Error: Failed to get {repo_id}'s info. {e}")
        gr.Warning(f"Error: Failed to get {repo_id}'s info. {e}")
        return gr.update(value="", choices=[])
    files = [f for f in files if f.endswith(".gguf")]
    if len(files) == 0: return gr.update(value="", choices=[])
    else: return gr.update(value=files[0], choices=files)

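# Gradio callback: swap the active text model. If the repo only ships GGUF
# weights and no file was chosen yet, ask the user to pick one first.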
def change_text_model(model_name: str=MODEL_PATH, use_client: bool=False, gguf_file: Union[str, None]=None,
                      is_nf4: bool=True, is_lora: bool=True, progress=gr.Progress(track_tqdm=True)):
    global use_inference_client, llm_models
    use_inference_client = use_client
    try:
        if not is_repo_name(model_name) or not is_repo_exists(model_name):
            raise gr.Error(f"Repo doesn't exist: {model_name}")
        if not gguf_file and is_gguf_repo(model_name):
            gr.Info("Please select a GGUF file.")
            return gr.update(visible=True)
        if not use_inference_client:
            load_text_model(model_name, gguf_file, is_nf4, is_lora)
        if model_name not in llm_models: llm_models[model_name] = gguf_file if gguf_file else None
        return gr.update(choices=get_text_model())
    except Exception as e:
        raise gr.Error(f"Model load error: {model_name}, {e}")