feat: add support for async registration of matches (#374)

This commit is contained in:
Stanislav Angelovič
2023-11-03 17:57:52 +01:00
committed by GitHub
parent 9da18aec25
commit 9490b3351f
9 changed files with 131 additions and 15 deletions

View File

@@ -1637,7 +1637,7 @@ Live examples of extending sdbus-c++ types can be found in [Message unit tests](
Support for match rules Support for match rules
----------------------- -----------------------
`IConnection` class provides `addMatch` method that you can use to install match rules. An associated callback handler will be called upon an incoming message matching given match rule. There is support for both client-owned and floating (library-owned) match rules. Consult `IConnection` header or sdbus-c++ doxygen documentation for more information. `IConnection` class provides `addMatch` and `addMatchAsync` family of methods that you can use to install match rules on that bus connection. An associated callback handler will be called when an incoming D-Bus message matches the given match rule. Clients can decide whether they own and control the match rule lifetime, or whether the match rule lifetime is bound the connection object lifetime (so-called floating match rule). Consult `IConnection` header or sdbus-c++ doxygen documentation for more information.
Using direct (peer-to-peer) D-Bus connections Using direct (peer-to-peer) D-Bus connections
--------------------------------------------- ---------------------------------------------

View File

@@ -268,10 +268,10 @@ namespace sdbus {
virtual void addObjectManager(const std::string& objectPath, floating_slot_t) = 0; virtual void addObjectManager(const std::string& objectPath, floating_slot_t) = 0;
/*! /*!
* @brief Adds a match rule for incoming message dispatching * @brief Installs a match rule for messages received on this bus connection
* *
* @param[in] match Match expression to filter incoming D-Bus message * @param[in] match Match expression to filter incoming D-Bus message
* @param[in] callback Callback handler to be called upon incoming D-Bus message matching the rule * @param[in] callback Callback handler to be called upon processing an inbound D-Bus message matching the rule
* @return RAII-style slot handle representing the ownership of the subscription * @return RAII-style slot handle representing the ownership of the subscription
* *
* The method installs a match rule for messages received on the specified bus connection. * The method installs a match rule for messages received on the specified bus connection.
@@ -291,10 +291,10 @@ namespace sdbus {
[[nodiscard]] virtual Slot addMatch(const std::string& match, message_handler callback) = 0; [[nodiscard]] virtual Slot addMatch(const std::string& match, message_handler callback) = 0;
/*! /*!
* @brief Adds a floating match rule for incoming message dispatching * @brief Installs a floating match rule for messages received on this bus connection
* *
* @param[in] match Match expression to filter incoming D-Bus message * @param[in] match Match expression to filter incoming D-Bus message
* @param[in] callback Callback handler to be called upon incoming D-Bus message matching the rule * @param[in] callback Callback handler to be called upon processing an inbound D-Bus message matching the rule
* *
* The method installs a floating match rule for messages received on the specified bus connection. * The method installs a floating match rule for messages received on the specified bus connection.
* Floating means that the bus connection object owns the match rule, i.e. lifetime of the match rule * Floating means that the bus connection object owns the match rule, i.e. lifetime of the match rule
@@ -307,6 +307,45 @@ namespace sdbus {
*/ */
virtual void addMatch(const std::string& match, message_handler callback, floating_slot_t) = 0; virtual void addMatch(const std::string& match, message_handler callback, floating_slot_t) = 0;
/*!
* @brief Asynchronously installs a match rule for messages received on this bus connection
*
* @param[in] match Match expression to filter incoming D-Bus message
* @param[in] callback Callback handler to be called upon processing an inbound D-Bus message matching the rule
* @param[in] installCallback Callback handler to be called upon processing an inbound D-Bus message matching the rule
* @return RAII-style slot handle representing the ownership of the subscription
*
* This method operates the same as `addMatch()` above, just that it installs the match rule asynchronously,
* in a non-blocking fashion. A request is sent to the broker, but the call does not wait for a response.
* The `installCallback' callable is called when the response is later received, with the response message
* from the broker as parameter. If it's an empty function object, a default implementation is used that
* terminates the bus connection should installing the match fail.
*
* Refer to the @c addMatch(const std::string& match, message_handler callback) documentation, and consult
* `man sd_bus_add_match`, for more information.
*
* @throws sdbus::Error in case of failure
*/
[[nodiscard]] virtual Slot addMatchAsync(const std::string& match, message_handler callback, message_handler installCallback) = 0;
/*!
* @brief Asynchronously installs a floating match rule for messages received on this bus connection
*
* @param[in] match Match expression to filter incoming D-Bus message
* @param[in] callback Callback handler to be called upon processing an inbound D-Bus message matching the rule
* @param[in] installCallback Callback handler to be called upon processing an inbound D-Bus message matching the rule
*
* The method installs a floating match rule for messages received on the specified bus connection.
* Floating means that the bus connection object owns the match rule, i.e. lifetime of the match rule
* is bound to the lifetime of the bus connection.
*
* Refer to the @c addMatch(const std::string& match, message_handler callback, message_handler installCallback)
* documentation for more information.
*
* @throws sdbus::Error in case of failure
*/
virtual void addMatchAsync(const std::string& match, message_handler callback, message_handler installCallback, floating_slot_t) = 0;
/*! /*!
* @copydoc IConnection::enterEventLoop() * @copydoc IConnection::enterEventLoop()
* *

View File

@@ -217,17 +217,11 @@ uint64_t Connection::getMethodCallTimeout() const
Slot Connection::addMatch(const std::string& match, message_handler callback) Slot Connection::addMatch(const std::string& match, message_handler callback)
{ {
auto matchInfo = std::make_unique<MatchInfo>(MatchInfo{std::move(callback), *this, {}}); SDBUS_THROW_ERROR_IF(!callback, "Invalid match callback handler provided", EINVAL);
auto messageHandler = [](sd_bus_message *sdbusMessage, void *userData, sd_bus_error */*retError*/) -> int auto matchInfo = std::make_unique<MatchInfo>(MatchInfo{std::move(callback), {}, *this, {}});
{
auto* matchInfo = static_cast<MatchInfo*>(userData);
auto message = Message::Factory::create<PlainMessage>(sdbusMessage, &matchInfo->connection.getSdBusInterface());
matchInfo->callback(message);
return 0;
};
auto r = iface_->sd_bus_add_match(bus_.get(), &matchInfo->slot, match.c_str(), std::move(messageHandler), matchInfo.get()); auto r = iface_->sd_bus_add_match(bus_.get(), &matchInfo->slot, match.c_str(), &Connection::sdbus_match_callback, matchInfo.get());
SDBUS_THROW_ERROR_IF(r < 0, "Failed to add match", -r); SDBUS_THROW_ERROR_IF(r < 0, "Failed to add match", -r);
return {matchInfo.release(), [this](void *ptr) return {matchInfo.release(), [this](void *ptr)
@@ -243,6 +237,34 @@ void Connection::addMatch(const std::string& match, message_handler callback, fl
floatingMatchRules_.push_back(addMatch(match, std::move(callback))); floatingMatchRules_.push_back(addMatch(match, std::move(callback)));
} }
Slot Connection::addMatchAsync(const std::string& match, message_handler callback, message_handler installCallback)
{
SDBUS_THROW_ERROR_IF(!callback, "Invalid match callback handler provided", EINVAL);
sd_bus_message_handler_t sdbusInstallCallback = installCallback ? &Connection::sdbus_match_install_callback : nullptr;
auto matchInfo = std::make_unique<MatchInfo>(MatchInfo{std::move(callback), std::move(installCallback), *this, {}});
auto r = iface_->sd_bus_add_match_async( bus_.get()
, &matchInfo->slot
, match.c_str()
, &Connection::sdbus_match_callback
, sdbusInstallCallback
, matchInfo.get());
SDBUS_THROW_ERROR_IF(r < 0, "Failed to add match", -r);
return {matchInfo.release(), [this](void *ptr)
{
auto* matchInfo = static_cast<MatchInfo*>(ptr);
iface_->sd_bus_slot_unref(matchInfo->slot);
std::default_delete<MatchInfo>{}(matchInfo);
}};
}
void Connection::addMatchAsync(const std::string& match, message_handler callback, message_handler installCallback, floating_slot_t)
{
floatingMatchRules_.push_back(addMatchAsync(match, std::move(callback), std::move(installCallback)));
}
Slot Connection::addObjectVTable( const std::string& objectPath Slot Connection::addObjectVTable( const std::string& objectPath
, const std::string& interfaceName , const std::string& interfaceName
, const sd_bus_vtable* vtable , const sd_bus_vtable* vtable
@@ -565,6 +587,22 @@ std::vector</*const */char*> Connection::to_strv(const std::vector<std::string>&
return strv; return strv;
} }
int Connection::sdbus_match_callback(sd_bus_message *sdbusMessage, void *userData, sd_bus_error */*retError*/)
{
auto* matchInfo = static_cast<MatchInfo*>(userData);
auto message = Message::Factory::create<PlainMessage>(sdbusMessage, &matchInfo->connection.getSdBusInterface());
matchInfo->callback(message);
return 0;
}
int Connection::sdbus_match_install_callback(sd_bus_message *sdbusMessage, void *userData, sd_bus_error */*retError*/)
{
auto* matchInfo = static_cast<MatchInfo*>(userData);
auto message = Message::Factory::create<PlainMessage>(sdbusMessage, &matchInfo->connection.getSdBusInterface());
matchInfo->installCallback(message);
return 0;
}
Connection::EventFd::EventFd() Connection::EventFd::EventFd()
{ {
fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);

View File

@@ -96,6 +96,8 @@ namespace sdbus::internal {
[[nodiscard]] Slot addMatch(const std::string& match, message_handler callback) override; [[nodiscard]] Slot addMatch(const std::string& match, message_handler callback) override;
void addMatch(const std::string& match, message_handler callback, floating_slot_t) override; void addMatch(const std::string& match, message_handler callback, floating_slot_t) override;
[[nodiscard]] Slot addMatchAsync(const std::string& match, message_handler callback, message_handler installCallback) override;
void addMatchAsync(const std::string& match, message_handler callback, message_handler installCallback, floating_slot_t) override;
const ISdBus& getSdBusInterface() const override; const ISdBus& getSdBusInterface() const override;
ISdBus& getSdBusInterface() override; ISdBus& getSdBusInterface() override;
@@ -151,10 +153,13 @@ namespace sdbus::internal {
void clearEventLoopNotification(int fd) const; void clearEventLoopNotification(int fd) const;
void notifyEventLoopNewTimeout() const override; void notifyEventLoopNewTimeout() const override;
private:
void joinWithEventLoop(); void joinWithEventLoop();
static std::vector</*const */char*> to_strv(const std::vector<std::string>& strings); static std::vector</*const */char*> to_strv(const std::vector<std::string>& strings);
static int sdbus_match_callback(sd_bus_message *sdbusMessage, void *userData, sd_bus_error *retError);
static int sdbus_match_install_callback(sd_bus_message *sdbusMessage, void *userData, sd_bus_error *retError);
private:
struct EventFd struct EventFd
{ {
EventFd(); EventFd();
@@ -165,6 +170,7 @@ namespace sdbus::internal {
struct MatchInfo struct MatchInfo
{ {
message_handler callback; message_handler callback;
message_handler installCallback;
Connection& connection; Connection& connection;
sd_bus_slot *slot; sd_bus_slot *slot;
}; };

View File

@@ -80,6 +80,7 @@ namespace sdbus::internal {
virtual int sd_bus_add_object_vtable(sd_bus *bus, sd_bus_slot **slot, const char *path, const char *interface, const sd_bus_vtable *vtable, void *userdata) = 0; virtual int sd_bus_add_object_vtable(sd_bus *bus, sd_bus_slot **slot, const char *path, const char *interface, const sd_bus_vtable *vtable, void *userdata) = 0;
virtual int sd_bus_add_object_manager(sd_bus *bus, sd_bus_slot **slot, const char *path) = 0; virtual int sd_bus_add_object_manager(sd_bus *bus, sd_bus_slot **slot, const char *path) = 0;
virtual int sd_bus_add_match(sd_bus *bus, sd_bus_slot **slot, const char *match, sd_bus_message_handler_t callback, void *userdata) = 0; virtual int sd_bus_add_match(sd_bus *bus, sd_bus_slot **slot, const char *match, sd_bus_message_handler_t callback, void *userdata) = 0;
virtual int sd_bus_add_match_async(sd_bus *bus, sd_bus_slot **slot, const char *match, sd_bus_message_handler_t callback, sd_bus_message_handler_t install_callback, void *userdata) = 0;
virtual sd_bus_slot* sd_bus_slot_unref(sd_bus_slot *slot) = 0; virtual sd_bus_slot* sd_bus_slot_unref(sd_bus_slot *slot) = 0;
virtual int sd_bus_new(sd_bus **ret) = 0; virtual int sd_bus_new(sd_bus **ret) = 0;

View File

@@ -345,6 +345,13 @@ int SdBus::sd_bus_add_match(sd_bus *bus, sd_bus_slot **slot, const char *match,
return ::sd_bus_add_match(bus, slot, match, callback, userdata); return ::sd_bus_add_match(bus, slot, match, callback, userdata);
} }
int SdBus::sd_bus_add_match_async(sd_bus *bus, sd_bus_slot **slot, const char *match, sd_bus_message_handler_t callback, sd_bus_message_handler_t install_callback, void *userdata)
{
std::lock_guard lock(sdbusMutex_);
return ::sd_bus_add_match_async(bus, slot, match, callback, install_callback, userdata);
}
sd_bus_slot* SdBus::sd_bus_slot_unref(sd_bus_slot *slot) sd_bus_slot* SdBus::sd_bus_slot_unref(sd_bus_slot *slot)
{ {
std::lock_guard lock(sdbusMutex_); std::lock_guard lock(sdbusMutex_);

View File

@@ -72,6 +72,7 @@ public:
virtual int sd_bus_add_object_vtable(sd_bus *bus, sd_bus_slot **slot, const char *path, const char *interface, const sd_bus_vtable *vtable, void *userdata) override; virtual int sd_bus_add_object_vtable(sd_bus *bus, sd_bus_slot **slot, const char *path, const char *interface, const sd_bus_vtable *vtable, void *userdata) override;
virtual int sd_bus_add_object_manager(sd_bus *bus, sd_bus_slot **slot, const char *path) override; virtual int sd_bus_add_object_manager(sd_bus *bus, sd_bus_slot **slot, const char *path) override;
virtual int sd_bus_add_match(sd_bus *bus, sd_bus_slot **slot, const char *match, sd_bus_message_handler_t callback, void *userdata) override; virtual int sd_bus_add_match(sd_bus *bus, sd_bus_slot **slot, const char *match, sd_bus_message_handler_t callback, void *userdata) override;
virtual int sd_bus_add_match_async(sd_bus *bus, sd_bus_slot **slot, const char *match, sd_bus_message_handler_t callback, sd_bus_message_handler_t install_callback, void *userdata) override;
virtual sd_bus_slot* sd_bus_slot_unref(sd_bus_slot *slot) override; virtual sd_bus_slot* sd_bus_slot_unref(sd_bus_slot *slot) override;
virtual int sd_bus_new(sd_bus **ret) override; virtual int sd_bus_new(sd_bus **ret) override;

View File

@@ -88,6 +88,29 @@ TEST_F(AConnection, WillCallCallbackHandlerForIncomingMessageMatchingMatchRule)
ASSERT_TRUE(waitUntil(matchingMessageReceived)); ASSERT_TRUE(waitUntil(matchingMessageReceived));
} }
TEST_F(AConnection, CanInstallMatchRuleAsynchronously)
{
auto matchRule = "sender='" + BUS_NAME + "',path='" + OBJECT_PATH + "'";
std::atomic<bool> matchingMessageReceived{false};
std::atomic<bool> matchRuleInstalled{false};
auto slot = s_proxyConnection->addMatchAsync( matchRule
, [&](sdbus::Message& msg)
{
if(msg.getPath() == OBJECT_PATH)
matchingMessageReceived = true;
}
, [&](sdbus::Message& /*msg*/)
{
matchRuleInstalled = true;
} );
EXPECT_TRUE(waitUntil(matchRuleInstalled));
m_adaptor->emitSimpleSignal();
ASSERT_TRUE(waitUntil(matchingMessageReceived));
}
TEST_F(AConnection, WillUnsubscribeMatchRuleWhenClientDestroysTheAssociatedSlot) TEST_F(AConnection, WillUnsubscribeMatchRuleWhenClientDestroysTheAssociatedSlot)
{ {
auto matchRule = "sender='" + BUS_NAME + "',path='" + OBJECT_PATH + "'"; auto matchRule = "sender='" + BUS_NAME + "',path='" + OBJECT_PATH + "'";

View File

@@ -71,6 +71,7 @@ public:
MOCK_METHOD6(sd_bus_add_object_vtable, int(sd_bus *bus, sd_bus_slot **slot, const char *path, const char *interface, const sd_bus_vtable *vtable, void *userdata)); MOCK_METHOD6(sd_bus_add_object_vtable, int(sd_bus *bus, sd_bus_slot **slot, const char *path, const char *interface, const sd_bus_vtable *vtable, void *userdata));
MOCK_METHOD3(sd_bus_add_object_manager, int(sd_bus *bus, sd_bus_slot **slot, const char *path)); MOCK_METHOD3(sd_bus_add_object_manager, int(sd_bus *bus, sd_bus_slot **slot, const char *path));
MOCK_METHOD5(sd_bus_add_match, int(sd_bus *bus, sd_bus_slot **slot, const char *match, sd_bus_message_handler_t callback, void *userdata)); MOCK_METHOD5(sd_bus_add_match, int(sd_bus *bus, sd_bus_slot **slot, const char *match, sd_bus_message_handler_t callback, void *userdata));
MOCK_METHOD6(sd_bus_add_match_async, int(sd_bus *bus, sd_bus_slot **slot, const char *match, sd_bus_message_handler_t callback, sd_bus_message_handler_t install_callback, void *userdata));
MOCK_METHOD1(sd_bus_slot_unref, sd_bus_slot*(sd_bus_slot *slot)); MOCK_METHOD1(sd_bus_slot_unref, sd_bus_slot*(sd_bus_slot *slot));
MOCK_METHOD1(sd_bus_new, int(sd_bus **ret)); MOCK_METHOD1(sd_bus_new, int(sd_bus **ret));