jackyliang42's picture
sampling blocks and bowls with matching colors
715fc3c
raw
history blame
6.17 kB
import openai
import numpy as np
from tempfile import NamedTemporaryFile
import copy
import shapely
from shapely.geometry import *
from shapely.affinity import *
from omegaconf import OmegaConf
from moviepy.editor import ImageSequenceClip
import gradio as gr
from lmp import LMP, LMPFGen
from sim import PickPlaceEnv, LMP_wrapper
from consts import ALL_BLOCKS, ALL_BOWLS
from md_logger import MarkdownLogger
class DemoRunner:
def __init__(self):
self._cfg = OmegaConf.to_container(OmegaConf.load('cfg.yaml'), resolve=True)
self._env = None
self._model_name = ''
self._md_logger = MarkdownLogger()
def make_LMP(self, env):
# LMP env wrapper
cfg = copy.deepcopy(self._cfg)
cfg['env'] = {
'init_objs': list(env.obj_name_to_id.keys()),
'coords': cfg['tabletop_coords']
}
for vs in cfg['lmps'].values():
vs['engine'] = self._model_name
LMP_env = LMP_wrapper(env, cfg)
# creating APIs that the LMPs can interact with
fixed_vars = {
'np': np
}
fixed_vars.update({
name: eval(name)
for name in shapely.geometry.__all__ + shapely.affinity.__all__
})
variable_vars = {
k: getattr(LMP_env, k)
for k in [
'get_bbox', 'get_obj_pos', 'get_color', 'is_obj_visible', 'denormalize_xy',
'put_first_on_second', 'get_obj_names',
'get_corner_name', 'get_side_name',
]
}
variable_vars['say'] = lambda msg: self._md_logger.log_text(f'Robot says: "{msg}"')
# creating the function-generating LMP
lmp_fgen = LMPFGen(cfg['lmps']['fgen'], fixed_vars, variable_vars, self._md_logger)
# creating other low-level LMPs
variable_vars.update({
k: LMP(k, cfg['lmps'][k], lmp_fgen, fixed_vars, variable_vars, self._md_logger)
for k in ['parse_obj_name', 'parse_position', 'parse_question', 'transform_shape_pts']
})
# creating the LMP that deals w/ high-level language commands
lmp_tabletop_ui = LMP(
'tabletop_ui', cfg['lmps']['tabletop_ui'], lmp_fgen, fixed_vars, variable_vars, self._md_logger
)
return lmp_tabletop_ui
def setup(self, api_key, model_name, n_blocks, n_bowls):
openai.api_key = api_key
self._model_name = model_name
self._env = PickPlaceEnv(render=True, high_res=True, high_frame_rate=False)
list_idxs = np.random.choice(len(ALL_BLOCKS), size=max(n_blocks, n_bowls), replace=False)
block_list = [ALL_BLOCKS[i] for i in list_idxs[:n_blocks]]
bowl_list = [ALL_BOWLS[i] for i in list_idxs[:n_bowls]]
obj_list = block_list + bowl_list
self._env.reset(obj_list)
self._lmp_tabletop_ui = self.make_LMP(self._env)
info = '### Available Objects: \n- ' + '\n- '.join(obj_list)
img = self._env.get_camera_image()
return info, img
def run(self, instruction):
if self._env is None:
return 'Please run setup first!', None, None
self._env.cache_video = []
self._md_logger.clear()
try:
self._lmp_tabletop_ui(instruction, f'objects = {self._env.object_list}')
except Exception as e:
return f'Error: {e}', None, None
video_file_name = None
if self._env.cache_video:
rendered_clip = ImageSequenceClip(self._env.cache_video, fps=25)
video_file_name = NamedTemporaryFile(suffix='.mp4').name
rendered_clip.write_videofile(video_file_name, fps=25)
return self._md_logger.get_log(), self._env.get_camera_image(), video_file_name
def setup(api_key, model_name, n_blocks, n_bowls):
if not api_key:
return 'Please enter your OpenAI API key!', None, None
if n_blocks + n_bowls == 0:
return 'Please select at least one object!', None, None
demo_runner = DemoRunner()
info, img = demo_runner.setup(api_key, model_name, n_blocks, n_bowls)
return info, img, demo_runner
def run(instruction, demo_runner):
if demo_runner is None:
return 'Please run setup first!', None, None
return demo_runner.run(instruction)
if __name__ == '__main__':
with open('README.md', 'r') as f:
for _ in range(12):
next(f)
readme_text = f.read()
with gr.Blocks() as demo:
state = gr.State(None)
gr.Markdown(readme_text)
gr.Markdown('# Interactive Demo')
with gr.Row():
with gr.Column():
with gr.Row():
inp_api_key = gr.Textbox(label='OpenAI API Key (this is not stored anywhere)', lines=1)
inp_model_name = gr.Dropdown(label='Model Name', choices=['code-davinci-002', 'text-davinci-002'], value='text-davinci-002')
with gr.Row():
inp_n_blocks = gr.Slider(label='Number of Blocks', minimum=0, maximum=4, value=3, step=1)
inp_n_bowls = gr.Slider(label='Number of Bowls', minimum=0, maximum=4, value=3, step=1)
btn_setup = gr.Button("Setup/Reset Simulation")
info_setup = gr.Markdown(label='Setup Info')
with gr.Column():
img_setup = gr.Image(label='Current Simulation')
with gr.Row():
with gr.Column():
inp_instruction = gr.Textbox(label='Instruction', lines=1)
btn_run = gr.Button("Run (this may take 30+ seconds)")
info_run = gr.Markdown(label='Generated Code')
with gr.Column():
video_run = gr.Video(label='Video of Last Instruction')
btn_setup.click(
setup,
inputs=[inp_api_key, inp_model_name, inp_n_blocks, inp_n_bowls],
outputs=[info_setup, img_setup, state]
)
btn_run.click(
run,
inputs=[inp_instruction, state],
outputs=[info_run, img_setup, video_run]
)
demo.launch()