ROS学习笔记8 —— 状态机在ROS中的实现

it2022-05-08  8

状态机在ROS中的实现

1. 概念2. SMACH的安装3. SMACH的实现3.1 状态机中状态的定义3.2 状态机主函数的编写3.3 完整Demo3.4 状态间数据传输的实现3.5 状态机的嵌套3.6 多状态并行 4. 状态机在C++中的实现5. 总结

1. 概念

在ROS中,一般使用SMACH来实现状态机。

SMACH即状态机,它是基于Python实现的一个功能强大且易于扩展的库,SMACH本质上并不依赖于ROS,其可以应用于任意Python项目。

在ROS中,executive_smach功能包把SMACH和ROS完美的集成在了一体,为机器人复杂应用开发提供任务级的状态机框架。

此外,该功能包还集成了actionlib和smach_viewer,用于管理action和状态机的可视化显示。

查看可视化状态机结构:

rosrun smach_viewer smach_viewer.py

2. SMACH的安装

sudo apt-get install ros-kinetic-executive-smach sudo apt-get install ros-kinetic-executive-smach-visualization

建议:命令行一般不要直接复制,手动敲一敲会避免很多问题的产生,而且不会特别麻烦,只需要打几个字母TAB补充一下即可完成。

3. SMACH的实现

3.1 状态机中状态的定义

# 定义状态Foo class Foo(smach.State): def __init__(self): smach.State.__init__(self, outcomes=['outcome1','outcome2']) self.counter = 0 def execute(self, userdata): rospy.loginfo('Executing state FOO') if self.counter < 3: self.counter += 1 return 'outcome1' else: return 'outcome2' # 定义状态Bar class Bar(smach.State): def __init__(self): smach.State.__init__(self, outcomes=['outcome2']) def execute(self, userdata): rospy.loginfo('Executing state BAR') return 'outcome2'

再强调一次,SMACH是Python的一个库,所以一般用Python来写

从以上代码可以看出,状态的定义都包含初始化__init__和执行execute两个函数:

初始化函数 初始化函数用来初始化该状态类,调用smach中状态的初始化函数,同事需要定义输出状态:[‘outcome1’,‘outcome2’]

此处的outcome代表状态结束时的输出值,使用字符串来表示,每个状态的输出值可有多个,根据不同的输出值可跳转至不同的状态。

注意:初始化函数不能阻塞

执行函数 执行函数用于执行每个状态所对应的具体工作,可执行阻塞工作,工作结束后返回定义的输出值,则该状态完成。

3.2 状态机主函数的编写

# 主函数 def main(): rospy.init_node('smach_example_state_machine') # 创建一个状态机 sm = smach.StateMachine(outcomes=['outcome4', 'outcome5']) # 打开状态机容器 with sm: # 使用add方法添加状态到状态机容器当中 smach.StateMachine.add('FOO', Foo(), transitions={'outcome1':'BAR', 'outcome2':'outcome4'}) smach.StateMachine.add('BAR', Bar(), transitions={'outcome2':'FOO'}) # 创建并启动内部监测服务器 sis = smach_ros.IntrospectionServer('my_smach_introspection_server', sm, '/SM_ROOT') sis.start() # 开始执行状态机 outcome = sm.execute() # 等待退出 rospy.spin() sis.stop()

首先初始化ROS节点,然后使用StateMachine创建一个状态机,且制定状态机执行完后的输出值[‘outcome4’, ‘outcome5’]

sm = smach.StateMachine(outcomes=['outcome4', 'outcome5'])

使用add()方法添加需要的状态到状态机容器,同时需要设置状态间的跳转关系:

# 把状态FOO添加到状态机容器 # 如果FOO状态执行完输出outcome1,那么跳转到BAR状态; # 如果FOO状态执行完输出outcome2,那么结束此状态机并输出outcome4 smach.StateMachine.add('FOO', Foo(), transitions={'outcome1':'BAR', 'outcome2':'outcome4'})

使用IntrospectionServer()方法创建内部监测服务器,用于状态机的可视化显示。

sis = smach_ros.IntrospectionServer('my_smach_introspection_server', sm, '/SM_ROOT') # 参数1:观测服务器的名称,可根据需要自由给定 # 参数2:要观察的状态机 # 参数3:状态机的层级(可以用于查看状态机内部的状态机)

启动内部监测服务器

sis.start()

开始执行状态机,使用execute()方法来完成

# 开始执行状态机 outcome = sm.execute()

执行结束后需要停止内部观测器

sis.stop()

3.3 完整Demo

