Spaces:
Running
on
Zero
Running
on
Zero
import os | |
import numpy as np | |
from typing import Dict, List, Tuple, Any, Optional | |
from scene_type import SCENE_TYPES | |
from enhance_scene_descriptor import EnhancedSceneDescriber | |
class SpatialAnalyzer: | |
""" | |
Analyzes spatial relationships between objects in an image. | |
Handles region assignment, object positioning, and functional zone identification. | |
""" | |
def __init__(self, class_names: Optional[Dict[int, str]] = None,
             object_categories: Optional[Dict[str, Any]] = None):
    """
    Set up the 3x3 region grid and the analysis configuration.

    Args:
        class_names: Mapping from class id to class name used to label
            detections. Stored as-is (may be None).
        object_categories: Mapping from category name to a collection of
            class ids. An empty dict is used when omitted.
    """
    # The image is split into a 3x3 grid. Each region is stored as
    # normalized (x1, y1, x2, y2) bounds; rows and columns share the same
    # three boundaries, so the grid is derived from them instead of being
    # spelled out nine times.
    boundaries = (0, 1/3, 2/3, 1)
    row_names = ("top", "middle", "bottom")
    column_names = ("left", "center", "right")
    self.regions = {
        f"{row_name}_{column_name}": (
            boundaries[col], boundaries[row],
            boundaries[col + 1], boundaries[row + 1],
        )
        for row, row_name in enumerate(row_names)
        for col, column_name in enumerate(column_names)
    }

    self.class_names = class_names
    self.OBJECT_CATEGORIES = object_categories or {}
    self.enhance_descriptor = EnhancedSceneDescriber(scene_types=SCENE_TYPES)

    # Distance threshold for proximity analysis (normalized units)
    self.proximity_threshold = 0.2
