# Scratch_vlm_v1 / utils / block_builder_main.py
# Uploaded by WebashalarForML in commit a522962 (verified), "Upload 175 files"; file size 23.1 kB.
# (Page navigation residue from the hosting site: "raw", "history", "blame".)
import json
import copy
import re
from collections import defaultdict
import secrets
import string
from typing import Dict, Any, TypedDict
from plan_generator_10 import generate_plan,generate_blocks_from_opcodes,all_block_definitions
#################################################################################################################################################################
#--------------------------------------------------[Secure random token generation used for block, variable, and broadcast IDs]----------------------------------
#################################################################################################################################################################
def generate_secure_token(length=20):
    """Return a random token made of ``length`` characters.

    Characters are drawn one at a time with ``secrets.choice`` from ASCII
    letters, digits, and the punctuation set ``!@#$%^&*()[]{}=+-_~``.

    Args:
        length: Number of characters in the token (defaults to 20).

    Returns:
        The generated token string.
    """
    alphabet = "".join((string.ascii_letters, string.digits, "!@#$%^&*()[]{}=+-_~"))
    picked = [secrets.choice(alphabet) for _ in range(length)]
    return "".join(picked)
#################################################################################################################################################################
#--------------------------------------------------[Process the two skeletons as input and generate the refined skeleton JSON]-----------------------------------
#################################################################################################################################################################
def process_scratch_blocks(all_generated_blocks, generated_output_json):
    """Merge planned block data with the skeleton into refined block dicts.

    For every block ID in ``generated_output_json`` (the skeleton), the
    matching entry of ``all_generated_blocks`` (the plan) is looked up and a
    new dict is built with the keys ``opcode``, ``inputs``, ``fields``,
    ``shadow``, ``topLevel``, ``parent``, ``next`` and, when the plan has
    one, ``mutation``. Scalar attributes come from the plan and fall back to
    the skeleton when the plan lacks the key.

    Args:
        all_generated_blocks: Mapping of block ID -> planned block data.
            Inputs may be lists (already in ``[type, ...]`` form) or dicts
            tagged with ``"kind"`` of ``"value"``, ``"block"`` or ``"menu"``.
        generated_output_json: Mapping of block ID -> skeleton block data.
            Only IDs present here appear in the result.

    Returns:
        Mapping of block ID -> refined block dict.
    """
    processed_blocks = {}
    # One generated ID per distinct variable / broadcast name, so the same
    # name used by several blocks always resolves to the same ID.
    variable_id_map = defaultdict(lambda: generate_secure_token(20))
    broadcast_id_map = defaultdict(lambda: generate_secure_token(20))

    def leading_value(field_value):
        # Fields normally arrive as [value, id]. Tolerate a bare value (so a
        # plain string is not truncated to its first character) and an empty
        # list (which would otherwise raise IndexError).
        if isinstance(field_value, (list, tuple)):
            return field_value[0] if field_value else None
        return field_value

    for block_id, gen_block_data in generated_output_json.items():
        all_gen_block_data = all_generated_blocks.get(block_id, {})
        # "inputs"/"fields" may be missing on either side; treat that as empty
        # instead of raising KeyError halfway through a block.
        skeleton_inputs = gen_block_data.get("inputs") or {}
        planned_inputs = all_gen_block_data.get("inputs") or {}
        planned_fields = all_gen_block_data.get("fields") or {}

        processed_block = {
            "opcode": all_gen_block_data.get("op_code", gen_block_data.get("op_code")),
            "inputs": {},
            "fields": {},
            "shadow": all_gen_block_data.get("shadow", gen_block_data.get("shadow")),
            "topLevel": all_gen_block_data.get("topLevel", gen_block_data.get("topLevel")),
            "parent": all_gen_block_data.get("parent", gen_block_data.get("parent")),
            "next": all_gen_block_data.get("next", gen_block_data.get("next")),
        }
        if "mutation" in all_gen_block_data:
            processed_block["mutation"] = all_gen_block_data["mutation"]

        # ---- inputs ----
        inputs_out = processed_block["inputs"]
        for input_name, input_data in planned_inputs.items():
            if input_name in ("SUBSTACK", "CONDITION"):
                # These are always emitted as type 2 (block reference, no shadow).
                # NOTE(review): "SUBSTACK2" is not covered here and goes through
                # the generic branches below - confirm whether that is intended.
                if isinstance(input_data, list) and len(input_data) == 2:
                    inputs_out[input_name] = [2, input_data[1]]
                elif isinstance(input_data, dict) and input_data.get("kind") == "block":
                    inputs_out[input_name] = [2, input_data.get("block")]
                else:
                    # Unexpected format: reuse the skeleton's entry if it has one.
                    inputs_out[input_name] = skeleton_inputs.get(input_name, [2, None])
            elif isinstance(input_data, dict):
                kind = input_data.get("kind")
                if kind == "value":
                    # Direct literal, emitted as [1, [4, "<text>"]].
                    inputs_out[input_name] = [1, [4, str(input_data.get("value", ""))]]
                elif kind == "block":
                    # Nested block: keep the skeleton's shadow text (third
                    # element of the skeleton input) when it is available.
                    existing_shadow_value = ""
                    skeleton_entry = skeleton_inputs.get(input_name)
                    if (
                        isinstance(skeleton_entry, list)
                        and len(skeleton_entry) > 2
                        and isinstance(skeleton_entry[2], list)
                        and len(skeleton_entry[2]) > 1
                    ):
                        existing_shadow_value = skeleton_entry[2][1]
                    inputs_out[input_name] = [
                        3,
                        input_data.get("block", ""),
                        [10, existing_shadow_value],
                    ]
                elif kind == "menu":
                    # Menu option (e.g. event_broadcast): emitted as
                    # [1, [11, name, id]] with one stable ID per option name.
                    menu_option = input_data.get("option", "")
                    inputs_out[input_name] = [
                        1,
                        [11, menu_option, broadcast_id_map[menu_option]],
                    ]
                # Dicts with any other "kind" are dropped, as before.
            elif isinstance(input_data, list):
                # Already in list form (e.g. TOUCHINGOBJECTMENU as [1, "block_id"]).
                inputs_out[input_name] = input_data

        # ---- fields ----
        fields_out = processed_block["fields"]
        for field_name, field_value in planned_fields.items():
            if field_name == "VARIABLE" and isinstance(field_value, list) and len(field_value) > 0:
                variable_name = field_value[0]
                fields_out[field_name] = [variable_name, variable_id_map[variable_name]]
            elif field_name == "STOP_OPTION":
                fields_out[field_name] = [leading_value(field_value), None]
            elif field_name == "TOUCHINGOBJECTMENU":
                # Prefer the value stored on the referenced menu block, if the
                # block's own TOUCHINGOBJECTMENU input points at one.
                menu_ref = planned_inputs.get("TOUCHINGOBJECTMENU")
                referenced_menu_block_id = None
                if isinstance(menu_ref, list) and len(menu_ref) > 1 and isinstance(menu_ref[1], str):
                    referenced_menu_block_id = menu_ref[1]
                if referenced_menu_block_id and referenced_menu_block_id in all_generated_blocks:
                    menu_block = all_generated_blocks[referenced_menu_block_id]
                    menu_value = menu_block.get("fields", {}).get("TOUCHINGOBJECTMENU", ["", None])[0]
                    fields_out[field_name] = [menu_value, None]
                else:
                    fields_out[field_name] = [leading_value(field_value), None]
            else:
                fields_out[field_name] = field_value

        processed_blocks[block_id] = processed_block
    return processed_blocks
