Compare commits

...

7 Commits

Author SHA1 Message Date
d2d1a2ddbc WIP 2019-02-04 22:31:09 +01:00
9292f293ec WIP 2019-01-31 21:42:37 +01:00
dd0a975243 WIP 2019-01-30 07:57:07 +01:00
e59afb827b Fix introspection xml 2019-01-27 15:13:15 +01:00
0cf27f7262 Introduce support for client-side asynchronous method invocations 2019-01-27 14:55:20 +01:00
97c47cb6df Put perftests in proper place 2019-01-26 23:16:37 +01:00
3839c3ffd7 Introduce simple method call and signal-based manual performance tests 2019-01-25 13:30:27 +01:00
30 changed files with 1135 additions and 131 deletions

View File

@ -13,7 +13,9 @@ include(GNUInstallDirs) # Installation directories for `install` command and pkg
#-------------------------------
find_package(PkgConfig REQUIRED)
pkg_check_modules(SYSTEMD REQUIRED libsystemd>=236)
#pkg_check_modules(SYSTEMD REQUIRED libsystemd>=236)
set(CMAKE_CXX_FLAGS "-O0 -g")
#-------------------------------
# SOURCE FILES CONFIGURATION
@ -80,7 +82,7 @@ set(SDBUSCPP_VERSION "${PROJECT_VERSION}")
# We are building in two steps: first objects, then link them into a library,
# and that's because we need object files since unit tests link against them.
add_library(sdbuscppobjects OBJECT ${SDBUSCPP_SRCS})
target_include_directories(sdbuscppobjects PUBLIC ${SYSTEMD_INCLUDE_DIRS})
#target_include_directories(sdbuscppobjects PUBLIC ${SYSTEMD_INCLUDE_DIRS})
target_compile_definitions(sdbuscppobjects PRIVATE BUILDLIB=1)
set_target_properties(sdbuscppobjects PROPERTIES POSITION_INDEPENDENT_CODE ON)
@ -91,7 +93,7 @@ set_target_properties(sdbus-c++
VERSION "${SDBUSCPP_VERSION}"
SOVERSION "${SDBUSCPP_VERSION_MAJOR}"
OUTPUT_NAME "sdbus-c++")
target_link_libraries(sdbus-c++ ${SYSTEMD_LIBRARIES})
target_link_libraries(sdbus-c++ -L/home/aeywalee/data/repos/systemd/.libs/ -lsystemd)
#----------------------------------
# INSTALLATION
@ -124,6 +126,7 @@ if(BUILD_CODE_GEN)
add_subdirectory("${CMAKE_SOURCE_DIR}/stub-generator")
endif()
#----------------------------------
# DOCUMENTATION
#----------------------------------

62
Notes.txt Normal file
View File

@ -0,0 +1,62 @@
Aktualne spravanie:
* Proxy dostane connection ako referenciu zvonka:
* Ak na tej connection bezi processing loop, tak
* bud mozeme robit RPC volanie, ked je to z callbacku nasho serveroveho D-Bus API (overene od Lennarta)
- vsetko je serializovane - volania a prijimania, i viacere proxy objekty medzi sebou,
* alebo nemozeme (t.j. z ineho threadu) lebo connection nie je thread-safe.
* Ak nebezi (nie idealny stav z hladiska D-Bus daemona), tak
* mozeme robit RPC volania iba ak mame zarucenu synchronizaciu medzi viacerymi proxy objektami nad touto istou connection,
* neprijimame signaly samozrejme, lebo nebezi loop.
Vyhody: Skalovatelnost - proxy zdielaju jednu connection a jeden jej loop thread.
Nevyhody: Nie je to dotiahnute do konca, v podsate to nefunguje hlavne ak bezi loop nad connection.
* Proxy dostane vlastnu connection zvonka alebo si ju vytvori vlastnu:
* Tu connection pouziva na volania, nebezi na nej loop.
Vyhody: Je nezavisly threadovo i message-komunikativne od inych proxy
Nevyhody: Skalovatelnost
* Plus, ak pocuva na signaly, tak si vytvori kopiu connection, na nej thread s loop.
Nevyhody: Skalovatelnost - V tomto pripade mame teda 2x connection per proxy a 1 thread per proxy
Navrhovane spravanie:
Pouzivatel si zvoli
* Skalovatelnost a nenarocnost na resourcy - pouzivatel chce mat 1 connection a 1 processing loop thread nad nou.
Pripadne 2 ak chce time-slicingovo oddelit server cast od proxy casti (ze nie su incoming server / outgoing proxy requesty serializovane medzi sebou).
* Alebo sa nestara a kazdy proxy bude mat vlastnu connection a vlastny event loop thread nad nou.
Oba pristupy z hladiska implementacie znamenaju, ze proxy ma vzdy connection handlovanu niekym.
Teraz ako by fungovalo RPC volanie z proxy:
* Proxy musi nejak poslat message do connection:
* Ak je call thread rovnaky ako connection event loop thread (vtedy ak sme v nejakom server method callbacku a pouzivame tuto connection,
alebo ak sme v handleri na signal), tak v podstate mozeme vykonat to volania priamo, rovnako ako to je teraz.
* Ak je call thread iny (robime volanie cez proxy odkialkolvek), tak musime MethodCall msg poslat cez queue do connection threadu.
(Tu mozeme vyuzit queue ktory tam uz mame na async replies - davat tam std::pair<Message, MsgType>)
Teraz:
-> Mame sync call? Tak spolu s msg vsak musime poslat z proxy aj promise<MethodReply>, a v proxy si z promise ziskat future
a na nom cakat na odpoved z Connection.
Zatial Connection posle msg cez msg.send(), hned dostane reply, a reply setne do promise.
Cakajuci proxy sa zobudi a vrati reply.
TODO: Jednoducho to urobit tak ako Object::sendReplyAsynchronously - ze send() deleguje hned to connection::send, a ta bude mat if.
-> Mame async call? Tak otazka je ci bude mat mapu slotov na std::function ObjectProxy alebo Connection. Vyzera ze Connection, kvoli timingu. Nie, tak nakoniec ObjectProxy, juchuu, lebo nekorelujeme cez slot.
Ked connection, tak ta zavola AsyncMethodCall::send(), ktory zoberie callback, user data, timeout a vrati slot.
Connection si ulozi do mapy slot a std::function, a je to.
Takze finalna impelmentacia ObjectProxy::send(MethodCall):
connection_->send(msg);
Takze finalna impelmentacia ObjectProxy::send(AsyncMethodCall):
connection_->send(msg); -> a ta vnutri da userdata ako new std::function
* Proxy dostane referenciu na connection zvonka.
* Na nej si zaregistruje signaly a metody
TODO: Dokumentacia
* Ze proxy komplet zmenil pohlad na koneksny - ze koneksny vzdy maju svoj thread, pokial ich vlastni proxy.
TODO: Prerobit i Object na taky style ze berie bud Connection& alebo Connection&& (a vtedy si pusti vlastny thread, ak uz nebezi);

View File

