|
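"""Virtual try-on service: a FastAPI endpoint plus a Gradio demo UI.

Try-on inference is delegated to the hosted `franciszzj/Leffa` Space via
gradio_client; MediaPipe-based helpers for pose annotation and a simple
perspective-warp garment overlay are included as well.
"""
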
import base64
import io
import os

import cv2
import gradio as gr
import mediapipe as mp
import numpy as np
import uvicorn
from fastapi import FastAPI, File, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from gradio_client import Client, handle_file
from PIL import Image

app = FastAPI()

# Allow cross-origin requests so a separately hosted frontend can call the API.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
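# Example images shipped with the app; they populate the Gradio example galleries.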
example_path = os.path.join(os.path.dirname(__file__), 'example')

garm_list = os.listdir(os.path.join(example_path, "cloth"))
garm_list_path = [os.path.join(example_path, "cloth", garm) for garm in garm_list]

human_list = os.listdir(os.path.join(example_path, "human"))
human_list_path = [os.path.join(example_path, "human", human) for human in human_list]
|
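# MediaPipe Pose is initialised once at import time; static_image_mode=True
# treats every input as an independent photo rather than a video frame.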
mp_pose = mp.solutions.pose
pose = mp_pose.Pose(static_image_mode=True)
mp_drawing = mp.solutions.drawing_utils
mp_pose_landmark = mp_pose.PoseLandmark
|

def detect_pose(image):
    """Annotate a BGR image with pose landmarks and shoulder/hip keypoints."""
    # MediaPipe expects RGB input.
    image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    result = pose.process(image_rgb)

    keypoints = {}
    if result.pose_landmarks:
        # Draw the full detected skeleton onto the image.
        mp_drawing.draw_landmarks(image, result.pose_landmarks, mp_pose.POSE_CONNECTIONS)

        height, width, _ = image.shape
        landmark_indices = {
            'left_shoulder': mp_pose_landmark.LEFT_SHOULDER,
            'right_shoulder': mp_pose_landmark.RIGHT_SHOULDER,
            'left_hip': mp_pose_landmark.LEFT_HIP,
            'right_hip': mp_pose_landmark.RIGHT_HIP,
        }

        # Landmark coordinates are normalised to [0, 1]; scale them to pixels.
        for name, index in landmark_indices.items():
            lm = result.pose_landmarks.landmark[index]
            x, y = int(lm.x * width), int(lm.y * height)
            keypoints[name] = (x, y)
            cv2.circle(image, (x, y), 5, (0, 255, 0), -1)
            cv2.putText(image, name, (x + 5, y - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)

    return image
|

def align_clothing(body_img, clothing_img):
    """Warp a garment image onto the torso detected in body_img.

    The garment's corners are mapped to the detected shoulders and hips with
    a perspective transform, then composited over the body image.
    """
    image_rgb = cv2.cvtColor(body_img, cv2.COLOR_BGR2RGB)
    result = pose.process(image_rgb)
    output = body_img.copy()

    if result.pose_landmarks:
        h, w, _ = output.shape

        def get_point(landmark_id):
            lm = result.pose_landmarks.landmark[landmark_id]
            return int(lm.x * w), int(lm.y * h)

        left_shoulder = get_point(mp_pose_landmark.LEFT_SHOULDER)
        right_shoulder = get_point(mp_pose_landmark.RIGHT_SHOULDER)
        left_hip = get_point(mp_pose_landmark.LEFT_HIP)
        right_hip = get_point(mp_pose_landmark.RIGHT_HIP)

        # Destination quad on the body: shoulders on top, hips below.
        dst_pts = np.array([
            left_shoulder,
            right_shoulder,
            right_hip,
            left_hip
        ], dtype=np.float32)

        # Source quad: the garment image's own corners, in matching order.
        src_h, src_w = clothing_img.shape[:2]
        src_pts = np.array([
            [0, 0],
            [src_w, 0],
            [src_w, src_h],
            [0, src_h]
        ], dtype=np.float32)

        matrix = cv2.getPerspectiveTransform(src_pts, dst_pts)
        warped_clothing = cv2.warpPerspective(clothing_img, matrix, (w, h), borderMode=cv2.BORDER_TRANSPARENT)

        if clothing_img.ndim == 3 and clothing_img.shape[2] == 4:
            # Alpha-composite using the garment's transparency channel.
            alpha = warped_clothing[:, :, 3] / 255.0
            for c in range(3):
                output[:, :, c] = ((1 - alpha) * output[:, :, c] + alpha * warped_clothing[:, :, c]).astype(np.uint8)
        else:
            # No alpha channel: fall back to a simple weighted blend.
            output = cv2.addWeighted(output, 0.8, warped_clothing, 0.5, 0)

    return output
|

def process_image(human_img_path, garm_img_path):
    """Run the hosted Leffa virtual try-on model on a person/garment pair."""
    # Connects to the public Space on every call; constructing the Client once
    # at module level would avoid the repeated connection overhead.
    client = Client("franciszzj/Leffa")

    result = client.predict(
        src_image_path=handle_file(human_img_path),
        ref_image_path=handle_file(garm_img_path),
        ref_acceleration=False,
        step=30,
        scale=2.5,
        seed=42,
        vt_model_type="viton_hd",
        vt_garment_type="upper_body",
        vt_repaint=False,
        api_name="/leffa_predict_vt"
    )

    print(result)
    # The first element of the result is the generated image's local file path.
    generated_image_path = result[0]
    print("generated_image_path:", generated_image_path)
    generated_image = Image.open(generated_image_path)

    return generated_image
|
@app.post("/") |
|
async def try_on_api(human_image: UploadFile = File(...), garment_image: UploadFile = File(...)): |
|
try: |
|
|
|
human_content = await human_image.read() |
|
garment_content = await garment_image.read() |
|
|
|
|
|
human_img = Image.open(io.BytesIO(human_content)) |
|
garment_img = Image.open(io.BytesIO(garment_content)) |
|
|
|
|
|
human_path = "temp_human.jpg" |
|
garment_path = "temp_garment.jpg" |
|
human_img.save(human_path) |
|
garment_img.save(garment_path) |
|
|
|
|
|
result = process_image(human_path, garment_path) |
|
|
|
|
|
img_byte_arr = io.BytesIO() |
|
result.save(img_byte_arr, format='PNG') |
|
img_byte_arr = img_byte_arr.getvalue() |
|
base64_image = base64.b64encode(img_byte_arr).decode('utf-8') |
|
|
|
|
|
os.remove(human_path) |
|
os.remove(garment_path) |
|
|
|
return { |
|
"status": "success", |
|
"image": base64_image, |
|
"format": "base64" |
|
} |
|
except Exception as e: |
|
return {"status": "error", "message": str(e)} |
|
|
|
|
|
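
# Gradio demo UI; it drives the same process_image() pipeline as the API.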
image_blocks = gr.Blocks().queue()
with image_blocks as demo:
    gr.HTML("<center><h1>Virtual Try-On</h1></center>")
    gr.HTML("<center><p>Upload an image of a person and an image of a garment ✨</p></center>")
    with gr.Row():
        with gr.Column():
            human_img = gr.Image(type="filepath", label='Human', interactive=True)
            gr.Examples(
                inputs=human_img,
                examples_per_page=10,
                examples=human_list_path
            )

        with gr.Column():
            garm_img = gr.Image(label="Garment", type="filepath", interactive=True)
            gr.Examples(
                inputs=garm_img,
                examples_per_page=8,
                examples=garm_list_path
            )

        with gr.Column():
            image_out = gr.Image(label="Processed image", type="pil")

    with gr.Row():
        try_button = gr.Button(value="Try-on", variant='primary')

    try_button.click(fn=process_image, inputs=[human_img, garm_img], outputs=image_out)
|
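# Serve the Gradio demo from the same FastAPI app under /gradio.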
app = gr.mount_gradio_app(app, demo, path="/gradio")
|
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)
|
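# Example client call (a sketch; assumes the server is running locally and
# that person.jpg / shirt.jpg exist):
#
#   import base64, requests
#   with open("person.jpg", "rb") as h, open("shirt.jpg", "rb") as g:
#       resp = requests.post("http://localhost:7860/",
#                            files={"human_image": h, "garment_image": g})
#   data = resp.json()
#   if data["status"] == "success":
#       with open("result.png", "wb") as out:
#           out.write(base64.b64decode(data["image"]))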
|