Source code for mikasa_robo_suite.vla.memory_envs.take_it_back_vla

"""Take-it-back manipulation tasks for the VLA benchmark."""

from typing import Any, Dict, Union

import numpy as np
import sapien
import torch
from mani_skill.agents.robots.panda.panda import Panda
from mani_skill.agents.robots.panda.panda_wristcam import PandaWristCam
from mani_skill.envs.sapien_env import BaseEnv
from mani_skill.sensors.camera import CameraConfig
from mani_skill.utils import sapien_utils
from mani_skill.utils.building import actors
from mani_skill.utils.registration import register_env
from mani_skill.utils.scene_builder.table import TableSceneBuilder
from mani_skill.utils.structs.pose import Pose
from mani_skill.utils.structs.types import Array, GPUMemoryConfig, SimConfig
from transforms3d.euler import euler2quat

from mikasa_robo_suite.vla.utils import shapes


[docs] class TakeItBackVLABaseEnv(BaseEnv): """Push an object away, then bring it back in the same episode. The cube starts on an initial target region. The robot must first move it to a separate goal region. Once that happens, the task switches and the robot must return the same cube back to where it started. The challenge is not only reaching both targets, but also reacting correctly to the stage change. Episode flow: - The cube starts on the initial region. - The robot pushes the cube to the goal region. - After the stage switch, the robot brings the cube back to the initial region. Success (`success=True`): - The cube must first reach the goal region and then end up back inside the initial region within the same episode. How to customize: - `GOAL_RADIUS` changes how large both target regions are and therefore how forgiving both placement checks become. - `CUBE_HALFSIZE` changes the cube geometry and contact behavior. - The stage switch is implicit: if you change goal logic in the task, you are also changing when the return stage begins. """ LANGUAGE_INSTRUCTION = "Push the cube onto the red target, and when the target changes color, return the cube to its original position." SUPPORTED_ROBOTS = ["panda", "panda_wristcam"] agent: Union[Panda, PandaWristCam] GOAL_RADIUS: float = 0.08 CUBE_HALFSIZE: float = 0.02 ACTION_L2_COEF = 0.01 ACTION_DELTA_L2_COEF = 0.08 QVEL_L2_COEF = 0.01 def __init__(self, *args, robot_uids="panda_wristcam", robot_init_qpos_noise=0.02, **kwargs): self.robot_init_qpos_noise = robot_init_qpos_noise super().__init__(*args, robot_uids=robot_uids, **kwargs) @property def _default_sim_config(self): return SimConfig( gpu_memory_config=GPUMemoryConfig( found_lost_pairs_capacity=2**25, max_rigid_patch_count=2**18, ) ) @property def _default_sensor_configs(self): pose = sapien_utils.look_at(eye=[0.3, 0, 0.6], target=[-0.1, 0, 0.1]) return [CameraConfig("base_camera", pose, 128, 128, np.pi / 2, 0.01, 100)] @property def _default_human_render_camera_configs(self): pose = sapien_utils.look_at([0.5, 1, 1], [-0.3, 0, 0]) return CameraConfig("render_camera", pose, 512, 512, 1, 0.01, 100) def _load_agent(self, options: dict): super()._load_agent(options, sapien.Pose(p=[-0.615, 0, 0])) def _load_scene(self, options: dict): self.table_scene = TableSceneBuilder( self, robot_init_qpos_noise=self.robot_init_qpos_noise, ) self.table_scene.build() self.cube = actors.build_cube( self.scene, half_size=self.CUBE_HALFSIZE, color=np.array([0, 255, 0, 255]) / 255, name="cube", body_type="dynamic", initial_pose=sapien.Pose(p=[0, 0, self.CUBE_HALFSIZE]), ) self.initial_region = shapes.build_target( self.scene, radius=self.GOAL_RADIUS, thickness=1e-5, name="initial_region", add_collision=False, body_type="kinematic", primary_color=np.array([0, 0, 255, 255]) / 255, secondary_color=np.array([255, 255, 255, 255]) / 255, initial_pose=sapien.Pose(p=[0, 0, 0.1]), ) self.goal_region = shapes.build_target( self.scene, radius=self.GOAL_RADIUS, thickness=1e-5, name="goal_region", add_collision=False, body_type="kinematic", primary_color=np.array([194, 19, 22, 255]) / 255, secondary_color=np.array([255, 255, 255, 255]) / 255, initial_pose=sapien.Pose(p=[0, 0, 0.1]), ) self.changed_goal_region = shapes.build_target( self.scene, radius=self.GOAL_RADIUS, thickness=1e-5, name="changed_goal_region", add_collision=False, body_type="kinematic", primary_color=np.array([147, 0, 211, 255]) / 255, secondary_color=np.array([255, 255, 255, 255]) / 255, initial_pose=sapien.Pose(p=[0, 0, 0.1]), ) self.goal_reached_status = torch.zeros(self.num_envs, dtype=torch.float32) self.goal_achieved = torch.zeros(self.num_envs, dtype=torch.bool) self._hidden_objects.append(self.initial_region) def _initialize_episode(self, env_idx: torch.Tensor, options: dict): self.goal_reached_status = self.goal_reached_status.to(self.device) self.goal_achieved = self.goal_achieved.to(self.device) with torch.device(self.device): b = len(env_idx) self.table_scene.initialize(env_idx) self.task_cue = None self.reward_dict = None initial_positions = torch.from_numpy(self._batched_episode_rng.choice([-1, 0, 1])).to(self.device) xyz_initial = torch.zeros((b, 3)) xyz_initial[..., 0] = (torch.rand((b,)) - 0.5) * 0.1 xyz_initial[..., 1] = initial_positions * 0.1 + torch.rand((b,)) * 0.05 xyz_initial[..., 2] = 1e-3 xyz_goal = torch.zeros((b, 3)) xyz_goal[..., 0] = torch.rand((b,)) * 0.05 + 0.2 xyz_goal[..., 1] = (torch.rand((b,)) - 0.5) * 0.05 xyz_goal[..., 2] = 1e-3 xyz_changed_goal = xyz_goal.clone() xyz_changed_goal[..., 2] = 1000 self.initial_region.set_pose( Pose.create_from_pq(p=xyz_initial, q=euler2quat(0, np.pi / 2, 0)), ) self.goal_region.set_pose( Pose.create_from_pq(p=xyz_goal, q=euler2quat(0, np.pi / 2, 0)), ) self.changed_goal_region.set_pose( Pose.create_from_pq(p=xyz_changed_goal, q=euler2quat(0, np.pi / 2, 0)), ) xyz_cube = xyz_initial.clone() xyz_cube[..., 2] = self.CUBE_HALFSIZE self.cube.set_pose(Pose.create_from_pq(p=xyz_cube, q=[1, 0, 0, 0])) self.oracle_info = xyz_initial.clone() if self.robot_uids in ("panda", "panda_wristcam"): qpos = np.array([0.0, 0, 0, -np.pi * 2 / 3, 0, np.pi * 2 / 3, np.pi / 4, 0.04, 0.04]) qpos[:-2] += self._episode_rng.normal( 0, self.robot_init_qpos_noise, len(qpos) - 2, ) self.agent.reset(qpos) self.agent.robot.set_root_pose(sapien.Pose([-0.615, 0, 0])) else: raise NotImplementedError(self.robot_uids) self.goal_reached_status[env_idx] = False self.goal_achieved[env_idx] = False
[docs] def evaluate(self): self.original_poses = { "goal_region": self.goal_region.pose.raw_pose.clone(), "changed_goal_region": self.changed_goal_region.pose.raw_pose.clone(), "cube": self.cube.pose.raw_pose.clone(), } new_goal_region_pose = self.original_poses["goal_region"] new_goal_region_pose[self.goal_achieved, 2] = 1000 new_changed_goal_region_pose = self.original_poses["changed_goal_region"] new_changed_goal_region_pose[self.goal_achieved, 2] = 1e-3 self.goal_region.pose = new_goal_region_pose self.changed_goal_region.pose = new_changed_goal_region_pose self.goal_achieved = ( torch.linalg.norm( self.cube.pose.p[..., :2] - self.goal_region.pose.p[..., :2], axis=1, ) < self.GOAL_RADIUS ) self.goal_reached_status = torch.logical_or( self.goal_reached_status, self.goal_achieved, ) is_cube_returned = ( torch.linalg.norm( self.cube.pose.p[..., :2] - self.initial_region.pose.p[..., :2], axis=1, ) < self.GOAL_RADIUS ) return { "success": is_cube_returned & self.goal_reached_status, "task_cue": self.task_cue, "language_instruction": self.LANGUAGE_INSTRUCTION, "oracle_info": self.oracle_info, "goal_achieved": self.goal_achieved, "goal_reached_status": self.goal_reached_status, "reward_dict": self.reward_dict, }
def _get_obs_extra(self, info: Dict): actual_goal_pos = self.goal_region.pose.p actual_goal_pos[actual_goal_pos[:, 2] > 10, 2] = self.changed_goal_region.pose.p[actual_goal_pos[:, 2] > 10, 2] obs = dict(tcp_pose=self.agent.tcp.pose.raw_pose) if self._obs_mode in ["state", "state_dict"]: obs.update( goal_pos=actual_goal_pos, cube_pose=self.cube.pose.raw_pose, oracle_info=self.oracle_info, goal_reached_status=self.goal_reached_status, ) return obs
[docs] def step(self, action): obs, reward, terminated, truncated, info = super().step(action) if isinstance(info, dict) and "success" in info: success = info["success"] if torch.is_tensor(success): success = success.to(dtype=torch.bool) if torch.is_tensor(terminated): terminated = terminated.to(dtype=torch.bool) & (~success) else: terminated = bool(terminated) and not bool(success.any().item()) else: terminated = bool(terminated) and not bool(success) return obs, reward, terminated, truncated, info
[docs] def compute_dense_reward(self, obs: Any, action: Array, info: Dict): tcp_push_pose = Pose.create_from_pq( p=self.cube.pose.p + torch.tensor([-self.CUBE_HALFSIZE - 0.005, 0, 0], device=self.device) ) tcp_to_cube_pose_push = tcp_push_pose.p - self.agent.tcp.pose.p tcp_to_cube_pose_push_dist = torch.linalg.norm(tcp_to_cube_pose_push, axis=1) reaching_reward_push = 1 - torch.tanh(5.0 * tcp_to_cube_pose_push_dist) cube_to_goal_dist = torch.linalg.norm(self.cube.pose.p[..., :2] - self.goal_region.pose.p[..., :2], axis=1) place_in_goal_reward = 1 - torch.tanh(5.0 * cube_to_goal_dist) cube_to_initial_dist = torch.linalg.norm( self.cube.pose.p[..., :2] - self.initial_region.pose.p[..., :2], axis=1 ) place_back_reward = 1 - torch.tanh(5.0 * cube_to_initial_dist) goal_achieved_status = info["goal_reached_status"] reached_push = tcp_to_cube_pose_push_dist < 0.03 tcp_pull_pos = self.cube.pose.p + torch.tensor([self.CUBE_HALFSIZE + 2 * 0.005, 0, 0], device=self.device) tcp_to_cube_pose_pull = tcp_pull_pos - self.agent.tcp.pose.p tcp_to_cube_pose_pull_dist = torch.linalg.norm(tcp_to_cube_pose_pull, axis=1) reaching_reward_pull = 1 - torch.tanh(5.0 * tcp_to_cube_pose_pull_dist) tcp_to_cube_dist = torch.linalg.norm(self.cube.pose.p - self.agent.tcp.pose.p, axis=1) keep_contact_reward = 1 - torch.tanh(6.0 * tcp_to_cube_dist) reached_pull = tcp_to_cube_pose_pull_dist < 0.03 if not torch.is_tensor(action): action = torch.as_tensor(action, device=self.device) if not hasattr(self, "_prev_action") or self._prev_action is None or self._prev_action.shape != action.shape: self._prev_action = torch.zeros_like(action) delta_action = action - self._prev_action action_l2 = torch.linalg.norm(action, axis=1) delta_action_l2 = torch.linalg.norm(delta_action, axis=1) if hasattr(self, "elapsed_steps") and torch.is_tensor(self.elapsed_steps): first_step_mask = self.elapsed_steps <= 1 delta_action_l2 = torch.where(first_step_mask, torch.zeros_like(delta_action_l2), delta_action_l2) qvel = self.agent.robot.get_qvel()[..., :-2] qvel_l2 = torch.linalg.norm(qvel, axis=1) smooth_penalty = ( self.ACTION_L2_COEF * torch.tanh(2.0 * action_l2) + self.ACTION_DELTA_L2_COEF * torch.tanh(5.0 * delta_action_l2) + self.QVEL_L2_COEF * torch.tanh(2.0 * qvel_l2) ) reward = torch.zeros_like(reaching_reward_push) reward[~goal_achieved_status] += 2.0 * reaching_reward_push[~goal_achieved_status] reward[~goal_achieved_status] += 1.5 * place_in_goal_reward[~goal_achieved_status] reward[~goal_achieved_status & reached_push] += 5.0 * place_in_goal_reward[~goal_achieved_status & reached_push] reward[goal_achieved_status] += 10.0 * reaching_reward_pull[goal_achieved_status] reward[goal_achieved_status] += 4.0 * keep_contact_reward[goal_achieved_status] reward[goal_achieved_status & reached_pull] += 15.0 * place_back_reward[goal_achieved_status & reached_pull] reward -= smooth_penalty reward[info["success"]] = 30.0 self.reward_dict = { "reached_push": reached_push, "reached_pull": reached_pull, "tcp_push_dist": tcp_to_cube_pose_push_dist, "tcp_pull_dist": tcp_to_cube_pose_pull_dist, "reaching_reward_push": reaching_reward_push, "reaching_reward_pull": reaching_reward_pull, "keep_contact_reward": keep_contact_reward, "goal_achieved": info["goal_achieved"], "goal_reached_status": info["goal_reached_status"], "place_in_goal_reward": place_in_goal_reward, "place_back_reward": place_back_reward, "action_l2": action_l2, "delta_action_l2": delta_action_l2, "qvel_l2": qvel_l2, "smooth_penalty": smooth_penalty, } self._prev_action = action.detach() return reward
[docs] def compute_normalized_dense_reward(self, obs: Any, action: Array, info: Dict): return self.compute_dense_reward(obs=obs, action=action, info=info) / 30.0
[docs] @register_env("TakeItBack-VLA-v0", max_episode_steps=60) class TakeItBackVLAEnv(TakeItBackVLABaseEnv): pass