This commit is contained in:
sangelovic
2019-01-30 07:57:07 +01:00
parent e59afb827b
commit dd0a975243
9 changed files with 161 additions and 61 deletions

62
Notes.txt Normal file
View File

@ -0,0 +1,62 @@
Aktualne spravanie:
* Proxy dostane connection ako referenciu zvonka:
* Ak na tej connection bezi processing loop, tak
* bud mozeme robit RPC volanie, ked je to z callbacku nasho serveroveho D-Bus API (overene od Lennarta)
- vsetko je serializovane - volania a prijimania, i viacere proxy objekty medzi sebou,
* alebo nemozeme (t.j. z ineho threadu) lebo connection nie je thread-safe.
* Ak nebezi (nie idealny stav z hladiska D-Bus daemona), tak
* mozeme robit RPC volania iba ak mame zarucenu synchronizaciu medzi viacerymi proxy objektami nad touto istou connection,
* neprijimame signaly samozrejme, lebo nebezi loop.
Vyhody: Skalovatelnost - proxy zdielaju jednu connection a jeden jej loop thread.
Nevyhody: Nie je to dotiahnute do konca, v podsate to nefunguje hlavne ak bezi loop nad connection.
* Proxy dostane vlastnu connection zvonka alebo si ju vytvori vlastnu:
* Tu connection pouziva na volania, nebezi na nej loop.
Vyhody: Je nezavisly threadovo i message-komunikativne od inych proxy
Nevyhody: Skalovatelnost
* Plus, ak pocuva na signaly, tak si vytvori kopiu connection, na nej thread s loop.
Nevyhody: Skalovatelnost - V tomto pripade mame teda 2x connection per proxy a 1 thread per proxy
Navrhovane spravanie:
Pouzivatel si zvoli
* Skalovatelnost a nenarocnost na resourcy - pouzivatel chce mat 1 connection a 1 processing loop thread nad nou.
Pripadne 2 ak chce time-slicingovo oddelit server cast od proxy casti (ze nie su incoming server / outgoing proxy requesty serializovane medzi sebou).
* Alebo sa nestara a kazdy proxy bude mat vlastnu connection a vlastny event loop thread nad nou.
Oba pristupy z hladiska implementacie znamenaju, ze proxy ma vzdy connection handlovanu niekym.
Teraz ako by fungovalo RPC volanie z proxy:
* Proxy musi nejak poslat message do connection:
* Ak je call thread rovnaky ako connection event loop thread (vtedy ak sme v nejakom server method callbacku a pouzivame tuto connection,
alebo ak sme v handleri na signal), tak v podstate mozeme vykonat to volania priamo, rovnako ako to je teraz.
* Ak je call thread iny (robime volanie cez proxy odkialkolvek), tak musime MethodCall msg poslat cez queue do connection threadu.
(Tu mozeme vyuzit queue ktory tam uz mame na async replies - davat tam std::pair<Message, MsgType>)
Teraz:
-> Mame sync call? Tak spolu s msg vsak musime poslat z proxy aj promise<MethodReply>, a v proxy si z promise ziskat future
a na nom cakat na odpoved z Connection.
Zatial Connection posle msg cez msg.send(), hned dostane reply, a reply setne do promise.
Cakajuci proxy sa zobudi a vrati reply.
TODO: Jednoducho to urobit tak ako Object::sendReplyAsynchronously - ze send() deleguje hned to connection::send, a ta bude mat if.
-> Mame async call? Tak otazka je ci bude mat mapu slotov na std::function ObjectProxy alebo Connection. Vyzera ze Connection, kvoli timingu. Nie, tak nakoniec ObjectProxy, juchuu, lebo nekorelujeme cez slot.
Ked connection, tak ta zavola AsyncMethodCall::send(), ktory zoberie callback, user data, timeout a vrati slot.
Connection si ulozi do mapy slot a std::function, a je to.
Takze finalna impelmentacia ObjectProxy::send(MethodCall):
connection_->send(msg);
Takze finalna impelmentacia ObjectProxy::send(AsyncMethodCall):
connection_->send(msg); -> a ta vnutri da userdata ako new std::function
* Proxy dostane referenciu na connection zvonka.
* Na nej si zaregistruje signaly a metody
TODO: Dokumentacia
* Ze proxy komplet zmenil pohlad na koneksny - ze koneksny vzdy maju svoj thread, pokial ich vlastni proxy.
TODO: Prerobit i Object na taky style ze berie bud Connection& alebo Connection&& (a vtedy si pusti vlastny thread, ak uz nebezi);