#################################################################################################################################################################
#--------------------------------------------------[Assign unique secure keys to the skeleton JSON so blocks do not overwrite each other]-------------------------
#################################################################################################################################################################
def rename_blocks(block_json: dict, opcode_count: dict) -> tuple[dict, dict]:
    """Re-key every block with a freshly generated secure token.

    Block keys, ``parent``/``next`` links, block references inside ``inputs``
    and the key lists in ``opcode_count`` are all rewritten consistently.
    The input mappings are not modified; the returned blocks are deep copies.

    Args:
        block_json: Mapping of block_key -> block_data.
        opcode_count: Mapping of opcode -> list of block_keys.

    Returns:
        A tuple of (new_block_json, new_opcode_count) with updated keys.
    """
    # Step 1: one new token per existing key. A set makes the collision check
    # O(1); old keys are reserved too so a new token can never be mistaken for
    # an un-renamed key.
    token_map = {}
    used_tokens = set(block_json)
    for old_key in block_json:
        new_key = generate_secure_token()
        while new_key in used_tokens:
            new_key = generate_secure_token()
        used_tokens.add(new_key)
        token_map[old_key] = new_key

    def remap(ref):
        # Translate a reference if it names a renamed block; leave literals,
        # None and unhashable values (nested lists) untouched.
        try:
            return token_map.get(ref, ref)
        except TypeError:
            return ref

    # Step 2: rebuild the block mapping under the new keys.
    new_block_json = {}
    for old_key, block in block_json.items():
        # Deep copy so that rewriting nested "inputs" never leaks back into
        # the caller's data (a shallow copy shares the inputs dict).
        new_block = copy.deepcopy(block)
        if 'parent' in new_block:
            new_block['parent'] = remap(new_block['parent'])
        if 'next' in new_block:
            new_block['next'] = remap(new_block['next'])

        inputs = new_block.get('inputs')
        if isinstance(inputs, dict):
            for inp_key, inp_val in inputs.items():
                # Inputs look like [type, ref] or [type, ref, shadow]; any
                # element after the type may be a block key (including
                # [1, "menu_block_id"] and 3-element type-3 inputs).
                if isinstance(inp_val, list) and len(inp_val) >= 2:
                    inputs[inp_key] = [inp_val[0]] + [remap(part) for part in inp_val[1:]]
        new_block_json[token_map[old_key]] = new_block

    # Step 3: translate the per-opcode key lists; unknown keys pass through.
    new_opcode_count = {
        opcode: [remap(k) for k in key_list]
        for opcode, key_list in opcode_count.items()
    }
    return new_block_json, new_opcode_count
