VisualizeGeoMap / app.py
Suchinthana
Logging, prompt, pipe update
ecb6154
raw
history blame
26.1 kB
import os
import json
import numpy as np
import torch
from PIL import Image, ImageDraw
import gradio as gr
from openai import OpenAI
from geopy.geocoders import Nominatim
from staticmap import StaticMap, CircleMarker, Polygon
from diffusers import ControlNetModel, StableDiffusionControlNetInpaintPipeline
from diffusers import StableDiffusionInpaintPipeline
import spaces
import logging
import math
from typing import List, Union # Make sure these are actually used or remove them
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s')
logger = logging.getLogger(__name__)
logger.info("Script starting. Initializing APIs and models.")
# Initialize APIs
try:
openai_client = OpenAI(api_key=os.environ['OPENAI_API_KEY'])
logger.info("OpenAI client initialized.")
except KeyError:
logger.error("OPENAI_API_KEY environment variable not set!")
# Handle this critical error, perhaps exit or raise
raise
except Exception as e:
logger.error(f"Error initializing OpenAI client: {e}")
raise
try:
geolocator = Nominatim(user_agent="geoapi_visualizemap") # More specific user agent
logger.info("Geolocator initialized.")
except Exception as e:
logger.error(f"Error initializing Geolocator: {e}")
raise
# Function to fetch coordinates
@spaces.GPU
def get_geo_coordinates(location_name):
logger.info(f"Attempting to fetch coordinates for: {location_name}")
try:
location = geolocator.geocode(location_name, timeout=10) # Added timeout
if location:
logger.info(f"Coordinates found for {location_name}: {[location.longitude, location.latitude]}")
return [location.longitude, location.latitude]
logger.warning(f"No location data returned for {location_name}")
return None
except Exception as e:
logger.error(f"Error fetching coordinates for {location_name}: {e}")
return None
# Function to process OpenAI chat response
@spaces.GPU
def process_openai_response(query):
logger.info(f"Processing OpenAI query: {query}")
try:
response = openai_client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "system",
"content": """
You are an assistant that generates structured JSON output for geographical queries with city names. Your task is to generate a JSON object containing information about geographical features and their representation based on the user's query. Follow these rules:
1. The JSON should always have the following structure:
{
"input": "<user's query>",
"output": {
"answer": "<concise text answering the query>",
"feature_representation": {
"type": "<one of: Point, LineString, Polygon, MultiPoint, MultiLineString, MultiPolygon, GeometryCollection>",
"cities": ["<list of city names>"],
"properties": {
"description": "<a prompt for a diffusion model describing the geographical feature>"
}
}
}
}
2. For the `type` field in `feature_representation`:
- Use "Point" for single city queries.
- Use "MultiPoint" for queries involving multiple cities not forming a line or area.
- Use "LineString" for queries about paths between two or more cities.
- Use "Polygon" for queries about areas formed by three or more cities.
3. For the `cities` field:
- List the names of cities mentioned in the query in the order they appear.
- If no cities are mentioned, try to add them with your knowledge.
4. For the `properties.description` field:
- Describe the geographical feature in a creative way, suitable for generating an image with a diffusion model.
### Example Input:
"Mark a triangular area of 3 US cities."
### Example Output:
{
"input": "Mark a triangular area of 3 US cities.",
"output": {
"answer": "The cities New York, Boston, and Philadelphia form a triangle.",
"feature_representation": {
"type": "Polygon",
"cities": ["New York", "Boston", "Philadelphia"],
"properties": {
"description": "A satellite image of a triangular area formed by New York, Boston, and Philadelphia, with green fields and urban regions, 4k resolution, highly detailed."
}
}
}
}
Generate similar JSON for the following query:
"""
},
{
"role": "user",
"content": query
}
],
temperature=1,
max_tokens=2048,
top_p=1,
frequency_penalty=0,
presence_penalty=0,
response_format={"type": "json_object"}
)
content = response.choices[0].message.content
logger.info(f"Raw OpenAI response content: {content}")
parsed_response = json.loads(content)
logger.info(f"Parsed OpenAI response: {json.dumps(parsed_response, indent=2)}")
return parsed_response
except Exception as e:
logger.error(f"Error processing OpenAI response for query '{query}': {e}")
# Consider returning a default error structure or re-raising
raise
# Generate GeoJSON from OpenAI response
@spaces.GPU
def generate_geojson(response_data): # Renamed to avoid confusion with http response
logger.info(f"Generating GeoJSON from OpenAI response_data: {json.dumps(response_data, indent=2)}")
try:
feature_type = response_data['output']['feature_representation']['type']
city_names = response_data['output']['feature_representation']['cities']
properties = response_data['output']['feature_representation']['properties']
logger.info(f"Feature type: {feature_type}, Cities: {city_names}")
coordinates = []
for city in city_names:
coord = get_geo_coordinates(city)
if coord:
coordinates.append(coord)
else:
logger.warning(f"Coordinates not found for city: {city}. Skipping.")
logger.info(f"Collected coordinates: {coordinates}")
# Ensure coordinates has the correct structure for each geometry type
if feature_type == "Point":
if not coordinates:
raise ValueError("Point type requires at least one coordinate.")
# GeoJSON Point expects a single coordinate pair, not a list of pairs
final_coordinates = coordinates[0] if coordinates else []
elif feature_type == "MultiPoint":
final_coordinates = coordinates # List of coordinate pairs
elif feature_type == "LineString":
if len(coordinates) < 2:
raise ValueError("LineString requires at least 2 coordinates.")
final_coordinates = coordinates # List of coordinate pairs
elif feature_type == "Polygon":
if len(coordinates) < 3:
raise ValueError("Polygon requires at least 3 coordinates.")
# Close the polygon by appending the first point at the end
if coordinates[0] != coordinates[-1]: # Check if already closed
coordinates.append(coordinates[0])
final_coordinates = [coordinates] # Nest coordinates for Polygon
else: # MultiLineString, MultiPolygon, GeometryCollection
logger.warning(f"Unsupported or complex feature_type: {feature_type}. Using raw coordinates.")
final_coordinates = coordinates # Or handle more specifically
geojson_data = {
"type": "FeatureCollection",
"features": [
{
"type": "Feature",
"properties": properties,
"geometry": {
"type": feature_type,
"coordinates": final_coordinates,
},
}
],
}
logger.info(f"Generated GeoJSON: {json.dumps(geojson_data, indent=2)}")
return geojson_data
except KeyError as e:
logger.error(f"KeyError while generating GeoJSON: {e}. Response data: {json.dumps(response_data, indent=2)}")
raise
except ValueError as e:
logger.error(f"ValueError while generating GeoJSON: {e}. Coordinates: {coordinates if 'coordinates' in locals() else 'N/A'}")
raise
except Exception as e:
logger.error(f"Unexpected error in generate_geojson: {e}")
raise
# Sort coordinates for a simple polygon (Reduce intersection points)
def sort_coordinates_for_simple_polygon(geojson):
logger.info("Attempting to sort polygon coordinates.")
try:
coordinates = geojson['features'][0]['geometry']['coordinates'][0]
logger.info(f"Original polygon coordinates: {coordinates}")
if not coordinates or len(coordinates) < 3:
logger.warning("Not enough coordinates to sort for a polygon.")
return geojson
# Remove the last point if it duplicates the first (GeoJSON convention for polygons)
if coordinates[0] == coordinates[-1] and len(coordinates) > 1:
plot_coordinates = coordinates[:-1]
else:
plot_coordinates = coordinates
if not plot_coordinates or len(plot_coordinates) < 3: # Check again after potentially removing last point
logger.warning("Not enough unique coordinates to sort for a polygon after de-duplication.")
return geojson
# Calculate the centroid of the points
centroid_x = sum(point[0] for point in plot_coordinates) / len(plot_coordinates)
centroid_y = sum(point[1] for point in plot_coordinates) / len(plot_coordinates)
logger.info(f"Calculated centroid: ({centroid_x}, {centroid_y})")
def angle_from_centroid(point):
dx = point[0] - centroid_x
dy = point[1] - centroid_y
return math.atan2(dy, dx)
sorted_plot_coordinates = sorted(plot_coordinates, key=angle_from_centroid)
sorted_plot_coordinates.append(sorted_plot_coordinates[0]) # Close the polygon
geojson['features'][0]['geometry']['coordinates'][0] = sorted_plot_coordinates
logger.info(f"Sorted polygon coordinates: {sorted_plot_coordinates}")
return geojson
except Exception as e:
logger.error(f"Error sorting polygon coordinates: {e}")
return geojson # Return original on error
# Generate static map image
@spaces.GPU
def generate_static_map(geojson_data, invisible=False):
logger.info(f"Generating static map. Invisible: {invisible}. GeoJSON: {json.dumps(geojson_data, indent=2)}")
try:
m = StaticMap(600, 600)
color = '#1C00ff00' if invisible else '#42445A85' # Transparent if invisible, else semi-transparent blue/grey
for feature in geojson_data["features"]:
geom_type = feature["geometry"]["type"]
coords = feature["geometry"]["coordinates"]
logger.info(f"Processing feature type: {geom_type} with coords: {coords}")
if geom_type == "Point":
# Coords for Point is a single [lon, lat]
if coords and len(coords) == 2 and isinstance(coords[0], (int, float)):
m.add_marker(CircleMarker((coords[0], coords[1]), color, 20 if invisible else 10)) # Adjusted size
else:
logger.warning(f"Skipping Point due to invalid coordinate structure: {coords}")
elif geom_type == "MultiPoint":
# Coords for MultiPoint is a list of [lon, lat]
for coord_pair in coords:
if coord_pair and len(coord_pair) == 2 and isinstance(coord_pair[0], (int, float)):
m.add_marker(CircleMarker((coord_pair[0], coord_pair[1]), color, 20 if invisible else 10))
else:
logger.warning(f"Skipping point in MultiPoint due to invalid coordinate structure: {coord_pair}")
elif geom_type == "LineString":
# Coords for LineString is a list of [lon, lat]
if len(coords) >=2:
m.add_line(Polygon([(c[0], c[1]) for c in coords], "blue", 3)) # For LineString, use add_line or thicker Polygon outline
else:
logger.warning(f"Skipping LineString, not enough points: {coords}")
elif geom_type == "Polygon":
# Coords for Polygon is a list containing one list of [lon, lat] (the exterior ring)
for polygon_ring in coords: # Should be only one for simple polygon
if len(polygon_ring) >= 3:
m.add_polygon(Polygon([(c[0], c[1]) for c in polygon_ring], color, '#0000AA' if not invisible else '#1C00ff00', 3 if not invisible else 0))
else:
logger.warning(f"Skipping polygon ring, not enough points: {polygon_ring}")
# Add handling for MultiLineString, MultiPolygon if your OpenAI might produce them
else:
logger.warning(f"Unsupported geometry type for static map: {geom_type}")
rendered_map = m.render(center=None, zoom=None) # Let it auto-center and zoom
logger.info(f"Static map rendered successfully. Invisible: {invisible}")
return rendered_map
except Exception as e:
logger.error(f"Error generating static map (invisible={invisible}): {e}")
# Return a placeholder or re-raise
return Image.new("RGB", (600, 600), color="grey") # Placeholder
# ControlNet pipeline setup
logger.info("Initializing Stable Diffusion Inpaint Pipeline.")
try:
# controlnet = ControlNetModel.from_pretrained("stabilityai/stable-diffusion-2-inpainting", torch_dtype=torch.float16)
# pipeline = StableDiffusionControlNetInpaintPipeline.from_pretrained(
# "runwayml/stable-diffusion-inpainting", controlnet=controlnet, torch_dtype=torch.float16 # Changed base model
# )
pipeline = StableDiffusionInpaintPipeline.from_pretrained(
"stabilityai/stable-diffusion-2-inpainting", # This is a full inpainting pipeline, not just a controlnet
torch_dtype=torch.float16,
)
pipeline.to("cuda")
logger.info("Stable Diffusion Inpaint Pipeline loaded to CUDA.")
except Exception as e:
logger.error(f"Error initializing Stable Diffusion pipeline: {e}")
raise
# This function was for ControlNet, may not be needed as-is for StableDiffusionInpaintPipeline
# It expects init_image to be a NumPy array, and mask_image a NumPy array
@spaces.GPU
def make_inpaint_condition(init_image_pil, mask_image_pil):
logger.info("Preparing inpaint condition (ControlNet specific, may need adjustment).")
# Ensure PIL Images are converted to NumPy arrays correctly
init_image_np = np.array(init_image_pil.convert("RGB")).astype(np.float32) / 255.0
mask_image_np = np.array(mask_image_pil.convert("L")).astype(np.float32) / 255.0 # Ensure mask is L
logger.info(f"Init image shape: {init_image_np.shape}, Mask image shape: {mask_image_np.shape}")
if init_image_np.shape[:2] != mask_image_np.shape[:2]:
logger.error(f"Image and mask dimensions mismatch: {init_image_np.shape[:2]} vs {mask_image_np.shape[:2]}")
# Resize mask to match image if necessary, or raise error
# For now, let's assume they should match and this is an error state
raise ValueError("Image and mask_image must have the same height and width.")
# This operation is specific to how some ControlNet inpainting expects masked areas.
# Standard SDInpaintPipeline might not need this.
# init_image_np[mask_image_np > 0.5] = -1.0 # set as masked pixel
# init_image_np = np.expand_dims(init_image_np, 0).transpose(0, 3, 1, 2)
# init_image_tensor = torch.from_numpy(init_image_np)
# logger.info(f"Processed init_image tensor shape: {init_image_tensor.shape}")
# return init_image_tensor
# For StableDiffusionInpaintPipeline, `image` and `mask_image` are passed directly as PIL Images or tensors.
# The `make_inpaint_condition` might be redundant if you are not using a ControlNet that specifically requires this format.
# If you were using ControlNet, this would be the control_image.
# For now, let's assume it's meant to be the 'image' input for SD Inpaint, preprocessed.
return init_image_pil # Or init_image_tensor if pipeline expects tensor
@spaces.GPU
def generate_satellite_image(base_image_pil, mask_image_pil, prompt):
logger.info(f"Generating satellite image with prompt: '{prompt}'")
logger.info(f"Base image type: {type(base_image_pil)}, Mask image type: {type(mask_image_pil)}")
try:
# StableDiffusionInpaintPipeline expects PIL Images or tensors for image and mask_image
# The `control_image` argument is not standard for StableDiffusionInpaintPipeline.
# It's specific to StableDiffusionControlNetInpaintPipeline.
# If you were using the ControlNet variant:
# control_image_tensor = make_inpaint_condition(base_image_pil, mask_image_pil)
# result = pipeline(
# prompt=prompt,
# image=base_image_pil, # or tensor version if pipeline prefers
# mask_image=mask_image_pil, # or tensor version
# control_image=control_image_tensor, # This is for ControlNet
# strength=0.47, # strength might be called differently or not used in SD Inpaint
# guidance_scale=9.5, # Adjusted scale
# num_inference_steps=50 # Adjusted steps
# ).images[0]
# For StableDiffusionInpaintPipeline:
result = pipeline(
prompt=prompt,
image=base_image_pil, # PIL Image or PyTorch tensor
mask_image=mask_image_pil, # PIL Image or PyTorch tensor
guidance_scale=9.5, # More reasonable default
num_inference_steps=50 # More reasonable default
).images[0]
logger.info("Satellite image generated successfully.")
return result
except Exception as e:
logger.error(f"Error generating satellite image: {e}")
return Image.new("RGB", base_image_pil.size, color="red") # Placeholder
# Gradio UI
@spaces.GPU
def handle_query(query: str):
logger.info(f"--- Handling query: {query} ---")
try:
openai_response = process_openai_response(query)
logger.info(f"handle_query: OpenAI response received: type={type(openai_response)}")
geojson_data = generate_geojson(openai_response)
logger.info(f"handle_query: GeoJSON data generated: type={type(geojson_data)}")
processed_geojson_data = geojson_data
if geojson_data["features"][0]["geometry"]["type"] == 'Polygon':
logger.info("handle_query: Detected Polygon, attempting to sort coordinates.")
processed_geojson_data = sort_coordinates_for_simple_polygon(geojson_data)
map_image = generate_static_map(processed_geojson_data, invisible=False)
logger.info(f"handle_query: Visible map_image generated: type={type(map_image)}")
empty_map_image = generate_static_map(processed_geojson_data, invisible=True) # Use processed_geojson_data here too
logger.info(f"handle_query: Invisible empty_map_image generated: type={type(empty_map_image)}")
# Ensure images are PIL for diff
map_array = np.array(map_image.convert("RGB"))
empty_map_array = np.array(empty_map_image.convert("RGB"))
difference = np.abs(map_array - empty_map_array)
threshold = 10 # May need adjustment
mask_array = (np.sum(difference, axis=-1) > threshold).astype(np.uint8) * 255
mask_image = Image.fromarray(mask_array, mode="L")
logger.info(f"handle_query: Mask image generated: type={type(mask_image)}")
prompt_for_image = openai_response['output']['feature_representation']['properties']['description']
logger.info(f"handle_query: Prompt for satellite image: '{prompt_for_image}', type={type(prompt_for_image)}")
# Pass empty_map_image (which is the base map without visible markers)
# and the derived mask_image to the inpainting function
satellite_image = generate_satellite_image(
empty_map_image, mask_image, prompt_for_image
)
logger.info(f"handle_query: Satellite image generated: type={type(satellite_image)}")
# Ensure all returned image types are PIL Images
final_map_image = map_image if isinstance(map_image, Image.Image) else Image.new("RGB", (600,600), "grey")
final_satellite_image = satellite_image if isinstance(satellite_image, Image.Image) else Image.new("RGB", (600,600), "red")
final_empty_map_image = empty_map_image if isinstance(empty_map_image, Image.Image) else Image.new("RGB", (600,600), "grey")
final_mask_image = mask_image if isinstance(mask_image, Image.Image) else Image.new("L", (600,600), 0)
logger.info(f"handle_query: Returning types: {type(final_map_image)}, {type(final_satellite_image)}, {type(final_empty_map_image)}, {type(final_mask_image)}, {type(prompt_for_image)}")
return final_map_image, final_satellite_image, final_empty_map_image, final_mask_image, prompt_for_image
except Exception as e:
logger.error(f"--- Error in handle_query for query '{query}': {e} ---", exc_info=True)
# Return placeholder/error images and message
error_img = Image.new("RGB", (600, 600), "black")
error_text_img = ImageDraw.Draw(error_img)
error_text_img.text((10,10), f"Error: {e}", fill="white")
return error_img, error_img, error_img, error_img, f"Error processing query: {e}"
def update_query(selected_query_value: str) -> str: # Added type hints
logger.info(f"Dropdown changed. Selected query: '{selected_query_value}', type: {type(selected_query_value)}")
return selected_query_value
logger.info("Defining Gradio UI components.")
query_options = [
"Area covering south asian subcontinent",
"Mark a triangular area using New York, Boston, and Texas", # Texas is a state, might cause issues with geocoding as a city point
"Mark cities in India",
"Show me Lotus Tower in a Map",
"Mark the area of west germany",
"Mark the area of the Amazon rainforest",
"Mark the area of the Sahara desert"
]
logger.info(f"Query options: {query_options}")
# It's crucial that the `value` parameters for components are of the type Gradio expects
# for their schema generation, even before any function is called.
# For gr.Textbox, `value` should be a string.
# For gr.Dropdown, `value` should be one of the `choices` or None.
try:
with gr.Blocks() as demo:
logger.info("Inside gr.Blocks() context manager.")
with gr.Row():
logger.info("Defining first gr.Row.")
selected_query = gr.Dropdown(label="Select Query", choices=query_options, value=query_options[-1], type="value") # Ensure type="value" if not default
logger.info(f"selected_query Dropdown defined. Initial value: '{query_options[-1]}', type: {type(query_options[-1])}")
query_input = gr.Textbox(label="Enter Query", value=str(query_options[-1])) # Ensure value is string
logger.info(f"query_input Textbox defined. Initial value: '{query_options[-1]}', type: {type(query_options[-1])}")
# The `change` event should not cause the schema error, but good to log
selected_query.change(fn=update_query, inputs=selected_query, outputs=query_input)
logger.info("selected_query.change event defined.")
submit_btn = gr.Button("Submit")
logger.info("submit_btn Button defined.")
with gr.Row():
logger.info("Defining second gr.Row for image outputs.")
map_output = gr.Image(label="Map Visualization") # No initial value needed here, will be populated by function
logger.info("map_output Image defined.")
satellite_output = gr.Image(label="Generated Map Image")
logger.info("satellite_output Image defined.")
with gr.Row():
logger.info("Defining third gr.Row for debug outputs.")
empty_map_output = gr.Image(label="Empty Visualization")
logger.info("empty_map_output Image defined.")
mask_output = gr.Image(label="Mask")
logger.info("mask_output Image defined.")
# For image_prompt, provide a default string value or None. An empty string is fine.
image_prompt_output = gr.Textbox(label="Image Prompt Used", value="") # Changed name to avoid conflict, ensure string value
logger.info(f"image_prompt_output Textbox defined. Initial value: '', type: str")
# The outputs list must match the number and expected types of what handle_query returns.
# handle_query returns: PIL.Image, PIL.Image, PIL.Image, PIL.Image, str
# Gradio components: gr.Image, gr.Image, gr.Image, gr.Image, gr.Textbox
# This mapping looks correct.
submit_btn.click(fn=handle_query,
inputs=[query_input],
outputs=[map_output, satellite_output, empty_map_output, mask_output, image_prompt_output])
logger.info("submit_btn.click event defined.")
logger.info("Gradio Blocks defined successfully.")
except Exception as e:
logger.error(f"Error during Gradio UI definition: {e}", exc_info=True)
raise
if __name__ == "__main__":
logger.info("Launching Gradio demo.")
try:
demo.launch() # debug=True can sometimes give more frontend info, but not for this backend error
logger.info("Gradio demo launched.")
except Exception as e:
logger.error(f"Error launching Gradio demo: {e}", exc_info=True)
raise