Executing a BeTFSM Tree under ROS2 with cROSpi

Install Crospi

For the installation of Crospi, see the following documentation site

Run Crospi

Once installed, it is recommended to install the crospi_application_template in your workspace and:

  • Run RVIZ and Robotstate publisher:
    https://github.com/Robotics-Research-Group-KUL/betfsm
    
  • Run the Crospi Node:
    ros2 run crospi_core crospi_node --ros-args -p config_file:='$[crospi_application_template]/applications/application_example_tutorials/application_example_tutorials.setup.json' -p simulation:=true
    
  • Run your BeTFSM. The examples can be found in the betfsm_demos directory. Below you find a typical setup:
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    import rclpy
    import sys
    
    from betfsm import (
        Sequence,  Message, Repeat,
        SUCCEED, TICKING, CANCEL, ABORT, TIMEOUT, NO_EVENT,
        TickingStateMachine,get_logger,set_logger,
        EventSequential, Ctrl_C_Condition
    )
    from betfsm_crospi import load_task_list, CrospiTask, CrospiDeactivate
    from betfsm_ros import BeTFSMNode,ROSRunner,Node,Duration,LifeCycle,Transition
    
    
    class MySequence(Sequence):
        def __init__(self):
            super().__init__("my_sequence", [
                CrospiTask("MovingHome","MovingHome"),
                CrospiTask("MovingDown","MovingDown"),
                CrospiTask("MovingUp","MovingUp"),
                CrospiTask("MovingSpline","MovingSpline") ]
            )
    
    # main
    def main(args=None):
        rclpy.init(args=args)    
        my_node = BeTFSMNode.get_instance("skill_example")
    
        set_logger("default",my_node.get_logger())
        set_logger("crospi",my_node.get_logger())
    
        blackboard = {}
        load_task_list("$[crospi_application_template]/skill_specifications/libraries/skill_lib_example/tasks/skill_example.json",blackboard)
    
        # running MySequence() and cleaning up when CTRL_C is pressed        
        nominal_sm = Repeat("repeat",-1,MySequence())
        cleanup_sm = CrospiDeactivate(force_outcome=CANCEL)
        sm = EventSequential("check_cancel", Ctrl_C_Condition("CTRL_C",repeated=3),{NO_EVENT:nominal_sm, "CTRL_C":cleanup_sm})
    
        # ROSRunner accepts command-line parameters (see --help)
        # has many more optional arguments than used below, see API documentation
        runner = ROSRunner(my_node,sm,blackboard, frequency=100.0, publish_frequency=5.0, debug=False, display_active=False)
        try:
            runner.run()
        finally:
            my_node.destroy_node()
            rclpy.shutdown()
        print("shutdown")
    
    if __name__ == "__main__":
        sys.exit(main(sys.argv))
    

ROS2 setup and RosRunner

The following happens in the above code:

  • line 25 is necessary to start/initialize ROS2
  • line 26 creates a ROS2 node by using BeTFSM's singleton class: it only creates a node once, and whenever a default node is required, it will get this singleton node.
  • line 41-46, with run the full BeTFSM tree sm using ROSRunner and ensure that when something happens the ROS node is properly cleaned up. In a ROS2 environment, you run your BeTFSM tree using RosRunner. This ensures that ROS2 can simulatneously react to topics and other ROS2 interactions and run the BeTFSM tree.

Ctrl-C handling

  • line 15-24, with run the full BeTFSM tree sm using ROSRunner and ensure that when something happens the ROS node is properly cleaned up. ()

  • line 35-37, we create our BeTFSM tree (in this case, generated by a function MySequence() ) The cleanup_sm subtree is defined to cleanup if a Ctrl-C events happen, we use these two subtrees to define the final BeTFSM tree sm that checks Ctrl-C and calls cleanup to stop the robot moving. See the eventhandling described in events

  • line 41-46, Uses ROSRunner to run the full BeTFSM tree and at the same time checks for ROS2 events such as incomming topics.

