Skip to content
This repository has been archived by the owner on Jul 8, 2022. It is now read-only.

Commit

Permalink
Merge pull request #641 from mliszcz/fix-511-crash-race-condition-eve…
Browse files Browse the repository at this point in the history
…nt-push-attempt-2

Correction for race condition between polling threads and user threads pushing events.

Three different data races are addressed in the PR:
* All code that sets previous event value (like in below example) in attribute.cpp and eventsupplier.cpp, is protected by `event_mutex`,
* We remove `detect_mutex`. Previously it was protecting the whole `EventSupplier::detect_change`. Now this method is protected by `event_mutex`,
* Setting timestamps of last event subscription is protected now by `event_mutex`.

Fixes #511.
  • Loading branch information
mliszcz authored Jan 24, 2020
2 parents 85fbfd3 + 3bea45c commit ec89823
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 218 deletions.
209 changes: 113 additions & 96 deletions cppapi/server/attribute.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,47 @@ static bool WantedProp_f(AttrProperty a,const char *n)
return (a.get_name() == n);
}

void LastAttrValue::store(
const AttributeValue_5* attr_5,
const AttributeValue_4* attr_4,
const AttributeValue_3* attr_3,
const AttributeValue* attr,
DevFailed* error)
{
if (error)
{
except = *error;
err = true;
}
else
{
if (attr_5)
{
quality = attr_5->quality;
value_4 = attr_5->value;
}
else if (attr_4)
{
quality = attr_4->quality;
value_4 = attr_4->value;
}
else if (attr_3)
{
quality = attr_3->quality;
value = attr_3->value;
}
else if (attr)
{
quality = attr->quality;
value = attr->value;
}

err = false;
}

inited = true;
}


//--------------------------------------------------------------------------------------------------------------------
//
Expand Down Expand Up @@ -3786,9 +3827,13 @@ void Attribute::fire_change_event(DevFailed *except)
time_t change3_subscription,change4_subscription,change5_subscription;

now = time(NULL);
change3_subscription = now - event_change3_subscription;
change4_subscription = now - event_change4_subscription;
change5_subscription = now - event_change5_subscription;

{
omni_mutex_lock oml(EventSupplier::get_event_mutex());
change3_subscription = now - event_change3_subscription;
change4_subscription = now - event_change4_subscription;
change5_subscription = now - event_change5_subscription;
}

//
// Get the event supplier(s)
Expand Down Expand Up @@ -4029,54 +4074,34 @@ void Attribute::fire_change_event(DevFailed *except)
bool force_change = false;
bool quality_change = false;

if ((except != NULL) ||
(quality == Tango::ATTR_INVALID) ||
((except == NULL) && (prev_change_event.err == true)) ||
((quality != Tango::ATTR_INVALID) &&
(prev_change_event.quality == Tango::ATTR_INVALID)))
{
force_change = true;
}
omni_mutex_lock oml(EventSupplier::get_event_mutex());

std::vector<std::string> filterable_names;
std::vector<double> filterable_data;
std::vector<std::string> filterable_names_lg;
std::vector<long> filterable_data_lg;
const AttrQuality old_quality = prev_change_event.quality;

if (except != NULL)
{
prev_change_event.err = true;
prev_change_event.except = *except;
}
else
{
Tango::AttrQuality the_quality;

if (send_attr_5 != NULL)
{
the_quality = send_attr_5->quality;
prev_change_event.value_4 = send_attr_5->value;
}
else if (send_attr_4 != NULL)
if (except
|| quality == Tango::ATTR_INVALID
|| prev_change_event.err
|| old_quality == Tango::ATTR_INVALID)
{
the_quality = send_attr_4->quality;
prev_change_event.value_4 = send_attr_4->value;
}
else
{
the_quality = send_attr->quality;
prev_change_event.value = send_attr->value;
force_change = true;
}

if (prev_change_event.quality != the_quality)
{
quality_change = true;
}
prev_change_event.store(
send_attr_5,
send_attr_4,
send_attr,
Tango_nullptr,
except);

prev_change_event.quality = the_quality;
prev_change_event.err = false;
quality_change = (old_quality != prev_change_event.quality);
}
prev_change_event.inited = true;


std::vector<std::string> filterable_names;
std::vector<double> filterable_data;
std::vector<std::string> filterable_names_lg;
std::vector<long> filterable_data_lg;

filterable_names.push_back("forced_event");
if (force_change == true)
Expand Down Expand Up @@ -4211,9 +4236,12 @@ void Attribute::fire_archive_event(DevFailed *except)

now = time(NULL);

