3.1 话题通信介绍
话题发布订阅模型,有4个关键点:发布者、订阅者、话题名称、话题类型
bohu@bohu-TM1701:~$ ros2 node info /turtlesim
There are 2 nodes in the graph with the exact name "/turtlesim". You are seeing information about only one of them.
/turtlesimSubscribers:/parameter_events: rcl_interfaces/msg/ParameterEvent/turtle1/cmd_vel: geometry_msgs/msg/TwistPublishers:/parameter_events: rcl_interfaces/msg/ParameterEvent/rosout: rcl_interfaces/msg/Log/turtle1/color_sensor: turtlesim/msg/Color/turtle1/pose: turtlesim/msg/Pose
在小海龟模拟器节点启动后,可以查看节点信息,就有Publishers发布者、Subscribers接受者话题。
其中:ros2 topic echo /turtle1/pose 可以查看小海龟的位姿信息。
x: 2.833137035369873
y: 3.6268489360809326
theta: -2.7248146533966064
linear_velocity: 0.0
angular_velocity: 0.0
通过话题而不是之前的键盘也能控制小海龟移动。
ros2 topic pub /turtle1/cmd_vel geometry_msgs/msg/Twist "{linear: {x: 1.0, y: 0.0} , angular: {z: -1.0}}"
这个可以让小海龟转圈。
3.2 python话题订阅与发布
需求背景:先创建一个节点,用来下载和发布novel话题,然后再创建一个小说阅读节点。用于订阅novel话题,合成语音并进行播放。
3.2.1 通过话题发布小说
准备工作:
上一节那样打开终端,启动本地web服务器python3 -m http.server
编写业务代码,在3/topic_ws/src文件夹下,打开终端,创建demo_python_topic功能包
bohu@bohu-TM1701:~/3/topic_ws/src$ ros2 pkg create demo_python_topic --build-type ament_python --dependencies rclpy example_interfaces --license Apache-2.0
going to create a new package
package name: demo_python_topic
destination directory: /home/bohu/3/topic_ws/src
package format: 3
version: 0.0.0
description: TODO: Package description
maintainer: ['bohu <13508678+bohu83@user.noreply.gitee.com>']
licenses: ['Apache-2.0']
build type: ament_python
dependencies: ['rclpy', 'example_interfaces']
creating folder ./demo_python_topic
creating ./demo_python_topic/package.xml
creating source folder
creating folder ./demo_python_topic/demo_python_topic
creating ./demo_python_topic/setup.py
creating ./demo_python_topic/setup.cfg
creating folder ./demo_python_topic/resource
creating ./demo_python_topic/resource/demo_python_topic
creating ./demo_python_topic/demo_python_topic/__init__.py
creating folder ./demo_python_topic/test
creating ./demo_python_topic/test/test_copyright.py
creating ./demo_python_topic/test/test_flake8.py
creating ./demo_python_topic/test/test_pep257.py
注意增加了2个依赖。在创建完成后再package.xml会发现。
创建完功能包之后,在src/demo_python_topic/demo_python_topic下创建novel_pub_node.py文件,代码如下
import rclpy
from rclpy.node import Node
import requests
from example_interfaces.msg import String
from queue import Queueclass NovelPubNode(Node):def __init__(self, node_name):super().__init__(node_name)self.get_logger().info(f'{node_name},启动')self.novels_queue_ = Queue() #创建队列self.novel_publisher_ = self.create_publisher(String,'novel',10)#发布者self.timer_ = self.create_timer(5,self.timer_callback)def timer_callback(self):if self.novels_queue_.qsize()>0:line = self.novels_queue_.get()msg = String();#组装消息msg.data = line;self.novel_publisher_.publish(msg)self.get_logger().info(f'发布了:{msg}') def download(self,url):reponse = requests.get(url)reponse.encoding = 'utf-8'text = reponse.textself.get_logger().info(f'下载完成‘{url},{len(text)}’')for line in text.splitlines():self.novels_queue_.put(line)def main():rclpy.init()node = NovelPubNode('novel_pub')node.download('http://0.0.0.0:8000/novel1.txt')rclpy.spin(node)rclpy.shutdown()
代码不长,可是我跟着视频敲了2遍才能跑起来有些不起眼有没提示的错误运行时会报错,下载逻辑跟之前一样,处理逻辑增加了按行处理,把每行放入队列。队列是个先进先出的数据结构,适合此业务场景。回调函数timer_callback 从队列获取数据,组装为String格式。调用发布者发布。
再setup.py注册novel_pub_node节点,并构建,运行novel_pub_node.
bohu@bohu-TM1701:~/3/topic_ws$ ros2 run demo_python_topic novel_pub_node
[INFO] [1736161295.090347220] [novel_pub]: novel_pub,启动
[INFO] [1736161295.097754661] [novel_pub]: 下载完成‘http://0.0.0.0:8000/novel1.txt,447’
[INFO] [1736161300.097022327] [novel_pub]: 发布了:example_interfaces.msg.String(data='CQ,CQ,CQ,这里是BH8VYW,这里是BH8VYW,收到请回答。1')
[INFO] [1736161305.093794085] [novel_pub]: 发布了:example_interfaces.msg.String(data='CQ,CQ,CQ,这里是BH8VYW,这里是BH8VYW,收到请回答。2')
[INFO] [1736161310.095291773] [novel_pub]: 发布了:example_interfaces.msg.String(data='CQ,CQ,CQ,这里是BH8VYW,这里是BH8VYW,收到请回答。3')
[INFO] [1736161315.094403770] [novel_pub]: 发布了:example_interfaces.msg.String(data='CQ,CQ,CQ,这里是BH8VYW,这里是BH8VYW,收到请回答。4')
[INFO] [1736161320.095770036] [novel_pub]: 发布了:example_interfaces.msg.String(data='CQ,CQ,CQ,这里是BH8VYW,这里是BH8VYW,收到请回答。5')
[INFO] [1736161325.093794607] [novel_pub]: 发布了:example_interfaces.msg.String(data='CQ,CQ,CQ,这里是BH8VYW,这里是BH8VYW,收到请回答。6')
[INFO] [1736161330.093633498] [novel_pub]: 发布了:example_interfaces.msg.String(data='CQ,CQ,CQ,这里是BH8VYW,这里是BH8VYW,收到请回答。7')
[INFO] [1736161335.094141394] [novel_pub]: 发布了:example_interfaces.msg.String(data='CQ,CQ,CQ,这里是BH8VYW,这里是BH8VYW,收到请回答。8')
[INFO] [1736161340.094232710] [novel_pub]: 发布了:example_interfaces.msg.String(data='CQ,CQ,CQ,这里是BH8VYW,这里是BH8VYW,收到请回答。9')
[INFO] [1736161345.093890803] [novel_pub]: 发布了:example_interfaces.msg.String(data='CQ,CQ,CQ,这里是BH8VYW,这里是BH8VYW,收到请回答。10')
[INFO] [1736161350.095869992] [novel_pub]: 发布了:example_interfaces.msg.String(data='CQ,CQ,CQ,这里是BH8VYW,这里是BH8VYW,收到请回答。11')
[INFO] [1736161355.093779808] [novel_pub]: 发布了:example_interfaces.msg.String(data='CQ,CQ,CQ,这里是BH8VYW,这里是BH8VYW,收到请回答。12')
可以看到小说已经被成功下载和发布。再观察下话题是否发布。
bohu@bohu-TM1701:~$ ros2 topic list
/novel
/parameter_events
/rosout
bohu@bohu-TM1701:~$ ros2 topic echo /novel
data: CQ,CQ,CQ,这里是BH8VYW,这里是BH8VYW,收到请回答。2
---
data: CQ,CQ,CQ,这里是BH8VYW,这里是BH8VYW,收到请回答。3
---
data: CQ,CQ,CQ,这里是BH8VYW,这里是BH8VYW,收到请回答。4
---
data: CQ,CQ,CQ,这里是BH8VYW,这里是BH8VYW,收到请回答。5
---
data: CQ,CQ,CQ,这里是BH8VYW,这里是BH8VYW,收到请回答。6
---
data: CQ,CQ,CQ,这里是BH8VYW,这里是BH8VYW,收到请回答。7
---
data: CQ,CQ,CQ,这里是BH8VYW,这里是BH8VYW,收到请回答。8
---
data: CQ,CQ,CQ,这里是BH8VYW,这里是BH8VYW,收到请回答。9
---
data: CQ,CQ,CQ,这里是BH8VYW,这里是BH8VYW,收到请回答。10
---
data: CQ,CQ,CQ,这里是BH8VYW,这里是BH8VYW,收到请回答。11
---
data: CQ,CQ,CQ,这里是BH8VYW,这里是BH8VYW,收到请回答。12
---
可以看到程序运行结果正常输出了小说内容。
3.2.2 订阅小说并合成语音
先安装依赖 sudo apt install python3-pip -y
pip3 install espeakng
sudo apt install espeak-ng
在src/demo_python_topic/demo_python_topic下创建novelsub_node.py文件。代码如下所示
import espeakng
import rclpy
from rclpy.node import Node
from example_interfaces.msg import String
from queue import Queue
import threading
import timeclass NovelSubNode(Node):def __init__(self, node_name):super().__init__(node_name)self.get_logger().info(f'{node_name},启动')self.novels_queue_ = Queue()self.novel_subscriber_ = self.create_subscription(String,'novel',self.novel_callback,10)self.speaker_thread_ = threading.Thread(target=self.speaker_thread)self.speaker_thread_.start()def novel_callback(self,msg):self.novels_queue_.put(msg.data )def speaker_thread(self):speaker = espeakng.Speaker()speaker.voice = 'zh'while rclpy.ok(): #检测当前ros上下文是否okif self.novels_queue_.qsize()>0:text = self.novels_queue_.get()self.get_logger().info(f'朗读:{text}')speaker.say(text) #说speaker.wait() # 等他说完else:# 让当前线程休眠1stime.sleep(1) def main():rclpy.init()node = NovelSubNode('novel_sub')rclpy.spin(node)rclpy.shutdown()
相对上面的代码,基本结构类似,多了语音合成库espeakng.
因为朗读速度没有接受的快,所以使用队列存储接收到的数据。从话题接收到数据回调novel_callback就是往队列放。而朗读线程对应方法speaker_thread,创建了speaker对象,设置了声音为中文,调用rclpy.ok(): 检测当前ros上下文是否ok,不断循环判断队列是否有数据,有数据调用say进行朗读。没有数据就休眠1秒。
修改setup.py,注册节点,分别编译运行
ros2 run demo_python_topic novel_sub_node
ros2 run demo_python_topic novel_pub_node
就能听见合成语音了。