CrospiNode

  • line 31-32 creates a blackboard and fills it with a list of tasks. These tasks bring together eTaSL lua specifications of a constraint-based task, and a set of parameters for these lua-specifications. See Crospi
  • line 14-21 defines a simple application as a Sequence of robot motions. CrospiTask is used to execute an eTaSL task using Crospi. This BeTFSM node has many optional parameters, but only two obligatory: the name of the BeTFSM node and the task_name, i.e. the name in the laoded task list. [CrospiTask][betfsm_cropsi.CrospiTask] checks for the e_finished event from Crospi and when finished cleans up and deactivates Crospi, if an error occurs, CrospiTask will take care that Crospi is deactivated and cleaned up. You can manually deactivate and cleanup Crospy by using CrospiDeactivate, we used this to cleanup after the Ctrl-C event.
  • Crospi can send more events related to a task (eTaSL's monitor). You can handle these in two ways:
    • Call [CrospiTask][betfsm_cropsi.CrospiTask] with the event_check parameter False. This will keep Crospi running after CrospiTask is finised, such that you can do some custom checking and use CrospiDeactivate to stop Crospi.
    • You do an event check concurrently with the running CrospiTask by using EventOutcome, EventSequential or EventConcurrent, together with TopicEvent_Condition. It is no problem to run these checks concurrently. For each topic TopicEvent_Condition will maintain a single queue for all instances.

CrospiOutput

Todo

This subqection and the following still have to be elaborated.

Directly interrupting and cleaning-up

A first solution is to directly interrupting the running state machines and cleaning up afterwards by calling the appropriate service calls to stop the robot.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import rclpy
import sys

from betfsm import (
    Sequence,  Message, Repeat,
    SUCCEED, TICKING, CANCEL, ABORT, TIMEOUT, NO_EVENT,
    TickingStateMachine,get_logger,set_logger,
    EventSequential, Ctrl_C_Condition
)
from betfsm_crospi import load_task_list, CrospiTask, CrospiDeactivate
from betfsm_ros import BeTFSMNode,ROSRunner,Node,Duration,LifeCycle,Transition


class MySequence(Sequence):
    def __init__(self):
        super().__init__("my_sequence", [
            CrospiTask("MovingHome","MovingHome"),
            CrospiTask("MovingDown","MovingDown"),
            CrospiTask("MovingUp","MovingUp"),
            CrospiTask("MovingSpline","MovingSpline") ]
        )

# main
def main(args=None):
    rclpy.init(args=args)    
    my_node = BeTFSMNode.get_instance("skill_example")

    set_logger("default",my_node.get_logger())
    set_logger("crospi",my_node.get_logger())

    blackboard = {}
    load_task_list("$[crospi_application_template]/skill_specifications/libraries/skill_lib_example/tasks/skill_example.json",blackboard)

    # running MySequence() and cleaning up when CTRL_C is pressed        
    nominal_sm = Repeat("repeat",-1,MySequence())
    cleanup_sm = CrospiDeactivate(force_outcome=CANCEL)
    sm = EventSequential("check_cancel", Ctrl_C_Condition("CTRL_C",repeated=3),{NO_EVENT:nominal_sm, "CTRL_C":cleanup_sm})

    # ROSRunner accepts command-line parameters (see --help)
    # has many more optional arguments than used below, see API documentation
    runner = ROSRunner(my_node,sm,blackboard, frequency=100.0, publish_frequency=5.0, debug=False, display_active=False)
    try:
        runner.run()
    finally:
        my_node.destroy_node()
        rclpy.shutdown()
    print("shutdown")

if __name__ == "__main__":
    sys.exit(main(sys.argv))

Directly interrupting and cleaning-up

A second solution is to register the interruption and only react to it at a given state in the state machine. This would allow e.g. a robot to continue until he is in a safe state before stopping.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import rclpy
import sys

from betfsm import (
    Sequence,  Message, Repeat,
    SUCCEED, TICKING, CANCEL, ABORT, TIMEOUT, NO_EVENT,
    TickingStateMachine,get_logger,set_logger,
    EventSequential, Ctrl_C_Condition
)
from betfsm_crospi import load_task_list, CrospiTask, CrospiDeactivate
from betfsm_ros import BeTFSMNode,ROSRunner,Node,Duration,LifeCycle,Transition


class MySequence(Sequence):
    def __init__(self):
        super().__init__("my_sequence", [
            CrospiTask("MovingHome","MovingHome"),
            CrospiTask("MovingDown","MovingDown"),
            CrospiTask("MovingUp","MovingUp"),
            CrospiTask("MovingSpline","MovingSpline") ]
        )

# main
def main(args=None):
    rclpy.init(args=args)    
    my_node = BeTFSMNode.get_instance("skill_example")

    set_logger("default",my_node.get_logger())
    set_logger("crospi",my_node.get_logger())

    blackboard = {}
    load_task_list("$[crospi_application_template]/skill_specifications/libraries/skill_lib_example/tasks/skill_example.json",blackboard)

    # running MySequence() and cleaning up when CTRL_C is pressed        
    nominal_sm = Repeat("repeat",-1,MySequence())
    cleanup_sm = CrospiDeactivate(force_outcome=CANCEL)
    sm = EventSequential("check_cancel", Ctrl_C_Condition("CTRL_C",repeated=3),{NO_EVENT:nominal_sm, "CTRL_C":cleanup_sm})

    # ROSRunner accepts command-line parameters (see --help)
    # has many more optional arguments than used below, see API documentation
    runner = ROSRunner(my_node,sm,blackboard, frequency=100.0, publish_frequency=5.0, debug=False, display_active=False)
    try:
        runner.run()
    finally:
        my_node.destroy_node()
        rclpy.shutdown()
    print("shutdown")

if __name__ == "__main__":
    sys.exit(main(sys.argv))