diff --git a/src/Proxy.cpp b/src/Proxy.cpp index 17a84c8..d9a2ea3 100644 --- a/src/Proxy.cpp +++ b/src/Proxy.cpp @@ -91,9 +91,8 @@ MethodReply Proxy::callMethod(const MethodCall& message, uint64_t timeout) SDBUS_THROW_ERROR_IF(!message.isValid(), "Invalid method call message provided", EINVAL); - return message.send(timeout); + //return message.send(timeout); - /* // If we don't need to wait for any reply, we can send the message now irrespective of the context if (message.doesntExpectReply()) return message.sendWithNoReply(); @@ -106,7 +105,6 @@ MethodReply Proxy::callMethod(const MethodCall& message, uint64_t timeout) // Otherwise we send the call asynchronously and do blocking wait for the reply from the event loop thread return callMethodWithAsyncReplyBlocking(message, timeout); - */ } void Proxy::callMethod(const AsyncMethodCall& message, async_reply_handler asyncReplyCallback, uint64_t timeout) @@ -123,31 +121,63 @@ void Proxy::callMethod(const AsyncMethodCall& message, async_reply_handler async MethodReply Proxy::callMethodWithAsyncReplyBlocking(const MethodCall& message, uint64_t timeout) { - // TODO: use thread_local data exchange facility (OPTIMIZE) - std::promise result; - auto future = result.get_future(); + // VARIANT 1: THREAD LOCAL COND VAR + message.sendWithAsyncReply((void*)&Proxy::sdbus_sync_reply_handler, this, timeout); - auto callback = (void*)&Proxy::sdbus_quasi_sync_reply_handler; - auto data = std::make_pair(std::ref(result), std::ref(connection_->getSdBusInterface())); - message.sendWithAsyncReply(callback, &data, timeout); + auto& syncCallReplyData = getSyncCallReplyData(); + std::unique_lock lock(syncCallReplyData.mutex); + syncCallReplyData.cond.wait(lock, [&syncCallReplyData](){ return syncCallReplyData.arrived; }); - //printf("Thread %d: Proxy going to wait on future\n", gettid()); - MethodReply r = future.get(); - //printf("Thread %d: Proxy woken up on future\n", gettid()); - return r; + syncCallReplyData.arrived = false; + if (syncCallReplyData.error) + throw *syncCallReplyData.error; - // // TODO: Switch to thread_local once we have re-usable thread_local data exchange facility - // /*thread_local*/ async_reply_handler asyncReplyCallback = [&result](MethodReply& reply, const Error* error) - // { - // if (error == nullptr) - // result.set_value(std::move(reply)); - // else - // result.set_exception(std::make_exception_ptr(error)); - // }; + return std::move(syncCallReplyData.reply); - // auto callback = (void*)&Proxy::sdbus_async_reply_handler; - // AsyncCalls::CallData callData{*this, std::move(asyncReplyCallback), {}}; - // message.sendWithAsyncReply((void*)&Proxy::sdbus_async_reply_handler, &data, timeout); + // VARIANT 2: USING SPECIAL APPROACH, A. PROMISE/FUTURE +// std::promise result; +// auto future = result.get_future(); + +// auto callback = (void*)&Proxy::sdbus_sync_reply_handler; +// auto data = std::make_pair(std::ref(result), std::ref(connection_->getSdBusInterface())); +// message.sendWithAsyncReply(callback, &data, timeout); + +// //printf("Thread %d: Proxy going to wait on future\n", gettid()); +// MethodReply r = future.get(); +// //printf("Thread %d: Proxy woken up on future\n", gettid()); +// return r; + + // VARIANT 3: USING CLASSIC ASYNC APPROACH + // TODO: Try with thread local std::function +// thread_local async_reply_handler asyncReplyCallback = [](MethodReply& reply, const Error* error) +// { +// auto& syncCallReplyData = getSyncCallReplyData(); + +// std::unique_lock lock(syncCallReplyData.mutex); + +// syncCallReplyData.error = nullptr; +// if (error == nullptr) +// syncCallReplyData.reply = std::move(reply); +// else +// syncCallReplyData.error = std::make_unique(*error); +// syncCallReplyData.arrived = true; + +// lock.unlock(); +// syncCallReplyData.cond.notify_one(); +// }; + +// AsyncCalls::CallData callData{*this, /*std::move(*/asyncReplyCallback/*)*/, {}}; +// message.sendWithAsyncReply((void*)&Proxy::sdbus_async_reply_handler, &callData, timeout); + +// auto& syncCallReplyData = getSyncCallReplyData(); +// std::unique_lock lock(syncCallReplyData.mutex); +// syncCallReplyData.cond.wait(lock, [&syncCallReplyData](){ return syncCallReplyData.arrived; }); + +// syncCallReplyData.arrived = false; +// if (syncCallReplyData.error) +// throw *syncCallReplyData.error; + +// return std::move(syncCallReplyData.reply); } void Proxy::registerSignalHandler( const std::string& interfaceName @@ -196,6 +226,12 @@ void Proxy::unregister() interfaces_.clear(); } +Proxy::SyncCallReplyData& Proxy::getSyncCallReplyData() +{ + thread_local SyncCallReplyData syncCallReplyData; + return syncCallReplyData; +} + // Handler for D-Bus method replies of fully asynchronous D-Bus method calls int Proxy::sdbus_async_reply_handler(sd_bus_message *sdbusMessage, void *userData, sd_bus_error */*retError*/) { @@ -204,7 +240,7 @@ int Proxy::sdbus_async_reply_handler(sd_bus_message *sdbusMessage, void *userDat assert(asyncCallData->callback); auto& proxy = asyncCallData->proxy; - SCOPE_EXIT{ proxy.pendingAsyncCalls_.removeCall(asyncCallData->slot.get()); }; + SCOPE_EXIT{ if (asyncCallData->slot) proxy.pendingAsyncCalls_.removeCall(asyncCallData->slot.get()); }; auto message = Message::Factory::create(sdbusMessage, &proxy.connection_->getSdBusInterface()); @@ -223,28 +259,53 @@ int Proxy::sdbus_async_reply_handler(sd_bus_message *sdbusMessage, void *userDat } // Handler for D-Bus method replies of synchronous D-Bus method calls done out of event loop thread context -int Proxy::sdbus_quasi_sync_reply_handler(sd_bus_message *sdbusMessage, void *userData, sd_bus_error */*retError*/) +int Proxy::sdbus_sync_reply_handler(sd_bus_message *sdbusMessage, void *userData, sd_bus_error */*retError*/) { //printf("Thread %d: Proxy::sdbus_quasi_sync_reply_handler 1\n", gettid()); + // VARIANT 1: THREAD LOCAL COND VAR assert(userData != nullptr); - auto* data = static_cast&, ISdBus&>*>(userData); - auto& promise = data->first; - auto& sdBus = data->second; - auto message = Message::Factory::create(sdbusMessage, &sdBus); + auto& syncCallReplyData = getSyncCallReplyData(); + std::unique_lock lock(syncCallReplyData.mutex); const auto* error = sd_bus_message_get_error(sdbusMessage); if (error == nullptr) { - //printf("Thread %d: Proxy::sdbus_quasi_sync_reply_handler 2\n", gettid()); - promise.set_value(std::move(message)); + auto* proxy = static_cast(userData); + auto message = Message::Factory::create(sdbusMessage, &proxy->connection_->getSdBusInterface()); + syncCallReplyData.reply = std::move(message); + syncCallReplyData.error = nullptr; } else { - sdbus::Error exception(error->name, error->message); - promise.set_exception(std::make_exception_ptr(exception)); + auto exception = std::make_unique(error->name, error->message); + syncCallReplyData.error = std::move(exception); } + syncCallReplyData.arrived = true; + + lock.unlock(); + syncCallReplyData.cond.notify_one(); + + // VARIANT 2: Promise/Future +// assert(userData != nullptr); +// auto* data = static_cast&, ISdBus&>*>(userData); +// auto& promise = data->first; +// auto& sdBus = data->second; + +// auto message = Message::Factory::create(sdbusMessage, &sdBus); + +// const auto* error = sd_bus_message_get_error(sdbusMessage); +// if (error == nullptr) +// { +// //printf("Thread %d: Proxy::sdbus_quasi_sync_reply_handler 2\n", gettid()); +// promise.set_value(std::move(message)); +// } +// else +// { +// sdbus::Error exception(error->name, error->message); +// promise.set_exception(std::make_exception_ptr(exception)); +// } return 1; } diff --git a/src/Proxy.h b/src/Proxy.h index 659d264..7f87db7 100644 --- a/src/Proxy.h +++ b/src/Proxy.h @@ -35,6 +35,7 @@ #include #include #include +#include namespace sdbus { namespace internal { @@ -62,10 +63,12 @@ namespace internal { void unregister() override; private: + struct SyncCallReplyData; + static SyncCallReplyData& getSyncCallReplyData(); MethodReply callMethodWithAsyncReplyBlocking(const MethodCall& message, uint64_t timeout); void registerSignalHandlers(sdbus::internal::IConnection& connection); static int sdbus_async_reply_handler(sd_bus_message *sdbusMessage, void *userData, sd_bus_error *retError); - static int sdbus_quasi_sync_reply_handler(sd_bus_message *sdbusMessage, void *userData, sd_bus_error *retError); + static int sdbus_sync_reply_handler(sd_bus_message *sdbusMessage, void *userData, sd_bus_error *retError); static int sdbus_signal_handler(sd_bus_message *sdbusMessage, void *userData, sd_bus_error *retError); private: @@ -132,6 +135,15 @@ namespace internal { std::unordered_map> calls_; std::mutex mutex_; } pendingAsyncCalls_; + + struct SyncCallReplyData + { + std::mutex mutex; + std::condition_variable cond; + bool arrived; + MethodReply reply; + std::unique_ptr error; + }; }; }} diff --git a/src/SdBus.cpp b/src/SdBus.cpp index a2f97b5..8139cf5 100644 --- a/src/SdBus.cpp +++ b/src/SdBus.cpp @@ -27,9 +27,58 @@ #include "SdBus.h" #include +#include "ScopeGuard.h" namespace sdbus { namespace internal { +sd_bus_message* createPlainMessage() +{ + int r; + + // All references to the bus (like created messages) must not outlive this thread (because messages refer to sdbus + // which is thread-local, and because BusReferenceKeeper below destroys the bus at thread exit). + // A more flexible solution would be that the caller would already provide an ISdBus reference as a parameter. + // Variant is one of the callers. This means Variant could no more be created in a stand-alone way, but + // through a factory of some existing facility (Object, Proxy, Connection...). + // TODO: Consider this alternative of creating Variant, it may live next to the current one. This function would + // get IConnection* parameter and IConnection would provide createPlainMessage factory (just like it already + // provides e.g. createMethodCall). If this parameter were null, the current mechanism would be used. + + thread_local internal::SdBus sdbus; + + sd_bus* bus{}; + SCOPE_EXIT{ sd_bus_unref(bus); }; + r = sd_bus_default_system(&bus); + SDBUS_THROW_ERROR_IF(r < 0, "Failed to get default system bus", -r); + + thread_local struct BusReferenceKeeper + { + explicit BusReferenceKeeper(sd_bus* bus) : bus_(sd_bus_ref(bus)) { sd_bus_flush(bus_); } + ~BusReferenceKeeper() { sd_bus_flush_close_unref(bus_); } + sd_bus* bus_{}; + } busReferenceKeeper{bus}; + + // Shelved here as handy thing for potential future tracing purposes: + //#include + //#include + //#define gettid() syscall(SYS_gettid) + //printf("createPlainMessage: sd_bus*=[%p], n_ref=[%d], TID=[%d]\n", bus, *(unsigned*)bus, gettid()); + + sd_bus_message* sdbusMsg{}; + r = sd_bus_message_new(bus, &sdbusMsg, _SD_BUS_MESSAGE_TYPE_INVALID); + SDBUS_THROW_ERROR_IF(r < 0, "Failed to create a new message", -r); + + r = sd_bus_message_append_basic(sdbusMsg, SD_BUS_TYPE_STRING, "This is item.c_str()"); + SDBUS_THROW_ERROR_IF(r < 0, "Failed to serialize a string value", -r); + + r = ::sd_bus_message_seal(sdbusMsg, 1, 0); + SDBUS_THROW_ERROR_IF(r < 0, "Failed to seal the reply", -r); + + return sdbusMsg; +} + +static auto g_sdbusMessage = createPlainMessage(); + sd_bus_message* SdBus::sd_bus_message_ref(sd_bus_message *m) { std::unique_lock lock(sdbusMutex_); @@ -55,14 +104,38 @@ int SdBus::sd_bus_call(sd_bus *bus, sd_bus_message *m, uint64_t usec, sd_bus_err { std::unique_lock lock(sdbusMutex_); - return ::sd_bus_call(bus, m, usec, ret_error, reply); + //return ::sd_bus_call(bus, m, usec, ret_error, reply); + ::sd_bus_message_ref(g_sdbusMessage); + + *reply = g_sdbusMessage; + + return 1; } int SdBus::sd_bus_call_async(sd_bus *bus, sd_bus_slot **slot, sd_bus_message *m, sd_bus_message_handler_t callback, void *userdata, uint64_t usec) { std::unique_lock lock(sdbusMutex_); - return ::sd_bus_call_async(bus, slot, m, callback, userdata, usec); + //return ::sd_bus_call_async(bus, slot, m, callback, userdata, usec); + +// auto r = ::sd_bus_message_seal(m, 1, 0); +// SDBUS_THROW_ERROR_IF(r < 0, "Failed to seal the message", -r); + +// sd_bus_message* sdbusReply{}; +// r = this->sd_bus_message_new_method_return(m, &sdbusReply); +// SDBUS_THROW_ERROR_IF(r < 0, "Failed to create method reply", -r); + +// r = sd_bus_message_append_basic(sdbusReply, SD_BUS_TYPE_STRING, "This is item.c_str()"); +// SDBUS_THROW_ERROR_IF(r < 0, "Failed to serialize a string value", -r); + +// r = ::sd_bus_message_seal(sdbusReply, 1, 0); +// SDBUS_THROW_ERROR_IF(r < 0, "Failed to seal the reply", -r); + + ::sd_bus_message_ref(g_sdbusMessage); + + callback(g_sdbusMessage, userdata, nullptr); + + return 1; } int SdBus::sd_bus_message_new_method_call(sd_bus *bus, sd_bus_message **m, const char *destination, const char *path, const char *interface, const char *member) diff --git a/src/SdBus.h b/src/SdBus.h index 3ab28c2..4025d2c 100644 --- a/src/SdBus.h +++ b/src/SdBus.h @@ -33,7 +33,7 @@ namespace sdbus { namespace internal { -class SdBus final : public ISdBus +class SdBus /*final*/ : public ISdBus { public: virtual sd_bus_message* sd_bus_message_ref(sd_bus_message *m) override; diff --git a/tests/perftests/client.cpp b/tests/perftests/client.cpp index 1f46fd7..7db13fe 100644 --- a/tests/perftests/client.cpp +++ b/tests/perftests/client.cpp @@ -34,9 +34,30 @@ #include #include #include +//#include "SdBus.h" using namespace std::chrono_literals; +//class MySdBus : public sdbus::internal::SdBus +//{ +//public: +// virtual int sd_bus_call_async(sd_bus *bus, sd_bus_slot **slot, sd_bus_message *m, sd_bus_message_handler_t callback, void *userdata, uint64_t usec) override +// { +// sd_bus_message* sdbusReply{}; +// auto r = this->sd_bus_message_new_method_return(m, &sdbusReply); +// SDBUS_THROW_ERROR_IF(r < 0, "Failed to create method reply", -r); + +// callback(sdbusReply, userdata, nullptr); + +// return 1; +// } +//}; + +namespace sdbus +{ + PlainMessage createPlainMessage(); +} + uint64_t totalDuration = 0; class PerftestProxy : public sdbus::ProxyInterfaces @@ -102,10 +123,10 @@ int main(int /*argc*/, char */*argv*/[]) { const char* destinationName = "org.sdbuscpp.perftests"; const char* objectPath = "/org/sdbuscpp/perftests"; - PerftestProxy client(destinationName, objectPath); + //PerftestProxy client(destinationName, objectPath); const unsigned int repetitions{20}; - unsigned int msgCount = 1000; + unsigned int msgCount = 100000; unsigned int msgSize{}; /* @@ -136,54 +157,109 @@ int main(int /*argc*/, char */*argv*/[]) totalDuration = 0; */ +// msgSize = 20; +// std::cout << std::endl << "** Measuring method calls of size " << msgSize << " bytes (" << repetitions << " repetitions)..." << std::endl << std::endl; +// for (unsigned int r = 0; r < repetitions; ++r) +// { +// auto str1 = createRandomString(msgSize/2); +// auto str2 = createRandomString(msgSize/2); + +// auto startTime = std::chrono::steady_clock::now(); +// for (unsigned int i = 0; i < msgCount; i++) +// { +// auto result = client.concatenateTwoStrings(str1, str2); + +// assert(result.size() == str1.size() + str2.size()); +// assert(result.size() == msgSize); +// } +// auto stopTime = std::chrono::steady_clock::now(); +// auto duration = std::chrono::duration_cast(stopTime - startTime).count(); +// totalDuration += duration; +// std::cout << "Called " << msgCount << " methods in: " << duration << " ms" << std::endl; + +// std::this_thread::sleep_for(1000ms); +// } + +// std::cout << "AVERAGE: " << (totalDuration/repetitions) << " ms" << std::endl; +// totalDuration = 0; + +// msgSize = 1000; +// std::cout << std::endl << "** Measuring method calls of size " << msgSize << " bytes (" << repetitions << " repetitions)..." << std::endl << std::endl; +// for (unsigned int r = 0; r < repetitions; ++r) +// { +// auto str1 = createRandomString(msgSize/2); +// auto str2 = createRandomString(msgSize/2); + +// auto startTime = std::chrono::steady_clock::now(); +// for (unsigned int i = 0; i < msgCount; i++) +// { +// auto result = client.concatenateTwoStrings(str1, str2); + +// assert(result.size() == str1.size() + str2.size()); +// assert(result.size() == msgSize); +// } +// auto stopTime = std::chrono::steady_clock::now(); +// auto duration = std::chrono::duration_cast(stopTime - startTime).count(); +// totalDuration += duration; +// std::cout << "Called " << msgCount << " methods in: " << duration << " ms" << std::endl; + +// std::this_thread::sleep_for(1000ms); +// } + +// std::cout << "AVERAGE: " << (totalDuration/repetitions) << " ms" << std::endl; +// totalDuration = 0; + + auto proxy = sdbus::createProxy(destinationName, objectPath); + auto msg = proxy->createMethodCall("org.sdbuscpp.perftests", "concatenateTwoStrings"); + //auto msg = sdbus::createPlainMessage(); + msg.seal(); + msgSize = 20; std::cout << std::endl << "** Measuring method calls of size " << msgSize << " bytes (" << repetitions << " repetitions)..." << std::endl << std::endl; for (unsigned int r = 0; r < repetitions; ++r) { - auto str1 = createRandomString(msgSize/2); - auto str2 = createRandomString(msgSize/2); - auto startTime = std::chrono::steady_clock::now(); for (unsigned int i = 0; i < msgCount; i++) { - auto result = client.concatenateTwoStrings(str1, str2); + //auto result = client.concatenateTwoStrings(str1, str2); + proxy->callMethod(msg); - assert(result.size() == str1.size() + str2.size()); - assert(result.size() == msgSize); + //assert(result.size() == str1.size() + str2.size()); + //assert(result.size() == msgSize); } auto stopTime = std::chrono::steady_clock::now(); auto duration = std::chrono::duration_cast(stopTime - startTime).count(); totalDuration += duration; std::cout << "Called " << msgCount << " methods in: " << duration << " ms" << std::endl; - std::this_thread::sleep_for(1000ms); + std::this_thread::sleep_for(100ms); } std::cout << "AVERAGE: " << (totalDuration/repetitions) << " ms" << std::endl; totalDuration = 0; - msgSize = 1000; - std::cout << std::endl << "** Measuring method calls of size " << msgSize << " bytes (" << repetitions << " repetitions)..." << std::endl << std::endl; - for (unsigned int r = 0; r < repetitions; ++r) - { - auto str1 = createRandomString(msgSize/2); - auto str2 = createRandomString(msgSize/2); +// msgSize = 1000; +// std::cout << std::endl << "** Measuring method calls of size " << msgSize << " bytes (" << repetitions << " repetitions)..." << std::endl << std::endl; +// for (unsigned int r = 0; r < repetitions; ++r) +// { +// auto str1 = createRandomString(msgSize/2); +// auto str2 = createRandomString(msgSize/2); - auto startTime = std::chrono::steady_clock::now(); - for (unsigned int i = 0; i < msgCount; i++) - { - auto result = client.concatenateTwoStrings(str1, str2); +// auto startTime = std::chrono::steady_clock::now(); +// for (unsigned int i = 0; i < msgCount; i++) +// { +// auto result = client.concatenateTwoStrings(str1, str2); - assert(result.size() == str1.size() + str2.size()); - assert(result.size() == msgSize); - } - auto stopTime = std::chrono::steady_clock::now(); - auto duration = std::chrono::duration_cast(stopTime - startTime).count(); - totalDuration += duration; - std::cout << "Called " << msgCount << " methods in: " << duration << " ms" << std::endl; +// assert(result.size() == str1.size() + str2.size()); +// assert(result.size() == msgSize); +// } +// auto stopTime = std::chrono::steady_clock::now(); +// auto duration = std::chrono::duration_cast(stopTime - startTime).count(); +// totalDuration += duration; +// std::cout << "Called " << msgCount << " methods in: " << duration << " ms" << std::endl; - std::this_thread::sleep_for(1000ms); - } +// std::this_thread::sleep_for(1000ms); +// } std::cout << "AVERAGE: " << (totalDuration/repetitions) << " ms" << std::endl; totalDuration = 0;