archive3_subscription = now - event_archive3_subscription;
archive4_subscription = now - event_archive4_subscription;
archive5_subscription = now - event_archive5_subscription;
{
omni_mutex_lock oml(EventSupplier::get_event_mutex());
archive3_subscription = now - event_archive3_subscription;
archive4_subscription = now - event_archive4_subscription;
archive5_subscription = now - event_archive5_subscription;
}

//
// Get the event supplier(s)
Expand Down Expand Up @@ -4468,62 +4496,45 @@ void Attribute::fire_archive_event(DevFailed *except)
}
else
{

//
// Execute detect_change only to calculate the delta_change_rel and
// delta_change_abs and force_change !
//

bool force_change = false;
bool quality_change = false;
double delta_change_rel = 0.0;
double delta_change_abs = 0.0;

if (event_supplier_nd != NULL)
event_supplier_nd->detect_change(*this, ad,true,delta_change_rel,delta_change_abs,except,force_change,dev);
else if (event_supplier_zmq != NULL)
event_supplier_zmq->detect_change(*this, ad,true,delta_change_rel,delta_change_abs,except,force_change,dev);


std::vector<std::string> filterable_names;
std::vector<double> filterable_data;
std::vector<std::string> filterable_names_lg;
std::vector<long> filterable_data_lg;

if (except != NULL)
{
prev_archive_event.err = true;
prev_archive_event.except = *except;
}
else
{
Tango::AttrQuality the_quality;
omni_mutex_lock oml(EventSupplier::get_event_mutex());

if (send_attr_5 != nullptr)
{
prev_archive_event.value_4 = send_attr_5->value;
the_quality = send_attr_5->quality;
}
else if (send_attr_4 != nullptr)
{
prev_archive_event.value_4 = send_attr_4->value;
the_quality = send_attr_4->quality;
}
else
{
prev_archive_event.value = send_attr->value;
the_quality = send_attr->quality;
}
// Execute detect_change only to calculate the delta_change_rel and
// delta_change_abs and force_change !

if (prev_archive_event.quality != the_quality)
if (event_supplier_nd || event_supplier_zmq)
{
quality_change = true;
EventSupplier* event_supplier = event_supplier_nd ? event_supplier_nd : event_supplier_zmq;
event_supplier->detect_change(
*this,
ad,
true,
delta_change_rel,
delta_change_abs,
except,
force_change,
dev);
}

prev_archive_event.quality = the_quality;
prev_archive_event.err = false;
prev_archive_event.store(
send_attr_5,
send_attr_4,
send_attr,
Tango_nullptr,
except);

quality_change = (old_quality != prev_archive_event.quality);
}
prev_archive_event.inited = true;

std::vector<std::string> filterable_names;
std::vector<double> filterable_data;
std::vector<std::string> filterable_names_lg;
std::vector<long> filterable_data_lg;

filterable_names.push_back("forced_event");
if (force_change == true)
Expand Down Expand Up @@ -4654,9 +4665,12 @@ void Attribute::fire_event(std::vector<std::string> &filt_names,std::vector<doub

now = time(NULL);

user3_subscription = now - event_user3_subscription;
user4_subscription = now - event_user4_subscription;
user5_subscription = now - event_user5_subscription;
{
omni_mutex_lock oml(EventSupplier::get_event_mutex());
user3_subscription = now - event_user3_subscription;
user4_subscription = now - event_user4_subscription;
user5_subscription = now - event_user5_subscription;
}

//
// Get the event supplier(s)
Expand Down Expand Up @@ -4926,9 +4940,12 @@ void Attribute::fire_error_periodic_event(DevFailed *except)

now = time(NULL);

periodic3_subscription = now - event_periodic3_subscription;
periodic4_subscription = now - event_periodic4_subscription;
periodic5_subscription = now - event_periodic5_subscription;
{
omni_mutex_lock oml(EventSupplier::get_event_mutex());
periodic3_subscription = now - event_periodic3_subscription;
periodic4_subscription = now - event_periodic4_subscription;
periodic5_subscription = now - event_periodic5_subscription;
}

std::vector<int> client_libs = get_client_lib(PERIODIC_EVENT); // We want a copy

Expand Down
10 changes: 8 additions & 2 deletions cppapi/server/attribute.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,21 @@ typedef union _Attr_Value
}Attr_Value;


typedef struct last_attr_value
struct LastAttrValue
{
bool inited;
Tango::AttrQuality quality;
CORBA::Any value;
bool err;
DevFailed except;
AttrValUnion value_4;
} LastAttrValue;
void store(
const AttributeValue_5*,
const AttributeValue_4*,
const AttributeValue_3*,
const AttributeValue*,
DevFailed*);
};

typedef enum prop_type
{
Expand Down
Loading

0 comments on commit ec89823

Please sign in to comment.