forked from Kistler-Group/sdbus-cpp
fix timeout handling * Despite what is documented in sd_bus_get_timeout(3), the timeout returned is actually an absolute time point of Linux's CLOCK_MONOTONIC clock. Hence, we first have to subtract the current time from the timeout in order to get a relative time that can be passed to poll. * For async call timeouts to reliably work, we need a way to notify the event loop of a connection that is currently blocked waiting in poll. I.e. assume the event loop thread entered poll with a timeout set to T1. Afterwards, the main thread starts an async call C with a timeout T2 < T1. In order for C to be canceled after its timeout T1 has elapsed, we have to be able to notify the event loop so that it can update its poll data. Co-authored-by: Urs Ritzmann <ursritzmann@protonmail.ch> Co-authored-by: Lukasz Marcul <lukasz.marcul@onemeter.com>
This commit is contained in:
committed by
GitHub
parent
0b8f2d9752
commit
bb0f3f0242
@@ -122,7 +122,7 @@ void Connection::leaveEventLoop()
|
||||
|
||||
Connection::PollData Connection::getEventLoopPollData() const
|
||||
{
|
||||
ISdBus::PollData pollData;
|
||||
ISdBus::PollData pollData{};
|
||||
auto r = iface_->sd_bus_get_poll_data(bus_.get(), &pollData);
|
||||
SDBUS_THROW_ERROR_IF(r < 0, "Failed to get bus poll data", -r);
|
||||
|
||||
@@ -221,7 +221,8 @@ MethodCall Connection::createMethodCall( const std::string& destination
|
||||
|
||||
SDBUS_THROW_ERROR_IF(r < 0, "Failed to create method call", -r);
|
||||
|
||||
return Message::Factory::create<MethodCall>(sdbusMsg, iface_.get(), adopt_message);
|
||||
return Message::Factory::create<MethodCall>(sdbusMsg, iface_.get(),
|
||||
static_cast<const sdbus::internal::IConnection*>(this), adopt_message);
|
||||
}
|
||||
|
||||
Signal Connection::createSignal( const std::string& objectPath
|
||||
@@ -366,19 +367,38 @@ void Connection::finishHandshake(sd_bus* bus)
|
||||
SDBUS_THROW_ERROR_IF(r < 0, "Failed to flush bus on opening", -r);
|
||||
}
|
||||
|
||||
void Connection::notifyEventLoopToExit()
|
||||
void Connection::notifyEventLoop(int fd) const
|
||||
{
|
||||
assert(loopExitFd_.fd >= 0);
|
||||
assert(fd >= 0);
|
||||
|
||||
uint64_t value = 1;
|
||||
auto r = write(loopExitFd_.fd, &value, sizeof(value));
|
||||
auto r = write(fd, &value, sizeof(value));
|
||||
SDBUS_THROW_ERROR_IF(r < 0, "Failed to notify event loop", -errno);
|
||||
}
|
||||
|
||||
void Connection::clearExitNotification()
|
||||
void Connection::notifyEventLoopToExit() const
|
||||
{
|
||||
notifyEventLoop(loopExitFd_.fd);
|
||||
}
|
||||
|
||||
void Connection::notifyEventLoopNewTimeout() const {
|
||||
// The extra notifications for new timeouts are only needed if calls are made asynchronously to the event loop.
|
||||
// Are we in the same thread as the event loop? Note that it's ok to fail this check because the event loop isn't yet started.
|
||||
if (loopThreadId_.load(std::memory_order_relaxed) == std::this_thread::get_id()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// alternatively use ::sd_bus_get_timeout(..)
|
||||
auto sdbusPollData = getEventLoopPollData();
|
||||
if (sdbusPollData.timeout_usec < activeTimeout_) {
|
||||
notifyEventLoop(eventFd_.fd);
|
||||
}
|
||||
}
|
||||
|
||||
void Connection::clearEventLoopNotification(int fd) const
|
||||
{
|
||||
uint64_t value{};
|
||||
auto r = read(loopExitFd_.fd, &value, sizeof(value));
|
||||
auto r = read(fd, &value, sizeof(value));
|
||||
SDBUS_THROW_ERROR_IF(r < 0, "Failed to read from the event descriptor", -errno);
|
||||
}
|
||||
|
||||
@@ -402,13 +422,18 @@ bool Connection::processPendingRequest()
|
||||
bool Connection::waitForNextRequest()
|
||||
{
|
||||
assert(bus_ != nullptr);
|
||||
assert(loopExitFd_.fd != 0);
|
||||
assert(eventFd_.fd != 0);
|
||||
|
||||
auto sdbusPollData = getEventLoopPollData();
|
||||
struct pollfd fds[] = {{sdbusPollData.fd, sdbusPollData.events, 0}, {loopExitFd_.fd, POLLIN, 0}};
|
||||
struct pollfd fds[] = {
|
||||
{sdbusPollData.fd, sdbusPollData.events, 0},
|
||||
{eventFd_.fd, POLLIN, 0},
|
||||
{loopExitFd_.fd, POLLIN, 0}
|
||||
};
|
||||
auto fdsCount = sizeof(fds)/sizeof(fds[0]);
|
||||
|
||||
auto timeout = sdbusPollData.timeout_usec == (uint64_t) -1 ? (uint64_t)-1 : (sdbusPollData.timeout_usec+999)/1000;
|
||||
auto timeout = sdbusPollData.getPollTimeout();
|
||||
activeTimeout_ = sdbusPollData.timeout_usec;
|
||||
auto r = poll(fds, fdsCount, timeout);
|
||||
|
||||
if (r < 0 && errno == EINTR)
|
||||
@@ -416,9 +441,15 @@ bool Connection::waitForNextRequest()
|
||||
|
||||
SDBUS_THROW_ERROR_IF(r < 0, "Failed to wait on the bus", -errno);
|
||||
|
||||
// new timeout notification
|
||||
if (fds[1].revents & POLLIN)
|
||||
{
|
||||
clearExitNotification();
|
||||
clearEventLoopNotification(fds[1].fd);
|
||||
}
|
||||
// loop exit notification
|
||||
if (fds[2].revents & POLLIN)
|
||||
{
|
||||
clearEventLoopNotification(fds[2].fd);
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -450,13 +481,13 @@ std::vector</*const */char*> Connection::to_strv(const std::vector<std::string>&
|
||||
return strv;
|
||||
}
|
||||
|
||||
Connection::LoopExitEventFd::LoopExitEventFd()
|
||||
Connection::EventFd::EventFd()
|
||||
{
|
||||
fd = eventfd(0, EFD_SEMAPHORE | EFD_CLOEXEC | EFD_NONBLOCK);
|
||||
fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
|
||||
SDBUS_THROW_ERROR_IF(fd < 0, "Failed to create event object", -errno);
|
||||
}
|
||||
|
||||
Connection::LoopExitEventFd::~LoopExitEventFd()
|
||||
Connection::EventFd::~EventFd()
|
||||
{
|
||||
assert(fd >= 0);
|
||||
close(fd);
|
||||
@@ -537,4 +568,38 @@ std::unique_ptr<sdbus::IConnection> createRemoteSystemBusConnection(const std::s
|
||||
return std::make_unique<sdbus::internal::Connection>(std::move(interface), remote_system_bus, host);
|
||||
}
|
||||
|
||||
} // namespace sdbus::inernal
|
||||
|
||||
namespace sdbus {
|
||||
|
||||
std::optional<std::chrono::microseconds> IConnection::PollData::getRelativeTimeout() const
|
||||
{
|
||||
constexpr auto zero =std::chrono::microseconds::zero();
|
||||
if (timeout_usec == 0) {
|
||||
return zero;
|
||||
}
|
||||
else if (timeout_usec == UINT64_MAX) {
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
// We need CLOCK_MONOTONIC so that we use the same clock as the underlying sd-bus lib.
|
||||
// We use POSIX's clock_gettime in favour of std::chrono::steady_clock to ensure this.
|
||||
struct timespec ts{};
|
||||
auto r = clock_gettime(CLOCK_MONOTONIC, &ts);
|
||||
SDBUS_THROW_ERROR_IF(r < 0, "clock_gettime failed: ", -errno);
|
||||
auto now = std::chrono::nanoseconds(ts.tv_nsec) + std::chrono::seconds(ts.tv_sec);
|
||||
auto absTimeout = std::chrono::microseconds(timeout_usec);
|
||||
auto result = std::chrono::duration_cast<std::chrono::microseconds>(absTimeout - now);
|
||||
return std::max(result, zero);
|
||||
}
|
||||
|
||||
int IConnection::PollData::getPollTimeout() const
|
||||
{
|
||||
auto timeout = getRelativeTimeout();
|
||||
if (!timeout) {
|
||||
return -1;
|
||||
}
|
||||
return (int) std::chrono::ceil<std::chrono::milliseconds>(timeout.value()).count();
|
||||
}
|
||||
|
||||
} // namespace sdbus
|
||||
Reference in New Issue
Block a user