• Unidade 1
  • Capítulo 2 - Introdução à ROS 2
  • Atividades
  • 4 - Publisher e Subscriber na ROS

Criando um Pubshisher e um Subscriber na ROS

Pubshisher e Subscriber

Agora que você já sabe como criar um novo pacote, vamos aprender como se inscrever em um tópico para receber mensagens.

Primeiro, vamos para algumas definições:

  • Subscriber: Um subscriber é um nó que recebe mensagens de um tópico específico.

  • Publisher: Um publisher é um nó que envia mensagens para um tópico específico.

Na ROS, quando um nó enviar uma mensagem para um tópico, todos os nós que estiverem inscritos nesse tópico receberão a mensagem. O gif abaixo ilustra esse processo:

ros-pub-sub

Criando um Publisher

Vamos começar abrindo um novo terminal e executando o comando abaixo para abrir o VSCode no diretório do pacote my_package:

cd ~/colcon_ws/src/my_package
code .

Agora vamos criar um novo arquivo chamado first_node.py dentro da pasta my_package. Esta pasta contém todos os arquivos Python que serão executados como nós ROS.

No mesmo terminal rode os comandos abaixo, um de cada vez, para criar o arquivo:

cd my_package
touch first_node.py
chmod +x *.py

Os comandos acima fazem o seguinte:

  • cd my_package: entra na pasta my_package.

  • touch first_node.py: cria o arquivo first_node.py.

  • chmod +x *.py: dá permissão de execução para todos os arquivos Python da pasta. Este comando é necessário para que o ROS consiga executar o arquivo Python como um nó ROS.

Agora vamos abrir o arquivo first_node.py e adicionar o seguinte código:

import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
from nav_msgs.msg import Odometry
from rclpy.qos import ReliabilityPolicy, QoSProfile
import numpy as np

"""
ros2 launch my_package first_node.launch.py
"""

class FirstNode(Node):

    def __init__(self):
        super().__init__('first_node')
        self.vel_pub = self.create_publisher(Twist, 'cmd_vel', 10)

        self.timer = self.create_timer(0.25, self.control)

    def control(self):
        self.twist = Twist()
        self.twist.linear.x = -0.2

        self.vel_pub.publish(self.twist)


def main(args=None):
    rclpy.init(args=args)
    first_node = FirstNode()

    rclpy.spin(first_node)

    first_node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

Explicação do Código

Vamos entender o que cada linha do código acima faz:

  • super().__init__('first_node'): cria um nó com o nome first_node.

  • self.create_publisher(Twist, 'cmd_vel', 10): recebe três argumentos:

    • Twist: tipo da mensagem que será publicada.

    • 'cmd_vel': nome do tópico que será publicado.

    • 10: tamanho da fila de mensagens. Este argumento é opcional e o valor padrão é 10.

  • self.create_timer(0.25, self.control): recebe dois argumentos:

    • 0.25: tempo entre execuções da função self.control.

    • self.control: função que será executada a cada 0.25 segundos.

A função self.control será chamada a cada 0.25 segundos, e em cada chamada, ela cria uma mensagem do tipo Twist, altera o valor da velocidade linear para 0.2 e publica a mensagem no tópico cmd_vel.

Por fim, a função main faz o seguinte:

  • rclpy.init(args=args): inicializa o módulo rclpy (ROS Client Library for Python).

  • first_node = FirstNode(): cria uma instância da classe FirstNode.

  • rclpy.spin(first_node): mantém o nó em execução até que ele seja finalizado.

  • first_node.destroy_node(): finaliza o nó.

  • rclpy.shutdown(): finaliza o módulo rclpy.

Para rodar o nó precisamos fazer duas coisas:

  • Criar um arquivo launch.

  • Configurar o arquivo setup.py para que a ROS consiga encontrar o nó.

Criando um Arquivo Launch

Vamos criar um arquivo launch chamado first_node.launch.py dentro da pasta launch do pacote my_package. Este arquivo será responsável por iniciar o nó first_node.py.

No mesmo terminal rode os comandos abaixo, um de cada vez, para criar o arquivo:

cd ~/colcon_ws/src/my_package
mkdir launch
cd launch
touch first_node.launch.py

Agora vamos abrir o arquivo first_node.launch.py e adicionar o seguinte código:

from launch import LaunchDescription
from launch_ros.actions import Node

def generate_launch_description():
    return LaunchDescription([
        Node(
            package='my_package',
            executable='first_node',
            output='screen'),
    ],)
  • Node recebe três argumentos:

    • package='my_package': nome do pacote que contém o nó.

    • executable='first_node': nome do nó que será executado.

    • output='screen': imprime a saída do nó no terminal.

Configurando o Arquivo setup.py

Agora precisamos configurar o arquivo setup.py para que a ROS consiga encontrar o nó. Para isso, abra o arquivo setup.py e adicione o seguinte código:

from setuptools import setup
import os
from glob import glob

package_name = 'my_package'