#!/usr/bin/env python # -*- coding: utf-8 -*- import rospy import smach import smach_ros # 定义状态Foo class Foo(smach.State): def __init__(self): smach.State.__init__(self, outcomes=['outcome1','outcome2']) self.counter = 0 def execute(self, userdata): rospy.loginfo('Executing state FOO') if self.counter < 3: self.counter += 1 return 'outcome1' else: return 'outcome2' # 定义状态Bar class Bar(smach.State): def __init__(self): smach.State.__init__(self, outcomes=['outcome2']) def execute(self, userdata): rospy.loginfo('Executing state BAR') return 'outcome2' # 主函数 def main(): rospy.init_node('smach_example_state_machine') # 创建一个状态机 sm = smach.StateMachine(outcomes=['outcome4', 'outcome5']) # 打开状态机容器 with sm: # 使用add方法添加状态到状态机容器当中 smach.StateMachine.add('FOO', Foo(), transitions={'outcome1':'BAR', 'outcome2':'outcome4'}) smach.StateMachine.add('BAR', Bar(), transitions={'outcome2':'FOO'}) # 创建并启动内部监测服务器 sis = smach_ros.IntrospectionServer('my_smach_introspection_server', sm, '/SM_ROOT') sis.start() # 开始执行状态机 outcome = sm.execute() # 等待退出 rospy.spin() sis.stop() if __name__ == '__main__': main()

执行过程如下: 状态机开启后首先进入FOO状态然后在该状态中累加countercounter<3时输出outcome1,FOO状态结束并跳至BAR状态BAR状态没有任何工作的执行,状态结束输出outcome2并跳至FOO状态当counter = 3时,FOO输出outcome2,并跳至outcome4,状态机结束

在整个过程中,outcome5没有涉及状态转移,所以是孤立的

3.4 状态间数据传输的实现

#!/usr/bin/env python # -*- coding: utf-8 -*- import rospy import smach import smach_ros # 定义状态Foo class Foo(smach.State): def __init__(self): smach.State.__init__(self, outcomes=['outcome1','outcome2'], input_keys=['foo_counter_in'], output_keys=['foo_counter_out']) def execute(self, userdata): rospy.loginfo('Executing state FOO') if userdata.foo_counter_in < 3: userdata.foo_counter_out = userdata.foo_counter_in + 1 return 'outcome1' else: return 'outcome2' # 定义状态Bar class Bar(smach.State): def __init__(self): smach.State.__init__(self, outcomes=['outcome1'], input_keys=['bar_counter_in']) def execute(self, userdata): rospy.loginfo('Executing state BAR') rospy.loginfo('Counter = %f'%userdata.bar_counter_in) return 'outcome1' def main(): rospy.init_node('smach_example_state_machine') # 创建一个状态机 sm = smach.StateMachine(outcomes=['outcome4']) sm.userdata.sm_counter = 0 # 打开状态机容器 with sm: # 使用add方法添加状态到状态机容器当中 # 此处的remapping参数类似于ROS中的remapping重映射机制 # 在FOO状态中,传进的时候将sm_counter映射为foo_counter_in,传出的时候同样将sm_counter映射为foo_counter_out # 在BAR状态中同上 smach.StateMachine.add('FOO', Foo(), transitions={'outcome1':'BAR', 'outcome2':'outcome4'}, remapping={'foo_counter_in':'sm_counter', 'foo_counter_out':'sm_counter'}) smach.StateMachine.add('BAR', Bar(), transitions={'outcome1':'FOO'}, remapping={'bar_counter_in':'sm_counter'}) # 创建并启动内部监测服务器 sis = smach_ros.IntrospectionServer('my_smach_introspection_server', sm, '/SM_ROOT') sis.start() # 开始执行状态机 outcome = sm.execute() # 等待退出 rospy.spin() sis.stop() if __name__ == '__main__': main()

由上代码可以看到,在状态定义类中状态的初始化smach.State.__init__()多了两个参数: input_keys=['foo_counter_in']、output_keys=['foo_counter_out']

此外,状态执行函数execute()中也多了参数userdata,这就是存储状态之间所传递数据的容器,FOO状态的输入/输出数据foo_counter_in和foo_counter_out就储存在userdata中,因此在执行状态动作时如果访问和修改数据就需要使用userdata.foo_counter_in和userdata.foo_counter_out的形式来实现。

值得注意的是,在主函数中,通过remapping重映射参数将变量sm_counter重映射为了一堆名称,这些名称供各个状态机使用,本质上都是对sm_counter进行操作。

3.5 状态机的嵌套

状态机类似于一个容器,对于状态机的嵌套,主要在主函数中来实现,在内层状态机中状态的定义与添加与普通非嵌套的状态机相同,参考以下例子:

