From 5251bee5fd72752bd6d380ddc4a7254aa67a36cb Mon Sep 17 00:00:00 2001 From: mliszcz Date: Mon, 16 Dec 2019 14:13:20 +0100 Subject: [PATCH 1/4] Protect storing last attribute value with mutex Fixes data race between: * polling thread (EventSupplier::detect_and_push_xxx_event) * and user thread pushing events (Attribute::fire_xxx_event) --- cppapi/server/attribute.cpp | 139 ++++++++++++++++---------------- cppapi/server/attribute.h | 10 ++- cppapi/server/eventsupplier.cpp | 131 ++++++------------------------ 3 files changed, 102 insertions(+), 178 deletions(-) diff --git a/cppapi/server/attribute.cpp b/cppapi/server/attribute.cpp index 9c2d15c75..e29e0fbc0 100644 --- a/cppapi/server/attribute.cpp +++ b/cppapi/server/attribute.cpp @@ -64,6 +64,47 @@ static bool WantedProp_f(AttrProperty a,const char *n) return (a.get_name() == n); } +void LastAttrValue::store( + const AttributeValue_5* attr_5, + const AttributeValue_4* attr_4, + const AttributeValue_3* attr_3, + const AttributeValue* attr, + DevFailed* error) +{ + if (error) + { + except = *error; + err = true; + } + else + { + if (attr_5) + { + quality = attr_5->quality; + value_4 = attr_5->value; + } + else if (attr_4) + { + quality = attr_4->quality; + value_4 = attr_4->value; + } + else if (attr_3) + { + quality = attr_3->quality; + value = attr_3->value; + } + else if (attr) + { + quality = attr->quality; + value = attr->value; + } + + err = false; + } + + inited = true; +} + //-------------------------------------------------------------------------------------------------------------------- // @@ -4029,54 +4070,34 @@ void Attribute::fire_change_event(DevFailed *except) bool force_change = false; bool quality_change = false; - if ((except != NULL) || - (quality == Tango::ATTR_INVALID) || - ((except == NULL) && (prev_change_event.err == true)) || - ((quality != Tango::ATTR_INVALID) && - (prev_change_event.quality == Tango::ATTR_INVALID))) { - force_change = true; - } + omni_mutex_lock oml(EventSupplier::get_event_mutex()); - std::vector filterable_names; - std::vector filterable_data; - std::vector filterable_names_lg; - std::vector filterable_data_lg; + const AttrQuality old_quality = prev_change_event.quality; - if (except != NULL) - { - prev_change_event.err = true; - prev_change_event.except = *except; - } - else - { - Tango::AttrQuality the_quality; - - if (send_attr_5 != NULL) - { - the_quality = send_attr_5->quality; - prev_change_event.value_4 = send_attr_5->value; - } - else if (send_attr_4 != NULL) + if (except + || quality == Tango::ATTR_INVALID + || ((! except) && prev_change_event.err) + || (quality != Tango::ATTR_INVALID && old_quality == Tango::ATTR_INVALID)) { - the_quality = send_attr_4->quality; - prev_change_event.value_4 = send_attr_4->value; - } - else - { - the_quality = send_attr->quality; - prev_change_event.value = send_attr->value; + force_change = true; } - if (prev_change_event.quality != the_quality) - { - quality_change = true; - } + prev_change_event.store( + send_attr_5, + send_attr_4, + send_attr, + Tango_nullptr, + except); - prev_change_event.quality = the_quality; - prev_change_event.err = false; + quality_change = (old_quality != prev_change_event.quality); } - prev_change_event.inited = true; + + + std::vector filterable_names; + std::vector filterable_data; + std::vector filterable_names_lg; + std::vector filterable_data_lg; filterable_names.push_back("forced_event"); if (force_change == true) @@ -4490,40 +4511,20 @@ void Attribute::fire_archive_event(DevFailed *except) std::vector filterable_names_lg; std::vector filterable_data_lg; - if (except != NULL) - { - prev_archive_event.err = true; - prev_archive_event.except = *except; - } - else { - Tango::AttrQuality the_quality; + omni_mutex_lock oml(EventSupplier::get_event_mutex()); - if (send_attr_5 != nullptr) - { - prev_archive_event.value_4 = send_attr_5->value; - the_quality = send_attr_5->quality; - } - else if (send_attr_4 != nullptr) - { - prev_archive_event.value_4 = send_attr_4->value; - the_quality = send_attr_4->quality; - } - else - { - prev_archive_event.value = send_attr->value; - the_quality = send_attr->quality; - } + const AttrQuality old_quality = prev_archive_event.quality; - if (prev_archive_event.quality != the_quality) - { - quality_change = true; - } + prev_archive_event.store( + send_attr_5, + send_attr_4, + send_attr, + Tango_nullptr, + except); - prev_archive_event.quality = the_quality; - prev_archive_event.err = false; + quality_change = (old_quality != prev_archive_event.quality); } - prev_archive_event.inited = true; filterable_names.push_back("forced_event"); if (force_change == true) diff --git a/cppapi/server/attribute.h b/cppapi/server/attribute.h index e524842ab..2a89a3d63 100644 --- a/cppapi/server/attribute.h +++ b/cppapi/server/attribute.h @@ -102,7 +102,7 @@ typedef union _Attr_Value }Attr_Value; -typedef struct last_attr_value +struct LastAttrValue { bool inited; Tango::AttrQuality quality; @@ -110,7 +110,13 @@ typedef struct last_attr_value bool err; DevFailed except; AttrValUnion value_4; -} LastAttrValue; + void store( + const AttributeValue_5*, + const AttributeValue_4*, + const AttributeValue_3*, + const AttributeValue*, + DevFailed*); +}; typedef enum prop_type { diff --git a/cppapi/server/eventsupplier.cpp b/cppapi/server/eventsupplier.cpp index 3546a8a44..a57cfbbc6 100644 --- a/cppapi/server/eventsupplier.cpp +++ b/cppapi/server/eventsupplier.cpp @@ -335,34 +335,12 @@ bool EventSupplier::detect_and_push_change_event(DeviceImpl *device_impl, struct if (attr.prev_change_event.inited == false) { - if (except != NULL) - { - attr.prev_change_event.err = true; - attr.prev_change_event.except = *except; - } - else - { - if (attr_value.attr_val_5 != NULL) - { - attr.prev_change_event.value_4 = attr_value.attr_val_5->value; - } - else if (attr_value.attr_val_4 != NULL) - { - attr.prev_change_event.value_4 = attr_value.attr_val_4->value; - } - else if (attr_value.attr_val_3 != NULL) - { - attr.prev_change_event.value = attr_value.attr_val_3->value; - } - else - { - attr.prev_change_event.value = attr_value.attr_val->value; - } - - attr.prev_change_event.quality = the_quality; - attr.prev_change_event.err = false; - } - attr.prev_change_event.inited = true; + attr.prev_change_event.store( + attr_value.attr_val_5, + attr_value.attr_val_4, + attr_value.attr_val_3, + attr_value.attr_val, + except); is_change = true; } else @@ -401,32 +379,12 @@ bool EventSupplier::detect_and_push_change_event(DeviceImpl *device_impl, struct std::vector filterable_names_lg; std::vector filterable_data_lg; - if (except != NULL) - { - attr.prev_change_event.err = true; - attr.prev_change_event.except = *except; - } - else - { - if (attr_value.attr_val_5 != NULL) - { - attr.prev_change_event.value_4 = attr_value.attr_val_5->value; - } - else if (attr_value.attr_val_4 != NULL) - { - attr.prev_change_event.value_4 = attr_value.attr_val_4->value; - } - else if (attr_value.attr_val_3 != NULL) - { - attr.prev_change_event.value = attr_value.attr_val_3->value; - } - else - { - attr.prev_change_event.value = attr_value.attr_val->value; - } - attr.prev_change_event.quality = the_quality; - attr.prev_change_event.err = false; - } + attr.prev_change_event.store( + attr_value.attr_val_5, + attr_value.attr_val_4, + attr_value.attr_val_3, + attr_value.attr_val, + except); // // Prepare to push the event @@ -692,36 +650,15 @@ bool EventSupplier::detect_and_push_archive_event(DeviceImpl *device_impl, if (attr.prev_archive_event.inited == false) { - if (except != NULL) - { - attr.prev_archive_event.err = true; - attr.prev_archive_event.except = *except; - } - else - { - if (attr_value.attr_val_5 != NULL) - { - attr.prev_archive_event.value_4 = attr_value.attr_val_5->value; - } - else if (attr_value.attr_val_4 != NULL) - { - attr.prev_archive_event.value_4 = attr_value.attr_val_4->value; - } - else if (attr_value.attr_val_3 != NULL) - { - attr.prev_archive_event.value = attr_value.attr_val_3->value; - } - else - { - attr.prev_archive_event.value = attr_value.attr_val->value; - } + attr.prev_archive_event.store( + attr_value.attr_val_5, + attr_value.attr_val_4, + attr_value.attr_val_3, + attr_value.attr_val, + except); - attr.prev_archive_event.quality = the_quality; - attr.prev_archive_event.err = false; - } attr.archive_last_periodic = now_ms; attr.archive_last_event = now_ms; - attr.prev_archive_event.inited = true; is_change = true; } else @@ -764,32 +701,12 @@ bool EventSupplier::detect_and_push_archive_event(DeviceImpl *device_impl, domain_name = device_impl->get_name() + "/" + attr_name; - if (except != NULL) - { - attr.prev_archive_event.err = true; - attr.prev_archive_event.except = *except; - } - else - { - if (attr_value.attr_val_5 != NULL) - { - attr.prev_archive_event.value_4 = attr_value.attr_val_5->value; - } - else if (attr_value.attr_val_4 != NULL) - { - attr.prev_archive_event.value_4 = attr_value.attr_val_4->value; - } - else if (attr_value.attr_val_3 != NULL) - { - attr.prev_archive_event.value = attr_value.attr_val_3->value; - } - else - { - attr.prev_archive_event.value = attr_value.attr_val->value; - } - attr.prev_archive_event.quality = the_quality; - attr.prev_archive_event.err = false; - } + attr.prev_archive_event.store( + attr_value.attr_val_5, + attr_value.attr_val_4, + attr_value.attr_val_3, + attr_value.attr_val, + except); // // Prepare to push the event From 8d4a1bbe68a75e6cf1a35f7945626fc821374afc Mon Sep 17 00:00:00 2001 From: mliszcz Date: Mon, 16 Dec 2019 14:36:24 +0100 Subject: [PATCH 2/4] Change mutex used for EventSupplier::detect_change Remove detect_mutex and protect detect_change method with event_mutex to synchronize access to the old attribute value. Fixes data race between: * polling thread (EventSupplier::detect_and_push_xxx_event) * and user thread pushing events (EventSupplier::detect_change) --- cppapi/server/attribute.cpp | 39 ++++++++++++++++++--------------- cppapi/server/eventsupplier.cpp | 8 ------- cppapi/server/eventsupplier.h | 5 ----- 3 files changed, 21 insertions(+), 31 deletions(-) diff --git a/cppapi/server/attribute.cpp b/cppapi/server/attribute.cpp index e29e0fbc0..643c0c90f 100644 --- a/cppapi/server/attribute.cpp +++ b/cppapi/server/attribute.cpp @@ -4489,32 +4489,30 @@ void Attribute::fire_archive_event(DevFailed *except) } else { - -// -// Execute detect_change only to calculate the delta_change_rel and -// delta_change_abs and force_change ! -// - bool force_change = false; bool quality_change = false; double delta_change_rel = 0.0; double delta_change_abs = 0.0; - if (event_supplier_nd != NULL) - event_supplier_nd->detect_change(*this, ad,true,delta_change_rel,delta_change_abs,except,force_change,dev); - else if (event_supplier_zmq != NULL) - event_supplier_zmq->detect_change(*this, ad,true,delta_change_rel,delta_change_abs,except,force_change,dev); - - - std::vector filterable_names; - std::vector filterable_data; - std::vector filterable_names_lg; - std::vector filterable_data_lg; - { omni_mutex_lock oml(EventSupplier::get_event_mutex()); - const AttrQuality old_quality = prev_archive_event.quality; + // Execute detect_change only to calculate the delta_change_rel and + // delta_change_abs and force_change ! + + if (event_supplier_nd || event_supplier_zmq) + { + EventSupplier* event_supplier = event_supplier_nd ? event_supplier_nd : event_supplier_zmq; + event_supplier->detect_change( + *this, + ad, + true, + delta_change_rel, + delta_change_abs, + except, + force_change, + dev); + } prev_archive_event.store( send_attr_5, @@ -4526,6 +4524,11 @@ void Attribute::fire_archive_event(DevFailed *except) quality_change = (old_quality != prev_archive_event.quality); } + std::vector filterable_names; + std::vector filterable_data; + std::vector filterable_names_lg; + std::vector filterable_data_lg; + filterable_names.push_back("forced_event"); if (force_change == true) filterable_data.push_back((double)1.0); diff --git a/cppapi/server/eventsupplier.cpp b/cppapi/server/eventsupplier.cpp index a57cfbbc6..674acf9a3 100644 --- a/cppapi/server/eventsupplier.cpp +++ b/cppapi/server/eventsupplier.cpp @@ -43,8 +43,6 @@ namespace Tango omni_mutex EventSupplier::event_mutex; -omni_mutex EventSupplier::detect_mutex; - omni_mutex EventSupplier::push_mutex; omni_condition EventSupplier::push_cond(&EventSupplier::push_mutex); @@ -1098,12 +1096,6 @@ bool EventSupplier::detect_change(Attribute &attr, struct SuppliedEventData &att the_new_any = &(attr_value.attr_val->value); } -// -// get the mutex to synchronize the sending of events -// - - omni_mutex_lock l(detect_mutex); - // // Send event, if the read_attribute failed or if it is the first time that the read_attribute succeed after a failure. // Same thing if the attribute quality factor changes to INVALID diff --git a/cppapi/server/eventsupplier.h b/cppapi/server/eventsupplier.h index bcda2ec85..11554a157 100644 --- a/cppapi/server/eventsupplier.h +++ b/cppapi/server/eventsupplier.h @@ -173,11 +173,6 @@ protected : static omni_mutex push_mutex; static omni_condition push_cond; - // Added a mutex to synchronize the access to - // detect_event which is used - // from different threads - static omni_mutex detect_mutex; - private: bool one_subscription_cmd; }; From ee262d4fc3349be074fef3e86e74a5c6706e648e Mon Sep 17 00:00:00 2001 From: mliszcz Date: Mon, 16 Dec 2019 14:46:53 +0100 Subject: [PATCH 3/4] Protect event subscription timestamps with mutex Fixes data race between: * omni worker thread (DServer::event_subscription) * and user thread pushing events (Attribute::fire_xxx_event) --- cppapi/server/attribute.cpp | 37 +++++++++++++++++++++++++------------ 1 file changed, 25 insertions(+), 12 deletions(-) diff --git a/cppapi/server/attribute.cpp b/cppapi/server/attribute.cpp index 643c0c90f..448b919dc 100644 --- a/cppapi/server/attribute.cpp +++ b/cppapi/server/attribute.cpp @@ -3827,9 +3827,13 @@ void Attribute::fire_change_event(DevFailed *except) time_t change3_subscription,change4_subscription,change5_subscription; now = time(NULL); - change3_subscription = now - event_change3_subscription; - change4_subscription = now - event_change4_subscription; - change5_subscription = now - event_change5_subscription; + + { + omni_mutex_lock oml(EventSupplier::get_event_mutex()); + change3_subscription = now - event_change3_subscription; + change4_subscription = now - event_change4_subscription; + change5_subscription = now - event_change5_subscription; + } // // Get the event supplier(s) @@ -4232,9 +4236,12 @@ void Attribute::fire_archive_event(DevFailed *except) now = time(NULL); - archive3_subscription = now - event_archive3_subscription; - archive4_subscription = now - event_archive4_subscription; - archive5_subscription = now - event_archive5_subscription; + { + omni_mutex_lock oml(EventSupplier::get_event_mutex()); + archive3_subscription = now - event_archive3_subscription; + archive4_subscription = now - event_archive4_subscription; + archive5_subscription = now - event_archive5_subscription; + } // // Get the event supplier(s) @@ -4658,9 +4665,12 @@ void Attribute::fire_event(std::vector &filt_names,std::vector client_libs = get_client_lib(PERIODIC_EVENT); // We want a copy From 3bea45cbbb00a289ba447b22395431789c6376a1 Mon Sep 17 00:00:00 2001 From: mliszcz Date: Thu, 23 Jan 2020 16:23:03 +0100 Subject: [PATCH 4/4] Simplify condition for enforcing change event Redundant checks are removed to simplify the condition for enforcing sending of change event. New condition is equivalent to the original one. This is non-functional change. --- cppapi/server/attribute.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cppapi/server/attribute.cpp b/cppapi/server/attribute.cpp index 448b919dc..8c8e3a941 100644 --- a/cppapi/server/attribute.cpp +++ b/cppapi/server/attribute.cpp @@ -4081,8 +4081,8 @@ void Attribute::fire_change_event(DevFailed *except) if (except || quality == Tango::ATTR_INVALID - || ((! except) && prev_change_event.err) - || (quality != Tango::ATTR_INVALID && old_quality == Tango::ATTR_INVALID)) + || prev_change_event.err + || old_quality == Tango::ATTR_INVALID) { force_change = true; }