View File

@ -70,6 +70,16 @@ namespace sdbus {
***********************************************/ ***********************************************/
class Message class Message
{ {
public:
enum class Type
{ METHOD_CALL
, ASYNC_METHOD_CALL
, METHOD_REPLY
/*, ASYNC_METHOD_REPLY? */
, SIGNAL
, PLAIN_MESSAGE
};
public: public:
Message() = default; Message() = default;
Message(void *msg) noexcept; Message(void *msg) noexcept;

View File

@ -65,6 +65,10 @@ void Connection::releaseName(const std::string& name)
void Connection::enterProcessingLoop() void Connection::enterProcessingLoop()
{ {
loopThreadId_ = std::this_thread::get_id();
std::lock_guard<std::mutex> guard(loopMutex_);
while (true) while (true)
{ {
auto processed = processPendingRequest(); auto processed = processPendingRequest();
@ -77,11 +81,15 @@ void Connection::enterProcessingLoop()
if (success.asyncMsgsToProcess) if (success.asyncMsgsToProcess)
processAsynchronousMessages(); processAsynchronousMessages();
} }
loopThreadId_ = std::thread::id{};
} }
void Connection::enterProcessingLoopAsync() void Connection::enterProcessingLoopAsync()
{ {
asyncLoopThread_ = std::thread([this](){ enterProcessingLoop(); }); // TODO: Check that joinable() means a valid non-empty thread
if (!asyncLoopThread_.joinable())
asyncLoopThread_ = std::thread([this](){ enterProcessingLoop(); });
} }
void Connection::leaveProcessingLoop() void Connection::leaveProcessingLoop()
@ -124,6 +132,7 @@ sdbus::MethodCall Connection::createMethodCall( const std::string& destination
// Returned message will become an owner of sdbusMsg // Returned message will become an owner of sdbusMsg
SCOPE_EXIT{ sd_bus_message_unref(sdbusMsg); }; SCOPE_EXIT{ sd_bus_message_unref(sdbusMsg); };
// It is thread-safe to create a message this way
auto r = sd_bus_message_new_method_call( bus_.get() auto r = sd_bus_message_new_method_call( bus_.get()
, &sdbusMsg , &sdbusMsg
, destination.c_str() , destination.c_str()
@ -136,6 +145,41 @@ sdbus::MethodCall Connection::createMethodCall( const std::string& destination
return MethodCall(sdbusMsg); return MethodCall(sdbusMsg);
} }
sdbus::MethodReply Connection::callMethod(const MethodCall& message)
{
std::thread::id loopThreadId = loopThreadId_;
// Is the loop not yet on? => Go make synchronous call
while (loopThreadId == std::thread::id{})
{
// Did the loop begin in the meantime? Or try_lock() failed spuriously?
if (!loopMutex_.try_lock())
continue;
// Synchronous call
std::lock_guard<std::mutex> guard(loopMutex_, std::adopt_lock);
return message.send();
}
// Is the loop on and we are in the same thread? => Go for synchronous call
if (loopThreadId == std::this_thread::get_id())
{
assert(!loopMutex_.try_lock());
return message.send(); // Synchronous call
}
// We are in a different thread than the loop thread => Asynchronous call
UserRequest request{message, Message::Type::METHOD_CALL, {}, Message::Type::METHOD_REPLY};
auto future = request.ret.get_future();
{
std::lock_guard<std::mutex> guard(userRequestsMutex_);
userRequests_.push(request);
}
// Wait for the reply from the loop thread
auto reply = future.get();
return *static_cast<const MethodReply*>(&reply);
}
sdbus::Signal Connection::createSignal( const std::string& objectPath sdbus::Signal Connection::createSignal( const std::string& objectPath
, const std::string& interfaceName , const std::string& interfaceName
, const std::string& signalName ) const , const std::string& signalName ) const
@ -145,6 +189,7 @@ sdbus::Signal Connection::createSignal( const std::string& objectPath
// Returned message will become an owner of sdbusSignal // Returned message will become an owner of sdbusSignal
SCOPE_EXIT{ sd_bus_message_unref(sdbusSignal); }; SCOPE_EXIT{ sd_bus_message_unref(sdbusSignal); };
// It is thread-safe to create a message this way
auto r = sd_bus_message_new_signal( bus_.get() auto r = sd_bus_message_new_signal( bus_.get()
, &sdbusSignal , &sdbusSignal
, objectPath.c_str() , objectPath.c_str()

View File

@ -34,6 +34,7 @@
#include <thread> #include <thread>
#include <atomic> #include <atomic>
#include <mutex> #include <mutex>
#include <future>
#include <queue> #include <queue>
namespace sdbus { namespace internal { namespace sdbus { namespace internal {
@ -79,6 +80,7 @@ namespace sdbus { namespace internal {
, void* userData ) override; , void* userData ) override;
void unregisterSignalHandler(void* handlerCookie) override; void unregisterSignalHandler(void* handlerCookie) override;
MethodReply callMethod(const MethodCall& message);
void sendReplyAsynchronously(const sdbus::MethodReply& reply) override; void sendReplyAsynchronously(const sdbus::MethodReply& reply) override;
std::unique_ptr<sdbus::internal::IConnection> clone() const override; std::unique_ptr<sdbus::internal::IConnection> clone() const override;
@ -110,8 +112,25 @@ namespace sdbus { namespace internal {
private: private:
std::unique_ptr<sd_bus, decltype(&sd_bus_flush_close_unref)> bus_{nullptr, &sd_bus_flush_close_unref}; std::unique_ptr<sd_bus, decltype(&sd_bus_flush_close_unref)> bus_{nullptr, &sd_bus_flush_close_unref};
std::thread asyncLoopThread_; std::thread asyncLoopThread_;
std::mutex mutex_; std::atomic<std::thread::id> loopThreadId_;
std::queue<MethodReply> asyncReplies_; std::mutex loopMutex_;
//std::queue<MethodReply> asyncReplies_;
struct UserRequest
{
Message msg;
Message::Type msgType;
std::promise<Message> ret;
Message::Type retType;
static_assert(sizeof(Message) == sizeof(MethodCall));
static_assert(sizeof(Message) == sizeof(AsyncMethodCall));
static_assert(sizeof(Message) == sizeof(MethodReply));
static_assert(sizeof(Message) == sizeof(Signal));
};
std::queue<UserRequest> userRequests_;
std::mutex userRequestsMutex_;
std::atomic<bool> exitLoopThread_; std::atomic<bool> exitLoopThread_;
int notificationFd_{-1}; int notificationFd_{-1};
BusType busType_; BusType busType_;

View File

@ -35,7 +35,6 @@ namespace sdbus { namespace internal {
ObjectProxy::ObjectProxy(sdbus::internal::IConnection& connection, std::string destination, std::string objectPath) ObjectProxy::ObjectProxy(sdbus::internal::IConnection& connection, std::string destination, std::string objectPath)
: connection_(&connection, [](sdbus::internal::IConnection *){ /* Intentionally left empty */ }) : connection_(&connection, [](sdbus::internal::IConnection *){ /* Intentionally left empty */ })
, ownConnection_(false)
, destination_(std::move(destination)) , destination_(std::move(destination))
, objectPath_(std::move(objectPath)) , objectPath_(std::move(objectPath))
{ {
@ -45,30 +44,22 @@ ObjectProxy::ObjectProxy( std::unique_ptr<sdbus::internal::IConnection>&& connec
, std::string destination , std::string destination
, std::string objectPath ) , std::string objectPath )
: connection_(std::move(connection)) : connection_(std::move(connection))
, ownConnection_(true)
, destination_(std::move(destination)) , destination_(std::move(destination))
, objectPath_(std::move(objectPath)) , objectPath_(std::move(objectPath))
{ {
} // The connection is ours only, so we have to manage event loop upon this connection,
// so we get signals, async replies, and other messages from D-Bus.
ObjectProxy::~ObjectProxy() connection_->enterProcessingLoopAsync();
{
// If the dedicated connection for signals is used, we have to stop the processing loop
// upon this connection prior to unregistering signal slots in the interfaces_ container,
// otherwise we might have a race condition of two threads working upon one connection.
if (signalConnection_ != nullptr)
signalConnection_->leaveProcessingLoop();
} }
MethodCall ObjectProxy::createMethodCall(const std::string& interfaceName, const std::string& methodName) MethodCall ObjectProxy::createMethodCall(const std::string& interfaceName, const std::string& methodName)
{ {
// Tell, don't ask
return connection_->createMethodCall(destination_, objectPath_, interfaceName, methodName); return connection_->createMethodCall(destination_, objectPath_, interfaceName, methodName);
} }
AsyncMethodCall ObjectProxy::createAsyncMethodCall(const std::string& interfaceName, const std::string& methodName) AsyncMethodCall ObjectProxy::createAsyncMethodCall(const std::string& interfaceName, const std::string& methodName)
{ {
return AsyncMethodCall{createMethodCall(interfaceName, methodName)}; return AsyncMethodCall{ObjectProxy::createMethodCall(interfaceName, methodName)};
} }
MethodReply ObjectProxy::callMethod(const MethodCall& message) MethodReply ObjectProxy::callMethod(const MethodCall& message)
@ -99,30 +90,7 @@ void ObjectProxy::registerSignalHandler( const std::string& interfaceName
void ObjectProxy::finishRegistration() void ObjectProxy::finishRegistration()
{ {
bool hasSignals = listensToSignals(); registerSignalHandlers(*connection_);
if (hasSignals && ownConnection_)
{
// Let's use dedicated signalConnection_ for signals,
// which will then be used by the processing loop thread.
signalConnection_ = connection_->clone();
registerSignalHandlers(*signalConnection_);
signalConnection_->enterProcessingLoopAsync();
}
else if (hasSignals)
{
// Let's used connection provided from the outside.
registerSignalHandlers(*connection_);
}
}
bool ObjectProxy::listensToSignals() const
{
for (auto& interfaceItem : interfaces_)
if (!interfaceItem.second.signals_.empty())
return true;
return false;
} }
void ObjectProxy::registerSignalHandlers(sdbus::internal::IConnection& connection) void ObjectProxy::registerSignalHandlers(sdbus::internal::IConnection& connection)

View File

@ -50,7 +50,6 @@ namespace internal {
ObjectProxy( std::unique_ptr<sdbus::internal::IConnection>&& connection ObjectProxy( std::unique_ptr<sdbus::internal::IConnection>&& connection
, std::string destination , std::string destination
, std::string objectPath ); , std::string objectPath );
~ObjectProxy();
MethodCall createMethodCall(const std::string& interfaceName, const std::string& methodName) override; MethodCall createMethodCall(const std::string& interfaceName, const std::string& methodName) override;
AsyncMethodCall createAsyncMethodCall(const std::string& interfaceName, const std::string& methodName) override; AsyncMethodCall createAsyncMethodCall(const std::string& interfaceName, const std::string& methodName) override;
@ -63,7 +62,6 @@ namespace internal {
void finishRegistration() override; void finishRegistration() override;
private: private:
bool listensToSignals() const;
void registerSignalHandlers(sdbus::internal::IConnection& connection); void registerSignalHandlers(sdbus::internal::IConnection& connection);
static int sdbus_async_reply_handler(sd_bus_message *sdbusMessage, void *userData, sd_bus_error *retError); static int sdbus_async_reply_handler(sd_bus_message *sdbusMessage, void *userData, sd_bus_error *retError);
static int sdbus_signal_callback(sd_bus_message *sdbusMessage, void *userData, sd_bus_error *retError); static int sdbus_signal_callback(sd_bus_message *sdbusMessage, void *userData, sd_bus_error *retError);
@ -72,8 +70,6 @@ namespace internal {
std::unique_ptr< sdbus::internal::IConnection std::unique_ptr< sdbus::internal::IConnection
, std::function<void(sdbus::internal::IConnection*)> , std::function<void(sdbus::internal::IConnection*)>
> connection_; > connection_;
bool ownConnection_{};
std::unique_ptr<sdbus::internal::IConnection> signalConnection_;
std::string destination_; std::string destination_;
std::string objectPath_; std::string objectPath_;

View File

@ -3,25 +3,25 @@
# https://github.com/google/googletest/blob/master/googletest/README.md#incorporating-into-an-existing-cmake-project # https://github.com/google/googletest/blob/master/googletest/README.md#incorporating-into-an-existing-cmake-project
#------------------------------- #-------------------------------
configure_file(googletest-download/CMakeLists.txt.in googletest-download/CMakeLists.txt) #configure_file(googletest-download/CMakeLists.txt.in googletest-download/CMakeLists.txt)
execute_process(COMMAND ${CMAKE_COMMAND} -G "${CMAKE_GENERATOR}" . #execute_process(COMMAND ${CMAKE_COMMAND} -G "${CMAKE_GENERATOR}" .
RESULT_VARIABLE result # RESULT_VARIABLE result
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/googletest-download) # WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/googletest-download)
if(result) #if(result)
message(FATAL_ERROR "CMake step for googletest failed: ${result}") # message(FATAL_ERROR "CMake step for googletest failed: ${result}")
endif() #endif()
execute_process(COMMAND ${CMAKE_COMMAND} --build . #execute_process(COMMAND ${CMAKE_COMMAND} --build .
RESULT_VARIABLE result # RESULT_VARIABLE result
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/googletest-download) # WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/googletest-download)
if(result) #if(result)
message(FATAL_ERROR "Build step for googletest failed: ${result}") # message(FATAL_ERROR "Build step for googletest failed: ${result}")
endif() #endif()
set(gtest_force_shared_crt ON CACHE BOOL "" FORCE) #set(gtest_force_shared_crt ON CACHE BOOL "" FORCE)
add_subdirectory(${CMAKE_CURRENT_BINARY_DIR}/googletest-src add_subdirectory(${CMAKE_CURRENT_BINARY_DIR}/googletest-src
${CMAKE_CURRENT_BINARY_DIR}/googletest-build ${CMAKE_CURRENT_BINARY_DIR}/googletest-build

View File

@ -301,7 +301,7 @@ TEST_F(SdbusTestObject, HandlesCorrectlyABulkOfParallelServerSideAsyncMethods)
ASSERT_THAT(resultCount, Eq(1500)); ASSERT_THAT(resultCount, Eq(1500));
} }
/*
TEST_F(SdbusTestObject, InvokesMethodAsynchronouslyOnClientSide) TEST_F(SdbusTestObject, InvokesMethodAsynchronouslyOnClientSide)
{ {
std::promise<uint32_t> promise; std::promise<uint32_t> promise;
@ -335,7 +335,7 @@ TEST_F(SdbusTestObject, InvokesErroneousMethodAsynchronouslyOnClientSide)
ASSERT_THROW(future.get(), sdbus::Error); ASSERT_THROW(future.get(), sdbus::Error);
} }
*/
TEST_F(SdbusTestObject, FailsCallingNonexistentMethod) TEST_F(SdbusTestObject, FailsCallingNonexistentMethod)
{ {
ASSERT_THROW(m_proxy->callNonexistentMethod(), sdbus::Error); ASSERT_THROW(m_proxy->callNonexistentMethod(), sdbus::Error);

View File

@ -92,7 +92,7 @@ int main(int /*argc*/, char */*argv*/[])
const char* objectPath = "/org/sdbuscpp/perftest"; const char* objectPath = "/org/sdbuscpp/perftest";
PerftestClient client(destinationName, objectPath); PerftestClient client(destinationName, objectPath);
const unsigned int repetitions{20}; const unsigned int repetitions{2};
unsigned int msgCount = 1000; unsigned int msgCount = 1000;
unsigned int msgSize{}; unsigned int msgSize{};