refactor: use sd-bus API to get current message

This commit is contained in:
Stanislav Angelovic
2023-02-02 21:51:09 +01:00
committed by Stanislav Angelovič
parent c6afa26541
commit 9412940d1e
18 changed files with 125 additions and 99 deletions

View File

@ -36,6 +36,9 @@
struct sd_bus;
struct sd_event;
namespace sdbus {
class Message;
}
namespace sdbus {
@ -209,6 +212,21 @@ namespace sdbus {
*/
virtual bool processPendingEvent() = 0;
/*!
* @brief Provides access to the currently processed D-Bus message
*
* This method provides access to the currently processed incoming D-Bus message.
* "Currently processed" means that the registered callback handler(s) for that message
* are being invoked. This method is meant to be called from within a callback handler
* (e.g. from a D-Bus signal handler, or async method reply handler, etc.). In such a case it is
* guaranteed to return a valid D-Bus message instance for which the handler is called.
* If called from other contexts/threads, it may return a valid or invalid message, depending
* on whether a message was processed or not at the time of the call.
*
* @return Currently processed D-Bus message
*/
virtual Message getCurrentlyProcessedMessage() const = 0;
/*!
* @brief Sets general method call timeout
*

View File

@ -442,20 +442,19 @@ namespace sdbus {
virtual const std::string& getObjectPath() const = 0;
/*!
* @brief Provides currently processed D-Bus message
* @brief Provides access to the currently processed D-Bus message
*
* This method provides immutable access to the currently processed incoming D-Bus message.
* This method provides access to the currently processed incoming D-Bus message.
* "Currently processed" means that the registered callback handler(s) for that message
* are being invoked. This method is meant to be called from within a callback handler
* (e.g. D-Bus method implementation handler). In such a case it is guaranteed to return
* a valid pointer to the D-Bus message for which the handler is called. If called from other
* contexts/threads, it may return a nonzero pointer or a nullptr, depending on whether a message
* was processed at the time of call or not, but the value is nondereferencable, since the pointed-to
* message may have gone in the meantime.
* (e.g. from a D-Bus signal handler, or async method reply handler, etc.). In such a case it is
* guaranteed to return a valid D-Bus message instance for which the handler is called.
* If called from other contexts/threads, it may return a valid or invalid message, depending
* on whether a message was processed or not at the time of the call.
*
* @return A pointer to the currently processed D-Bus message
* @return Currently processed D-Bus message
*/
virtual const Message* getCurrentlyProcessedMessage() const = 0;
virtual Message getCurrentlyProcessedMessage() const = 0;
};
// Out-of-line member definitions

View File

@ -92,7 +92,7 @@ namespace sdbus {
* The call does not block if the method call has dont-expect-reply flag set. In that case,
* the call returns immediately and the return value is an empty, invalid method reply.
*
* The call blocks otherwise, waiting for the remote peer to send back a reply, or an error,
* The call blocks otherwise, waiting for the remote peer to send back a reply or an error,
* or until the call times out.
*
* While blocking, other concurrent operations (in other threads) on the underlying bus
@ -102,8 +102,8 @@ namespace sdbus {
* callMethod() function overload, which does not block the bus connection, or do the synchronous
* call from another Proxy instance created just before the call and then destroyed (which is
* anyway quite a typical approach in D-Bus implementations). Such proxy instance must have
* its own bus connection. Slim proxies created with `dont_run_event_loop_thread` tag are
* designed for exactly that purpose.
* its own bus connection. So-called light-weight proxies (ones created with `dont_run_event_loop_thread`
* tag are designed for exactly that purpose.
*
* Note: To avoid messing with messages, use API on a higher level of abstraction defined below.
*
@ -125,7 +125,9 @@ namespace sdbus {
* @param[in] timeout Timeout for dbus call in microseconds
* @return Cookie for the the pending asynchronous call
*
* The call is non-blocking. It doesn't wait for the reply. Once the reply arrives,
* This is a callback-based way of asynchronously calling a remote D-Bus method.
*
* The call itself is non-blocking. It doesn't wait for the reply. Once the reply arrives,
* the provided async reply handler will get invoked from the context of the bus
* connection I/O event loop thread.
*
@ -141,6 +143,53 @@ namespace sdbus {
template <typename _Rep, typename _Period>
PendingAsyncCall callMethod(const MethodCall& message, async_reply_handler asyncReplyCallback, const std::chrono::duration<_Rep, _Period>& timeout);
/*!
* @brief Calls method on the D-Bus object asynchronously
*
* @param[in] message Message representing an async method call
* @param[in] Tag denoting a std::future-based overload
* @return Future object providing access to the future method reply message
*
* This is a std::future-based way of asynchronously calling a remote D-Bus method.
*
* The call itself is non-blocking. It doesn't wait for the reply. Once the reply arrives,
* the provided future object will be set to contain the reply (or sdbus::Error
* in case the remote method threw an exception).
*
* Note: To avoid messing with messages, use higher-level API defined below.
*
* @throws sdbus::Error in case of failure
*/
virtual std::future<MethodReply> callMethod(const MethodCall& message, with_future_t) = 0;
/*!
* @brief Calls method on the D-Bus object asynchronously, with custom timeout
*
* @param[in] message Message representing an async method call
* @param[in] timeout Method call timeout
* @param[in] Tag denoting a std::future-based overload
* @return Future object providing access to the future method reply message
*
* This is a std::future-based way of asynchronously calling a remote D-Bus method.
*
* The call itself is non-blocking. It doesn't wait for the reply. Once the reply arrives,
* the provided future object will be set to contain the reply (or sdbus::Error
* in case the remote method threw an exception, or the call timed out).
*
* Note: To avoid messing with messages, use higher-level API defined below.
*
* @throws sdbus::Error in case of failure
*/
virtual std::future<MethodReply> callMethod(const MethodCall& message, uint64_t timeout, with_future_t) = 0;
/*!
* @copydoc IProxy::callMethod(const MethodCall&,uint64_t,with_future_t)
*/
template <typename _Rep, typename _Period>
std::future<MethodReply> callMethod( const MethodCall& message
, const std::chrono::duration<_Rep, _Period>& timeout
, with_future_t );
/*!
* @brief Registers a handler for the desired signal emitted by the D-Bus object
*
@ -397,47 +446,19 @@ namespace sdbus {
virtual const std::string& getObjectPath() const = 0;
/*!
* @brief Provides currently processed D-Bus message
* @brief Provides access to the currently processed D-Bus message
*
* This method provides immutable access to the currently processed incoming D-Bus message.
* This method provides access to the currently processed incoming D-Bus message.
* "Currently processed" means that the registered callback handler(s) for that message
* are being invoked. This method is meant to be called from within a callback handler
* (e.g. from a D-Bus signal handler, or async method reply handler, etc.). In such a case it is
* guaranteed to return a valid pointer to the D-Bus message for which the handler is called.
* If called from other contexts/threads, it may return a nonzero pointer or a nullptr, depending
* on whether a message was processed at the time of call or not, but the value is nondereferencable,
* since the pointed-to message may have gone in the meantime.
* guaranteed to return a valid D-Bus message instance for which the handler is called.
* If called from other contexts/threads, it may return a valid or invalid message, depending
* on whether a message was processed or not at the time of the call.
*
* @return A pointer to the currently processed D-Bus message
* @return Currently processed D-Bus message
*/
virtual const Message* getCurrentlyProcessedMessage() const = 0;
/*!
* @brief Calls method on the D-Bus object asynchronously
*
* @param[in] message Message representing an async method call
* @param[in] asyncReplyCallback Handler for the async reply
* @param[in] timeout Timeout for dbus call in microseconds
* @return Cookie for the the pending asynchronous call
*
* The call is non-blocking. It doesn't wait for the reply. Once the reply arrives,
* the provided async reply handler will get invoked from the context of the connection
* I/O event loop thread.
*
* Note: To avoid messing with messages, use higher-level API defined below.
*
* @throws sdbus::Error in case of failure
*/
virtual std::future<MethodReply> callMethod(const MethodCall& message, with_future_t) = 0;
virtual std::future<MethodReply> callMethod(const MethodCall& message, uint64_t timeout, with_future_t) = 0;
/*!
* @copydoc IProxy::callMethod(const MethodCall&,uint64_t,with_future_t)
*/
template <typename _Rep, typename _Period>
std::future<MethodReply> callMethod( const MethodCall& message
, const std::chrono::duration<_Rep, _Period>& timeout
, with_future_t );
virtual Message getCurrentlyProcessedMessage() const = 0;
};
/********************************************//**

View File

@ -70,13 +70,19 @@ namespace sdbus {
* Serialization and deserialization functions are provided for types supported
* by D-Bus.
*
* You don't need to work with this class directly if you use high-level APIs
* of @c IObject and @c IProxy.
* You mostly don't need to work with this class directly if you use high-level
* APIs of @c IObject and @c IProxy.
*
***********************************************/
class [[nodiscard]] Message
{
public:
Message(const Message&) noexcept;
Message& operator=(const Message&) noexcept;
Message(Message&& other) noexcept;
Message& operator=(Message&& other) noexcept;
~Message();
Message& operator<<(bool item);
Message& operator<<(int16_t item);
Message& operator<<(int32_t item);
@ -222,13 +228,6 @@ namespace sdbus {
Message(void *msg, internal::ISdBus* sdbus) noexcept;
Message(void *msg, internal::ISdBus* sdbus, adopt_message_t) noexcept;
Message(const Message&) noexcept;
Message& operator=(const Message&) noexcept;
Message(Message&& other) noexcept;
Message& operator=(Message&& other) noexcept;
~Message();
friend Factory;
protected:

View File

@ -774,6 +774,13 @@ bool Connection::arePendingMessagesInReadQueue() const
return readQueueSize > 0;
}
Message Connection::getCurrentlyProcessedMessage() const
{
auto* sdbusMsg = sdbus_->sd_bus_get_current_message(bus_.get());
return Message::Factory::create<Message>(sdbusMsg, sdbus_.get());
}
std::string Connection::composeSignalMatchFilter( const std::string &sender
, const std::string &objectPath
, const std::string &interfaceName

View File

@ -86,6 +86,7 @@ namespace sdbus::internal {
void leaveEventLoop() override;
PollData getEventLoopPollData() const override;
bool processPendingEvent() override;
Message getCurrentlyProcessedMessage() const override;
void addObjectManager(const std::string& objectPath) override;
void addObjectManager(const std::string& objectPath, floating_slot_t) override;

View File

@ -87,6 +87,7 @@ namespace sdbus::internal {
virtual int sd_bus_start(sd_bus *bus) = 0;
virtual int sd_bus_process(sd_bus *bus, sd_bus_message **r) = 0;
virtual sd_bus_message* sd_bus_get_current_message(sd_bus *bus) = 0;
virtual int sd_bus_get_poll_data(sd_bus *bus, PollData* data) = 0;
virtual int sd_bus_get_n_queued_read(sd_bus *bus, uint64_t *ret) = 0;
virtual int sd_bus_flush(sd_bus *bus) = 0;

View File

@ -240,9 +240,9 @@ const std::string& Object::getObjectPath() const
return objectPath_;
}
const Message* Object::getCurrentlyProcessedMessage() const
Message Object::getCurrentlyProcessedMessage() const
{
return m_CurrentlyProcessedMessage.load(std::memory_order_relaxed);
return connection_.getCurrentlyProcessedMessage();
}
Object::InterfaceData& Object::getInterface(const std::string& interfaceName)
@ -338,12 +338,6 @@ int Object::sdbus_method_callback(sd_bus_message *sdbusMessage, void *userData,
auto message = Message::Factory::create<MethodCall>(sdbusMessage, &object.connection_.getSdBusInterface());
object.m_CurrentlyProcessedMessage.store(&message, std::memory_order_relaxed);
SCOPE_EXIT
{
object.m_CurrentlyProcessedMessage.store(nullptr, std::memory_order_relaxed);
};
auto& callback = interfaceData->methods[message.getMemberName()].callback;
assert(callback);
@ -396,12 +390,6 @@ int Object::sdbus_property_set_callback( sd_bus */*bus*/
auto value = Message::Factory::create<PropertySetCall>(sdbusValue, &object.connection_.getSdBusInterface());
object.m_CurrentlyProcessedMessage.store(&value, std::memory_order_relaxed);
SCOPE_EXIT
{
object.m_CurrentlyProcessedMessage.store(nullptr, std::memory_order_relaxed);
};
auto ok = invokeHandlerAndCatchErrors([&](){ callback(value); }, retError);
return ok ? 1 : -1;

View File

@ -35,7 +35,6 @@
#include <vector>
#include <functional>
#include <memory>
#include <atomic>
#include <cassert>
namespace sdbus::internal {
@ -103,7 +102,7 @@ namespace sdbus::internal {
sdbus::IConnection& getConnection() const override;
const std::string& getObjectPath() const override;
const Message* getCurrentlyProcessedMessage() const override;
Message getCurrentlyProcessedMessage() const override;
private:
using InterfaceName = std::string;
@ -178,7 +177,6 @@ namespace sdbus::internal {
std::string objectPath_;
std::map<InterfaceName, InterfaceData> interfaces_;
Slot objectManagerSlot_;
std::atomic<const Message*> m_CurrentlyProcessedMessage{nullptr};
};
}

View File

@ -200,9 +200,9 @@ const std::string& Proxy::getObjectPath() const
return objectPath_;
}
const Message* Proxy::getCurrentlyProcessedMessage() const
Message Proxy::getCurrentlyProcessedMessage() const
{
return m_CurrentlyProcessedMessage.load(std::memory_order_relaxed);
return connection_->getCurrentlyProcessedMessage();
}
int Proxy::sdbus_async_reply_handler(sd_bus_message *sdbusMessage, void *userData, sd_bus_error *retError)
@ -225,12 +225,6 @@ int Proxy::sdbus_async_reply_handler(sd_bus_message *sdbusMessage, void *userDat
auto message = Message::Factory::create<MethodReply>(sdbusMessage, &proxy.connection_->getSdBusInterface());
proxy.m_CurrentlyProcessedMessage.store(&message, std::memory_order_relaxed);
SCOPE_EXIT
{
proxy.m_CurrentlyProcessedMessage.store(nullptr, std::memory_order_relaxed);
};
auto ok = invokeHandlerAndCatchErrors([&]
{
const auto* error = sd_bus_message_get_error(sdbusMessage);
@ -256,12 +250,6 @@ int Proxy::sdbus_signal_handler(sd_bus_message *sdbusMessage, void *userData, sd
auto message = Message::Factory::create<Signal>(sdbusMessage, &signalData->proxy.connection_->getSdBusInterface());
signalData->proxy.m_CurrentlyProcessedMessage.store(&message, std::memory_order_relaxed);
SCOPE_EXIT
{
signalData->proxy.m_CurrentlyProcessedMessage.store(nullptr, std::memory_order_relaxed);
};
auto ok = invokeHandlerAndCatchErrors([&](){ signalData->callback(message); }, retError);
return ok ? 0 : -1;

View File

@ -35,7 +35,6 @@
#include <map>
#include <deque>
#include <mutex>
#include <atomic>
#include <condition_variable>
namespace sdbus::internal {
@ -72,7 +71,7 @@ namespace sdbus::internal {
sdbus::IConnection& getConnection() const override;
const std::string& getObjectPath() const override;
const Message* getCurrentlyProcessedMessage() const override;
Message getCurrentlyProcessedMessage() const override;
private:
void registerSignalHandlers(sdbus::internal::IConnection& connection);
@ -178,8 +177,6 @@ namespace sdbus::internal {
std::mutex mutex_;
std::deque<std::shared_ptr<CallData>> calls_;
} pendingAsyncCalls_;
std::atomic<const Message*> m_CurrentlyProcessedMessage{nullptr};
};
}

View File

@ -382,6 +382,11 @@ int SdBus::sd_bus_process(sd_bus *bus, sd_bus_message **r)
return ::sd_bus_process(bus, r);
}
sd_bus_message* SdBus::sd_bus_get_current_message(sd_bus *bus)
{
return ::sd_bus_get_current_message(bus);
}
int SdBus::sd_bus_get_poll_data(sd_bus *bus, PollData* data)
{
std::lock_guard lock(sdbusMutex_);

View File

@ -79,6 +79,7 @@ public:
virtual int sd_bus_start(sd_bus *bus) override;
virtual int sd_bus_process(sd_bus *bus, sd_bus_message **r) override;
virtual sd_bus_message* sd_bus_get_current_message(sd_bus *bus) override;
virtual int sd_bus_get_poll_data(sd_bus *bus, PollData* data) override;
virtual int sd_bus_get_n_queued_read(sd_bus *bus, uint64_t *ret) override;
virtual int sd_bus_flush(sd_bus *bus) override;

View File

@ -122,7 +122,7 @@ uint32_t TestAdaptor::doOperation(const uint32_t& param)
{
std::this_thread::sleep_for(std::chrono::milliseconds(param));
m_methodCallMsg = getObject().getCurrentlyProcessedMessage();
m_methodCallMsg = std::make_unique<const Message>(getObject().getCurrentlyProcessedMessage());
m_methodCallMemberName = m_methodCallMsg->getMemberName();
return param;
@ -130,7 +130,7 @@ uint32_t TestAdaptor::doOperation(const uint32_t& param)
void TestAdaptor::doOperationAsync(sdbus::Result<uint32_t>&& result, uint32_t param)
{
m_methodCallMsg = getObject().getCurrentlyProcessedMessage();
m_methodCallMsg = std::make_unique<const Message>(getObject().getCurrentlyProcessedMessage());
m_methodCallMemberName = m_methodCallMsg->getMemberName();
if (param == 0)
@ -234,7 +234,7 @@ bool TestAdaptor::blocking()
void TestAdaptor::blocking(const bool& value)
{
m_propertySetMsg = getObject().getCurrentlyProcessedMessage();
m_propertySetMsg = std::make_unique<const Message>(getObject().getCurrentlyProcessedMessage());
m_propertySetSender = m_propertySetMsg->getSender();
m_blocking = value;

View File

@ -33,6 +33,7 @@
#include <chrono>
#include <atomic>
#include <utility>
#include <memory>
namespace sdbus { namespace test {
@ -103,9 +104,9 @@ public: // for tests
mutable double m_multiplyResult{};
mutable std::atomic<bool> m_wasThrowErrorCalled{false};
const Message* m_methodCallMsg{};
std::unique_ptr<const Message> m_methodCallMsg;
std::string m_methodCallMemberName;
const Message* m_propertySetMsg{};
std::unique_ptr<const Message> m_propertySetMsg;
std::string m_propertySetSender;
};

View File

@ -61,7 +61,7 @@ TestProxy::~TestProxy()
void TestProxy::onSimpleSignal()
{
m_signalMsg = getProxy().getCurrentlyProcessedMessage();
m_signalMsg = std::make_unique<sdbus::Message>(getProxy().getCurrentlyProcessedMessage());
m_signalMemberName = m_signalMsg->getMemberName();
m_gotSimpleSignal = true;

View File

@ -33,6 +33,7 @@
#include <chrono>
#include <atomic>
#include <future>
#include <memory>
namespace sdbus { namespace test {
@ -118,7 +119,7 @@ public: // for tests
std::function<void(uint32_t res, const sdbus::Error* err)> m_DoOperationClientSideAsyncReplyHandler;
std::function<void(const std::string&, const std::map<std::string, sdbus::Variant>&, const std::vector<std::string>&)> m_onPropertiesChangedHandler;
const Message* m_signalMsg{};
std::unique_ptr<const Message> m_signalMsg;
std::string m_signalMemberName;
};

View File

@ -78,6 +78,7 @@ public:
MOCK_METHOD1(sd_bus_start, int(sd_bus *bus));
MOCK_METHOD2(sd_bus_process, int(sd_bus *bus, sd_bus_message **r));
MOCK_METHOD1(sd_bus_get_current_message, sd_bus_message*(sd_bus *bus));
MOCK_METHOD2(sd_bus_get_poll_data, int(sd_bus *bus, PollData* data));
MOCK_METHOD2(sd_bus_get_n_queued_read, int(sd_bus *bus, uint64_t *ret));
MOCK_METHOD1(sd_bus_flush, int(sd_bus *bus));