#################################################################################################################################################################
#--------------------------------------------------[Helper functions to add variables and broadcasts (used in the main app file for the main project JSON)]-------
#################################################################################################################################################################
# Literal primitive type codes accepted as an initial value. The block
# builder in this file emits 4 for direct values and 10 for shadow text;
# 5-9 are included per the Scratch 3 project format's literal codes.
_SB3_LITERAL_TYPES = (4, 5, 6, 7, 8, 9, 10)


def _literal_text(primitive):
    """Return the text of a ``[type, value]`` literal primitive, else None."""
    if (
        isinstance(primitive, list)
        and len(primitive) > 1
        and primitive[0] in _SB3_LITERAL_TYPES
    ):
        return str(primitive[1])
    return None


def _initial_value_from_input(value_input):
    """Derive a variable's initial value from a ``VALUE`` input, or ""."""
    if not isinstance(value_input, list) or len(value_input) < 2:
        # Missing or malformed VALUE input: no initial value to record.
        return ""
    primary = value_input[1]
    shadow = value_input[2] if len(value_input) > 2 else None
    if isinstance(primary, list):
        literal = _literal_text(primary)
        if literal is not None:
            return literal  # e.g. [1, [10, "0"]] or [1, [4, "0"]]
        if len(primary) > 1 and primary[0] == 12:
            # Variable reporter over a shadow, e.g. [3, [12, name, id], [10, "0"]].
            return _literal_text(shadow) or ""
        return ""
    if isinstance(primary, str):
        # With a shadow literal present the string is a block reference, so
        # the shadow text is the best available initial value.
        shadow_literal = _literal_text(shadow)
        return shadow_literal if shadow_literal is not None else primary
    if isinstance(primary, (int, float)):
        return str(primary)
    return ""


