Spaces:
Running
on
Zero
Running
on
Zero
File size: 10,936 Bytes
45f7be1 42897ae a52b051 a1a380b a52b051 879a241 24a1808 8152b02 879a241 6efeffc 45f7be1 6efeffc 45f7be1 a52b051 45f7be1 879a241 45f7be1 a52b051 45f7be1 8152b02 c6fc1b3 2ef199a c6fc1b3 2ef199a c6fc1b3 2ef199a c6fc1b3 e74df71 c6fc1b3 8152b02 36e6906 45f7be1 a52b051 45f7be1 fd3b5e9 45f7be1 a52b051 45f7be1 a52b051 0376c05 fe42fe7 0376c05 45f7be1 0261894 8152b02 45f7be1 8152b02 0376c05 45f7be1 0376c05 45f7be1 fe42fe7 8152b02 24a1808 42897ae a52b051 d6f2bf8 a1a380b 2cf2e45 8152b02 42897ae 8319e98 42897ae 8319e98 7a1d390 8319e98 42897ae 7a1d390 42897ae 8319e98 7a1d390 45f7be1 8152b02 92e64c7 45f7be1 1b5be08 45f7be1 3e8f3e6 a52b051 8319e98 a52b051 36e6906 45f7be1 a5e00fd 9690adb 8152b02 45f7be1 a52b051 45f7be1 a52b051 85bb77d 8152b02 a52b051 9b980f8 8152b02 9b980f8 36e6906 9b980f8 aa6df08 9b980f8 a52b051 2eda22a 45f7be1 f219d44 8f22ee9 2eda22a 7af0491 2eda22a 7af0491 9b980f8 45f7be1 f219d44 45f7be1 2eda22a 681ada7 2eda22a 3647dae 0b075c8 2eda22a 45f7be1 36e6906 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 |
import os
import json
import numpy as np
import torch
from PIL import Image, ImageDraw
import gradio as gr
from openai import OpenAI
from geopy.geocoders import Nominatim
from staticmap import StaticMap, CircleMarker, Polygon
from diffusers import ControlNetModel, StableDiffusionControlNetInpaintPipeline
import spaces
import logging
import math
from typing import List, Union
# Set up logging
# Configure logging once at import time: INFO level, records formatted as
# "<timestamp> - <level> - <message>".
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Initialize APIs
# The OpenAI key is read from the environment; indexing os.environ directly
# means a missing OPENAI_API_KEY raises KeyError as soon as the module loads.
openai_client = OpenAI(api_key=os.environ['OPENAI_API_KEY'])
# Nominatim geocoder client, used by get_geo_coordinates.
geolocator = Nominatim(user_agent="geoapi")
# Function to fetch coordinates
@spaces.GPU
def get_geo_coordinates(location_name):
    """Geocode *location_name* and return it as ``[longitude, latitude]``.

    Returns ``None`` when the geocoder yields no (truthy) result or when the
    lookup raises; in the latter case the error is logged rather than
    propagated.
    """
    try:
        match = geolocator.geocode(location_name)
        # Longitude first, then latitude.
        lon_lat = [match.longitude, match.latitude] if match else None
    except Exception as e:
        logger.error(f"Error fetching coordinates for {location_name}: {e}")
        lon_lat = None
    return lon_lat
