优化数据采集

This commit is contained in:
NuoDaJia02
2026-01-29 18:45:51 +08:00
parent ec279ec154
commit 5a1c55b3d8

View File

@@ -4,10 +4,14 @@ This module starts a NiceGUI web page and an rclpy node. Clicking the button on
page publishes a std_msgs/String message to the /led_cmd topic.
"""
import atexit
from email.mime import text
import threading
import os
import socket
import json
import time
import asyncio
import datetime
import logging
import rclpy
from rclpy.node import Node
from rclpy.action.client import ActionClient
@@ -119,6 +123,8 @@ class CtrlGuiNode(Node):
self.data_collection_dir = None
self.last_action_name = ""
self.last_bt_status = ""
self.saved_actions = {} # (timestamp, action_name) -> last_known_state
self.collection_lock = threading.Lock()
self.file_logger.info('CtrlGuiNode initialized')
self.file_logger.info(f'Node started at: {self.start_time}')
@@ -244,6 +250,41 @@ class CtrlGuiNode(Node):
self.last_arm_error[msg.arm_id] = msg
self.file_logger.error(f'Arm error: {msg}')
def start_collection(self) -> bool:
"""Initialize and start data collection. Returns True if successful."""
with self.collection_lock:
if self.is_collecting:
return True
now = datetime.datetime.now()
date_str = now.strftime("%Y%m%d")
time_str = now.strftime("%H%M%S")
data_base_dir = "/home/demo/hivecore_robot_os1/data"
self.data_collection_dir = os.path.join(data_base_dir, date_str, time_str)
try:
os.makedirs(self.data_collection_dir, exist_ok=True)
os.makedirs(os.path.join(self.data_collection_dir, 'images'), exist_ok=True)
skill_info_path = os.path.join(self.data_collection_dir, 'skill_info.txt')
with open(skill_info_path, 'w') as f:
pass
self.is_collecting = True
self.last_action_name = ""
self.saved_actions = {}
self.file_logger.info(f'Started data collection: {self.data_collection_dir}')
return True
except Exception as e:
self.file_logger.error(f'Failed to start collection: {str(e)}')
return False
def stop_collection(self):
"""Stop data collection."""
with self.collection_lock:
if not self.is_collecting:
return
self.is_collecting = False
self.file_logger.info('Stopped data collection')
def _on_gripper_status(self, msg: GripperStatus, gripper_id: int) -> None:
"""Callback for /gripper_status subscription."""
self.last_gripper_status[gripper_id] = msg
@@ -331,16 +372,197 @@ class CtrlGuiNode(Node):
"""Callback for /robot_work_info subscription: store latest text."""
self.last_work_info = msg
self.messages_received += 1
# keep logs lightweight to avoid flooding
self.file_logger.debug(f"/robot_work_info: {str(self.last_work_info)}")
# Log to file with message content
# Logging content for diagnostic purposes
action_strs = []
for action in msg.action:
ts = f"{action.stamp.sec}.{action.stamp.nanosec:09d}"
action_strs.append(f"{{skill: {action.skill}, action_name: {action.action_name}, params: {action.params}, execution: {action.execution}, state: {action.state}, stamp: {ts}}}")
log_msg = (
f"Message content - msg_id: {str(msg.msg_id)}, working_state: {str(msg.working_state)}, "
f"battery_capacity: {str(msg.battery_capacity)}, working_time: {str(msg.working_time)}, "
f"nav_state: {str(msg.nav_state)}, comm_quality: {str(msg.comm_quality)}, "
f"expt_completion_time: {str(msg.expt_completion_time)}, work_log: {str(msg.work_log)}, "
f"task: {str(list(msg.task))}, bt_node_status: {str(msg.bt_node_status)}, "
f"smacc_state: {str(msg.smacc_state)}, actions: [{', '.join(action_strs)}], bt_xml: {str(msg.bt_xml)}"
)
self.file_logger.info(f"Received message #{self.messages_received} from /robot_work_info")
self.file_logger.info(f"Message content - msg_id: {str(msg.msg_id)}, timestamp: {str(msg.stamp.sec)}.{str(msg.stamp.nanosec)}, working_state: {str(msg.working_state)}, battery_capacity: {str(msg.battery_capacity)}, working_time: {str(msg.working_time)}, nav_state: {str(msg.nav_state)}, comm_quality: {str(msg.comm_quality)}, expt_completion_time: {str(msg.expt_completion_time)}, work_log: {str(msg.work_log)}, task: {str(list(msg.task))}, skill: {str(msg.skill)}, action_name: {str(msg.action_name)}, instance_params: {str(msg.instance_params)}, bt_node_status: {str(msg.bt_node_status)}, smacc_state: {str(msg.smacc_state)}")
# Log periodically to avoid spamming the logs
if self.messages_received % 10 == 0:
self.file_logger.info(f'Received {self.messages_received} messages from /robot_work_info')
self.file_logger.info(log_msg)
current_bt_status = str(msg.bt_node_status)
has_bt_xml = bool(msg.bt_xml and msg.bt_xml.strip())
# Automation Trigger Start: bt_xml present AND transition to RUNNING from anything else
if not self.is_collecting and has_bt_xml and self.last_bt_status != "RUNNING" and current_bt_status == "RUNNING":
self.start_collection()
# Handle data collection
if self.is_collecting:
# Automation Trigger Stop: transition from RUNNING to anything else
if self.last_bt_status == "RUNNING" and current_bt_status != "RUNNING":
# Final storage pass: Session summary + current actions
self._log_session_summary(msg, current_bt_status)
self._collect_data(msg)
self.stop_collection()
else:
self._collect_data(msg)
self.last_bt_status = current_bt_status
# Log summary periodically
if self.messages_received % 100 == 0:
self.file_logger.info(f'Processed {self.messages_received} messages from /robot_work_info')
def _log_session_summary(self, msg: RobotWorkInfo, end_status: str):
"""Log a summary entry and capture pictures when a BT session finishes."""
with self.collection_lock:
if not self.data_collection_dir:
return
skill_info_path = os.path.join(self.data_collection_dir, 'skill_info.txt')
try:
# Use current ROS time for the summary event
now_msg = self.get_clock().now().to_msg()
event_ts = f"{now_msg.sec}.{now_msg.nanosec:09d}"
image_files = {}
camera_data = [
('top', self.last_top_image, self.last_top_depth),
('left', self.last_left_image, self.last_left_depth),
('right', self.last_right_image, self.last_right_depth)
]
for cam_name, color_img, depth_img in camera_data:
if color_img is not None:
fname = f"{cam_name}_color_{event_ts}.png"
cv2.imwrite(os.path.join(self.data_collection_dir, 'images', fname), color_img)
image_files[f"color_path_{cam_name}"] = f"images/{fname}"
if depth_img is not None:
fname = f"{cam_name}_depth_{event_ts}.png"
cv2.imwrite(os.path.join(self.data_collection_dir, 'images', fname), depth_img)
image_files[f"depth_path_{cam_name}"] = f"images/{fname}"
last_task = msg.task[0] if len(msg.task) > 0 else "N/A"
summary_data = {
"timestamp": event_ts,
"event": "BT_SESSION_FINISHED",
"bt_status": end_status,
"last_task": last_task,
**image_files
}
with open(skill_info_path, 'a', encoding='utf-8') as f:
f.write(json.dumps(summary_data, ensure_ascii=False) + '\n')
self.file_logger.info(f"BT Session finished with status {end_status}. Summary logged.")
except Exception as e:
self.file_logger.error(f"Failed to log BT session summary: {str(e)}")
def _collect_data(self, msg: RobotWorkInfo):
"""Internal data collection logic for action-level changes."""
with self.collection_lock:
if not self.data_collection_dir:
return
skill_info_path = os.path.join(self.data_collection_dir, 'skill_info.txt')
# Save BehaviorTree XML if present (only once per session)
if msg.bt_xml and msg.bt_xml.strip():
bt_xml_path = os.path.join(self.data_collection_dir, 'behaviortree.xml')
if not os.path.exists(bt_xml_path):
try:
with open(bt_xml_path, 'w', encoding='utf-8') as f:
f.write(msg.bt_xml)
self.file_logger.info(f"Saved non-empty BehaviorTree XML to {bt_xml_path}")
except Exception as e:
self.file_logger.error(f"Failed to save BehaviorTree XML: {str(e)}")
for action_item in msg.action:
timestamp_str = f"{action_item.stamp.sec}.{action_item.stamp.nanosec:09d}"
action_name = str(action_item.action_name)
action_key = (timestamp_str, action_name)
current_state = str(action_item.state)
# Case 1: New action instance
if action_key not in self.saved_actions:
try:
# Parse params
params_data = {}
if action_item.params:
for line in action_item.params.strip().split('\n'):
if ':' in line:
k, v = line.split(':', 1)
key, val = k.strip(), v.strip()
try:
# Handle lists or basic JSON types
params_data[key] = json.loads(val.replace("'", '"'))
except:
params_data[key] = val
# Save images using action timestamp
image_files = {}
camera_data = [
('top', self.last_top_image, self.last_top_depth),
('left', self.last_left_image, self.last_left_depth),
('right', self.last_right_image, self.last_right_depth)
]
for cam_name, color_img, depth_img in camera_data:
if color_img is not None:
fname = f"{cam_name}_color_{timestamp_str}.png"
cv2.imwrite(os.path.join(self.data_collection_dir, 'images', fname), color_img)
image_files[f"color_path_{cam_name}"] = f"images/{fname}"
if depth_img is not None:
fname = f"{cam_name}_depth_{timestamp_str}.png"
cv2.imwrite(os.path.join(self.data_collection_dir, 'images', fname), depth_img)
image_files[f"depth_path_{cam_name}"] = f"images/{fname}"
data = {
"timestamp": timestamp_str,
"skill": str(action_item.skill),
"action_name": action_name,
"action_status": current_state,
"execution": str(action_item.execution),
"parameters": params_data,
**image_files
}
with open(skill_info_path, 'a', encoding='utf-8') as f:
f.write(json.dumps(data, ensure_ascii=False) + '\n')
self.saved_actions[action_key] = current_state
self.last_action_name = action_name
except Exception as e:
self.file_logger.error(f'Failed to log new action {action_name}: {str(e)}')
# Case 2: Existing action, status changed (e.g. from RUNNING to SUCCESS/FAILURE)
elif current_state != self.saved_actions[action_key]:
try:
if os.path.exists(skill_info_path):
with open(skill_info_path, 'r', encoding='utf-8') as f:
file_lines = f.readlines()
updated = False
for i in range(len(file_lines)-1, -1, -1):
try:
entry = json.loads(file_lines[i].strip())
if entry.get("timestamp") == timestamp_str and entry.get("action_name") == action_name:
entry["action_status"] = current_state
file_lines[i] = json.dumps(entry, ensure_ascii=False) + '\n'
updated = True
break
except:
continue
if updated:
with open(skill_info_path, 'w', encoding='utf-8') as f:
f.writelines(file_lines)
self.saved_actions[action_key] = current_state
self.file_logger.info(f"Updated action_status for {action_name} at {timestamp_str} to {current_state}")
except Exception as e:
self.file_logger.error(f'Failed to update action status for {action_name}: {str(e)}')
def _on_rebuild_response(self, future):
"""Callback for rebuild_now service response."""
@@ -498,29 +720,12 @@ def build_ui(node: CtrlGuiNode) -> None:
def toggle_collection():
if not node.is_collecting:
# Start collecting
timestamp_s = int(time.time())
data_base_dir = "/home/demo/hivecore_robot_os1/data"
node.data_collection_dir = os.path.join(data_base_dir, str(timestamp_s))
try:
os.makedirs(node.data_collection_dir, exist_ok=True)
os.makedirs(os.path.join(node.data_collection_dir, 'images'), exist_ok=True)
skill_info_path = os.path.join(node.data_collection_dir, 'skill_info.txt')
# Create empty file
with open(skill_info_path, 'w') as f:
pass
node.is_collecting = True
node.last_action_name = "" # Reset to ensure we capture the first state
collect_btn.set_text('停止采集')
collect_btn.props('color=red')
if node.start_collection():
ui.notify(f'开始采集数据: {node.data_collection_dir}')
except Exception as e:
ui.notify(f'创建采集目录失败: {str(e)}', color='negative')
else:
ui.notify('创建采集目录失败', color='negative')
else:
# Stop collecting
node.is_collecting = False
collect_btn.set_text('采集数据')
collect_btn.props('color=primary')
node.stop_collection()
ui.notify('停止采集数据')
collect_btn = ui.button('采集数据', on_click=toggle_collection)
@@ -926,200 +1131,17 @@ def build_ui(node: CtrlGuiNode) -> None:
def update_work_info():
msg = node.last_work_info
# Data collection logic
# Keep button appearance synced with node state (handles automation triggered by ROS)
if node.is_collecting:
current_action_name = str(msg.action_name)
current_bt_status = str(msg.bt_node_status)
skill_info_path = os.path.join(node.data_collection_dir, 'skill_info.txt')
# 记录逻辑:
# 1. 如果 action_name 变了,新增一行数据
if current_action_name != node.last_action_name:
try:
# 解析 instance_params
try:
# 先尝试直接作为 JSON 解析
instance_params_data = json.loads(msg.instance_params)
except (json.JSONDecodeError, TypeError):
# 如果不是 JSON尝试解析 "key: value\n" 格式
instance_params_data = {}
if isinstance(msg.instance_params, str) and msg.instance_params.strip():
lines = msg.instance_params.strip().split('\n')
for line in lines:
if ':' in line:
key, val = line.split(':', 1)
key = key.strip()
val = val.strip()
# 尝试将值转换为 JSON (处理数字、列表等)
try:
# 兼容单引号
cleaned_val = val.replace("'", '"')
instance_params_data[key] = json.loads(cleaned_val)
except:
instance_params_data[key] = val
elif line.strip():
instance_params_data[f"param_{len(instance_params_data)}"] = line.strip()
# 如果没有任何键值对,回退为原始字符串
if not instance_params_data and msg.instance_params:
instance_params_data = str(msg.instance_params)
collect_btn.set_text('停止采集')
collect_btn.props('color=red')
else:
collect_btn.set_text('采集数据')
collect_btn.props('color=primary')
# Save images if available
# timestamp = msg.stamp.sec + msg.stamp.nanosec * 1e-9
timestamp = f"{msg.stamp.sec}.{msg.stamp.nanosec:09d}"
image_filename_top = ""
image_filename_top_depth = ""
image_filename_left = ""
image_filename_left_depth = ""
image_filename_right = ""
image_filename_right_depth = ""
if not msg:
return
if node.last_top_image is not None:
try:
image_filename_top = f"top_color_{timestamp}.png"
cv2.imwrite(os.path.join(node.data_collection_dir, 'images', image_filename_top), node.last_top_image)
except Exception as e:
node.file_logger.error(f"Failed to save top image: {str(e)}")
if node.last_top_depth is not None:
try:
image_filename_top_depth = f"top_depth_{timestamp}.png"
cv2.imwrite(os.path.join(node.data_collection_dir, 'images', image_filename_top_depth), node.last_top_depth)
except Exception as e:
node.file_logger.error(f"Failed to save top depth image: {str(e)}")
if node.last_left_image is not None:
try:
image_filename_left = f"left_color_{timestamp}.png"
cv2.imwrite(os.path.join(node.data_collection_dir, 'images', image_filename_left), node.last_left_image)
except Exception as e:
node.file_logger.error(f"Failed to save left image: {str(e)}")
if node.last_left_depth is not None:
try:
image_filename_left_depth = f"left_depth_{timestamp}.png"
cv2.imwrite(os.path.join(node.data_collection_dir, 'images', image_filename_left_depth), node.last_left_depth)
except Exception as e:
node.file_logger.error(f"Failed to save left depth image: {str(e)}")
if node.last_right_image is not None:
try:
image_filename_right = f"right_color_{timestamp}.png"
cv2.imwrite(os.path.join(node.data_collection_dir, 'images', image_filename_right), node.last_right_image)
except Exception as e:
node.file_logger.error(f"Failed to save right image: {str(e)}")
if node.last_right_depth is not None:
try:
image_filename_right_depth = f"right_depth_{timestamp}.png"
cv2.imwrite(os.path.join(node.data_collection_dir, 'images', image_filename_right_depth), node.last_right_depth)
except Exception as e:
node.file_logger.error(f"Failed to save right depth image: {str(e)}")
data = {
"timestamp": timestamp, #f"{msg.stamp.sec}.{msg.stamp.nanosec:09d}",
"skill": str(msg.skill),
"action_name": current_action_name,
"action_status": current_bt_status,
"parameters": instance_params_data,
"color_path_top": f"images/{image_filename_top}" if image_filename_top else "",
"depth_path_top": f"images/{image_filename_top_depth}" if image_filename_top_depth else "",
"color_path_left": f"images/{image_filename_left}" if image_filename_left else "",
"depth_path_left": f"images/{image_filename_left_depth}" if image_filename_left_depth else "",
"color_path_right": f"images/{image_filename_right}" if image_filename_right else "",
"depth_path_right": f"images/{image_filename_right_depth}" if image_filename_right_depth else ""
}
with open(skill_info_path, 'a') as f:
f.write(json.dumps(data, ensure_ascii=False) + '\n')
node.last_action_name = current_action_name
node.last_bt_status = current_bt_status
except Exception as e:
node.file_logger.error(f'Failed to log new action: {str(e)}')
# 2. 如果 action_name 没变,但 bt_node_status 变了,更新当前最后一行的内容
elif current_bt_status != node.last_bt_status:
try:
if os.path.exists(skill_info_path):
with open(skill_info_path, 'r') as f:
lines = f.readlines()
if lines:
# 解析最后一行
last_entry = json.loads(lines[-1].strip())
# 只有当 action_name 匹配时才更新状态
if last_entry.get("action_name") == current_action_name:
last_entry["action_status"] = current_bt_status
lines[-1] = json.dumps(last_entry, ensure_ascii=False) + '\n'
with open(skill_info_path, 'w') as f:
f.writelines(lines)
else:
# 如果不匹配(理论上不应发生),可以视为新动作或记录异常
node.file_logger.warning(f'Action name mismatch during status update: {last_entry.get("action_name")} vs {current_action_name}')
# 当检测到 action_status 不等于 RUNNING 时,额外触发一次图片存储并记录
if current_bt_status != "RUNNING" and current_bt_status.strip():
# Save images
timestamp = time.time()
img_top = ""
dep_top = ""
img_left = ""
dep_left = ""
img_right = ""
dep_right = ""
if node.last_top_image is not None:
try:
img_top = f"top_color_{timestamp:.9f}.png"
cv2.imwrite(os.path.join(node.data_collection_dir, 'images', img_top), node.last_top_image)
except: pass
if node.last_top_depth is not None:
try:
dep_top = f"top_depth_{timestamp:.9f}.png"
cv2.imwrite(os.path.join(node.data_collection_dir, 'images', dep_top), node.last_top_depth)
except: pass
if node.last_left_image is not None:
try:
img_left = f"left_color_{timestamp:.9f}.png"
cv2.imwrite(os.path.join(node.data_collection_dir, 'images', img_left), node.last_left_image)
except: pass
if node.last_left_depth is not None:
try:
dep_left = f"left_depth_{timestamp:.9f}.png"
cv2.imwrite(os.path.join(node.data_collection_dir, 'images', dep_left), node.last_left_depth)
except: pass
if node.last_right_image is not None:
try:
img_right = f"right_color_{timestamp:.9f}.png"
cv2.imwrite(os.path.join(node.data_collection_dir, 'images', img_right), node.last_right_image)
except: pass
if node.last_right_depth is not None:
try:
dep_right = f"right_depth_{timestamp:.9f}.png"
cv2.imwrite(os.path.join(node.data_collection_dir, 'images', dep_right), node.last_right_depth)
except: pass
tasks = list(msg.task)
last_task_val = str(tasks[0]) if tasks else "N/A"
result_data = {
"timestamp": f"{timestamp:.9f}",
"schedule": last_task_val,
"action_status": current_bt_status,
"color_path_top": f"images/{img_top}" if img_top else "",
"depth_path_top": f"images/{dep_top}" if dep_top else "",
"color_path_left": f"images/{img_left}" if img_left else "",
"depth_path_left": f"images/{dep_left}" if dep_left else "",
"color_path_right": f"images/{img_right}" if img_right else "",
"depth_path_right": f"images/{dep_right}" if dep_right else ""
}
with open(skill_info_path, 'a') as f:
f.write(json.dumps(result_data, ensure_ascii=False) + '\n')
node.last_bt_status = current_bt_status
except Exception as e:
node.file_logger.error(f'Failed to update action_status: {str(e)}')
# Basic Information
msg_id_label.set_text(f'Message ID: {str(msg.msg_id)}')
working_state_label.set_text(f'Working State: {str(msg.working_state)}')
@@ -1158,10 +1180,18 @@ def build_ui(node: CtrlGuiNode) -> None:
right_arm_joint_label.set_text('Right Arm Joint: N/A')
# Skill Information
timestamp_label.set_text(f'Timestamp: {str(msg.stamp.sec)}.{str(msg.stamp.nanosec)}')
skill_label.set_text(f'Skill: {str(msg.skill)}')
action_name_label.set_text(f'Action Name: {str(msg.action_name)}')
instance_params_label.set_text(f'Instance Parameters: {str(msg.instance_params)}')
if msg.action:
primary_action = msg.action[0]
timestamp_label.set_text(f'Timestamp: {str(primary_action.stamp.sec)}.{str(primary_action.stamp.nanosec)}')
skill_label.set_text(f'Skill: {str(primary_action.skill)}')
action_name_label.set_text(f'Action Name: {str(primary_action.action_name)}')
instance_params_label.set_text(f'Instance Parameters: {str(primary_action.params)}')
else:
timestamp_label.set_text('Timestamp: N/A')
skill_label.set_text('Skill: N/A')
action_name_label.set_text('Action Name: N/A')
instance_params_label.set_text('Instance Parameters: N/A')
bt_node_status_label.set_text(f'BT Node Status: {str(msg.bt_node_status)}')
smacc_state_label.set_text(f'SMACC State: {str(msg.smacc_state)}')