diff --git a/src/Connection.cpp b/src/Connection.cpp index 8a5f0cf..7c667b5 100644 --- a/src/Connection.cpp +++ b/src/Connection.cpp @@ -651,7 +651,8 @@ sd_bus_message* Connection::callMethod(sd_bus_message* sdbusMsg, uint64_t timeou SDBUS_THROW_ERROR_IF(r < 0, "Failed to call method", -r); - // Wake up event loop to process messages that may have arrived in the meantime... + // Wake up event loop to process messages that may have arrived in the meantime, + // or to dispatch the outbound message that hasn't yet been fully sent out. wakeUpEventLoopIfMessagesInQueue(); return sdbusReply; @@ -669,7 +670,8 @@ Slot Connection::callMethodAsync(sd_bus_message* sdbusMsg, sd_bus_message_handle // An event loop may wait in poll with timeout `t1', while in another thread an async call is made with // timeout `t2'. If `t2' < `t1', then we have to wake up the event loop thread to update its poll timeout. - if (timeoutAfter < timeoutBefore) + // We also have to wake up the event loop to process the messages that may be in the read/write queues. + if (timeoutAfter < timeoutBefore || arePendingMessagesInQueues()) notifyEventLoopToWakeUpFromPoll(); return {slot, [this](void *slot){ sdbus_->sd_bus_slot_unref((sd_bus_slot*)slot); }}; @@ -679,6 +681,9 @@ void Connection::sendMessage(sd_bus_message* sdbusMsg) { auto r = sdbus_->sd_bus_send(nullptr, sdbusMsg, nullptr); + // Wake up event loop to continue dispatching the (fairly large) outbound message that hasn't yet been fully sent + wakeUpEventLoopIfMessagesInQueue(); + SDBUS_THROW_ERROR_IF(r < 0, "Failed to send D-Bus message", -r); } @@ -758,11 +763,15 @@ void Connection::notifyEventLoopToWakeUpFromPoll() void Connection::wakeUpEventLoopIfMessagesInQueue() { - // When doing a sync call, other D-Bus messages may have arrived, waiting in the read queue. + // We need this in two cases: + // 1. When doing a sync call, other D-Bus messages may have arrived, waiting in the read queue. // In case an event loop is inside a poll in another thread, or an external event loop polls in the // same thread but as an unrelated event source, then we need to wake up the poll explicitly so the // event loop 1. processes all messages in the read queue, 2. updates poll timeout before next poll. - if (arePendingMessagesInReadQueue()) + // 2. Additionally, when sending out messages, these may be too long to be sent out entirely within + // the single sd_bus_send() or sd_bus_call_async() call, in which case they are queued in the write + // queue. We need to wake up the event loop to continue sending the message until it's fully sent. + if (arePendingMessagesInQueues()) notifyEventLoopToWakeUpFromPoll(); } @@ -801,6 +810,8 @@ bool Connection::waitForNextEvent() , {loopExitFd_.fd, POLLIN, 0} }; constexpr auto fdsCount = sizeof(fds)/sizeof(fds[0]); + // Are there pending messages in the inbound queue? Then sd-bus will set timeout to 0, so poll() will wake up right away. + // Are there pending messages in the outbound queue? Then sd-bus will add POLLOUT to events, so poll() will wake up right away. auto timeout = sdbusPollData.getPollTimeout(); auto r = poll(fds, fdsCount, timeout); @@ -814,7 +825,7 @@ bool Connection::waitForNextEvent() { auto cleared = eventFd_.clear(); SDBUS_THROW_ERROR_IF(!cleared, "Failed to read from the event descriptor", -errno); - // Go poll() again, but with up-to-date timeout (which will wake poll() up right away if there are messages to process) + // Go poll() again, but with freshly calculated, up-to-date timeout and with up-to-date events to watch return waitForNextEvent(); } // Loop exit notification @@ -828,14 +839,15 @@ bool Connection::waitForNextEvent() return true; } -bool Connection::arePendingMessagesInReadQueue() const +bool Connection::arePendingMessagesInQueues() const { uint64_t readQueueSize{}; + uint64_t writeQueueSize{}; - auto r = sdbus_->sd_bus_get_n_queued_read(bus_.get(), &readQueueSize); - SDBUS_THROW_ERROR_IF(r < 0, "Failed to get number of pending messages in read queue", -r); + auto r = sdbus_->sd_bus_get_n_queued(bus_.get(), &readQueueSize, &writeQueueSize); + SDBUS_THROW_ERROR_IF(r < 0, "Failed to get number of pending messages in sd-bus queues", -r); - return readQueueSize > 0; + return readQueueSize > 0 || writeQueueSize > 0; } Message Connection::getCurrentlyProcessedMessage() const diff --git a/src/Connection.h b/src/Connection.h index b68986f..ed28bf4 100644 --- a/src/Connection.h +++ b/src/Connection.h @@ -187,7 +187,7 @@ namespace sdbus::internal { void finishHandshake(sd_bus* bus); bool waitForNextEvent(); - [[nodiscard]] bool arePendingMessagesInReadQueue() const; + [[nodiscard]] bool arePendingMessagesInQueues() const; void notifyEventLoopToExit(); void notifyEventLoopToWakeUpFromPoll(); diff --git a/src/ISdBus.h b/src/ISdBus.h index 61d0ed9..c48b0f9 100644 --- a/src/ISdBus.h +++ b/src/ISdBus.h @@ -90,7 +90,7 @@ namespace sdbus::internal { virtual int sd_bus_process(sd_bus *bus, sd_bus_message **r) = 0; virtual sd_bus_message* sd_bus_get_current_message(sd_bus *bus) = 0; virtual int sd_bus_get_poll_data(sd_bus *bus, PollData* data) = 0; - virtual int sd_bus_get_n_queued_read(sd_bus *bus, uint64_t *ret) = 0; + virtual int sd_bus_get_n_queued(sd_bus *bus, uint64_t *read, uint64_t* write) = 0; virtual int sd_bus_flush(sd_bus *bus) = 0; virtual sd_bus *sd_bus_flush_close_unref(sd_bus *bus) = 0; virtual sd_bus *sd_bus_close_unref(sd_bus *bus) = 0; diff --git a/src/SdBus.cpp b/src/SdBus.cpp index f42760b..7774212 100644 --- a/src/SdBus.cpp +++ b/src/SdBus.cpp @@ -27,6 +27,7 @@ #include "SdBus.h" #include +#include namespace sdbus::internal { @@ -52,11 +53,6 @@ int SdBus::sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *cookie) if (r < 0) return r; - // Make sure long messages are not only stored in outgoing queues but also really sent out - // TODO: This is a workaround. We should not block here until everything is physically sent out. - // Refactor: if sd_bus_get_n_queued_write() > 0 then wake up event loop through event fd - ::sd_bus_flush(bus != nullptr ? bus : ::sd_bus_message_get_bus(m)); - return r; } @@ -75,11 +71,6 @@ int SdBus::sd_bus_call_async(sd_bus *bus, sd_bus_slot **slot, sd_bus_message *m, if (r < 0) return r; - // Make sure long messages are not only stored in outgoing queues but also really sent out - // TODO: This is a workaround. We should not block here until everything is physically sent out. - // Refactor: if sd_bus_get_n_queued_write() > 0 then wake up event loop through event fd - ::sd_bus_flush(bus != nullptr ? bus : ::sd_bus_message_get_bus(m)); - return r; } @@ -413,11 +404,14 @@ int SdBus::sd_bus_get_poll_data(sd_bus *bus, PollData* data) return r; } -int SdBus::sd_bus_get_n_queued_read(sd_bus *bus, uint64_t *ret) +int SdBus::sd_bus_get_n_queued(sd_bus *bus, uint64_t *read, uint64_t* write) { std::lock_guard lock(sdbusMutex_); - return ::sd_bus_get_n_queued_read(bus, ret); + auto r1 = ::sd_bus_get_n_queued_read(bus, read); + auto r2 = ::sd_bus_get_n_queued_write(bus, write); + + return std::min(r1, r2); } int SdBus::sd_bus_flush(sd_bus *bus) diff --git a/src/SdBus.h b/src/SdBus.h index 87c459e..7b8127d 100644 --- a/src/SdBus.h +++ b/src/SdBus.h @@ -82,7 +82,7 @@ public: virtual int sd_bus_process(sd_bus *bus, sd_bus_message **r) override; virtual sd_bus_message* sd_bus_get_current_message(sd_bus *bus) override; virtual int sd_bus_get_poll_data(sd_bus *bus, PollData* data) override; - virtual int sd_bus_get_n_queued_read(sd_bus *bus, uint64_t *ret) override; + virtual int sd_bus_get_n_queued(sd_bus *bus, uint64_t *read, uint64_t* write) override; virtual int sd_bus_flush(sd_bus *bus) override; virtual sd_bus *sd_bus_flush_close_unref(sd_bus *bus) override; virtual sd_bus *sd_bus_close_unref(sd_bus *bus) override; diff --git a/tests/integrationtests/DBusAsyncMethodsTests.cpp b/tests/integrationtests/DBusAsyncMethodsTests.cpp index 0012da1..abbc1b5 100644 --- a/tests/integrationtests/DBusAsyncMethodsTests.cpp +++ b/tests/integrationtests/DBusAsyncMethodsTests.cpp @@ -87,7 +87,7 @@ TYPED_TEST(AsyncSdbusTestObject, ThrowsTimeoutErrorWhenClientSideAsyncMethodTime } } -TYPED_TEST(AsyncSdbusTestObject, RunsServerSideAsynchoronousMethodAsynchronously) +TYPED_TEST(AsyncSdbusTestObject, RunsServerSideAsynchronousMethodAsynchronously) { // Yeah, this is kinda timing-dependent test, but times should be safe... std::mutex mtx; @@ -142,6 +142,19 @@ TYPED_TEST(AsyncSdbusTestObject, HandlesCorrectlyABulkOfParallelServerSideAsyncM ASSERT_THAT(resultCount, Eq(1500)); } +TYPED_TEST(AsyncSdbusTestObject, RunsServerSideAsynchronousMethodWithLargeMessage) +{ + std::map largeMap; + for (int32_t i = 0; i < 40'000; ++i) + largeMap.emplace(i, "This is string nr. " + std::to_string(i+1)); + + auto result1 = this->m_proxy->doOperationAsyncWithLargeData(0, largeMap); // Sends large map back in the context of the callback (event loop thread) + auto result2 = this->m_proxy->doOperationAsyncWithLargeData(500, largeMap); // Sends large map back outside the context of the event loop thread + + ASSERT_THAT(result1, Eq(largeMap)); + ASSERT_THAT(result2, Eq(largeMap)); +} + TYPED_TEST(AsyncSdbusTestObject, InvokesMethodAsynchronouslyOnClientSide) { std::promise promise; @@ -177,6 +190,17 @@ TYPED_TEST(AsyncSdbusTestObject, InvokesMethodAsynchronouslyOnClientSideWithFutu ASSERT_THAT(returnValue, Eq(100)); } +TYPED_TEST(AsyncSdbusTestObject, InvokesMethodWithLargeDataAsynchronouslyOnClientSideWithFuture) +{ + std::map largeMap; + for (int32_t i = 0; i < 40'000; ++i) + largeMap.emplace(i, "This is string nr. " + std::to_string(i+1)); + + auto future = this->m_proxy->doOperationWithLargeDataClientSideAsync(largeMap, sdbus::with_future); + + ASSERT_THAT(future.get(), Eq(largeMap)); +} + TYPED_TEST(AsyncSdbusTestObject, AnswersThatAsyncCallIsPendingIfItIsInProgress) { this->m_proxy->installDoOperationClientSideAsyncReplyHandler([&](uint32_t /*res*/, std::optional /*err*/){}); diff --git a/tests/integrationtests/DBusGeneralTests.cpp b/tests/integrationtests/DBusGeneralTests.cpp index 38bccee..52df2ef 100644 --- a/tests/integrationtests/DBusGeneralTests.cpp +++ b/tests/integrationtests/DBusGeneralTests.cpp @@ -52,7 +52,7 @@ using ADirectConnection = TestFixtureWithDirectConnection; /* -- TEST CASES -- */ /*-------------------------------------*/ -TEST(AdaptorAndProxy, CanBeConstructedSuccesfully) +TEST(AdaptorAndProxy, CanBeConstructedSuccessfully) { auto connection = sdbus::createBusConnection(); connection->requestName(SERVICE_NAME); diff --git a/tests/integrationtests/DBusMethodsTests.cpp b/tests/integrationtests/DBusMethodsTests.cpp index 98ea768..f11a313 100644 --- a/tests/integrationtests/DBusMethodsTests.cpp +++ b/tests/integrationtests/DBusMethodsTests.cpp @@ -54,12 +54,12 @@ using namespace sdbus::test; /* -- TEST CASES -- */ /*-------------------------------------*/ -TYPED_TEST(SdbusTestObject, CallsEmptyMethodSuccesfully) +TYPED_TEST(SdbusTestObject, CallsEmptyMethodSuccessfully) { ASSERT_NO_THROW(this->m_proxy->noArgNoReturn()); } -TYPED_TEST(SdbusTestObject, CallsMethodsWithBaseTypesSuccesfully) +TYPED_TEST(SdbusTestObject, CallsMethodsWithBaseTypesSuccessfully) { auto resInt = this->m_proxy->getInt(); ASSERT_THAT(resInt, Eq(INT32_VALUE)); @@ -68,14 +68,14 @@ TYPED_TEST(SdbusTestObject, CallsMethodsWithBaseTypesSuccesfully) ASSERT_THAT(multiplyRes, Eq(INT64_VALUE * DOUBLE_VALUE)); } -TYPED_TEST(SdbusTestObject, CallsMethodsWithTuplesSuccesfully) +TYPED_TEST(SdbusTestObject, CallsMethodsWithTuplesSuccessfully) { auto resTuple = this->m_proxy->getTuple(); ASSERT_THAT(std::get<0>(resTuple), Eq(UINT32_VALUE)); ASSERT_THAT(std::get<1>(resTuple), Eq(STRING_VALUE)); } -TYPED_TEST(SdbusTestObject, CallsMethodsWithStructSuccesfully) +TYPED_TEST(SdbusTestObject, CallsMethodsWithStructSuccessfully) { sdbus::Struct> a{}; auto vectorRes = this->m_proxy->getInts16FromStruct(a); @@ -88,21 +88,21 @@ TYPED_TEST(SdbusTestObject, CallsMethodsWithStructSuccesfully) ASSERT_THAT(vectorRes, Eq(std::vector{INT16_VALUE, INT16_VALUE, -INT16_VALUE})); } -TYPED_TEST(SdbusTestObject, CallsMethodWithVariantSuccesfully) +TYPED_TEST(SdbusTestObject, CallsMethodWithVariantSuccessfully) { sdbus::Variant v{DOUBLE_VALUE}; sdbus::Variant variantRes = this->m_proxy->processVariant(v); ASSERT_THAT(variantRes.get(), Eq(static_cast(DOUBLE_VALUE))); } -TYPED_TEST(SdbusTestObject, CallsMethodWithStdVariantSuccesfully) +TYPED_TEST(SdbusTestObject, CallsMethodWithStdVariantSuccessfully) { std::variant v{DOUBLE_VALUE}; auto variantRes = this->m_proxy->processVariant(v); ASSERT_THAT(std::get(variantRes), Eq(static_cast(DOUBLE_VALUE))); } -TYPED_TEST(SdbusTestObject, CallsMethodWithStructVariantsAndGetMapSuccesfully) +TYPED_TEST(SdbusTestObject, CallsMethodWithStructVariantsAndGetMapSuccessfully) { std::vector x{-2, 0, 2}; sdbus::Struct y{false, true}; @@ -116,44 +116,44 @@ TYPED_TEST(SdbusTestObject, CallsMethodWithStructVariantsAndGetMapSuccesfully) ASSERT_THAT(mapOfVariants[2].get(), Eq(res[2].get())); } -TYPED_TEST(SdbusTestObject, CallsMethodWithStructInStructSuccesfully) +TYPED_TEST(SdbusTestObject, CallsMethodWithStructInStructSuccessfully) { auto val = this->m_proxy->getStructInStruct(); ASSERT_THAT(val.template get<0>(), Eq(STRING_VALUE)); ASSERT_THAT(std::get<0>(std::get<1>(val))[INT32_VALUE], Eq(INT32_VALUE)); } -TYPED_TEST(SdbusTestObject, CallsMethodWithTwoStructsSuccesfully) +TYPED_TEST(SdbusTestObject, CallsMethodWithTwoStructsSuccessfully) { auto val = this->m_proxy->sumStructItems({1, 2}, {3, 4}); ASSERT_THAT(val, Eq(1 + 2 + 3 + 4)); } -TYPED_TEST(SdbusTestObject, CallsMethodWithTwoVectorsSuccesfully) +TYPED_TEST(SdbusTestObject, CallsMethodWithTwoVectorsSuccessfully) { auto val = this->m_proxy->sumArrayItems({1, 7}, {2, 3, 4}); ASSERT_THAT(val, Eq(1 + 7 + 2 + 3 + 4)); } -TYPED_TEST(SdbusTestObject, CallsMethodWithSignatureSuccesfully) +TYPED_TEST(SdbusTestObject, CallsMethodWithSignatureSuccessfully) { auto resSignature = this->m_proxy->getSignature(); ASSERT_THAT(resSignature, Eq(SIGNATURE_VALUE)); } -TYPED_TEST(SdbusTestObject, CallsMethodWithObjectPathSuccesfully) +TYPED_TEST(SdbusTestObject, CallsMethodWithObjectPathSuccessfully) { auto resObjectPath = this->m_proxy->getObjPath(); ASSERT_THAT(resObjectPath, Eq(OBJECT_PATH_VALUE)); } -TYPED_TEST(SdbusTestObject, CallsMethodWithUnixFdSuccesfully) +TYPED_TEST(SdbusTestObject, CallsMethodWithUnixFdSuccessfully) { auto resUnixFd = this->m_proxy->getUnixFd(); ASSERT_THAT(resUnixFd.get(), Gt(UNIX_FD_VALUE)); } -TYPED_TEST(SdbusTestObject, CallsMethodWithComplexTypeSuccesfully) +TYPED_TEST(SdbusTestObject, CallsMethodWithComplexTypeSuccessfully) { auto resComplex = this->m_proxy->getComplex(); ASSERT_THAT(resComplex.count(0), Eq(1)); @@ -265,6 +265,30 @@ TYPED_TEST(SdbusTestObject, CanAccessAssociatedMethodCallMessageInAsyncMethodCal ASSERT_THAT(this->m_adaptor->m_methodName, Eq("doOperationAsync")); } +TYPED_TEST(SdbusTestObject, CallsMethodWithLargeArgument) +{ + std::map collection; + //std::size_t totalSize{}; + for (int i = 0; i < 400'000; i++) + { + collection[i] = ("This is a string of fifty characters. This is a string of fifty " + std::to_string(i)); + //totalSize += sizeof(int) + collection[i].size(); + } + //printf("Sending large message with collection of size %zu bytes\n", totalSize); + this->m_proxy->sendLargeMessage(collection); +} + +TYPED_TEST(SdbusTestObject, CanSendCallsAndReceiveRepliesWithLargeData) +{ + std::map largeMap; + for (int32_t i = 0; i < 40'000; ++i) + largeMap.emplace(i, "This is string nr. " + std::to_string(i+1)); + + auto returnedMap = this->m_proxy->doOperationWithLargeData(largeMap); + + ASSERT_THAT(returnedMap, Eq(largeMap)); +} + #if LIBSYSTEMD_VERSION>=240 TYPED_TEST(SdbusTestObject, CanSetGeneralMethodTimeoutWithLibsystemdVersionGreaterThan239) { diff --git a/tests/integrationtests/DBusPropertiesTests.cpp b/tests/integrationtests/DBusPropertiesTests.cpp index f65d191..78cb37d 100644 --- a/tests/integrationtests/DBusPropertiesTests.cpp +++ b/tests/integrationtests/DBusPropertiesTests.cpp @@ -55,7 +55,7 @@ using namespace sdbus::test; /* -- TEST CASES -- */ /*-------------------------------------*/ -TYPED_TEST(SdbusTestObject, ReadsReadOnlyPropertySuccesfully) +TYPED_TEST(SdbusTestObject, ReadsReadOnlyPropertySuccessfully) { ASSERT_THAT(this->m_proxy->state(), Eq(DEFAULT_STATE_VALUE)); } @@ -65,7 +65,7 @@ TYPED_TEST(SdbusTestObject, FailsWritingToReadOnlyProperty) ASSERT_THROW(this->m_proxy->setStateProperty("new_value"), sdbus::Error); } -TYPED_TEST(SdbusTestObject, WritesAndReadsReadWritePropertySuccesfully) +TYPED_TEST(SdbusTestObject, WritesAndReadsReadWritePropertySuccessfully) { uint32_t newActionValue = 5678; diff --git a/tests/integrationtests/DBusSignalsTests.cpp b/tests/integrationtests/DBusSignalsTests.cpp index 463bdcd..4f8fcd0 100644 --- a/tests/integrationtests/DBusSignalsTests.cpp +++ b/tests/integrationtests/DBusSignalsTests.cpp @@ -48,14 +48,14 @@ using namespace sdbus::test; /* -- TEST CASES -- */ /*-------------------------------------*/ -TYPED_TEST(SdbusTestObject, EmitsSimpleSignalSuccesfully) +TYPED_TEST(SdbusTestObject, EmitsSimpleSignalSuccessfully) { this->m_adaptor->emitSimpleSignal(); ASSERT_TRUE(waitUntil(this->m_proxy->m_gotSimpleSignal)); } -TYPED_TEST(SdbusTestObject, EmitsSimpleSignalToMultipleProxiesSuccesfully) +TYPED_TEST(SdbusTestObject, EmitsSimpleSignalToMultipleProxiesSuccessfully) { auto proxy1 = std::make_unique(*this->s_adaptorConnection, SERVICE_NAME, OBJECT_PATH); auto proxy2 = std::make_unique(*this->s_adaptorConnection, SERVICE_NAME, OBJECT_PATH); @@ -78,7 +78,7 @@ TYPED_TEST(SdbusTestObject, ProxyDoesNotReceiveSignalFromOtherBusName) ASSERT_FALSE(waitUntil(this->m_proxy->m_gotSimpleSignal, 1s)); } -TYPED_TEST(SdbusTestObject, EmitsSignalWithMapSuccesfully) +TYPED_TEST(SdbusTestObject, EmitsSignalWithMapSuccessfully) { this->m_adaptor->emitSignalWithMap({{0, "zero"}, {1, "one"}}); @@ -87,19 +87,19 @@ TYPED_TEST(SdbusTestObject, EmitsSignalWithMapSuccesfully) ASSERT_THAT(this->m_proxy->m_mapFromSignal[1], Eq("one")); } -TYPED_TEST(SdbusTestObject, EmitsSignalWithLargeMapSuccesfully) +TYPED_TEST(SdbusTestObject, EmitsSignalWithLargeMapSuccessfully) { - std::map largeMap; - for (int32_t i = 0; i < 20'000; ++i) - largeMap.emplace(i, "This is string nr. " + std::to_string(i+1)); - this->m_adaptor->emitSignalWithMap(largeMap); + std::map largeMap; + for (int32_t i = 0; i < 20'000; ++i) + largeMap.emplace(i, "This is string nr. " + std::to_string(i+1)); + this->m_adaptor->emitSignalWithMap(largeMap); - ASSERT_TRUE(waitUntil(this->m_proxy->m_gotSignalWithMap)); - ASSERT_THAT(this->m_proxy->m_mapFromSignal[0], Eq("This is string nr. 1")); - ASSERT_THAT(this->m_proxy->m_mapFromSignal[1], Eq("This is string nr. 2")); + ASSERT_TRUE(waitUntil(this->m_proxy->m_gotSignalWithMap)); + ASSERT_THAT(this->m_proxy->m_mapFromSignal[0], Eq("This is string nr. 1")); + ASSERT_THAT(this->m_proxy->m_mapFromSignal[1], Eq("This is string nr. 2")); } -TYPED_TEST(SdbusTestObject, EmitsSignalWithVariantSuccesfully) +TYPED_TEST(SdbusTestObject, EmitsSignalWithVariantSuccessfully) { double d = 3.14; this->m_adaptor->emitSignalWithVariant(sdbus::Variant{d}); @@ -108,7 +108,7 @@ TYPED_TEST(SdbusTestObject, EmitsSignalWithVariantSuccesfully) ASSERT_THAT(this->m_proxy->m_variantFromSignal, DoubleEq(d)); } -TYPED_TEST(SdbusTestObject, EmitsSignalWithoutRegistrationSuccesfully) +TYPED_TEST(SdbusTestObject, EmitsSignalWithoutRegistrationSuccessfully) { this->m_adaptor->emitSignalWithoutRegistration({"platform", sdbus::Signature{"av"}}); diff --git a/tests/integrationtests/TestAdaptor.cpp b/tests/integrationtests/TestAdaptor.cpp index ea66296..dd8acf9 100644 --- a/tests/integrationtests/TestAdaptor.cpp +++ b/tests/integrationtests/TestAdaptor.cpp @@ -128,6 +128,14 @@ uint32_t TestAdaptor::doOperation(const uint32_t& param) return param; } +std::map TestAdaptor::doOperationWithLargeData(const std::map& largeParam) +{ + m_methodCallMsg = std::make_unique(getObject().getCurrentlyProcessedMessage()); + m_methodName = m_methodCallMsg->getMemberName(); + + return largeParam; +} + void TestAdaptor::doOperationAsync(sdbus::Result&& result, uint32_t param) { m_methodCallMsg = std::make_unique(getObject().getCurrentlyProcessedMessage()); @@ -149,6 +157,27 @@ void TestAdaptor::doOperationAsync(sdbus::Result&& result, uint32_t pa } } +void TestAdaptor::doOperationAsyncWithLargeData(sdbus::Result>&& result, uint32_t param, const std::map& largeMap) +{ + m_methodCallMsg = std::make_unique(getObject().getCurrentlyProcessedMessage()); + m_methodName = m_methodCallMsg->getMemberName(); + + if (param == 0) + { + // Don't sleep and return the result from this thread + result.returnResults(largeMap); + } + else + { + // Process asynchronously in another thread and return the result from there + std::thread([param, largeMap, result = std::move(result)]() + { + std::this_thread::sleep_for(std::chrono::milliseconds(param)); + result.returnResults(largeMap); + }).detach(); + } +} + sdbus::Signature TestAdaptor::getSignature() { return SIGNATURE_VALUE; @@ -212,6 +241,11 @@ void TestAdaptor::emitTwoSimpleSignals() emitSignalWithMap({}); } +void TestAdaptor::sendLargeMessage(const std::map& /*collection*/) +{ + //printf("Adaptor: got collection with %zu items", collection.size()); +} + std::string TestAdaptor::state() { return m_state; diff --git a/tests/integrationtests/TestAdaptor.h b/tests/integrationtests/TestAdaptor.h index c47f747..b8b95a5 100644 --- a/tests/integrationtests/TestAdaptor.h +++ b/tests/integrationtests/TestAdaptor.h @@ -73,7 +73,9 @@ protected: int32_t sumStructItems(const sdbus::Struct& arg0, const sdbus::Struct& arg1) override; uint32_t sumArrayItems(const std::vector& arg0, const std::array& arg1) override; uint32_t doOperation(const uint32_t& arg0) override; + std::map doOperationWithLargeData(const std::map& largeParam) override; void doOperationAsync(sdbus::Result&& result, uint32_t arg0) override; + void doOperationAsyncWithLargeData(sdbus::Result>&& result, uint32_t arg0, const std::map& largeParam) override; sdbus::Signature getSignature() override; sdbus::ObjectPath getObjPath() override; sdbus::UnixFd getUnixFd() override; @@ -82,6 +84,7 @@ protected: void throwErrorWithNoReply() override; void doPrivilegedStuff() override; void emitTwoSimpleSignals() override; + void sendLargeMessage(const std::map& collection) override; uint32_t action() override; void action(const uint32_t& value) override; @@ -132,7 +135,9 @@ protected: int32_t sumStructItems(const sdbus::Struct&, const sdbus::Struct&) override { return {}; } uint32_t sumArrayItems(const std::vector&, const std::array&) override { return {}; } uint32_t doOperation(const uint32_t&) override { return {}; } + std::map doOperationWithLargeData(const std::map&) override { return {}; } void doOperationAsync(sdbus::Result&&, uint32_t) override {} + void doOperationAsyncWithLargeData(sdbus::Result>&&, uint32_t, const std::map&) override {} sdbus::Signature getSignature() override { return {}; } sdbus::ObjectPath getObjPath() override { return {}; } sdbus::UnixFd getUnixFd() override { return {}; } @@ -141,6 +146,7 @@ protected: void throwErrorWithNoReply() override {} void doPrivilegedStuff() override {} void emitTwoSimpleSignals() override {} + void sendLargeMessage(const std::map&) override {} uint32_t action() override { return {}; } void action(const uint32_t&) override {} diff --git a/tests/integrationtests/TestProxy.cpp b/tests/integrationtests/TestProxy.cpp index 9248c79..3592f01 100644 --- a/tests/integrationtests/TestProxy.cpp +++ b/tests/integrationtests/TestProxy.cpp @@ -151,6 +151,14 @@ std::future TestProxy::doOperationClientSideAsyncOnBasicAPILevel(ui return getProxy().callMethodAsync(methodCall, sdbus::with_future); } +std::future> TestProxy::doOperationWithLargeDataClientSideAsync(const std::map& largeParam, with_future_t) +{ + return getProxy().callMethodAsync("doOperationWithLargeData") + .onInterface(sdbus::test::INTERFACE_NAME) + .withArguments(largeParam) + .getResultAsFuture>(); +} + void TestProxy::doErroneousOperationClientSideAsync() { getProxy().callMethodAsync("throwError") diff --git a/tests/integrationtests/TestProxy.h b/tests/integrationtests/TestProxy.h index cd1cfca..0ca9f61 100644 --- a/tests/integrationtests/TestProxy.h +++ b/tests/integrationtests/TestProxy.h @@ -98,6 +98,7 @@ public: sdbus::PendingAsyncCall doOperationClientSideAsync(uint32_t param); [[nodiscard]] sdbus::Slot doOperationClientSideAsync(uint32_t param, sdbus::return_slot_t); std::future doOperationClientSideAsync(uint32_t param, with_future_t); + std::future> doOperationWithLargeDataClientSideAsync(const std::map& largeParam, with_future_t); std::future doOperationClientSideAsyncOnBasicAPILevel(uint32_t param); std::future doErroneousOperationClientSideAsync(with_future_t); void doErroneousOperationClientSideAsync(); diff --git a/tests/integrationtests/integrationtests-adaptor.h b/tests/integrationtests/integrationtests-adaptor.h index fbe25e5..d2ec92b 100644 --- a/tests/integrationtests/integrationtests-adaptor.h +++ b/tests/integrationtests/integrationtests-adaptor.h @@ -46,7 +46,9 @@ protected: , sdbus::registerMethod("sumStructItems").withInputParamNames("arg0", "arg1").withOutputParamNames("arg0").implementedAs([this](const sdbus::Struct& arg0, const sdbus::Struct& arg1){ return this->sumStructItems(arg0, arg1); }) , sdbus::registerMethod("sumArrayItems").withInputParamNames("arg0", "arg1").withOutputParamNames("arg0").implementedAs([this](const std::vector& arg0, const std::array& arg1){ return this->sumArrayItems(arg0, arg1); }) , sdbus::registerMethod("doOperation").withInputParamNames("arg0").withOutputParamNames("arg0").implementedAs([this](const uint32_t& arg0){ return this->doOperation(arg0); }) + , sdbus::registerMethod("doOperationWithLargeData").withInputParamNames("largeMap").withOutputParamNames("largeMap").implementedAs([this](const std::map& largeMap){ return this->doOperationWithLargeData(largeMap); }) , sdbus::registerMethod("doOperationAsync").withInputParamNames("arg0").withOutputParamNames("arg0").implementedAs([this](sdbus::Result&& result, uint32_t arg0){ this->doOperationAsync(std::move(result), std::move(arg0)); }) + , sdbus::registerMethod("doOperationAsyncWithLargeData").withInputParamNames("arg0", "largeMap").withOutputParamNames("largeMap").implementedAs([this](sdbus::Result>&& result, uint32_t arg0, const std::map& largeMap){ this->doOperationAsyncWithLargeData(std::move(result), std::move(arg0), largeMap); }) , sdbus::registerMethod("getSignature").withOutputParamNames("arg0").implementedAs([this](){ return this->getSignature(); }) , sdbus::registerMethod("getObjPath").withOutputParamNames("arg0").implementedAs([this](){ return this->getObjPath(); }) , sdbus::registerMethod("getUnixFd").withOutputParamNames("arg0").implementedAs([this](){ return this->getUnixFd(); }) @@ -55,6 +57,7 @@ protected: , sdbus::registerMethod("throwErrorWithNoReply").implementedAs([this](){ return this->throwErrorWithNoReply(); }).withNoReply() , sdbus::registerMethod("doPrivilegedStuff").implementedAs([this](){ return this->doPrivilegedStuff(); }).markAsPrivileged() , sdbus::registerMethod("emitTwoSimpleSignals").implementedAs([this](){ return this->emitTwoSimpleSignals(); }) + , sdbus::registerMethod("sendLargeMessage").implementedAs([this](const std::map& collection){ this->sendLargeMessage(collection); }) , sdbus::registerSignal("simpleSignal").markAsDeprecated() , sdbus::registerSignal("signalWithMap").withParameters>("aMap") , sdbus::registerSignal("signalWithVariant").withParameters("aVariant") @@ -93,7 +96,9 @@ private: virtual int32_t sumStructItems(const sdbus::Struct& arg0, const sdbus::Struct& arg1) = 0; virtual uint32_t sumArrayItems(const std::vector& arg0, const std::array& arg1) = 0; virtual uint32_t doOperation(const uint32_t& arg0) = 0; + virtual std::map doOperationWithLargeData(const std::map& largeParam) = 0; virtual void doOperationAsync(sdbus::Result&& result, uint32_t arg0) = 0; + virtual void doOperationAsyncWithLargeData(sdbus::Result>&& result, uint32_t arg0, const std::map& largeParam) = 0; virtual sdbus::Signature getSignature() = 0; virtual sdbus::ObjectPath getObjPath() = 0; virtual sdbus::UnixFd getUnixFd() = 0; @@ -102,6 +107,7 @@ private: virtual void throwErrorWithNoReply() = 0; virtual void doPrivilegedStuff() = 0; virtual void emitTwoSimpleSignals() = 0; + virtual void sendLargeMessage(const std::map& collection) = 0; private: virtual uint32_t action() = 0; diff --git a/tests/integrationtests/integrationtests-proxy.h b/tests/integrationtests/integrationtests-proxy.h index 4e8ed86..94be845 100644 --- a/tests/integrationtests/integrationtests-proxy.h +++ b/tests/integrationtests/integrationtests-proxy.h @@ -130,6 +130,13 @@ public: return result; } + std::map doOperationWithLargeData(const std::map& largeParam) + { + std::map result; + m_proxy.callMethod("doOperationWithLargeData").onInterface(INTERFACE_NAME).withArguments(largeParam).storeResultsTo(result); + return result; + } + uint32_t doOperationAsync(const uint32_t& arg0) { uint32_t result; @@ -137,6 +144,13 @@ public: return result; } + std::map doOperationAsyncWithLargeData(const uint32_t& arg0, const std::map& largeParam) + { + std::map result; + m_proxy.callMethod("doOperationAsyncWithLargeData").onInterface(INTERFACE_NAME).withArguments(arg0, largeParam).storeResultsTo(result); + return result; + } + sdbus::Signature getSignature() { sdbus::Signature result; @@ -185,6 +199,11 @@ public: m_proxy.callMethod("emitTwoSimpleSignals").onInterface(INTERFACE_NAME); } + void sendLargeMessage(const std::map& collection) + { + m_proxy.callMethod("sendLargeMessage").onInterface(INTERFACE_NAME).withArguments(collection); + } + void unregisterSimpleSignalHandler() { simpleSignalSlot_.reset(); diff --git a/tests/unittests/mocks/SdBusMock.h b/tests/unittests/mocks/SdBusMock.h index f20f4df..9d0fd2b 100644 --- a/tests/unittests/mocks/SdBusMock.h +++ b/tests/unittests/mocks/SdBusMock.h @@ -81,7 +81,7 @@ public: MOCK_METHOD2(sd_bus_process, int(sd_bus *bus, sd_bus_message **r)); MOCK_METHOD1(sd_bus_get_current_message, sd_bus_message*(sd_bus *bus)); MOCK_METHOD2(sd_bus_get_poll_data, int(sd_bus *bus, PollData* data)); - MOCK_METHOD2(sd_bus_get_n_queued_read, int(sd_bus *bus, uint64_t *ret)); + MOCK_METHOD3(sd_bus_get_n_queued, int(sd_bus *bus, uint64_t *read, uint64_t* write)); MOCK_METHOD1(sd_bus_flush, int(sd_bus *bus)); MOCK_METHOD1(sd_bus_flush_close_unref, sd_bus *(sd_bus *bus)); MOCK_METHOD1(sd_bus_close_unref, sd_bus *(sd_bus *bus));