# Function to process OpenAI chat response
@spaces.GPU
def process_openai_response(query):
    """Send *query* to the chat model and return its reply parsed as JSON.

    The system prompt asks the model for a JSON object with an ``input``
    field and an ``output`` field holding ``answer`` and
    ``feature_representation`` (``type``, ``cities``, ``properties``).
    The request sets ``response_format`` to ``json_object`` and the message
    content of the first choice is decoded with ``json.loads``.
    """
    # Prompt text is kept verbatim; it is runtime data, not documentation.
    system_prompt = """
You are an assistant that generates structured JSON output for geographical queries with city names. Your task is to generate a JSON object containing information about geographical features and their representation based on the user's query. Follow these rules:
1. The JSON should always have the following structure:
{
"input": "<user's query>",
"output": {
"answer": "<concise text answering the query>",
"feature_representation": {
"type": "<one of: Point, LineString, Polygon, MultiPoint, MultiLineString, MultiPolygon, GeometryCollection>",
"cities": ["<list of city names>"],
"properties": {
"description": "<a prompt for a diffusion model describing the geographical feature>"
}
}
}
}
2. For the `type` field in `feature_representation`:
- Use "Point" for single city queries.
- Use "MultiPoint" for queries involving multiple cities not forming a line or area.
- Use "LineString" for queries about paths between two or more cities.
- Use "Polygon" for queries about areas formed by three or more cities.
3. For the `cities` field:
- List the names of cities mentioned in the query in the order they appear.
- If no cities are mentioned, try to add them with your knowledge.
4. For the `properties.description` field:
- Describe the geographical feature in a creative way, suitable for generating an image with a diffusion model.
### Example Input:
"Mark a triangular area of 3 US cities."
### Example Output:
{
"input": "Mark a triangular area of 3 US cities.",
"output": {
"answer": "The cities New York, Boston, and Philadelphia form a triangle.",
"feature_representation": {
"type": "Polygon",
"cities": ["New York", "Boston", "Philadelphia"],
"properties": {
"description": "A satellite image of a triangular area formed by New York, Boston, and Philadelphia, with green fields and urban regions, 4k resolution, highly detailed."
}
}
}
}
Generate similar JSON for the following query:
"""
    chat_messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": query},
    ]
    completion = openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=chat_messages,
        temperature=1,
        max_tokens=2048,
        top_p=1,
        frequency_penalty=0,
        presence_penalty=0,
        response_format={"type": "json_object"},
    )
    raw_json = completion.choices[0].message.content
    return json.loads(raw_json)
# Generate GeoJSON from OpenAI response
@spaces.GPU
def generate_geojson(response):
    """Turn the parsed model response into a one-feature GeoJSON collection.

    Each city listed in the response is geocoded with
    ``get_geo_coordinates``; cities that cannot be resolved are logged and
    skipped. For a ``Polygon`` the ring is closed by repeating its first
    point and wrapped in an outer list; every other type keeps the flat list
    of ``[lon, lat]`` pairs.

    Raises:
        ValueError: the type is ``Polygon`` but fewer than three cities
            could be geocoded.
    """
    logger.info(f"OpenAI response: {response}")
    representation = response['output']['feature_representation']
    feature_type = representation['type']
    city_names = representation['cities']
    properties = representation['properties']

    # Fetch coordinates for cities
    coordinates = []
    for city in city_names:
        try:
            coord = get_geo_coordinates(city)
            if not coord:
                logger.warning(f"Coordinates not found for city: {city}")
            else:
                coordinates.append(coord)
        except Exception as e:
            logger.error(f"Error fetching coordinates for {city}: {e}")

    if feature_type == "Polygon":
        if len(coordinates) < 3:
            raise ValueError("Polygon requires at least 3 coordinates.")
        # Close the ring with its first point, then nest it as the only ring.
        closed_ring = coordinates + [coordinates[0]]
        coordinates = [closed_ring]

    # Create the GeoJSON object
    geometry = {"type": feature_type, "coordinates": coordinates}
    feature = {"type": "Feature", "properties": properties, "geometry": geometry}
    return {"type": "FeatureCollection", "features": [feature]}
# Sort coordinates for a simple polygon (Reduce intersection points)
def sort_coordinates_for_simple_polygon(geojson):
    """Reorder the first feature's outer ring by angle around its centroid.

    The ring's closing point is dropped if it repeats the first point, the
    remaining points are sorted by ``atan2`` angle relative to their mean
    position, and the ring is re-closed. The GeoJSON dict is modified in
    place and the same object is returned.
    """
    geometry = geojson['features'][0]['geometry']
    ring = geometry['coordinates'][0]

    # Drop the duplicated closing point so it does not skew the centroid.
    if ring[0] == ring[-1]:
        ring = ring[:-1]

    count = len(ring)
    cx = sum(pt[0] for pt in ring) / count
    cy = sum(pt[1] for pt in ring) / count

    # Sort points by their angle from the centroid.
    ordered = sorted(ring, key=lambda pt: math.atan2(pt[1] - cy, pt[0] - cx))

    # Re-close the ring with its (new) first point.
    ordered.append(ordered[0])

    geometry['coordinates'][0] = ordered
    return geojson
