origin version

This commit is contained in:
2025-10-23 17:42:21 +08:00
commit 5cb2c19a6c
8 changed files with 293 additions and 0 deletions

12
.gitignore vendored Normal file
View File

@@ -0,0 +1,12 @@
.vscode/
build/
install/
log/
test/
*.pyc
__pycache__/
*.pyo
*.pyd
*.swp
*.egg-info/
dist/

0
ctrlgui/__init__.py Normal file
View File

190
ctrlgui/ctrlgui_node.py Normal file
View File

@@ -0,0 +1,190 @@
"""NiceGUI + ROS2 example.
This module starts a NiceGUI web page and an rclpy node. Clicking the button on the
page publishes a std_msgs/String message to the /robot_status topic.
"""
import atexit
import threading
import os
import socket
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from std_srvs.srv import Trigger
from nicegui import ui
from interfaces.msg import RobotWorkInfo
class CtrlGuiNode(Node):
"""ROS2 node that publishes String messages to /robot_status."""
def __init__(self, node_name: str = 'ctrlgui_node'):
super().__init__(node_name)
# publisher on /robot_status
self.publisher_ = self.create_publisher(String, '/robot_status', 10)
self.run_trigger_ = self.create_client(Trigger, '/cerebrum/rebuild_now')
# subscribe to /robot_work_info to display incoming messages in the UI
self.last_work_info: RobotWorkInfo = RobotWorkInfo()
self._sub_work_info = self.create_subscription(
RobotWorkInfo, '/robot_work_info', self._on_work_info, 10
)
def _on_work_info(self, msg: RobotWorkInfo) -> None:
"""Callback for /robot_work_info subscription: store latest text."""
self.last_work_info = msg
# keep logs lightweight to avoid flooding
self.get_logger().debug(f"/robot_work_info: {self.last_work_info}")
def publish_info(self, text: str) -> None:
msg = String()
msg.data = text
self.publisher_.publish(msg)
self.get_logger().info(f'Published to /robot_status: {text}')
def rebuild_now(self) -> None:
self.get_logger().info('Rebuild now')
self.run_trigger_.call_async(Trigger.Request())
def build_ui(node: CtrlGuiNode) -> None:
"""Build the NiceGUI UI."""
# --- build UI ---
# ui.label('Robot Control GUI Node').classes('text-2xl')
# message_input = ui.input('Message', value='Hello from NiceGUI')
# def send_message() -> None:
# text = message_input.value or ''
# try:
# node.publish_info(text)
# ui.notify(f'Sent: {text}')
# except Exception as e:
# node.get_logger().error(f'Failed to publish: {e}')
# ui.notify('Failed to send message', color='negative')
# ui.button('Send to /robot_status', on_click=lambda: send_message())
def rebuild() -> None:
try:
node.get_logger().info('Running rebuild_now...')
node.rebuild_now()
except Exception:
pass
ui.button('Rebuild', on_click=lambda: rebuild())
# Display area for incoming /robot_work_info messages
ui.separator()
ui.label('Robot Work Information').classes('text-xl')
# Create labels for all RobotWorkInfo fields
with ui.card().classes('w-full'):
ui.label('Basic Information').classes('text-lg font-bold')
with ui.grid(columns=2).classes('w-full'):
msg_id_label = ui.label()
working_state_label = ui.label()
battery_capacity_label = ui.label()
working_time_label = ui.label()
nav_state_label = ui.label()
comm_quality_label = ui.label()
expt_completion_time_label = ui.label()
work_log_label = ui.label().classes('col-span-2')
ui.label('Task Information').classes('text-lg font-bold')
with ui.grid(columns=3).classes('w-full'):
last_task_label = ui.label('Last Task: ')
next_task_label = ui.label('Next Task: ')
current_task_label = ui.label('Current Task: ')
ui.label('Skill Information').classes('text-lg font-bold')
with ui.grid(columns=2).classes('w-full'):
skill_label = ui.label()
action_name_label = ui.label()
instance_params_label = ui.label().classes('col-span-2')
bt_node_status_label = ui.label()
smacc_state_label = ui.label()
# Update the labels periodically from the UI thread (safe cross-thread update)
def update_work_info():
msg = node.last_work_info
# Basic Information
msg_id_label.set_text(f'Message ID: {msg.msg_id}')
working_state_label.set_text(f'Working State: {msg.working_state}')
battery_capacity_label.set_text(f'Battery Capacity: {msg.battery_capacity}%')
working_time_label.set_text(f'Working Time: {msg.working_time}h')
nav_state_label.set_text(f'Navigation State: {msg.nav_state}')
comm_quality_label.set_text(f'Communication Quality: {msg.comm_quality}')
expt_completion_time_label.set_text(f'Expected Completion Time: {msg.expt_completion_time}')
work_log_label.set_text(f'Work Log: {msg.work_log}')
# Task Information
tasks = list(msg.task)
if len(tasks) >= 3:
last_task_label.set_text(f'Last Task: {tasks[0]}')
next_task_label.set_text(f'Next Task: {tasks[1]}')
current_task_label.set_text(f'Current Task: {tasks[2]}')
else:
last_task_label.set_text('Last Task: N/A')
next_task_label.set_text('Next Task: N/A')
current_task_label.set_text('Current Task: N/A')
# Skill Information
skill_label.set_text(f'Skill: {msg.skill}')
action_name_label.set_text(f'Action Name: {msg.action_name}')
instance_params_label.set_text(f'Instance Parameters: {msg.instance_params}')
bt_node_status_label.set_text(f'BT Node Status: {msg.bt_node_status}')
smacc_state_label.set_text(f'SMACC State: {msg.smacc_state}')
ui.timer(0.5, update_work_info)
def main() -> None:
"""Start rclpy, the ROS2 node in a background thread, and run the NiceGUI app.
The NiceGUI handlers will call node.publish_info(...) when the user clicks the button.
"""
# init ROS and start node
rclpy.init()
node = CtrlGuiNode()
# spin the node in a background thread so the NiceGUI server can run in the main thread
spin_thread = threading.Thread(target=rclpy.spin, args=(node,), daemon=True)
spin_thread.start()
# Build UI for the default page
@ui.page('/')
def main_page():
build_ui(node)
# Ensure clean shutdown when the process exits
def shutdown() -> None:
try:
node.get_logger().info('Shutting down CtrlGuiNode...')
except Exception:
pass
try:
node.destroy_node()
except Exception:
pass
try:
rclpy.shutdown()
except Exception:
pass
# join the spin thread briefly
try:
spin_thread.join(timeout=1.0)
except Exception:
pass
atexit.register(shutdown)
if __name__ in {"__main__", "__mp_main__"}:
# Start NiceGUI honoring HOST/PORT environment variables (configurable via ROS launch)
host = os.getenv('HOST', '0.0.0.0')
try:
port = int(os.getenv('PORT', '8080'))
except ValueError:
port = 8080
print(f'Starting NiceGUI on {host}:{port}...')
main()
ui.run(title='Robot Control GUI', host=host, port=port, reload=False, uvicorn_logging_level='info')
else:
ui.run(main, title='Robot Control GUI')

