File size: 4,595 Bytes
2b799f2
 
 
 
 
 
 
 
a2cc10f
74c4534
2b799f2
74c4534
bf83e6e
 
 
2b799f2
 
 
 
 
74c4534
2b799f2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
74c4534
2b799f2
 
 
74c4534
2b799f2
 
 
 
 
 
 
74c4534
2b799f2
bf83e6e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2b799f2
bf83e6e
74c4534
2b799f2
 
 
 
 
 
 
 
 
 
 
bf83e6e
 
2b799f2
bf83e6e
2b799f2
bf83e6e
 
 
 
 
 
 
 
 
2b799f2
74c4534
2b799f2
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import base64
import json
import os
import time
from io import BytesIO
from urllib.parse import urlsplit

import gradio as gr
import requests
from PIL import Image
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry

# Base URL of the remote generation API; the text-to-image endpoint URL is built from it.
host = "http://18.119.36.46:8888"

# 📂 Get the directory where the script is located
# (absolute path of the folder that contains this file).
script_dir = os.path.dirname(os.path.abspath(__file__))

def image_prompt(prompt, image1, image2, image3, image4):
    """Submit an asynchronous text-to-image job guided by up to four images.

    Each supplied image file is read, base64-encoded and sent as an
    ``ImagePrompt`` entry alongside the text prompt.

    Args:
        prompt: Text prompt forwarded under the ``"prompt"`` key.
        image1: Path of the first image file, or a falsy value to skip it.
        image2: Path of the second image file, or a falsy value to skip it.
        image3: Path of the third image file, or a falsy value to skip it.
        image4: Path of the fourth image file, or a falsy value to skip it.

    Returns:
        The ``job_id`` value from the JSON response, or ``None`` when the
        response carries no such key.

    Raises:
        OSError: If a supplied image path cannot be opened or read.
        requests.RequestException: If the request fails or the server answers
            with an HTTP error status.
    """
    image_prompts = []
    for path in (image1, image2, image3, image4):
        if not path:
            # An unset slot (e.g. None) is skipped instead of crashing in open().
            continue
        # The context manager closes the file handle as soon as it is read.
        with open(path, "rb") as image_file:
            encoded = base64.b64encode(image_file.read()).decode('utf-8')
        image_prompts.append({
            "cn_img": encoded,
            "cn_stop": 1,
            "cn_weight": 1,
            "cn_type": "ImagePrompt",
        })

    params = {
        "prompt": prompt,
        "image_prompts": image_prompts,
        "async_process": True,
    }

    # Closing the session releases its pooled connections after the call.
    with requests.Session() as session:
        retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504])
        session.mount('http://', HTTPAdapter(max_retries=retries))

        response = session.post(
            url=f"{host}/v2/generation/text-to-image-with-ip",
            data=json.dumps(params),
            headers={"Content-Type": "application/json"},
            timeout=10,  # Increase timeout as needed
        )
        # Surface HTTP errors instead of silently yielding a missing job id.
        response.raise_for_status()
        result = response.json()

    return result.get('job_id')

def query_status(job_id):
    """Query the generation API for a job and return its latest image and status.

    Args:
        job_id: Identifier of the job to look up; sent as the ``job_id``
            query parameter.

    Returns:
        A ``(image, message)`` tuple. ``image`` is a ``PIL.Image.Image`` built
        from the final result (stage ``SUCCESS``) or from the step preview
        (stage ``RUNNING``), or ``None`` when no image is available.
        ``message`` is a human-readable status string.

    Raises:
        requests.RequestException: If a request to the server fails.
    """
    # Closing the session releases its pooled connections after the call.
    with requests.Session() as session:
        retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504])
        session.mount('http://', HTTPAdapter(max_retries=retries))

        # Build the URL from the shared `host` constant and let requests
        # escape the query parameters.
        response = session.get(
            f"{host}/v1/generation/query-job",
            params={"job_id": job_id, "require_step_preview": "true"},
            timeout=10,  # Increase timeout as needed
        )
        job_data = response.json()
        job_stage = job_data.get("job_stage")

        if job_stage == "SUCCESS":
            # A missing or empty result list is reported, not raised.
            results = job_data.get("job_result") or []
            final_image_url = results[0].get("url") if results else None
            if not final_image_url:
                return None, "Final image URL not found in the job data."
            # The returned URL may point at the server's loopback address;
            # rewrite it to the host name this client actually talks to.
            final_image_url = final_image_url.replace("127.0.0.1", urlsplit(host).hostname)
            image_response = session.get(final_image_url, timeout=10)  # Increase timeout as needed
            image = Image.open(BytesIO(image_response.content))
            return image, "Job completed successfully."

        if job_stage == "RUNNING":
            step_preview_base64 = job_data.get("job_step_preview")
            if step_preview_base64:
                image = Image.open(BytesIO(base64.b64decode(step_preview_base64)))
                return image, "Job is running, step preview available."
            return None, "Job is running, no step preview available."

        if job_stage == "FAILED":
            return None, "Job failed."

        return None, "Unknown job stage."

def gradio_app():
    """Assemble the Gradio Blocks interface and launch it.

    The page holds a prompt box, a row of four image inputs (delivered to the
    handler as file paths), an output image and a status box. Clicking the
    button calls ``image_prompt`` and stores its return value in a state
    component; a load event then calls ``query_status`` for that value on a
    2-second schedule and shows the returned image and message.
    """
    with gr.Blocks() as demo:
        prompt = gr.Textbox(label="Prompt", placeholder="Enter your text prompt here")
        with gr.Row():
            # One file-path image input per prompt slot, created left to right.
            image_inputs = [
                gr.Image(label=slot_label, type="filepath")
                for slot_label in (
                    "Image Prompt 1",
                    "Image Prompt 2",
                    "Image Prompt 3",
                    "Image Prompt 4",
                )
            ]
        output_image = gr.Image(label="Generated Image")
        status = gr.Textbox(label="Status")

        # Holds the value returned by image_prompt for the polling callback.
        job_id = gr.State()

        generate_button = gr.Button("Generate Image")
        generate_button.click(
            image_prompt,
            inputs=[prompt, *image_inputs],
            outputs=job_id,
        )

        def update_image(job_id):
            # Nothing to poll until a job id has been stored.
            if not job_id:
                return None, "No job ID found."
            latest_image, status_text = query_status(job_id)
            return latest_image, status_text

        # ⏲️ Refresh the output image and status every 2 seconds
        demo.load(
            update_image,
            inputs=job_id,
            outputs=[output_image, status],
            every=2,
        )

    demo.launch()

# Build and launch the Gradio UI only when this file is run as a script, not on import.
if __name__ == "__main__":
    gradio_app()