# Hugging Face Space status when this file was captured: Runtime error
import os | |
import json | |
import numpy as np | |
import torch | |
from PIL import Image, ImageDraw | |
import gradio as gr | |
from openai import OpenAI | |
from geopy.geocoders import Nominatim | |
from staticmap import StaticMap, CircleMarker, Polygon | |
from diffusers import ControlNetModel, StableDiffusionControlNetInpaintPipeline | |
from diffusers import StableDiffusionInpaintPipeline | |
import spaces | |
import logging | |
import math | |
from typing import List, Union # Make sure these are actually used or remove them | |
# Configure logging once; the format includes file name and line number so
# every later message can be traced back to the statement that emitted it.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s',
)
logger = logging.getLogger(__name__)
logger.info("Script starting. Initializing APIs and models.")

# OpenAI client. The API key is read from the environment; a missing key (or
# any other construction failure) is logged and aborts start-up.
try:
    openai_client = OpenAI(api_key=os.environ['OPENAI_API_KEY'])
    logger.info("OpenAI client initialized.")
except KeyError:
    logger.error("OPENAI_API_KEY environment variable not set!")
    raise
except Exception as exc:
    logger.error(f"Error initializing OpenAI client: {exc}")
    raise

# Nominatim geocoder used by get_geo_coordinates(); failures are also fatal.
try:
    geolocator = Nominatim(user_agent="geoapi_visualizemap")
    logger.info("Geolocator initialized.")
except Exception as exc:
    logger.error(f"Error initializing Geolocator: {exc}")
    raise
# Function to fetch coordinates
def get_geo_coordinates(location_name):
    """Look up ``location_name`` with the module-level geocoder.

    Returns ``[longitude, latitude]`` (longitude first) when the geocoder
    yields a result. Returns ``None`` when nothing is found or when the
    lookup raises; both situations are logged, never propagated.
    """
    logger.info(f"Attempting to fetch coordinates for: {location_name}")
    try:
        place = geolocator.geocode(location_name, timeout=10)
        if not place:
            logger.warning(f"No location data returned for {location_name}")
            return None
        logger.info(f"Coordinates found for {location_name}: {[place.longitude, place.latitude]}")
        return [place.longitude, place.latitude]
    except Exception as err:
        logger.error(f"Error fetching coordinates for {location_name}: {err}")
        return None
# Function to process OpenAI chat response
# System prompt sent with every request; it pins the JSON schema that
# generate_geojson() and handle_query() later index into.
_GEO_SYSTEM_PROMPT = """
You are an assistant that generates structured JSON output for geographical queries with city names. Your task is to generate a JSON object containing information about geographical features and their representation based on the user's query. Follow these rules:
1. The JSON should always have the following structure:
{
"input": "<user's query>",
"output": {
"answer": "<concise text answering the query>",
"feature_representation": {
"type": "<one of: Point, LineString, Polygon, MultiPoint, MultiLineString, MultiPolygon, GeometryCollection>",
"cities": ["<list of city names>"],
"properties": {
"description": "<a prompt for a diffusion model describing the geographical feature>"
}
}
}
}
2. For the `type` field in `feature_representation`:
- Use "Point" for single city queries.
- Use "MultiPoint" for queries involving multiple cities not forming a line or area.
- Use "LineString" for queries about paths between two or more cities.
- Use "Polygon" for queries about areas formed by three or more cities.
3. For the `cities` field:
- List the names of cities mentioned in the query in the order they appear.
- If no cities are mentioned, try to add them with your knowledge.
4. For the `properties.description` field:
- Describe the geographical feature in a creative way, suitable for generating an image with a diffusion model.
### Example Input:
"Mark a triangular area of 3 US cities."
### Example Output:
{
"input": "Mark a triangular area of 3 US cities.",
"output": {
"answer": "The cities New York, Boston, and Philadelphia form a triangle.",
"feature_representation": {
"type": "Polygon",
"cities": ["New York", "Boston", "Philadelphia"],
"properties": {
"description": "A satellite image of a triangular area formed by New York, Boston, and Philadelphia, with green fields and urban regions, 4k resolution, highly detailed."
}
}
}
}
Generate similar JSON for the following query:
"""


