forked from Kistler-Group/sdbus-cpp
Introduce support for asynchronous server-side methods (#12)
* Add preliminary changes for async server methods * Refactor the Message concept and break it into distinctive types * Continue working on async server methods (high-level API mainly) * Continue developing support for async server * Finishing async server methods * Finishing async server methods (fixing tests & cleaning up) * A little code cleaning * Add unit tests for type traits of free functions * Support for generating async server methods in stub headers * Update ChangeLog for v0.3.0 * Update the tutorial with how to use async server-side methods * Update the TOC in sdbus-c++ tutorial * Update numbering in TOC * Remove unnecessary code * Final cleanups
This commit is contained in:
committed by
Lukáš Ďurfina
parent
b041f76bfc
commit
d8fd053714
@@ -42,13 +42,13 @@ Connection::Connection(Connection::BusType type)
|
||||
|
||||
finishHandshake(bus);
|
||||
|
||||
exitLoopFd_ = createProcessingExitDescriptor();
|
||||
notificationFd_ = createLoopNotificationDescriptor();
|
||||
}
|
||||
|
||||
Connection::~Connection()
|
||||
{
|
||||
leaveProcessingLoop();
|
||||
closeProcessingExitDescriptor(exitLoopFd_);
|
||||
closeLoopNotificationDescriptor(notificationFd_);
|
||||
}
|
||||
|
||||
void Connection::requestName(const std::string& name)
|
||||
@@ -67,13 +67,15 @@ void Connection::enterProcessingLoop()
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
auto processed = processPendingRequest(bus_.get());
|
||||
auto processed = processPendingRequest();
|
||||
if (processed)
|
||||
continue; // Process next one
|
||||
|
||||
auto success = waitForNextRequest(bus_.get(), exitLoopFd_);
|
||||
auto success = waitForNextRequest();
|
||||
if (!success)
|
||||
break; // Exit processing loop
|
||||
if (success.asyncMsgsToProcess)
|
||||
processAsynchronousMessages();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -112,10 +114,10 @@ void Connection::removeObjectVTable(void* vtableHandle)
|
||||
sd_bus_slot_unref((sd_bus_slot *)vtableHandle);
|
||||
}
|
||||
|
||||
sdbus::Message Connection::createMethodCall( const std::string& destination
|
||||
, const std::string& objectPath
|
||||
, const std::string& interfaceName
|
||||
, const std::string& methodName ) const
|
||||
sdbus::MethodCall Connection::createMethodCall( const std::string& destination
|
||||
, const std::string& objectPath
|
||||
, const std::string& interfaceName
|
||||
, const std::string& methodName ) const
|
||||
{
|
||||
sd_bus_message *sdbusMsg{};
|
||||
|
||||
@@ -131,12 +133,12 @@ sdbus::Message Connection::createMethodCall( const std::string& destination
|
||||
|
||||
SDBUS_THROW_ERROR_IF(r < 0, "Failed to create method call", -r);
|
||||
|
||||
return Message(sdbusMsg, Message::Type::eMethodCall);
|
||||
return MethodCall(sdbusMsg);
|
||||
}
|
||||
|
||||
sdbus::Message Connection::createSignal( const std::string& objectPath
|
||||
, const std::string& interfaceName
|
||||
, const std::string& signalName ) const
|
||||
sdbus::Signal Connection::createSignal( const std::string& objectPath
|
||||
, const std::string& interfaceName
|
||||
, const std::string& signalName ) const
|
||||
{
|
||||
sd_bus_message *sdbusSignal{};
|
||||
|
||||
@@ -151,7 +153,7 @@ sdbus::Message Connection::createSignal( const std::string& objectPath
|
||||
|
||||
SDBUS_THROW_ERROR_IF(r < 0, "Failed to create signal", -r);
|
||||
|
||||
return Message(sdbusSignal, Message::Type::eSignal);
|
||||
return Signal(sdbusSignal);
|
||||
}
|
||||
|
||||
void* Connection::registerSignalHandler( const std::string& objectPath
|
||||
@@ -175,6 +177,13 @@ void Connection::unregisterSignalHandler(void* handlerCookie)
|
||||
sd_bus_slot_unref((sd_bus_slot *)handlerCookie);
|
||||
}
|
||||
|
||||
void Connection::sendReplyAsynchronously(const sdbus::MethodReply& reply)
|
||||
{
|
||||
std::lock_guard<std::mutex> guard(mutex_);
|
||||
asyncReplies_.push(reply);
|
||||
notifyProcessingLoop();
|
||||
}
|
||||
|
||||
std::unique_ptr<sdbus::internal::IConnection> Connection::clone() const
|
||||
{
|
||||
return std::make_unique<sdbus::internal::Connection>(busType_);
|
||||
@@ -211,10 +220,8 @@ void Connection::finishHandshake(sd_bus* bus)
|
||||
SDBUS_THROW_ERROR_IF(r < 0, "Failed to flush bus on opening", -r);
|
||||
}
|
||||
|
||||
int Connection::createProcessingExitDescriptor()
|
||||
int Connection::createLoopNotificationDescriptor()
|
||||
{
|
||||
// Mechanism for graceful termination of processing loop
|
||||
|
||||
auto r = eventfd(0, EFD_SEMAPHORE | EFD_CLOEXEC);
|
||||
|
||||
SDBUS_THROW_ERROR_IF(r < 0, "Failed to create event object", -errno);
|
||||
@@ -222,16 +229,23 @@ int Connection::createProcessingExitDescriptor()
|
||||
return r;
|
||||
}
|
||||
|
||||
void Connection::closeProcessingExitDescriptor(int fd)
|
||||
void Connection::closeLoopNotificationDescriptor(int fd)
|
||||
{
|
||||
close(fd);
|
||||
}
|
||||
|
||||
void Connection::notifyProcessingLoop()
|
||||
{
|
||||
assert(notificationFd_ >= 0);
|
||||
uint64_t value = 1;
|
||||
write(notificationFd_, &value, sizeof(value));
|
||||
}
|
||||
|
||||
void Connection::notifyProcessingLoopToExit()
|
||||
{
|
||||
assert(exitLoopFd_ >= 0);
|
||||
uint64_t value = 1;
|
||||
write(exitLoopFd_, &value, sizeof(value));
|
||||
exitLoopThread_ = true;
|
||||
|
||||
notifyProcessingLoop();
|
||||
}
|
||||
|
||||
void Connection::joinWithProcessingLoop()
|
||||
@@ -240,8 +254,10 @@ void Connection::joinWithProcessingLoop()
|
||||
asyncLoopThread_.join();
|
||||
}
|
||||
|
||||
bool Connection::processPendingRequest(sd_bus* bus)
|
||||
bool Connection::processPendingRequest()
|
||||
{
|
||||
auto bus = bus_.get();
|
||||
|
||||
assert(bus != nullptr);
|
||||
|
||||
int r = sd_bus_process(bus, nullptr);
|
||||
@@ -251,10 +267,23 @@ bool Connection::processPendingRequest(sd_bus* bus)
|
||||
return r > 0;
|
||||
}
|
||||
|
||||
bool Connection::waitForNextRequest(sd_bus* bus, int exitFd)
|
||||
void Connection::processAsynchronousMessages()
|
||||
{
|
||||
std::lock_guard<std::mutex> guard(mutex_);
|
||||
while (!asyncReplies_.empty())
|
||||
{
|
||||
auto reply = asyncReplies_.front();
|
||||
asyncReplies_.pop();
|
||||
reply.send();
|
||||
}
|
||||
}
|
||||
|
||||
Connection::WaitResult Connection::waitForNextRequest()
|
||||
{
|
||||
auto bus = bus_.get();
|
||||
|
||||
assert(bus != nullptr);
|
||||
assert(exitFd != 0);
|
||||
assert(notificationFd_ != 0);
|
||||
|
||||
auto r = sd_bus_get_fd(bus);
|
||||
SDBUS_THROW_ERROR_IF(r < 0, "Failed to get bus descriptor", -r);
|
||||
@@ -267,20 +296,26 @@ bool Connection::waitForNextRequest(sd_bus* bus, int exitFd)
|
||||
uint64_t usec;
|
||||
sd_bus_get_timeout(bus, &usec);
|
||||
|
||||
struct pollfd fds[] = {{sdbusFd, sdbusEvents, 0}, {exitFd, POLLIN, 0}};
|
||||
struct pollfd fds[] = {{sdbusFd, sdbusEvents, 0}, {notificationFd_, POLLIN, 0}};
|
||||
auto fdsCount = sizeof(fds)/sizeof(fds[0]);
|
||||
|
||||
r = poll(fds, fdsCount, usec == (uint64_t) -1 ? -1 : (usec+999)/1000);
|
||||
|
||||
if (r < 0 && errno == EINTR)
|
||||
return true; // Try again
|
||||
return {true, false}; // Try again
|
||||
|
||||
SDBUS_THROW_ERROR_IF(r < 0, "Failed to wait on the bus", -errno);
|
||||
|
||||
if (fds[1].revents & POLLIN)
|
||||
return false; // Got exit notification
|
||||
{
|
||||
if (exitLoopThread_)
|
||||
return {false, false}; // Got exit notification
|
||||
|
||||
return true;
|
||||
// Otherwise we have some async messages to process
|
||||
return {false, true};
|
||||
}
|
||||
|
||||
return {true, false};
|
||||
}
|
||||
|
||||
std::string Connection::composeSignalMatchFilter( const std::string& objectPath
|
||||
|
||||
Reference in New Issue
Block a user