# Generate static map image
@spaces.GPU
def generate_static_map(geojson_data, invisible=False):
    """Render the features of *geojson_data* on a 600x600 static map.

    ``Point``, ``MultiPoint`` and ``LineString`` geometries are drawn as
    circle markers; ``Polygon`` and ``MultiPolygon`` geometries are drawn as
    polygons. Other geometry types are not drawn. When *invisible* is true
    the alternative colour ``'#1C00ff00'`` is used instead of
    ``'#42445A85'``.

    Returns whatever ``StaticMap.render()`` returns.
    """
    colour = '#1C00ff00' if invisible else '#42445A85'
    static_map = StaticMap(600, 600)
    logger.info(f"GeoJSON data: {geojson_data}")

    for feature in geojson_data["features"]:
        geometry = feature["geometry"]
        kind = geometry["type"]
        coords = geometry["coordinates"]

        if kind == "Point":
            # A Point is stored here as a one-element list of [lon, lat].
            first = coords[0]
            static_map.add_marker(CircleMarker((first[0], first[1]), colour, 100))
        elif kind in ["MultiPoint", "LineString"]:
            for lon_lat in coords:
                static_map.add_marker(CircleMarker((lon_lat[0], lon_lat[1]), colour, 100))
        elif kind in ["Polygon", "MultiPolygon"]:
            # NOTE(review): MultiPolygon is iterated exactly like Polygon
            # (one level of nesting) — confirm this matches the producer.
            for ring in coords:
                outline = [(c[0], c[1]) for c in ring]
                static_map.add_polygon(Polygon(outline, colour, 3))

    return static_map.render()
# ControlNet pipeline setup
# Everything below runs at import time.
# Load the inpainting ControlNet checkpoint with float16 weights.
controlnet = ControlNetModel.from_pretrained("lllyasviel/control_v11p_sd15_inpaint", torch_dtype=torch.float16)
# Build the Stable Diffusion inpainting pipeline around that ControlNet,
# also in float16.
pipeline = StableDiffusionControlNetInpaintPipeline.from_pretrained(
    "stable-diffusion-v1-5/stable-diffusion-inpainting", controlnet=controlnet, torch_dtype=torch.float16
)
# Move the pipeline to the CUDA device.
pipeline.to('cuda')
@spaces.GPU
def make_inpaint_condition(init_image, mask_image):
    """Build the ControlNet conditioning tensor for inpainting.

    Args:
        init_image: image object exposing ``convert("RGB")``; its pixels are
            scaled to ``[0, 1]`` as float32.
        mask_image: image object exposing ``convert("L")``; after scaling to
            ``[0, 1]``, values above 0.5 mark pixels to be inpainted.

    Returns:
        A float32 ``torch.Tensor`` of shape ``(1, 3, H, W)`` in which every
        masked pixel is set to ``-1.0`` in all channels.

    Raises:
        AssertionError: the image and the mask differ in height or width.
    """
    image = np.array(init_image.convert("RGB")).astype(np.float32) / 255.0
    mask = np.array(mask_image.convert("L")).astype(np.float32) / 255.0

    # Compare height AND width. Slicing shape[0:1] only looked at the height,
    # so a width mismatch fell through to an opaque boolean-index error.
    assert image.shape[:2] == mask.shape[:2], "image and image_mask must have the same image size"

    image[mask > 0.5] = -1.0  # set as masked pixel
    # (H, W, C) -> (1, C, H, W)
    image = np.expand_dims(image, 0).transpose(0, 3, 1, 2)
    return torch.from_numpy(image)