def process_openai_response(query):
    """Send ``query`` to the chat model and return its reply parsed as JSON.

    The request uses model ``gpt-4o-mini`` with ``response_format`` set to
    ``json_object``; the reply text is decoded with ``json.loads``.

    Any exception (API failure, invalid JSON, ...) is logged and re-raised
    unchanged so the caller decides how to report it.
    """
    logger.info(f"Processing OpenAI query: {query}")
    chat_messages = [
        {"role": "system", "content": _GEO_SYSTEM_PROMPT},
        {"role": "user", "content": query},
    ]
    try:
        completion = openai_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=chat_messages,
            temperature=1,
            max_tokens=2048,
            top_p=1,
            frequency_penalty=0,
            presence_penalty=0,
            response_format={"type": "json_object"},
        )
        content = completion.choices[0].message.content
        logger.info(f"Raw OpenAI response content: {content}")
        parsed_response = json.loads(content)
        logger.info(f"Parsed OpenAI response: {json.dumps(parsed_response, indent=2)}")
        return parsed_response
    except Exception as err:
        logger.error(f"Error processing OpenAI response for query '{query}': {err}")
        raise
# Generate GeoJSON from OpenAI response
def generate_geojson(response_data):
    """Build a one-feature GeoJSON ``FeatureCollection`` from the model output.

    Args:
        response_data: Parsed chat response. It must provide
            ``response_data['output']['feature_representation']`` with the
            keys ``type``, ``cities`` and ``properties``.

    Returns:
        dict: ``{"type": "FeatureCollection", "features": [feature]}`` where
        the feature's geometry uses the requested type and the coordinates
        of every city that could be geocoded (cities without a result are
        skipped with a warning).

    Raises:
        KeyError: A required key is missing from ``response_data``.
        ValueError: Too few cities were geocoded for the requested type
            (Point/MultiPoint: 1, LineString: 2, Polygon: 3 distinct
            vertices).
    """
    logger.info(f"Generating GeoJSON from OpenAI response_data: {json.dumps(response_data, indent=2)}")
    # Bound before the try block so the ValueError handler can always log it.
    coordinates = []
    try:
        representation = response_data['output']['feature_representation']
        feature_type = representation['type']
        city_names = representation['cities']
        properties = representation['properties']
        logger.info(f"Feature type: {feature_type}, Cities: {city_names}")

        for city in city_names:
            coord = get_geo_coordinates(city)
            if coord:
                coordinates.append(coord)
            else:
                logger.warning(f"Coordinates not found for city: {city}. Skipping.")
        logger.info(f"Collected coordinates: {coordinates}")

        # Shape the coordinate list the way each GeoJSON geometry expects it.
        if feature_type == "Point":
            if not coordinates:
                raise ValueError("Point type requires at least one coordinate.")
            # A Point is a single [lon, lat] pair; extra cities are ignored.
            final_coordinates = coordinates[0]
        elif feature_type == "MultiPoint":
            # An empty MultiPoint cannot be rendered; fail here with a clear
            # message instead of producing a blank map further down.
            if not coordinates:
                raise ValueError("MultiPoint requires at least one coordinate.")
            final_coordinates = coordinates
        elif feature_type == "LineString":
            if len(coordinates) < 2:
                raise ValueError("LineString requires at least 2 coordinates.")
            final_coordinates = coordinates
        elif feature_type == "Polygon":
            ring = list(coordinates)
            # Drop an explicit closing vertex before counting, so that a
            # sequence like [A, B, A] is not mistaken for three corners.
            if len(ring) > 1 and ring[0] == ring[-1]:
                ring.pop()
            if len(ring) < 3:
                raise ValueError("Polygon requires at least 3 coordinates.")
            ring.append(ring[0])  # close the exterior ring
            final_coordinates = [ring]  # Polygon = list of rings
        else:  # MultiLineString, MultiPolygon, GeometryCollection, ...
            logger.warning(f"Unsupported or complex feature_type: {feature_type}. Using raw coordinates.")
            final_coordinates = coordinates

        geojson_data = {
            "type": "FeatureCollection",
            "features": [
                {
                    "type": "Feature",
                    "properties": properties,
                    "geometry": {
                        "type": feature_type,
                        "coordinates": final_coordinates,
                    },
                }
            ],
        }
        logger.info(f"Generated GeoJSON: {json.dumps(geojson_data, indent=2)}")
        return geojson_data
    except KeyError as e:
        logger.error(f"KeyError while generating GeoJSON: {e}. Response data: {json.dumps(response_data, indent=2)}")
        raise
    except ValueError as e:
        logger.error(f"ValueError while generating GeoJSON: {e}. Coordinates: {coordinates}")
        raise
    except Exception as e:
        logger.error(f"Unexpected error in generate_geojson: {e}")
        raise
