From 092fe77f149815d3dc18e214b288b7185e999ebd Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Thu, 8 Jan 2026 11:08:17 +0100 Subject: [PATCH 1/2] DPL: fix warnings --- Framework/Core/include/Framework/ServiceSpec.h | 2 +- Framework/Core/include/Framework/StringHelpers.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Framework/Core/include/Framework/ServiceSpec.h b/Framework/Core/include/Framework/ServiceSpec.h index 5684889e85376..8ac0872edd1bf 100644 --- a/Framework/Core/include/Framework/ServiceSpec.h +++ b/Framework/Core/include/Framework/ServiceSpec.h @@ -31,7 +31,7 @@ struct DeviceSpec; struct ServiceRegistry; struct ServiceRegistryRef; struct DeviceState; -struct ProcessingContext; +class ProcessingContext; class EndOfStreamContext; struct ConfigContext; struct WorkflowSpecNode; diff --git a/Framework/Core/include/Framework/StringHelpers.h b/Framework/Core/include/Framework/StringHelpers.h index 8a2d892062f70..a2ee758435efc 100644 --- a/Framework/Core/include/Framework/StringHelpers.h +++ b/Framework/Core/include/Framework/StringHelpers.h @@ -171,7 +171,7 @@ constexpr auto get_str(const char (&str)[N]) } template -constexpr auto get_size(const char (&str)[N]) +constexpr auto get_size(const char (&)[N]) { return N; } From 1cbc9692adf2581aed8a06abdc51e03ee62a969c Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Thu, 8 Jan 2026 11:08:17 +0100 Subject: [PATCH 2/2] DPL: avoid MessageSet abstractions when forwarding This is most likely faster, and it will allow us to move the early forwarding at an earlier stage where the data is not yet in a MessageSet. --- .../include/Framework/DataProcessingHelpers.h | 9 +- Framework/Core/src/DataProcessingDevice.cxx | 2 +- Framework/Core/src/DataProcessingHelpers.cxx | 188 ++++++++++-------- Framework/Core/test/test_ForwardInputs.cxx | 27 ++- 4 files changed, 127 insertions(+), 99 deletions(-) diff --git a/Framework/Core/include/Framework/DataProcessingHelpers.h b/Framework/Core/include/Framework/DataProcessingHelpers.h index 34bb87613d920..a9bd95b69f4c7 100644 --- a/Framework/Core/include/Framework/DataProcessingHelpers.h +++ b/Framework/Core/include/Framework/DataProcessingHelpers.h @@ -16,6 +16,7 @@ #include "Framework/TimesliceIndex.h" #include #include +#include namespace o2::framework { @@ -53,9 +54,11 @@ struct DataProcessingHelpers { /// starts the EoS timers and returns the new TransitionHandlingState in case as new state is requested static TransitionHandlingState updateStateTransition(ServiceRegistryRef const& ref, ProcessingPolicies const& policies); /// Helper to route messages for forwarding - static std::vector routeForwardedMessages(FairMQDeviceProxy& proxy, - std::vector& currentSetOfInputs, - const bool copyByDefault, bool consume); + static std::vector routeForwardedMessageSet(FairMQDeviceProxy& proxy, std::vector& currentSetOfInputs, + bool copy, bool consume); + /// Helper to route messages for forwarding + static void routeForwardedMessages(FairMQDeviceProxy& proxy, std::span& currentSetOfInputs, std::vector& forwardedParts, + bool copy, bool consume); }; } // namespace o2::framework #endif // O2_FRAMEWORK_DATAPROCESSINGHELPERS_H_ diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index 63c333561f24e..3925359b056b2 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -592,7 +592,7 @@ static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot, O2_SIGNPOST_ID_GENERATE(sid, forwarding); O2_SIGNPOST_START(forwarding, sid, "forwardInputs", "Starting forwarding for slot %zu with oldestTimeslice %zu %{public}s%{public}s%{public}s", slot.index, oldestTimeslice.timeslice.value, copy ? "with copy" : "", copy && consume ? " and " : "", consume ? "with consume" : ""); - auto forwardedParts = DataProcessingHelpers::routeForwardedMessages(proxy, currentSetOfInputs, copy, consume); + auto forwardedParts = DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copy, consume); for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) { if (forwardedParts[fi].Size() == 0) { diff --git a/Framework/Core/src/DataProcessingHelpers.cxx b/Framework/Core/src/DataProcessingHelpers.cxx index 90dcee52d73da..2f7a1f65f3bd3 100644 --- a/Framework/Core/src/DataProcessingHelpers.cxx +++ b/Framework/Core/src/DataProcessingHelpers.cxx @@ -228,102 +228,128 @@ TransitionHandlingState DataProcessingHelpers::updateStateTransition(ServiceRegi } } -auto DataProcessingHelpers::routeForwardedMessages(FairMQDeviceProxy& proxy, - std::vector& currentSetOfInputs, - const bool copyByDefault, bool consume) -> std::vector +void DataProcessingHelpers::routeForwardedMessages(FairMQDeviceProxy& proxy, std::span& messages, std::vector& forwardedParts, + const bool copyByDefault, bool consume) { - // we collect all messages per forward in a map and send them together - std::vector forwardedParts; - forwardedParts.resize(proxy.getNumForwards()); - std::vector forwardingChoices{}; O2_SIGNPOST_ID_GENERATE(sid, forwarding); + std::vector forwardingChoices{}; + size_t pi = 0; + while (pi < messages.size()) { + auto& header = messages[pi]; - for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) { - auto& messageSet = currentSetOfInputs[ii]; + // If is now possible that the record is not complete when + // we forward it, because of a custom completion policy. + // this means that we need to skip the empty entries in the + // record for being forwarded. + if (header->GetData() == nullptr) { + pi += 2; + continue; + } + auto dih = o2::header::get(header->GetData()); + if (dih) { + pi += 2; + continue; + } + auto sih = o2::header::get(header->GetData()); + if (sih) { + pi += 2; + continue; + } - for (size_t pi = 0; pi < messageSet.size(); ++pi) { - auto& header = messageSet.header(pi); + auto dph = o2::header::get(header->GetData()); + auto dh = o2::header::get(header->GetData()); - // If is now possible that the record is not complete when - // we forward it, because of a custom completion policy. - // this means that we need to skip the empty entries in the - // record for being forwarded. - if (header->GetData() == nullptr) { - continue; - } - auto dih = o2::header::get(header->GetData()); - if (dih) { - continue; - } - auto sih = o2::header::get(header->GetData()); - if (sih) { - continue; - } + if (dph == nullptr || dh == nullptr) { + // Complain only if this is not an out-of-band message + LOGP(error, "Data is missing {}{}{}", + dph ? "DataProcessingHeader" : "", dph || dh ? "and" : "", dh ? "DataHeader" : ""); + pi += 2; + continue; + } - auto dph = o2::header::get(header->GetData()); - auto dh = o2::header::get(header->GetData()); + // At least one payload. + auto& payload = messages[pi + 1]; + // Calculate the number of messages which should be handled together + // all in one go. + size_t numberOfMessages = 0; + if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) { + // Sequence of (header, payload[0], ... , payload[splitPayloadParts - 1]) pairs belonging together. + numberOfMessages = dh->splitPayloadParts + 1; // one is for the header + } else { + // Sequence of splitPayloadParts (header, payload) pairs belonging together. + // In case splitPayloadParts = 0, we consider this as a single message pair + numberOfMessages = (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1) * 2; + } - if (dph == nullptr || dh == nullptr) { - // Complain only if this is not an out-of-band message - LOGP(error, "Data is missing {}{}{}", - dph ? "DataProcessingHeader" : "", dph || dh ? "and" : "", dh ? "DataHeader" : ""); - continue; - } + if (payload.get() == nullptr && consume == true) { + // If the payload is not there, it means we already + // processed it with ConsumeExisiting. Therefore we + // need to do something only if this is the last consume. + header.reset(nullptr); + pi += numberOfMessages; + continue; + } - auto& payload = messageSet.payload(pi); + // We need to find the forward route only for the first + // part of a split payload. All the others will use the same. + // Therefore, we reset and recompute the forwarding choice: + // + // - If this is the first payload of a [header0][payload0][header0][payload1]... sequence, + // which is actually always created and handled together. Notice that in this + // case we have splitPayloadParts == splitPayloadIndex + // - If this is the first payload of a [header0][payload0][header1][payload1]... sequence + // belonging to the same multipart message (and therefore we are guaranteed that they + // need to be routed together). + // - If the message is not a multipart (splitPayloadParts 0) or has only one part + // - If it's a message of the kind [header0][payload1][payload2][payload3]... and therefore + // we will already use the same choice in the for loop below. + // - if (payload.get() == nullptr && consume == true) { - // If the payload is not there, it means we already - // processed it with ConsumeExisiting. Therefore we - // need to do something only if this is the last consume. - header.reset(nullptr); - continue; - } + forwardingChoices.clear(); + proxy.getMatchingForwardChannelIndexes(forwardingChoices, *dh, dph->startTime); - // We need to find the forward route only for the first - // part of a split payload. All the others will use the same. - // Therefore, we reset and recompute the forwarding choice: - // - // - If this is the first payload of a [header0][payload0][header0][payload1] sequence, - // which is actually always created and handled together - // - If the message is not a multipart (splitPayloadParts 0) or has only one part - // - If it's a message of the kind [header0][payload1][payload2][payload3]... and therefore - // we will already use the same choice in the for loop below. - if (dh->splitPayloadIndex == 0 || dh->splitPayloadParts <= 1 || messageSet.getNumberOfPayloads(pi) > 0) { - forwardingChoices.clear(); - proxy.getMatchingForwardChannelIndexes(forwardingChoices, *dh, dph->startTime); - } + if (forwardingChoices.empty()) { + // Nothing to forward go to the next messageset + pi += numberOfMessages; + continue; + } - if (forwardingChoices.empty()) { - // Nothing to forward go to the next messageset - continue; - } + // In case of more than one forward route, we need to copy the message. + // This will eventually use the same memory if running with the same backend. + if (copyByDefault || forwardingChoices.size() > 1) { + for (auto& choice : forwardingChoices) { + O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding a copy of %{public}s to route %d.", + fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), choice.value); - // In case of more than one forward route, we need to copy the message. - // This will eventually use the same memory if running with the same backend. - if (copyByDefault || forwardingChoices.size() > 1) { - for (auto& choice : forwardingChoices) { - auto&& newHeader = header->GetTransport()->CreateMessage(); - O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding a copy of %{public}s to route %d.", - fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), choice.value); - newHeader->Copy(*header); - forwardedParts[choice.value].AddPart(std::move(newHeader)); - - for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) { - auto&& newPayload = header->GetTransport()->CreateMessage(); - newPayload->Copy(*messageSet.payload(pi, payloadIndex)); - forwardedParts[choice.value].AddPart(std::move(newPayload)); - } - } - } else { - O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding %{public}s to route %d.", - fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), forwardingChoices.back().value); - forwardedParts[forwardingChoices.back().value].AddPart(std::move(messageSet.header(pi))); - for (size_t payloadIndex = 0; payloadIndex < messageSet.getNumberOfPayloads(pi); ++payloadIndex) { - forwardedParts[forwardingChoices.back().value].AddPart(std::move(messageSet.payload(pi, payloadIndex))); + for (size_t ppi = pi; ppi < pi + numberOfMessages; ++ppi) { + auto&& newMsg = header->GetTransport()->CreateMessage(); + newMsg->Copy(*messages[ppi]); + forwardedParts[choice.value].AddPart(std::move(newMsg)); } } + } else { + O2_SIGNPOST_EVENT_EMIT(forwarding, sid, "forwardInputs", "Forwarding %{public}s to route %d.", + fmt::format("{}/{}/{}@timeslice:{} tfCounter:{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification, dph->startTime, dh->tfCounter).c_str(), forwardingChoices.back().value); + for (size_t ppi = pi; ppi < pi + numberOfMessages; ++ppi) { + forwardedParts[forwardingChoices.back().value].AddPart(std::move(messages[ppi])); + } } + pi += numberOfMessages; + } +} + +auto DataProcessingHelpers::routeForwardedMessageSet(FairMQDeviceProxy& proxy, + std::vector& currentSetOfInputs, + const bool copyByDefault, bool consume) -> std::vector +{ + // we collect all messages per forward in a map and send them together + std::vector forwardedParts; + forwardedParts.resize(proxy.getNumForwards()); + std::vector forwardingChoices{}; + + for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) { + auto span = std::span(currentSetOfInputs[ii].messages); + routeForwardedMessages(proxy, span, forwardedParts, copyByDefault, consume); } return forwardedParts; }; diff --git a/Framework/Core/test/test_ForwardInputs.cxx b/Framework/Core/test/test_ForwardInputs.cxx index 7ddbc831edad2..fe9f70d1daadb 100644 --- a/Framework/Core/test/test_ForwardInputs.cxx +++ b/Framework/Core/test/test_ForwardInputs.cxx @@ -45,7 +45,7 @@ TEST_CASE("ForwardInputsEmpty") std::vector currentSetOfInputs; - auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, currentSetOfInputs, copyByDefault, consume); + auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume); REQUIRE(result.empty()); } @@ -95,7 +95,7 @@ TEST_CASE("ForwardInputsSingleMessageSingleRoute") REQUIRE(messageSet.size() == 1); currentSetOfInputs.emplace_back(std::move(messageSet)); - auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, currentSetOfInputs, copyByDefault, consume); + auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume); REQUIRE(result.size() == 1); // One route REQUIRE(result[0].Size() == 2); // Two messages for that route } @@ -146,7 +146,7 @@ TEST_CASE("ForwardInputsSingleMessageSingleRouteNoConsume") REQUIRE(messageSet.size() == 1); currentSetOfInputs.emplace_back(std::move(messageSet)); - auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, currentSetOfInputs, copyByDefault, true); + auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, true); REQUIRE(result.size() == 1); REQUIRE(result[0].Size() == 0); // Because there is a nullptr, we do not forward this as it was already consumed. } @@ -201,8 +201,7 @@ TEST_CASE("ForwardInputsSingleMessageSingleRouteAtEOS") REQUIRE(messageSet.size() == 1); currentSetOfInputs.emplace_back(std::move(messageSet)); - - auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, currentSetOfInputs, copyByDefault, consume); + auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume); REQUIRE(result.size() == 1); // One route REQUIRE(result[0].Size() == 0); // FIXME: this is an actual error. It should be 2. However it cannot really happen. // Correct behavior below: @@ -260,7 +259,7 @@ TEST_CASE("ForwardInputsSingleMessageSingleRouteWithOldestPossible") REQUIRE(messageSet.size() == 1); currentSetOfInputs.emplace_back(std::move(messageSet)); - auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, currentSetOfInputs, copyByDefault, consume); + auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume); REQUIRE(result.size() == 1); // One route REQUIRE(result[0].Size() == 0); // FIXME: this is actually wrong // FIXME: actually correct behavior below @@ -325,7 +324,7 @@ TEST_CASE("ForwardInputsSingleMessageMultipleRoutes") REQUIRE(messageSet.size() == 1); currentSetOfInputs.emplace_back(std::move(messageSet)); - auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, currentSetOfInputs, copyByDefault, consume); + auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume); REQUIRE(result.size() == 2); // Two routes REQUIRE(result[0].Size() == 2); // Two messages per route REQUIRE(result[1].Size() == 0); // Only the first DPL matched channel matters @@ -388,7 +387,7 @@ TEST_CASE("ForwardInputsSingleMessageMultipleRoutesExternals") REQUIRE(messageSet.size() == 1); currentSetOfInputs.emplace_back(std::move(messageSet)); - auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, currentSetOfInputs, copyByDefault, consume); + auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume); REQUIRE(result.size() == 2); // Two routes REQUIRE(result[0].Size() == 2); // With external matching channels, we need to copy and then forward REQUIRE(result[1].Size() == 2); // @@ -466,7 +465,7 @@ TEST_CASE("ForwardInputsMultiMessageMultipleRoutes") currentSetOfInputs.emplace_back(std::move(messageSet2)); REQUIRE(currentSetOfInputs.size() == 2); - auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, currentSetOfInputs, copyByDefault, consume); + auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume); REQUIRE(result.size() == 2); // Two routes REQUIRE(result[0].Size() == 2); // REQUIRE(result[1].Size() == 2); // @@ -529,7 +528,7 @@ TEST_CASE("ForwardInputsSingleMessageMultipleRoutesOnlyOneMatches") REQUIRE(messageSet.size() == 1); currentSetOfInputs.emplace_back(std::move(messageSet)); - auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, currentSetOfInputs, copyByDefault, consume); + auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume); REQUIRE(result.size() == 2); // Two routes REQUIRE(result[0].Size() == 0); // Two messages per route REQUIRE(result[1].Size() == 2); // Two messages per route @@ -541,7 +540,7 @@ TEST_CASE("ForwardInputsSplitPayload") dh.dataOrigin = "TST"; dh.dataDescription = "A"; dh.subSpecification = 0; - dh.splitPayloadIndex = 0; + dh.splitPayloadIndex = 2; dh.splitPayloadParts = 2; o2::header::DataHeader dh2; @@ -611,7 +610,7 @@ TEST_CASE("ForwardInputsSplitPayload") REQUIRE(messageSet.size() == 2); currentSetOfInputs.emplace_back(std::move(messageSet)); - auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, currentSetOfInputs, copyByDefault, consume); + auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume); REQUIRE(result.size() == 2); // Two routes CHECK(result[0].Size() == 2); // No messages on this route CHECK(result[1].Size() == 3); @@ -657,7 +656,7 @@ TEST_CASE("ForwardInputEOSSingleRoute") REQUIRE(messageSet.size() == 1); currentSetOfInputs.emplace_back(std::move(messageSet)); - auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, currentSetOfInputs, copyByDefault, consume); + auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume); REQUIRE(result.size() == 1); // One route REQUIRE(result[0].Size() == 0); // Oldest possible timeframe should not be forwarded } @@ -702,7 +701,7 @@ TEST_CASE("ForwardInputOldestPossibleSingleRoute") REQUIRE(messageSet.size() == 1); currentSetOfInputs.emplace_back(std::move(messageSet)); - auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, currentSetOfInputs, copyByDefault, consume); + auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume); REQUIRE(result.size() == 1); // One route REQUIRE(result[0].Size() == 0); // Oldest possible timeframe should not be forwarded }