add button and config

This commit is contained in:
2025-10-25 11:29:12 +08:00
parent 1013a4782b
commit 4df742e6ca

View File

@@ -1,14 +1,16 @@
"""NiceGUI + ROS2 example.
This module starts a NiceGUI web page and an rclpy node. Clicking the button on the
page publishes a std_msgs/String message to the /robot_status topic.
page publishes a std_msgs/String message to the /led_cmd topic.
"""
import atexit
from email.mime import text
import threading
import os
import socket
import rclpy
from rclpy.node import Node
from rclpy.action.client import ActionClient
from std_msgs.msg import String
from std_srvs.srv import Trigger
from nicegui import ui
@@ -16,9 +18,11 @@ from interfaces.msg import RobotWorkInfo
from starlette.staticfiles import StaticFiles
import datetime
import logging
from interfaces.action import ExecuteBtAction
from interfaces.msg import SkillCall
class CtrlGuiNode(Node):
"""ROS2 node that publishes String messages to /robot_status."""
"""ROS2 node that publishes String messages to /led_cmd."""
def __init__(self, node_name: str = 'ctrlgui_node'):
super().__init__(node_name)
@@ -31,17 +35,47 @@ class CtrlGuiNode(Node):
# Setup logging to file
self._setup_file_logging()
# publisher on /robot_status
self.publisher_ = self.create_publisher(String, '/robot_status', 10)
# publisher on /led_cmd
self.publisher_ = self.create_publisher(String, '/led_cmd', 10)
self.run_trigger_ = self.create_client(Trigger, '/cerebrum/rebuild_now')
# subscribe to /robot_work_info to display incoming messages in the UI
self.last_work_info: RobotWorkInfo = RobotWorkInfo()
self._sub_work_info = self.create_subscription(
RobotWorkInfo, '/robot_work_info', self._on_work_info, 10
)
self.get_logger().info('CtrlGuiNode initialized')
self.get_logger().info(f'Node started at: {self.start_time}')
# bt action client
self.exe_bt_action_client_ = ActionClient(self, ExecuteBtAction, '/execute_bt_action')
self.exe_bt_action_epoch = 0
self.file_logger.info('CtrlGuiNode initialized')
self.file_logger.info(f'Node started at: {self.start_time}')
def bt_action_send_goal(self, action_name: str, str_params: SkillCall):
"""Callback to execute a Behavior Tree action."""
goal_msg = ExecuteBtAction.Goal()
goal_msg.action_name = action_name
goal_msg.calls = [str_params]
self.exe_bt_action_epoch += 1
goal_msg.epoch = self.exe_bt_action_epoch
print(f"goal_msg: {goal_msg}")
self.file_logger.info(f'Sending goal to execute BT action: {action_name}')
self.file_logger.info(f"goal_msg: {goal_msg}")
self.exe_bt_action_client_.wait_for_server(10.0)
send_goal_future = self.exe_bt_action_client_.send_goal_async(goal_msg)
send_goal_future.add_done_callback(self.bt_action_response_callback)
self.file_logger.info(f'Action {action_name} goal sent.')
def bt_action_response_callback(self, future):
"""Callback for BT action response."""
goal_handle = future.result()
if not goal_handle.accepted:
self.file_logger.warning('BT action goal rejected')
return
self.file_logger.info('BT action goal accepted')
def _setup_file_logging(self):
"""Setup logging to a file with timestamp in filename."""
@@ -67,15 +101,15 @@ class CtrlGuiNode(Node):
self.file_logger.addHandler(file_handler)
self.log_filename = log_filename
self.get_logger().info('File logging setup complete.')
self.file_logger.info('File logging setup complete.')
def _on_work_info(self, msg: RobotWorkInfo) -> None:
"""Callback for /robot_work_info subscription: store latest text."""
self.last_work_info = msg
self.messages_received += 1
# keep logs lightweight to avoid flooding
self.get_logger().debug(f"/robot_work_info: {str(self.last_work_info)}")
self.file_logger.debug(f"/robot_work_info: {str(self.last_work_info)}")
# Log to file with message content
self.file_logger.info(f"Received message #{self.messages_received} from /robot_work_info")
@@ -83,24 +117,32 @@ class CtrlGuiNode(Node):
# Log periodically to avoid spamming the logs
if self.messages_received % 10 == 0:
self.get_logger().info(f'Received {self.messages_received} messages from /robot_work_info')
self.file_logger.info(f'Received {self.messages_received} messages from /robot_work_info')
def _on_rebuild_response(self, future):
"""Callback for rebuild_now service response."""
try:
response = future.result()
if response.success:
self.file_logger.info('Rebuild request successful')
else:
self.file_logger.warning(f'Rebuild request failed: {response.message}')
except Exception as e:
self.file_logger.error(f'Rebuild request exception: {str(e)}')
def publish_info(self, text: str) -> None:
msg = String()
msg.data = text
self.publisher_.publish(msg)
self.messages_sent += 1
self.get_logger().info(f'Published to /robot_status: {str(text)}')
self.get_logger().info(f'Total messages sent: {str(self.messages_sent)}')
# Log to file
self.file_logger.info(f"Published message #{self.messages_sent} to /robot_status: {str(text)}")
self.file_logger.info(f"Published message #{self.messages_sent} to /led_cmd: {str(text)}")
def rebuild_now(self) -> None:
self.rebuild_requests += 1
self.get_logger().info('Rebuild now')
self.get_logger().info(f'Total rebuild requests: {str(self.rebuild_requests)}')
self.run_trigger_.call_async(Trigger.Request())
self.file_logger.info('Rebuild BehaviorTree now')
self.file_logger.info(f'Total rebuild requests: {str(self.rebuild_requests)}')
future = self.run_trigger_.call_async(Trigger.Request())
future.add_done_callback(self._on_rebuild_response)
# Log to file
self.file_logger.info(f"Rebuild requested. Total rebuild requests: {str(self.rebuild_requests)}")
@@ -136,29 +178,94 @@ def build_ui(node: CtrlGuiNode) -> None:
''')
# --- build UI ---
# ui.label('HiveCore Robot Control Node').classes('text-2xl')
# message_input = ui.input('Message', value='Hello from NiceGUI')
ui.label('HiveCore Robot Control Node').classes('text-2xl')
# 公共确认对话框函数
def show_confirm_dialog(message: str, confirm_callback) -> None:
"""显示确认对话框
Args:
message: 要显示的确认消息
confirm_callback: 点击确认按钮后执行的函数
"""
with ui.dialog() as dialog, ui.card():
ui.label(message)
with ui.row():
ui.button('OK', on_click=lambda: (dialog.close(), confirm_callback())).classes('text-white bg-blue-500')
ui.button('Cancel', on_click=dialog.close).classes('text-white bg-red-500')
dialog.open()
# def send_message() -> None:
# text = message_input.value or ''
# try:
# node.publish_info(text)
# ui.notify(f'Sent: {text}')
# except Exception as e:
# node.get_logger().error(f'Failed to publish: {e}')
# ui.notify('Failed to send message', color='negative')
with ui.row():
# rebuild button
with ui.column():
with ui.row():
pass
def rebuild() -> None:
try:
node.file_logger.info('Running rebuild_now...')
node.rebuild_now()
except Exception as e:
node.file_logger.error(f'Failed to trigger rebuild: {str(e)}')
# ui.button('Send to /robot_status', on_click=lambda: send_message())
# 使用明确的高度来对齐按钮
ui.element().style('height: 40px')
ui.button('Switch Schedule', on_click=lambda: show_confirm_dialog('Confirm Switch Schedule?', rebuild)).classes('self-end')
def rebuild() -> None:
try:
node.get_logger().info('Running rebuild_now...')
node.rebuild_now()
except Exception as e:
node.get_logger().error(f'Failed to trigger rebuild: {str(e)}')
node.file_logger.error(f'Failed to trigger rebuild: {str(e)}')
# send action package
def send_action_package(action_name: str, params: str) -> None:
"""Send an action package to the robot."""
try:
msg = SkillCall()
msg.name = action_name
msg.interface_type = f"interfaces/action/{action_name}"
msg.call_kind = "action"
msg.topic = ""
msg.payload_yaml = params
node.bt_action_send_goal(action_name, msg)
ui.notify(f'Sent action: {action_name} with params: {params}')
except Exception as e:
node.file_logger.error(f'Failed to trigger bt_action_send_goal: {str(e)}')
ui.notify(f'Failed to send action: {str(e)}', color='negative')
ui.button('Rebuild', on_click=lambda: rebuild())
#move waist
with ui.column():
with ui.row():
move_waist_input_move_pitch_degree = ui.input('pitch degree', value='0').style('width: 100px')
move_waist_input_move_yaw_degree = ui.input('yaw degree', value='0').style('width: 100px')
# TODO check input data
def move_waist_() -> None:
node.file_logger.info('Running move_waist_...')
action_name = "MoveWaist"
text = f"move_pitch_degree: {move_waist_input_move_pitch_degree.value}\nmove_yaw_degree: {move_waist_input_move_yaw_degree.value}\n"
send_action_package(action_name, text)
ui.button('Move Waist', on_click=lambda: show_confirm_dialog(
f'Confirm Move Waist operation?\nmove_pitch_degree: {move_waist_input_move_pitch_degree.value}\nmove_yaw_degree: {move_waist_input_move_yaw_degree.value}',
move_waist_
)).classes('self-end')
# LED control
with ui.column():
with ui.row():
color_input = ui.select(['red', 'green', 'blue', 'cyan', 'yellow', 'purple', 'white'], value='red').style('width: 100px')
def led_control_() -> None:
try:
# TODO check input data
text = f"data: {{{color_input.value}, 10}}"
node.publish_info(text)
ui.notify(f'Sent: {text}')
except Exception as e:
node.get_logger().error(f'Failed to publish: {e}')
ui.notify('Failed to send message', color='negative')
ui.button('/led_cmd', on_click=lambda: show_confirm_dialog(
f'Confirm LED control operation?\nColor: {color_input.value}',
led_control_
)).classes('self-end')
# Add statistics display
ui.separator()
@@ -204,20 +311,20 @@ def build_ui(node: CtrlGuiNode) -> None:
# Update statistics
stats = node.get_stats()
stats_text = f"""
Node Statistics:
- Uptime: {str(stats['uptime'])}
- Messages Sent: {str(stats['messages_sent'])}
- Messages Received: {str(stats['messages_received'])}
- Rebuild Requests: {str(stats['rebuild_requests'])}
- Start Time: {str(stats['start_time'])}
"""
- System Time: {str(datetime.datetime.now())}
- Start Time: {str(stats['start_time'])}
- Uptime: {str(stats['uptime'])}
- Messages Sent: {str(stats['messages_sent'])}
- Messages Received: {str(stats['messages_received'])}
- Rebuild Requests: {str(stats['rebuild_requests'])}
"""
stats_label.set_text(stats_text)
# Basic Information
msg_id_label.set_text(f'Message ID: {str(msg.msg_id)}')
working_state_label.set_text(f'Working State: {str(msg.working_state)}')
battery_capacity_label.set_text(f'Battery Capacity: {str(msg.battery_capacity)}%')
working_time_label.set_text(f'Working Time: {str(msg.working_time)}h')
working_time_label.set_text(f'Working Time: {msg.working_time:.6f} h')
nav_state_label.set_text(f'Navigation State: {str(msg.nav_state)}')
comm_quality_label.set_text(f'Communication Quality: {str(msg.comm_quality)}')
expt_completion_time_label.set_text(f'Expected Completion Time: {str(msg.expt_completion_time)}')
@@ -270,10 +377,9 @@ def main() -> None:
# Ensure clean shutdown when the process exits
def shutdown() -> None:
try:
node.get_logger().info('Shutting down CtrlGuiNode...')
node.file_logger.info('Shutting down CtrlGuiNode...')
stats = node.get_stats()
node.get_logger().info(f'Final statistics: {str(stats)}')
node.file_logger.info(f'Node shutting down. Final statistics: {str(stats)}')
node.file_logger.info(f'Final statistics: {str(stats)}')
except Exception as e:
pass
try:
@@ -304,6 +410,6 @@ if __name__ in {"__main__", "__mp_main__"}:
port = 8080
print(f'Starting NiceGUI on {host}:{port}...')
main()
ui.run(title='HiveCore Robot Control', host=host, port=port, reload=False, uvicorn_logging_level='info')
ui.run(title='HiveCore Robot Control Panel', host=host, port=port, reload=False, uvicorn_logging_level='info')
else:
ui.run(main, title='HiveCore Robot Control')
ui.run(main, title='HiveCore Robot Control Panel')