def _determine_region(self, x: float, y: float) -> str: | |
""" | |
Determine which region a point falls into. | |
Args: | |
x: Normalized x-coordinate (0-1) | |
y: Normalized y-coordinate (0-1) | |
Returns: | |
Region name | |
""" | |
for region_name, (x1, y1, x2, y2) in self.regions.items(): | |
if x1 <= x < x2 and y1 <= y < y2: | |
return region_name | |
return "unknown" | |
def _analyze_regions(self, detected_objects: List[Dict]) -> Dict: | |
""" | |
Analyze object distribution across image regions. | |
Args: | |
detected_objects: List of detected objects with position information | |
Returns: | |
Dictionary with region analysis | |
""" | |
# Count objects in each region | |
region_counts = {region: 0 for region in self.regions.keys()} | |
region_objects = {region: [] for region in self.regions.keys()} | |
for obj in detected_objects: | |
region = obj["region"] | |
if region in region_counts: | |
region_counts[region] += 1 | |
region_objects[region].append({ | |
"class_id": obj["class_id"], | |
"class_name": obj["class_name"] | |
}) | |
# Determine main focus regions (top 1-2 regions by object count) | |
sorted_regions = sorted(region_counts.items(), key=lambda x: x[1], reverse=True) | |
main_regions = [region for region, count in sorted_regions if count > 0][:2] | |
return { | |
"counts": region_counts, | |
"main_focus": main_regions, | |
"objects_by_region": region_objects | |
} | |
def _extract_detected_objects(self, detection_result: Any, confidence_threshold: float = 0.25) -> List[Dict]: | |
""" | |
Extract detected objects from detection result with position information. | |
Args: | |
detection_result: Detection result from YOLOv8 | |
confidence_threshold: Minimum confidence threshold | |
Returns: | |
List of dictionaries with detected object information | |
""" | |
boxes = detection_result.boxes.xyxy.cpu().numpy() | |
classes = detection_result.boxes.cls.cpu().numpy().astype(int) | |
confidences = detection_result.boxes.conf.cpu().numpy() | |
# Image dimensions | |
img_height, img_width = detection_result.orig_shape[:2] | |
detected_objects = [] | |
for box, class_id, confidence in zip(boxes, classes, confidences): | |
# Skip objects with confidence below threshold | |
if confidence < confidence_threshold: | |
continue | |
x1, y1, x2, y2 = box | |
width = x2 - x1 | |
height = y2 - y1 | |
# Center point | |
center_x = (x1 + x2) / 2 | |
center_y = (y1 + y2) / 2 | |
# Normalized positions (0-1) | |
norm_x = center_x / img_width | |
norm_y = center_y / img_height | |
norm_width = width / img_width | |
norm_height = height / img_height | |
# Area calculation | |
area = width * height | |
norm_area = area / (img_width * img_height) | |
# Region determination | |
object_region = self._determine_region(norm_x, norm_y) | |
detected_objects.append({ | |
"class_id": int(class_id), | |
"class_name": self.class_names[int(class_id)], | |
"confidence": float(confidence), | |
"box": [float(x1), float(y1), float(x2), float(y2)], | |
"center": [float(center_x), float(center_y)], | |
"normalized_center": [float(norm_x), float(norm_y)], | |
"size": [float(width), float(height)], | |
"normalized_size": [float(norm_width), float(norm_height)], | |
"area": float(area), | |
"normalized_area": float(norm_area), | |
"region": object_region | |
}) | |
return detected_objects | |
def _detect_scene_viewpoint(self, detected_objects: List[Dict]) -> Dict: | |
""" | |
檢測場景視角並識別特殊場景模式。 | |
Args: | |
detected_objects: 檢測到的物體列表 | |
Returns: | |
Dict: 包含視角和場景模式信息的字典 | |
""" | |
if not detected_objects: | |
return {"viewpoint": "eye_level", "patterns": []} | |
# 從物體位置中提取信息 | |
patterns = [] | |
# 檢測行人位置模式 | |
pedestrian_objs = [obj for obj in detected_objects if obj["class_id"] == 0] | |
# 檢查是否有足夠的行人來識別模式 | |
if len(pedestrian_objs) >= 4: | |
pedestrian_positions = [obj["normalized_center"] for obj in pedestrian_objs] | |
# 檢測十字交叉模式 | |
if self._detect_cross_pattern(pedestrian_positions): | |
patterns.append("crosswalk_intersection") | |
# 檢測多方向行人流 | |
directions = self._analyze_movement_directions(pedestrian_positions) | |
if len(directions) >= 2: | |
patterns.append("multi_directional_movement") | |
# 檢查物體的大小一致性 - 在空中俯視圖中,物體大小通常更一致 | |
if len(detected_objects) >= 5: | |
sizes = [obj.get("normalized_area", 0) for obj in detected_objects] | |
size_variance = np.var(sizes) / (np.mean(sizes) ** 2) # 標準化變異數,不會受到平均值影響 | |
if size_variance < 0.3: # 低變異表示大小一致 | |
patterns.append("consistent_object_size") | |
# 基本視角檢測 | |
viewpoint = self.enhance_descriptor._detect_viewpoint(detected_objects) | |
# 根據檢測到的模式增強視角判斷 | |
if "crosswalk_intersection" in patterns and viewpoint != "aerial": | |
# 如果檢測到斑馬線交叉但視角判斷不是空中視角,優先採用模式判斷 | |
viewpoint = "aerial" | |
return { | |
"viewpoint": viewpoint, | |
"patterns": patterns | |
} | |
def _detect_cross_pattern(self, positions): | |
""" | |
檢測位置中的十字交叉模式 | |
Args: | |
positions: 位置列表 [[x1, y1], [x2, y2], ...] | |
Returns: | |
bool: 是否檢測到十字交叉模式 | |
""" | |
if len(positions) < 8: # 需要足夠多的點 | |
return False | |
# 提取 x 和 y 坐標 | |
x_coords = [pos[0] for pos in positions] | |
y_coords = [pos[1] for pos in positions] | |
# 檢測 x 和 y 方向的聚類 | |
x_clusters = [] | |
y_clusters = [] | |
# 簡化的聚類分析 | |
x_mean = np.mean(x_coords) | |
y_mean = np.mean(y_coords) | |
# 計算在中心線附近的點 | |
near_x_center = sum(1 for x in x_coords if abs(x - x_mean) < 0.1) | |
near_y_center = sum(1 for y in y_coords if abs(y - y_mean) < 0.1) | |
# 如果有足夠的點在中心線附近,可能是十字交叉 | |
return near_x_center >= 3 and near_y_center >= 3 | |
def _analyze_movement_directions(self, positions): | |
""" | |
分析位置中的移動方向 | |
Args: | |
positions: 位置列表 [[x1, y1], [x2, y2], ...] | |
Returns: | |
list: 檢測到的主要方向 | |
""" | |
if len(positions) < 6: | |
return [] | |
# extract x 和 y 坐標 | |
x_coords = [pos[0] for pos in positions] | |
y_coords = [pos[1] for pos in positions] | |
directions = [] | |
# horizontal move (left --> right) | |
x_std = np.std(x_coords) | |
x_range = max(x_coords) - min(x_coords) | |
# vertical move(up --> down) | |
y_std = np.std(y_coords) | |
y_range = max(y_coords) - min(y_coords) | |
# 足夠大的範圍表示該方向有運動 | |
if x_range > 0.4: | |
directions.append("horizontal") | |
if y_range > 0.4: | |
directions.append("vertical") | |
return directions | |
def _identify_functional_zones(self, detected_objects: List[Dict], scene_type: str) -> Dict: | |
""" | |
Identify functional zones within the scene with improved detection for different viewpoints | |
and cultural contexts. | |
Args: | |
detected_objects: List of detected objects | |
scene_type: Identified scene type | |
Returns: | |
Dictionary of functional zones with their descriptions | |
""" | |
# Group objects by category and region | |
category_regions = {} | |
for obj in detected_objects: | |
# Find object category | |
category = "other" | |
for cat_name, cat_ids in self.OBJECT_CATEGORIES.items(): | |
if obj["class_id"] in cat_ids: | |
category = cat_name | |
break | |
# Add to category-region mapping | |
if category not in category_regions: | |
category_regions[category] = {} | |
region = obj["region"] | |
if region not in category_regions[category]: | |
category_regions[category][region] = [] | |
category_regions[category][region].append(obj) | |
# Identify zones based on object groupings | |
zones = {} | |
# Detect viewpoint to adjust zone identification strategy | |
viewpoint = self._detect_scene_viewpoint(detected_objects) | |
# Choose appropriate zone identification strategy based on scene type and viewpoint | |
if scene_type in ["living_room", "bedroom", "dining_area", "kitchen", "office_workspace", "meeting_room"]: | |
# Indoor scenes | |
zones.update(self._identify_indoor_zones(category_regions, detected_objects, scene_type)) | |
elif scene_type in ["city_street", "parking_lot", "park_area"]: | |
# Outdoor general scenes | |
zones.update(self._identify_outdoor_general_zones(category_regions, detected_objects, scene_type)) | |
elif "aerial" in scene_type or viewpoint == "aerial": | |
# Aerial viewpoint scenes | |
zones.update(self._identify_aerial_view_zones(category_regions, detected_objects, scene_type)) | |
elif "asian" in scene_type: | |
# Asian cultural context scenes | |
zones.update(self._identify_asian_cultural_zones(category_regions, detected_objects, scene_type)) | |
elif scene_type == "urban_intersection": | |
# Specific urban intersection logic | |
zones.update(self._identify_intersection_zones(category_regions, detected_objects, viewpoint)) | |
elif scene_type == "financial_district": | |
# Financial district specific logic | |
zones.update(self._identify_financial_district_zones(category_regions, detected_objects)) | |
elif scene_type == "upscale_dining": | |
# Upscale dining specific logic | |
zones.update(self._identify_upscale_dining_zones(category_regions, detected_objects)) | |
else: | |
# Default zone identification for other scene types | |
zones.update(self._identify_default_zones(category_regions, detected_objects)) | |
# If no zones were identified, try the default approach | |
if not zones: | |
zones.update(self._identify_default_zones(category_regions, detected_objects)) | |
return zones | |
def _identify_indoor_zones(self, category_regions: Dict, detected_objects: List[Dict], scene_type: str) -> Dict: | |
""" | |
Identify functional zones for indoor scenes. | |
Args: | |
category_regions: Objects grouped by category and region | |
detected_objects: List of detected objects | |
scene_type: Specific indoor scene type | |
Returns: | |
Dict: Indoor functional zones | |
""" | |
zones = {} | |
# Seating/social zone | |
if "furniture" in category_regions: | |
furniture_regions = category_regions["furniture"] | |
main_furniture_region = max(furniture_regions.items(), | |
key=lambda x: len(x[1]), | |
default=(None, [])) | |
if main_furniture_region[0] is not None and len(main_furniture_region[1]) >= 2: | |
zone_objects = [obj["class_name"] for obj in main_furniture_region[1]] | |
zones["social_zone"] = { | |
"region": main_furniture_region[0], | |
"objects": zone_objects, | |
"description": f"Social or seating area with {', '.join(zone_objects)}" | |
} | |
# Entertainment zone | |
if "electronics" in category_regions: | |
electronics_items = [] | |
for region_objects in category_regions["electronics"].values(): | |
electronics_items.extend([obj["class_name"] for obj in region_objects]) | |
if electronics_items: | |
zones["entertainment_zone"] = { | |
"region": self._find_main_region(category_regions.get("electronics", {})), | |
"objects": electronics_items, | |
"description": f"Entertainment or media area with {', '.join(electronics_items)}" | |
} | |
# Dining/food zone | |
food_zone_categories = ["kitchen_items", "food"] | |
food_items = [] | |
food_regions = {} | |
for category in food_zone_categories: | |
if category in category_regions: | |
for region, objects in category_regions[category].items(): | |
if region not in food_regions: | |
food_regions[region] = [] | |
food_regions[region].extend(objects) | |
food_items.extend([obj["class_name"] for obj in objects]) | |
if food_items: | |
main_food_region = max(food_regions.items(), | |
key=lambda x: len(x[1]), | |
default=(None, [])) | |
if main_food_region[0] is not None: | |
zones["dining_zone"] = { | |
"region": main_food_region[0], | |
"objects": list(set(food_items)), | |
"description": f"Dining or food preparation area with {', '.join(list(set(food_items))[:3])}" | |
} | |
# Work/study zone - enhanced to detect even when scene_type is not explicitly office | |
work_items = [] | |
work_regions = {} | |
for obj in detected_objects: | |
if obj["class_id"] in [56, 60, 63, 64, 66, 73]: # chair, table, laptop, mouse, keyboard, book | |
region = obj["region"] | |
if region not in work_regions: | |
work_regions[region] = [] | |
work_regions[region].append(obj) | |
work_items.append(obj["class_name"]) | |
# Check for laptop and table/chair combinations that suggest a workspace | |
has_laptop = any(obj["class_id"] == 63 for obj in detected_objects) | |
has_keyboard = any(obj["class_id"] == 66 for obj in detected_objects) | |
has_table = any(obj["class_id"] == 60 for obj in detected_objects) | |
has_chair = any(obj["class_id"] == 56 for obj in detected_objects) | |
# If we have electronics with furniture in the same region, likely a workspace | |
workspace_detected = (has_laptop or has_keyboard) and (has_table or has_chair) | |
if (workspace_detected or scene_type in ["office_workspace", "meeting_room"]) and work_items: | |
main_work_region = max(work_regions.items(), | |
key=lambda x: len(x[1]), | |
default=(None, [])) | |
if main_work_region[0] is not None: | |
zones["workspace_zone"] = { | |
"region": main_work_region[0], | |
"objects": list(set(work_items)), | |
"description": f"Work or study area with {', '.join(list(set(work_items))[:3])}" | |
} | |
# Bedroom-specific zones | |
if scene_type == "bedroom": | |
bed_objects = [obj for obj in detected_objects if obj["class_id"] == 59] # Bed | |
if bed_objects: | |
bed_region = bed_objects[0]["region"] | |
zones["sleeping_zone"] = { | |
"region": bed_region, | |
"objects": ["bed"], | |
"description": "Sleeping area with bed" | |
} | |
# Kitchen-specific zones | |
if scene_type == "kitchen": | |
# Look for appliances (refrigerator, oven, microwave, sink) | |
appliance_ids = [68, 69, 71, 72] # microwave, oven, sink, refrigerator | |
appliance_objects = [obj for obj in detected_objects if obj["class_id"] in appliance_ids] | |
if appliance_objects: | |
appliance_regions = {} | |
for obj in appliance_objects: | |
region = obj["region"] | |
if region not in appliance_regions: | |
appliance_regions[region] = [] | |
appliance_regions[region].append(obj) | |
if appliance_regions: | |
main_appliance_region = max(appliance_regions.items(), | |
key=lambda x: len(x[1]), | |
default=(None, [])) | |
if main_appliance_region[0] is not None: | |
appliance_names = [obj["class_name"] for obj in main_appliance_region[1]] | |
zones["kitchen_appliance_zone"] = { | |
"region": main_appliance_region[0], | |
"objects": appliance_names, | |
"description": f"Kitchen appliance area with {', '.join(appliance_names)}" | |
} | |
return zones | |
def _identify_intersection_zones(self, category_regions: Dict, detected_objects: List[Dict], viewpoint: str) -> Dict: | |
""" | |
Identify functional zones for urban intersections with enhanced spatial awareness. | |
Args: | |
category_regions: Objects grouped by category and region | |
detected_objects: List of detected objects | |
viewpoint: Detected viewpoint | |
Returns: | |
Dict: Refined intersection functional zones | |
""" | |
zones = {} | |
# Get pedestrians, vehicles and traffic signals | |
pedestrian_objs = [obj for obj in detected_objects if obj["class_id"] == 0] | |
vehicle_objs = [obj for obj in detected_objects if obj["class_id"] in [1, 2, 3, 5, 7]] # bicycle, car, motorcycle, bus, truck | |
traffic_light_objs = [obj for obj in detected_objects if obj["class_id"] == 9] | |
# Create distribution maps for better spatial understanding | |
regions_distribution = self._create_distribution_map(detected_objects) | |
# Analyze pedestrian crossing patterns | |
crossing_zones = self._analyze_crossing_patterns(pedestrian_objs, traffic_light_objs, regions_distribution) | |
zones.update(crossing_zones) | |
# Analyze vehicle traffic zones with directional awareness | |
traffic_zones = self._analyze_traffic_zones(vehicle_objs, regions_distribution) | |
zones.update(traffic_zones) | |
# Identify traffic control zones based on signal placement | |
if traffic_light_objs: | |
# Group traffic lights by region for better organization | |
signal_regions = {} | |
for obj in traffic_light_objs: | |
region = obj["region"] | |
if region not in signal_regions: | |
signal_regions[region] = [] | |
signal_regions[region].append(obj) | |
# Create traffic control zones for each region with signals | |
for idx, (region, signals) in enumerate(signal_regions.items()): | |
# Check if this region has a directional name | |
direction = self._get_directional_description(region) | |
zones[f"traffic_control_zone_{idx+1}"] = { | |
"region": region, | |
"objects": ["traffic light"] * len(signals), | |
"description": f"Traffic control area with {len(signals)} traffic signals" + | |
(f" in {direction} area" if direction else "") | |
} | |
return zones | |
def _analyze_crossing_patterns(self, pedestrians: List[Dict], traffic_lights: List[Dict], | |
region_distribution: Dict) -> Dict: | |
""" | |
Analyze pedestrian crossing patterns to identify crosswalk zones. | |
Args: | |
pedestrians: List of pedestrian objects | |
traffic_lights: List of traffic light objects | |
region_distribution: Distribution of objects by region | |
Returns: | |
Dict: Identified crossing zones | |
""" | |
crossing_zones = {} | |
if not pedestrians: | |
return crossing_zones | |
# Group pedestrians by region | |
pedestrian_regions = {} | |
for p in pedestrians: | |
region = p["region"] | |
if region not in pedestrian_regions: | |
pedestrian_regions[region] = [] | |
pedestrian_regions[region].append(p) | |
# Sort regions by pedestrian count to find main crossing areas | |
sorted_regions = sorted(pedestrian_regions.items(), key=lambda x: len(x[1]), reverse=True) | |
# Create crossing zones for regions with pedestrians | |
for idx, (region, peds) in enumerate(sorted_regions[:2]): # Focus on top 2 regions | |
# Check if there are traffic lights nearby to indicate a crosswalk | |
has_nearby_signals = any(t["region"] == region for t in traffic_lights) | |
# Create crossing zone with descriptive naming | |
zone_name = f"crossing_zone_{idx+1}" | |
direction = self._get_directional_description(region) | |
description = f"Pedestrian crossing area with {len(peds)} " | |
description += "person" if len(peds) == 1 else "people" | |
if direction: | |
description += f" in {direction} direction" | |
if has_nearby_signals: | |
description += " near traffic signals" | |
crossing_zones[zone_name] = { | |
"region": region, | |
"objects": ["pedestrian"] * len(peds), | |
"description": description | |
} | |
return crossing_zones | |
def _analyze_traffic_zones(self, vehicles: List[Dict], region_distribution: Dict) -> Dict: | |
""" | |
Analyze vehicle distribution to identify traffic zones with directional awareness. | |
Args: | |
vehicles: List of vehicle objects | |
region_distribution: Distribution of objects by region | |
Returns: | |
Dict: Identified traffic zones | |
""" | |
traffic_zones = {} | |
if not vehicles: | |
return traffic_zones | |
# Group vehicles by region | |
vehicle_regions = {} | |
for v in vehicles: | |
region = v["region"] | |
if region not in vehicle_regions: | |
vehicle_regions[region] = [] | |
vehicle_regions[region].append(v) | |
# Create traffic zones for regions with vehicles | |
main_traffic_region = max(vehicle_regions.items(), key=lambda x: len(x[1]), default=(None, [])) | |
if main_traffic_region[0] is not None: | |
region = main_traffic_region[0] | |
vehicles_in_region = main_traffic_region[1] | |
# Get a list of vehicle types for description | |
vehicle_types = [v["class_name"] for v in vehicles_in_region] | |
unique_types = list(set(vehicle_types)) | |
# Get directional description | |
direction = self._get_directional_description(region) | |
# Create descriptive zone | |
traffic_zones["vehicle_zone"] = { | |
"region": region, | |
"objects": vehicle_types, | |
"description": f"Vehicle traffic area with {', '.join(unique_types[:3])}" + | |
(f" in {direction} area" if direction else "") | |
} | |
# If vehicles are distributed across multiple regions, create secondary zones | |
if len(vehicle_regions) > 1: | |
# Get second most populated region | |
sorted_regions = sorted(vehicle_regions.items(), key=lambda x: len(x[1]), reverse=True) | |
if len(sorted_regions) > 1: | |
second_region, second_vehicles = sorted_regions[1] | |
direction = self._get_directional_description(second_region) | |
vehicle_types = [v["class_name"] for v in second_vehicles] | |
unique_types = list(set(vehicle_types)) | |
traffic_zones["secondary_vehicle_zone"] = { | |
"region": second_region, | |
"objects": vehicle_types, | |
"description": f"Secondary traffic area with {', '.join(unique_types[:2])}" + | |
(f" in {direction} direction" if direction else "") | |
} | |
return traffic_zones | |
def _get_directional_description(self, region: str) -> str: | |
""" | |
Convert region name to a directional description. | |
Args: | |
region: Region name from the grid | |
Returns: | |
str: Directional description | |
""" | |
if "top" in region and "left" in region: | |
return "northwest" | |
elif "top" in region and "right" in region: | |
return "northeast" | |
elif "bottom" in region and "left" in region: | |
return "southwest" | |
elif "bottom" in region and "right" in region: | |
return "southeast" | |
elif "top" in region: | |
return "north" | |
elif "bottom" in region: | |
return "south" | |
elif "left" in region: | |
return "west" | |
elif "right" in region: | |
return "east" | |
else: | |
return "central" | |
def _create_distribution_map(self, detected_objects: List[Dict]) -> Dict: | |
""" | |
Create a distribution map of objects across regions for spatial analysis. | |
Args: | |
detected_objects: List of detected objects | |
Returns: | |
Dict: Distribution map of objects by region and class | |
""" | |
distribution = {} | |
# Initialize all regions | |
for region in self.regions.keys(): | |
distribution[region] = { | |
"total": 0, | |
"objects": {}, | |
"density": 0 | |
} | |
# Populate the distribution | |
for obj in detected_objects: | |
region = obj["region"] | |
class_id = obj["class_id"] | |
class_name = obj["class_name"] | |
distribution[region]["total"] += 1 | |
if class_id not in distribution[region]["objects"]: | |
distribution[region]["objects"][class_id] = { | |
"name": class_name, | |
"count": 0, | |
"positions": [] | |
} | |
distribution[region]["objects"][class_id]["count"] += 1 | |
# Store position for spatial relationship analysis | |
if "normalized_center" in obj: | |
distribution[region]["objects"][class_id]["positions"].append(obj["normalized_center"]) | |
# Calculate object density for each region | |
for region, data in distribution.items(): | |
# Assuming all regions are equal size in the grid | |
data["density"] = data["total"] / 1 | |
return distribution | |
def _identify_asian_cultural_zones(self, category_regions: Dict, detected_objects: List[Dict], scene_type: str) -> Dict: | |
""" | |
Identify functional zones for scenes with Asian cultural context. | |
Args: | |
category_regions: Objects grouped by category and region | |
detected_objects: List of detected objects | |
scene_type: Specific scene type | |
Returns: | |
Dict: Asian cultural functional zones | |
""" | |
zones = {} | |
# Identify storefront zone | |
storefront_items = [] | |
storefront_regions = {} | |
# Since storefronts aren't directly detectable, infer from context | |
# For example, look for regions with signs, people, and smaller objects | |
sign_regions = set() | |
for obj in detected_objects: | |
if obj["class_id"] == 0: # Person | |
region = obj["region"] | |
if region not in storefront_regions: | |
storefront_regions[region] = [] | |
storefront_regions[region].append(obj) | |
# Add regions with people as potential storefront areas | |
sign_regions.add(region) | |
# Use the areas with most people as storefront zones | |
if storefront_regions: | |
main_storefront_regions = sorted(storefront_regions.items(), | |
key=lambda x: len(x[1]), | |
reverse=True)[:2] # Top 2 regions | |
for idx, (region, objs) in enumerate(main_storefront_regions): | |
zones[f"commercial_zone_{idx+1}"] = { | |
"region": region, | |
"objects": [obj["class_name"] for obj in objs], | |
"description": f"Asian commercial storefront with pedestrian activity" | |
} | |
# Identify pedestrian pathway - enhanced to better detect linear pathways | |
pathway_items = [] | |
pathway_regions = {} | |
# Extract people for pathway analysis | |
people_objs = [obj for obj in detected_objects if obj["class_id"] == 0] | |
# Analyze if people form a line (typical of shopping streets) | |
people_positions = [obj["normalized_center"] for obj in people_objs] | |
structured_path = False | |
if len(people_positions) >= 3: | |
# Check if people are arranged along a similar y-coordinate (horizontal path) | |
y_coords = [pos[1] for pos in people_positions] | |
y_mean = sum(y_coords) / len(y_coords) | |
y_variance = sum((y - y_mean)**2 for y in y_coords) / len(y_coords) | |
horizontal_path = y_variance < 0.05 # Low variance indicates horizontal alignment | |
# Check if people are arranged along a similar x-coordinate (vertical path) | |
x_coords = [pos[0] for pos in people_positions] | |
x_mean = sum(x_coords) / len(x_coords) | |
x_variance = sum((x - x_mean)**2 for x in x_coords) / len(x_coords) | |
vertical_path = x_variance < 0.05 # Low variance indicates vertical alignment | |
structured_path = horizontal_path or vertical_path | |
path_direction = "horizontal" if horizontal_path else "vertical" if vertical_path else "meandering" | |
# Collect pathway objects (people, bicycles, motorcycles in middle area) | |
for obj in detected_objects: | |
if obj["class_id"] in [0, 1, 3]: # Person, bicycle, motorcycle | |
y_pos = obj["normalized_center"][1] | |
# Group by vertical position (middle of image likely pathway) | |
if 0.25 <= y_pos <= 0.75: | |
region = obj["region"] | |
if region not in pathway_regions: | |
pathway_regions[region] = [] | |
pathway_regions[region].append(obj) | |
pathway_items.append(obj["class_name"]) | |
if pathway_items: | |
path_desc = "Pedestrian walkway with people moving through the commercial area" | |
if structured_path: | |
path_desc = f"{path_direction.capitalize()} pedestrian walkway with organized foot traffic" | |
zones["pedestrian_pathway"] = { | |
"region": "middle_center", # Assumption: pathway often in middle | |
"objects": list(set(pathway_items)), | |
"description": path_desc | |
} | |
# Identify vendor zone (small stalls/shops - inferred from context) | |
has_small_objects = any(obj["class_id"] in [24, 26, 39, 41] for obj in detected_objects) # bags, bottles, cups | |
has_people = any(obj["class_id"] == 0 for obj in detected_objects) | |
if has_small_objects and has_people: | |
# Likely vendor areas are where people and small objects cluster | |
small_obj_regions = {} | |
for obj in detected_objects: | |
if obj["class_id"] in [24, 26, 39, 41, 67]: # bags, bottles, cups, phones | |
region = obj["region"] | |
if region not in small_obj_regions: | |
small_obj_regions[region] = [] | |
small_obj_regions[region].append(obj) | |
if small_obj_regions: | |
main_vendor_region = max(small_obj_regions.items(), | |
key=lambda x: len(x[1]), | |
default=(None, [])) | |
if main_vendor_region[0] is not None: | |
vendor_items = [obj["class_name"] for obj in main_vendor_region[1]] | |
zones["vendor_zone"] = { | |
"region": main_vendor_region[0], | |
"objects": list(set(vendor_items)), | |
"description": "Vendor or market stall area with small merchandise" | |
} | |
# For night markets, identify illuminated zones | |
if scene_type == "asian_night_market": | |
# Night markets typically have bright spots for food stalls | |
# This would be enhanced with lighting analysis integration | |
zones["food_stall_zone"] = { | |
"region": "middle_center", | |
"objects": ["inferred food stalls"], | |
"description": "Food stall area typical of Asian night markets" | |
} | |
return zones | |
def _identify_upscale_dining_zones(self, category_regions: Dict, detected_objects: List[Dict]) -> Dict: | |
""" | |
Identify functional zones for upscale dining settings. | |
Args: | |
category_regions: Objects grouped by category and region | |
detected_objects: List of detected objects | |
Returns: | |
Dict: Upscale dining functional zones | |
""" | |
zones = {} | |
# Identify dining table zone | |
dining_items = [] | |
dining_regions = {} | |
for obj in detected_objects: | |
if obj["class_id"] in [40, 41, 42, 43, 44, 45, 60]: # Wine glass, cup, fork, knife, spoon, bowl, table | |
region = obj["region"] | |
if region not in dining_regions: | |
dining_regions[region] = [] | |
dining_regions[region].append(obj) | |
dining_items.append(obj["class_name"]) | |
if dining_items: | |
main_dining_region = max(dining_regions.items(), | |
key=lambda x: len(x[1]), | |
default=(None, [])) | |
if main_dining_region[0] is not None: | |
zones["formal_dining_zone"] = { | |
"region": main_dining_region[0], | |
"objects": list(set(dining_items)), | |
"description": f"Formal dining area with {', '.join(list(set(dining_items))[:3])}" | |
} | |
# Identify decorative zone with enhanced detection | |
decor_items = [] | |
decor_regions = {} | |
# Look for decorative elements (vases, wine glasses, unused dishes) | |
for obj in detected_objects: | |
if obj["class_id"] in [75, 40]: # Vase, wine glass | |
region = obj["region"] | |
if region not in decor_regions: | |
decor_regions[region] = [] | |
decor_regions[region].append(obj) | |
decor_items.append(obj["class_name"]) | |
if decor_items: | |
main_decor_region = max(decor_regions.items(), | |
key=lambda x: len(x[1]), | |
default=(None, [])) | |
if main_decor_region[0] is not None: | |
zones["decorative_zone"] = { | |
"region": main_decor_region[0], | |
"objects": list(set(decor_items)), | |
"description": f"Decorative area with {', '.join(list(set(decor_items)))}" | |
} | |
# Identify seating arrangement zone | |
chairs = [obj for obj in detected_objects if obj["class_id"] == 56] # chairs | |
if len(chairs) >= 2: | |
chair_regions = {} | |
for obj in chairs: | |
region = obj["region"] | |
if region not in chair_regions: | |
chair_regions[region] = [] | |
chair_regions[region].append(obj) | |
if chair_regions: | |
main_seating_region = max(chair_regions.items(), | |
key=lambda x: len(x[1]), | |
default=(None, [])) | |
if main_seating_region[0] is not None: | |
zones["dining_seating_zone"] = { | |
"region": main_seating_region[0], | |
"objects": ["chair"] * len(main_seating_region[1]), | |
"description": f"Formal dining seating arrangement with {len(main_seating_region[1])} chairs" | |
} | |
# Identify serving area (if different from dining area) | |
serving_items = [] | |
serving_regions = {} | |
# Serving areas might have bottles, bowls, containers | |
for obj in detected_objects: | |
if obj["class_id"] in [39, 45]: # Bottle, bowl | |
# Check if it's in a different region from the main dining table | |
if "formal_dining_zone" in zones and obj["region"] != zones["formal_dining_zone"]["region"]: | |
region = obj["region"] | |
if region not in serving_regions: | |
serving_regions[region] = [] | |
serving_regions[region].append(obj) | |
serving_items.append(obj["class_name"]) | |
if serving_items: | |
main_serving_region = max(serving_regions.items(), | |
key=lambda x: len(x[1]), | |
default=(None, [])) | |
if main_serving_region[0] is not None: | |
zones["serving_zone"] = { | |
"region": main_serving_region[0], | |
"objects": list(set(serving_items)), | |
"description": f"Serving or sideboard area with {', '.join(list(set(serving_items)))}" | |
} | |
return zones | |
def _identify_financial_district_zones(self, category_regions: Dict, detected_objects: List[Dict]) -> Dict: | |
""" | |
Identify functional zones for financial district scenes. | |
Args: | |
category_regions: Objects grouped by category and region | |
detected_objects: List of detected objects | |
Returns: | |
Dict: Financial district functional zones | |
""" | |
zones = {} | |
# Identify traffic zone | |
traffic_items = [] | |
traffic_regions = {} | |
for obj in detected_objects: | |
if obj["class_id"] in [1, 2, 3, 5, 6, 7, 9]: # Various vehicles and traffic lights | |
region = obj["region"] | |
if region not in traffic_regions: | |
traffic_regions[region] = [] | |
traffic_regions[region].append(obj) | |
traffic_items.append(obj["class_name"]) | |
if traffic_items: | |
main_traffic_region = max(traffic_regions.items(), | |
key=lambda x: len(x[1]), | |
default=(None, [])) | |
if main_traffic_region[0] is not None: | |
zones["traffic_zone"] = { | |
"region": main_traffic_region[0], | |
"objects": list(set(traffic_items)), | |
"description": f"Urban traffic area with {', '.join(list(set(traffic_items))[:3])}" | |
} | |
# Building zones on the sides (inferred from scene context) | |
# Enhanced to check if there are actual regions that might contain buildings | |
# Check for regions without vehicles or pedestrians - likely building areas | |
left_side_regions = ["top_left", "middle_left", "bottom_left"] | |
right_side_regions = ["top_right", "middle_right", "bottom_right"] | |
# Check left side | |
left_building_evidence = True | |
for region in left_side_regions: | |
# If many vehicles or people in this region, less likely to be buildings | |
vehicle_in_region = any(obj["region"] == region and obj["class_id"] in [1, 2, 3, 5, 7] | |
for obj in detected_objects) | |
people_in_region = any(obj["region"] == region and obj["class_id"] == 0 | |
for obj in detected_objects) | |
if vehicle_in_region or people_in_region: | |
left_building_evidence = False | |
break | |
# Check right side | |
right_building_evidence = True | |
for region in right_side_regions: | |
# If many vehicles or people in this region, less likely to be buildings | |
vehicle_in_region = any(obj["region"] == region and obj["class_id"] in [1, 2, 3, 5, 7] | |
for obj in detected_objects) | |
people_in_region = any(obj["region"] == region and obj["class_id"] == 0 | |
for obj in detected_objects) | |
if vehicle_in_region or people_in_region: | |
right_building_evidence = False | |
break | |
# Add building zones if evidence supports them | |
if left_building_evidence: | |
zones["building_zone_left"] = { | |
"region": "middle_left", | |
"objects": ["building"], # Inferred | |
"description": "Tall buildings line the left side of the street" | |
} | |
if right_building_evidence: | |
zones["building_zone_right"] = { | |
"region": "middle_right", | |
"objects": ["building"], # Inferred | |
"description": "Tall buildings line the right side of the street" | |
} | |
# Identify pedestrian zone if people are present | |
people_objs = [obj for obj in detected_objects if obj["class_id"] == 0] | |
if people_objs: | |
people_regions = {} | |
for obj in people_objs: | |
region = obj["region"] | |
if region not in people_regions: | |
people_regions[region] = [] | |
people_regions[region].append(obj) | |
if people_regions: | |
main_pedestrian_region = max(people_regions.items(), | |
key=lambda x: len(x[1]), | |
default=(None, [])) | |
if main_pedestrian_region[0] is not None: | |
zones["pedestrian_zone"] = { | |
"region": main_pedestrian_region[0], | |
"objects": ["person"] * len(main_pedestrian_region[1]), | |
"description": f"Pedestrian area with {len(main_pedestrian_region[1])} people navigating the financial district" | |
} | |
return zones | |
def _identify_aerial_view_zones(self, category_regions: Dict, detected_objects: List[Dict], scene_type: str) -> Dict: | |
""" | |
Identify functional zones for scenes viewed from an aerial perspective. | |
Args: | |
category_regions: Objects grouped by category and region | |
detected_objects: List of detected objects | |
scene_type: Specific scene type | |
Returns: | |
Dict: Aerial view functional zones | |
""" | |
zones = {} | |
# For aerial views, we focus on patterns and flows rather than specific zones | |
# Identify pedestrian patterns | |
people_objs = [obj for obj in detected_objects if obj["class_id"] == 0] | |
if people_objs: | |
# Convert positions to arrays for pattern analysis | |
positions = np.array([obj["normalized_center"] for obj in people_objs]) | |
if len(positions) >= 3: | |
# Calculate distribution metrics | |
x_coords = positions[:, 0] | |
y_coords = positions[:, 1] | |
x_mean = np.mean(x_coords) | |
y_mean = np.mean(y_coords) | |
x_std = np.std(x_coords) | |
y_std = np.std(y_coords) | |
# Determine if people are organized in a linear pattern | |
if x_std < 0.1 or y_std < 0.1: | |
# Linear distribution along one axis | |
pattern_direction = "vertical" if x_std < y_std else "horizontal" | |
zones["pedestrian_pattern"] = { | |
"region": "central", | |
"objects": ["person"] * len(people_objs), | |
"description": f"Aerial view shows a {pattern_direction} pedestrian movement pattern" | |
} | |
else: | |
# More dispersed pattern | |
zones["pedestrian_distribution"] = { | |
"region": "wide", | |
"objects": ["person"] * len(people_objs), | |
"description": f"Aerial view shows pedestrians distributed across the area" | |
} | |
# Identify vehicle patterns for traffic analysis | |
vehicle_objs = [obj for obj in detected_objects if obj["class_id"] in [1, 2, 3, 5, 6, 7]] | |
if vehicle_objs: | |
# Convert positions to arrays for pattern analysis | |
positions = np.array([obj["normalized_center"] for obj in vehicle_objs]) | |
if len(positions) >= 2: | |
# Calculate distribution metrics | |
x_coords = positions[:, 0] | |
y_coords = positions[:, 1] | |
x_mean = np.mean(x_coords) | |
y_mean = np.mean(y_coords) | |
x_std = np.std(x_coords) | |
y_std = np.std(y_coords) | |
# Determine if vehicles are organized in lanes | |
if x_std < y_std * 0.5: | |
# Vehicles aligned vertically - indicates north-south traffic | |
zones["vertical_traffic_flow"] = { | |
"region": "central_vertical", | |
"objects": [obj["class_name"] for obj in vehicle_objs[:5]], | |
"description": "North-south traffic flow visible from aerial view" | |
} | |
elif y_std < x_std * 0.5: | |
# Vehicles aligned horizontally - indicates east-west traffic | |
zones["horizontal_traffic_flow"] = { | |
"region": "central_horizontal", | |
"objects": [obj["class_name"] for obj in vehicle_objs[:5]], | |
"description": "East-west traffic flow visible from aerial view" | |
} | |
else: | |
# Vehicles in multiple directions - indicates intersection | |
zones["intersection_traffic"] = { | |
"region": "central", | |
"objects": [obj["class_name"] for obj in vehicle_objs[:5]], | |
"description": "Multi-directional traffic at intersection visible from aerial view" | |
} | |
# For intersection specific aerial views, identify crossing patterns | |
if "intersection" in scene_type: | |
# Check for traffic signals | |
traffic_light_objs = [obj for obj in detected_objects if obj["class_id"] == 9] | |
if traffic_light_objs: | |
zones["traffic_control_pattern"] = { | |
"region": "intersection", | |
"objects": ["traffic light"] * len(traffic_light_objs), | |
"description": f"Intersection traffic control with {len(traffic_light_objs)} signals visible from above" | |
} | |
# Crosswalks are inferred from context in aerial views | |
zones["crossing_pattern"] = { | |
"region": "central", | |
"objects": ["inferred crosswalk"], | |
"description": "Crossing pattern visible from aerial perspective" | |
} | |
# For plaza aerial views, identify gathering patterns | |
if "plaza" in scene_type: | |
# Plazas typically have central open area with people | |
if people_objs: | |
# Check if people are clustered in central region | |
central_people = [obj for obj in people_objs | |
if "middle" in obj["region"]] | |
if central_people: | |
zones["central_gathering"] = { | |
"region": "middle_center", | |
"objects": ["person"] * len(central_people), | |
"description": f"Central plaza gathering area with {len(central_people)} people viewed from above" | |
} | |
return zones | |
def _identify_outdoor_general_zones(self, category_regions: Dict, detected_objects: List[Dict], scene_type: str) -> Dict: | |
""" | |
Identify functional zones for general outdoor scenes. | |
Args: | |
category_regions: Objects grouped by category and region | |
detected_objects: List of detected objects | |
scene_type: Specific outdoor scene type | |
Returns: | |
Dict: Outdoor functional zones | |
""" | |
zones = {} | |
# Identify pedestrian zones | |
people_objs = [obj for obj in detected_objects if obj["class_id"] == 0] | |
if people_objs: | |
people_regions = {} | |
for obj in people_objs: | |
region = obj["region"] | |
if region not in people_regions: | |
people_regions[region] = [] | |
people_regions[region].append(obj) | |
if people_regions: | |
# Find main pedestrian areas | |
main_people_regions = sorted(people_regions.items(), | |
key=lambda x: len(x[1]), | |
reverse=True)[:2] # Top 2 regions | |
for idx, (region, objs) in enumerate(main_people_regions): | |
if len(objs) > 0: | |
zones[f"pedestrian_zone_{idx+1}"] = { | |
"region": region, | |
"objects": ["person"] * len(objs), | |
"description": f"Pedestrian area with {len(objs)} {'people' if len(objs) > 1 else 'person'}" | |
} | |
# Identify vehicle zones for streets and parking lots | |
vehicle_objs = [obj for obj in detected_objects if obj["class_id"] in [1, 2, 3, 5, 6, 7]] | |
if vehicle_objs: | |
vehicle_regions = {} | |
for obj in vehicle_objs: | |
region = obj["region"] | |
if region not in vehicle_regions: | |
vehicle_regions[region] = [] | |
vehicle_regions[region].append(obj) | |
if vehicle_regions: | |
main_vehicle_region = max(vehicle_regions.items(), | |
key=lambda x: len(x[1]), | |
default=(None, [])) | |
if main_vehicle_region[0] is not None: | |
vehicle_types = [obj["class_name"] for obj in main_vehicle_region[1]] | |
zones["vehicle_zone"] = { | |
"region": main_vehicle_region[0], | |
"objects": vehicle_types, | |
"description": f"Traffic area with {', '.join(list(set(vehicle_types))[:3])}" | |
} | |
# For park areas, identify recreational zones | |
if scene_type == "park_area": | |
# Look for recreational objects (sports balls, kites, etc.) | |
rec_items = [] | |
rec_regions = {} | |
for obj in detected_objects: | |
if obj["class_id"] in [32, 33, 34, 35, 38]: # sports ball, kite, baseball bat, glove, tennis racket | |
region = obj["region"] | |
if region not in rec_regions: | |
rec_regions[region] = [] | |
rec_regions[region].append(obj) | |
rec_items.append(obj["class_name"]) | |
if rec_items: | |
main_rec_region = max(rec_regions.items(), | |
key=lambda x: len(x[1]), | |
default=(None, [])) | |
if main_rec_region[0] is not None: | |
zones["recreational_zone"] = { | |
"region": main_rec_region[0], | |
"objects": list(set(rec_items)), | |
"description": f"Recreational area with {', '.join(list(set(rec_items)))}" | |
} | |
# For parking lots, identify parking zones | |
if scene_type == "parking_lot": | |
# Look for parked cars with consistent spacing | |
car_objs = [obj for obj in detected_objects if obj["class_id"] == 2] # cars | |
if len(car_objs) >= 3: | |
# Check if cars are arranged in patterns (simplified) | |
car_positions = [obj["normalized_center"] for obj in car_objs] | |
# Check for row patterns by analyzing vertical positions | |
y_coords = [pos[1] for pos in car_positions] | |
y_clusters = {} | |
# Simplified clustering - group cars by similar y-coordinates | |
for i, y in enumerate(y_coords): | |
assigned = False | |
for cluster_y in y_clusters.keys(): | |
if abs(y - cluster_y) < 0.1: # Within 10% of image height | |
y_clusters[cluster_y].append(i) | |
assigned = True | |
break | |
if not assigned: | |
y_clusters[y] = [i] | |
# If we have row patterns | |
if max(len(indices) for indices in y_clusters.values()) >= 2: | |
zones["parking_row"] = { | |
"region": "central", | |
"objects": ["car"] * len(car_objs), | |
"description": f"Organized parking area with vehicles arranged in rows" | |
} | |
else: | |
zones["parking_area"] = { | |
"region": "wide", | |
"objects": ["car"] * len(car_objs), | |
"description": f"Parking area with {len(car_objs)} vehicles" | |
} | |
return zones | |
def _identify_default_zones(self, category_regions: Dict, detected_objects: List[Dict]) -> Dict: | |
""" | |
Identify general functional zones when no specific scene type is matched. | |
Args: | |
category_regions: Objects grouped by category and region | |
detected_objects: List of detected objects | |
Returns: | |
Dict: Default functional zones | |
""" | |
zones = {} | |
# Group objects by category and find main concentrations | |
for category, regions in category_regions.items(): | |
if not regions: | |
continue | |
# Find region with most objects in this category | |
main_region = max(regions.items(), | |
key=lambda x: len(x[1]), | |
default=(None, [])) | |
if main_region[0] is None or len(main_region[1]) < 2: | |
continue | |
# Create zone based on object category | |
zone_objects = [obj["class_name"] for obj in main_region[1]] | |
# Skip if too few objects | |
if len(zone_objects) < 2: | |
continue | |
# Create appropriate zone name and description based on category | |
if category == "furniture": | |
zones["furniture_zone"] = { | |
"region": main_region[0], | |
"objects": zone_objects, | |
"description": f"Area with furniture including {', '.join(zone_objects[:3])}" | |
} | |
elif category == "electronics": | |
zones["electronics_zone"] = { | |
"region": main_region[0], | |
"objects": zone_objects, | |
"description": f"Area with electronic devices including {', '.join(zone_objects[:3])}" | |
} | |
elif category == "kitchen_items": | |
zones["dining_zone"] = { | |
"region": main_region[0], | |
"objects": zone_objects, | |
"description": f"Dining or food area with {', '.join(zone_objects[:3])}" | |
} | |
elif category == "vehicles": | |
zones["vehicle_zone"] = { | |
"region": main_region[0], | |
"objects": zone_objects, | |
"description": f"Area with vehicles including {', '.join(zone_objects[:3])}" | |
} | |
elif category == "personal_items": | |
zones["personal_items_zone"] = { | |
"region": main_region[0], | |
"objects": zone_objects, | |
"description": f"Area with personal items including {', '.join(zone_objects[:3])}" | |
} | |
# Check for people groups | |
people_objs = [obj for obj in detected_objects if obj["class_id"] == 0] | |
if len(people_objs) >= 2: | |
people_regions = {} | |
for obj in people_objs: | |
region = obj["region"] | |
if region not in people_regions: | |
people_regions[region] = [] | |
people_regions[region].append(obj) | |
if people_regions: | |
main_people_region = max(people_regions.items(), | |
key=lambda x: len(x[1]), | |
default=(None, [])) | |
if main_people_region[0] is not None: | |
zones["people_zone"] = { | |
"region": main_people_region[0], | |
"objects": ["person"] * len(main_people_region[1]), | |
"description": f"Area with {len(main_people_region[1])} people" | |
} | |
return zones | |
def _find_main_region(self, region_objects_dict: Dict) -> str: | |
"""Find the main region with the most objects""" | |
if not region_objects_dict: | |
return "unknown" | |
return max(region_objects_dict.items(), | |
key=lambda x: len(x[1]), | |
default=("unknown", []))[0] | |
def _find_main_region(self, region_objects_dict: Dict) -> str: | |
"""Find the main region with the most objects""" | |
if not region_objects_dict: | |
return "unknown" | |
return max(region_objects_dict.items(), | |
key=lambda x: len(x[1]), | |
default=("unknown", []))[0] | |