fix: minor fixes for async timeout handling

This commit is contained in:
Stanislav Angelovic
2021-12-20 14:47:12 +01:00
committed by Urs Ritzmann
parent 23fdd0ce8f
commit b8eb0e8ceb
4 changed files with 54 additions and 60 deletions

View File

@ -84,7 +84,8 @@ namespace sdbus {
*
* @return a duration since the CLOCK_MONOTONIC epoch started.
*/
[[nodiscard]] std::chrono::microseconds getAbsoluteTimeout() const {
[[nodiscard]] std::chrono::microseconds getAbsoluteTimeout() const
{
return std::chrono::microseconds(timeout_usec);
}

View File

@ -221,8 +221,7 @@ MethodCall Connection::createMethodCall( const std::string& destination
SDBUS_THROW_ERROR_IF(r < 0, "Failed to create method call", -r);
return Message::Factory::create<MethodCall>(sdbusMsg, iface_.get(),
static_cast<const sdbus::internal::IConnection*>(this), adopt_message);
return Message::Factory::create<MethodCall>(sdbusMsg, iface_.get(), this, adopt_message);
}
Signal Connection::createSignal( const std::string& objectPath
@ -381,18 +380,17 @@ void Connection::notifyEventLoopToExit() const
notifyEventLoop(loopExitFd_.fd);
}
void Connection::notifyEventLoopNewTimeout() const {
void Connection::notifyEventLoopNewTimeout() const
{
// The extra notifications for new timeouts are only needed if calls are made asynchronously to the event loop.
// Are we in the same thread as the event loop? Note that it's ok to fail this check because the event loop isn't yet started.
if (loopThreadId_.load(std::memory_order_relaxed) == std::this_thread::get_id()) {
if (loopThreadId_.load(std::memory_order_relaxed) == std::this_thread::get_id())
return;
}
// alternatively use ::sd_bus_get_timeout(..)
// Get the new timeout from sd-bus
auto sdbusPollData = getEventLoopPollData();
if (sdbusPollData.timeout_usec < activeTimeout_) {
if (sdbusPollData.timeout_usec < activeTimeout_.load(std::memory_order_relaxed))
notifyEventLoop(eventFd_.fd);
}
}
void Connection::clearEventLoopNotification(int fd) const
@ -433,7 +431,7 @@ bool Connection::waitForNextRequest()
auto fdsCount = sizeof(fds)/sizeof(fds[0]);
auto timeout = sdbusPollData.getPollTimeout();
activeTimeout_ = sdbusPollData.timeout_usec;
activeTimeout_.store(sdbusPollData.timeout_usec, std::memory_order_relaxed);
auto r = poll(fds, fdsCount, timeout);
if (r < 0 && errno == EINTR)
@ -493,8 +491,37 @@ Connection::EventFd::~EventFd()
close(fd);
}
} // namespace sdbus::internal
namespace sdbus {
std::optional<std::chrono::microseconds> IConnection::PollData::getRelativeTimeout() const
{
constexpr auto zero = std::chrono::microseconds::zero();
if (timeout_usec == 0)
return zero;
else if (timeout_usec == UINT64_MAX)
return std::nullopt;
// We need C so that we use the same clock as the underlying sd-bus lib.
// We use POSIX's clock_gettime in favour of std::chrono::steady_clock to ensure this.
struct timespec ts{};
auto r = clock_gettime(CLOCK_MONOTONIC, &ts);
SDBUS_THROW_ERROR_IF(r < 0, "clock_gettime failed: ", -errno);
auto now = std::chrono::nanoseconds(ts.tv_nsec) + std::chrono::seconds(ts.tv_sec);
auto absTimeout = std::chrono::microseconds(timeout_usec);
auto result = std::chrono::duration_cast<std::chrono::microseconds>(absTimeout - now);
return std::max(result, zero);
}
int IConnection::PollData::getPollTimeout() const
{
auto timeout = getRelativeTimeout();
return timeout ? static_cast<int>(std::chrono::ceil<std::chrono::milliseconds>(timeout.value()).count()) : -1;
}
} // namespace sdbus
namespace sdbus::internal {
std::unique_ptr<sdbus::internal::IConnection> createConnection()
@ -505,7 +532,7 @@ std::unique_ptr<sdbus::internal::IConnection> createConnection()
return std::unique_ptr<sdbus::internal::IConnection>(connectionInternal);
}
}
} // namespace sdbus::internal
namespace sdbus {
@ -568,38 +595,4 @@ std::unique_ptr<sdbus::IConnection> createRemoteSystemBusConnection(const std::s
return std::make_unique<sdbus::internal::Connection>(std::move(interface), remote_system_bus, host);
}
} // namespace sdbus::inernal
namespace sdbus {
std::optional<std::chrono::microseconds> IConnection::PollData::getRelativeTimeout() const
{
constexpr auto zero =std::chrono::microseconds::zero();
if (timeout_usec == 0) {
return zero;
}
else if (timeout_usec == UINT64_MAX) {
return std::nullopt;
}
// We need CLOCK_MONOTONIC so that we use the same clock as the underlying sd-bus lib.
// We use POSIX's clock_gettime in favour of std::chrono::steady_clock to ensure this.
struct timespec ts{};
auto r = clock_gettime(CLOCK_MONOTONIC, &ts);
SDBUS_THROW_ERROR_IF(r < 0, "clock_gettime failed: ", -errno);
auto now = std::chrono::nanoseconds(ts.tv_nsec) + std::chrono::seconds(ts.tv_sec);
auto absTimeout = std::chrono::microseconds(timeout_usec);
auto result = std::chrono::duration_cast<std::chrono::microseconds>(absTimeout - now);
return std::max(result, zero);
}
int IConnection::PollData::getPollTimeout() const
{
auto timeout = getRelativeTimeout();
if (!timeout) {
return -1;
}
return (int) std::chrono::ceil<std::chrono::milliseconds>(timeout.value()).count();
}
} // namespace sdbus
} // namespace sdbus