def variable_intialization(project_data):
    """
    Updates variable and broadcast definitions in a Scratch project JSON,
    populating the 'variables' and 'broadcasts' sections of the Stage target
    and extracting initial values for variables.

    Every target's blocks are walked recursively. ``data_setvariableto``
    blocks record ``[name, initial_value]``; any other ``VARIABLE`` field or
    ``[12, name, id]`` reference registers the variable with an empty value
    if it is not known yet. ``BROADCAST_INPUT`` and ``BROADCAST_OPTION``
    entries are recorded as ``id -> name``.

    Args:
        project_data (dict): The loaded JSON data of the Scratch project.
    Returns:
        dict: The updated project JSON data (the same object, modified in
        place). Returned unchanged when no Stage target exists.
    """
    targets = project_data.get('targets', [])
    stage_target = next((t for t in targets if t.get('isStage')), None)
    if stage_target is None:
        print("Error: Stage target not found in the project data.")
        return project_data

    # Ensure 'variables' and 'broadcasts' exist in the Stage target
    variables = stage_target.setdefault("variables", {})
    broadcasts = stage_target.setdefault("broadcasts", {})

    def note_variable(var_name, var_id):
        # Register a variable without clobbering a value already captured
        # from a set-variable block; only refresh the name if it changed.
        if var_id not in variables:
            variables[var_id] = [var_name, ""]
        elif variables[var_id][0] != var_name:
            variables[var_id][0] = var_name

    def process_dict(obj):
        if isinstance(obj, dict):
            if obj.get("opcode") == "data_setvariableto":
                variable_field = (obj.get("fields") or {}).get("VARIABLE")
                value_input = (obj.get("inputs") or {}).get("VALUE")
                if isinstance(variable_field, list) and len(variable_field) == 2:
                    var_name, var_id = variable_field
                    variables[var_id] = [var_name, _initial_value_from_input(value_input)]
            for key, value in obj.items():
                if key == "VARIABLE" and isinstance(value, list) and len(value) == 2:
                    note_variable(value[0], value[1])
                elif (
                    key == "BROADCAST_INPUT"
                    and isinstance(value, list) and len(value) == 2
                    and isinstance(value[1], list) and len(value[1]) == 3
                    and value[1][0] == 11
                ):
                    # Input form: [type, [11, broadcast_name, broadcast_id]]
                    broadcasts[value[1][2]] = value[1][1]
                elif key == "BROADCAST_OPTION" and isinstance(value, list) and len(value) == 2:
                    # Field form: [broadcast_name, broadcast_id]
                    broadcasts[value[1]] = value[0]
                # Recurse into nested dictionaries or lists
                process_dict(value)
        elif isinstance(obj, list):
            for item in obj:
                # Variable references inside inputs look like [12, name, id]
                if isinstance(item, list) and len(item) == 3 and item[0] == 12:
                    note_variable(item[1], item[2])
                process_dict(item)

    # Iterate through all targets to process their blocks
    for target in targets:
        if "blocks" in target:
            for block_data in target["blocks"].values():
                process_dict(block_data)
    return project_data
def deduplicate_variables(project_data):
    """
    Removes duplicate variable entries in the 'variables' dictionary of the Stage target,
    prioritizing entries with non-empty values.

    Variables are grouped by name. Within a group, a later entry replaces the
    kept one unless the later entry is empty ("") while the kept one is not,
    so the result is the last non-empty entry, or the last entry when all are
    empty. Kept entries are copied whole, so elements after ``[name, value]``
    are preserved rather than truncated.

    NOTE(review): IDs of dropped duplicates are not rewritten in the blocks
    that reference them - confirm callers do not rely on those IDs.

    Args:
        project_data (dict): The loaded JSON data of the Scratch project.
    Returns:
        dict: The updated project JSON data with deduplicated variables
        (the same object, modified in place).
    """
    targets = project_data.get('targets', [])
    stage_target = next((t for t in targets if t.get('isStage')), None)
    if stage_target is None:
        print("Error: Stage target not found in the project data.")
        return project_data
    if "variables" not in stage_target:
        return project_data  # No variables to deduplicate

    # variable_name -> (variable_id, full entry) for the preferred duplicate.
    preferred = {}
    for var_id, var_info in stage_target["variables"].items():
        var_name, var_value = var_info[0], var_info[1]
        kept = preferred.get(var_name)
        # Replace unless that would swap a real value for an empty one.
        if kept is None or var_value != "" or kept[1][1] == "":
            preferred[var_name] = (var_id, list(var_info))

    # Rebuild the mapping keyed by the surviving IDs.
    stage_target["variables"] = {
        kept_id: entry for kept_id, entry in preferred.values()
    }
    return project_data
