File size: 5,713 Bytes
2b799f2
 
 
 
 
 
 
 
a2cc10f
fd70c6f
74c4534
2b799f2
74c4534
2b799f2
fd70c6f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
74c4534
fd70c6f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bf83e6e
 
fd70c6f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bf83e6e
fd70c6f
 
 
 
 
 
 
 
74c4534
2b799f2
 
 
 
 
 
 
 
 
 
 
bf83e6e
 
2b799f2
bf83e6e
2b799f2
bf83e6e
 
 
 
 
 
 
 
2b799f2
74c4534
2b799f2
fd70c6f
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
import json
import base64
import time
import gradio as gr
from PIL import Image
from io import BytesIO
import os

# Base URL (scheme, address and port) of the generation API server;
# image_prompt prefixes its endpoint path with this value.
host = "http://18.119.36.46:8888"

def image_prompt(prompt, image1, image2, image3, image4):
    """Submit an asynchronous image-prompt generation job to the API.

    Every supplied image file is read, base64-encoded and sent as an
    ``ImagePrompt`` entry next to the text prompt. Image slots that are
    empty (``None`` or ``""``) are skipped rather than failing the whole
    request.

    Args:
        prompt: Text prompt forwarded as the ``prompt`` field.
        image1: Path of the first prompt image, or an empty value.
        image2: Path of the second prompt image, or an empty value.
        image3: Path of the third prompt image, or an empty value.
        image4: Path of the fourth prompt image, or an empty value.

    Returns:
        The ``job_id`` value from the server response on success, or a
        ``(None, message)`` tuple describing the failure. Callers must
        check for the tuple form before treating the result as a job id.
    """
    image_paths = [path for path in (image1, image2, image3, image4) if path]
    if not image_paths:
        return None, "At least one image prompt is required."

    try:
        image_prompts = []
        for path in image_paths:
            with open(path, "rb") as image_file:
                encoded = base64.b64encode(image_file.read()).decode('utf-8')
            image_prompts.append({
                "cn_img": encoded,
                "cn_stop": 1,
                "cn_weight": 1,
                "cn_type": "ImagePrompt",
            })

        params = {
            "prompt": prompt,
            "image_prompts": image_prompts,
            "async_process": True,
        }

        retries = Retry(total=5, backoff_factor=2, status_forcelist=[502, 503, 504])
        # The context manager closes the session's pooled connections even
        # when the request raises.
        with requests.Session() as session:
            session.mount('http://', HTTPAdapter(max_retries=retries))
            response = session.post(
                url=f"{host}/v2/generation/text-to-image-with-ip",
                data=json.dumps(params),
                headers={"Content-Type": "application/json"},
                timeout=15,
            )
            response.raise_for_status()  # Raise exception for HTTP errors
            try:
                result = response.json()
            except ValueError:
                # Decode errors are ValueError subclasses; recent requests
                # versions also derive them from RequestException, so they
                # must be caught here to keep their dedicated message.
                return None, "Failed to decode JSON response from the server."

        job_id = result.get('job_id')
        if not job_id:
            raise ValueError("Job ID not found in response")

        return job_id

    except requests.exceptions.RequestException as e:
        return None, f"API request failed: {str(e)}"
    except Exception as e:
        return None, f"An unexpected error occurred: {str(e)}"