View File

@ -738,8 +738,10 @@ std::string Message::getSELinuxContext() const
}
MethodCall::MethodCall(void *msg, internal::ISdBus *sdbus, const internal::IConnection *connection,
adopt_message_t) noexcept
MethodCall::MethodCall( void *msg
, internal::ISdBus *sdbus
, const internal::IConnection *connection
, adopt_message_t) noexcept
: Message(msg, sdbus, adopt_message)
, connection_(connection)
{
@ -780,10 +782,6 @@ MethodReply MethodCall::sendWithReply(uint64_t timeout) const
SDBUS_THROW_ERROR_IF(r < 0, "Failed to call method", -r);
if (connection_) {
connection_->notifyEventLoopNewTimeout();
}
return Factory::create<MethodReply>(sdbusReply, sdbus_, adopt_message);
}
@ -799,9 +797,10 @@ void MethodCall::send(void* callback, void* userData, uint64_t timeout, dont_req
{
auto r = sdbus_->sd_bus_call_async(nullptr, nullptr, (sd_bus_message*)msg_, (sd_bus_message_handler_t)callback, userData, timeout);
SDBUS_THROW_ERROR_IF(r < 0, "Failed to call method", -r);
if (connection_) {
connection_->notifyEventLoopNewTimeout();
}
// Force event loop to re-enter polling with the async call timeout if that is less than the one used in current poll
SDBUS_THROW_ERROR_IF(connection_ == nullptr, "Invalid use of MethodCall API", ENOTSUP);
connection_->notifyEventLoopNewTimeout();
}
MethodCall::Slot MethodCall::send(void* callback, void* userData, uint64_t timeout) const
@ -810,9 +809,10 @@ MethodCall::Slot MethodCall::send(void* callback, void* userData, uint64_t timeo
auto r = sdbus_->sd_bus_call_async(nullptr, &slot, (sd_bus_message*)msg_, (sd_bus_message_handler_t)callback, userData, timeout);
SDBUS_THROW_ERROR_IF(r < 0, "Failed to call method asynchronously", -r);
if (connection_) {
connection_->notifyEventLoopNewTimeout();
}
// Force event loop to re-enter polling with the async call timeout if that is less than the one used in current poll
SDBUS_THROW_ERROR_IF(connection_ == nullptr, "Invalid use of MethodCall API", ENOTSUP);
connection_->notifyEventLoopNewTimeout();
return Slot{slot, [sdbus_ = sdbus_](void *slot){ sdbus_->sd_bus_slot_unref((sd_bus_slot*)slot); }};
}

View File

@ -67,14 +67,14 @@ MethodCall Proxy::createMethodCall(const std::string& interfaceName, const std::
MethodReply Proxy::callMethod(const MethodCall& message, uint64_t timeout)
{
// Sending method call synchronously is the only operation that blocks, waiting for the method
// reply message among the incoming message on the sd-bus connection socket. But typically there
// reply message among the incoming messages on the sd-bus connection socket. But typically there
// already is somebody that generally handles incoming D-Bus messages -- the connection event loop
// running typically in its own thread. We have to avoid polling on socket from several threads.
// So we have to branch here: either we are within the context of the event loop thread, then we
// can send the message simply via sd_bus_call, which blocks. Or we are in another thread, then
// we can perform the send operation of the method call message from here (because that is thread-
// safe like other sd-bus API accesses), but the incoming reply we have to get through the event
// loop thread, because this is be the only rightful listener on the sd-bus connection socket.
// loop thread, because this is the only rightful listener on the sd-bus connection socket.
// So, technically, we use async means to wait here for reply received by the event loop thread.
SDBUS_THROW_ERROR_IF(!message.isValid(), "Invalid method call message provided", EINVAL);