35
launch/ctrlgui.launch.py Normal file
View File

@@ -0,0 +1,35 @@
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def generate_launch_description() -> LaunchDescription:
# Optional node name override from launch argument
node_name_arg = DeclareLaunchArgument(
'node_name', default_value='ctrlgui_node',
description='Name of the ctrlgui node')
host_arg = DeclareLaunchArgument(
'host', default_value='0.0.0.0',
description='Host interface for NiceGUI')
port_arg = DeclareLaunchArgument(
'port', default_value='8080',
description='TCP port for NiceGUI')
ld = LaunchDescription([node_name_arg, host_arg, port_arg])
ctrlgui_node = Node(
package='ctrlgui',
executable='ctrlgui_node',
name=LaunchConfiguration('node_name'),
output='screen',
emulate_tty=True,
parameters=[],
env={
'HOST': LaunchConfiguration('host'),
'PORT': LaunchConfiguration('port'),
},
)
ld.add_action(ctrlgui_node)
return ld

25
package.xml Normal file
View File

@@ -0,0 +1,25 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>ctrlgui</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="david@hivecore.cn">root</maintainer>
<license>TODO: License declaration</license>
<depend>rclpy</depend>
<depend>std_msgs</depend>
<depend>dpg</depend>
<depend>interfaces</depend>
<buildtool_depend>ament_python</buildtool_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

0
resource/ctrlgui Normal file
View File

4
setup.cfg Normal file
View File

@@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/ctrlgui
[install]
install_scripts=$base/lib/ctrlgui

27
setup.py Normal file
View File

@@ -0,0 +1,27 @@
from setuptools import find_packages, setup
package_name = 'ctrlgui'
setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
('share/' + package_name + '/launch', ['launch/ctrlgui.launch.py']),
],
install_requires=['setuptools', 'nicegui>=1.4.0'],
zip_safe=True,
maintainer='root',
maintainer_email='david@hivecore.cn',
description='TODO: Package description',
license='TODO: License declaration',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'ctrlgui_node = ctrlgui.ctrlgui_node:main'
],
},
)