def variable_adder_main(project_data):
    """Populate and then deduplicate the Stage's variables and broadcasts.

    Runs ``variable_intialization`` followed by ``deduplicate_variables``.
    Each step is best-effort: a failure is printed and the pipeline carries
    on with the data it has, instead of hitting an unbound name in the
    second step.

    Args:
        project_data (dict): The loaded JSON data of the Scratch project.

    Returns:
        dict: The processed project data. Previously the function always
        returned ``None`` even on success; callers that ignored the return
        value are unaffected.
    """
    try:
        declare_variable_json = variable_intialization(project_data)
    except Exception as e:
        print(f"Error error in the variable initialization opcodes: {e}")
        # Fall back to the untouched input so deduplication can still run.
        declare_variable_json = project_data
    try:
        return deduplicate_variables(declare_variable_json)
    except Exception as e:
        # The old message blamed the initialization step for this failure too.
        print(f"Error in the variable deduplication step: {e}")
        return declare_variable_json
#################################################################################################################################################################
#--------------------------------------------------[Helper main function]----------------------------------------------------------------------------------------
#################################################################################################################################################################
def block_builder(opcode_count,pseudo_code):
    """Build the final, re-keyed block mapping for a piece of pseudo code.

    Runs four stages in order: ``generate_blocks_from_opcodes``,
    ``generate_plan``, ``process_scratch_blocks`` and ``rename_blocks``.
    A failure in any of the first three stages is printed and yields ``{}``;
    an exception from ``rename_blocks`` propagates to the caller.

    Args:
        opcode_count: Opcode counts passed to ``generate_blocks_from_opcodes``.
        pseudo_code: Pseudo code text passed to ``generate_plan``.

    Returns:
        The renamed block mapping, or ``{}`` if an earlier stage failed.
    """
    # Stage 1: skeleton blocks plus the opcode -> block-key occurrences.
    try:
        skeleton_blocks, opcode_occurrences = generate_blocks_from_opcodes(opcode_count, all_block_definitions)
    except Exception as err:
        print(f"Error generating blocks from opcodes: {err}")
        return {}

    # Stage 2: fill the skeleton in from the pseudo code.
    try:
        planned_blocks = generate_plan(skeleton_blocks, opcode_occurrences, pseudo_code)
    except Exception as err:
        print(f"Error generating plan from blocks: {err}")
        return {}

    # Stage 3: normalise inputs and fields.
    try:
        refined_blocks = process_scratch_blocks(planned_blocks, skeleton_blocks)
    except Exception as err:
        print(f"Error processing Scratch blocks: {err}")
        return {}

    # Stage 4: swap in secure keys; the renamed occurrence map is not needed.
    renamed_blocks, _ = rename_blocks(refined_blocks, opcode_occurrences)
    return renamed_blocks
#################################################################################################################################################################
#--------------------------------------------------[Example use of the function here]----------------------------------------------------------------------------
#################################################################################################################################################################
# NOTE(review): everything below runs at import time (it is not behind an
# ``if __name__ == "__main__":`` guard), so importing this module generates
# the example blocks and prints them. Kept as-is to preserve module behavior.

# Example input: how many blocks of each opcode the pseudo code below needs.
initial_opcode_counts = [
    {"opcode": "event_whenflagclicked", "count": 1},
    {"opcode": "data_setvariableto", "count": 2},
    {"opcode": "data_showvariable", "count": 2},
    {"opcode": "event_broadcast", "count": 1},
]

# Example pseudo code matching the opcode counts above.
pseudo_code="""
when green flag clicked
set [score v] to (0)
set [lives v] to (3)
show variable [score v]
show variable [lives v]
broadcast [Game Start v]
"""

# Run the first three pipeline stages directly (no renaming) and dump each result.
generated_output_json, initial_opcode_occurrences = generate_blocks_from_opcodes(
    initial_opcode_counts,
    all_block_definitions,
)
all_generated_blocks = generate_plan(
    generated_output_json,
    initial_opcode_occurrences,
    pseudo_code,
)
processed_blocks = process_scratch_blocks(all_generated_blocks, generated_output_json)

print(all_generated_blocks)
print("--------------\n\n")
print(processed_blocks)
print("--------------\n\n")
print(initial_opcode_occurrences)