@ -37,6 +37,7 @@ namespace sdbus {
class IObject;
class IObjectProxy;
class Variant;
class Error;
}
namespace sdbus {
@ -165,6 +166,7 @@ namespace sdbus {
MethodInvoker& onInterface(const std::string& interfaceName);
template <typename... _Args> MethodInvoker& withArguments(_Args&&... args);
template <typename... _Args> void storeResultsTo(_Args&... args);
void dontExpectReply();
private:
@ -175,6 +177,21 @@ namespace sdbus {
bool methodCalled_{};
};
class AsyncMethodInvoker
{
public:
AsyncMethodInvoker(IObjectProxy& objectProxy, const std::string& methodName);
AsyncMethodInvoker& onInterface(const std::string& interfaceName);
template <typename... _Args> AsyncMethodInvoker& withArguments(_Args&&... args);
//template <typename... _OutputArgs> void uponReplyInvoke(std::function<void(const Error*, _OutputArgs...)> callback);
template <typename _Function> void uponReplyInvoke(_Function&& callback);
private:
IObjectProxy& objectProxy_;
const std::string& methodName_;
AsyncMethodCall method_;
};
class SignalSubscriber
{
public:

View File

@ -119,8 +119,7 @@ namespace sdbus {
// as a storage for the argument values deserialized from the message.
tuple_of_function_input_arg_types_t<_Function> inputArgs;
// Deserialize input arguments from the message into the tuple,
// plus store the result object as a last item of the tuple.
// Deserialize input arguments from the message into the tuple.
msg >> inputArgs;
// Invoke callback with input arguments from the tuple.
@ -487,6 +486,49 @@ namespace sdbus {
}
inline AsyncMethodInvoker::AsyncMethodInvoker(IObjectProxy& objectProxy, const std::string& methodName)
: objectProxy_(objectProxy)
, methodName_(methodName)
{
}
inline AsyncMethodInvoker& AsyncMethodInvoker::onInterface(const std::string& interfaceName)
{
method_ = objectProxy_.createAsyncMethodCall(interfaceName, methodName_);
return *this;
}
template <typename... _Args>
inline AsyncMethodInvoker& AsyncMethodInvoker::withArguments(_Args&&... args)
{
SDBUS_THROW_ERROR_IF(!method_.isValid(), "DBus interface not specified when calling a DBus method", EINVAL);
detail::serialize_pack(method_, std::forward<_Args>(args)...);
return *this;
}
template <typename _Function>
void AsyncMethodInvoker::uponReplyInvoke(_Function&& callback)
{
SDBUS_THROW_ERROR_IF(!method_.isValid(), "DBus interface not specified when calling a DBus method", EINVAL);
objectProxy_.callMethod(method_, [callback = std::forward<_Function>(callback)](MethodReply& reply, const Error* error)
{
// Create a tuple of callback input arguments' types, which will be used
// as a storage for the argument values deserialized from the message.
tuple_of_function_input_arg_types_t<_Function> args;
// Deserialize input arguments from the message into the tuple.
reply >> args;
// Invoke callback with input arguments from the tuple.
sdbus::apply(callback, error, args); // TODO: Use std::apply when switching to full C++17 support
});
}
inline SignalSubscriber::SignalSubscriber(IObjectProxy& objectProxy, const std::string& signalName)
: objectProxy_(objectProxy)
, signalName_(signalName)

View File

@ -57,6 +57,11 @@ namespace sdbus {
return message_;
}
bool isValid() const
{
return !getName().empty();
}
private:
std::string name_;
std::string message_;

View File

@ -34,6 +34,7 @@
// Forward declarations
namespace sdbus {
class MethodCall;
class AsyncMethodCall;
class MethodReply;
class IConnection;
}
@ -58,7 +59,7 @@ namespace sdbus {
*
* @param[in] interfaceName Name of an interface that the method is defined under
* @param[in] methodName Name of the method
* @return A method call message message
* @return A method call message
*
* Serialize method arguments into the returned message and invoke the method by passing
* the message with serialized arguments to the @c callMethod function.
@ -68,6 +69,21 @@ namespace sdbus {
*/
virtual MethodCall createMethodCall(const std::string& interfaceName, const std::string& methodName) = 0;
/*!
* @brief Creates an asynchronous method call message
*
* @param[in] interfaceName Name of an interface that the method is defined under
* @param[in] methodName Name of the method
* @return A method call message
*
* Serialize method arguments into the returned message and invoke the method by passing
* the message with serialized arguments to the @c callMethod function.
* Alternatively, use higher-level API @c callMethodAsync(const std::string& methodName) defined below.
*
* @throws sdbus::Error in case of failure
*/
virtual AsyncMethodCall createAsyncMethodCall(const std::string& interfaceName, const std::string& methodName) = 0;
/*!
* @brief Calls method on the proxied D-Bus object
*
@ -85,7 +101,23 @@ namespace sdbus {
*
* @throws sdbus::Error in case of failure
*/
virtual MethodReply callMethod(const sdbus::MethodCall& message) = 0;
virtual MethodReply callMethod(const MethodCall& message) = 0;
/*!
* @brief Calls method on the proxied D-Bus object asynchronously
*
* @param[in] message Message representing an async method call
* @param[in] asyncReplyHandler Handler for the async reply
*
* The call is non-blocking. It doesn't wait for the reply. Once the reply arrives,
* the provided async reply handler will get invoked from the context of the connection
* event loop processing thread.
*
* Note: To avoid messing with messages, use higher-level API defined below.
*
* @throws sdbus::Error in case of failure
*/
virtual void callMethod(const AsyncMethodCall& message, async_reply_handler asyncReplyCallback) = 0;
/*!
* @brief Registers a handler for the desired signal emitted by the proxied D-Bus object
@ -131,6 +163,30 @@ namespace sdbus {
*/
MethodInvoker callMethod(const std::string& methodName);
/*!
* @brief Calls method on the proxied D-Bus object asynchronously
*
* @param[in] methodName Name of the method
* @return A helper object for convenient asynchronous invocation of the method
*
* This is a high-level, convenience way of calling D-Bus methods that abstracts
* from the D-Bus message concept. Method arguments/return value are automatically (de)serialized
* in a message and D-Bus signatures automatically deduced from the provided native arguments
* and return values.
*
* Example of use:
* @code
* int a = ..., b = ...;
* object_.callMethodAsync("multiply").onInterface(INTERFACE_NAME).withArguments(a, b).uponReplyInvoke([](int result)
* {
* std::cout << "Got result of multiplying " << a << " and " << b << ": " << result << std::endl;
* });
* @endcode
*
* @throws sdbus::Error in case of failure
*/
AsyncMethodInvoker callMethodAsync(const std::string& methodName);
/*!
* @brief Registers signal handler for a given signal of the proxied D-Bus object
*
@ -197,6 +253,11 @@ namespace sdbus {
return MethodInvoker(*this, methodName);
}
inline AsyncMethodInvoker IObjectProxy::callMethodAsync(const std::string& methodName)
{
return AsyncMethodInvoker(*this, methodName);
}
inline SignalSubscriber IObjectProxy::uponSignal(const std::string& signalName)
{
return SignalSubscriber(*this, signalName);

View File

@ -70,6 +70,16 @@ namespace sdbus {
***********************************************/
class Message
{
public:
enum class Type
{ METHOD_CALL
, ASYNC_METHOD_CALL
, METHOD_REPLY
/*, ASYNC_METHOD_REPLY? */
, SIGNAL
, PLAIN_MESSAGE
};
public:
Message() = default;
Message(void *msg) noexcept;
@ -163,6 +173,14 @@ namespace sdbus {
MethodReply sendWithNoReply() const;
};
class AsyncMethodCall : public Message
{
public:
using Message::Message;
AsyncMethodCall(MethodCall&& call) noexcept;
void send(void* callback, void* userData) const;
};
class MethodReply : public Message
{
public:

View File

@ -46,12 +46,14 @@ namespace sdbus {
class Signal;
class MethodResult;
template <typename... _Results> class Result;
class Error;
}
namespace sdbus {
using method_callback = std::function<void(MethodCall& msg, MethodReply& reply)>;
using async_method_callback = std::function<void(MethodCall& msg, MethodResult result)>;
using async_reply_handler = std::function<void(MethodReply& reply, const Error* error)>;
using signal_handler = std::function<void(Signal& signal)>;
using property_set_callback = std::function<void(Message& msg)>;
using property_get_callback = std::function<void(Message& reply)>;
@ -368,6 +370,12 @@ namespace sdbus {
static constexpr bool is_async = false;
};
template <typename... _Args>
struct function_traits<void(const Error*, _Args...)>
: public function_traits_base<void, _Args...>
{
};
template <typename... _Args, typename... _Results>
struct function_traits<void(Result<_Results...>, _Args...)>
: public function_traits_base<std::tuple<_Results...>, _Args...>
@ -498,6 +506,15 @@ namespace sdbus {
return std::forward<_Function>(f)(std::move(r), std::get<_I>(std::forward<_Tuple>(t))...);
}
template <class _Function, class _Tuple, std::size_t... _I>
constexpr decltype(auto) apply_impl( _Function&& f
, const Error* e
, _Tuple&& t
, std::index_sequence<_I...> )
{
return std::forward<_Function>(f)(e, std::get<_I>(std::forward<_Tuple>(t))...);
}
// Version of apply_impl for functions returning non-void values.
// In this case just forward function return value.
template <class _Function, class _Tuple, std::size_t... _I>
@ -542,6 +559,37 @@ namespace sdbus {
, std::forward<_Tuple>(t)
, std::make_index_sequence<std::tuple_size<std::decay_t<_Tuple>>::value>{} );
}
// Convert tuple `t' of values into a list of arguments
// and invoke function `f' with those arguments.
template <class _Function, class _Tuple>
constexpr decltype(auto) apply(_Function&& f, const Error* e, _Tuple&& t)
{
return detail::apply_impl( std::forward<_Function>(f)
, e
, std::forward<_Tuple>(t)
, std::make_index_sequence<std::tuple_size<std::decay_t<_Tuple>>::value>{} );
}
// Invoke a member function (custom version of C++17's invoke until we have full C++17 support)
template< typename _Function
, typename... _Args
, std::enable_if_t<std::is_member_pointer<std::decay_t<_Function>>{}, int> = 0 >
constexpr decltype(auto) invoke(_Function&& f, _Args&&... args)
noexcept(noexcept(std::mem_fn(f)(std::forward<_Args>(args)...)))
{
return std::mem_fn(f)(std::forward<_Args>(args)...);
}
// Invoke non-member function (custom version of C++17's invoke until we have full C++17 support)
template< typename _Function
, typename... _Args
, std::enable_if_t<!std::is_member_pointer<std::decay_t<_Function>>{}, int> = 0 >
constexpr decltype(auto) invoke(_Function&& f, _Args&&... args)
noexcept(noexcept(std::forward<_Function>(f)(std::forward<_Args>(args)...)))
{
return std::forward<_Function>(f)(std::forward<_Args>(args)...);
}
}
#endif /* SDBUS_CXX_TYPETRAITS_H_ */

View File

@ -43,11 +43,13 @@ Connection::Connection(Connection::BusType type)
finishHandshake(bus);
notificationFd_ = createLoopNotificationDescriptor();
std::cerr << "Created eventfd " << notificationFd_ << " of " << this << std::endl;
}
Connection::~Connection()
{
leaveProcessingLoop();
std::cerr << "Closing eventfd " << notificationFd_ << " of " << this << std::endl;
closeLoopNotificationDescriptor(notificationFd_);
}
@ -65,6 +67,10 @@ void Connection::releaseName(const std::string& name)
void Connection::enterProcessingLoop()
{
loopThreadId_ = std::this_thread::get_id();
std::lock_guard<std::mutex> guard(loopMutex_);
while (true)
{
auto processed = processPendingRequest();
@ -75,13 +81,18 @@ void Connection::enterProcessingLoop()
if (!success)
break; // Exit processing loop
if (success.asyncMsgsToProcess)
processAsynchronousMessages();
processUserRequests();
}
loopThreadId_ = std::thread::id{};
}
void Connection::enterProcessingLoopAsync()
{
asyncLoopThread_ = std::thread([this](){ enterProcessingLoop(); });
std::cerr << "--> enterProcessingLoopAsync() for connection " << this << std::endl;
// TODO: Check that joinable() means a valid non-empty thread
//if (!asyncLoopThread_.joinable())
asyncLoopThread_ = std::thread([this](){ enterProcessingLoop(); });
}
void Connection::leaveProcessingLoop()
@ -114,16 +125,17 @@ void Connection::removeObjectVTable(void* vtableHandle)
sd_bus_slot_unref((sd_bus_slot *)vtableHandle);
}
sdbus::MethodCall Connection::createMethodCall( const std::string& destination
, const std::string& objectPath
, const std::string& interfaceName
, const std::string& methodName ) const
MethodCall Connection::createMethodCall( const std::string& destination
, const std::string& objectPath
, const std::string& interfaceName
, const std::string& methodName ) const
{
sd_bus_message *sdbusMsg{};
// Returned message will become an owner of sdbusMsg
SCOPE_EXIT{ sd_bus_message_unref(sdbusMsg); };
// It is thread-safe to create a message this way
auto r = sd_bus_message_new_method_call( bus_.get()
, &sdbusMsg
, destination.c_str()
@ -136,15 +148,16 @@ sdbus::MethodCall Connection::createMethodCall( const std::string& destination
return MethodCall(sdbusMsg);
}
sdbus::Signal Connection::createSignal( const std::string& objectPath
, const std::string& interfaceName
, const std::string& signalName ) const
Signal Connection::createSignal( const std::string& objectPath
, const std::string& interfaceName
, const std::string& signalName ) const
{
sd_bus_message *sdbusSignal{};
// Returned message will become an owner of sdbusSignal
SCOPE_EXIT{ sd_bus_message_unref(sdbusSignal); };
// It is thread-safe to create a message this way
auto r = sd_bus_message_new_signal( bus_.get()
, &sdbusSignal
, objectPath.c_str()
@ -156,32 +169,194 @@ sdbus::Signal Connection::createSignal( const std::string& objectPath
return Signal(sdbusSignal);
}
template<typename _Callable, typename... _Args, std::enable_if_t<std::is_void<function_result_t<_Callable>>::value, int>>
inline auto Connection::tryExecuteSync(_Callable&& fnc, const _Args&... args)
{
std::thread::id loopThreadId = loopThreadId_.load(std::memory_order_relaxed);
// Is the loop not yet on? => Go make synchronous call
while (loopThreadId == std::thread::id{})
{
// Did the loop begin in the meantime? Or try_lock() failed spuriously?
if (!loopMutex_.try_lock())
{
loopThreadId = loopThreadId_.load(std::memory_order_relaxed);
continue;
}
// Synchronous call
std::lock_guard<std::mutex> guard(loopMutex_, std::adopt_lock);
sdbus::invoke(std::forward<_Callable>(fnc), args...);
return true;
}
// Is the loop on and we are in the same thread? => Go for synchronous call
if (loopThreadId == std::this_thread::get_id())
{
assert(!loopMutex_.try_lock());
sdbus::invoke(std::forward<_Callable>(fnc), args...);
return true;
}
return false;
}
template<typename _Callable, typename... _Args, std::enable_if_t<!std::is_void<function_result_t<_Callable>>::value, int>>
inline auto Connection::tryExecuteSync(_Callable&& fnc, const _Args&... args)
{
std::thread::id loopThreadId = loopThreadId_.load(std::memory_order_relaxed);
// Is the loop not yet on? => Go make synchronous call
while (loopThreadId == std::thread::id{})
{
// Did the loop begin in the meantime? Or try_lock() failed spuriously?
if (!loopMutex_.try_lock())
{
loopThreadId = loopThreadId_.load(std::memory_order_relaxed);
continue;
}
// Synchronous call
std::lock_guard<std::mutex> guard(loopMutex_, std::adopt_lock);
return std::make_pair(true, sdbus::invoke(std::forward<_Callable>(fnc), args...));
}
// Is the loop on and we are in the same thread? => Go for synchronous call
if (loopThreadId == std::this_thread::get_id())
{
assert(!loopMutex_.try_lock());
return std::make_pair(true, sdbus::invoke(std::forward<_Callable>(fnc), args...));
}
return std::make_pair(false, function_result_t<_Callable>{});
}
template<typename _Callable, typename... _Args, std::enable_if_t<std::is_void<function_result_t<_Callable>>::value, int>>
inline void Connection::executeAsync(_Callable&& fnc, const _Args&... args)
{
std::promise<void> result;
auto future = result.get_future();
queueUserRequest([fnc = std::forward<_Callable>(fnc), args..., &result]()
{
SCOPE_EXIT_NAMED(onSdbusError){ result.set_exception(std::current_exception()); };
std::cerr << " [lt] ... Invoking void request from within event loop thread..." << std::endl;
sdbus::invoke(fnc, args...);
std::cerr << " [lt] Request invoked" << std::endl;
result.set_value();
onSdbusError.dismiss();
});
// Wait for the the processing loop thread to process the request
future.get();
}
template<typename _Callable, typename... _Args, std::enable_if_t<!std::is_void<function_result_t<_Callable>>::value, int>>
inline auto Connection::executeAsync(_Callable&& fnc, const _Args&... args)
{
std::promise<function_result_t<_Callable>> result;
auto future = result.get_future();
queueUserRequest([fnc = std::forward<_Callable>(fnc), args..., &result]()
{
SCOPE_EXIT_NAMED(onSdbusError){ result.set_exception(std::current_exception()); };
std::cerr << " [lt] ... Invoking request from within event loop thread..." << std::endl;
auto returnValue = sdbus::invoke(fnc, args...);
std::cerr << " [lt] Request invoked and got result" << std::endl;
result.set_value(returnValue);
onSdbusError.dismiss();
});
// Wait for the reply from the processing loop thread
return future.get();
}
template<typename _Callable, typename... _Args>
inline void Connection::executeAsyncAndDontWaitForResult(_Callable&& fnc, const _Args&... args)
{
queueUserRequest([fnc = std::forward<_Callable>(fnc), args...]()
{
sdbus::invoke(fnc, args...);
});
}
void* Connection::registerSignalHandler( const std::string& objectPath
, const std::string& interfaceName
, const std::string& signalName
, sd_bus_message_handler_t callback
, void* userData )
{
sd_bus_slot *slot{};
auto registerSignalHandler = [this]( const std::string& objectPath
, const std::string& interfaceName
, const std::string& signalName
, sd_bus_message_handler_t callback
, void* userData )
{
sd_bus_slot *slot{};
auto filter = composeSignalMatchFilter(objectPath, interfaceName, signalName);
auto r = sd_bus_add_match(bus_.get(), &slot, filter.c_str(), callback, userData);
auto filter = composeSignalMatchFilter(objectPath, interfaceName, signalName);
auto r = sd_bus_add_match(bus_.get(), &slot, filter.c_str(), callback, userData);
std::cerr << "Registered signal " << signalName << " with slot " << slot << std::endl;
SDBUS_THROW_ERROR_IF(r < 0, "Failed to register signal handler", -r);
SDBUS_THROW_ERROR_IF(r < 0, "Failed to register signal handler", -r);
return slot;
return slot;
};
std::cerr << "Trying to register signal " << signalName << " synchronously..." << std::endl;
auto result = tryExecuteSync(registerSignalHandler, objectPath, interfaceName, signalName, callback, userData);
if (!result.first) std::cerr << " ... Nope, going async way" << std::endl;
return result.first ? result.second
: executeAsync(registerSignalHandler, objectPath, interfaceName, signalName, callback, userData);
}
void Connection::unregisterSignalHandler(void* handlerCookie)
{
sd_bus_slot_unref((sd_bus_slot *)handlerCookie);
auto result = tryExecuteSync(sd_bus_slot_unref, (sd_bus_slot *)handlerCookie);
// if (!result.first)
// executeAsync(sd_bus_slot_unref, (sd_bus_slot *)handlerCookie);
if (result.first)
{
std::cerr << "Synchronously unregistered signal " << handlerCookie << ": " << result.second << std::endl;
return;
}
auto slot = executeAsync(sd_bus_slot_unref, (sd_bus_slot *)handlerCookie);
std::cerr << "Asynchronously unregistered signal " << handlerCookie << ": " << slot << std::endl;
}
void Connection::sendReplyAsynchronously(const sdbus::MethodReply& reply)
MethodReply Connection::callMethod(const MethodCall& message)
{
std::lock_guard<std::mutex> guard(mutex_);
asyncReplies_.push(reply);
notifyProcessingLoop();
std::cerr << "Trying to call method synchronously..." << std::endl;
auto result = tryExecuteSync(&MethodCall::send, message);
if (!result.first) std::cerr << " ... Nope, going async way" << std::endl;
return result.first ? result.second
: executeAsync(&MethodCall::send, message);
}
void Connection::callMethod(const AsyncMethodCall& message, void* callback, void* userData)
{
auto result = tryExecuteSync(&AsyncMethodCall::send, message, callback, userData);
if (!result)
executeAsyncAndDontWaitForResult(&AsyncMethodCall::send, message, callback, userData);
}
void Connection::sendMethodReply(const MethodReply& message)
{
auto result = tryExecuteSync(&MethodReply::send, message);
if (!result)
executeAsyncAndDontWaitForResult(&MethodReply::send, message);
}
void Connection::emitSignal(const Signal& message)
{
auto result = tryExecuteSync(&Signal::send, message);
if (!result)
executeAsyncAndDontWaitForResult(&Signal::send, message);
}
std::unique_ptr<sdbus::internal::IConnection> Connection::clone() const
@ -238,10 +413,14 @@ void Connection::notifyProcessingLoop()
{
assert(notificationFd_ >= 0);
uint64_t value = 1;
auto r = write(notificationFd_, &value, sizeof(value));
SDBUS_THROW_ERROR_IF(r < 0, "Failed to notify processing loop", -errno);
for (int i = 0; i < 1; ++i)
{
//std::this_thread::sleep_for(std::chrono::milliseconds(5));
uint64_t value = 1;
auto r = write(notificationFd_, &value, sizeof(value));
std::cerr << "Wrote to notification fd " << notificationFd_ << std::endl;
SDBUS_THROW_ERROR_IF(r < 0, "Failed to notify processing loop", -errno);
}
}
void Connection::notifyProcessingLoopToExit()
@ -270,14 +449,24 @@ bool Connection::processPendingRequest()
return r > 0;
}
void Connection::processAsynchronousMessages()
void Connection::queueUserRequest(UserRequest&& request)
{
std::lock_guard<std::mutex> guard(mutex_);
while (!asyncReplies_.empty())
{
auto reply = asyncReplies_.front();
asyncReplies_.pop();
reply.send();
std::lock_guard<std::mutex> guard(userRequestsMutex_);
userRequests_.push(std::move(request));
std::cerr << "Pushed to user request queue. Size: " << userRequests_.size() << std::endl;
}
notifyProcessingLoop();
}
void Connection::processUserRequests()
{
std::lock_guard<std::mutex> guard(userRequestsMutex_);
while (!userRequests_.empty())
{
auto& request = userRequests_.front();
request();
userRequests_.pop();
}
}
@ -299,22 +488,31 @@ Connection::WaitResult Connection::waitForNextRequest()
uint64_t usec;
sd_bus_get_timeout(bus, &usec);
struct pollfd fds[] = {{sdbusFd, sdbusEvents, 0}, {notificationFd_, POLLIN, 0}};
struct pollfd fds[] = {{sdbusFd, sdbusEvents, 0}, {notificationFd_, POLLIN | POLLHUP | POLLERR | POLLNVAL, 0}};
auto fdsCount = sizeof(fds)/sizeof(fds[0]);
std::cerr << "[lt] Going to poll on fs " << sdbusFd << ", " << notificationFd_ << " with timeout " << usec << " and fdscount == " << fdsCount << std::endl;
r = poll(fds, fdsCount, usec == (uint64_t) -1 ? -1 : (usec+999)/1000);
if (r < 0 && errno == EINTR)
{
std::cerr << "<<<>>>> GOT EINTR" << std::endl;
return {true, false}; // Try again
}
SDBUS_THROW_ERROR_IF(r < 0, "Failed to wait on the bus", -errno);
if ((fds[1].revents & POLLHUP) || (fds[1].revents & POLLERR) || ((fds[1].revents & POLLNVAL)))
{
std::cerr << "!!!!!!!!!! Something went wrong on polling" << std::endl;
}
if (fds[1].revents & POLLIN)
{
if (exitLoopThread_)
return {false, false}; // Got exit notification
// Otherwise we have some async messages to process
// Otherwise we have some user requests to process
std::cerr << "Loop found it has some async requests to process" << std::endl;
uint64_t value{};
auto r = read(notificationFd_, &value, sizeof(value));

View File

@ -29,11 +29,13 @@
#include <sdbus-c++/IConnection.h>
#include <sdbus-c++/Message.h>
#include "IConnection.h"
#include "ScopeGuard.h"
#include <systemd/sd-bus.h>
#include <memory>
#include <thread>
#include <atomic>
#include <mutex>
#include <future>
#include <queue>
namespace sdbus { namespace internal {
@ -50,7 +52,7 @@ namespace sdbus { namespace internal {
};
Connection(BusType type);
~Connection();
~Connection() override;
void requestName(const std::string& name) override;
void releaseName(const std::string& name) override;
@ -64,13 +66,13 @@ namespace sdbus { namespace internal {
, void* userData ) override;
void removeObjectVTable(void* vtableHandle) override;
sdbus::MethodCall createMethodCall( const std::string& destination
, const std::string& objectPath
, const std::string& interfaceName
, const std::string& methodName ) const override;
sdbus::Signal createSignal( const std::string& objectPath
, const std::string& interfaceName
, const std::string& signalName ) const override;
MethodCall createMethodCall( const std::string& destination
, const std::string& objectPath
, const std::string& interfaceName
, const std::string& methodName ) const override;
Signal createSignal( const std::string& objectPath
, const std::string& interfaceName
, const std::string& signalName ) const override;
void* registerSignalHandler( const std::string& objectPath
, const std::string& interfaceName
@ -79,7 +81,10 @@ namespace sdbus { namespace internal {
, void* userData ) override;
void unregisterSignalHandler(void* handlerCookie) override;
void sendReplyAsynchronously(const sdbus::MethodReply& reply) override;
MethodReply callMethod(const MethodCall& message) override;
void callMethod(const AsyncMethodCall& message, void* callback, void* userData) override;
void sendMethodReply(const MethodReply& message) override;
void emitSignal(const Signal& message) override;
std::unique_ptr<sdbus::internal::IConnection> clone() const override;
@ -93,12 +98,16 @@ namespace sdbus { namespace internal {
return msgsToProcess || asyncMsgsToProcess;
}
};
using UserRequest = std::function<void()>;
static sd_bus* openBus(Connection::BusType type);
static void finishHandshake(sd_bus* bus);
static int createLoopNotificationDescriptor();
static void closeLoopNotificationDescriptor(int fd);
bool processPendingRequest();
void processAsynchronousMessages();
void queueUserRequest(UserRequest&& request);
void processUserRequests();
WaitResult waitForNextRequest();
static std::string composeSignalMatchFilter( const std::string& objectPath
, const std::string& interfaceName
@ -107,11 +116,27 @@ namespace sdbus { namespace internal {
void notifyProcessingLoopToExit();
void joinWithProcessingLoop();
// TODO move this and threading logic and method around it to separate class?
template <typename _Callable, typename... _Args, std::enable_if_t<std::is_void<function_result_t<_Callable>>::value, int> = 0>
inline auto tryExecuteSync(_Callable&& fnc, const _Args&... args);
template <typename _Callable, typename... _Args, std::enable_if_t<!std::is_void<function_result_t<_Callable>>::value, int> = 0>
inline auto tryExecuteSync(_Callable&& fnc, const _Args&... args);
template <typename _Callable, typename... _Args, std::enable_if_t<std::is_void<function_result_t<_Callable>>::value, int> = 0>
inline void executeAsync(_Callable&& fnc, const _Args&... args);
template <typename _Callable, typename... _Args, std::enable_if_t<!std::is_void<function_result_t<_Callable>>::value, int> = 0>
inline auto executeAsync(_Callable&& fnc, const _Args&... args);
template <typename _Callable, typename... _Args>
inline void executeAsyncAndDontWaitForResult(_Callable&& fnc, const _Args&... args);
private:
std::unique_ptr<sd_bus, decltype(&sd_bus_flush_close_unref)> bus_{nullptr, &sd_bus_flush_close_unref};
std::thread asyncLoopThread_;
std::mutex mutex_;
std::queue<MethodReply> asyncReplies_;
std::atomic<std::thread::id> loopThreadId_;
std::mutex loopMutex_;
std::queue<UserRequest> userRequests_;
std::mutex userRequestsMutex_;
std::atomic<bool> exitLoopThread_;
int notificationFd_{-1};
BusType busType_;

View File

@ -33,6 +33,7 @@
// Forward declaration
namespace sdbus {
class MethodCall;
class AsyncMethodCall;
class MethodReply;
class Signal;
}
@ -49,14 +50,14 @@ namespace internal {
, void* userData ) = 0;
virtual void removeObjectVTable(void* vtableHandle) = 0;
virtual sdbus::MethodCall createMethodCall( const std::string& destination
, const std::string& objectPath
, const std::string& interfaceName
, const std::string& methodName ) const = 0;
virtual MethodCall createMethodCall( const std::string& destination
, const std::string& objectPath
, const std::string& interfaceName
, const std::string& methodName ) const = 0;
virtual sdbus::Signal createSignal( const std::string& objectPath
, const std::string& interfaceName
, const std::string& signalName ) const = 0;
virtual Signal createSignal( const std::string& objectPath
, const std::string& interfaceName
, const std::string& signalName ) const = 0;
virtual void* registerSignalHandler( const std::string& objectPath
, const std::string& interfaceName
@ -68,7 +69,10 @@ namespace internal {
virtual void enterProcessingLoopAsync() = 0;
virtual void leaveProcessingLoop() = 0;
virtual void sendReplyAsynchronously(const sdbus::MethodReply& reply) = 0;
virtual MethodReply callMethod(const MethodCall& message) = 0;
virtual void callMethod(const AsyncMethodCall& message, void* callback, void* userData) = 0;
virtual void sendMethodReply(const MethodReply& message) = 0;
virtual void emitSignal(const Signal& message) = 0;
virtual std::unique_ptr<sdbus::internal::IConnection> clone() const = 0;

View File

@ -648,6 +648,17 @@ MethodReply MethodCall::createErrorReply(const Error& error) const
return MethodReply(sdbusErrorReply);
}
AsyncMethodCall::AsyncMethodCall(MethodCall&& call) noexcept
: Message(call)
{
}
void AsyncMethodCall::send(void* callback, void* userData) const
{
auto r = sd_bus_call_async(nullptr, nullptr, (sd_bus_message*)getMsg(), (sd_bus_message_handler_t)callback, userData, 0);
SDBUS_THROW_ERROR_IF(r < 0, "Failed to call method asynchronously", -r);
}
void MethodReply::send() const
{
auto r = sd_bus_send(nullptr, (sd_bus_message*)getMsg(), nullptr);

View File

@ -37,7 +37,7 @@ MethodResult::MethodResult(const MethodCall& msg, sdbus::internal::Object& objec
void MethodResult::send(const MethodReply& reply) const
{
assert(object_ != nullptr);
object_->sendReplyAsynchronously(reply);
object_->sendMethodReply(reply);
}
}

View File

@ -156,15 +156,12 @@ sdbus::Signal Object::createSignal(const std::string& interfaceName, const std::
void Object::emitSignal(const sdbus::Signal& message)
{
// TODO: Make signal emitting asynchronous. Now signal can probably be emitted only from user code
// handled within the D-Bus processing loop thread, but not from any thread. In principle it will
// be the same as async replies.
message.send();
connection_.emitSignal(message);
}
void Object::sendReplyAsynchronously(const MethodReply& reply)
void Object::sendMethodReply(const MethodReply& reply)
{
connection_.sendReplyAsynchronously(reply);
connection_.sendMethodReply(reply);
}
const std::vector<sd_bus_vtable>& Object::createInterfaceVTable(InterfaceData& interfaceData)

View File

@ -84,7 +84,7 @@ namespace internal {
sdbus::Signal createSignal(const std::string& interfaceName, const std::string& signalName) override;
void emitSignal(const sdbus::Signal& message) override;
void sendReplyAsynchronously(const MethodReply& reply);
void sendMethodReply(const MethodReply& reply);
private:
using InterfaceName = std::string;

View File

@ -30,12 +30,13 @@
#include "IConnection.h"
#include <systemd/sd-bus.h>
#include <cassert>
#include <chrono>
#include <thread>
namespace sdbus { namespace internal {
ObjectProxy::ObjectProxy(sdbus::internal::IConnection& connection, std::string destination, std::string objectPath)
: connection_(&connection, [](sdbus::internal::IConnection *){ /* Intentionally left empty */ })
, ownConnection_(false)
, destination_(std::move(destination))
, objectPath_(std::move(objectPath))
{
@ -45,30 +46,37 @@ ObjectProxy::ObjectProxy( std::unique_ptr<sdbus::internal::IConnection>&& connec
, std::string destination
, std::string objectPath )
: connection_(std::move(connection))
, ownConnection_(true)
, destination_(std::move(destination))
, objectPath_(std::move(objectPath))
{
}
ObjectProxy::~ObjectProxy()
{
// If the dedicated connection for signals is used, we have to stop the processing loop
// upon this connection prior to unregistering signal slots in the interfaces_ container,
// otherwise we might have a race condition of two threads working upon one connection.
if (signalConnection_ != nullptr)
signalConnection_->leaveProcessingLoop();
// The connection is ours only, so we have to manage event loop upon this connection,
// so we get signals, async replies, and other messages from D-Bus.
connection_->enterProcessingLoopAsync();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
MethodCall ObjectProxy::createMethodCall(const std::string& interfaceName, const std::string& methodName)
{
// Tell, don't ask
return connection_->createMethodCall(destination_, objectPath_, interfaceName, methodName);
}
AsyncMethodCall ObjectProxy::createAsyncMethodCall(const std::string& interfaceName, const std::string& methodName)
{
return AsyncMethodCall{ObjectProxy::createMethodCall(interfaceName, methodName)};
}
MethodReply ObjectProxy::callMethod(const MethodCall& message)
{
return message.send();
return connection_->callMethod(message);
}
void ObjectProxy::callMethod(const AsyncMethodCall& message, async_reply_handler asyncReplyCallback)
{
auto callback = (void*)&ObjectProxy::sdbus_async_reply_handler;
// Allocated userData gets deleted in the sdbus_async_reply_handler
auto userData = new async_reply_handler(std::move(asyncReplyCallback));
connection_->callMethod(message, callback, userData);
}
void ObjectProxy::registerSignalHandler( const std::string& interfaceName
@ -88,30 +96,7 @@ void ObjectProxy::registerSignalHandler( const std::string& interfaceName
void ObjectProxy::finishRegistration()
{
bool hasSignals = listensToSignals();
if (hasSignals && ownConnection_)
{
// Let's use dedicated signalConnection_ for signals,
// which will then be used by the processing loop thread.
signalConnection_ = connection_->clone();
registerSignalHandlers(*signalConnection_);
signalConnection_->enterProcessingLoopAsync();
}
else if (hasSignals)
{
// Let's used connection provided from the outside.
registerSignalHandlers(*connection_);
}
}
bool ObjectProxy::listensToSignals() const
{
for (auto& interfaceItem : interfaces_)
if (!interfaceItem.second.signals_.empty())
return true;
return false;
registerSignalHandlers(*connection_);
}
void ObjectProxy::registerSignalHandlers(sdbus::internal::IConnection& connection)
@ -136,11 +121,30 @@ void ObjectProxy::registerSignalHandlers(sdbus::internal::IConnection& connectio
}
}
int ObjectProxy::sdbus_async_reply_handler(sd_bus_message *sdbusMessage, void *userData, sd_bus_error *retError)
{
MethodReply message(sdbusMessage);
std::unique_ptr<async_reply_handler> asyncReplyCallback{static_cast<async_reply_handler*>(userData)};
assert(asyncReplyCallback != nullptr);
if (!sd_bus_error_is_set(retError))
{
(*asyncReplyCallback)(message, nullptr);
}
else
{
sdbus::Error error(retError->name, retError->message);
(*asyncReplyCallback)(message, &error);
}
}
int ObjectProxy::sdbus_signal_callback(sd_bus_message *sdbusMessage, void *userData, sd_bus_error */*retError*/)
{
Signal message(sdbusMessage);
auto* object = static_cast<ObjectProxy*>(userData);
assert(object != nullptr);
// Note: The lookup can be optimized by using sorted vectors instead of associative containers
auto& callback = object->interfaces_[message.getInterfaceName()].signals_[message.getMemberName()].callback_;
assert(callback);

View File

@ -50,10 +50,11 @@ namespace internal {
ObjectProxy( std::unique_ptr<sdbus::internal::IConnection>&& connection
, std::string destination
, std::string objectPath );
~ObjectProxy();
MethodCall createMethodCall(const std::string& interfaceName, const std::string& methodName) override;
AsyncMethodCall createAsyncMethodCall(const std::string& interfaceName, const std::string& methodName) override;
MethodReply callMethod(const MethodCall& message) override;
void callMethod(const AsyncMethodCall& message, async_reply_handler asyncReplyCallback) override;
void registerSignalHandler( const std::string& interfaceName
, const std::string& signalName
@ -61,16 +62,14 @@ namespace internal {
void finishRegistration() override;
private:
bool listensToSignals() const;
void registerSignalHandlers(sdbus::internal::IConnection& connection);
static int sdbus_async_reply_handler(sd_bus_message *sdbusMessage, void *userData, sd_bus_error *retError);
static int sdbus_signal_callback(sd_bus_message *sdbusMessage, void *userData, sd_bus_error *retError);
private:
std::unique_ptr< sdbus::internal::IConnection
, std::function<void(sdbus::internal::IConnection*)>
> connection_;
bool ownConnection_{};
std::unique_ptr<sdbus::internal::IConnection> signalConnection_;
std::string destination_;
std::string objectPath_;

View File

@ -63,20 +63,35 @@ include_directories(${CMAKE_CURRENT_SOURCE_DIR})
# targets even when INTERFACE_INCLUDE_DIRECTORIES is used.
set(CMAKE_NO_SYSTEM_FROM_IMPORTED "1")
add_executable(libsdbus-c++_unittests ${UNITTESTS_SRCS} $<TARGET_OBJECTS:sdbuscppobjects>)
target_link_libraries(libsdbus-c++_unittests ${SYSTEMD_LIBRARIES} gmock gmock_main)
#add_executable(libsdbus-c++_unittests ${UNITTESTS_SRCS} $<TARGET_OBJECTS:sdbuscppobjects>)
#target_link_libraries(libsdbus-c++_unittests ${SYSTEMD_LIBRARIES} gmock gmock_main)
add_executable(libsdbus-c++_integrationtests ${INTEGRATIONTESTS_SRCS})
target_link_libraries(libsdbus-c++_integrationtests sdbus-c++ gmock gmock_main)
# Manual performance tests
option(ENABLE_PERFTESTS "Build and install manual performance tests (default OFF)" OFF)
if(ENABLE_PERFTESTS)
add_executable(libsdbus-c++_perftests_client perftests/client.cpp perftests/perftest-proxy.h)
target_link_libraries(libsdbus-c++_perftests_client sdbus-c++)
add_executable(libsdbus-c++_perftests_server perftests/server.cpp perftests/perftest-adaptor.h)
target_link_libraries(libsdbus-c++_perftests_server sdbus-c++)
endif()
#----------------------------------
# INSTALLATION
#----------------------------------
install(TARGETS libsdbus-c++_unittests DESTINATION /opt/test/bin)
#install(TARGETS libsdbus-c++_unittests DESTINATION /opt/test/bin)
install(TARGETS libsdbus-c++_integrationtests DESTINATION /opt/test/bin)
install(FILES ${INTEGRATIONTESTS_SOURCE_DIR}/files/libsdbus-cpp-test.conf DESTINATION ${CMAKE_INSTALL_SYSCONFDIR}/dbus-1/system.d)
if(ENABLE_PERFTESTS)
install(TARGETS libsdbus-c++_perftests_client DESTINATION /opt/test/bin)
install(TARGETS libsdbus-c++_perftests_server DESTINATION /opt/test/bin)
install(FILES perftests/files/org.sdbuscpp.perftest.conf DESTINATION ${CMAKE_INSTALL_SYSCONFDIR}/dbus-1/system.d)
endif()
#----------------------------------
# RUNNING THE TESTS UPON BUILD
#----------------------------------
@ -92,9 +107,9 @@ if(CMAKE_CROSSCOMPILING)
if(NOT (UNIT_TESTS_RUNNER AND TEST_DEVICE_IP))
message(WARNING "UNIT_TESTS_RUNNER and TEST_DEVICE_IP variables must be defined to run tests remotely")
endif()
add_test(NAME libsdbus-c++_unittests COMMAND ${UNIT_TESTS_RUNNER} --deviceip=${TEST_DEVICE_IP} --testbin=libsdbus-c++_unittests)
#add_test(NAME libsdbus-c++_unittests COMMAND ${UNIT_TESTS_RUNNER} --deviceip=${TEST_DEVICE_IP} --testbin=libsdbus-c++_unittests)
add_test(NAME libsdbus-c++_integrationtests COMMAND ${UNIT_TESTS_RUNNER} --deviceip=${TEST_DEVICE_IP} --testbin=libsdbus-c++_integrationtests)
else()
add_test(NAME libsdbus-c++_unittests COMMAND libsdbus-c++_unittests)
#add_test(NAME libsdbus-c++_unittests COMMAND libsdbus-c++_unittests)
add_test(NAME libsdbus-c++_integrationtests COMMAND libsdbus-c++_integrationtests)
endif()

View File

@ -42,6 +42,7 @@
#include <tuple>
#include <chrono>
#include <fstream>
#include <future>
using ::testing::Eq;
using ::testing::Gt;
@ -58,10 +59,12 @@ public:
{
m_connection.requestName(INTERFACE_NAME);
m_connection.enterProcessingLoopAsync();
//m_connection2.enterProcessingLoopAsync();
}
static void TearDownTestCase()
{
//m_connection2.leaveProcessingLoop();
m_connection.leaveProcessingLoop();
m_connection.releaseName(INTERFACE_NAME);
}
@ -70,7 +73,7 @@ private:
void SetUp() override
{
m_adaptor = std::make_unique<TestingAdaptor>(m_connection);
m_proxy = std::make_unique<TestingProxy>(INTERFACE_NAME, OBJECT_PATH);
m_proxy = std::make_unique<TestingProxy>(/*m_connection2, */INTERFACE_NAME, OBJECT_PATH);
std::this_thread::sleep_for(50ms); // Give time for the proxy to start listening to signals
}
@ -82,12 +85,14 @@ private:
public:
static sdbus::internal::Connection m_connection;
//static sdbus::internal::Connection m_connection2;
std::unique_ptr<TestingAdaptor> m_adaptor;
std::unique_ptr<TestingProxy> m_proxy;
};
sdbus::internal::Connection AdaptorAndProxyFixture::m_connection{sdbus::internal::Connection::BusType::eSystem};
//sdbus::internal::Connection AdaptorAndProxyFixture::m_connection2{sdbus::internal::Connection::BusType::eSystem};
}
@ -95,16 +100,16 @@ sdbus::internal::Connection AdaptorAndProxyFixture::m_connection{sdbus::internal
/* -- TEST CASES -- */
/*-------------------------------------*/
TEST(AdaptorAndProxy, CanBeConstructedSuccesfully)
{
auto connection = sdbus::createConnection();
connection->requestName(INTERFACE_NAME);
//TEST(AdaptorAndProxy, CanBeConstructedSuccesfully)
//{
// auto connection = sdbus::createConnection();
// connection->requestName(INTERFACE_NAME);
ASSERT_NO_THROW(TestingAdaptor adaptor(*connection));
ASSERT_NO_THROW(TestingProxy proxy(INTERFACE_NAME, OBJECT_PATH));
// ASSERT_NO_THROW(TestingAdaptor adaptor(*connection));
// ASSERT_NO_THROW(TestingProxy proxy(INTERFACE_NAME, OBJECT_PATH));
connection->releaseName(INTERFACE_NAME);
}
// connection->releaseName(INTERFACE_NAME);
//}
// Methods
@ -246,7 +251,7 @@ TEST_F(SdbusTestObject, CallsErrorThrowingMethodWithDontExpectReplySet)
ASSERT_TRUE(m_adaptor->wasThrowErrorCalled());
}
TEST_F(SdbusTestObject, DoesServerSideAsynchoronousMethodInParallel)
TEST_F(SdbusTestObject, RunsServerSideAsynchoronousMethodAsynchronously)
{
// Yeah, this is kinda timing-dependent test, but times should be safe...
std::mutex mtx;
@ -300,7 +305,41 @@ TEST_F(SdbusTestObject, HandlesCorrectlyABulkOfParallelServerSideAsyncMethods)
ASSERT_THAT(resultCount, Eq(1500));
}
/*
TEST_F(SdbusTestObject, InvokesMethodAsynchronouslyOnClientSide)
{
std::promise<uint32_t> promise;
auto future = promise.get_future();
m_proxy->installDoOperationClientSideAsyncReplyHandler([&](uint32_t res, const sdbus::Error* err)
{
if (err == nullptr)
promise.set_value(res);
else
promise.set_exception(std::make_exception_ptr(*err));
});
m_proxy->doOperationClientSideAsync(100);
ASSERT_THAT(future.get(), Eq(100));
}
TEST_F(SdbusTestObject, InvokesErroneousMethodAsynchronouslyOnClientSide)
{
std::promise<uint32_t> promise;
auto future = promise.get_future();
m_proxy->installDoOperationClientSideAsyncReplyHandler([&](uint32_t res, const sdbus::Error* err)
{
if (err == nullptr)
promise.set_value(res);
else
promise.set_exception(std::make_exception_ptr(*err));
});
m_proxy->doErroneousOperationClientSideAsync();
ASSERT_THROW(future.get(), sdbus::Error);
}
*/
TEST_F(SdbusTestObject, FailsCallingNonexistentMethod)
{
ASSERT_THROW(m_proxy->callNonexistentMethod(), sdbus::Error);

View File

@ -45,7 +45,7 @@ public:
protected:
void noArgNoReturn() const { }
void noArgNoReturn() const { std::cerr << "Server: noArgNoReturn() called;" << std::endl;}
int32_t getInt() const { return INT32_VALUE; }
@ -110,7 +110,7 @@ protected:
return res;
}
uint32_t doOperationSync(uint32_t param)
uint32_t doOperation(uint32_t param)
{
std::this_thread::sleep_for(std::chrono::milliseconds(param));
return param;

View File

@ -38,6 +38,11 @@ public:
double getVariantValue() const { return m_variantValue; }
std::map<std::string, std::string> getSignatureFromSignal() const { return m_signature; }
void installDoOperationClientSideAsyncReplyHandler(std::function<void(uint32_t res, const sdbus::Error* err)> handler)
{
m_DoOperationClientSideAsyncReplyHandler = handler;
}
protected:
void onSimpleSignal() override { ++m_simpleCallCounter; }
@ -53,12 +58,19 @@ protected:
m_signature[std::get<0>(s)] = static_cast<std::string>(std::get<0>(std::get<1>(s)));
}
void onDoOperationReply(uint32_t returnValue, const sdbus::Error* error) override
{
if (m_DoOperationClientSideAsyncReplyHandler)
m_DoOperationClientSideAsyncReplyHandler(returnValue, error);
}
private:
int m_simpleCallCounter{};
std::map<int32_t, std::string> m_map;
double m_variantValue;
std::map<std::string, std::string> m_signature;
std::function<void(uint32_t res, const sdbus::Error* err)> m_DoOperationClientSideAsyncReplyHandler;
};

View File

@ -85,9 +85,9 @@ protected:
return this->sumVectorItems(a, b);
});
object_.registerMethod("doOperationSync").onInterface(INTERFACE_NAME).implementedAs([this](uint32_t param)
object_.registerMethod("doOperation").onInterface(INTERFACE_NAME).implementedAs([this](uint32_t param)
{
return this->doOperationSync(param);
return this->doOperation(param);
});
object_.registerMethod("doOperationAsync").onInterface(INTERFACE_NAME).implementedAs([this](sdbus::Result<uint32_t> result, uint32_t param)
@ -158,7 +158,7 @@ protected:
virtual sdbus::Struct<std::string, sdbus::Struct<std::map<int32_t, int32_t>>> getStructInStruct() const = 0;
virtual int32_t sumStructItems(const sdbus::Struct<uint8_t, uint16_t>& a, const sdbus::Struct<int32_t, int64_t>& b) = 0;
virtual uint32_t sumVectorItems(const std::vector<uint16_t>& a, const std::vector<uint64_t>& b) = 0;
virtual uint32_t doOperationSync(uint32_t param) = 0;
virtual uint32_t doOperation(uint32_t param) = 0;
virtual void doOperationAsync(uint32_t param, sdbus::Result<uint32_t> result) = 0;
virtual sdbus::Signature getSignature() const = 0;
virtual sdbus::ObjectPath getObjectPath() const = 0;
@ -212,11 +212,11 @@ R"delimiter(<!DOCTYPE node PUBLIC "-//freedesktop//DTD D-BUS Object Introspectio
</interface>
<interface name="com.kistler.testsdbuscpp">
<annotation name="org.freedesktop.DBus.Deprecated" value="true"/>
<method name="doOperationAsync">
<method name="doOperation">
<arg type="u" direction="in"/>
<arg type="u" direction="out"/>
</method>
<method name="doOperationSync">
<method name="doOperationAsync">
<arg type="u" direction="in"/>
<arg type="u" direction="out"/>
</method>

View File

@ -50,6 +50,8 @@ protected:
virtual void onSignalWithVariant(const sdbus::Variant& v) = 0;
virtual void onSignalWithoutRegistration(const sdbus::Struct<std::string, sdbus::Struct<sdbus::Signature>>& s) = 0;
virtual void onDoOperationReply(uint32_t returnValue, const sdbus::Error* error) = 0;
public:
void noArgNoReturn()
{
@ -124,10 +126,10 @@ public:
return result;
}
uint32_t doOperationSync(uint32_t param)
uint32_t doOperation(uint32_t param)
{
uint32_t result;
object_.callMethod("doOperationSync").onInterface(INTERFACE_NAME).withArguments(param).storeResultsTo(result);
object_.callMethod("doOperation").onInterface(INTERFACE_NAME).withArguments(param).storeResultsTo(result);
return result;
}
@ -138,6 +140,25 @@ public:
return result;
}
uint32_t doOperationClientSideAsync(uint32_t param)
{
object_.callMethodAsync("doOperation")
.onInterface(INTERFACE_NAME)
.withArguments(param)
.uponReplyInvoke([this](const sdbus::Error* error, uint32_t returnValue)
{
this->onDoOperationReply(returnValue, error);
});
}
uint32_t doErroneousOperationClientSideAsync()
{
object_.callMethodAsync("throwError").onInterface(INTERFACE_NAME).uponReplyInvoke([this](const sdbus::Error* error)
{
this->onDoOperationReply(0, error);
});
}
sdbus::Signature getSignature()
{
sdbus::Signature result;

View File

@ -0,0 +1,26 @@
#-------------------------------
# GENERAL COMPILER CONFIGURATION
#-------------------------------
include_directories(${CMAKE_CURRENT_SOURCE_DIR})
#----------------------------------
# BUILD INFORMATION
#----------------------------------
# Turn off -isystem gcc option that CMake uses for imported
# targets even when INTERFACE_INCLUDE_DIRECTORIES is used.
#set(CMAKE_NO_SYSTEM_FROM_IMPORTED "1")
add_executable(perftestclient client.cpp perftest-proxy.h)
target_link_libraries(perftestclient sdbus-c++)
add_executable(perftestserver server.cpp perftest-adaptor.h)
target_link_libraries(perftestserver sdbus-c++)
#----------------------------------
# INSTALLATION
#----------------------------------
install(TARGETS perftestclient DESTINATION /opt/test/bin)
install(TARGETS perftestserver DESTINATION /opt/test/bin)

174
test/perftests/client.cpp Normal file
View File

@ -0,0 +1,174 @@
/**
* (C) 2019 KISTLER INSTRUMENTE AG, Winterthur, Switzerland
*
* @file client.cpp
*
* Created on: Jan 25, 2019
* Project: sdbus-c++
* Description: High-level D-Bus IPC C++ library based on sd-bus
*
* This file is part of sdbus-c++.
*
* sdbus-c++ is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* sdbus-c++ is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with sdbus-c++. If not, see <http://www.gnu.org/licenses/>.
*/
#include "perftest-proxy.h"
#include <sdbus-c++/sdbus-c++.h>
#include <vector>
#include <string>
#include <iostream>
#include <unistd.h>
#include <thread>
#include <chrono>
#include <cassert>
using namespace std::chrono_literals;
class PerftestClient : public sdbus::ProxyInterfaces<org::sdbuscpp::perftest_proxy>
{
public:
PerftestClient(std::string destination, std::string objectPath)
: sdbus::ProxyInterfaces<org::sdbuscpp::perftest_proxy>(std::move(destination), std::move(objectPath))
{
}
protected:
virtual void onDataSignal(const std::string& data) override
{
static unsigned int counter = 0;
static std::chrono::time_point<std::chrono::steady_clock> startTime;
assert(data.size() == m_msgSize);
++counter;
if (counter == 1)
startTime = std::chrono::steady_clock::now();
else if (counter == m_msgCount)
{
auto stopTime = std::chrono::steady_clock::now();
std::cout << "Received " << m_msgCount << " signals in: " << std::chrono::duration_cast<std::chrono::milliseconds>(stopTime - startTime).count() << " ms" << std::endl;
counter = 0;
}
}
public:
unsigned int m_msgSize{};
unsigned int m_msgCount{};
};
std::string createRandomString(size_t length)
{
auto randchar = []() -> char
{
const char charset[] =
"0123456789"
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
"abcdefghijklmnopqrstuvwxyz";
const size_t max_index = (sizeof(charset) - 1);
return charset[ rand() % max_index ];
};
std::string str(length, 0);
std::generate_n(str.begin(), length, randchar);
return str;
}
//-----------------------------------------
int main(int /*argc*/, char */*argv*/[])
{
const char* destinationName = "org.sdbuscpp.perftest";
const char* objectPath = "/org/sdbuscpp/perftest";
//PerftestClient client(destinationName, objectPath);
const unsigned int repetitions{2};
unsigned int msgCount = 1000;
unsigned int msgSize{};
{
PerftestClient client(destinationName, objectPath);
msgSize = 20;
std::cout << "** Measuring signals of size " << msgSize << " bytes (" << repetitions << " repetitions)..." << std::endl << std::endl;
client.m_msgCount = msgCount; client.m_msgSize = msgSize;
for (unsigned int r = 0; r < repetitions; ++r)
{
client.sendDataSignals(msgCount, msgSize);
std::this_thread::sleep_for(1000ms);
}
}
{
PerftestClient client(destinationName, objectPath);
msgSize = 1000;
std::cout << std::endl << "** Measuring signals of size " << msgSize << " bytes (" << repetitions << " repetitions)..." << std::endl << std::endl;
client.m_msgCount = msgCount; client.m_msgSize = msgSize;
for (unsigned int r = 0; r < repetitions; ++r)
{
client.sendDataSignals(msgCount, msgSize);
std::this_thread::sleep_for(1000ms);
}
}
{
PerftestClient client(destinationName, objectPath);
msgSize = 20;
std::cout << std::endl << "** Measuring method calls of size " << msgSize << " bytes (" << repetitions << " repetitions)..." << std::endl << std::endl;
for (unsigned int r = 0; r < repetitions; ++r)
{
auto str1 = createRandomString(msgSize/2);
auto str2 = createRandomString(msgSize/2);
auto startTime = std::chrono::steady_clock::now();
for (unsigned int i = 0; i < msgCount; i++)
{
auto result = client.concatenateTwoStrings(str1, str2);
assert(result.size() == str1.size() + str2.size());
assert(result.size() == msgSize);
}
auto stopTime = std::chrono::steady_clock::now();
std::cout << "Called " << msgCount << " methods in: " << std::chrono::duration_cast<std::chrono::milliseconds>(stopTime - startTime).count() << " ms" << std::endl;
std::this_thread::sleep_for(1000ms);
}
}
{
PerftestClient client(destinationName, objectPath);
msgSize = 1000;
std::cout << std::endl << "** Measuring method calls of size " << msgSize << " bytes (" << repetitions << " repetitions)..." << std::endl << std::endl;
for (unsigned int r = 0; r < repetitions; ++r)
{
auto str1 = createRandomString(msgSize/2);
auto str2 = createRandomString(msgSize/2);
auto startTime = std::chrono::steady_clock::now();
for (unsigned int i = 0; i < msgCount; i++)
{
auto result = client.concatenateTwoStrings(str1, str2);
assert(result.size() == str1.size() + str2.size());
assert(result.size() == msgSize);
}
auto stopTime = std::chrono::steady_clock::now();
std::cout << "Called " << msgCount << " methods in: " << std::chrono::duration_cast<std::chrono::milliseconds>(stopTime - startTime).count() << " ms" << std::endl;
std::this_thread::sleep_for(1000ms);
}
}
return 0;
}

View File

@ -0,0 +1,16 @@
<!-- This configuration file specifies the required security policies
for the Kistler DBUS example to run core daemon to work. -->
<!DOCTYPE busconfig PUBLIC "-//freedesktop//DTD D-BUS Bus Configuration 1.0//EN"
"http://www.freedesktop.org/standards/dbus/1.0/busconfig.dtd">
<busconfig>
<!-- ../system.conf have denied everything, so we just punch some holes -->
<policy context="default">
<allow own="org.sdbuscpp.perftest"/>
<allow send_destination="org.sdbuscpp.perftest"/>
<allow send_interface="org.sdbuscpp.perftest"/>
</policy>
</busconfig>

View File

@ -0,0 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<node name="/org/sdbuscpp/perftest">
<interface name="org.sdbuscpp.perftest">
<method name="sendDataSignals">
<arg type="u" name="numberOfSignals" direction="in" />
<arg type="u" name="signalMsgSize" direction="in" />
</method>
<method name="concatenateTwoStrings">
<arg type="s" name="string1" direction="in" />
<arg type="s" name="string2" direction="in" />
<arg type="s" name="result" direction="out" />
</method>
<signal name="dataSignal">
<arg type="s" name="data" />
</signal>
</interface>
</node>

View File

@ -0,0 +1,46 @@
/*
* This file was automatically generated by sdbuscpp-xml2cpp; DO NOT EDIT!
*/
#ifndef __sdbuscpp__perftest_adaptor_h__adaptor__H__
#define __sdbuscpp__perftest_adaptor_h__adaptor__H__
#include <sdbus-c++/sdbus-c++.h>
#include <string>
#include <tuple>
namespace org {
namespace sdbuscpp {
class perftest_adaptor
{
public:
static constexpr const char* interfaceName = "org.sdbuscpp.perftest";
protected:
perftest_adaptor(sdbus::IObject& object)
: object_(object)
{
object_.registerMethod("sendDataSignals").onInterface(interfaceName).implementedAs([this](const uint32_t& numberOfSignals, const uint32_t& signalMsgSize){ return this->sendDataSignals(numberOfSignals, signalMsgSize); });
object_.registerMethod("concatenateTwoStrings").onInterface(interfaceName).implementedAs([this](const std::string& string1, const std::string& string2){ return this->concatenateTwoStrings(string1, string2); });
object_.registerSignal("dataSignal").onInterface(interfaceName).withParameters<std::string>();
}
public:
void dataSignal(const std::string& data)
{
object_.emitSignal("dataSignal").onInterface(interfaceName).withArguments(data);
}
private:
virtual void sendDataSignals(const uint32_t& numberOfSignals, const uint32_t& signalMsgSize) = 0;
virtual std::string concatenateTwoStrings(const std::string& string1, const std::string& string2) = 0;
private:
sdbus::IObject& object_;
};
}} // namespaces
#endif

View File

@ -0,0 +1,49 @@
/*
* This file was automatically generated by sdbuscpp-xml2cpp; DO NOT EDIT!
*/
#ifndef __sdbuscpp__perftest_proxy_h__proxy__H__
#define __sdbuscpp__perftest_proxy_h__proxy__H__
#include <sdbus-c++/sdbus-c++.h>
#include <string>
#include <tuple>
namespace org {
namespace sdbuscpp {
class perftest_proxy
{
public:
static constexpr const char* interfaceName = "org.sdbuscpp.perftest";
protected:
perftest_proxy(sdbus::IObjectProxy& object)
: object_(object)
{
object_.uponSignal("dataSignal").onInterface(interfaceName).call([this](const std::string& data){ this->onDataSignal(data); });
}
virtual void onDataSignal(const std::string& data) = 0;
public:
void sendDataSignals(const uint32_t& numberOfSignals, const uint32_t& signalMsgSize)
{
object_.callMethod("sendDataSignals").onInterface(interfaceName).withArguments(numberOfSignals, signalMsgSize);
}
std::string concatenateTwoStrings(const std::string& string1, const std::string& string2)
{
std::string result;
object_.callMethod("concatenateTwoStrings").onInterface(interfaceName).withArguments(string1, string2).storeResultsTo(result);
return result;
}
private:
sdbus::IObjectProxy& object_;
};
}} // namespaces
#endif

94
test/perftests/server.cpp Normal file
View File

@ -0,0 +1,94 @@
/**
* (C) 2019 KISTLER INSTRUMENTE AG, Winterthur, Switzerland
*
* @file server.cpp
*
* Created on: Jan 25, 2019
* Project: sdbus-c++
* Description: High-level D-Bus IPC C++ library based on sd-bus
*
* This file is part of sdbus-c++.
*
* sdbus-c++ is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 2.1 of the License, or
* (at your option) any later version.
*
* sdbus-c++ is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with sdbus-c++. If not, see <http://www.gnu.org/licenses/>.
*/
#include "perftest-adaptor.h"
#include <sdbus-c++/sdbus-c++.h>
#include <vector>
#include <string>
#include <thread>
#include <chrono>
using namespace std::chrono_literals;
std::string createRandomString(size_t length);
class PerftestServer : public sdbus::Interfaces<org::sdbuscpp::perftest_adaptor>
{
public:
PerftestServer(sdbus::IConnection& connection, std::string objectPath)
: sdbus::Interfaces<org::sdbuscpp::perftest_adaptor>(connection, std::move(objectPath))
{
}
protected:
virtual void sendDataSignals(const uint32_t& numberOfSignals, const uint32_t& signalMsgSize) override
{
auto data = createRandomString(signalMsgSize);
char digits[] = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9'};
auto start_time = std::chrono::steady_clock::now();
for (uint32_t i = 0; i < numberOfSignals; ++i)
{
// Emit signal
dataSignal(data);
}
auto stop_time = std::chrono::steady_clock::now();
std::cout << "Server sent " << numberOfSignals << " signals in: " << std::chrono::duration_cast<std::chrono::milliseconds>(stop_time - start_time).count() << " ms" << std::endl;
}
virtual std::string concatenateTwoStrings(const std::string& string1, const std::string& string2) override
{
return string1 + string2;
}
};
std::string createRandomString(size_t length)
{
auto randchar = []() -> char
{
const char charset[] =
"0123456789"
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
"abcdefghijklmnopqrstuvwxyz";
const size_t max_index = (sizeof(charset) - 1);
return charset[ rand() % max_index ];
};
std::string str(length, 0);
std::generate_n(str.begin(), length, randchar);
return str;
}
//-----------------------------------------
int main(int /*argc*/, char */*argv*/[])
{
const char* serviceName = "org.sdbuscpp.perftest";
auto connection = sdbus::createSystemBusConnection(serviceName);
const char* objectPath = "/org/sdbuscpp/perftest";
PerftestServer server(*connection, objectPath);
connection->enterProcessingLoop();
}