# Sort coordinates for a simple polygon (Reduce intersection points)
def sort_coordinates_for_simple_polygon(geojson):
    """Reorder the first feature's exterior ring by angle around its centroid.

    The ring is read from ``geojson['features'][0]['geometry']['coordinates'][0]``.
    A trailing vertex equal to the first one is dropped, the remaining
    vertices are sorted by ``atan2`` of their offset from the centroid, the
    ring is closed again and written back into ``geojson`` in place.

    Returns the same ``geojson`` object. It is returned unmodified when the
    ring has fewer than three vertices or when any error occurs (errors are
    logged, not raised).
    """
    logger.info("Attempting to sort polygon coordinates.")
    try:
        ring = geojson['features'][0]['geometry']['coordinates'][0]
        logger.info(f"Original polygon coordinates: {ring}")
        if not ring or len(ring) < 3:
            logger.warning("Not enough coordinates to sort for a polygon.")
            return geojson

        # Work on the open ring: the closing vertex would skew the centroid.
        is_closed = ring[0] == ring[-1] and len(ring) > 1
        vertices = ring[:-1] if is_closed else ring
        if not vertices or len(vertices) < 3:
            logger.warning("Not enough unique coordinates to sort for a polygon after de-duplication.")
            return geojson

        vertex_count = len(vertices)
        centroid_x = sum(vertex[0] for vertex in vertices) / vertex_count
        centroid_y = sum(vertex[1] for vertex in vertices) / vertex_count
        logger.info(f"Calculated centroid: ({centroid_x}, {centroid_y})")

        ordered = sorted(
            vertices,
            key=lambda vertex: math.atan2(vertex[1] - centroid_y, vertex[0] - centroid_x),
        )
        ordered.append(ordered[0])  # Close the polygon
        geojson['features'][0]['geometry']['coordinates'][0] = ordered
        logger.info(f"Sorted polygon coordinates: {ordered}")
        return geojson
    except Exception as err:
        logger.error(f"Error sorting polygon coordinates: {err}")
        return geojson  # Return original on error
# Generate static map image
def generate_static_map(geojson_data, invisible=False):
    """Render the features of ``geojson_data`` on a 600x600 static map.

    Args:
        geojson_data: FeatureCollection dict; each feature's geometry
            ``type`` and ``coordinates`` are read. Point, MultiPoint,
            LineString and Polygon are drawn; other types are skipped with
            a warning.
        invisible: When true, every shape is drawn with the fully
            transparent colour ``'#1C00ff00'`` so the result is the bare
            base map with the same extent as the visible render.

    Returns:
        The rendered map image, or a grey 600x600 placeholder image if
        anything goes wrong (the error is logged, never raised).
    """
    logger.info(f"Generating static map. Invisible: {invisible}. GeoJSON: {json.dumps(geojson_data, indent=2)}")
    try:
        # Local import: the file-level import only brings in StaticMap,
        # CircleMarker and Polygon. Kept inside the try so a failure still
        # yields the placeholder.
        from staticmap import Line

        m = StaticMap(600, 600)
        transparent = '#1C00ff00'
        color = transparent if invisible else '#42445A85'
        marker_size = 20 if invisible else 10
        # The line must vanish in the invisible render, otherwise the visible
        # and invisible maps are identical and no mask can be derived.
        line_color = transparent if invisible else "blue"
        outline_color = transparent if invisible else '#0000AA'

        def is_lon_lat(pair):
            """True for a non-empty 2-item sequence whose first item is a number."""
            return bool(pair) and len(pair) == 2 and isinstance(pair[0], (int, float))

        for feature in geojson_data["features"]:
            geom_type = feature["geometry"]["type"]
            coords = feature["geometry"]["coordinates"]
            logger.info(f"Processing feature type: {geom_type} with coords: {coords}")

            if geom_type == "Point":
                # coords is a single [lon, lat]
                if is_lon_lat(coords):
                    m.add_marker(CircleMarker((coords[0], coords[1]), color, marker_size))
                else:
                    logger.warning(f"Skipping Point due to invalid coordinate structure: {coords}")
            elif geom_type == "MultiPoint":
                # coords is a list of [lon, lat]
                for coord_pair in coords:
                    if is_lon_lat(coord_pair):
                        m.add_marker(CircleMarker((coord_pair[0], coord_pair[1]), color, marker_size))
                    else:
                        logger.warning(f"Skipping point in MultiPoint due to invalid coordinate structure: {coord_pair}")
            elif geom_type == "LineString":
                # coords is a list of [lon, lat]
                if len(coords) >= 2:
                    # add_line() needs a Line (colour + width). The previous
                    # code passed a Polygon, which has neither attribute, so
                    # rendering failed and the placeholder was returned.
                    m.add_line(Line([(c[0], c[1]) for c in coords], line_color, 3))
                else:
                    logger.warning(f"Skipping LineString, not enough points: {coords}")
            elif geom_type == "Polygon":
                # coords is a list of rings; normally just the exterior one.
                for polygon_ring in coords:
                    if len(polygon_ring) >= 3:
                        m.add_polygon(
                            Polygon(
                                [(c[0], c[1]) for c in polygon_ring],
                                color,
                                outline_color,
                                # NOTE(review): the 4th Polygon argument is the
                                # simplify flag, not an outline width; the
                                # earlier 3/0 value is kept as its truthiness.
                                not invisible,
                            )
                        )
                    else:
                        logger.warning(f"Skipping polygon ring, not enough points: {polygon_ring}")
            else:
                logger.warning(f"Unsupported geometry type for static map: {geom_type}")

        rendered_map = m.render(center=None, zoom=None)  # auto-centre and auto-zoom
        logger.info(f"Static map rendered successfully. Invisible: {invisible}")
        return rendered_map
    except Exception as e:
        logger.error(f"Error generating static map (invisible={invisible}): {e}")
        return Image.new("RGB", (600, 600), color="grey")  # Placeholder
