forked from Kistler-Group/sdbus-cpp
WIP
This commit is contained in:
@ -570,6 +570,26 @@ namespace sdbus {
|
|||||||
, std::forward<_Tuple>(t)
|
, std::forward<_Tuple>(t)
|
||||||
, std::make_index_sequence<std::tuple_size<std::decay_t<_Tuple>>::value>{} );
|
, std::make_index_sequence<std::tuple_size<std::decay_t<_Tuple>>::value>{} );
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Invoke a member function (custom version of C++17's invoke until we have full C++17 support)
|
||||||
|
template< typename _Function
|
||||||
|
, typename... _Args
|
||||||
|
, std::enable_if_t<std::is_member_pointer<std::decay_t<_Function>>{}, int> = 0 >
|
||||||
|
constexpr decltype(auto) invoke(_Function&& f, _Args&&... args)
|
||||||
|
noexcept(noexcept(std::mem_fn(f)(std::forward<_Args>(args)...)))
|
||||||
|
{
|
||||||
|
return std::mem_fn(f)(std::forward<_Args>(args)...);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Invoke non-member function (custom version of C++17's invoke until we have full C++17 support)
|
||||||
|
template< typename _Function
|
||||||
|
, typename... _Args
|
||||||
|
, std::enable_if_t<!std::is_member_pointer<std::decay_t<_Function>>{}, int> = 0 >
|
||||||
|
constexpr decltype(auto) invoke(_Function&& f, _Args&&... args)
|
||||||
|
noexcept(noexcept(std::forward<_Function>(f)(std::forward<_Args>(args)...)))
|
||||||
|
{
|
||||||
|
return std::forward<_Function>(f)(std::forward<_Args>(args)...);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#endif /* SDBUS_CXX_TYPETRAITS_H_ */
|
#endif /* SDBUS_CXX_TYPETRAITS_H_ */
|
||||||
|
@ -43,11 +43,13 @@ Connection::Connection(Connection::BusType type)
|
|||||||
finishHandshake(bus);
|
finishHandshake(bus);
|
||||||
|
|
||||||
notificationFd_ = createLoopNotificationDescriptor();
|
notificationFd_ = createLoopNotificationDescriptor();
|
||||||
|
std::cerr << "Created eventfd " << notificationFd_ << " of " << this << std::endl;
|
||||||
}
|
}
|
||||||
|
|
||||||
Connection::~Connection()
|
Connection::~Connection()
|
||||||
{
|
{
|
||||||
leaveProcessingLoop();
|
leaveProcessingLoop();
|
||||||
|
std::cerr << "Closing eventfd " << notificationFd_ << " of " << this << std::endl;
|
||||||
closeLoopNotificationDescriptor(notificationFd_);
|
closeLoopNotificationDescriptor(notificationFd_);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -87,8 +89,9 @@ void Connection::enterProcessingLoop()
|
|||||||
|
|
||||||
void Connection::enterProcessingLoopAsync()
|
void Connection::enterProcessingLoopAsync()
|
||||||
{
|
{
|
||||||
|
std::cerr << "--> enterProcessingLoopAsync() for connection " << this << std::endl;
|
||||||
// TODO: Check that joinable() means a valid non-empty thread
|
// TODO: Check that joinable() means a valid non-empty thread
|
||||||
if (!asyncLoopThread_.joinable())
|
//if (!asyncLoopThread_.joinable())
|
||||||
asyncLoopThread_ = std::thread([this](){ enterProcessingLoop(); });
|
asyncLoopThread_ = std::thread([this](){ enterProcessingLoop(); });
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -166,6 +169,122 @@ Signal Connection::createSignal( const std::string& objectPath
|
|||||||
return Signal(sdbusSignal);
|
return Signal(sdbusSignal);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
template<typename _Callable, typename... _Args, std::enable_if_t<std::is_void<function_result_t<_Callable>>::value, int>>
|
||||||
|
inline auto Connection::tryExecuteSync(_Callable&& fnc, const _Args&... args)
|
||||||
|
{
|
||||||
|
std::thread::id loopThreadId = loopThreadId_.load(std::memory_order_relaxed);
|
||||||
|
|
||||||
|
// Is the loop not yet on? => Go make synchronous call
|
||||||
|
while (loopThreadId == std::thread::id{})
|
||||||
|
{
|
||||||
|
// Did the loop begin in the meantime? Or try_lock() failed spuriously?
|
||||||
|
if (!loopMutex_.try_lock())
|
||||||
|
{
|
||||||
|
loopThreadId = loopThreadId_.load(std::memory_order_relaxed);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Synchronous call
|
||||||
|
std::lock_guard<std::mutex> guard(loopMutex_, std::adopt_lock);
|
||||||
|
sdbus::invoke(std::forward<_Callable>(fnc), args...);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Is the loop on and we are in the same thread? => Go for synchronous call
|
||||||
|
if (loopThreadId == std::this_thread::get_id())
|
||||||
|
{
|
||||||
|
assert(!loopMutex_.try_lock());
|
||||||
|
sdbus::invoke(std::forward<_Callable>(fnc), args...);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
template<typename _Callable, typename... _Args, std::enable_if_t<!std::is_void<function_result_t<_Callable>>::value, int>>
|
||||||
|
inline auto Connection::tryExecuteSync(_Callable&& fnc, const _Args&... args)
|
||||||
|
{
|
||||||
|
std::thread::id loopThreadId = loopThreadId_.load(std::memory_order_relaxed);
|
||||||
|
|
||||||
|
// Is the loop not yet on? => Go make synchronous call
|
||||||
|
while (loopThreadId == std::thread::id{})
|
||||||
|
{
|
||||||
|
// Did the loop begin in the meantime? Or try_lock() failed spuriously?
|
||||||
|
if (!loopMutex_.try_lock())
|
||||||
|
{
|
||||||
|
loopThreadId = loopThreadId_.load(std::memory_order_relaxed);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Synchronous call
|
||||||
|
std::lock_guard<std::mutex> guard(loopMutex_, std::adopt_lock);
|
||||||
|
return std::make_pair(true, sdbus::invoke(std::forward<_Callable>(fnc), args...));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Is the loop on and we are in the same thread? => Go for synchronous call
|
||||||
|
if (loopThreadId == std::this_thread::get_id())
|
||||||
|
{
|
||||||
|
assert(!loopMutex_.try_lock());
|
||||||
|
return std::make_pair(true, sdbus::invoke(std::forward<_Callable>(fnc), args...));
|
||||||
|
}
|
||||||
|
|
||||||
|
return std::make_pair(false, function_result_t<_Callable>{});
|
||||||
|
}
|
||||||
|
|
||||||
|
template<typename _Callable, typename... _Args, std::enable_if_t<std::is_void<function_result_t<_Callable>>::value, int>>
|
||||||
|
inline void Connection::executeAsync(_Callable&& fnc, const _Args&... args)
|
||||||
|
{
|
||||||
|
std::promise<void> result;
|
||||||
|
auto future = result.get_future();
|
||||||
|
|
||||||
|
queueUserRequest([fnc = std::forward<_Callable>(fnc), args..., &result]()
|
||||||
|
{
|
||||||
|
SCOPE_EXIT_NAMED(onSdbusError){ result.set_exception(std::current_exception()); };
|
||||||
|
|
||||||
|
std::cerr << " [lt] ... Invoking void request from within event loop thread..." << std::endl;
|
||||||
|
sdbus::invoke(fnc, args...);
|
||||||
|
std::cerr << " [lt] Request invoked" << std::endl;
|
||||||
|
result.set_value();
|
||||||
|
|
||||||
|
onSdbusError.dismiss();
|
||||||
|
});
|
||||||
|
|
||||||
|
// Wait for the the processing loop thread to process the request
|
||||||
|
future.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
template<typename _Callable, typename... _Args, std::enable_if_t<!std::is_void<function_result_t<_Callable>>::value, int>>
|
||||||
|
inline auto Connection::executeAsync(_Callable&& fnc, const _Args&... args)
|
||||||
|
{
|
||||||
|
std::promise<function_result_t<_Callable>> result;
|
||||||
|
auto future = result.get_future();
|
||||||
|
|
||||||
|
queueUserRequest([fnc = std::forward<_Callable>(fnc), args..., &result]()
|
||||||
|
{
|
||||||
|
SCOPE_EXIT_NAMED(onSdbusError){ result.set_exception(std::current_exception()); };
|
||||||
|
|
||||||
|
std::cerr << " [lt] ... Invoking request from within event loop thread..." << std::endl;
|
||||||
|
auto returnValue = sdbus::invoke(fnc, args...);
|
||||||
|
std::cerr << " [lt] Request invoked and got result" << std::endl;
|
||||||
|
result.set_value(returnValue);
|
||||||
|
|
||||||
|
onSdbusError.dismiss();
|
||||||
|
});
|
||||||
|
|
||||||
|
// Wait for the reply from the processing loop thread
|
||||||
|
return future.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
template<typename _Callable, typename... _Args>
|
||||||
|
inline void Connection::executeAsyncAndDontWaitForResult(_Callable&& fnc, const _Args&... args)
|
||||||
|
{
|
||||||
|
queueUserRequest([fnc = std::forward<_Callable>(fnc), args...]()
|
||||||
|
{
|
||||||
|
sdbus::invoke(fnc, args...);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
void* Connection::registerSignalHandler( const std::string& objectPath
|
void* Connection::registerSignalHandler( const std::string& objectPath
|
||||||
, const std::string& interfaceName
|
, const std::string& interfaceName
|
||||||
, const std::string& signalName
|
, const std::string& signalName
|
||||||
@ -182,181 +301,62 @@ void* Connection::registerSignalHandler( const std::string& objectPath
|
|||||||
|
|
||||||
auto filter = composeSignalMatchFilter(objectPath, interfaceName, signalName);
|
auto filter = composeSignalMatchFilter(objectPath, interfaceName, signalName);
|
||||||
auto r = sd_bus_add_match(bus_.get(), &slot, filter.c_str(), callback, userData);
|
auto r = sd_bus_add_match(bus_.get(), &slot, filter.c_str(), callback, userData);
|
||||||
|
std::cerr << "Registered signal " << signalName << " with slot " << slot << std::endl;
|
||||||
|
|
||||||
SDBUS_THROW_ERROR_IF(r < 0, "Failed to register signal handler", -r);
|
SDBUS_THROW_ERROR_IF(r < 0, "Failed to register signal handler", -r);
|
||||||
|
|
||||||
return slot;
|
return slot;
|
||||||
};
|
};
|
||||||
|
|
||||||
std::thread::id loopThreadId = loopThreadId_;
|
std::cerr << "Trying to register signal " << signalName << " synchronously..." << std::endl;
|
||||||
|
auto result = tryExecuteSync(registerSignalHandler, objectPath, interfaceName, signalName, callback, userData);
|
||||||
// Is the loop not yet on? => Go make synchronous call
|
if (!result.first) std::cerr << " ... Nope, going async way" << std::endl;
|
||||||
while (loopThreadId == std::thread::id{})
|
return result.first ? result.second
|
||||||
{
|
: executeAsync(registerSignalHandler, objectPath, interfaceName, signalName, callback, userData);
|
||||||
// Did the loop begin in the meantime? Or try_lock() failed spuriously?
|
|
||||||
if (!loopMutex_.try_lock())
|
|
||||||
continue;
|
|
||||||
|
|
||||||
// Synchronous call
|
|
||||||
std::lock_guard<std::mutex> guard(loopMutex_, std::adopt_lock);
|
|
||||||
return registerSignalHandler(objectPath, interfaceName, signalName, callback, userData);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Is the loop on and we are in the same thread? => Go for synchronous call
|
|
||||||
if (loopThreadId == std::this_thread::get_id())
|
|
||||||
{
|
|
||||||
assert(!loopMutex_.try_lock());
|
|
||||||
return registerSignalHandler(objectPath, interfaceName, signalName, callback, userData);
|
|
||||||
}
|
|
||||||
|
|
||||||
// We are in a different thread than the loop thread => Asynchronous call
|
|
||||||
std::promise<void*> result;
|
|
||||||
auto future = result.get_future();
|
|
||||||
queueUserRequest([registerSignalHandler, objectPath, interfaceName, signalName, callback, userData, &result]()
|
|
||||||
{
|
|
||||||
SCOPE_EXIT_NAMED(onSdbusError){ result.set_exception(std::current_exception()); };
|
|
||||||
|
|
||||||
void* slot = registerSignalHandler(objectPath, interfaceName, signalName, callback, userData);
|
|
||||||
result.set_value(slot);
|
|
||||||
|
|
||||||
onSdbusError.dismiss();
|
|
||||||
});
|
|
||||||
auto request = std::make_unique<SignalRegistrationRequest>();
|
|
||||||
request->registerSignalHandler = registerSignalHandler;
|
|
||||||
auto future = request->result.get_future();
|
|
||||||
queueUserRequest(std::move(request));
|
|
||||||
// Wait for the reply from the loop thread
|
|
||||||
return future.get();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void Connection::unregisterSignalHandler(void* handlerCookie)
|
void Connection::unregisterSignalHandler(void* handlerCookie)
|
||||||
{
|
{
|
||||||
sd_bus_slot_unref((sd_bus_slot *)handlerCookie);
|
auto result = tryExecuteSync(sd_bus_slot_unref, (sd_bus_slot *)handlerCookie);
|
||||||
|
// if (!result.first)
|
||||||
|
// executeAsync(sd_bus_slot_unref, (sd_bus_slot *)handlerCookie);
|
||||||
|
if (result.first)
|
||||||
|
{
|
||||||
|
std::cerr << "Synchronously unregistered signal " << handlerCookie << ": " << result.second << std::endl;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
auto slot = executeAsync(sd_bus_slot_unref, (sd_bus_slot *)handlerCookie);
|
||||||
|
std::cerr << "Asynchronously unregistered signal " << handlerCookie << ": " << slot << std::endl;
|
||||||
}
|
}
|
||||||
|
|
||||||
//class AsyncExecutor
|
|
||||||
//ifPossibleExecuteSync()
|
|
||||||
|
|
||||||
MethodReply Connection::callMethod(const MethodCall& message)
|
MethodReply Connection::callMethod(const MethodCall& message)
|
||||||
{
|
{
|
||||||
//ifPossibleExecuteSync().otherwiseExecuteAsync();
|
std::cerr << "Trying to call method synchronously..." << std::endl;
|
||||||
std::thread::id loopThreadId = loopThreadId_;
|
auto result = tryExecuteSync(&MethodCall::send, message);
|
||||||
|
if (!result.first) std::cerr << " ... Nope, going async way" << std::endl;
|
||||||
// Is the loop not yet on? => Go make synchronous call
|
return result.first ? result.second
|
||||||
while (loopThreadId == std::thread::id{})
|
: executeAsync(&MethodCall::send, message);
|
||||||
{
|
|
||||||
// Did the loop begin in the meantime? Or try_lock() failed spuriously?
|
|
||||||
if (!loopMutex_.try_lock())
|
|
||||||
continue;
|
|
||||||
|
|
||||||
// Synchronous call
|
|
||||||
std::lock_guard<std::mutex> guard(loopMutex_, std::adopt_lock);
|
|
||||||
return message.send();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Is the loop on and we are in the same thread? => Go for synchronous call
|
|
||||||
if (loopThreadId == std::this_thread::get_id())
|
|
||||||
{
|
|
||||||
assert(!loopMutex_.try_lock());
|
|
||||||
return message.send(); // Synchronous call
|
|
||||||
}
|
|
||||||
|
|
||||||
// We are in a different thread than the loop thread => Asynchronous call
|
|
||||||
auto request = std::make_unique<MethodCallRequest>();
|
|
||||||
request->msg = message;
|
|
||||||
auto future = request->result.get_future();
|
|
||||||
queueUserRequest(std::move(request));
|
|
||||||
// Wait for the reply from the loop thread
|
|
||||||
return future.get();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void Connection::callMethod(const AsyncMethodCall& message, void* callback, void* userData)
|
void Connection::callMethod(const AsyncMethodCall& message, void* callback, void* userData)
|
||||||
{
|
{
|
||||||
std::thread::id loopThreadId = loopThreadId_;
|
auto result = tryExecuteSync(&AsyncMethodCall::send, message, callback, userData);
|
||||||
|
if (!result)
|
||||||
// Is the loop not yet on? => Go make synchronous call
|
executeAsyncAndDontWaitForResult(&AsyncMethodCall::send, message, callback, userData);
|
||||||
while (loopThreadId == std::thread::id{})
|
|
||||||
{
|
|
||||||
// Did the loop begin in the meantime? Or try_lock() failed spuriously?
|
|
||||||
if (!loopMutex_.try_lock())
|
|
||||||
continue;
|
|
||||||
|
|
||||||
// Synchronous call
|
|
||||||
std::lock_guard<std::mutex> guard(loopMutex_, std::adopt_lock);
|
|
||||||
return message.send(callback, userData);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Is the loop on and we are in the same thread? => Go for synchronous call
|
|
||||||
if (loopThreadId == std::this_thread::get_id())
|
|
||||||
{
|
|
||||||
assert(!loopMutex_.try_lock());
|
|
||||||
return message.send(callback, userData); // Synchronous call
|
|
||||||
}
|
|
||||||
|
|
||||||
// We are in a different thread than the loop thread => Asynchronous call
|
|
||||||
auto request = std::make_unique<AsyncMethodCallRequest>();
|
|
||||||
request->msg = message;
|
|
||||||
request->callback = callback;
|
|
||||||
request->userData = userData;
|
|
||||||
queueUserRequest(std::move(request));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void Connection::sendMethodReply(const MethodReply& message)
|
void Connection::sendMethodReply(const MethodReply& message)
|
||||||
{
|
{
|
||||||
std::thread::id loopThreadId = loopThreadId_;
|
auto result = tryExecuteSync(&MethodReply::send, message);
|
||||||
|
if (!result)
|
||||||
// Is the loop not yet on? => Go make synchronous call
|
executeAsyncAndDontWaitForResult(&MethodReply::send, message);
|
||||||
while (loopThreadId == std::thread::id{})
|
|
||||||
{
|
|
||||||
// Did the loop begin in the meantime? Or try_lock() failed spuriously?
|
|
||||||
if (!loopMutex_.try_lock())
|
|
||||||
continue;
|
|
||||||
|
|
||||||
// Synchronous call
|
|
||||||
std::lock_guard<std::mutex> guard(loopMutex_, std::adopt_lock);
|
|
||||||
return message.send();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Is the loop on and we are in the same thread? => Go for synchronous call
|
|
||||||
if (loopThreadId == std::this_thread::get_id())
|
|
||||||
{
|
|
||||||
assert(!loopMutex_.try_lock());
|
|
||||||
return message.send(); // Synchronous call
|
|
||||||
}
|
|
||||||
|
|
||||||
// We are in a different thread than the loop thread => Asynchronous call
|
|
||||||
auto request = std::make_unique<MethodReplyRequest>();
|
|
||||||
request->msg = message;
|
|
||||||
queueUserRequest(std::move(request));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void Connection::emitSignal(const Signal& message)
|
void Connection::emitSignal(const Signal& message)
|
||||||
{
|
{
|
||||||
std::thread::id loopThreadId = loopThreadId_;
|
auto result = tryExecuteSync(&Signal::send, message);
|
||||||
|
if (!result)
|
||||||
// Is the loop not yet on? => Go make synchronous call
|
executeAsyncAndDontWaitForResult(&Signal::send, message);
|
||||||
while (loopThreadId == std::thread::id{})
|
|
||||||
{
|
|
||||||
// Did the loop begin in the meantime? Or try_lock() failed spuriously?
|
|
||||||
if (!loopMutex_.try_lock())
|
|
||||||
continue;
|
|
||||||
|
|
||||||
// Synchronous call
|
|
||||||
std::lock_guard<std::mutex> guard(loopMutex_, std::adopt_lock);
|
|
||||||
return message.send();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Is the loop on and we are in the same thread? => Go for synchronous call
|
|
||||||
if (loopThreadId == std::this_thread::get_id())
|
|
||||||
{
|
|
||||||
assert(!loopMutex_.try_lock());
|
|
||||||
return message.send(); // Synchronous call
|
|
||||||
}
|
|
||||||
|
|
||||||
// We are in a different thread than the loop thread => Asynchronous call
|
|
||||||
auto request = std::make_unique<SignalEmissionRequest>();
|
|
||||||
request->msg = message;
|
|
||||||
queueUserRequest(std::move(request));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
std::unique_ptr<sdbus::internal::IConnection> Connection::clone() const
|
std::unique_ptr<sdbus::internal::IConnection> Connection::clone() const
|
||||||
@ -413,10 +413,14 @@ void Connection::notifyProcessingLoop()
|
|||||||
{
|
{
|
||||||
assert(notificationFd_ >= 0);
|
assert(notificationFd_ >= 0);
|
||||||
|
|
||||||
uint64_t value = 1;
|
for (int i = 0; i < 1; ++i)
|
||||||
auto r = write(notificationFd_, &value, sizeof(value));
|
{
|
||||||
|
//std::this_thread::sleep_for(std::chrono::milliseconds(5));
|
||||||
SDBUS_THROW_ERROR_IF(r < 0, "Failed to notify processing loop", -errno);
|
uint64_t value = 1;
|
||||||
|
auto r = write(notificationFd_, &value, sizeof(value));
|
||||||
|
std::cerr << "Wrote to notification fd " << notificationFd_ << std::endl;
|
||||||
|
SDBUS_THROW_ERROR_IF(r < 0, "Failed to notify processing loop", -errno);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void Connection::notifyProcessingLoopToExit()
|
void Connection::notifyProcessingLoopToExit()
|
||||||
@ -445,11 +449,12 @@ bool Connection::processPendingRequest()
|
|||||||
return r > 0;
|
return r > 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void Connection::queueUserRequest(std::unique_ptr<IUserRequest>&& request)
|
void Connection::queueUserRequest(UserRequest&& request)
|
||||||
{
|
{
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> guard(userRequestsMutex_);
|
std::lock_guard<std::mutex> guard(userRequestsMutex_);
|
||||||
userRequests_.push(std::move(request));
|
userRequests_.push(std::move(request));
|
||||||
|
std::cerr << "Pushed to user request queue. Size: " << userRequests_.size() << std::endl;
|
||||||
}
|
}
|
||||||
notifyProcessingLoop();
|
notifyProcessingLoop();
|
||||||
}
|
}
|
||||||
@ -459,8 +464,8 @@ void Connection::processUserRequests()
|
|||||||
std::lock_guard<std::mutex> guard(userRequestsMutex_);
|
std::lock_guard<std::mutex> guard(userRequestsMutex_);
|
||||||
while (!userRequests_.empty())
|
while (!userRequests_.empty())
|
||||||
{
|
{
|
||||||
auto& reply = userRequests_.front();
|
auto& request = userRequests_.front();
|
||||||
reply->process();
|
request();
|
||||||
userRequests_.pop();
|
userRequests_.pop();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -483,22 +488,31 @@ Connection::WaitResult Connection::waitForNextRequest()
|
|||||||
uint64_t usec;
|
uint64_t usec;
|
||||||
sd_bus_get_timeout(bus, &usec);
|
sd_bus_get_timeout(bus, &usec);
|
||||||
|
|
||||||
struct pollfd fds[] = {{sdbusFd, sdbusEvents, 0}, {notificationFd_, POLLIN, 0}};
|
struct pollfd fds[] = {{sdbusFd, sdbusEvents, 0}, {notificationFd_, POLLIN | POLLHUP | POLLERR | POLLNVAL, 0}};
|
||||||
auto fdsCount = sizeof(fds)/sizeof(fds[0]);
|
auto fdsCount = sizeof(fds)/sizeof(fds[0]);
|
||||||
|
|
||||||
|
std::cerr << "[lt] Going to poll on fs " << sdbusFd << ", " << notificationFd_ << " with timeout " << usec << " and fdscount == " << fdsCount << std::endl;
|
||||||
r = poll(fds, fdsCount, usec == (uint64_t) -1 ? -1 : (usec+999)/1000);
|
r = poll(fds, fdsCount, usec == (uint64_t) -1 ? -1 : (usec+999)/1000);
|
||||||
|
|
||||||
if (r < 0 && errno == EINTR)
|
if (r < 0 && errno == EINTR)
|
||||||
|
{
|
||||||
|
std::cerr << "<<<>>>> GOT EINTR" << std::endl;
|
||||||
return {true, false}; // Try again
|
return {true, false}; // Try again
|
||||||
|
}
|
||||||
|
|
||||||
SDBUS_THROW_ERROR_IF(r < 0, "Failed to wait on the bus", -errno);
|
SDBUS_THROW_ERROR_IF(r < 0, "Failed to wait on the bus", -errno);
|
||||||
|
|
||||||
|
if ((fds[1].revents & POLLHUP) || (fds[1].revents & POLLERR) || ((fds[1].revents & POLLNVAL)))
|
||||||
|
{
|
||||||
|
std::cerr << "!!!!!!!!!! Something went wrong on polling" << std::endl;
|
||||||
|
}
|
||||||
if (fds[1].revents & POLLIN)
|
if (fds[1].revents & POLLIN)
|
||||||
{
|
{
|
||||||
if (exitLoopThread_)
|
if (exitLoopThread_)
|
||||||
return {false, false}; // Got exit notification
|
return {false, false}; // Got exit notification
|
||||||
|
|
||||||
// Otherwise we have some async messages to process
|
// Otherwise we have some user requests to process
|
||||||
|
std::cerr << "Loop found it has some async requests to process" << std::endl;
|
||||||
|
|
||||||
uint64_t value{};
|
uint64_t value{};
|
||||||
auto r = read(notificationFd_, &value, sizeof(value));
|
auto r = read(notificationFd_, &value, sizeof(value));
|
||||||
|
108
src/Connection.h
108
src/Connection.h
@ -99,104 +99,14 @@ namespace sdbus { namespace internal {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// TODO move down
|
using UserRequest = std::function<void()>;
|
||||||
struct IUserRequest
|
|
||||||
{
|
|
||||||
virtual void process() = 0;
|
|
||||||
virtual ~IUserRequest() = default;
|
|
||||||
};
|
|
||||||
|
|
||||||
struct MethodCallRequest : IUserRequest
|
|
||||||
{
|
|
||||||
MethodCall msg;
|
|
||||||
std::promise<MethodReply> result;
|
|
||||||
|
|
||||||
void process() override
|
|
||||||
{
|
|
||||||
SCOPE_EXIT_NAMED(onSdbusError){ result.set_exception(std::current_exception()); };
|
|
||||||
|
|
||||||
auto reply = msg.send();
|
|
||||||
result.set_value(std::move(reply));
|
|
||||||
|
|
||||||
onSdbusError.dismiss();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
struct AsyncMethodCallRequest : IUserRequest
|
|
||||||
{
|
|
||||||
AsyncMethodCall msg;
|
|
||||||
void* callback;
|
|
||||||
void* userData;
|
|
||||||
|
|
||||||
// TODO: Catch exception and store to promise?
|
|
||||||
void process() override
|
|
||||||
{
|
|
||||||
msg.send(callback, userData);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
struct MethodReplyRequest : IUserRequest
|
|
||||||
{
|
|
||||||
MethodReply msg;
|
|
||||||
|
|
||||||
// TODO: Catch exception and store to promise?
|
|
||||||
void process() override
|
|
||||||
{
|
|
||||||
msg.send();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
struct SignalEmissionRequest : IUserRequest
|
|
||||||
{
|
|
||||||
Signal msg;
|
|
||||||
|
|
||||||
// TODO: Catch exception and store to promise?
|
|
||||||
void process() override
|
|
||||||
{
|
|
||||||
msg.send();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
struct SignalRegistrationRequest : IUserRequest
|
|
||||||
{
|
|
||||||
std::function<void*()> registerSignalHandler;
|
|
||||||
std::promise<void*> result;
|
|
||||||
|
|
||||||
void process() override
|
|
||||||
{
|
|
||||||
SCOPE_EXIT_NAMED(onSdbusError){ result.set_exception(std::current_exception()); };
|
|
||||||
|
|
||||||
assert(registerSignalHandler);
|
|
||||||
void* slot = registerSignalHandler();
|
|
||||||
result.set_value(slot);
|
|
||||||
|
|
||||||
onSdbusError.dismiss();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
struct SignalUnregistrationRequest : IUserRequest
|
|
||||||
{
|
|
||||||
std::function<void()> unregisterSignalHandler;
|
|
||||||
std::promise<void> result;
|
|
||||||
|
|
||||||
void process() override
|
|
||||||
{
|
|
||||||
SCOPE_EXIT_NAMED(onSdbusError){ result.set_exception(std::current_exception()); };
|
|
||||||
|
|
||||||
assert(unregisterSignalHandler);
|
|
||||||
unregisterSignalHandler();
|
|
||||||
result.set_value();
|
|
||||||
|
|
||||||
onSdbusError.dismiss();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
static sd_bus* openBus(Connection::BusType type);
|
static sd_bus* openBus(Connection::BusType type);
|
||||||
static void finishHandshake(sd_bus* bus);
|
static void finishHandshake(sd_bus* bus);
|
||||||
static int createLoopNotificationDescriptor();
|
static int createLoopNotificationDescriptor();
|
||||||
static void closeLoopNotificationDescriptor(int fd);
|
static void closeLoopNotificationDescriptor(int fd);
|
||||||
bool processPendingRequest();
|
bool processPendingRequest();
|
||||||
void queueUserRequest(std::unique_ptr<IUserRequest>&& request);
|
void queueUserRequest(UserRequest&& request);
|
||||||
void processUserRequests();
|
void processUserRequests();
|
||||||
WaitResult waitForNextRequest();
|
WaitResult waitForNextRequest();
|
||||||
static std::string composeSignalMatchFilter( const std::string& objectPath
|
static std::string composeSignalMatchFilter( const std::string& objectPath
|
||||||
@ -206,13 +116,25 @@ namespace sdbus { namespace internal {
|
|||||||
void notifyProcessingLoopToExit();
|
void notifyProcessingLoopToExit();
|
||||||
void joinWithProcessingLoop();
|
void joinWithProcessingLoop();
|
||||||
|
|
||||||
|
// TODO move this and threading logic and method around it to separate class?
|
||||||
|
template <typename _Callable, typename... _Args, std::enable_if_t<std::is_void<function_result_t<_Callable>>::value, int> = 0>
|
||||||
|
inline auto tryExecuteSync(_Callable&& fnc, const _Args&... args);
|
||||||
|
template <typename _Callable, typename... _Args, std::enable_if_t<!std::is_void<function_result_t<_Callable>>::value, int> = 0>
|
||||||
|
inline auto tryExecuteSync(_Callable&& fnc, const _Args&... args);
|
||||||
|
template <typename _Callable, typename... _Args, std::enable_if_t<std::is_void<function_result_t<_Callable>>::value, int> = 0>
|
||||||
|
inline void executeAsync(_Callable&& fnc, const _Args&... args);
|
||||||
|
template <typename _Callable, typename... _Args, std::enable_if_t<!std::is_void<function_result_t<_Callable>>::value, int> = 0>
|
||||||
|
inline auto executeAsync(_Callable&& fnc, const _Args&... args);
|
||||||
|
template <typename _Callable, typename... _Args>
|
||||||
|
inline void executeAsyncAndDontWaitForResult(_Callable&& fnc, const _Args&... args);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::unique_ptr<sd_bus, decltype(&sd_bus_flush_close_unref)> bus_{nullptr, &sd_bus_flush_close_unref};
|
std::unique_ptr<sd_bus, decltype(&sd_bus_flush_close_unref)> bus_{nullptr, &sd_bus_flush_close_unref};
|
||||||
std::thread asyncLoopThread_;
|
std::thread asyncLoopThread_;
|
||||||
std::atomic<std::thread::id> loopThreadId_;
|
std::atomic<std::thread::id> loopThreadId_;
|
||||||
std::mutex loopMutex_;
|
std::mutex loopMutex_;
|
||||||
|
|
||||||
std::queue<std::unique_ptr<IUserRequest>> userRequests_;
|
std::queue<UserRequest> userRequests_;
|
||||||
std::mutex userRequestsMutex_;
|
std::mutex userRequestsMutex_;
|
||||||
|
|
||||||
std::atomic<bool> exitLoopThread_;
|
std::atomic<bool> exitLoopThread_;
|
||||||
|
@ -30,6 +30,8 @@
|
|||||||
#include "IConnection.h"
|
#include "IConnection.h"
|
||||||
#include <systemd/sd-bus.h>
|
#include <systemd/sd-bus.h>
|
||||||
#include <cassert>
|
#include <cassert>
|
||||||
|
#include <chrono>
|
||||||
|
#include <thread>
|
||||||
|
|
||||||
namespace sdbus { namespace internal {
|
namespace sdbus { namespace internal {
|
||||||
|
|
||||||
@ -49,7 +51,8 @@ ObjectProxy::ObjectProxy( std::unique_ptr<sdbus::internal::IConnection>&& connec
|
|||||||
{
|
{
|
||||||
// The connection is ours only, so we have to manage event loop upon this connection,
|
// The connection is ours only, so we have to manage event loop upon this connection,
|
||||||
// so we get signals, async replies, and other messages from D-Bus.
|
// so we get signals, async replies, and other messages from D-Bus.
|
||||||
// TODO uncomment connection_->enterProcessingLoopAsync();
|
connection_->enterProcessingLoopAsync();
|
||||||
|
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||||
}
|
}
|
||||||
|
|
||||||
MethodCall ObjectProxy::createMethodCall(const std::string& interfaceName, const std::string& methodName)
|
MethodCall ObjectProxy::createMethodCall(const std::string& interfaceName, const std::string& methodName)
|
||||||
|
@ -59,10 +59,12 @@ public:
|
|||||||
{
|
{
|
||||||
m_connection.requestName(INTERFACE_NAME);
|
m_connection.requestName(INTERFACE_NAME);
|
||||||
m_connection.enterProcessingLoopAsync();
|
m_connection.enterProcessingLoopAsync();
|
||||||
|
//m_connection2.enterProcessingLoopAsync();
|
||||||
}
|
}
|
||||||
|
|
||||||
static void TearDownTestCase()
|
static void TearDownTestCase()
|
||||||
{
|
{
|
||||||
|
//m_connection2.leaveProcessingLoop();
|
||||||
m_connection.leaveProcessingLoop();
|
m_connection.leaveProcessingLoop();
|
||||||
m_connection.releaseName(INTERFACE_NAME);
|
m_connection.releaseName(INTERFACE_NAME);
|
||||||
}
|
}
|
||||||
@ -71,7 +73,7 @@ private:
|
|||||||
void SetUp() override
|
void SetUp() override
|
||||||
{
|
{
|
||||||
m_adaptor = std::make_unique<TestingAdaptor>(m_connection);
|
m_adaptor = std::make_unique<TestingAdaptor>(m_connection);
|
||||||
m_proxy = std::make_unique<TestingProxy>(INTERFACE_NAME, OBJECT_PATH);
|
m_proxy = std::make_unique<TestingProxy>(/*m_connection2, */INTERFACE_NAME, OBJECT_PATH);
|
||||||
std::this_thread::sleep_for(50ms); // Give time for the proxy to start listening to signals
|
std::this_thread::sleep_for(50ms); // Give time for the proxy to start listening to signals
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -83,12 +85,14 @@ private:
|
|||||||
|
|
||||||
public:
|
public:
|
||||||
static sdbus::internal::Connection m_connection;
|
static sdbus::internal::Connection m_connection;
|
||||||
|
//static sdbus::internal::Connection m_connection2;
|
||||||
|
|
||||||
std::unique_ptr<TestingAdaptor> m_adaptor;
|
std::unique_ptr<TestingAdaptor> m_adaptor;
|
||||||
std::unique_ptr<TestingProxy> m_proxy;
|
std::unique_ptr<TestingProxy> m_proxy;
|
||||||
};
|
};
|
||||||
|
|
||||||
sdbus::internal::Connection AdaptorAndProxyFixture::m_connection{sdbus::internal::Connection::BusType::eSystem};
|
sdbus::internal::Connection AdaptorAndProxyFixture::m_connection{sdbus::internal::Connection::BusType::eSystem};
|
||||||
|
//sdbus::internal::Connection AdaptorAndProxyFixture::m_connection2{sdbus::internal::Connection::BusType::eSystem};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -96,16 +100,16 @@ sdbus::internal::Connection AdaptorAndProxyFixture::m_connection{sdbus::internal
|
|||||||
/* -- TEST CASES -- */
|
/* -- TEST CASES -- */
|
||||||
/*-------------------------------------*/
|
/*-------------------------------------*/
|
||||||
|
|
||||||
TEST(AdaptorAndProxy, CanBeConstructedSuccesfully)
|
//TEST(AdaptorAndProxy, CanBeConstructedSuccesfully)
|
||||||
{
|
//{
|
||||||
auto connection = sdbus::createConnection();
|
// auto connection = sdbus::createConnection();
|
||||||
connection->requestName(INTERFACE_NAME);
|
// connection->requestName(INTERFACE_NAME);
|
||||||
|
|
||||||
ASSERT_NO_THROW(TestingAdaptor adaptor(*connection));
|
// ASSERT_NO_THROW(TestingAdaptor adaptor(*connection));
|
||||||
ASSERT_NO_THROW(TestingProxy proxy(INTERFACE_NAME, OBJECT_PATH));
|
// ASSERT_NO_THROW(TestingProxy proxy(INTERFACE_NAME, OBJECT_PATH));
|
||||||
|
|
||||||
connection->releaseName(INTERFACE_NAME);
|
// connection->releaseName(INTERFACE_NAME);
|
||||||
}
|
//}
|
||||||
|
|
||||||
// Methods
|
// Methods
|
||||||
|
|
||||||
|
@ -45,7 +45,7 @@ public:
|
|||||||
|
|
||||||
protected:
|
protected:
|
||||||
|
|
||||||
void noArgNoReturn() const { }
|
void noArgNoReturn() const { std::cerr << "Server: noArgNoReturn() called;" << std::endl;}
|
||||||
|
|
||||||
int32_t getInt() const { return INT32_VALUE; }
|
int32_t getInt() const { return INT32_VALUE; }
|
||||||
|
|
||||||
|
@ -90,72 +90,84 @@ int main(int /*argc*/, char */*argv*/[])
|
|||||||
{
|
{
|
||||||
const char* destinationName = "org.sdbuscpp.perftest";
|
const char* destinationName = "org.sdbuscpp.perftest";
|
||||||
const char* objectPath = "/org/sdbuscpp/perftest";
|
const char* objectPath = "/org/sdbuscpp/perftest";
|
||||||
PerftestClient client(destinationName, objectPath);
|
//PerftestClient client(destinationName, objectPath);
|
||||||
|
|
||||||
const unsigned int repetitions{2};
|
const unsigned int repetitions{2};
|
||||||
unsigned int msgCount = 1000;
|
unsigned int msgCount = 1000;
|
||||||
unsigned int msgSize{};
|
unsigned int msgSize{};
|
||||||
|
|
||||||
msgSize = 20;
|
|
||||||
std::cout << "** Measuring signals of size " << msgSize << " bytes (" << repetitions << " repetitions)..." << std::endl << std::endl;
|
|
||||||
client.m_msgCount = msgCount; client.m_msgSize = msgSize;
|
|
||||||
for (unsigned int r = 0; r < repetitions; ++r)
|
|
||||||
{
|
{
|
||||||
client.sendDataSignals(msgCount, msgSize);
|
PerftestClient client(destinationName, objectPath);
|
||||||
|
msgSize = 20;
|
||||||
std::this_thread::sleep_for(1000ms);
|
std::cout << "** Measuring signals of size " << msgSize << " bytes (" << repetitions << " repetitions)..." << std::endl << std::endl;
|
||||||
}
|
client.m_msgCount = msgCount; client.m_msgSize = msgSize;
|
||||||
|
for (unsigned int r = 0; r < repetitions; ++r)
|
||||||
msgSize = 1000;
|
|
||||||
std::cout << std::endl << "** Measuring signals of size " << msgSize << " bytes (" << repetitions << " repetitions)..." << std::endl << std::endl;
|
|
||||||
client.m_msgCount = msgCount; client.m_msgSize = msgSize;
|
|
||||||
for (unsigned int r = 0; r < repetitions; ++r)
|
|
||||||
{
|
|
||||||
client.sendDataSignals(msgCount, msgSize);
|
|
||||||
|
|
||||||
std::this_thread::sleep_for(1000ms);
|
|
||||||
}
|
|
||||||
|
|
||||||
msgSize = 20;
|
|
||||||
std::cout << std::endl << "** Measuring method calls of size " << msgSize << " bytes (" << repetitions << " repetitions)..." << std::endl << std::endl;
|
|
||||||
for (unsigned int r = 0; r < repetitions; ++r)
|
|
||||||
{
|
|
||||||
auto str1 = createRandomString(msgSize/2);
|
|
||||||
auto str2 = createRandomString(msgSize/2);
|
|
||||||
|
|
||||||
auto startTime = std::chrono::steady_clock::now();
|
|
||||||
for (unsigned int i = 0; i < msgCount; i++)
|
|
||||||
{
|
{
|
||||||
auto result = client.concatenateTwoStrings(str1, str2);
|
client.sendDataSignals(msgCount, msgSize);
|
||||||
|
|
||||||
assert(result.size() == str1.size() + str2.size());
|
std::this_thread::sleep_for(1000ms);
|
||||||
assert(result.size() == msgSize);
|
|
||||||
}
|
}
|
||||||
auto stopTime = std::chrono::steady_clock::now();
|
|
||||||
std::cout << "Called " << msgCount << " methods in: " << std::chrono::duration_cast<std::chrono::milliseconds>(stopTime - startTime).count() << " ms" << std::endl;
|
|
||||||
|
|
||||||
std::this_thread::sleep_for(1000ms);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
msgSize = 1000;
|
|
||||||
std::cout << std::endl << "** Measuring method calls of size " << msgSize << " bytes (" << repetitions << " repetitions)..." << std::endl << std::endl;
|
|
||||||
for (unsigned int r = 0; r < repetitions; ++r)
|
|
||||||
{
|
{
|
||||||
auto str1 = createRandomString(msgSize/2);
|
PerftestClient client(destinationName, objectPath);
|
||||||
auto str2 = createRandomString(msgSize/2);
|
msgSize = 1000;
|
||||||
|
std::cout << std::endl << "** Measuring signals of size " << msgSize << " bytes (" << repetitions << " repetitions)..." << std::endl << std::endl;
|
||||||
auto startTime = std::chrono::steady_clock::now();
|
client.m_msgCount = msgCount; client.m_msgSize = msgSize;
|
||||||
for (unsigned int i = 0; i < msgCount; i++)
|
for (unsigned int r = 0; r < repetitions; ++r)
|
||||||
{
|
{
|
||||||
auto result = client.concatenateTwoStrings(str1, str2);
|
client.sendDataSignals(msgCount, msgSize);
|
||||||
|
|
||||||
assert(result.size() == str1.size() + str2.size());
|
std::this_thread::sleep_for(1000ms);
|
||||||
assert(result.size() == msgSize);
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
PerftestClient client(destinationName, objectPath);
|
||||||
|
msgSize = 20;
|
||||||
|
std::cout << std::endl << "** Measuring method calls of size " << msgSize << " bytes (" << repetitions << " repetitions)..." << std::endl << std::endl;
|
||||||
|
for (unsigned int r = 0; r < repetitions; ++r)
|
||||||
|
{
|
||||||
|
auto str1 = createRandomString(msgSize/2);
|
||||||
|
auto str2 = createRandomString(msgSize/2);
|
||||||
|
|
||||||
|
auto startTime = std::chrono::steady_clock::now();
|
||||||
|
for (unsigned int i = 0; i < msgCount; i++)
|
||||||
|
{
|
||||||
|
auto result = client.concatenateTwoStrings(str1, str2);
|
||||||
|
|
||||||
|
assert(result.size() == str1.size() + str2.size());
|
||||||
|
assert(result.size() == msgSize);
|
||||||
|
}
|
||||||
|
auto stopTime = std::chrono::steady_clock::now();
|
||||||
|
std::cout << "Called " << msgCount << " methods in: " << std::chrono::duration_cast<std::chrono::milliseconds>(stopTime - startTime).count() << " ms" << std::endl;
|
||||||
|
|
||||||
|
std::this_thread::sleep_for(1000ms);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
PerftestClient client(destinationName, objectPath);
|
||||||
|
msgSize = 1000;
|
||||||
|
std::cout << std::endl << "** Measuring method calls of size " << msgSize << " bytes (" << repetitions << " repetitions)..." << std::endl << std::endl;
|
||||||
|
for (unsigned int r = 0; r < repetitions; ++r)
|
||||||
|
{
|
||||||
|
auto str1 = createRandomString(msgSize/2);
|
||||||
|
auto str2 = createRandomString(msgSize/2);
|
||||||
|
|
||||||
|
auto startTime = std::chrono::steady_clock::now();
|
||||||
|
for (unsigned int i = 0; i < msgCount; i++)
|
||||||
|
{
|
||||||
|
auto result = client.concatenateTwoStrings(str1, str2);
|
||||||
|
|
||||||
|
assert(result.size() == str1.size() + str2.size());
|
||||||
|
assert(result.size() == msgSize);
|
||||||
|
}
|
||||||
|
auto stopTime = std::chrono::steady_clock::now();
|
||||||
|
std::cout << "Called " << msgCount << " methods in: " << std::chrono::duration_cast<std::chrono::milliseconds>(stopTime - startTime).count() << " ms" << std::endl;
|
||||||
|
|
||||||
|
std::this_thread::sleep_for(1000ms);
|
||||||
}
|
}
|
||||||
auto stopTime = std::chrono::steady_clock::now();
|
|
||||||
std::cout << "Called " << msgCount << " methods in: " << std::chrono::duration_cast<std::chrono::milliseconds>(stopTime - startTime).count() << " ms" << std::endl;
|
|
||||||
|
|
||||||
std::this_thread::sleep_for(1000ms);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
Reference in New Issue
Block a user