Skip to content

Commit

Permalink
Adds multi-group point streaming.
Browse files Browse the repository at this point in the history
  • Loading branch information
rr-mark committed Feb 29, 2024
1 parent 3807bde commit d34b4ad
Show file tree
Hide file tree
Showing 4 changed files with 195 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,8 @@ class JointTrajectoryInterface
*/
virtual void jointTrajectoryCB(const trajectory_msgs::JointTrajectoryConstPtr &msg);

virtual void jointCommandExCB(const motoman_msgs::DynamicJointTrajectoryConstPtr &msg) = 0;

/**
* \brief Callback function registered to joint_command topic subscriber.
* Specific method is implemented in JointTrajectoryStreamer class.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ class JointTrajectoryStreamer : public JointTrajectoryInterface

virtual void jointTrajectoryCB(const motoman_msgs::DynamicJointTrajectoryConstPtr &msg);

virtual void jointCommandExCB(const motoman_msgs::DynamicJointTrajectoryConstPtr &msg);

virtual void jointCommandCB(const trajectory_msgs::JointTrajectoryConstPtr &msg);

virtual bool trajectory_to_msgs(const trajectory_msgs::JointTrajectoryConstPtr& traj,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ bool JointTrajectoryInterface::init(
this->srv_stop_motion_ = this->node_.advertiseService(
"stop_motion", &JointTrajectoryInterface::stopMotionCB, this);

this->sub_joint_command_ = this->node_.subscribe("joint_command", 0, &JointTrajectoryInterface::jointCommandExCB, this);

for (it_type iterator = this->robot_groups_.begin(); iterator != this->robot_groups_.end(); iterator++)
{
std::string name_str, ns_str;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,194 @@ void JointTrajectoryStreamer::jointTrajectoryCB(const trajectory_msgs::JointTraj
send_to_robot(new_traj_msgs);
}

void JointTrajectoryStreamer::jointCommandExCB(const motoman_msgs::DynamicJointTrajectoryConstPtr &msg)
{
// read current state value (should be atomic)
int state = this->state_;

ROS_DEBUG("Current state is: %d", state);

// Point specific variables that can persist between IDLE and POINT_STREAM state
bool start_point = false;

const size_t num_msg_points = msg->points.size();

motoman_msgs::DynamicJointsGroup rbt_pt;

//If current state is idle, set to POINT_STREAMING
if (TransferStates::IDLE == state)
{
// Check to see if the message contains more than one trajectory point, currently the
// POINT_STREAMING state only accepts a single point
if (num_msg_points > 1)
{
ROS_ERROR("JointTrajectory command must contain a single point, ignoring message and maintaining IDLE state");
motoman_driver::MotomanErrors error;
error.code = motoman_driver::MotomanErrors::CMD_MUST_HAVE_SINGLE_POINT;
motoman_errors_pub_.publish(error);
return;
}

if (num_msg_points == 0) return;

// Get the message point and select
const motoman_msgs::DynamicJointsGroup msg_pt = msg->points[0].groups[0];

if (!select(msg->joint_names, msg_pt, robot_groups_[msg_pt.group_number].get_joint_names(), &rbt_pt))
{
// Select function will report message to console, just return here to stay in IDLE state
return;
}
else
{
// Check for required zero velocities
const double zero_tolerance = 1e-5;
bool zero_velocities = true;

for (size_t i = 0; i < rbt_pt.velocities.size(); ++i)
{
if (std::abs(rbt_pt.velocities[i]) > zero_tolerance )
{
zero_velocities = false;
break;
}
}

if (!zero_velocities)
{
ROS_ERROR("Starting joint point must contain zero velocity for each joint,"
" unable to transition to on-the-fly streaming");
motoman_driver::MotomanErrors error;
error.code = motoman_driver::MotomanErrors::CMD_MUST_HAVE_ZERO_VELOCITY;
motoman_errors_pub_.publish(error);
return;
}
}

// Have a valid starting point, update the flag and the time from start variable
// Note: last_time_from_start_ variable is only used in this function therefore we do not need to wrap in lock
start_point = true;
ptstreaming_last_time_from_start_ = msg_pt.time_from_start;

// Update point streaming sequence count, empty the point queue and set internal state to point streaming
this->mutex_.lock();
this->state_ = TransferStates::POINT_STREAMING;
motoman_driver::MotomanStatus status;
status.status = motoman_driver::MotomanStatus::POINT_STREAMING;
motoman_status_pub_.publish(status);
this->ptstreaming_seq_count_ = 0;
this->ptstreaming_queue_ = std::queue<SimpleMessage>();
this->mutex_.unlock();

// Set the local state to point streaming to force enqueuing of the starting point
state = TransferStates::POINT_STREAMING;

ROS_INFO("Starting joint point received. Starting on-the-fly streaming.");
}

// Process incoming point since we are in point streaming mode
if (TransferStates::POINT_STREAMING == state)
{
bool stop_trajectory = false;

ros::Duration pt_time_from_start;

// Empty points is a trigger to abort the POINT_STREAMING mode by stopping the trajectory
if (msg->points.empty())
{
ROS_INFO("Empty point received");
stop_trajectory = true;
}
else if (num_msg_points != 1) // Check for invalid number of trajectory points
{
ROS_ERROR("JointTrajectory command must contain a single point");
motoman_driver::MotomanErrors error;
error.code = motoman_driver::MotomanErrors::CMD_MUST_HAVE_SINGLE_POINT;
motoman_errors_pub_.publish(error);
stop_trajectory = true;
}
else // We have at one point, let check for timing if we are not the start point
{
pt_time_from_start = msg->points[0].groups[0].time_from_start;
if (!start_point && (pt_time_from_start <= ptstreaming_last_time_from_start_))
{
ROS_ERROR("JointTrajectory point must have a time from start duration that is greater than the previously "
"processes point");
motoman_driver::MotomanErrors error;
error.code = motoman_driver::MotomanErrors::CMD_HAS_INCORRECT_TIME_FROM_START;
motoman_errors_pub_.publish(error);
stop_trajectory = true;
}
}

if (ptstreaming_queue_.size() > max_ptstreaming_queue_elements) // Check for max queue size
{
ROS_ERROR("Point streaming queue has reached max allowed elements");
motoman_driver::MotomanErrors error;
error.code = motoman_driver::MotomanErrors::STREAMING_QUEUE_OVERFLOW;
motoman_errors_pub_.publish(error);
stop_trajectory = true;
}

// Stop the trajectory and cancel the point streaming mode if we need to
if (stop_trajectory)
{
ROS_INFO("Canceling on-the-fly streaming");
this->mutex_.lock();
trajectoryStop();
this->mutex_.unlock();
return;
}

// select / reorder joints for sending to robot (if it is not the starting point)
if (!start_point)
{
if (!select(msg->joint_names, msg->points[0].groups[0], robot_groups_[msg->points[0].groups[0].group_number].get_joint_names(), &rbt_pt))
{
return;
}
}

motoman_msgs::DynamicJointsGroup xform_pt;
// transform point data (e.g. for joint-coupling)
if (!transform(rbt_pt, &xform_pt))
return;

SimpleMessage message;
// convert trajectory point to ROS message
if (!create_message(this->ptstreaming_seq_count_, xform_pt, &message))
return;

// Update the last time from start
// Note: last_time_from_start_ variable is only used in this function therefore we do not need to wrap in lock
ptstreaming_last_time_from_start_ = pt_time_from_start;

// Points get pushed into queue here. They will be popped in the Streaming Thread and sent to controller.
this->mutex_.lock();
this->ptstreaming_queue_.push(message);
this->ptstreaming_seq_count_++;
this->mutex_.unlock();
}

//Else, cannot splice. Cancel current motion.
else
{
if (msg->points.empty())
ROS_INFO("Empty trajectory received, canceling current trajectory");
else {
ROS_ERROR("Trajectory splicing not yet implemented, stopping current motion.");
motoman_driver::MotomanErrors error;
error.code = motoman_driver::MotomanErrors::SPLICING_NOT_ALLOWED;
motoman_errors_pub_.publish(error);
}

this->mutex_.lock();
trajectoryStop();
this->mutex_.unlock();
return;
}
}

void JointTrajectoryStreamer::jointCommandCB(const trajectory_msgs::JointTrajectoryConstPtr &msg)
{
// read current state value (should be atomic)
Expand Down Expand Up @@ -428,7 +616,7 @@ void JointTrajectoryStreamer::streamingThread()
while (ros::ok())
{
ros::Duration(0.001).sleep();
ROS_INFO("QUEUE SIZE: %d", ptstreaming_queue_.size());
ROS_INFO("QUEUE SIZE: %ld", ptstreaming_queue_.size());
ros::Time start = ros::Time::now();
ROS_INFO("JOINT STREAMER THREAD START TIME: %lf", start.sec + ((double) start.nsec / pow(10, 9)));

Expand Down

0 comments on commit d34b4ad

Please sign in to comment.