import os import json import numpy as np import torch from PIL import Image, ImageDraw import gradio as gr from openai import OpenAI from geopy.geocoders import Nominatim from staticmap import StaticMap, CircleMarker, Polygon from diffusers import ControlNetModel, StableDiffusionControlNetInpaintPipeline from diffusers import StableDiffusionInpaintPipeline import spaces import logging import math from typing import List, Union # Make sure these are actually used or remove them # Set up logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s') logger = logging.getLogger(__name__) logger.info("Script starting. Initializing APIs and models.") # Initialize APIs try: openai_client = OpenAI(api_key=os.environ['OPENAI_API_KEY']) logger.info("OpenAI client initialized.") except KeyError: logger.error("OPENAI_API_KEY environment variable not set!") # Handle this critical error, perhaps exit or raise raise except Exception as e: logger.error(f"Error initializing OpenAI client: {e}") raise try: geolocator = Nominatim(user_agent="geoapi_visualizemap") # More specific user agent logger.info("Geolocator initialized.") except Exception as e: logger.error(f"Error initializing Geolocator: {e}") raise # Function to fetch coordinates @spaces.GPU def get_geo_coordinates(location_name): logger.info(f"Attempting to fetch coordinates for: {location_name}") try: location = geolocator.geocode(location_name, timeout=10) # Added timeout if location: logger.info(f"Coordinates found for {location_name}: {[location.longitude, location.latitude]}") return [location.longitude, location.latitude] logger.warning(f"No location data returned for {location_name}") return None except Exception as e: logger.error(f"Error fetching coordinates for {location_name}: {e}") return None # Function to process OpenAI chat response @spaces.GPU def process_openai_response(query): logger.info(f"Processing OpenAI query: {query}") try: response = openai_client.chat.completions.create( model="gpt-4o-mini", messages=[ { "role": "system", "content": """ You are an assistant that generates structured JSON output for geographical queries with city names. Your task is to generate a JSON object containing information about geographical features and their representation based on the user's query. Follow these rules: 1. The JSON should always have the following structure: { "input": "", "output": { "answer": "", "feature_representation": { "type": "", "cities": [""], "properties": { "description": "" } } } } 2. For the `type` field in `feature_representation`: - Use "Point" for single city queries. - Use "MultiPoint" for queries involving multiple cities not forming a line or area. - Use "LineString" for queries about paths between two or more cities. - Use "Polygon" for queries about areas formed by three or more cities. 3. For the `cities` field: - List the names of cities mentioned in the query in the order they appear. - If no cities are mentioned, try to add them with your knowledge. 4. For the `properties.description` field: - Describe the geographical feature in a creative way, suitable for generating an image with a diffusion model. ### Example Input: "Mark a triangular area of 3 US cities." ### Example Output: { "input": "Mark a triangular area of 3 US cities.", "output": { "answer": "The cities New York, Boston, and Philadelphia form a triangle.", "feature_representation": { "type": "Polygon", "cities": ["New York", "Boston", "Philadelphia"], "properties": { "description": "A satellite image of a triangular area formed by New York, Boston, and Philadelphia, with green fields and urban regions, 4k resolution, highly detailed." } } } } Generate similar JSON for the following query: """ }, { "role": "user", "content": query } ], temperature=1, max_tokens=2048, top_p=1, frequency_penalty=0, presence_penalty=0, response_format={"type": "json_object"} ) content = response.choices[0].message.content logger.info(f"Raw OpenAI response content: {content}") parsed_response = json.loads(content) logger.info(f"Parsed OpenAI response: {json.dumps(parsed_response, indent=2)}") return parsed_response except Exception as e: logger.error(f"Error processing OpenAI response for query '{query}': {e}") # Consider returning a default error structure or re-raising raise # Generate GeoJSON from OpenAI response @spaces.GPU def generate_geojson(response_data): # Renamed to avoid confusion with http response logger.info(f"Generating GeoJSON from OpenAI response_data: {json.dumps(response_data, indent=2)}") try: feature_type = response_data['output']['feature_representation']['type'] city_names = response_data['output']['feature_representation']['cities'] properties = response_data['output']['feature_representation']['properties'] logger.info(f"Feature type: {feature_type}, Cities: {city_names}") coordinates = [] for city in city_names: coord = get_geo_coordinates(city) if coord: coordinates.append(coord) else: logger.warning(f"Coordinates not found for city: {city}. Skipping.") logger.info(f"Collected coordinates: {coordinates}") # Ensure coordinates has the correct structure for each geometry type if feature_type == "Point": if not coordinates: raise ValueError("Point type requires at least one coordinate.") # GeoJSON Point expects a single coordinate pair, not a list of pairs final_coordinates = coordinates[0] if coordinates else [] elif feature_type == "MultiPoint": final_coordinates = coordinates # List of coordinate pairs elif feature_type == "LineString": if len(coordinates) < 2: raise ValueError("LineString requires at least 2 coordinates.") final_coordinates = coordinates # List of coordinate pairs elif feature_type == "Polygon": if len(coordinates) < 3: raise ValueError("Polygon requires at least 3 coordinates.") # Close the polygon by appending the first point at the end if coordinates[0] != coordinates[-1]: # Check if already closed coordinates.append(coordinates[0]) final_coordinates = [coordinates] # Nest coordinates for Polygon else: # MultiLineString, MultiPolygon, GeometryCollection logger.warning(f"Unsupported or complex feature_type: {feature_type}. Using raw coordinates.") final_coordinates = coordinates # Or handle more specifically geojson_data = { "type": "FeatureCollection", "features": [ { "type": "Feature", "properties": properties, "geometry": { "type": feature_type, "coordinates": final_coordinates, }, } ], } logger.info(f"Generated GeoJSON: {json.dumps(geojson_data, indent=2)}") return geojson_data except KeyError as e: logger.error(f"KeyError while generating GeoJSON: {e}. Response data: {json.dumps(response_data, indent=2)}") raise except ValueError as e: logger.error(f"ValueError while generating GeoJSON: {e}. Coordinates: {coordinates if 'coordinates' in locals() else 'N/A'}") raise except Exception as e: logger.error(f"Unexpected error in generate_geojson: {e}") raise # Sort coordinates for a simple polygon (Reduce intersection points) def sort_coordinates_for_simple_polygon(geojson): logger.info("Attempting to sort polygon coordinates.") try: coordinates = geojson['features'][0]['geometry']['coordinates'][0] logger.info(f"Original polygon coordinates: {coordinates}") if not coordinates or len(coordinates) < 3: logger.warning("Not enough coordinates to sort for a polygon.") return geojson # Remove the last point if it duplicates the first (GeoJSON convention for polygons) if coordinates[0] == coordinates[-1] and len(coordinates) > 1: plot_coordinates = coordinates[:-1] else: plot_coordinates = coordinates if not plot_coordinates or len(plot_coordinates) < 3: # Check again after potentially removing last point logger.warning("Not enough unique coordinates to sort for a polygon after de-duplication.") return geojson # Calculate the centroid of the points centroid_x = sum(point[0] for point in plot_coordinates) / len(plot_coordinates) centroid_y = sum(point[1] for point in plot_coordinates) / len(plot_coordinates) logger.info(f"Calculated centroid: ({centroid_x}, {centroid_y})") def angle_from_centroid(point): dx = point[0] - centroid_x dy = point[1] - centroid_y return math.atan2(dy, dx) sorted_plot_coordinates = sorted(plot_coordinates, key=angle_from_centroid) sorted_plot_coordinates.append(sorted_plot_coordinates[0]) # Close the polygon geojson['features'][0]['geometry']['coordinates'][0] = sorted_plot_coordinates logger.info(f"Sorted polygon coordinates: {sorted_plot_coordinates}") return geojson except Exception as e: logger.error(f"Error sorting polygon coordinates: {e}") return geojson # Return original on error # Generate static map image @spaces.GPU def generate_static_map(geojson_data, invisible=False): logger.info(f"Generating static map. Invisible: {invisible}. GeoJSON: {json.dumps(geojson_data, indent=2)}") try: m = StaticMap(600, 600) color = '#1C00ff00' if invisible else '#42445A85' # Transparent if invisible, else semi-transparent blue/grey for feature in geojson_data["features"]: geom_type = feature["geometry"]["type"] coords = feature["geometry"]["coordinates"] logger.info(f"Processing feature type: {geom_type} with coords: {coords}") if geom_type == "Point": # Coords for Point is a single [lon, lat] if coords and len(coords) == 2 and isinstance(coords[0], (int, float)): m.add_marker(CircleMarker((coords[0], coords[1]), color, 20 if invisible else 10)) # Adjusted size else: logger.warning(f"Skipping Point due to invalid coordinate structure: {coords}") elif geom_type == "MultiPoint": # Coords for MultiPoint is a list of [lon, lat] for coord_pair in coords: if coord_pair and len(coord_pair) == 2 and isinstance(coord_pair[0], (int, float)): m.add_marker(CircleMarker((coord_pair[0], coord_pair[1]), color, 20 if invisible else 10)) else: logger.warning(f"Skipping point in MultiPoint due to invalid coordinate structure: {coord_pair}") elif geom_type == "LineString": # Coords for LineString is a list of [lon, lat] if len(coords) >=2: m.add_line(Polygon([(c[0], c[1]) for c in coords], "blue", 3)) # For LineString, use add_line or thicker Polygon outline else: logger.warning(f"Skipping LineString, not enough points: {coords}") elif geom_type == "Polygon": # Coords for Polygon is a list containing one list of [lon, lat] (the exterior ring) for polygon_ring in coords: # Should be only one for simple polygon if len(polygon_ring) >= 3: m.add_polygon(Polygon([(c[0], c[1]) for c in polygon_ring], color, '#0000AA' if not invisible else '#1C00ff00', 3 if not invisible else 0)) else: logger.warning(f"Skipping polygon ring, not enough points: {polygon_ring}") # Add handling for MultiLineString, MultiPolygon if your OpenAI might produce them else: logger.warning(f"Unsupported geometry type for static map: {geom_type}") rendered_map = m.render(center=None, zoom=None) # Let it auto-center and zoom logger.info(f"Static map rendered successfully. Invisible: {invisible}") return rendered_map except Exception as e: logger.error(f"Error generating static map (invisible={invisible}): {e}") # Return a placeholder or re-raise return Image.new("RGB", (600, 600), color="grey") # Placeholder # ControlNet pipeline setup logger.info("Initializing Stable Diffusion Inpaint Pipeline.") try: # controlnet = ControlNetModel.from_pretrained("stabilityai/stable-diffusion-2-inpainting", torch_dtype=torch.float16) # pipeline = StableDiffusionControlNetInpaintPipeline.from_pretrained( # "runwayml/stable-diffusion-inpainting", controlnet=controlnet, torch_dtype=torch.float16 # Changed base model # ) pipeline = StableDiffusionInpaintPipeline.from_pretrained( "stabilityai/stable-diffusion-2-inpainting", # This is a full inpainting pipeline, not just a controlnet torch_dtype=torch.float16, ) pipeline.to("cuda") logger.info("Stable Diffusion Inpaint Pipeline loaded to CUDA.") except Exception as e: logger.error(f"Error initializing Stable Diffusion pipeline: {e}") raise # This function was for ControlNet, may not be needed as-is for StableDiffusionInpaintPipeline # It expects init_image to be a NumPy array, and mask_image a NumPy array @spaces.GPU def make_inpaint_condition(init_image_pil, mask_image_pil): logger.info("Preparing inpaint condition (ControlNet specific, may need adjustment).") # Ensure PIL Images are converted to NumPy arrays correctly init_image_np = np.array(init_image_pil.convert("RGB")).astype(np.float32) / 255.0 mask_image_np = np.array(mask_image_pil.convert("L")).astype(np.float32) / 255.0 # Ensure mask is L logger.info(f"Init image shape: {init_image_np.shape}, Mask image shape: {mask_image_np.shape}") if init_image_np.shape[:2] != mask_image_np.shape[:2]: logger.error(f"Image and mask dimensions mismatch: {init_image_np.shape[:2]} vs {mask_image_np.shape[:2]}") # Resize mask to match image if necessary, or raise error # For now, let's assume they should match and this is an error state raise ValueError("Image and mask_image must have the same height and width.") # This operation is specific to how some ControlNet inpainting expects masked areas. # Standard SDInpaintPipeline might not need this. # init_image_np[mask_image_np > 0.5] = -1.0 # set as masked pixel # init_image_np = np.expand_dims(init_image_np, 0).transpose(0, 3, 1, 2) # init_image_tensor = torch.from_numpy(init_image_np) # logger.info(f"Processed init_image tensor shape: {init_image_tensor.shape}") # return init_image_tensor # For StableDiffusionInpaintPipeline, `image` and `mask_image` are passed directly as PIL Images or tensors. # The `make_inpaint_condition` might be redundant if you are not using a ControlNet that specifically requires this format. # If you were using ControlNet, this would be the control_image. # For now, let's assume it's meant to be the 'image' input for SD Inpaint, preprocessed. return init_image_pil # Or init_image_tensor if pipeline expects tensor @spaces.GPU def generate_satellite_image(base_image_pil, mask_image_pil, prompt): logger.info(f"Generating satellite image with prompt: '{prompt}'") logger.info(f"Base image type: {type(base_image_pil)}, Mask image type: {type(mask_image_pil)}") try: # StableDiffusionInpaintPipeline expects PIL Images or tensors for image and mask_image # The `control_image` argument is not standard for StableDiffusionInpaintPipeline. # It's specific to StableDiffusionControlNetInpaintPipeline. # If you were using the ControlNet variant: # control_image_tensor = make_inpaint_condition(base_image_pil, mask_image_pil) # result = pipeline( # prompt=prompt, # image=base_image_pil, # or tensor version if pipeline prefers # mask_image=mask_image_pil, # or tensor version # control_image=control_image_tensor, # This is for ControlNet # strength=0.47, # strength might be called differently or not used in SD Inpaint # guidance_scale=9.5, # Adjusted scale # num_inference_steps=50 # Adjusted steps # ).images[0] # For StableDiffusionInpaintPipeline: result = pipeline( prompt=prompt, image=base_image_pil, # PIL Image or PyTorch tensor mask_image=mask_image_pil, # PIL Image or PyTorch tensor guidance_scale=9.5, # More reasonable default num_inference_steps=50 # More reasonable default ).images[0] logger.info("Satellite image generated successfully.") return result except Exception as e: logger.error(f"Error generating satellite image: {e}") return Image.new("RGB", base_image_pil.size, color="red") # Placeholder # Gradio UI @spaces.GPU def handle_query(query: str): logger.info(f"--- Handling query: {query} ---") try: openai_response = process_openai_response(query) logger.info(f"handle_query: OpenAI response received: type={type(openai_response)}") geojson_data = generate_geojson(openai_response) logger.info(f"handle_query: GeoJSON data generated: type={type(geojson_data)}") processed_geojson_data = geojson_data if geojson_data["features"][0]["geometry"]["type"] == 'Polygon': logger.info("handle_query: Detected Polygon, attempting to sort coordinates.") processed_geojson_data = sort_coordinates_for_simple_polygon(geojson_data) map_image = generate_static_map(processed_geojson_data, invisible=False) logger.info(f"handle_query: Visible map_image generated: type={type(map_image)}") empty_map_image = generate_static_map(processed_geojson_data, invisible=True) # Use processed_geojson_data here too logger.info(f"handle_query: Invisible empty_map_image generated: type={type(empty_map_image)}") # Ensure images are PIL for diff map_array = np.array(map_image.convert("RGB")) empty_map_array = np.array(empty_map_image.convert("RGB")) difference = np.abs(map_array - empty_map_array) threshold = 10 # May need adjustment mask_array = (np.sum(difference, axis=-1) > threshold).astype(np.uint8) * 255 mask_image = Image.fromarray(mask_array, mode="L") logger.info(f"handle_query: Mask image generated: type={type(mask_image)}") prompt_for_image = openai_response['output']['feature_representation']['properties']['description'] logger.info(f"handle_query: Prompt for satellite image: '{prompt_for_image}', type={type(prompt_for_image)}") # Pass empty_map_image (which is the base map without visible markers) # and the derived mask_image to the inpainting function satellite_image = generate_satellite_image( empty_map_image, mask_image, prompt_for_image ) logger.info(f"handle_query: Satellite image generated: type={type(satellite_image)}") # Ensure all returned image types are PIL Images final_map_image = map_image if isinstance(map_image, Image.Image) else Image.new("RGB", (600,600), "grey") final_satellite_image = satellite_image if isinstance(satellite_image, Image.Image) else Image.new("RGB", (600,600), "red") final_empty_map_image = empty_map_image if isinstance(empty_map_image, Image.Image) else Image.new("RGB", (600,600), "grey") final_mask_image = mask_image if isinstance(mask_image, Image.Image) else Image.new("L", (600,600), 0) logger.info(f"handle_query: Returning types: {type(final_map_image)}, {type(final_satellite_image)}, {type(final_empty_map_image)}, {type(final_mask_image)}, {type(prompt_for_image)}") return final_map_image, final_satellite_image, final_empty_map_image, final_mask_image, prompt_for_image except Exception as e: logger.error(f"--- Error in handle_query for query '{query}': {e} ---", exc_info=True) # Return placeholder/error images and message error_img = Image.new("RGB", (600, 600), "black") error_text_img = ImageDraw.Draw(error_img) error_text_img.text((10,10), f"Error: {e}", fill="white") return error_img, error_img, error_img, error_img, f"Error processing query: {e}" def update_query(selected_query_value: str) -> str: # Added type hints logger.info(f"Dropdown changed. Selected query: '{selected_query_value}', type: {type(selected_query_value)}") return selected_query_value logger.info("Defining Gradio UI components.") query_options = [ "Area covering south asian subcontinent", "Mark a triangular area using New York, Boston, and Texas", # Texas is a state, might cause issues with geocoding as a city point "Mark cities in India", "Show me Lotus Tower in a Map", "Mark the area of west germany", "Mark the area of the Amazon rainforest", "Mark the area of the Sahara desert" ] logger.info(f"Query options: {query_options}") # It's crucial that the `value` parameters for components are of the type Gradio expects # for their schema generation, even before any function is called. # For gr.Textbox, `value` should be a string. # For gr.Dropdown, `value` should be one of the `choices` or None. try: with gr.Blocks() as demo: logger.info("Inside gr.Blocks() context manager.") with gr.Row(): logger.info("Defining first gr.Row.") selected_query = gr.Dropdown(label="Select Query", choices=query_options, value=query_options[-1], type="value") # Ensure type="value" if not default logger.info(f"selected_query Dropdown defined. Initial value: '{query_options[-1]}', type: {type(query_options[-1])}") query_input = gr.Textbox(label="Enter Query", value=str(query_options[-1])) # Ensure value is string logger.info(f"query_input Textbox defined. Initial value: '{query_options[-1]}', type: {type(query_options[-1])}") # The `change` event should not cause the schema error, but good to log selected_query.change(fn=update_query, inputs=selected_query, outputs=query_input) logger.info("selected_query.change event defined.") submit_btn = gr.Button("Submit") logger.info("submit_btn Button defined.") with gr.Row(): logger.info("Defining second gr.Row for image outputs.") map_output = gr.Image(label="Map Visualization") # No initial value needed here, will be populated by function logger.info("map_output Image defined.") satellite_output = gr.Image(label="Generated Map Image") logger.info("satellite_output Image defined.") with gr.Row(): logger.info("Defining third gr.Row for debug outputs.") empty_map_output = gr.Image(label="Empty Visualization") logger.info("empty_map_output Image defined.") mask_output = gr.Image(label="Mask") logger.info("mask_output Image defined.") # For image_prompt, provide a default string value or None. An empty string is fine. image_prompt_output = gr.Textbox(label="Image Prompt Used", value="") # Changed name to avoid conflict, ensure string value logger.info(f"image_prompt_output Textbox defined. Initial value: '', type: str") # The outputs list must match the number and expected types of what handle_query returns. # handle_query returns: PIL.Image, PIL.Image, PIL.Image, PIL.Image, str # Gradio components: gr.Image, gr.Image, gr.Image, gr.Image, gr.Textbox # This mapping looks correct. submit_btn.click(fn=handle_query, inputs=[query_input], outputs=[map_output, satellite_output, empty_map_output, mask_output, image_prompt_output]) logger.info("submit_btn.click event defined.") logger.info("Gradio Blocks defined successfully.") except Exception as e: logger.error(f"Error during Gradio UI definition: {e}", exc_info=True) raise if __name__ == "__main__": logger.info("Launching Gradio demo.") try: demo.launch() # debug=True can sometimes give more frontend info, but not for this backend error logger.info("Gradio demo launched.") except Exception as e: logger.error(f"Error launching Gradio demo: {e}", exc_info=True) raise