Custom Planner Integration#
Overview#
The WMX ROS2 application supports any motion planner that can send
trajectories via the standard FollowJointTrajectory action interface.
This allows integration of custom planners without modifying the WMX ROS2
nodes.
The architecture is:
┌──────────────────────────┐
│ Your Custom Planner │
│ │
│ 1. Read /joint_states │──── sensor_msgs/JointState (500 Hz)
│ 2. Compute trajectory │
│ 3. Send action goal │──── FollowJointTrajectory
│ 4. Wait for result │
└──────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────┐
│ follow_joint_trajectory_server │
│ │
│ - Validates trajectory (max 1000 points) │
│ - Converts to WMX3 CSplinePos command │
│ - Executes via AdvancedMotion API │
│ - Blocks until complete, returns error_code │
└──────────────────────────────────────────────────────────┘
│
▼
EtherCAT → Servo Drives → Robot Motion
Replacing the Default Planner#
MoveIt2 is the default planner, but you can replace it entirely. Any node that implements the following two-step pattern can control the robot:
Read current joint state from
/joint_states(sensor_msgs/msg/JointState) – published bymanipulator_stateat 500 HzSend trajectory goals to the
FollowJointTrajectoryaction server at/movensys_manipulator_arm_controller/follow_joint_trajectory(control_msgs/action/FollowJointTrajectory)
No other setup is required. The manipulator launch already starts the trajectory server, encoder publisher, and general control node.
Interface Requirements#
The trajectory goal must contain:
trajectory.joint_names: must match the robot configuration (["joint1", "joint2", "joint3", "joint4", "joint5", "joint6"])trajectory.points[].positions: target positions in radians for each waypoint (6 values per point)trajectory.points[].time_from_start: timestamp for each waypoint
Constraints and Limitations#
These constraints are enforced by the follow_joint_trajectory_server
source code:
Constraint |
Details |
|---|---|
Max waypoints |
1000 per trajectory ( |
Min time interval |
1 ms between the last two consecutive waypoints. If the final point is within 1 ms of the previous point, it is dropped. |
First point timing |
The first point’s |
Used fields |
Only |
Goal acceptance |
All goals are accepted unconditionally ( |
Cancellation |
Cancel requests are accepted but not acted upon during motion.
The server blocks on |
Feedback |
No intermediate feedback is published. The action completes only when the full trajectory has been executed or an error occurs. |
Concurrency |
The execute callback runs in a detached thread, but only one trajectory can be executed at a time on the same set of axes. |
See ROS2 Actions for the complete action interface documentation.
Python Example: Custom Trajectory Generator#
This complete example reads the current joint positions, computes a simple multi-point trajectory, sends it to the action server, and waits for the result.
#!/usr/bin/env python3
"""Custom planner example: generate and execute a multi-point trajectory."""
import math
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from sensor_msgs.msg import JointState
from control_msgs.action import FollowJointTrajectory
from trajectory_msgs.msg import JointTrajectoryPoint
from builtin_interfaces.msg import Duration
JOINT_NAMES = ['joint1', 'joint2', 'joint3', 'joint4', 'joint5', 'joint6']
ACTION_NAME = '/movensys_manipulator_arm_controller/follow_joint_trajectory'
class CustomPlanner(Node):
def __init__(self):
super().__init__('custom_planner')
self._current_positions = None
# Subscribe to joint states to know current position
self._joint_sub = self.create_subscription(
JointState, '/joint_states', self._joint_cb, 10
)
# Action client for trajectory execution
self._action_client = ActionClient(
self, FollowJointTrajectory, ACTION_NAME
)
def _joint_cb(self, msg):
# Extract only the 6 arm joints (ignore gripper at indices 6, 7)
if len(msg.position) >= 6:
self._current_positions = list(msg.position[:6])
def wait_for_joint_state(self, timeout_sec=5.0):
"""Wait until we receive at least one joint state message."""
self.get_logger().info('Waiting for /joint_states...')
start = self.get_clock().now()
while self._current_positions is None:
rclpy.spin_once(self, timeout_sec=0.1)
elapsed = (self.get_clock().now() - start).nanoseconds / 1e9
if elapsed > timeout_sec:
self.get_logger().error('Timeout waiting for /joint_states')
return False
self.get_logger().info(
f'Current positions: {self._current_positions}'
)
return True
def generate_trajectory(self, target_positions, num_points=10,
total_time_sec=5.0):
"""Generate a linear interpolation trajectory.
Args:
target_positions: Target joint angles (6 values, radians).
num_points: Number of intermediate waypoints.
total_time_sec: Total trajectory duration.
Returns:
List of JointTrajectoryPoint objects.
"""
start = self._current_positions
points = []
for i in range(num_points + 1):
t = i / num_points
positions = [
s + t * (e - s) for s, e in zip(start, target_positions)
]
time_sec = t * total_time_sec
sec = int(time_sec)
nanosec = int((time_sec - sec) * 1e9)
pt = JointTrajectoryPoint()
pt.positions = positions
pt.time_from_start = Duration(sec=sec, nanosec=nanosec)
points.append(pt)
return points
def execute_trajectory(self, points):
"""Send trajectory to the action server and wait for result."""
self.get_logger().info('Waiting for action server...')
self._action_client.wait_for_server()
goal = FollowJointTrajectory.Goal()
goal.trajectory.joint_names = JOINT_NAMES
goal.trajectory.points = points
self.get_logger().info(
f'Sending trajectory with {len(points)} points'
)
future = self._action_client.send_goal_async(goal)
rclpy.spin_until_future_complete(self, future)
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().error('Goal rejected')
return False
self.get_logger().info('Goal accepted, executing...')
result_future = goal_handle.get_result_async()
rclpy.spin_until_future_complete(self, result_future)
result = result_future.result().result
if result.error_code == 0:
self.get_logger().info('Trajectory executed successfully')
return True
else:
self.get_logger().error(
f'Trajectory failed: error_code={result.error_code}'
)
return False
def main():
rclpy.init()
planner = CustomPlanner()
if not planner.wait_for_joint_state():
rclpy.shutdown()
return
# Define target: move joint1 by +0.3 rad, joint2 by -0.2 rad
current = planner._current_positions
target = list(current)
target[0] += 0.3
target[1] -= 0.2
# Generate 10-point trajectory over 5 seconds
points = planner.generate_trajectory(target, num_points=10,
total_time_sec=5.0)
planner.execute_trajectory(points)
planner.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Warning
This example moves real motors. No collision checking is performed – ensure the robot workspace is clear before running.
Standalone Axis Control#
For applications that don’t use trajectory-based planning, the
wmx_core_motion_node provides direct axis control through:
Velocity commands via
/wmx/axis/velocity(wmx_ros2_message/msg/AxisVelocity)Absolute position commands via
/wmx/axis/position(wmx_ros2_message/msg/AxisPose)Relative position commands via
/wmx/axis/position/relative(wmx_ros2_message/msg/AxisPose)
These topics bypass the action server and control motors directly using
CoreMotion::StartVel(), CoreMotion::StartPos(), and
CoreMotion::StartMov() respectively. All commands use trapezoidal
velocity profiles.
See Building Custom Applications for Python examples using direct axis control and ROS2 Topics for message field details.
The full setup sequence for standalone axis control is documented in ROS2 Services (Service Call Workflow section).
See Also#
ROS2 Actions – Complete action interface reference
ROS2 Topics – Topic message formats
ROS2 Services – Service call workflow
Building Custom Applications – Building custom applications with code examples
MoveIt2 Motion Planning – MoveIt2 configuration