#!/usr/bin/env python # -*- coding: utf-8 -*- import rospy import smach import smach_ros # 定义状态Foo class Foo(smach.State): def __init__(self): smach.State.__init__(self, outcomes=['outcome1','outcome2']) self.counter = 0 def execute(self, userdata): rospy.loginfo('Executing state FOO') if self.counter < 3: self.counter += 1 return 'outcome1' else: return 'outcome2' # 定义状态Bar class Bar(smach.State): def __init__(self): smach.State.__init__(self, outcomes=['outcome1']) def execute(self, userdata): rospy.loginfo('Executing state BAR') return 'outcome1' # 定义状态Bas class Bas(smach.State): def __init__(self): smach.State.__init__(self, outcomes=['outcome3']) def execute(self, userdata): rospy.loginfo('Executing state BAS') return 'outcome3' def main(): rospy.init_node('smach_example_state_machine') # 创建一个顶层状态机 sm_top = smach.StateMachine(outcomes=['outcome5']) # 打开状态机容器 with sm_top: smach.StateMachine.add('BAS', Bas(), transitions={'outcome3':'SUB'}) # 创建一个内嵌的状态机 sm_sub = smach.StateMachine(outcomes=['outcome4']) # 打开状态机容器 with sm_sub: # 使用add方法添加状态到状态机容器当中 smach.StateMachine.add('FOO', Foo(), transitions={'outcome1':'BAR', 'outcome2':'outcome4'}) smach.StateMachine.add('BAR', Bar(), transitions={'outcome1':'FOO'}) smach.StateMachine.add('SUB', sm_sub, transitions={'outcome4':'outcome5'}) # 创建并启动内部监测服务器 sis = smach_ros.IntrospectionServer('my_smach_introspection_server', sm_top, '/SM_ROOT') sis.start() # 开始执行状态机 outcome = sm_top.execute() # 等待退出 rospy.spin() sis.stop() if __name__ == '__main__': main()

3.6 多状态并行

对于多状态的并行,其状态的定义与上相同,区别主要还是在于主函数中,通常使用Concurrence()方法来实现,代码如下:

# default_outcome: 表示该状态机的默认输出为outcome4 # outcome_map: 设置了状态机同步运行的状态跳转 # 当FOO状态输出为outcome2,且BAR状态输出为outcome1时状态机才会输出outcome5. sm_con = smach.Concurrence(outcomes=['outcome4','outcome5'], default_outcome='outcome4', outcome_map={'outcome5': { 'FOO':'outcome2', 'BAR':'outcome1'}})

多状态并行Demo代码如下:

#!/usr/bin/env python # -*- coding: utf-8 -*- import rospy import smach import smach_ros # 定义状态Foo class Foo(smach.State): def __init__(self): smach.State.__init__(self, outcomes=['outcome1','outcome2']) self.counter = 0 def execute(self, userdata): rospy.loginfo('Executing state FOO') if self.counter < 3: self.counter += 1 return 'outcome1' else: return 'outcome2' # 定义状态Bar class Bar(smach.State): def __init__(self): smach.State.__init__(self, outcomes=['outcome1']) def execute(self, userdata): rospy.loginfo('Executing state BAR') return 'outcome1' # 定义状态Bas class Bas(smach.State): def __init__(self): smach.State.__init__(self, outcomes=['outcome3']) def execute(self, userdata): rospy.loginfo('Executing state BAS') return 'outcome3' def main(): rospy.init_node('smach_example_state_machine') # 创建一个顶层状态机 sm_top = smach.StateMachine(outcomes=['outcome6']) # 打开状态机容器 with sm_top: smach.StateMachine.add('BAS', Bas(), transitions={'outcome3':'CON'}) # 创建一个内嵌的状态机 sm_con = smach.Concurrence(outcomes=['outcome4','outcome5'], default_outcome='outcome4', outcome_map={'outcome5': { 'FOO':'outcome2', 'BAR':'outcome1'}}) # 打开状态机容器 with sm_con: # 使用add方法添加状态到状态机容器当中 smach.Concurrence.add('FOO', Foo()) smach.Concurrence.add('BAR', Bar()) smach.StateMachine.add('CON', sm_con, transitions={'outcome4':'CON', 'outcome5':'outcome6'}) # 创建并启动内部监测服务器 sis = smach_ros.IntrospectionServer('my_smach_introspection_server', sm_top, '/SM_ROOT') sis.start() # 开始执行状态机 outcome = sm_top.execute() # 等待退出 rospy.spin() sis.stop() if __name__ == '__main__': main()

4. 状态机在C++中的实现

一般通过while()循环中的switch-case结构来实现。

具体请参见本博客博文《状态机在运动控制中的应用》。

5. 总结

状态机是一种非常好的建模工具,能把复杂的逻辑简化,利于复杂程序逻辑的编写。

非常适用于结构化,尤其涉及应用任务级的任务,但是如果仅仅为了拆分模块,不建议适用状态机,因为状态机相比原程序会平白无故的添加很多判断语句,从而造成执行效率低下的情况。

编不下去了,总之状态机这玩意儿就是好用……


参考文献: [1]胡春旭.ROS机器人开发实践[M].机械工业出版社:北京,2018:414.


最新回复(0)