setup(
    name=package_name,
    version='0.0.0',
    packages=[package_name],
    data_files=[
        ('share/ament_index/resource_index/packages',
            ['resource/' + package_name]),
        ('share/' + package_name, ['package.xml']),
        (os.path.join('share', package_name), glob('launch/*.launch.py'))
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='somebody very awesome',
    maintainer_email='user@user.com',
    description='TODO: Package description',
    license='TODO: License declaration',
    tests_require=['pytest'],
    entry_points={
        'console_scripts': [
            'first_node = my_package.first_node:main',
        ],
    },
)

O que foi alterado no arquivo setup.py foi o seguinte:

  • Adicionamos dois novos imports:

  • Dentro da lista data_files adicionamos a seguinte linha:

    • (os.path.join('share', package_name), glob('launch/*.launch.py')): adiciona todos os arquivos launch da pasta launch ao pacote.
  • Dentro do diciário entry_points na chave console_scripts adicionamos a seguinte linha:

    • 'first_node = my_package.first_node:main': cria um comando chamado first_node que executa a função main do arquivo first_node.py.

Rodando o Nó

Antes de rodar o nó, precisamos compilar o pacote novamente. Para isso, abra um novo terminal e rode os comandos abaixo:

cd ~/colcon_ws
colcon build --packages-select my_package

Dessa vez, enviamos o argumento --packages-select para o comando colcon build. Este argumento faz com que o comando compile apenas o pacote my_package.

Rode o nó com o comando abaixo: IMPORTANTE: Certifique-se de que o simulador está rodando antes de executar o comando abaixo!

ros2 launch my_package first_node.launch.py

Criando um Subscriber

Com isso, o nó first_node será executado e publicará mensagens no tópico cmd_vel, movendo o robô para frente. Agora vamos criar um novo nó que se inscreve no tópico odom e imprime as mensagens recebidas no terminal.

Crie um novo arquivo chamado second_node.py dentro da pasta my_package e então, faça com que o arquivo seja executável com os mesmos comandos que usamos anteriormente para criar o arquivo first_node.py.

Primeiro, cheque o tipo de mensagem que o tópico odom transporta com o comando abaixo:

ros2 topic info /odom

Podemos ver que o tópico odom transporta mensagens do tipo Odometry. Agora vamos abrir o arquivo second_node.py e adicionar o seguinte código:

import rclpy
from rclpy.node import Node
from nav_msgs.msg import Odometry
from rclpy.qos import ReliabilityPolicy, QoSProfile

class SecondNode(Node):

    def __init__(self):
        super().__init__('second_node')
        self.timer = self.create_timer(0.25, self.control)

        self.x = 0
        self.y = 0

        self.odom_sub = self.create_subscription(
            Odometry,
            '/odom',
            self.odom_callback,
            QoSProfile(depth=10, reliability=ReliabilityPolicy.RELIABLE))

    def odom_callback(self, data: Odometry):
        self.x = data.pose.pose.position.x
        self.y = data.pose.pose.position.y

    def control(self):
        print(f'Posição x: {self.x}')
        print(f'Posição y: {self.y}\n')


def main(args=None):
    rclpy.init(args=args)
    second_node = SecondNode()

    rclpy.spin(second_node)

    second_node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

Explicação do Código

Vamos entender o que as linhas novas do código acima fazem:

  • self.odom_sub = self.create_subscription(Odometry, '/odom', self.odom_callback, QoSProfile(depth=10, reliability=ReliabilityPolicy.RELIABLE)): recebe quatro argumentos:

    • Odometry: tipo da mensagem que será recebida. Descobrimos o tipo da mensagem verificando as informações do tópico /odom.

    • '/odom': nome do tópico que será inscrito.

    • self.odom_callback: função que será executada quando uma mensagem for recebida.

    • QoSProfile(depth=10, reliability=ReliabilityPolicy.RELIABLE): configura a qualidade de serviço do tópico. Este argumento é opcional e o valor padrão é QoSProfile(depth=10, reliability=ReliabilityPolicy.RELIABLE).

  • self.odom_callback: coleta a posição x e y do robô a partir da mensagem recebida.

  • self.control: imprime a posição x e y do robô no terminal.

Configurando o Arquivo setup.py

Agora precisamos configurar o arquivo setup.py para que a ROS consiga encontrar o nó e então compilar o pacote novamente. Para isso, abra o arquivo setup.py e adicione a seguinte linha dentro do dicionário entry_points na chave console_scripts:

'second_node = my_package.second_node:main',

Rodando o Nó

Uma vez que o arquivo setup.py foi configurado, compile o pacote novamente com o mesmo comando que usamos anteriormente e rode o arquivo launch first_node.launch.py, isso vai iniciar apenas o nó first_node.py.

ros2 launch my_package first_node.launch.py

E agora, em um novo terminal, execute o comando abaixo para rodar o nó second_node.py:

ros2 run my_package second_node

Você deve ver a posição x e y do robô sendo impressa no terminal a cada 0.25 segundos.