forked from Kistler-Group/sdbus-cpp
refactor: simplify async D-Bus connection handling
This commit is contained in:
committed by
Stanislav Angelovič
parent
7450515d0b
commit
3e20fc639e
@@ -35,53 +35,54 @@
|
||||
#include <unistd.h>
|
||||
#include <poll.h>
|
||||
#include <sys/eventfd.h>
|
||||
#include <cstdint>
|
||||
|
||||
namespace sdbus::internal {
|
||||
|
||||
Connection::Connection(std::unique_ptr<ISdBus>&& interface, const BusFactory& busFactory)
|
||||
: iface_(std::move(interface))
|
||||
: sdbus_(std::move(interface))
|
||||
, bus_(openBus(busFactory))
|
||||
{
|
||||
assert(iface_ != nullptr);
|
||||
assert(sdbus_ != nullptr);
|
||||
}
|
||||
|
||||
Connection::Connection(std::unique_ptr<ISdBus>&& interface, default_bus_t)
|
||||
: Connection(std::move(interface), [this](sd_bus** bus){ return iface_->sd_bus_open(bus); })
|
||||
: Connection(std::move(interface), [this](sd_bus** bus){ return sdbus_->sd_bus_open(bus); })
|
||||
{
|
||||
}
|
||||
|
||||
Connection::Connection(std::unique_ptr<ISdBus>&& interface, system_bus_t)
|
||||
: Connection(std::move(interface), [this](sd_bus** bus){ return iface_->sd_bus_open_system(bus); })
|
||||
: Connection(std::move(interface), [this](sd_bus** bus){ return sdbus_->sd_bus_open_system(bus); })
|
||||
{
|
||||
}
|
||||
|
||||
Connection::Connection(std::unique_ptr<ISdBus>&& interface, session_bus_t)
|
||||
: Connection(std::move(interface), [this](sd_bus** bus){ return iface_->sd_bus_open_user(bus); })
|
||||
: Connection(std::move(interface), [this](sd_bus** bus){ return sdbus_->sd_bus_open_user(bus); })
|
||||
{
|
||||
}
|
||||
|
||||
Connection::Connection(std::unique_ptr<ISdBus>&& interface, custom_session_bus_t, const std::string& address)
|
||||
: Connection(std::move(interface), [&](sd_bus** bus) { return iface_->sd_bus_open_user_with_address(bus, address.c_str()); })
|
||||
: Connection(std::move(interface), [&](sd_bus** bus) { return sdbus_->sd_bus_open_user_with_address(bus, address.c_str()); })
|
||||
{
|
||||
}
|
||||
|
||||
Connection::Connection(std::unique_ptr<ISdBus>&& interface, remote_system_bus_t, const std::string& host)
|
||||
: Connection(std::move(interface), [this, &host](sd_bus** bus){ return iface_->sd_bus_open_system_remote(bus, host.c_str()); })
|
||||
: Connection(std::move(interface), [this, &host](sd_bus** bus){ return sdbus_->sd_bus_open_system_remote(bus, host.c_str()); })
|
||||
{
|
||||
}
|
||||
|
||||
Connection::Connection(std::unique_ptr<ISdBus>&& interface, private_bus_t, const std::string& address)
|
||||
: Connection(std::move(interface), [&](sd_bus** bus) { return iface_->sd_bus_open_direct(bus, address.c_str()); })
|
||||
: Connection(std::move(interface), [&](sd_bus** bus) { return sdbus_->sd_bus_open_direct(bus, address.c_str()); })
|
||||
{
|
||||
}
|
||||
|
||||
Connection::Connection(std::unique_ptr<ISdBus>&& interface, private_bus_t, int fd)
|
||||
: Connection(std::move(interface), [&](sd_bus** bus) { return iface_->sd_bus_open_direct(bus, fd); })
|
||||
: Connection(std::move(interface), [&](sd_bus** bus) { return sdbus_->sd_bus_open_direct(bus, fd); })
|
||||
{
|
||||
}
|
||||
|
||||
Connection::Connection(std::unique_ptr<ISdBus>&& interface, server_bus_t, int fd)
|
||||
: Connection(std::move(interface), [&](sd_bus** bus) { return iface_->sd_bus_open_server(bus, fd); })
|
||||
: Connection(std::move(interface), [&](sd_bus** bus) { return sdbus_->sd_bus_open_server(bus, fd); })
|
||||
{
|
||||
}
|
||||
|
||||
@@ -91,10 +92,10 @@ Connection::Connection(std::unique_ptr<ISdBus>&& interface, sdbus_bus_t, sd_bus
|
||||
}
|
||||
|
||||
Connection::Connection(std::unique_ptr<ISdBus>&& interface, pseudo_bus_t)
|
||||
: iface_(std::move(interface))
|
||||
: sdbus_(std::move(interface))
|
||||
, bus_(openPseudoBus())
|
||||
{
|
||||
assert(iface_ != nullptr);
|
||||
assert(sdbus_ != nullptr);
|
||||
}
|
||||
|
||||
Connection::~Connection()
|
||||
@@ -106,46 +107,42 @@ void Connection::requestName(const std::string& name)
|
||||
{
|
||||
SDBUS_CHECK_SERVICE_NAME(name);
|
||||
|
||||
auto r = iface_->sd_bus_request_name(bus_.get(), name.c_str(), 0);
|
||||
auto r = sdbus_->sd_bus_request_name(bus_.get(), name.c_str(), 0);
|
||||
SDBUS_THROW_ERROR_IF(r < 0, "Failed to request bus name", -r);
|
||||
|
||||
// In some cases we need to explicitly notify the event loop
|
||||
// to process messages that may have arrived while executing the call.
|
||||
notifyEventLoop(eventFd_.fd);
|
||||
// to process messages that may have arrived while executing the call
|
||||
wakeUpEventLoopIfMessagesInQueue();
|
||||
}
|
||||
|
||||
void Connection::releaseName(const std::string& name)
|
||||
{
|
||||
auto r = iface_->sd_bus_release_name(bus_.get(), name.c_str());
|
||||
auto r = sdbus_->sd_bus_release_name(bus_.get(), name.c_str());
|
||||
SDBUS_THROW_ERROR_IF(r < 0, "Failed to release bus name", -r);
|
||||
|
||||
// In some cases we need to explicitly notify the event loop
|
||||
// to process messages that may have arrived while executing the call.
|
||||
notifyEventLoop(eventFd_.fd);
|
||||
// to process messages that may have arrived while executing the call
|
||||
wakeUpEventLoopIfMessagesInQueue();
|
||||
}
|
||||
|
||||
std::string Connection::getUniqueName() const
|
||||
{
|
||||
const char* unique = nullptr;
|
||||
auto r = iface_->sd_bus_get_unique_name(bus_.get(), &unique);
|
||||
auto r = sdbus_->sd_bus_get_unique_name(bus_.get(), &unique);
|
||||
SDBUS_THROW_ERROR_IF(r < 0 || unique == nullptr, "Failed to get unique bus name", -r);
|
||||
return unique;
|
||||
}
|
||||
|
||||
void Connection::enterEventLoop()
|
||||
{
|
||||
loopThreadId_ = std::this_thread::get_id();
|
||||
SCOPE_EXIT{ loopThreadId_ = std::thread::id{}; };
|
||||
|
||||
std::lock_guard guard(loopMutex_);
|
||||
|
||||
while (true)
|
||||
{
|
||||
auto processed = processPendingRequest();
|
||||
if (processed)
|
||||
continue; // Process next one
|
||||
// Process one pending event
|
||||
(void)processPendingEvent();
|
||||
|
||||
auto success = waitForNextRequest();
|
||||
// And go to poll(), which wakes us up right away
|
||||
// if there's another pending event, or sleeps otherwise.
|
||||
auto success = waitForNextEvent();
|
||||
if (!success)
|
||||
break; // Exit I/O event loop
|
||||
}
|
||||
@@ -166,20 +163,24 @@ void Connection::leaveEventLoop()
|
||||
Connection::PollData Connection::getEventLoopPollData() const
|
||||
{
|
||||
ISdBus::PollData pollData{};
|
||||
auto r = iface_->sd_bus_get_poll_data(bus_.get(), &pollData);
|
||||
auto r = sdbus_->sd_bus_get_poll_data(bus_.get(), &pollData);
|
||||
SDBUS_THROW_ERROR_IF(r < 0, "Failed to get bus poll data", -r);
|
||||
|
||||
return {pollData.fd, pollData.events, pollData.timeout_usec};
|
||||
assert(eventFd_.fd >= 0);
|
||||
|
||||
auto timeout = pollData.timeout_usec == UINT64_MAX ? std::chrono::microseconds::max() : std::chrono::microseconds(pollData.timeout_usec);
|
||||
|
||||
return {pollData.fd, pollData.events, timeout, eventFd_.fd};
|
||||
}
|
||||
|
||||
const ISdBus& Connection::getSdBusInterface() const
|
||||
{
|
||||
return *iface_.get();
|
||||
return *sdbus_.get();
|
||||
}
|
||||
|
||||
ISdBus& Connection::getSdBusInterface()
|
||||
{
|
||||
return *iface_.get();
|
||||
return *sdbus_.get();
|
||||
}
|
||||
|
||||
void Connection::addObjectManager(const std::string& objectPath)
|
||||
@@ -189,7 +190,7 @@ void Connection::addObjectManager(const std::string& objectPath)
|
||||
|
||||
void Connection::addObjectManager(const std::string& objectPath, floating_slot_t)
|
||||
{
|
||||
auto r = iface_->sd_bus_add_object_manager(bus_.get(), nullptr, objectPath.c_str());
|
||||
auto r = sdbus_->sd_bus_add_object_manager(bus_.get(), nullptr, objectPath.c_str());
|
||||
|
||||
SDBUS_THROW_ERROR_IF(r < 0, "Failed to add object manager", -r);
|
||||
}
|
||||
@@ -198,16 +199,16 @@ Slot Connection::addObjectManager(const std::string& objectPath, request_slot_t)
|
||||
{
|
||||
sd_bus_slot *slot{};
|
||||
|
||||
auto r = iface_->sd_bus_add_object_manager(bus_.get(), &slot, objectPath.c_str());
|
||||
auto r = sdbus_->sd_bus_add_object_manager(bus_.get(), &slot, objectPath.c_str());
|
||||
|
||||
SDBUS_THROW_ERROR_IF(r < 0, "Failed to add object manager", -r);
|
||||
|
||||
return {slot, [this](void *slot){ iface_->sd_bus_slot_unref((sd_bus_slot*)slot); }};
|
||||
return {slot, [this](void *slot){ sdbus_->sd_bus_slot_unref((sd_bus_slot*)slot); }};
|
||||
}
|
||||
|
||||
void Connection::setMethodCallTimeout(uint64_t timeout)
|
||||
{
|
||||
auto r = iface_->sd_bus_set_method_call_timeout(bus_.get(), timeout);
|
||||
auto r = sdbus_->sd_bus_set_method_call_timeout(bus_.get(), timeout);
|
||||
|
||||
SDBUS_THROW_ERROR_IF(r < 0, "Failed to set method call timeout", -r);
|
||||
}
|
||||
@@ -216,7 +217,7 @@ uint64_t Connection::getMethodCallTimeout() const
|
||||
{
|
||||
uint64_t timeout;
|
||||
|
||||
auto r = iface_->sd_bus_get_method_call_timeout(bus_.get(), &timeout);
|
||||
auto r = sdbus_->sd_bus_get_method_call_timeout(bus_.get(), &timeout);
|
||||
|
||||
SDBUS_THROW_ERROR_IF(r < 0, "Failed to get method call timeout", -r);
|
||||
|
||||
@@ -229,13 +230,13 @@ Slot Connection::addMatch(const std::string& match, message_handler callback)
|
||||
|
||||
auto matchInfo = std::make_unique<MatchInfo>(MatchInfo{std::move(callback), {}, *this, {}});
|
||||
|
||||
auto r = iface_->sd_bus_add_match(bus_.get(), &matchInfo->slot, match.c_str(), &Connection::sdbus_match_callback, matchInfo.get());
|
||||
auto r = sdbus_->sd_bus_add_match(bus_.get(), &matchInfo->slot, match.c_str(), &Connection::sdbus_match_callback, matchInfo.get());
|
||||
SDBUS_THROW_ERROR_IF(r < 0, "Failed to add match", -r);
|
||||
|
||||
return {matchInfo.release(), [this](void *ptr)
|
||||
{
|
||||
auto* matchInfo = static_cast<MatchInfo*>(ptr);
|
||||
iface_->sd_bus_slot_unref(matchInfo->slot);
|
||||
sdbus_->sd_bus_slot_unref(matchInfo->slot);
|
||||
std::default_delete<MatchInfo>{}(matchInfo);
|
||||
}};
|
||||
}
|
||||
@@ -252,7 +253,7 @@ Slot Connection::addMatchAsync(const std::string& match, message_handler callbac
|
||||
sd_bus_message_handler_t sdbusInstallCallback = installCallback ? &Connection::sdbus_match_install_callback : nullptr;
|
||||
auto matchInfo = std::make_unique<MatchInfo>(MatchInfo{std::move(callback), std::move(installCallback), *this, {}});
|
||||
|
||||
auto r = iface_->sd_bus_add_match_async( bus_.get()
|
||||
auto r = sdbus_->sd_bus_add_match_async( bus_.get()
|
||||
, &matchInfo->slot
|
||||
, match.c_str()
|
||||
, &Connection::sdbus_match_callback
|
||||
@@ -263,7 +264,7 @@ Slot Connection::addMatchAsync(const std::string& match, message_handler callbac
|
||||
return {matchInfo.release(), [this](void *ptr)
|
||||
{
|
||||
auto* matchInfo = static_cast<MatchInfo*>(ptr);
|
||||
iface_->sd_bus_slot_unref(matchInfo->slot);
|
||||
sdbus_->sd_bus_slot_unref(matchInfo->slot);
|
||||
std::default_delete<MatchInfo>{}(matchInfo);
|
||||
}};
|
||||
}
|
||||
@@ -280,7 +281,7 @@ Slot Connection::addObjectVTable( const std::string& objectPath
|
||||
{
|
||||
sd_bus_slot *slot{};
|
||||
|
||||
auto r = iface_->sd_bus_add_object_vtable( bus_.get()
|
||||
auto r = sdbus_->sd_bus_add_object_vtable(bus_.get()
|
||||
, &slot
|
||||
, objectPath.c_str()
|
||||
, interfaceName.c_str()
|
||||
@@ -289,18 +290,18 @@ Slot Connection::addObjectVTable( const std::string& objectPath
|
||||
|
||||
SDBUS_THROW_ERROR_IF(r < 0, "Failed to register object vtable", -r);
|
||||
|
||||
return {slot, [this](void *slot){ iface_->sd_bus_slot_unref((sd_bus_slot*)slot); }};
|
||||
return {slot, [this](void *slot){ sdbus_->sd_bus_slot_unref((sd_bus_slot*)slot); }};
|
||||
}
|
||||
|
||||
PlainMessage Connection::createPlainMessage() const
|
||||
{
|
||||
sd_bus_message* sdbusMsg{};
|
||||
|
||||
auto r = iface_->sd_bus_message_new(bus_.get(), &sdbusMsg, _SD_BUS_MESSAGE_TYPE_INVALID);
|
||||
auto r = sdbus_->sd_bus_message_new(bus_.get(), &sdbusMsg, _SD_BUS_MESSAGE_TYPE_INVALID);
|
||||
|
||||
SDBUS_THROW_ERROR_IF(r < 0, "Failed to create a plain message", -r);
|
||||
|
||||
return Message::Factory::create<PlainMessage>(sdbusMsg, iface_.get(), adopt_message);
|
||||
return Message::Factory::create<PlainMessage>(sdbusMsg, sdbus_.get(), adopt_message);
|
||||
}
|
||||
|
||||
MethodCall Connection::createMethodCall( const std::string& destination
|
||||
@@ -310,7 +311,7 @@ MethodCall Connection::createMethodCall( const std::string& destination
|
||||
{
|
||||
sd_bus_message *sdbusMsg{};
|
||||
|
||||
auto r = iface_->sd_bus_message_new_method_call( bus_.get()
|
||||
auto r = sdbus_->sd_bus_message_new_method_call(bus_.get()
|
||||
, &sdbusMsg
|
||||
, destination.empty() ? nullptr : destination.c_str()
|
||||
, objectPath.c_str()
|
||||
@@ -319,7 +320,7 @@ MethodCall Connection::createMethodCall( const std::string& destination
|
||||
|
||||
SDBUS_THROW_ERROR_IF(r < 0, "Failed to create method call", -r);
|
||||
|
||||
return Message::Factory::create<MethodCall>(sdbusMsg, iface_.get(), this, adopt_message);
|
||||
return Message::Factory::create<MethodCall>(sdbusMsg, sdbus_.get(), adopt_message);
|
||||
}
|
||||
|
||||
Signal Connection::createSignal( const std::string& objectPath
|
||||
@@ -328,7 +329,7 @@ Signal Connection::createSignal( const std::string& objectPath
|
||||
{
|
||||
sd_bus_message *sdbusMsg{};
|
||||
|
||||
auto r = iface_->sd_bus_message_new_signal( bus_.get()
|
||||
auto r = sdbus_->sd_bus_message_new_signal(bus_.get()
|
||||
, &sdbusMsg
|
||||
, objectPath.c_str()
|
||||
, interfaceName.c_str()
|
||||
@@ -336,7 +337,47 @@ Signal Connection::createSignal( const std::string& objectPath
|
||||
|
||||
SDBUS_THROW_ERROR_IF(r < 0, "Failed to create signal", -r);
|
||||
|
||||
return Message::Factory::create<Signal>(sdbusMsg, iface_.get(), adopt_message);
|
||||
return Message::Factory::create<Signal>(sdbusMsg, sdbus_.get(), adopt_message);
|
||||
}
|
||||
|
||||
MethodReply Connection::callMethod(const MethodCall& message, uint64_t timeout)
|
||||
{
|
||||
// If the call expects reply, this call will block the bus connection from
|
||||
// serving other messages until the reply arrives or the call times out.
|
||||
auto reply = message.send(timeout);
|
||||
|
||||
// Wake up event loop to process messages that may have arrived in the meantime...
|
||||
wakeUpEventLoopIfMessagesInQueue();
|
||||
|
||||
return reply;
|
||||
}
|
||||
|
||||
void Connection::callMethod(const MethodCall& message, void* callback, void* userData, uint64_t timeout, floating_slot_t)
|
||||
{
|
||||
// TODO: Think of ways of optimizing these three locking/unlocking of sdbus mutex (merge into one call?)
|
||||
auto timeoutBefore = getEventLoopPollData().timeout;
|
||||
message.send(callback, userData, timeout, floating_slot);
|
||||
auto timeoutAfter = getEventLoopPollData().timeout;
|
||||
|
||||
// An event loop may wait in poll with timeout `t1', while in another thread an async call is made with
|
||||
// timeout `t2'. If `t2' < `t1', then we have to wake up the event loop thread to update its poll timeout.
|
||||
if (timeoutAfter < timeoutBefore)
|
||||
notifyEventLoopToWakeUpFromPoll();
|
||||
}
|
||||
|
||||
Slot Connection::callMethod(const MethodCall& message, void* callback, void* userData, uint64_t timeout)
|
||||
{
|
||||
// TODO: Think of ways of optimizing these three locking/unlocking of sdbus mutex (merge into one call?)
|
||||
auto timeoutBefore = getEventLoopPollData().timeout;
|
||||
auto slot = message.send(callback, userData, timeout);
|
||||
auto timeoutAfter = getEventLoopPollData().timeout;
|
||||
|
||||
// An event loop may wait in poll with timeout `t1', while in another thread an async call is made with
|
||||
// timeout `t2'. If `t2' < `t1', then we have to wake up the event loop thread to update its poll timeout.
|
||||
if (timeoutAfter < timeoutBefore)
|
||||
notifyEventLoopToWakeUpFromPoll();
|
||||
|
||||
return slot;
|
||||
}
|
||||
|
||||
void Connection::emitPropertiesChangedSignal( const std::string& objectPath
|
||||
@@ -345,7 +386,7 @@ void Connection::emitPropertiesChangedSignal( const std::string& objectPath
|
||||
{
|
||||
auto names = to_strv(propNames);
|
||||
|
||||
auto r = iface_->sd_bus_emit_properties_changed_strv( bus_.get()
|
||||
auto r = sdbus_->sd_bus_emit_properties_changed_strv(bus_.get()
|
||||
, objectPath.c_str()
|
||||
, interfaceName.c_str()
|
||||
, propNames.empty() ? nullptr : &names[0] );
|
||||
@@ -355,7 +396,7 @@ void Connection::emitPropertiesChangedSignal( const std::string& objectPath
|
||||
|
||||
void Connection::emitInterfacesAddedSignal(const std::string& objectPath)
|
||||
{
|
||||
auto r = iface_->sd_bus_emit_object_added(bus_.get(), objectPath.c_str());
|
||||
auto r = sdbus_->sd_bus_emit_object_added(bus_.get(), objectPath.c_str());
|
||||
|
||||
SDBUS_THROW_ERROR_IF(r < 0, "Failed to emit InterfacesAdded signal for all registered interfaces", -r);
|
||||
}
|
||||
@@ -365,7 +406,7 @@ void Connection::emitInterfacesAddedSignal( const std::string& objectPath
|
||||
{
|
||||
auto names = to_strv(interfaces);
|
||||
|
||||
auto r = iface_->sd_bus_emit_interfaces_added_strv( bus_.get()
|
||||
auto r = sdbus_->sd_bus_emit_interfaces_added_strv(bus_.get()
|
||||
, objectPath.c_str()
|
||||
, interfaces.empty() ? nullptr : &names[0] );
|
||||
|
||||
@@ -374,7 +415,7 @@ void Connection::emitInterfacesAddedSignal( const std::string& objectPath
|
||||
|
||||
void Connection::emitInterfacesRemovedSignal(const std::string& objectPath)
|
||||
{
|
||||
auto r = iface_->sd_bus_emit_object_removed(bus_.get(), objectPath.c_str());
|
||||
auto r = sdbus_->sd_bus_emit_object_removed(bus_.get(), objectPath.c_str());
|
||||
|
||||
SDBUS_THROW_ERROR_IF(r < 0, "Failed to emit InterfacesRemoved signal for all registered interfaces", -r);
|
||||
}
|
||||
@@ -384,7 +425,7 @@ void Connection::emitInterfacesRemovedSignal( const std::string& objectPath
|
||||
{
|
||||
auto names = to_strv(interfaces);
|
||||
|
||||
auto r = iface_->sd_bus_emit_interfaces_removed_strv( bus_.get()
|
||||
auto r = sdbus_->sd_bus_emit_interfaces_removed_strv(bus_.get()
|
||||
, objectPath.c_str()
|
||||
, interfaces.empty() ? nullptr : &names[0] );
|
||||
|
||||
@@ -404,40 +445,11 @@ Slot Connection::registerSignalHandler( const std::string& sender
|
||||
// https://www.freedesktop.org/software/systemd/man/sd_bus_add_match.html .
|
||||
// But this would require libsystemd v237 or higher.
|
||||
auto filter = composeSignalMatchFilter(sender, objectPath, interfaceName, signalName);
|
||||
auto r = iface_->sd_bus_add_match(bus_.get(), &slot, filter.c_str(), callback, userData);
|
||||
auto r = sdbus_->sd_bus_add_match(bus_.get(), &slot, filter.c_str(), callback, userData);
|
||||
|
||||
SDBUS_THROW_ERROR_IF(r < 0, "Failed to register signal handler", -r);
|
||||
|
||||
return {slot, [this](void *slot){ iface_->sd_bus_slot_unref((sd_bus_slot*)slot); }};
|
||||
}
|
||||
|
||||
MethodReply Connection::tryCallMethodSynchronously(const MethodCall& message, uint64_t timeout)
|
||||
{
|
||||
auto loopThreadId = loopThreadId_.load(std::memory_order_relaxed);
|
||||
|
||||
// Is the loop not yet on? => Go make synchronous call
|
||||
while (loopThreadId == std::thread::id{})
|
||||
{
|
||||
// Did the loop begin in the meantime? Or try_lock() failed spuriously?
|
||||
if (!loopMutex_.try_lock())
|
||||
{
|
||||
loopThreadId = loopThreadId_.load(std::memory_order_relaxed);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Synchronous D-Bus call
|
||||
std::lock_guard guard(loopMutex_, std::adopt_lock);
|
||||
return message.send(timeout);
|
||||
}
|
||||
|
||||
// Is the loop on and we are in the same thread? => Go for synchronous call
|
||||
if (loopThreadId == std::this_thread::get_id())
|
||||
{
|
||||
assert(!loopMutex_.try_lock());
|
||||
return message.send(timeout);
|
||||
}
|
||||
|
||||
return {};
|
||||
return {slot, [this](void *slot){ sdbus_->sd_bus_slot_unref((sd_bus_slot*)slot); }};
|
||||
}
|
||||
|
||||
Connection::BusPtr Connection::openBus(const BusFactory& busFactory)
|
||||
@@ -446,7 +458,7 @@ Connection::BusPtr Connection::openBus(const BusFactory& busFactory)
|
||||
int r = busFactory(&bus);
|
||||
SDBUS_THROW_ERROR_IF(r < 0, "Failed to open bus", -r);
|
||||
|
||||
BusPtr busPtr{bus, [this](sd_bus* bus){ return iface_->sd_bus_flush_close_unref(bus); }};
|
||||
BusPtr busPtr{bus, [this](sd_bus* bus){ return sdbus_->sd_bus_flush_close_unref(bus); }};
|
||||
finishHandshake(busPtr.get());
|
||||
return busPtr;
|
||||
}
|
||||
@@ -455,17 +467,17 @@ Connection::BusPtr Connection::openPseudoBus()
|
||||
{
|
||||
sd_bus* bus{};
|
||||
|
||||
int r = iface_->sd_bus_new(&bus);
|
||||
int r = sdbus_->sd_bus_new(&bus);
|
||||
SDBUS_THROW_ERROR_IF(r < 0, "Failed to open pseudo bus", -r);
|
||||
|
||||
(void)iface_->sd_bus_start(bus);
|
||||
(void)sdbus_->sd_bus_start(bus);
|
||||
// It is expected that sd_bus_start has failed here, returning -EINVAL, due to having
|
||||
// not set a bus address, but it will leave the bus in an OPENING state, which enables
|
||||
// us to create plain D-Bus messages as a local data storage (for Variant, for example),
|
||||
// without dependency on real IPC communication with the D-Bus broker daemon.
|
||||
SDBUS_THROW_ERROR_IF(r < 0 && r != -EINVAL, "Failed to start pseudo bus", -r);
|
||||
|
||||
return {bus, [this](sd_bus* bus){ return iface_->sd_bus_close_unref(bus); }};
|
||||
return {bus, [this](sd_bus* bus){ return sdbus_->sd_bus_close_unref(bus); }};
|
||||
}
|
||||
|
||||
void Connection::finishHandshake(sd_bus* bus)
|
||||
@@ -476,43 +488,29 @@ void Connection::finishHandshake(sd_bus* bus)
|
||||
|
||||
assert(bus != nullptr);
|
||||
|
||||
auto r = iface_->sd_bus_flush(bus);
|
||||
auto r = sdbus_->sd_bus_flush(bus);
|
||||
|
||||
SDBUS_THROW_ERROR_IF(r < 0, "Failed to flush bus on opening", -r);
|
||||
}
|
||||
|
||||
void Connection::notifyEventLoop(int fd) const
|
||||
void Connection::notifyEventLoopToExit()
|
||||
{
|
||||
assert(fd >= 0);
|
||||
|
||||
uint64_t value = 1;
|
||||
auto r = write(fd, &value, sizeof(value));
|
||||
SDBUS_THROW_ERROR_IF(r < 0, "Failed to notify event loop", -errno);
|
||||
loopExitFd_.notify();
|
||||
}
|
||||
|
||||
void Connection::notifyEventLoopToExit() const
|
||||
void Connection::notifyEventLoopToWakeUpFromPoll()
|
||||
{
|
||||
notifyEventLoop(loopExitFd_.fd);
|
||||
eventFd_.notify();
|
||||
}
|
||||
|
||||
void Connection::notifyEventLoopNewTimeout() const
|
||||
void Connection::wakeUpEventLoopIfMessagesInQueue()
|
||||
{
|
||||
// The extra notifications for new timeouts are only needed if calls are made asynchronously to the event loop.
|
||||
// Are we in the same thread as the event loop? Note that it's ok to fail this check because the event loop isn't yet started.
|
||||
if (loopThreadId_.load(std::memory_order_relaxed) == std::this_thread::get_id())
|
||||
return;
|
||||
|
||||
// Get the new timeout from sd-bus
|
||||
auto sdbusPollData = getEventLoopPollData();
|
||||
if (sdbusPollData.timeout_usec < activeTimeout_.load(std::memory_order_relaxed))
|
||||
notifyEventLoop(eventFd_.fd);
|
||||
}
|
||||
|
||||
void Connection::clearEventLoopNotification(int fd) const
|
||||
{
|
||||
uint64_t value{};
|
||||
auto r = read(fd, &value, sizeof(value));
|
||||
SDBUS_THROW_ERROR_IF(r < 0, "Failed to read from the event descriptor", -errno);
|
||||
// When doing a sync call, other D-Bus messages may have arrived, waiting in the read queue.
|
||||
// In case an event loop is inside a poll in another thread, or an external event loop polls in the
|
||||
// same thread but as an unrelated event source, then we need to wake up the poll explicitly so the
|
||||
// event loop 1. processes all messages in the read queue, 2. updates poll timeout before next poll.
|
||||
if (arePendingMessagesInReadQueue())
|
||||
notifyEventLoopToWakeUpFromPoll();
|
||||
}
|
||||
|
||||
void Connection::joinWithEventLoop()
|
||||
@@ -521,32 +519,36 @@ void Connection::joinWithEventLoop()
|
||||
asyncLoopThread_.join();
|
||||
}
|
||||
|
||||
bool Connection::processPendingRequest()
|
||||
bool Connection::processPendingEvent()
|
||||
{
|
||||
auto bus = bus_.get();
|
||||
assert(bus != nullptr);
|
||||
|
||||
int r = iface_->sd_bus_process(bus, nullptr);
|
||||
int r = sdbus_->sd_bus_process(bus, nullptr);
|
||||
SDBUS_THROW_ERROR_IF(r < 0, "Failed to process bus requests", -r);
|
||||
|
||||
// In correct use of sdbus-c++ API, r can be 0 only when processPendingEvent()
|
||||
// is called from an external event loop as a reaction to event fd being signalled.
|
||||
// If there are no more D-Bus messages to process, we know we have to clear event fd.
|
||||
if (r == 0)
|
||||
eventFd_.clear();
|
||||
|
||||
return r > 0;
|
||||
}
|
||||
|
||||
bool Connection::waitForNextRequest()
|
||||
bool Connection::waitForNextEvent()
|
||||
{
|
||||
assert(bus_ != nullptr);
|
||||
assert(loopExitFd_.fd >= 0);
|
||||
assert(eventFd_.fd >= 0);
|
||||
|
||||
auto sdbusPollData = getEventLoopPollData();
|
||||
struct pollfd fds[] = {
|
||||
{sdbusPollData.fd, sdbusPollData.events, 0},
|
||||
{eventFd_.fd, POLLIN, 0},
|
||||
{loopExitFd_.fd, POLLIN, 0}
|
||||
};
|
||||
auto fdsCount = sizeof(fds)/sizeof(fds[0]);
|
||||
struct pollfd fds[] = { {sdbusPollData.fd, sdbusPollData.events, 0}
|
||||
, {eventFd_.fd, POLLIN, 0}
|
||||
, {loopExitFd_.fd, POLLIN, 0} };
|
||||
constexpr auto fdsCount = sizeof(fds)/sizeof(fds[0]);
|
||||
|
||||
auto timeout = sdbusPollData.getPollTimeout();
|
||||
activeTimeout_.store(sdbusPollData.timeout_usec, std::memory_order_relaxed);
|
||||
auto r = poll(fds, fdsCount, timeout);
|
||||
|
||||
if (r < 0 && errno == EINTR)
|
||||
@@ -554,21 +556,35 @@ bool Connection::waitForNextRequest()
|
||||
|
||||
SDBUS_THROW_ERROR_IF(r < 0, "Failed to wait on the bus", -errno);
|
||||
|
||||
// new timeout notification
|
||||
// Wake up notification, in order that we re-enter poll with freshly read PollData (namely, new poll timeout thereof)
|
||||
if (fds[1].revents & POLLIN)
|
||||
{
|
||||
clearEventLoopNotification(fds[1].fd);
|
||||
auto cleared = eventFd_.clear();
|
||||
SDBUS_THROW_ERROR_IF(!cleared, "Failed to read from the event descriptor", -errno);
|
||||
// Go poll() again, but with up-to-date timeout (which will wake poll() up right away if there are messages to process)
|
||||
return waitForNextEvent();
|
||||
}
|
||||
// loop exit notification
|
||||
// Loop exit notification
|
||||
if (fds[2].revents & POLLIN)
|
||||
{
|
||||
clearEventLoopNotification(fds[2].fd);
|
||||
auto cleared = loopExitFd_.clear();
|
||||
SDBUS_THROW_ERROR_IF(!cleared, "Failed to read from the loop exit descriptor", -errno);
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool Connection::arePendingMessagesInReadQueue() const
|
||||
{
|
||||
uint64_t readQueueSize{};
|
||||
|
||||
auto r = sdbus_->sd_bus_get_n_queued_read(bus_.get(), &readQueueSize);
|
||||
SDBUS_THROW_ERROR_IF(r < 0, "Failed to get number of pending messages in read queue", -r);
|
||||
|
||||
return readQueueSize > 0;
|
||||
}
|
||||
|
||||
std::string Connection::composeSignalMatchFilter( const std::string &sender
|
||||
, const std::string &objectPath
|
||||
, const std::string &interfaceName
|
||||
@@ -623,33 +639,48 @@ Connection::EventFd::~EventFd()
|
||||
close(fd);
|
||||
}
|
||||
|
||||
void Connection::EventFd::notify()
|
||||
{
|
||||
assert(fd >= 0);
|
||||
auto r = eventfd_write(fd, 1);
|
||||
SDBUS_THROW_ERROR_IF(r < 0, "Failed to notify event descriptor", -errno);
|
||||
}
|
||||
|
||||
bool Connection::EventFd::clear()
|
||||
{
|
||||
assert(fd >= 0);
|
||||
|
||||
uint64_t value{};
|
||||
auto r = eventfd_read(fd, &value);
|
||||
return r >= 0;
|
||||
}
|
||||
|
||||
} // namespace sdbus::internal
|
||||
|
||||
namespace sdbus {
|
||||
|
||||
std::optional<std::chrono::microseconds> IConnection::PollData::getRelativeTimeout() const
|
||||
std::chrono::microseconds IConnection::PollData::getRelativeTimeout() const
|
||||
{
|
||||
constexpr auto zero = std::chrono::microseconds::zero();
|
||||
if (timeout_usec == 0)
|
||||
return zero;
|
||||
else if (timeout_usec == UINT64_MAX)
|
||||
return std::nullopt;
|
||||
constexpr auto max = std::chrono::microseconds::max();
|
||||
using internal::now;
|
||||
|
||||
// We need C so that we use the same clock as the underlying sd-bus lib.
|
||||
// We use POSIX's clock_gettime in favour of std::chrono::steady_clock to ensure this.
|
||||
struct timespec ts{};
|
||||
auto r = clock_gettime(CLOCK_MONOTONIC, &ts);
|
||||
SDBUS_THROW_ERROR_IF(r < 0, "clock_gettime failed: ", -errno);
|
||||
auto now = std::chrono::nanoseconds(ts.tv_nsec) + std::chrono::seconds(ts.tv_sec);
|
||||
auto absTimeout = std::chrono::microseconds(timeout_usec);
|
||||
auto result = std::chrono::duration_cast<std::chrono::microseconds>(absTimeout - now);
|
||||
return std::max(result, zero);
|
||||
if (timeout == zero)
|
||||
return zero;
|
||||
else if (timeout == max)
|
||||
return max;
|
||||
else
|
||||
return std::max(std::chrono::duration_cast<std::chrono::microseconds>(timeout - now()), zero);
|
||||
}
|
||||
|
||||
int IConnection::PollData::getPollTimeout() const
|
||||
{
|
||||
auto timeout = getRelativeTimeout();
|
||||
return timeout ? static_cast<int>(std::chrono::ceil<std::chrono::milliseconds>(timeout.value()).count()) : -1;
|
||||
const auto relativeTimeout = getRelativeTimeout();
|
||||
|
||||
if (relativeTimeout == decltype(relativeTimeout)::max())
|
||||
return -1;
|
||||
else
|
||||
return static_cast<int>(std::chrono::ceil<std::chrono::milliseconds>(relativeTimeout).count());
|
||||
}
|
||||
|
||||
} // namespace sdbus
|
||||
|
||||
Reference in New Issue
Block a user