增加数据采集开关

This commit is contained in:
NuoDaJia02
2026-02-01 11:03:27 +08:00
parent 5a1c55b3d8
commit 04d0606034

View File

@@ -23,8 +23,6 @@ import cv2
from nicegui import ui
from interfaces.msg import RobotWorkInfo
from starlette.staticfiles import StaticFiles
import datetime
import logging
from interfaces.action import ExecuteBtAction
from interfaces.msg import SkillCall
from interfaces.action import Arm
@@ -33,8 +31,6 @@ from interfaces.msg import ArmError
from interfaces.srv import ClearArmError
from interfaces.action import GripperCmd
from interfaces.msg import GripperStatus
import time
import json
class CtrlGuiNode(Node):
@@ -47,6 +43,8 @@ class CtrlGuiNode(Node):
self.messages_received = 0
self.rebuild_requests = 0
self.start_time = datetime.datetime.now()
# Data collection enable flag (performance toggle)
self.collection_enabled = True
# Setup logging to file
self._setup_file_logging()
@@ -90,33 +88,25 @@ class CtrlGuiNode(Node):
# Top Camera subscription
self.bridge = CvBridge()
self.last_top_image = None
self.last_top_depth = None
self.last_left_image = None
self.last_left_depth = None
self.last_right_image = None
self.last_right_depth = None
self._sub_top_image = self.create_subscription(
Image, '/camera/color/image_raw', self._on_top_image, 10
)
self._sub_top_depth = self.create_subscription(
Image, '/camera/depth/image_raw', self._on_top_depth, 10
)
# Left Camera subscription
self._sub_left_image = self.create_subscription(
Image, '/camera1/camera1/color/image_raw', self._on_left_image, 10
)
self._sub_left_depth = self.create_subscription(
Image, '/camera1/camera1/depth/image_rect_raw', self._on_left_depth, 10
)
# Right Camera subscription
self._sub_right_image = self.create_subscription(
Image, '/camera2/camera2/color/image_raw', self._on_right_image, 10
)
self._sub_right_depth = self.create_subscription(
Image, '/camera2/camera2/depth/image_rect_raw', self._on_right_depth, 10
)
self.last_top_image_msg = None
self.last_top_depth_msg = None
self.last_left_image_msg = None
self.last_left_depth_msg = None
self.last_right_image_msg = None
self.last_right_depth_msg = None
# Camera subscriptions (created only when collection is enabled)
self._sub_top_image = None
self._sub_top_depth = None
self._sub_left_image = None
self._sub_left_depth = None
self._sub_right_image = None
self._sub_right_depth = None
if self.collection_enabled:
self._create_camera_subscriptions()
# Joint state throttling
self.last_joint_state_log_time = 0
self.last_work_info_log_time = 0
# Data collection state
self.is_collecting = False
@@ -247,12 +237,17 @@ class CtrlGuiNode(Node):
def _on_arm_error(self, msg: ArmError) -> None:
"""Callback for /arm_errors subscription."""
# Only log if the error state actually changed to avoid high CPU/log spam
if self.last_arm_error.get(msg.arm_id) != msg:
self.file_logger.error(f'Arm {msg.arm_id} error state changed: {msg}')
self.last_arm_error[msg.arm_id] = msg
self.file_logger.error(f'Arm error: {msg}')
def start_collection(self) -> bool:
"""Initialize and start data collection. Returns True if successful."""
with self.collection_lock:
if not self.collection_enabled:
self.file_logger.info('Data collection is disabled by user toggle')
return False
if self.is_collecting:
return True
@@ -285,6 +280,65 @@ class CtrlGuiNode(Node):
self.is_collecting = False
self.file_logger.info('Stopped data collection')
def set_collection_enabled(self, enabled: bool) -> None:
"""Enable/disable data collection and camera subscriptions for performance."""
if self.collection_enabled == enabled:
return
self.collection_enabled = enabled
if not enabled:
self.stop_collection()
self._destroy_camera_subscriptions()
self.last_top_image_msg = None
self.last_top_depth_msg = None
self.last_left_image_msg = None
self.last_left_depth_msg = None
self.last_right_image_msg = None
self.last_right_depth_msg = None
self.file_logger.info('Data collection disabled and camera subscriptions stopped')
else:
self._create_camera_subscriptions()
self.file_logger.info('Data collection enabled and camera subscriptions started')
def _create_camera_subscriptions(self) -> None:
if self._sub_top_image is None:
self._sub_top_image = self.create_subscription(
Image, '/camera/color/image_raw', self._on_top_image, 10
)
if self._sub_top_depth is None:
self._sub_top_depth = self.create_subscription(
Image, '/camera/depth/image_raw', self._on_top_depth, 10
)
if self._sub_left_image is None:
self._sub_left_image = self.create_subscription(
Image, '/camera1/camera1/color/image_raw', self._on_left_image, 10
)
if self._sub_left_depth is None:
self._sub_left_depth = self.create_subscription(
Image, '/camera1/camera1/depth/image_rect_raw', self._on_left_depth, 10
)
if self._sub_right_image is None:
self._sub_right_image = self.create_subscription(
Image, '/camera2/camera2/color/image_raw', self._on_right_image, 10
)
if self._sub_right_depth is None:
self._sub_right_depth = self.create_subscription(
Image, '/camera2/camera2/depth/image_rect_raw', self._on_right_depth, 10
)
def _destroy_camera_subscriptions(self) -> None:
for attr in (
'_sub_top_image', '_sub_top_depth',
'_sub_left_image', '_sub_left_depth',
'_sub_right_image', '_sub_right_depth',
):
sub = getattr(self, attr, None)
if sub is not None:
try:
self.destroy_subscription(sub)
except Exception:
pass
setattr(self, attr, None)
def _on_gripper_status(self, msg: GripperStatus, gripper_id: int) -> None:
"""Callback for /gripper_status subscription."""
self.last_gripper_status[gripper_id] = msg
@@ -373,26 +427,38 @@ class CtrlGuiNode(Node):
self.last_work_info = msg
self.messages_received += 1
# Logging content for diagnostic purposes
action_strs = []
for action in msg.action:
ts = f"{action.stamp.sec}.{action.stamp.nanosec:09d}"
action_strs.append(f"{{skill: {action.skill}, action_name: {action.action_name}, params: {action.params}, execution: {action.execution}, state: {action.state}, stamp: {ts}}}")
log_msg = (
f"Message content - msg_id: {str(msg.msg_id)}, working_state: {str(msg.working_state)}, "
f"battery_capacity: {str(msg.battery_capacity)}, working_time: {str(msg.working_time)}, "
f"nav_state: {str(msg.nav_state)}, comm_quality: {str(msg.comm_quality)}, "
f"expt_completion_time: {str(msg.expt_completion_time)}, work_log: {str(msg.work_log)}, "
f"task: {str(list(msg.task))}, bt_node_status: {str(msg.bt_node_status)}, "
f"smacc_state: {str(msg.smacc_state)}, actions: [{', '.join(action_strs)}], bt_xml: {str(msg.bt_xml)}"
)
self.file_logger.info(f"Received message #{self.messages_received} from /robot_work_info")
self.file_logger.info(log_msg)
current_time = time.time()
# Throttle logging to 1Hz
if current_time - self.last_work_info_log_time >= 1.0:
# Logging content for diagnostic purposes
action_strs = []
for action in msg.action:
ts = f"{action.stamp.sec}.{action.stamp.nanosec:09d}"
action_strs.append(f"{{skill: {action.skill}, action_name: {action.action_name}, params: {action.params}, execution: {action.execution}, state: {action.state}, stamp: {ts}}}")
# Truncate bt_xml in periodic logs to save CPU and log size
bt_xml_log = (msg.bt_xml[:100] + "...") if msg.bt_xml and len(msg.bt_xml) > 100 else str(msg.bt_xml)
log_msg = (
f"Message content - msg_id: {str(msg.msg_id)}, working_state: {str(msg.working_state)}, "
f"battery_capacity: {str(msg.battery_capacity)}, working_time: {str(msg.working_time)}, "
f"nav_state: {str(msg.nav_state)}, comm_quality: {str(msg.comm_quality)}, "
f"expt_completion_time: {str(msg.expt_completion_time)}, work_log: {str(msg.work_log)}, "
f"task: {str(list(msg.task))}, bt_node_status: {str(msg.bt_node_status)}, "
f"smacc_state: {str(msg.smacc_state)}, actions: [{', '.join(action_strs)}], bt_xml: {bt_xml_log}"
)
self.file_logger.info(f"Received message #{self.messages_received} from /robot_work_info")
self.file_logger.info(log_msg)
self.last_work_info_log_time = current_time
current_bt_status = str(msg.bt_node_status)
has_bt_xml = bool(msg.bt_xml and msg.bt_xml.strip())
# If data collection is disabled, skip all collection logic
if not self.collection_enabled:
self.last_bt_status = current_bt_status
return
# Automation Trigger Start: bt_xml present AND transition to RUNNING from anything else
if not self.is_collecting and has_bt_xml and self.last_bt_status != "RUNNING" and current_bt_status == "RUNNING":
self.start_collection()
@@ -417,6 +483,8 @@ class CtrlGuiNode(Node):
def _log_session_summary(self, msg: RobotWorkInfo, end_status: str):
"""Log a summary entry and capture pictures when a BT session finishes."""
with self.collection_lock:
if not self.collection_enabled:
return
if not self.data_collection_dir:
return
skill_info_path = os.path.join(self.data_collection_dir, 'skill_info.txt')
@@ -426,21 +494,30 @@ class CtrlGuiNode(Node):
event_ts = f"{now_msg.sec}.{now_msg.nanosec:09d}"
image_files = {}
camera_data = [
('top', self.last_top_image, self.last_top_depth),
('left', self.last_left_image, self.last_left_depth),
('right', self.last_right_image, self.last_right_depth)
camera_configs = [
('top', self.last_top_image_msg, 'bgr8', self.last_top_depth_msg, 'passthrough'),
('left', self.last_left_image_msg, 'bgr8', self.last_left_depth_msg, 'passthrough'),
('right', self.last_right_image_msg, 'bgr8', self.last_right_depth_msg, 'passthrough')
]
for cam_name, color_img, depth_img in camera_data:
if color_img is not None:
fname = f"{cam_name}_color_{event_ts}.png"
cv2.imwrite(os.path.join(self.data_collection_dir, 'images', fname), color_img)
image_files[f"color_path_{cam_name}"] = f"images/{fname}"
if depth_img is not None:
fname = f"{cam_name}_depth_{event_ts}.png"
cv2.imwrite(os.path.join(self.data_collection_dir, 'images', fname), depth_img)
image_files[f"depth_path_{cam_name}"] = f"images/{fname}"
for cam_name, color_msg, color_enc, depth_msg, depth_enc in camera_configs:
if color_msg is not None:
try:
color_img = self.bridge.imgmsg_to_cv2(color_msg, desired_encoding=color_enc)
fname = f"{cam_name}_color_{event_ts}.png"
cv2.imwrite(os.path.join(self.data_collection_dir, 'images', fname), color_img)
image_files[f"color_path_{cam_name}"] = f"images/{fname}"
except Exception as e:
self.file_logger.error(f"Error saving color image {cam_name}: {str(e)}")
if depth_msg is not None:
try:
depth_img = self.bridge.imgmsg_to_cv2(depth_msg, desired_encoding=depth_enc)
fname = f"{cam_name}_depth_{event_ts}.png"
cv2.imwrite(os.path.join(self.data_collection_dir, 'images', fname), depth_img)
image_files[f"depth_path_{cam_name}"] = f"images/{fname}"
except Exception as e:
self.file_logger.error(f"Error saving depth image {cam_name}: {str(e)}")
last_task = msg.task[0] if len(msg.task) > 0 else "N/A"
@@ -462,6 +539,8 @@ class CtrlGuiNode(Node):
def _collect_data(self, msg: RobotWorkInfo):
"""Internal data collection logic for action-level changes."""
with self.collection_lock:
if not self.collection_enabled:
return
if not self.data_collection_dir:
return
@@ -502,21 +581,29 @@ class CtrlGuiNode(Node):
# Save images using action timestamp
image_files = {}
camera_data = [
('top', self.last_top_image, self.last_top_depth),
('left', self.last_left_image, self.last_left_depth),
('right', self.last_right_image, self.last_right_depth)
camera_configs = [
('top', self.last_top_image_msg, 'bgr8', self.last_top_depth_msg, 'passthrough'),
('left', self.last_left_image_msg, 'bgr8', self.last_left_depth_msg, 'passthrough'),
('right', self.last_right_image_msg, 'bgr8', self.last_right_depth_msg, 'passthrough')
]
for cam_name, color_img, depth_img in camera_data:
if color_img is not None:
fname = f"{cam_name}_color_{timestamp_str}.png"
cv2.imwrite(os.path.join(self.data_collection_dir, 'images', fname), color_img)
image_files[f"color_path_{cam_name}"] = f"images/{fname}"
if depth_img is not None:
fname = f"{cam_name}_depth_{timestamp_str}.png"
cv2.imwrite(os.path.join(self.data_collection_dir, 'images', fname), depth_img)
image_files[f"depth_path_{cam_name}"] = f"images/{fname}"
for cam_name, color_msg, color_enc, depth_msg, depth_enc in camera_configs:
if color_msg is not None:
try:
color_img = self.bridge.imgmsg_to_cv2(color_msg, desired_encoding=color_enc)
fname = f"{cam_name}_color_{timestamp_str}.png"
cv2.imwrite(os.path.join(self.data_collection_dir, 'images', fname), color_img)
image_files[f"color_path_{cam_name}"] = f"images/{fname}"
except Exception as e:
self.file_logger.error(f"Error saving color image {cam_name}: {str(e)}")
if depth_msg is not None:
try:
depth_img = self.bridge.imgmsg_to_cv2(depth_msg, desired_encoding=depth_enc)
fname = f"{cam_name}_depth_{timestamp_str}.png"
cv2.imwrite(os.path.join(self.data_collection_dir, 'images', fname), depth_img)
image_files[f"depth_path_{cam_name}"] = f"images/{fname}"
except Exception as e:
self.file_logger.error(f"Error saving depth image {cam_name}: {str(e)}")
data = {
"timestamp": timestamp_str,
@@ -583,49 +670,35 @@ class CtrlGuiNode(Node):
# for i, position in enumerate(self.last_joint_state.position):
# print(f" Joint {i}: {position}")
self.file_logger.info(f"/joint_states positions: {str(self.last_joint_state.position)}")
# Throttle logging to 1Hz
current_time = time.time()
if current_time - self.last_joint_state_log_time >= 1.0:
self.file_logger.info(f"/joint_states positions: {str(self.last_joint_state.position)}")
self.last_joint_state_log_time = current_time
def _on_top_image(self, msg: Image) -> None:
"""Callback for /camera/color/image_raw subscription: store latest top image."""
try:
self.last_top_image = self.bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8')
except Exception as e:
self.file_logger.error(f"Error converting top image: {str(e)}")
self.last_top_image_msg = msg
def _on_top_depth(self, msg: Image) -> None:
"""Callback for /camera/depth/image_raw subscription: store latest top depth image."""
try:
self.last_top_depth = self.bridge.imgmsg_to_cv2(msg, desired_encoding='passthrough')
except Exception as e:
self.file_logger.error(f"Error converting top depth image: {str(e)}")
self.last_top_depth_msg = msg
def _on_left_image(self, msg: Image) -> None:
"""Callback for /camera1/camera1/color/image_raw subscription: store latest left image."""
try:
self.last_left_image = self.bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8')
except Exception as e:
self.file_logger.error(f"Error converting left image: {str(e)}")
self.last_left_image_msg = msg
def _on_left_depth(self, msg: Image) -> None:
"""Callback for /camera1/camera1/depth/image_rect_raw subscription: store latest left depth image."""
try:
self.last_left_depth = self.bridge.imgmsg_to_cv2(msg, desired_encoding='passthrough')
except Exception as e:
self.file_logger.error(f"Error converting left depth image: {str(e)}")
self.last_left_depth_msg = msg
def _on_right_image(self, msg: Image) -> None:
"""Callback for /camera2/camera1/color/image_raw subscription: store latest right image."""
try:
self.last_right_image = self.bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8')
except Exception as e:
self.file_logger.error(f"Error converting right image: {str(e)}")
self.last_right_image_msg = msg
def _on_right_depth(self, msg: Image) -> None:
"""Callback for /camera2/camera2/depth/image_rect_raw subscription: store latest right depth image."""
try:
self.last_right_depth = self.bridge.imgmsg_to_cv2(msg, desired_encoding='passthrough')
except Exception as e:
self.file_logger.error(f"Error converting right depth image: {str(e)}")
self.last_right_depth_msg = msg
def publish_info(self, text: str) -> None:
msg = String()
@@ -730,6 +803,15 @@ def build_ui(node: CtrlGuiNode) -> None:
collect_btn = ui.button('采集数据', on_click=toggle_collection)
def toggle_collection_enabled():
node.set_collection_enabled(not node.collection_enabled)
if not node.collection_enabled:
ui.notify('已禁用数据采集与图像保存流程', color='warning')
else:
ui.notify('已启用数据采集与图像保存流程', color='positive')
collection_enable_btn = ui.button('禁用采集流程', on_click=toggle_collection_enabled)
# confirm dialog function
def show_confirm_dialog(message: str, confirm_callback) -> None:
"""显示确认对话框
@@ -1128,6 +1210,13 @@ def build_ui(node: CtrlGuiNode) -> None:
gripper1_state_label = ui.label('State: N/A')
# Update the labels periodically from the UI thread (safe cross-thread update)
last_ui_values = {}
def _set_label_if_changed(label, key: str, text: str) -> None:
if last_ui_values.get(key) != text:
label.set_text(text)
last_ui_values[key] = text
def update_work_info():
msg = node.last_work_info
@@ -1139,29 +1228,37 @@ def build_ui(node: CtrlGuiNode) -> None:
collect_btn.set_text('采集数据')
collect_btn.props('color=primary')
# Keep collection enable button state synced
if node.collection_enabled:
collection_enable_btn.set_text('禁用采集流程')
collection_enable_btn.props('color=primary')
else:
collection_enable_btn.set_text('启用采集流程')
collection_enable_btn.props('color=grey')
if not msg:
return
# Basic Information
msg_id_label.set_text(f'Message ID: {str(msg.msg_id)}')
working_state_label.set_text(f'Working State: {str(msg.working_state)}')
battery_capacity_label.set_text(f'Battery Capacity: {str(msg.battery_capacity)}%')
working_time_label.set_text(f'Working Time: {msg.working_time:.3f} h')
nav_state_label.set_text(f'Navigation State: {str(msg.nav_state)}')
comm_quality_label.set_text(f'Communication Quality: {str(msg.comm_quality)}')
expt_completion_time_label.set_text(f'Expected Completion Time: {str(msg.expt_completion_time)}')
work_log_label.set_text(f'Work Log: {str(msg.work_log)}')
_set_label_if_changed(msg_id_label, 'msg_id', f'Message ID: {str(msg.msg_id)}')
_set_label_if_changed(working_state_label, 'working_state', f'Working State: {str(msg.working_state)}')
_set_label_if_changed(battery_capacity_label, 'battery_capacity', f'Battery Capacity: {str(msg.battery_capacity)}%')
_set_label_if_changed(working_time_label, 'working_time', f'Working Time: {msg.working_time:.3f} h')
_set_label_if_changed(nav_state_label, 'nav_state', f'Navigation State: {str(msg.nav_state)}')
_set_label_if_changed(comm_quality_label, 'comm_quality', f'Communication Quality: {str(msg.comm_quality)}')
_set_label_if_changed(expt_completion_time_label, 'expt_completion_time', f'Expected Completion Time: {str(msg.expt_completion_time)}')
_set_label_if_changed(work_log_label, 'work_log', f'Work Log: {str(msg.work_log)}')
# Task Information
tasks = list(msg.task)
if len(tasks) >= 3:
last_task_label.set_text(f'Last Task: {str(tasks[0])}')
next_task_label.set_text(f'Next Task: {str(tasks[1])}')
current_task_label.set_text(f'Current Task: {str(tasks[2])}')
_set_label_if_changed(last_task_label, 'last_task', f'Last Task: {str(tasks[0])}')
_set_label_if_changed(next_task_label, 'next_task', f'Next Task: {str(tasks[1])}')
_set_label_if_changed(current_task_label, 'current_task', f'Current Task: {str(tasks[2])}')
else:
last_task_label.set_text('Last Task: N/A')
next_task_label.set_text('Next Task: N/A')
current_task_label.set_text('Current Task: N/A')
_set_label_if_changed(last_task_label, 'last_task', 'Last Task: N/A')
_set_label_if_changed(next_task_label, 'next_task', 'Next Task: N/A')
_set_label_if_changed(current_task_label, 'current_task', 'Current Task: N/A')
if hasattr(node, 'last_joint_state') and hasattr(node.last_joint_state, 'position'):
positions = list(node.last_joint_state.position)
@@ -1170,40 +1267,40 @@ def build_ui(node: CtrlGuiNode) -> None:
right_arm_subset = positions[15:21]
formatted_left = [f"{p * 180.0 / 3.14:.2f}" for p in left_arm_subset]
formatted_right = [f"{p * 180.0 / 3.14:.2f}" for p in right_arm_subset]
left_arm_joint_label.set_text(f'Left Arm Joint: {", ".join(formatted_left)}')
right_arm_joint_label.set_text(f'Right Arm Joint: {", ".join(formatted_right)}')
_set_label_if_changed(left_arm_joint_label, 'left_arm_joint', f'Left Arm Joint: {", ".join(formatted_left)}')
_set_label_if_changed(right_arm_joint_label, 'right_arm_joint', f'Right Arm Joint: {", ".join(formatted_right)}')
else:
left_arm_joint_label.set_text('Left Arm Joint: N/A (not enough joints)')
right_arm_joint_label.set_text('Right Arm Joint: N/A (not enough joints)')
_set_label_if_changed(left_arm_joint_label, 'left_arm_joint', 'Left Arm Joint: N/A (not enough joints)')
_set_label_if_changed(right_arm_joint_label, 'right_arm_joint', 'Right Arm Joint: N/A (not enough joints)')
else:
left_arm_joint_label.set_text('Left Arm Joint: N/A')
right_arm_joint_label.set_text('Right Arm Joint: N/A')
_set_label_if_changed(left_arm_joint_label, 'left_arm_joint', 'Left Arm Joint: N/A')
_set_label_if_changed(right_arm_joint_label, 'right_arm_joint', 'Right Arm Joint: N/A')
# Skill Information
if msg.action:
primary_action = msg.action[0]
timestamp_label.set_text(f'Timestamp: {str(primary_action.stamp.sec)}.{str(primary_action.stamp.nanosec)}')
skill_label.set_text(f'Skill: {str(primary_action.skill)}')
action_name_label.set_text(f'Action Name: {str(primary_action.action_name)}')
instance_params_label.set_text(f'Instance Parameters: {str(primary_action.params)}')
_set_label_if_changed(timestamp_label, 'timestamp', f'Timestamp: {str(primary_action.stamp.sec)}.{str(primary_action.stamp.nanosec)}')
_set_label_if_changed(skill_label, 'skill', f'Skill: {str(primary_action.skill)}')
_set_label_if_changed(action_name_label, 'action_name', f'Action Name: {str(primary_action.action_name)}')
_set_label_if_changed(instance_params_label, 'instance_params', f'Instance Parameters: {str(primary_action.params)}')
else:
timestamp_label.set_text('Timestamp: N/A')
skill_label.set_text('Skill: N/A')
action_name_label.set_text('Action Name: N/A')
instance_params_label.set_text('Instance Parameters: N/A')
_set_label_if_changed(timestamp_label, 'timestamp', 'Timestamp: N/A')
_set_label_if_changed(skill_label, 'skill', 'Skill: N/A')
_set_label_if_changed(action_name_label, 'action_name', 'Action Name: N/A')
_set_label_if_changed(instance_params_label, 'instance_params', 'Instance Parameters: N/A')
bt_node_status_label.set_text(f'BT Node Status: {str(msg.bt_node_status)}')
smacc_state_label.set_text(f'SMACC State: {str(msg.smacc_state)}')
_set_label_if_changed(bt_node_status_label, 'bt_node_status', f'BT Node Status: {str(msg.bt_node_status)}')
_set_label_if_changed(smacc_state_label, 'smacc_state', f'SMACC State: {str(msg.smacc_state)}')
# Update Arm Diagnostics
for arm_id, label_err, label_brake in [(0, left_arm_err_label, left_arm_brake_label), (1, right_arm_err_label, right_arm_brake_label)]:
arm_msg = node.last_arm_error.get(arm_id)
if arm_msg:
label_err.set_text(f'Errors: {list(arm_msg.err_flag)}')
label_brake.set_text(f'Brakes: {list(arm_msg.brake_state)} (1=Locked, 0=Released)')
_set_label_if_changed(label_err, f'arm_err_{arm_id}', f'Errors: {list(arm_msg.err_flag)}')
_set_label_if_changed(label_brake, f'arm_brake_{arm_id}', f'Brakes: {list(arm_msg.brake_state)} (1=Locked, 0=Released)')
else:
label_err.set_text('Errors: N/A')
label_brake.set_text('Brakes: N/A')
_set_label_if_changed(label_err, f'arm_err_{arm_id}', 'Errors: N/A')
_set_label_if_changed(label_brake, f'arm_brake_{arm_id}', 'Brakes: N/A')
# Update Gripper Status
for gid, l_pos, l_speed, l_torque, l_state in [
@@ -1212,15 +1309,15 @@ def build_ui(node: CtrlGuiNode) -> None:
]:
g_msg = node.last_gripper_status.get(gid)
if g_msg and hasattr(g_msg, 'loc'):
l_pos.set_text(f'Pos: {g_msg.loc}')
l_speed.set_text(f'Speed: {g_msg.speed}')
l_torque.set_text(f'Torque: {g_msg.torque}')
l_state.set_text(f'State: {g_msg.state}')
_set_label_if_changed(l_pos, f'gripper_{gid}_pos', f'Pos: {g_msg.loc}')
_set_label_if_changed(l_speed, f'gripper_{gid}_speed', f'Speed: {g_msg.speed}')
_set_label_if_changed(l_torque, f'gripper_{gid}_torque', f'Torque: {g_msg.torque}')
_set_label_if_changed(l_state, f'gripper_{gid}_state', f'State: {g_msg.state}')
else:
l_pos.set_text('Pos: N/A')
l_speed.set_text('Speed: N/A')
l_torque.set_text('Torque: N/A')
l_state.set_text('State: N/A')
_set_label_if_changed(l_pos, f'gripper_{gid}_pos', 'Pos: N/A')
_set_label_if_changed(l_speed, f'gripper_{gid}_speed', 'Speed: N/A')
_set_label_if_changed(l_torque, f'gripper_{gid}_torque', 'Torque: N/A')
_set_label_if_changed(l_state, f'gripper_{gid}_state', 'State: N/A')
ui.timer(0.5, update_work_info)