# Inpainting pipeline setup
# NOTE: an earlier variant built a StableDiffusionControlNetInpaintPipeline
# around a ControlNetModel; the plain inpainting pipeline below replaced it.
logger.info("Initializing Stable Diffusion Inpaint Pipeline.")
try:
    pipeline = StableDiffusionInpaintPipeline.from_pretrained(
        "stabilityai/stable-diffusion-2-inpainting",
        torch_dtype=torch.float16,
    )
    # Move the model to the GPU once at start-up; any failure is fatal.
    pipeline.to("cuda")
    logger.info("Stable Diffusion Inpaint Pipeline loaded to CUDA.")
except Exception as exc:
    logger.error(f"Error initializing Stable Diffusion pipeline: {exc}")
    raise
# Leftover from the ControlNet variant of the pipeline. It now only validates
# that image and mask have matching sizes and hands the image back.
def make_inpaint_condition(init_image_pil, mask_image_pil):
    """Check that image and mask share height/width and return the image.

    Both inputs are converted to float arrays scaled to the 0..1 range (the
    image as RGB, the mask as single-channel ``L``) purely to compare their
    shapes. The ControlNet-specific masking that once followed is no longer
    performed.

    Returns:
        ``init_image_pil`` unchanged.

    Raises:
        ValueError: The two images differ in height or width.
    """
    logger.info("Preparing inpaint condition (ControlNet specific, may need adjustment).")
    image_arr = np.array(init_image_pil.convert("RGB")).astype(np.float32) / 255.0
    mask_arr = np.array(mask_image_pil.convert("L")).astype(np.float32) / 255.0
    logger.info(f"Init image shape: {image_arr.shape}, Mask image shape: {mask_arr.shape}")

    image_hw = image_arr.shape[:2]
    mask_hw = mask_arr.shape[:2]
    if image_hw != mask_hw:
        logger.error(f"Image and mask dimensions mismatch: {image_hw} vs {mask_hw}")
        raise ValueError("Image and mask_image must have the same height and width.")

    # The inpainting pipeline takes the image directly, so no tensor is built.
    return init_image_pil
