Provide a few alternatives

This commit is contained in:
sangelovic
2020-01-18 00:52:34 +01:00
parent 4688e81534
commit 9bd37d2227
5 changed files with 288 additions and 66 deletions

View File

@@ -91,9 +91,8 @@ MethodReply Proxy::callMethod(const MethodCall& message, uint64_t timeout)
SDBUS_THROW_ERROR_IF(!message.isValid(), "Invalid method call message provided", EINVAL);
return message.send(timeout);
//return message.send(timeout);
/*
// If we don't need to wait for any reply, we can send the message now irrespective of the context
if (message.doesntExpectReply())
return message.sendWithNoReply();
@@ -106,7 +105,6 @@ MethodReply Proxy::callMethod(const MethodCall& message, uint64_t timeout)
// Otherwise we send the call asynchronously and do blocking wait for the reply from the event loop thread
return callMethodWithAsyncReplyBlocking(message, timeout);
*/
}
void Proxy::callMethod(const AsyncMethodCall& message, async_reply_handler asyncReplyCallback, uint64_t timeout)
@@ -123,31 +121,63 @@ void Proxy::callMethod(const AsyncMethodCall& message, async_reply_handler async
MethodReply Proxy::callMethodWithAsyncReplyBlocking(const MethodCall& message, uint64_t timeout)
{
// TODO: use thread_local data exchange facility (OPTIMIZE)
std::promise<MethodReply> result;
auto future = result.get_future();
// VARIANT 1: THREAD LOCAL COND VAR
message.sendWithAsyncReply((void*)&Proxy::sdbus_sync_reply_handler, this, timeout);
auto callback = (void*)&Proxy::sdbus_quasi_sync_reply_handler;
auto data = std::make_pair(std::ref(result), std::ref(connection_->getSdBusInterface()));
message.sendWithAsyncReply(callback, &data, timeout);
auto& syncCallReplyData = getSyncCallReplyData();
std::unique_lock<std::mutex> lock(syncCallReplyData.mutex);
syncCallReplyData.cond.wait(lock, [&syncCallReplyData](){ return syncCallReplyData.arrived; });
//printf("Thread %d: Proxy going to wait on future\n", gettid());
MethodReply r = future.get();
//printf("Thread %d: Proxy woken up on future\n", gettid());
return r;
syncCallReplyData.arrived = false;
if (syncCallReplyData.error)
throw *syncCallReplyData.error;
// // TODO: Switch to thread_local once we have re-usable thread_local data exchange facility
// /*thread_local*/ async_reply_handler asyncReplyCallback = [&result](MethodReply& reply, const Error* error)
// {
// if (error == nullptr)
// result.set_value(std::move(reply));
// else
// result.set_exception(std::make_exception_ptr(error));
// };
return std::move(syncCallReplyData.reply);
// auto callback = (void*)&Proxy::sdbus_async_reply_handler;
// AsyncCalls::CallData callData{*this, std::move(asyncReplyCallback), {}};
// message.sendWithAsyncReply((void*)&Proxy::sdbus_async_reply_handler, &data, timeout);
// VARIANT 2: USING SPECIAL APPROACH, A. PROMISE/FUTURE
// std::promise<MethodReply> result;
// auto future = result.get_future();
// auto callback = (void*)&Proxy::sdbus_sync_reply_handler;
// auto data = std::make_pair(std::ref(result), std::ref(connection_->getSdBusInterface()));
// message.sendWithAsyncReply(callback, &data, timeout);
// //printf("Thread %d: Proxy going to wait on future\n", gettid());
// MethodReply r = future.get();
// //printf("Thread %d: Proxy woken up on future\n", gettid());
// return r;
// VARIANT 3: USING CLASSIC ASYNC APPROACH
// TODO: Try with thread local std::function
// thread_local async_reply_handler asyncReplyCallback = [](MethodReply& reply, const Error* error)
// {
// auto& syncCallReplyData = getSyncCallReplyData();
// std::unique_lock<std::mutex> lock(syncCallReplyData.mutex);
// syncCallReplyData.error = nullptr;
// if (error == nullptr)
// syncCallReplyData.reply = std::move(reply);
// else
// syncCallReplyData.error = std::make_unique<Error>(*error);
// syncCallReplyData.arrived = true;
// lock.unlock();
// syncCallReplyData.cond.notify_one();
// };
// AsyncCalls::CallData callData{*this, /*std::move(*/asyncReplyCallback/*)*/, {}};
// message.sendWithAsyncReply((void*)&Proxy::sdbus_async_reply_handler, &callData, timeout);
// auto& syncCallReplyData = getSyncCallReplyData();
// std::unique_lock<std::mutex> lock(syncCallReplyData.mutex);
// syncCallReplyData.cond.wait(lock, [&syncCallReplyData](){ return syncCallReplyData.arrived; });
// syncCallReplyData.arrived = false;
// if (syncCallReplyData.error)
// throw *syncCallReplyData.error;
// return std::move(syncCallReplyData.reply);
}
void Proxy::registerSignalHandler( const std::string& interfaceName
@@ -196,6 +226,12 @@ void Proxy::unregister()
interfaces_.clear();
}
Proxy::SyncCallReplyData& Proxy::getSyncCallReplyData()
{
thread_local SyncCallReplyData syncCallReplyData;
return syncCallReplyData;
}
// Handler for D-Bus method replies of fully asynchronous D-Bus method calls
int Proxy::sdbus_async_reply_handler(sd_bus_message *sdbusMessage, void *userData, sd_bus_error */*retError*/)
{
@@ -204,7 +240,7 @@ int Proxy::sdbus_async_reply_handler(sd_bus_message *sdbusMessage, void *userDat
assert(asyncCallData->callback);
auto& proxy = asyncCallData->proxy;
SCOPE_EXIT{ proxy.pendingAsyncCalls_.removeCall(asyncCallData->slot.get()); };
SCOPE_EXIT{ if (asyncCallData->slot) proxy.pendingAsyncCalls_.removeCall(asyncCallData->slot.get()); };
auto message = Message::Factory::create<MethodReply>(sdbusMessage, &proxy.connection_->getSdBusInterface());
@@ -223,28 +259,53 @@ int Proxy::sdbus_async_reply_handler(sd_bus_message *sdbusMessage, void *userDat
}
// Handler for D-Bus method replies of synchronous D-Bus method calls done out of event loop thread context
int Proxy::sdbus_quasi_sync_reply_handler(sd_bus_message *sdbusMessage, void *userData, sd_bus_error */*retError*/)
int Proxy::sdbus_sync_reply_handler(sd_bus_message *sdbusMessage, void *userData, sd_bus_error */*retError*/)
{
//printf("Thread %d: Proxy::sdbus_quasi_sync_reply_handler 1\n", gettid());
// VARIANT 1: THREAD LOCAL COND VAR
assert(userData != nullptr);
auto* data = static_cast<std::pair<std::promise<MethodReply>&, ISdBus&>*>(userData);
auto& promise = data->first;
auto& sdBus = data->second;
auto message = Message::Factory::create<MethodReply>(sdbusMessage, &sdBus);
auto& syncCallReplyData = getSyncCallReplyData();
std::unique_lock<std::mutex> lock(syncCallReplyData.mutex);
const auto* error = sd_bus_message_get_error(sdbusMessage);
if (error == nullptr)
{
//printf("Thread %d: Proxy::sdbus_quasi_sync_reply_handler 2\n", gettid());
promise.set_value(std::move(message));
auto* proxy = static_cast<Proxy*>(userData);
auto message = Message::Factory::create<MethodReply>(sdbusMessage, &proxy->connection_->getSdBusInterface());
syncCallReplyData.reply = std::move(message);
syncCallReplyData.error = nullptr;
}
else
{
sdbus::Error exception(error->name, error->message);
promise.set_exception(std::make_exception_ptr(exception));
auto exception = std::make_unique<sdbus::Error>(error->name, error->message);
syncCallReplyData.error = std::move(exception);
}
syncCallReplyData.arrived = true;
lock.unlock();
syncCallReplyData.cond.notify_one();
// VARIANT 2: Promise/Future
// assert(userData != nullptr);
// auto* data = static_cast<std::pair<std::promise<MethodReply>&, ISdBus&>*>(userData);
// auto& promise = data->first;
// auto& sdBus = data->second;
// auto message = Message::Factory::create<MethodReply>(sdbusMessage, &sdBus);
// const auto* error = sd_bus_message_get_error(sdbusMessage);
// if (error == nullptr)
// {
// //printf("Thread %d: Proxy::sdbus_quasi_sync_reply_handler 2\n", gettid());
// promise.set_value(std::move(message));
// }
// else
// {
// sdbus::Error exception(error->name, error->message);
// promise.set_exception(std::make_exception_ptr(exception));
// }
return 1;
}

View File

@@ -35,6 +35,7 @@
#include <map>
#include <unordered_map>
#include <mutex>
#include <condition_variable>
namespace sdbus {
namespace internal {
@@ -62,10 +63,12 @@ namespace internal {
void unregister() override;
private:
struct SyncCallReplyData;
static SyncCallReplyData& getSyncCallReplyData();
MethodReply callMethodWithAsyncReplyBlocking(const MethodCall& message, uint64_t timeout);
void registerSignalHandlers(sdbus::internal::IConnection& connection);
static int sdbus_async_reply_handler(sd_bus_message *sdbusMessage, void *userData, sd_bus_error *retError);
static int sdbus_quasi_sync_reply_handler(sd_bus_message *sdbusMessage, void *userData, sd_bus_error *retError);
static int sdbus_sync_reply_handler(sd_bus_message *sdbusMessage, void *userData, sd_bus_error *retError);
static int sdbus_signal_handler(sd_bus_message *sdbusMessage, void *userData, sd_bus_error *retError);
private:
@@ -132,6 +135,15 @@ namespace internal {
std::unordered_map<void*, std::unique_ptr<CallData>> calls_;
std::mutex mutex_;
} pendingAsyncCalls_;
struct SyncCallReplyData
{
std::mutex mutex;
std::condition_variable cond;
bool arrived;
MethodReply reply;
std::unique_ptr<Error> error;
};
};
}}

View File

@@ -27,9 +27,58 @@
#include "SdBus.h"
#include <sdbus-c++/Error.h>
#include "ScopeGuard.h"
namespace sdbus { namespace internal {
sd_bus_message* createPlainMessage()
{
int r;
// All references to the bus (like created messages) must not outlive this thread (because messages refer to sdbus
// which is thread-local, and because BusReferenceKeeper below destroys the bus at thread exit).
// A more flexible solution would be that the caller would already provide an ISdBus reference as a parameter.
// Variant is one of the callers. This means Variant could no more be created in a stand-alone way, but
// through a factory of some existing facility (Object, Proxy, Connection...).
// TODO: Consider this alternative of creating Variant, it may live next to the current one. This function would
// get IConnection* parameter and IConnection would provide createPlainMessage factory (just like it already
// provides e.g. createMethodCall). If this parameter were null, the current mechanism would be used.
thread_local internal::SdBus sdbus;
sd_bus* bus{};
SCOPE_EXIT{ sd_bus_unref(bus); };
r = sd_bus_default_system(&bus);
SDBUS_THROW_ERROR_IF(r < 0, "Failed to get default system bus", -r);
thread_local struct BusReferenceKeeper
{
explicit BusReferenceKeeper(sd_bus* bus) : bus_(sd_bus_ref(bus)) { sd_bus_flush(bus_); }
~BusReferenceKeeper() { sd_bus_flush_close_unref(bus_); }
sd_bus* bus_{};
} busReferenceKeeper{bus};
// Shelved here as handy thing for potential future tracing purposes:
//#include <unistd.h>
//#include <sys/syscall.h>
//#define gettid() syscall(SYS_gettid)
//printf("createPlainMessage: sd_bus*=[%p], n_ref=[%d], TID=[%d]\n", bus, *(unsigned*)bus, gettid());
sd_bus_message* sdbusMsg{};
r = sd_bus_message_new(bus, &sdbusMsg, _SD_BUS_MESSAGE_TYPE_INVALID);
SDBUS_THROW_ERROR_IF(r < 0, "Failed to create a new message", -r);
r = sd_bus_message_append_basic(sdbusMsg, SD_BUS_TYPE_STRING, "This is item.c_str()");
SDBUS_THROW_ERROR_IF(r < 0, "Failed to serialize a string value", -r);
r = ::sd_bus_message_seal(sdbusMsg, 1, 0);
SDBUS_THROW_ERROR_IF(r < 0, "Failed to seal the reply", -r);
return sdbusMsg;
}
static auto g_sdbusMessage = createPlainMessage();
sd_bus_message* SdBus::sd_bus_message_ref(sd_bus_message *m)
{
std::unique_lock<std::recursive_mutex> lock(sdbusMutex_);
@@ -55,14 +104,38 @@ int SdBus::sd_bus_call(sd_bus *bus, sd_bus_message *m, uint64_t usec, sd_bus_err
{
std::unique_lock<std::recursive_mutex> lock(sdbusMutex_);
return ::sd_bus_call(bus, m, usec, ret_error, reply);
//return ::sd_bus_call(bus, m, usec, ret_error, reply);
::sd_bus_message_ref(g_sdbusMessage);
*reply = g_sdbusMessage;
return 1;
}
int SdBus::sd_bus_call_async(sd_bus *bus, sd_bus_slot **slot, sd_bus_message *m, sd_bus_message_handler_t callback, void *userdata, uint64_t usec)
{
std::unique_lock<std::recursive_mutex> lock(sdbusMutex_);
return ::sd_bus_call_async(bus, slot, m, callback, userdata, usec);
//return ::sd_bus_call_async(bus, slot, m, callback, userdata, usec);
// auto r = ::sd_bus_message_seal(m, 1, 0);
// SDBUS_THROW_ERROR_IF(r < 0, "Failed to seal the message", -r);
// sd_bus_message* sdbusReply{};
// r = this->sd_bus_message_new_method_return(m, &sdbusReply);
// SDBUS_THROW_ERROR_IF(r < 0, "Failed to create method reply", -r);
// r = sd_bus_message_append_basic(sdbusReply, SD_BUS_TYPE_STRING, "This is item.c_str()");
// SDBUS_THROW_ERROR_IF(r < 0, "Failed to serialize a string value", -r);
// r = ::sd_bus_message_seal(sdbusReply, 1, 0);
// SDBUS_THROW_ERROR_IF(r < 0, "Failed to seal the reply", -r);
::sd_bus_message_ref(g_sdbusMessage);
callback(g_sdbusMessage, userdata, nullptr);
return 1;
}
int SdBus::sd_bus_message_new_method_call(sd_bus *bus, sd_bus_message **m, const char *destination, const char *path, const char *interface, const char *member)

View File

@@ -33,7 +33,7 @@
namespace sdbus { namespace internal {
class SdBus final : public ISdBus
class SdBus /*final*/ : public ISdBus
{
public:
virtual sd_bus_message* sd_bus_message_ref(sd_bus_message *m) override;

View File

@@ -34,9 +34,30 @@
#include <cassert>
#include <algorithm>
#include <iostream>
//#include "SdBus.h"
using namespace std::chrono_literals;
//class MySdBus : public sdbus::internal::SdBus
//{
//public:
// virtual int sd_bus_call_async(sd_bus *bus, sd_bus_slot **slot, sd_bus_message *m, sd_bus_message_handler_t callback, void *userdata, uint64_t usec) override
// {
// sd_bus_message* sdbusReply{};
// auto r = this->sd_bus_message_new_method_return(m, &sdbusReply);
// SDBUS_THROW_ERROR_IF(r < 0, "Failed to create method reply", -r);
// callback(sdbusReply, userdata, nullptr);
// return 1;
// }
//};
namespace sdbus
{
PlainMessage createPlainMessage();
}
uint64_t totalDuration = 0;
class PerftestProxy : public sdbus::ProxyInterfaces<org::sdbuscpp::perftests_proxy>
@@ -102,10 +123,10 @@ int main(int /*argc*/, char */*argv*/[])
{
const char* destinationName = "org.sdbuscpp.perftests";
const char* objectPath = "/org/sdbuscpp/perftests";
PerftestProxy client(destinationName, objectPath);
//PerftestProxy client(destinationName, objectPath);
const unsigned int repetitions{20};
unsigned int msgCount = 1000;
unsigned int msgCount = 100000;
unsigned int msgSize{};
/*
@@ -136,54 +157,109 @@ int main(int /*argc*/, char */*argv*/[])
totalDuration = 0;
*/
// msgSize = 20;
// std::cout << std::endl << "** Measuring method calls of size " << msgSize << " bytes (" << repetitions << " repetitions)..." << std::endl << std::endl;
// for (unsigned int r = 0; r < repetitions; ++r)
// {
// auto str1 = createRandomString(msgSize/2);
// auto str2 = createRandomString(msgSize/2);
// auto startTime = std::chrono::steady_clock::now();
// for (unsigned int i = 0; i < msgCount; i++)
// {
// auto result = client.concatenateTwoStrings(str1, str2);
// assert(result.size() == str1.size() + str2.size());
// assert(result.size() == msgSize);
// }
// auto stopTime = std::chrono::steady_clock::now();
// auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(stopTime - startTime).count();
// totalDuration += duration;
// std::cout << "Called " << msgCount << " methods in: " << duration << " ms" << std::endl;
// std::this_thread::sleep_for(1000ms);
// }
// std::cout << "AVERAGE: " << (totalDuration/repetitions) << " ms" << std::endl;
// totalDuration = 0;
// msgSize = 1000;
// std::cout << std::endl << "** Measuring method calls of size " << msgSize << " bytes (" << repetitions << " repetitions)..." << std::endl << std::endl;
// for (unsigned int r = 0; r < repetitions; ++r)
// {
// auto str1 = createRandomString(msgSize/2);
// auto str2 = createRandomString(msgSize/2);
// auto startTime = std::chrono::steady_clock::now();
// for (unsigned int i = 0; i < msgCount; i++)
// {
// auto result = client.concatenateTwoStrings(str1, str2);
// assert(result.size() == str1.size() + str2.size());
// assert(result.size() == msgSize);
// }
// auto stopTime = std::chrono::steady_clock::now();
// auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(stopTime - startTime).count();
// totalDuration += duration;
// std::cout << "Called " << msgCount << " methods in: " << duration << " ms" << std::endl;
// std::this_thread::sleep_for(1000ms);
// }
// std::cout << "AVERAGE: " << (totalDuration/repetitions) << " ms" << std::endl;
// totalDuration = 0;
auto proxy = sdbus::createProxy(destinationName, objectPath);
auto msg = proxy->createMethodCall("org.sdbuscpp.perftests", "concatenateTwoStrings");
//auto msg = sdbus::createPlainMessage();
msg.seal();
msgSize = 20;
std::cout << std::endl << "** Measuring method calls of size " << msgSize << " bytes (" << repetitions << " repetitions)..." << std::endl << std::endl;
for (unsigned int r = 0; r < repetitions; ++r)
{
auto str1 = createRandomString(msgSize/2);
auto str2 = createRandomString(msgSize/2);
auto startTime = std::chrono::steady_clock::now();
for (unsigned int i = 0; i < msgCount; i++)
{
auto result = client.concatenateTwoStrings(str1, str2);
//auto result = client.concatenateTwoStrings(str1, str2);
proxy->callMethod(msg);
assert(result.size() == str1.size() + str2.size());
assert(result.size() == msgSize);
//assert(result.size() == str1.size() + str2.size());
//assert(result.size() == msgSize);
}
auto stopTime = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(stopTime - startTime).count();
totalDuration += duration;
std::cout << "Called " << msgCount << " methods in: " << duration << " ms" << std::endl;
std::this_thread::sleep_for(1000ms);
std::this_thread::sleep_for(100ms);
}
std::cout << "AVERAGE: " << (totalDuration/repetitions) << " ms" << std::endl;
totalDuration = 0;
msgSize = 1000;
std::cout << std::endl << "** Measuring method calls of size " << msgSize << " bytes (" << repetitions << " repetitions)..." << std::endl << std::endl;
for (unsigned int r = 0; r < repetitions; ++r)
{
auto str1 = createRandomString(msgSize/2);
auto str2 = createRandomString(msgSize/2);
// msgSize = 1000;
// std::cout << std::endl << "** Measuring method calls of size " << msgSize << " bytes (" << repetitions << " repetitions)..." << std::endl << std::endl;
// for (unsigned int r = 0; r < repetitions; ++r)
// {
// auto str1 = createRandomString(msgSize/2);
// auto str2 = createRandomString(msgSize/2);
auto startTime = std::chrono::steady_clock::now();
for (unsigned int i = 0; i < msgCount; i++)
{
auto result = client.concatenateTwoStrings(str1, str2);
// auto startTime = std::chrono::steady_clock::now();
// for (unsigned int i = 0; i < msgCount; i++)
// {
// auto result = client.concatenateTwoStrings(str1, str2);
assert(result.size() == str1.size() + str2.size());
assert(result.size() == msgSize);
}
auto stopTime = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(stopTime - startTime).count();
totalDuration += duration;
std::cout << "Called " << msgCount << " methods in: " << duration << " ms" << std::endl;
// assert(result.size() == str1.size() + str2.size());
// assert(result.size() == msgSize);
// }
// auto stopTime = std::chrono::steady_clock::now();
// auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(stopTime - startTime).count();
// totalDuration += duration;
// std::cout << "Called " << msgCount << " methods in: " << duration << " ms" << std::endl;
std::this_thread::sleep_for(1000ms);
}
// std::this_thread::sleep_for(1000ms);
// }
std::cout << "AVERAGE: " << (totalDuration/repetitions) << " ms" << std::endl;
totalDuration = 0;