import openai
import numpy as np
from tempfile import NamedTemporaryFile
import copy
import shapely
from shapely.geometry import *
from shapely.affinity import *
from omegaconf import OmegaConf
from moviepy.editor import ImageSequenceClip
import gradio as gr
from lmp import LMP, LMPFGen
from sim import PickPlaceEnv, LMP_wrapper
from consts import ALL_BLOCKS, ALL_BOWLS
from md_logger import MarkdownLogger
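

# Gradio demo that wires a simulated tabletop pick-and-place environment to
# language model programs (LMPs) which turn natural-language instructions into code.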
class DemoRunner:

    def __init__(self):
        self._cfg = OmegaConf.to_container(OmegaConf.load('cfg.yaml'), resolve=True)
        self._env = None
        self._model_name = ''
        self._md_logger = MarkdownLogger()
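
    # Build the hierarchy of LMPs: a function-generating LMP, low-level LMPs for
    # parsing object names/positions/questions, and the high-level tabletop UI LMP,
    # together with the fixed and variable APIs they are allowed to call.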
    def make_LMP(self, env):
        # LMP env wrapper
        cfg = copy.deepcopy(self._cfg)
        cfg['env'] = {
            'init_objs': list(env.obj_name_to_id.keys()),
            'coords': cfg['tabletop_coords']
        }
        for vs in cfg['lmps'].values():
            vs['engine'] = self._model_name

        LMP_env = LMP_wrapper(env, cfg)

        # creating APIs that the LMPs can interact with
        fixed_vars = {
            'np': np
        }
        fixed_vars.update({
            name: eval(name)
            for name in shapely.geometry.__all__ + shapely.affinity.__all__
        })
        variable_vars = {
            k: getattr(LMP_env, k)
            for k in [
                'get_bbox', 'get_obj_pos', 'get_color', 'is_obj_visible', 'denormalize_xy',
                'put_first_on_second', 'get_obj_names',
                'get_corner_name', 'get_side_name',
            ]
        }
        variable_vars['say'] = lambda msg: self._md_logger.log_text(f'Robot says: "{msg}"')

        # creating the function-generating LMP
        lmp_fgen = LMPFGen(cfg['lmps']['fgen'], fixed_vars, variable_vars, self._md_logger)

        # creating other low-level LMPs
        variable_vars.update({
            k: LMP(k, cfg['lmps'][k], lmp_fgen, fixed_vars, variable_vars, self._md_logger)
            for k in ['parse_obj_name', 'parse_position', 'parse_question', 'transform_shape_pts']
        })

        # creating the LMP that deals w/ high-level language commands
        lmp_tabletop_ui = LMP(
            'tabletop_ui', cfg['lmps']['tabletop_ui'], lmp_fgen, fixed_vars, variable_vars, self._md_logger
        )

        return lmp_tabletop_ui
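
    # Create the simulation with randomly sampled blocks and bowls, build the
    # tabletop LMP for it, and return a markdown list of objects plus a camera image.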
    def setup(self, api_key, model_name, n_blocks, n_bowls):
        if not api_key:
            return 'Please enter your OpenAI API key!', None

        if n_blocks + n_bowls == 0:
            return 'Please select at least one object!', None

        openai.api_key = api_key
        self._model_name = model_name

        self._env = PickPlaceEnv(render=True, high_res=True, high_frame_rate=False)
        block_list = np.random.choice(ALL_BLOCKS, size=n_blocks, replace=False).tolist()
        bowl_list = np.random.choice(ALL_BOWLS, size=n_bowls, replace=False).tolist()
        obj_list = block_list + bowl_list
        self._env.reset(obj_list)

        self._lmp_tabletop_ui = self.make_LMP(self._env)

        info = '### Available Objects: \n- ' + '\n- '.join(obj_list)
        img = self._env.get_camera_image()

        return info, img
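
    # Run one natural-language instruction through the tabletop LMP and return the
    # generated-code log, the latest camera image, and a video of the rollout (if any).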
    def run(self, instruction):
        if self._env is None:
            return 'Please run setup first!', None, None

        self._env.cache_video = []
        self._md_logger.clear()

        try:
            self._lmp_tabletop_ui(instruction, f'objects = {self._env.object_list}')
        except Exception as e:
            return str(e), None, None

        video_file_name = None
        if self._env.cache_video:
            rendered_clip = ImageSequenceClip(self._env.cache_video, fps=25)
            video_file_name = NamedTemporaryFile(suffix='.mp4').name
            rendered_clip.write_videofile(video_file_name, fps=25)

        return self._md_logger.get_log(), self._env.get_camera_image(), video_file_name
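

# Gradio front end: the setup button feeds the API key, model name, and object
# counts to DemoRunner.setup; the run button sends an instruction to DemoRunner.run.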
if __name__ == '__main__':
    demo_runner = DemoRunner()
    demo = gr.Blocks()

    with open('README.md', 'r') as f:
        # skip the Space config header at the top of README.md before rendering it
        for _ in range(12):
            next(f)
        readme_text = f.read()

    with demo:
        gr.Markdown(readme_text)
        gr.Markdown('# Interactive Demo')
        with gr.Row():
            with gr.Column():
                with gr.Row():
                    inp_api_key = gr.Textbox(label='OpenAI API Key', lines=1)
                    inp_model_name = gr.Dropdown(label='Model Name', choices=['code-davinci-002', 'text-davinci-002'], value='code-davinci-002')
                with gr.Row():
                    inp_n_blocks = gr.Slider(label='Number of Blocks', minimum=0, maximum=4, value=3, step=1)
                    inp_n_bowls = gr.Slider(label='Number of Bowls', minimum=0, maximum=4, value=3, step=1)
                btn_setup = gr.Button("Setup/Reset Env")
                info_setup = gr.Markdown(label='Setup Info')
            with gr.Column():
                img_setup = gr.Image(label='Current Simulation')
        with gr.Row():
            with gr.Column():
                inp_instruction = gr.Textbox(label='Instruction', lines=1)
                btn_run = gr.Button("Run Instruction")
                info_run = gr.Markdown(label='Generated Code')
            with gr.Column():
                video_run = gr.Video(label='Video of Last Instruction')

        btn_setup.click(
            demo_runner.setup,
            inputs=[inp_api_key, inp_model_name, inp_n_blocks, inp_n_bowls],
            outputs=[info_setup, img_setup]
        )
        btn_run.click(
            demo_runner.run,
            inputs=[inp_instruction],
            outputs=[info_run, img_setup, video_run]
        )

    demo.launch()