def generate_satellite_image(base_image_pil, mask_image_pil, prompt):
    """Inpaint the masked region of ``base_image_pil`` guided by ``prompt``.

    Calls the module-level inpainting ``pipeline`` with a guidance scale of
    9.5 and 50 inference steps and returns the first generated image.

    On any failure the error is logged and a solid red image with the size
    of ``base_image_pil`` is returned instead of raising.
    """
    logger.info(f"Generating satellite image with prompt: '{prompt}'")
    logger.info(f"Base image type: {type(base_image_pil)}, Mask image type: {type(mask_image_pil)}")
    try:
        # Plain inpainting call: there is no control_image / strength here,
        # those belonged to the former ControlNet pipeline.
        output = pipeline(
            prompt=prompt,
            image=base_image_pil,
            mask_image=mask_image_pil,
            guidance_scale=9.5,
            num_inference_steps=50,
        )
        result = output.images[0]
        logger.info("Satellite image generated successfully.")
        return result
    except Exception as err:
        logger.error(f"Error generating satellite image: {err}")
        return Image.new("RGB", base_image_pil.size, color="red")  # Placeholder
# Gradio UI
def _difference_mask(visible_image, base_image, threshold=10):
    """Return an ``L``-mode mask that is 255 wherever the two renders differ.

    The RGB arrays are widened to int16 before subtracting. On the native
    uint8 arrays a negative difference wraps around modulo 256, so a pixel
    that is only slightly darker in ``visible_image`` would look like a huge
    change and defeat ``threshold``.
    """
    visible = np.asarray(visible_image.convert("RGB")).astype(np.int16)
    base = np.asarray(base_image.convert("RGB")).astype(np.int16)
    per_pixel_change = np.abs(visible - base).sum(axis=-1)
    mask_array = (per_pixel_change > threshold).astype(np.uint8) * 255
    return Image.fromarray(mask_array, mode="L")


def handle_query(query: str):
    """Run the full pipeline for one user query (Gradio click handler).

    Steps: ask the chat model for a structured answer, turn it into GeoJSON,
    re-order polygon vertices, render a visible and an "invisible" map,
    derive an inpainting mask from their difference and generate the
    satellite-style image from the invisible map and that mask.

    Returns:
        tuple: ``(map_image, satellite_image, empty_map_image, mask_image,
        prompt)``; four images followed by the diffusion prompt string. On
        any exception a black image carrying the error text is returned in
        all four image slots and the last item is an error message.
    """
    logger.info(f"--- Handling query: {query} ---")
    try:
        openai_response = process_openai_response(query)
        logger.info(f"handle_query: OpenAI response received: type={type(openai_response)}")

        geojson_data = generate_geojson(openai_response)
        logger.info(f"handle_query: GeoJSON data generated: type={type(geojson_data)}")

        processed_geojson_data = geojson_data
        if geojson_data["features"][0]["geometry"]["type"] == 'Polygon':
            logger.info("handle_query: Detected Polygon, attempting to sort coordinates.")
            processed_geojson_data = sort_coordinates_for_simple_polygon(geojson_data)

        map_image = generate_static_map(processed_geojson_data, invisible=False)
        logger.info(f"handle_query: Visible map_image generated: type={type(map_image)}")
        empty_map_image = generate_static_map(processed_geojson_data, invisible=True)
        logger.info(f"handle_query: Invisible empty_map_image generated: type={type(empty_map_image)}")

        # Pixels that differ between the two renders are the drawn shapes;
        # they become the region the diffusion model is allowed to repaint.
        mask_image = _difference_mask(map_image, empty_map_image, threshold=10)
        logger.info(f"handle_query: Mask image generated: type={type(mask_image)}")

        prompt_for_image = openai_response['output']['feature_representation']['properties']['description']
        logger.info(f"handle_query: Prompt for satellite image: '{prompt_for_image}', type={type(prompt_for_image)}")

        # The invisible render is the base image (map without markers).
        satellite_image = generate_satellite_image(
            empty_map_image, mask_image, prompt_for_image
        )
        logger.info(f"handle_query: Satellite image generated: type={type(satellite_image)}")

        # Gradio outputs must be PIL images; substitute placeholders if not.
        final_map_image = map_image if isinstance(map_image, Image.Image) else Image.new("RGB", (600, 600), "grey")
        final_satellite_image = satellite_image if isinstance(satellite_image, Image.Image) else Image.new("RGB", (600, 600), "red")
        final_empty_map_image = empty_map_image if isinstance(empty_map_image, Image.Image) else Image.new("RGB", (600, 600), "grey")
        final_mask_image = mask_image if isinstance(mask_image, Image.Image) else Image.new("L", (600, 600), 0)
        logger.info(f"handle_query: Returning types: {type(final_map_image)}, {type(final_satellite_image)}, {type(final_empty_map_image)}, {type(final_mask_image)}, {type(prompt_for_image)}")
        return final_map_image, final_satellite_image, final_empty_map_image, final_mask_image, prompt_for_image
    except Exception as e:
        logger.error(f"--- Error in handle_query for query '{query}': {e} ---", exc_info=True)
        # Show the failure in the UI instead of leaving the outputs empty.
        error_img = Image.new("RGB", (600, 600), "black")
        error_text_img = ImageDraw.Draw(error_img)
        error_text_img.text((10, 10), f"Error: {e}", fill="white")
        return error_img, error_img, error_img, error_img, f"Error processing query: {e}"
