Fix sporadic race condition between Variant and underlying bus

The underlying bus was thread_local, but the design assumption that Variants built on top of that instance won't outlive the thread was incorrect. In stress tests, Variants were moved (and this is completely legal) to a different thread.
This commit is contained in:
sangelovic
2020-01-25 22:20:12 +01:00
parent 245db893b8
commit 75709e31f1
8 changed files with 43 additions and 60 deletions

View File

@ -190,6 +190,17 @@ SlotPtr Connection::addObjectVTable( const std::string& objectPath
return {slot, [this](void *slot){ iface_->sd_bus_slot_unref((sd_bus_slot*)slot); }};
}
PlainMessage Connection::createPlainMessage() const
{
sd_bus_message* sdbusMsg{};
auto r = iface_->sd_bus_message_new(bus_.get(), &sdbusMsg, _SD_BUS_MESSAGE_TYPE_INVALID);
SDBUS_THROW_ERROR_IF(r < 0, "Failed to create a plain message", -r);
return Message::Factory::create<PlainMessage>(sdbusMsg, iface_.get(), adopt_message);
}
MethodCall Connection::createMethodCall( const std::string& destination
, const std::string& objectPath
, const std::string& interfaceName
@ -213,17 +224,17 @@ Signal Connection::createSignal( const std::string& objectPath
, const std::string& interfaceName
, const std::string& signalName ) const
{
sd_bus_message *sdbusSignal{};
sd_bus_message *sdbusMsg{};
auto r = iface_->sd_bus_message_new_signal( bus_.get()
, &sdbusSignal
, &sdbusMsg
, objectPath.c_str()
, interfaceName.c_str()
, signalName.c_str() );
SDBUS_THROW_ERROR_IF(r < 0, "Failed to create signal", -r);
return Message::Factory::create<Signal>(sdbusSignal, iface_.get(), adopt_message);
return Message::Factory::create<Signal>(sdbusMsg, iface_.get(), adopt_message);
}
void Connection::emitPropertiesChangedSignal( const std::string& objectPath
@ -447,6 +458,16 @@ Connection::LoopExitEventFd::~LoopExitEventFd()
namespace sdbus {
namespace internal {
std::unique_ptr<sdbus::internal::IConnection> createConnection()
{
auto connection = sdbus::createConnection();
SCOPE_EXIT{ connection.release(); };
auto connectionInternal = dynamic_cast<sdbus::internal::IConnection*>(connection.get());
return std::unique_ptr<sdbus::internal::IConnection>(connectionInternal);
}
}
std::unique_ptr<sdbus::IConnection> createConnection()
{
return createSystemBusConnection();

View File

@ -80,11 +80,11 @@ namespace sdbus { namespace internal {
, const sd_bus_vtable* vtable
, void* userData ) override;
PlainMessage createPlainMessage() const override;
MethodCall createMethodCall( const std::string& destination
, const std::string& objectPath
, const std::string& interfaceName
, const std::string& methodName ) const override;
Signal createSignal( const std::string& objectPath
, const std::string& interfaceName
, const std::string& signalName ) const override;

View File

@ -38,6 +38,7 @@ namespace sdbus {
class MethodCall;
class MethodReply;
class Signal;
class PlainMessage;
namespace internal {
class ISdBus;
}
@ -61,11 +62,11 @@ namespace internal {
, const sd_bus_vtable* vtable
, void* userData ) = 0;
virtual PlainMessage createPlainMessage() const = 0;
virtual MethodCall createMethodCall( const std::string& destination
, const std::string& objectPath
, const std::string& interfaceName
, const std::string& methodName ) const = 0;
virtual Signal createSignal( const std::string& objectPath
, const std::string& interfaceName
, const std::string& signalName ) const = 0;
@ -94,6 +95,8 @@ namespace internal {
virtual MethodReply tryCallMethodSynchronously(const MethodCall& message, uint64_t timeout) = 0;
};
std::unique_ptr<sdbus::internal::IConnection> createConnection();
}
}

View File

@ -51,6 +51,7 @@ namespace sdbus { namespace internal {
virtual int sd_bus_call(sd_bus *bus, sd_bus_message *m, uint64_t usec, sd_bus_error *ret_error, sd_bus_message **reply) = 0;
virtual int sd_bus_call_async(sd_bus *bus, sd_bus_slot **slot, sd_bus_message *m, sd_bus_message_handler_t callback, void *userdata, uint64_t usec) = 0;
virtual int sd_bus_message_new(sd_bus *bus, sd_bus_message **m, uint8_t type) = 0;
virtual int sd_bus_message_new_method_call(sd_bus *bus, sd_bus_message **m, const char *destination, const char *path, const char *interface, const char *member) = 0;
virtual int sd_bus_message_new_signal(sd_bus *bus, sd_bus_message **m, const char *path, const char *interface, const char *member) = 0;
virtual int sd_bus_message_new_method_return(sd_bus_message *call, sd_bus_message **m) = 0;

View File

@ -28,7 +28,8 @@
#include <sdbus-c++/Types.h>
#include <sdbus-c++/Error.h>
#include "MessageUtils.h"
#include "SdBus.h"
#include "ISdBus.h"
#include "IConnection.h"
#include "ScopeGuard.h"
#include <systemd/sd-bus.h>
#include <cassert>
@ -712,60 +713,8 @@ void Signal::send() const
PlainMessage createPlainMessage()
{
int r;
// All references to the bus (like created messages) must not outlive this thread (because messages refer to sdbus
// which is thread-local, and because BusReferenceKeeper below destroys the bus at thread exit).
// A more flexible solution would be that the caller would already provide an ISdBus reference as a parameter.
// Variant is one of the callers. This means Variant could no more be created in a stand-alone way, but
// through a factory of some existing facility (Object, Proxy, Connection...).
// TODO: Consider this alternative of creating Variant, it may live next to the current one. This function would
// get IConnection* parameter and IConnection would provide createPlainMessage factory (just like it already
// provides e.g. createMethodCall). If this parameter were null, the current mechanism would be used.
thread_local internal::SdBus sdbus;
sd_bus* bus{};
SCOPE_EXIT{ sd_bus_unref(bus); };
r = sd_bus_default_system(&bus);
SDBUS_THROW_ERROR_IF(r < 0, "Failed to get default system bus", -r);
thread_local struct BusReferenceKeeper
{
explicit BusReferenceKeeper(sd_bus* bus) : bus_(sd_bus_ref(bus)) { sd_bus_flush(bus_); }
~BusReferenceKeeper() { sd_bus_flush_close_unref(bus_); }
sd_bus* bus_{};
} busReferenceKeeper{bus};
// thread_local struct BusKeeper
// {
// BusKeeper()
// {
// auto r = sd_bus_open_system(&bus);
// sd_bus_flush(bus);
// SDBUS_THROW_ERROR_IF(r < 0, "Failed to get default system bus", -r);
// }
// ~BusKeeper()
// {
// sd_bus_flush_close_unref(bus);
// }
// sd_bus* bus{};
// internal::SdBus intf;
// } busKeeper;
// Shelved here as handy thing for potential future tracing purposes:
//#include <unistd.h>
//#include <sys/syscall.h>
//#define gettid() syscall(SYS_gettid)
//printf("createPlainMessage: sd_bus*=[%p], n_ref=[%d], TID=[%d]\n", bus, *(unsigned*)bus, gettid());
sd_bus_message* sdbusMsg{};
r = sd_bus_message_new(bus, &sdbusMsg, _SD_BUS_MESSAGE_TYPE_INVALID);
SDBUS_THROW_ERROR_IF(r < 0, "Failed to create a new message", -r);
return Message::Factory::create<PlainMessage>(sdbusMsg, &sdbus, adopt_message);
static auto connection = internal::createConnection();
return connection->createPlainMessage();
}
}

View File

@ -65,6 +65,13 @@ int SdBus::sd_bus_call_async(sd_bus *bus, sd_bus_slot **slot, sd_bus_message *m,
return ::sd_bus_call_async(bus, slot, m, callback, userdata, usec);
}
int SdBus::sd_bus_message_new(sd_bus *bus, sd_bus_message **m, uint8_t type)
{
std::unique_lock<std::recursive_mutex> lock(sdbusMutex_);
return ::sd_bus_message_new(bus, m, type);
}
int SdBus::sd_bus_message_new_method_call(sd_bus *bus, sd_bus_message **m, const char *destination, const char *path, const char *interface, const char *member)
{
std::unique_lock<std::recursive_mutex> lock(sdbusMutex_);

View File

@ -43,6 +43,7 @@ public:
virtual int sd_bus_call(sd_bus *bus, sd_bus_message *m, uint64_t usec, sd_bus_error *ret_error, sd_bus_message **reply) override;
virtual int sd_bus_call_async(sd_bus *bus, sd_bus_slot **slot, sd_bus_message *m, sd_bus_message_handler_t callback, void *userdata, uint64_t usec) override;
virtual int sd_bus_message_new(sd_bus *bus, sd_bus_message **m, uint8_t type) override;
virtual int sd_bus_message_new_method_call(sd_bus *bus, sd_bus_message **m, const char *destination, const char *path, const char *interface, const char *member) override;
virtual int sd_bus_message_new_signal(sd_bus *bus, sd_bus_message **m, const char *path, const char *interface, const char *member) override;
virtual int sd_bus_message_new_method_return(sd_bus_message *call, sd_bus_message **m) override;

View File

@ -42,6 +42,7 @@ public:
MOCK_METHOD5(sd_bus_call, int(sd_bus *bus, sd_bus_message *m, uint64_t usec, sd_bus_error *ret_error, sd_bus_message **reply));
MOCK_METHOD6(sd_bus_call_async, int(sd_bus *bus, sd_bus_slot **slot, sd_bus_message *m, sd_bus_message_handler_t callback, void *userdata, uint64_t usec));
MOCK_METHOD3(sd_bus_message_new, int(sd_bus *bus, sd_bus_message **m, uint8_t type));
MOCK_METHOD6(sd_bus_message_new_method_call, int(sd_bus *bus, sd_bus_message **m, const char *destination, const char *path, const char *interface, const char *member));
MOCK_METHOD5(sd_bus_message_new_signal, int(sd_bus *bus, sd_bus_message **m, const char *path, const char *interface, const char *member));
MOCK_METHOD2(sd_bus_message_new_method_return, int(sd_bus_message *call, sd_bus_message **m));