def query_status(job_id):
    """Fetch the current state of a generation job and any image it has.

    Args:
        job_id: Identifier of the job to query.

    Returns:
        An ``(image, message)`` tuple. ``image`` is a PIL image (the final
        result when the job stage is ``SUCCESS``, or the step preview when
        it is ``RUNNING`` and a preview is present) and ``None`` otherwise.
        ``message`` is a human-readable status or error description.
    """
    from urllib.parse import urlsplit

    if not job_id:
        # Nothing to look up; avoid a pointless network round-trip.
        return None, "No job ID provided."

    try:
        retries = Retry(total=5, backoff_factor=2, status_forcelist=[502, 503, 504])
        # The context manager closes the session's pooled connections on
        # every exit path, including exceptions.
        with requests.Session() as session:
            session.mount('http://', HTTPAdapter(max_retries=retries))

            # Build the URL from the shared base address and let requests
            # encode the query string.
            response = session.get(
                f"{host}/v1/generation/query-job",
                params={"job_id": job_id, "require_step_preview": "true"},
                timeout=15,
            )
            response.raise_for_status()  # Raise exception for HTTP errors
            try:
                job_data = response.json()
            except ValueError:
                # Decode errors are ValueError subclasses; recent requests
                # versions also derive them from RequestException, so they
                # must be caught here to keep their dedicated message.
                return None, "Failed to decode JSON response from the server."

            job_stage = job_data.get("job_stage")

            if job_stage == "SUCCESS":
                # Tolerate a missing or empty result list instead of
                # crashing on the [0] lookup.
                job_result = job_data.get("job_result") or []
                final_image_url = job_result[0].get("url") if job_result else None
                if not final_image_url:
                    return None, "Final image URL not found in the job data."
                # The reported URL may point at the server's loopback
                # address; rewrite it to the address this client uses.
                final_image_url = final_image_url.replace(
                    "127.0.0.1", urlsplit(host).hostname
                )
                image_response = session.get(final_image_url, timeout=15)
                image_response.raise_for_status()  # Raise exception for HTTP errors
                image = Image.open(BytesIO(image_response.content))
                return image, "Job completed successfully."

            if job_stage == "RUNNING":
                step_preview_base64 = job_data.get("job_step_preview")
                if step_preview_base64:
                    image = Image.open(BytesIO(base64.b64decode(step_preview_base64)))
                    return image, "Job is running, step preview available."
                return None, "Job is running, no step preview available."

            if job_stage == "FAILED":
                return None, "Job failed."

            return None, "Unknown job stage."

    except requests.exceptions.RequestException as e:
        return None, f"API request failed: {str(e)}"
    except Exception as e:
        return None, f"An unexpected error occurred: {str(e)}"

def gradio_app():
    """Build the Gradio interface and launch it.

    The UI has a text prompt, four image-prompt inputs, a generated-image
    output and a status box. Clicking the button submits a job through
    ``image_prompt``; a refresh registered with ``every=2`` calls
    ``query_status`` for the stored job id and updates the image and status.
    """
    with gr.Blocks() as demo:
        prompt = gr.Textbox(label="Prompt", placeholder="Enter your text prompt here")
        with gr.Row():
            image1 = gr.Image(label="Image Prompt 1", type="filepath")
            image2 = gr.Image(label="Image Prompt 2", type="filepath")
            image3 = gr.Image(label="Image Prompt 3", type="filepath")
            image4 = gr.Image(label="Image Prompt 4", type="filepath")
        output_image = gr.Image(label="Generated Image")
        status = gr.Textbox(label="Status")

        job_id = gr.State()
        # Message from the last failed submission, kept so the periodic
        # refresh can keep displaying it instead of overwriting it.
        submit_error = gr.State()

        def start_job(prompt_text, path1, path2, path3, path4):
            """Submit a job and split the result into job id / error / status."""
            result = image_prompt(prompt_text, path1, path2, path3, path4)
            if isinstance(result, tuple):
                # image_prompt signals failure with (None, message); that is
                # not a job id and must not be stored as one.
                message = result[1]
                return None, message, message
            return result, None, "Job submitted."

        generate_button = gr.Button("Generate Image")
        generate_button.click(
            start_job,
            inputs=[prompt, image1, image2, image3, image4],
            outputs=[job_id, submit_error, status],
        )

        def update_image(job_id, submit_error):
            """Return the latest image and status text for the stored job."""
            if job_id:
                image, status_text = query_status(job_id)
                return image, status_text
            return None, submit_error or "No job ID found."

        demo.load(
            update_image,
            inputs=[job_id, submit_error],
            outputs=[output_image, status],
            every=2,
        )

    demo.launch()

# Launch the UI only when this file is executed directly, not when imported.
if __name__ == "__main__":
    gradio_app()