def update_query(selected_query_value: str) -> str:
    """Pass the dropdown selection through so it can fill the query textbox.

    The selected value and its type are logged; the value is returned as-is.
    """
    logger.info(f"Dropdown changed. Selected query: '{selected_query_value}', type: {type(selected_query_value)}")
    return selected_query_value
logger.info("Defining Gradio UI components.")
# Example queries offered in the dropdown; the last entry is the default.
query_options = [
    "Area covering south asian subcontinent",
    # Texas is a state, not a city, so geocoding it as a point may be imprecise.
    "Mark a triangular area using New York, Boston, and Texas",
    "Mark cities in India",
    "Show me Lotus Tower in a Map",
    "Mark the area of west germany",
    "Mark the area of the Amazon rainforest",
    "Mark the area of the Sahara desert",
]
logger.info(f"Query options: {query_options}")

# Component defaults are given as plain strings (Textbox) or as one of the
# listed choices (Dropdown). Each construction step is logged so a failure
# while building the UI can be pinned to a single component.
try:
    with gr.Blocks() as demo:
        logger.info("Inside gr.Blocks() context manager.")

        # Row 1: query selection and submission.
        with gr.Row():
            logger.info("Defining first gr.Row.")
            selected_query = gr.Dropdown(
                label="Select Query",
                choices=query_options,
                value=query_options[-1],
                type="value",
            )
            logger.info(f"selected_query Dropdown defined. Initial value: '{query_options[-1]}', type: {type(query_options[-1])}")
            query_input = gr.Textbox(label="Enter Query", value=str(query_options[-1]))
            logger.info(f"query_input Textbox defined. Initial value: '{query_options[-1]}', type: {type(query_options[-1])}")
            # Selecting a preset copies it into the editable textbox.
            selected_query.change(fn=update_query, inputs=selected_query, outputs=query_input)
            logger.info("selected_query.change event defined.")
            submit_btn = gr.Button("Submit")
            logger.info("submit_btn Button defined.")

        # Row 2: the annotated map and the generated image.
        with gr.Row():
            logger.info("Defining second gr.Row for image outputs.")
            map_output = gr.Image(label="Map Visualization")
            logger.info("map_output Image defined.")
            satellite_output = gr.Image(label="Generated Map Image")
            logger.info("satellite_output Image defined.")

        # Row 3: intermediate artefacts, useful for debugging.
        with gr.Row():
            logger.info("Defining third gr.Row for debug outputs.")
            empty_map_output = gr.Image(label="Empty Visualization")
            logger.info("empty_map_output Image defined.")
            mask_output = gr.Image(label="Mask")
            logger.info("mask_output Image defined.")
            image_prompt_output = gr.Textbox(label="Image Prompt Used", value="")
            logger.info(f"image_prompt_output Textbox defined. Initial value: '', type: str")

        # handle_query returns four images and one string, in this order.
        submit_btn.click(
            fn=handle_query,
            inputs=[query_input],
            outputs=[map_output, satellite_output, empty_map_output, mask_output, image_prompt_output],
        )
        logger.info("submit_btn.click event defined.")
    logger.info("Gradio Blocks defined successfully.")
except Exception as exc:
    logger.error(f"Error during Gradio UI definition: {exc}", exc_info=True)
    raise
# Script entry point: start the Gradio server; launch failures are logged
# with a traceback and re-raised.
if __name__ == "__main__":
    logger.info("Launching Gradio demo.")
    try:
        demo.launch()
        logger.info("Gradio demo launched.")
    except Exception as exc:
        logger.error(f"Error launching Gradio demo: {exc}", exc_info=True)
        raise