Skip to content

Commit

Permalink
Merge pull request OpenDDS#1158 from jrw972/master
Browse files Browse the repository at this point in the history
Make inconsistent topic tests fail predictably
  • Loading branch information
mitza-oci authored Jun 19, 2019
2 parents bf57625 + 5919fe5 commit 3b43e76
Show file tree
Hide file tree
Showing 18 changed files with 336 additions and 474 deletions.
2 changes: 1 addition & 1 deletion bin/dcps_tests.lst
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ tests/DCPS/LivelinessTest/run_test.pl rtps_disc: !DCPS_MIN !DDS_NO_OWNERSHIP_PRO
tests/DCPS/LivelinessTest/run_test.pl rtps_disc take: !DCPS_MIN !DDS_NO_OWNERSHIP_PROFILE RTPS

tests/DCPS/Inconsistent_Qos/run_test.pl: RTPS XERCES3 !STATIC
tests/DCPS/InconsistentTopic/run_test.pl rtps_disc: RTPS
tests/DCPS/InconsistentTopic/run_test.pl rtps_disc: RTPS !NO_BUILT_IN_TOPICS
tests/DCPS/TopicReuse/run_test.pl: RTPS
tests/DCPS/DpShutdown/run_test.pl: RTPS
tests/DCPS/Serializer/run_test.pl: !DCPS_MIN
Expand Down
1 change: 1 addition & 0 deletions dds/DCPS/Discovery.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ class OpenDDS_Dcps_Export Discovery : public RcObject {

virtual TopicStatus find_topic(
DDS::DomainId_t domainId,
const RepoId& participantId,
const char* topicName,
CORBA::String_out dataTypeName,
DDS::TopicQos_out qos,
Expand Down
147 changes: 58 additions & 89 deletions dds/DCPS/DiscoveryBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,10 @@ namespace OpenDDS {
ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, DCPS::INTERNAL_ERROR);
typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator iter =
topics_.find(topicName);
if (iter != topics_.end()) { // types must match, RtpsDiscovery checked for us
if (iter != topics_.end()) {
if (iter->second.data_type_ != dataTypeName) {
return DCPS::CONFLICTING_TYPENAME;
}
iter->second.qos_ = qos;
iter->second.has_dcps_key_ = hasDcpsKey;
topicId = iter->second.repo_id_;
Expand All @@ -319,25 +322,48 @@ namespace OpenDDS {
return DCPS::CREATED;
}

DCPS::TopicStatus remove_topic(const RepoId& topicId, OPENDDS_STRING& name)
DCPS::TopicStatus find_topic(const char* topicName,
CORBA::String_out dataTypeName,
DDS::TopicQos_out qos,
OpenDDS::DCPS::RepoId_out topicId)
{
ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, DCPS::INTERNAL_ERROR);
typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::const_iterator iter =
topics_.find(topicName);
if (iter == topics_.end()) {
return DCPS::NOT_FOUND;
}

const TopicDetails& td = iter->second;

dataTypeName = td.data_type_.c_str();
qos = new DDS::TopicQos(td.qos_);
topicId = td.repo_id_;
return DCPS::FOUND;
}

DCPS::TopicStatus remove_topic(const RepoId& topicId)
{
ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, DCPS::INTERNAL_ERROR);
name = topic_names_[topicId];
typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
topics_.find(name);
TopicNameMap::iterator name_iter = topic_names_.find(topicId);
if (name_iter == topic_names_.end()) {
return DCPS::NOT_FOUND;
}
const OPENDDS_STRING& name = name_iter->second;

typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it = topics_.find(name);
if (top_it != topics_.end()) {
TopicDetails& td = top_it->second;
if (td.endpoints_.empty()) {
topics_.erase(name);
}
}

topic_names_.erase(topicId);
topic_names_.erase(name_iter);
return DCPS::REMOVED;
}

virtual bool update_topic_qos(const DCPS::RepoId& topicId, const DDS::TopicQos& qos,
OPENDDS_STRING& name) = 0;
virtual bool update_topic_qos(const DCPS::RepoId& topicId, const DDS::TopicQos& qos) = 0;

DCPS::RepoId add_publication(const DCPS::RepoId& topicId,
DCPS::DataWriterCallbacks* publication,
Expand Down Expand Up @@ -1423,9 +1449,18 @@ namespace OpenDDS {
}

DCPS::TopicStatus
remove_topic(const RepoId& topicId, OPENDDS_STRING& name)
find_topic(const char* topicName,
CORBA::String_out dataTypeName,
DDS::TopicQos_out qos,
OpenDDS::DCPS::RepoId_out topicId)
{
return endpoint_manager().find_topic(topicName, dataTypeName, qos, topicId);
}

DCPS::TopicStatus
remove_topic(const RepoId& topicId)
{
return endpoint_manager().remove_topic(topicId, name);
return endpoint_manager().remove_topic(topicId);
}

void
Expand All @@ -1436,10 +1471,9 @@ namespace OpenDDS {
}

bool
update_topic_qos(const RepoId& topicId, const DDS::TopicQos& qos,
OPENDDS_STRING& name)
update_topic_qos(const RepoId& topicId, const DDS::TopicQos& qos)
{
return endpoint_manager().update_topic_qos(topicId, qos, name);
return endpoint_manager().update_topic_qos(topicId, qos);
}

RepoId
Expand Down Expand Up @@ -1791,88 +1825,31 @@ namespace OpenDDS {
bool hasDcpsKey)
{
ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, DCPS::INTERNAL_ERROR);
typename OPENDDS_MAP(DDS::DomainId_t,
OPENDDS_MAP(OPENDDS_STRING, TopicDetails) )::iterator topic_it =
topics_.find(domainId);
if (topic_it != topics_.end()) {
const typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator it =
topic_it->second.find(topicName);
if (it != topic_it->second.end()
&& it->second.data_type_ != dataTypeName) {
topicId = GUID_UNKNOWN;
return DCPS::CONFLICTING_TYPENAME;
}
}

// Verified its safe to hold lock during call to assert_topic
const DCPS::TopicStatus stat =
participants_[domainId][participantId]->assert_topic(topicId, topicName,
dataTypeName, qos,
hasDcpsKey);
if (stat == DCPS::CREATED || stat == DCPS::FOUND) { // qos change (FOUND)
TopicDetails& td = topics_[domainId][topicName];
td.data_type_ = dataTypeName;
td.qos_ = qos;
td.repo_id_ = topicId;
++topic_use_[domainId][topicName];
}
return stat;
return participants_[domainId][participantId]->assert_topic(topicId, topicName,
dataTypeName, qos,
hasDcpsKey);
}

virtual DCPS::TopicStatus find_topic(DDS::DomainId_t domainId, const char* topicName,
CORBA::String_out dataTypeName, DDS::TopicQos_out qos,
virtual DCPS::TopicStatus find_topic(DDS::DomainId_t domainId,
const OpenDDS::DCPS::RepoId& participantId,
const char* topicName,
CORBA::String_out dataTypeName,
DDS::TopicQos_out qos,
OpenDDS::DCPS::RepoId_out topicId)
{
ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, DCPS::INTERNAL_ERROR);
typename OPENDDS_MAP(DDS::DomainId_t,
OPENDDS_MAP(OPENDDS_STRING, TopicDetails) )::iterator topic_it =
topics_.find(domainId);
if (topic_it == topics_.end()) {
return DCPS::NOT_FOUND;
}
typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator iter =
topic_it->second.find(topicName);
if (iter == topic_it->second.end()) {
return DCPS::NOT_FOUND;
}
TopicDetails& td = iter->second;
dataTypeName = td.data_type_.c_str();
qos = new DDS::TopicQos(td.qos_);
topicId = td.repo_id_;
++topic_use_[domainId][topicName];
return DCPS::FOUND;
return participants_[domainId][participantId]->find_topic(topicName, dataTypeName, qos, topicId);
}

virtual DCPS::TopicStatus remove_topic(DDS::DomainId_t domainId,
const OpenDDS::DCPS::RepoId& participantId,
const OpenDDS::DCPS::RepoId& topicId)
{
ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, DCPS::INTERNAL_ERROR);
typename OPENDDS_MAP(DDS::DomainId_t,
OPENDDS_MAP(OPENDDS_STRING, TopicDetails) )::iterator topic_it =
topics_.find(domainId);
if (topic_it == topics_.end()) {
return DCPS::NOT_FOUND;
}

OPENDDS_STRING name;
// Safe to hold lock while calling remove topic
const DCPS::TopicStatus stat =
participants_[domainId][participantId]->remove_topic(topicId, name);

if (stat == DCPS::REMOVED) {
if (0 == --topic_use_[domainId][name]) {
topic_use_[domainId].erase(name);
if (topic_it->second.empty()) {
topic_use_.erase(domainId);
}
topic_it->second.erase(name);
if (topic_it->second.empty()) {
topics_.erase(topic_it);
}
}
}
return stat;
return participants_[domainId][participantId]->remove_topic(topicId);
}

virtual bool ignore_topic(DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId& myParticipantId,
Expand All @@ -1886,14 +1863,8 @@ namespace OpenDDS {
const OpenDDS::DCPS::RepoId& participantId, const DDS::TopicQos& qos)
{
ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, false);
OPENDDS_STRING name;
// Safe to hold lock while calling update_topic_qos
if (participants_[domainId][participantId]->update_topic_qos(topicId,
qos, name)) {
topics_[domainId][name].qos_ = qos;
return true;
}
return false;
return participants_[domainId][participantId]->update_topic_qos(topicId, qos);
}

virtual OpenDDS::DCPS::RepoId add_publication(DDS::DomainId_t domainId,
Expand Down Expand Up @@ -2073,8 +2044,6 @@ namespace OpenDDS {
} reactor_runner_;

DomainParticipantMap participants_;
OPENDDS_MAP(DDS::DomainId_t, OPENDDS_MAP(OPENDDS_STRING, TopicDetails) ) topics_;
OPENDDS_MAP(DDS::DomainId_t, OPENDDS_MAP(OPENDDS_STRING, unsigned int) ) topic_use_;
};

} // namespace DCPS
Expand Down
1 change: 1 addition & 0 deletions dds/DCPS/DomainParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,7 @@ DomainParticipantImpl::find_topic(

Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
TopicStatus status = disco->find_topic(domain_id_,
get_id(),
topic_name,
type_name.out(),
qos.out(),
Expand Down
7 changes: 5 additions & 2 deletions dds/DCPS/InfoRepoDiscovery/InfoRepoDiscovery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -538,8 +538,11 @@ InfoRepoDiscovery::assert_topic(DCPS::RepoId_out topicId, DDS::DomainId_t domain
}

DCPS::TopicStatus
InfoRepoDiscovery::find_topic(DDS::DomainId_t domainId, const char* topicName,
CORBA::String_out dataTypeName, DDS::TopicQos_out qos,
InfoRepoDiscovery::find_topic(DDS::DomainId_t domainId,
const DCPS::RepoId& /*participantId*/,
const char* topicName,
CORBA::String_out dataTypeName,
DDS::TopicQos_out qos,
DCPS::RepoId_out topicId)
{
try {
Expand Down
1 change: 1 addition & 0 deletions dds/DCPS/InfoRepoDiscovery/InfoRepoDiscovery.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ class OpenDDS_InfoRepoDiscovery_Export InfoRepoDiscovery : public Discovery {

virtual OpenDDS::DCPS::TopicStatus find_topic(
DDS::DomainId_t domainId,
const OpenDDS::DCPS::RepoId& participantId,
const char* topicName,
CORBA::String_out dataTypeName,
DDS::TopicQos_out qos,
Expand Down
27 changes: 16 additions & 11 deletions dds/DCPS/RTPS/Sedp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1418,16 +1418,15 @@ Sedp::sub_bit()
#endif /* DDS_HAS_MINIMUM_BIT */

bool
Sedp::update_topic_qos(const RepoId& topicId, const DDS::TopicQos& qos,
OPENDDS_STRING& name)
Sedp::update_topic_qos(const RepoId& topicId, const DDS::TopicQos& qos)
{
ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, false);
OPENDDS_MAP_CMP(RepoId, OPENDDS_STRING, DCPS::GUID_tKeyLessThan)::iterator iter =
topic_names_.find(topicId);
if (iter == topic_names_.end()) {
return false;
}
name = iter->second;
const OPENDDS_STRING& name = iter->second;
TopicDetails& topic = topics_[name];
using namespace DCPS;
// If the TOPIC_DATA QoS changed our local endpoints must be resent
Expand Down Expand Up @@ -1765,6 +1764,7 @@ void Sedp::process_discovered_writer_data(DCPS::MessageId message_id,
#endif

DiscoveredPublication& pub = discovered_publications_[guid] = prepub;
bool inconsistent = false;

OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
topics_.find(topic_name);
Expand All @@ -1788,12 +1788,14 @@ void Sedp::process_discovered_writer_data(DCPS::MessageId message_id,
wdata.ddsPublicationData.type_name.in(),
top_it->second.data_type_.c_str()));
}
return;
inconsistent = true;
}

TopicDetails& td = top_it->second;
topic_names_[td.repo_id_] = topic_name;
td.endpoints_.insert(guid);
if (!inconsistent) {
TopicDetails& td = top_it->second;
topic_names_[td.repo_id_] = topic_name;
td.endpoints_.insert(guid);
}

std::memcpy(pub.writer_data_.ddsPublicationData.participant_key.value,
guid.guidPrefix, sizeof(DDS::BuiltinTopicKey_t));
Expand Down Expand Up @@ -2086,6 +2088,7 @@ void Sedp::process_discovered_reader_data(DCPS::MessageId message_id,
#endif

DiscoveredSubscription& sub = discovered_subscriptions_[guid] = presub;
bool inconsistent = false;

OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
topics_.find(topic_name);
Expand All @@ -2109,12 +2112,14 @@ void Sedp::process_discovered_reader_data(DCPS::MessageId message_id,
rdata.ddsSubscriptionData.type_name.in(),
top_it->second.data_type_.c_str()));
}
return;
inconsistent = true;
}

TopicDetails& td = top_it->second;
topic_names_[td.repo_id_] = topic_name;
td.endpoints_.insert(guid);
if (!inconsistent) {
TopicDetails& td = top_it->second;
topic_names_[td.repo_id_] = topic_name;
td.endpoints_.insert(guid);
}

std::memcpy(sub.reader_data_.ddsSubscriptionData.participant_key.value,
guid.guidPrefix, sizeof(DDS::BuiltinTopicKey_t));
Expand Down
3 changes: 1 addition & 2 deletions dds/DCPS/RTPS/Sedp.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,7 @@ class Sedp : public DCPS::EndpointManager<ParticipantData_t> {
DDS::ReturnCode_t write_dcps_participant_dispose(const DCPS::RepoId& part);

// Topic
bool update_topic_qos(const DCPS::RepoId& topicId, const DDS::TopicQos& qos,
OPENDDS_STRING& name);
bool update_topic_qos(const DCPS::RepoId& topicId, const DDS::TopicQos& qos);

// Publication
bool update_publication_qos(const DCPS::RepoId& publicationId,
Expand Down
3 changes: 1 addition & 2 deletions dds/DCPS/StaticDiscovery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,7 @@ void StaticEndpointManager::assign_subscription_key(RepoId& rid,

bool
StaticEndpointManager::update_topic_qos(const RepoId& /*topicId*/,
const DDS::TopicQos& /*qos*/,
OPENDDS_STRING& /*name*/)
const DDS::TopicQos& /*qos*/)
{
ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::update_topic_qos - ")
ACE_TEXT("Not allowed\n")));
Expand Down
3 changes: 1 addition & 2 deletions dds/DCPS/StaticDiscovery.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,7 @@ class StaticEndpointManager
const DDS::DataReaderQos& qos);

virtual bool update_topic_qos(const RepoId& /*topicId*/,
const DDS::TopicQos& /*qos*/,
OPENDDS_STRING& /*name*/);
const DDS::TopicQos& /*qos*/);

virtual bool update_publication_qos(const RepoId& /*publicationId*/,
const DDS::DataWriterQos& /*qos*/,
Expand Down
3 changes: 1 addition & 2 deletions tests/DCPS/InconsistentTopic/.gitignore
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
/MessengerTypeSupportImpl.cpp
/MessengerTypeSupport.idl
/publisher
/MessengerTypeSupportImpl.h
/subscriber
/MessengerTypeSupportC.h
/MessengerC.h
/MessengerTypeSupportS.h
Expand All @@ -15,3 +13,4 @@
/MessengerTypeSupportS.cpp
/MessengerS.inl
/MessengerS.cpp
/pubsub
Loading

0 comments on commit 3b43e76

Please sign in to comment.