@spaces.GPU
def generate_satellite_image(init_image, mask_image, prompt, *,
                             strength=0.47, guidance_scale=95, num_inference_steps=250):
    """Run the ControlNet inpainting pipeline and return the first image.

    Args:
        init_image: base image passed to the pipeline as ``image``.
        mask_image: mask passed to the pipeline as ``mask_image`` and used to
            build the control image.
        prompt: text prompt for the diffusion model.
        strength: forwarded to the pipeline; defaults to the previously
            hard-coded 0.47.
        guidance_scale: forwarded to the pipeline; defaults to the previously
            hard-coded 95.
        num_inference_steps: forwarded to the pipeline; defaults to the
            previously hard-coded 250.

    Returns:
        ``result.images[0]`` from the pipeline call.
    """
    control_image = make_inpaint_condition(init_image, mask_image)
    result = pipeline(
        prompt=prompt,
        image=init_image,
        mask_image=mask_image,
        control_image=control_image,
        strength=strength,
        guidance_scale=guidance_scale,
        num_inference_steps=num_inference_steps,
    )
    return result.images[0]
# Gradio UI
@spaces.GPU
def handle_query(query):
    """Run the full pipeline for one user query.

    Steps: ask the chat model for a structured response, build GeoJSON from
    it, render the map twice (visible overlay and invisible overlay), derive
    an inpainting mask from the pixels that differ between the two renders,
    and generate the final image with the diffusion pipeline.

    Returns:
        A 5-tuple ``(map_image, satellite_image, empty_map_image,
        mask_image, response)`` where ``response`` is the parsed model
        response.
    """
    response = process_openai_response(query)
    geojson_data = generate_geojson(response)

    if geojson_data["features"][0]["geometry"]["type"] == 'Polygon':
        # Reorders the ring in place and hands back the same object, so both
        # renders below see the sorted polygon.
        geojson_data = sort_coordinates_for_simple_polygon(geojson_data)

    map_image = generate_static_map(geojson_data)
    empty_map_image = generate_static_map(geojson_data, invisible=True)

    # Subtract in a signed type: on uint8 arrays the subtraction wraps around
    # (e.g. 10 - 12 == 254) and np.abs cannot undo that, which would mark
    # barely-darker pixels as strongly different.
    visible_pixels = np.array(map_image.convert("RGB")).astype(np.int16)
    blank_pixels = np.array(empty_map_image.convert("RGB")).astype(np.int16)
    difference = np.abs(visible_pixels - blank_pixels)

    threshold = 10
    mask = (np.sum(difference, axis=-1) > threshold).astype(np.uint8) * 255
    mask_image = Image.fromarray(mask, mode="L")

    satellite_image = generate_satellite_image(
        empty_map_image, mask_image, response['output']['feature_representation']['properties']['description']
    )
    return map_image, satellite_image, empty_map_image, mask_image, response
def update_query(selected_query):
    """Return *selected_query* unchanged.

    Wired to the dropdown's ``change`` event so the chosen preset is copied
    into the query textbox.
    """
    return selected_query
# Preset queries offered in the dropdown; the last entry is the default
# value of both the dropdown and the textbox.
query_options = [
    "Area covering south asian subcontinent",
    "Mark a triangular area using New York, Boston, and Texas",
    "Mark cities in India",
    "Show me Lotus Tower in a Map",
    "Mark the area of west germany",
    "Mark the area of the Amazon rainforest",
    "Mark the area of the Sahara desert",
]
# Gradio layout and event wiring.
# NOTE(review): the source this was recovered from had lost its indentation,
# so which components sit inside which gr.Row() is a best guess — confirm
# against the deployed layout.
with gr.Blocks() as demo:
    with gr.Row():
        selected_query = gr.Dropdown(label="Select Query", choices=query_options, value=query_options[-1])
        query_input = gr.Textbox(label="Enter Query", value=query_options[-1])
        # Picking a preset copies it into the textbox.
        selected_query.change(update_query, inputs=selected_query, outputs=query_input)
        submit_btn = gr.Button("Submit")
    with gr.Row():
        map_output = gr.Image(label="Map Visualization")
        satellite_output = gr.Image(label="Generated Map Image")
    with gr.Row():
        empty_map_output = gr.Image(label="Empty Visualization")
        mask_output = gr.Image(label="Mask")
    image_prompt = gr.Textbox(label="Image Prompt Used")
    # handle_query returns five values, mapped in order onto these outputs;
    # the fifth value it returns is the whole parsed model response.
    submit_btn.click(handle_query, inputs=[query_input], outputs=[map_output, satellite_output, empty_map_output, mask_output, image_prompt])
# Start the app only when this file is run as a script.
if __name__ == "__main__":
    demo.launch()
|