Skip to content

Commit 7da2227

Browse files
committed
Change ownership model: a vector of spans is now generally returned in place of a vector of vectors.
1 parent ee0b597 commit 7da2227

7 files changed

Lines changed: 145 additions & 96 deletions

File tree

Framework/Core/include/Framework/DataProcessingHelpers.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ struct DataProcessingHelpers {
5454
/// starts the EoS timers and returns the new TransitionHandlingState in case a new state is requested
5555
static TransitionHandlingState updateStateTransition(ServiceRegistryRef const& ref, ProcessingPolicies const& policies);
5656
/// Helper to route messages for forwarding
57-
static std::vector<fair::mq::Parts> routeForwardedMessageSet(FairMQDeviceProxy& proxy, std::vector<std::vector<fair::mq::MessagePtr>>& currentSetOfInputs,
57+
static std::vector<fair::mq::Parts> routeForwardedMessageSet(FairMQDeviceProxy& proxy, std::vector<std::span<fair::mq::MessagePtr>>& currentSetOfInputs,
5858
bool copy, bool consume);
5959
/// Helper to route messages for forwarding
6060
static void routeForwardedMessages(FairMQDeviceProxy& proxy, std::span<fair::mq::MessagePtr>& currentSetOfInputs, std::vector<fair::mq::Parts>& forwardedParts,

Framework/Core/include/Framework/DataRelayer.h

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
#include <cstddef>
2626
#include <mutex>
27+
#include <span>
2728
#include <vector>
2829
#include <functional>
2930

@@ -113,7 +114,7 @@ class DataRelayer
113114
ActivityStats processDanglingInputs(std::vector<ExpirationHandler> const&,
114115
ServiceRegistryRef context, bool createNew);
115116

116-
using OnDropCallback = std::function<void(TimesliceSlot, std::vector<std::vector<fair::mq::MessagePtr>>&, TimesliceIndex::OldestOutputInfo info)>;
117+
using OnDropCallback = std::function<void(TimesliceSlot, std::vector<std::span<fair::mq::MessagePtr>>&, TimesliceIndex::OldestOutputInfo info)>;
117118

118119
// Callback for when some messages are about to be owned by the DataRelayer
119120
using OnInsertionCallback = std::function<void(ServiceRegistryRef&, std::span<fair::mq::MessagePtr>&)>;
@@ -153,11 +154,14 @@ class DataRelayer
153154
/// @returns the actions ready to be performed.
154155
void getReadyToProcess(std::vector<RecordAction>& completed);
155156

156-
/// Returns an input registry associated to the given timeslice and gives
157-
/// ownership to the caller. This is because once the inputs are out of the
158-
/// DataRelayer they need to be deleted once the processing is concluded.
159-
std::vector<std::vector<fair::mq::MessagePtr>> consumeAllInputsForTimeslice(TimesliceSlot id);
157+
/// Returns spans into the relayer's internal storage for the given timeslice.
158+
/// The slot is marked invalid so new data can be relayed to it immediately,
159+
/// but the underlying message vectors are NOT freed until releaseSlot() is
160+
/// called. The caller must call releaseSlot() once it is done with the spans.
161+
std::vector<std::span<fair::mq::MessagePtr>> consumeAllInputsForTimeslice(TimesliceSlot id);
160162
std::vector<std::vector<fair::mq::MessagePtr>> consumeExistingInputsForTimeslice(TimesliceSlot id);
163+
/// Free the storage for a slot previously handed out by consumeAllInputsForTimeslice().
164+
void releaseSlot(TimesliceSlot slot);
161165

162166
/// Returns how many timeslices we can handle in parallel
163167
[[nodiscard]] size_t getParallelTimeslices() const;
@@ -204,6 +208,11 @@ class DataRelayer
204208
/// N is the maximum number of inflight timeslices, while
205209
/// M is the number of inputs which are requested.
206210
std::vector<std::vector<fair::mq::MessagePtr>> mCache;
211+
/// Holding area for message vectors moved out of mCache by
212+
/// consumeAllInputsForTimeslice(). Same NxM layout as mCache.
213+
/// Spans returned to callers point here and remain valid until releaseSlot().
214+
/// relay() never touches mConsumedCache, so no locking is needed in releaseSlot().
215+
std::vector<std::vector<fair::mq::MessagePtr>> mConsumedCache;
207216

208217
/// This is the index which maps a given timestamp to the associated
209218
/// cacheline.

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -587,7 +587,7 @@ auto decongestionCallbackLate = [](AsyncTask& task, size_t aid) -> void {
587587
// the inputs which are shared between this device and others
588588
// to the next one in the daisy chain.
589589
// FIXME: do it in a smarter way than O(N^2)
590-
static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector<std::vector<fair::mq::MessagePtr>>& currentSetOfInputs,
590+
static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector<std::span<fair::mq::MessagePtr>>& currentSetOfInputs,
591591
TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume = true) {
592592
auto& proxy = registry.get<FairMQDeviceProxy>();
593593

@@ -619,7 +619,7 @@ static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot,
619619
O2_SIGNPOST_END(forwarding, sid, "forwardInputs", "Forwarding done");
620620
};
621621

622-
static auto cleanEarlyForward = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector<std::vector<fair::mq::MessagePtr>>& currentSetOfInputs,
622+
static auto cleanEarlyForward = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector<std::span<fair::mq::MessagePtr>>& currentSetOfInputs,
623623
TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume = true) {
624624
auto& proxy = registry.get<FairMQDeviceProxy>();
625625

@@ -629,8 +629,7 @@ static auto cleanEarlyForward = [](ServiceRegistryRef registry, TimesliceSlot sl
629629
// Always copy them, because we do not want to actually send them.
630630
// We merely need the side effect of the consume, if applicable.
631631
for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) {
632-
auto span = std::span<fair::mq::MessagePtr>(currentSetOfInputs[ii]);
633-
DataProcessingHelpers::cleanForwardedMessages(span, consume);
632+
DataProcessingHelpers::cleanForwardedMessages(currentSetOfInputs[ii], consume);
634633
}
635634

636635
O2_SIGNPOST_END(forwarding, sid, "forwardInputs", "Cleaning done");
@@ -1278,7 +1277,7 @@ void DataProcessingDevice::Run()
12781277
// - we can trigger further events from the queue
12791278
// - we can guarantee this is the last thing we do in the loop (
12801279
// assuming no one else is adding to the queue before this point).
1281-
auto onDrop = [&registry = mServiceRegistry, lid](TimesliceSlot slot, std::vector<std::vector<fair::mq::MessagePtr>>& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) {
1280+
auto onDrop = [&registry = mServiceRegistry, lid](TimesliceSlot slot, std::vector<std::span<fair::mq::MessagePtr>>& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) {
12821281
O2_SIGNPOST_START(device, lid, "run_loop", "Dropping message from slot %" PRIu64 ". Forwarding as needed.", (uint64_t)slot.index);
12831282
ServiceRegistryRef ref{registry};
12841283
ref.get<AsyncQueue>();
@@ -1985,7 +1984,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo&
19851984
nPayloadsPerHeader = 1;
19861985
ii += (nMessages / 2) - 1;
19871986
}
1988-
auto onDrop = [ref](TimesliceSlot slot, std::vector<std::vector<fair::mq::MessagePtr>>& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) {
1987+
auto onDrop = [ref](TimesliceSlot slot, std::vector<std::span<fair::mq::MessagePtr>>& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) {
19891988
O2_SIGNPOST_ID_GENERATE(cid, async_queue);
19901989
O2_SIGNPOST_EVENT_EMIT(async_queue, cid, "onDrop", "Dropping message from slot %zu. Forwarding as needed. Timeslice %zu",
19911990
slot.index, oldestOutputInfo.timeslice.value);
@@ -2163,15 +2162,20 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
21632162
// want to support multithreaded dispatching of operations, I can simply
21642163
// move these to some thread local store and the rest of the lambdas
21652164
// should work just fine.
2166-
std::vector<std::vector<fair::mq::MessagePtr>> currentSetOfInputs;
2165+
std::vector<std::span<fair::mq::MessagePtr>> currentSetOfInputs;
2166+
std::vector<std::vector<fair::mq::MessagePtr>> ownedInputs;
21672167

21682168
//
2169-
auto getInputSpan = [ref, &currentSetOfInputs](TimesliceSlot slot, bool consume = true) {
2169+
auto getInputSpan = [ref, &currentSetOfInputs, &ownedInputs](TimesliceSlot slot, bool consume = true) {
21702170
auto& relayer = ref.get<DataRelayer>();
21712171
if (consume) {
21722172
currentSetOfInputs = relayer.consumeAllInputsForTimeslice(slot);
21732173
} else {
2174-
currentSetOfInputs = relayer.consumeExistingInputsForTimeslice(slot);
2174+
ownedInputs = relayer.consumeExistingInputsForTimeslice(slot);
2175+
currentSetOfInputs.resize(ownedInputs.size());
2176+
for (size_t i = 0; i < ownedInputs.size(); ++i) {
2177+
currentSetOfInputs[i] = std::span(ownedInputs[i]);
2178+
}
21752179
}
21762180
// Convert raw message indices directly to a DataRef in O(1).
21772181
// Used both by the sequential PartIterator and as the fallback for positional access.
@@ -2245,7 +2249,7 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
22452249
// to avoid double counting them.
22462250
// This was actually the easiest solution we could find for
22472251
// O2-646.
2248-
auto cleanTimers = [&currentSetOfInputs](TimesliceSlot slot, InputRecord& record) {
2252+
auto cleanTimers = [&currentSetOfInputs, &ownedInputs](TimesliceSlot slot, InputRecord& record) {
22492253
assert(record.size() == currentSetOfInputs.size());
22502254
for (size_t ii = 0, ie = record.size(); ii < ie; ++ii) {
22512255
// assuming that for timer inputs we do have exactly one PartRef object
@@ -2258,8 +2262,10 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
22582262
if (input.header == nullptr) {
22592263
continue;
22602264
}
2261-
// This will hopefully delete the message.
2262-
currentSetOfInputs[ii].clear();
2265+
// For the consume=false (Process) path, ownedInputs holds the actual
2266+
// message vectors and the span points into them.
2267+
ownedInputs[ii].clear();
2268+
currentSetOfInputs[ii] = {};
22632269
}
22642270
};
22652271

@@ -2412,9 +2418,11 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
24122418
if (spec.forwards.empty() == false) {
24132419
auto& timesliceIndex = ref.get<TimesliceIndex>();
24142420
forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), false);
2421+
ref.get<DataRelayer>().releaseSlot(action.slot);
24152422
O2_SIGNPOST_END(device, aid, "device", "Forwarding inputs consume: %d.", false);
24162423
continue;
24172424
}
2425+
ref.get<DataRelayer>().releaseSlot(action.slot);
24182426
}
24192427
// If there are no optional inputs we canForwardEarly
24202428
// the messages so that parallel processing can happen.
@@ -2567,6 +2575,9 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
25672575
if (action.op == CompletionPolicy::CompletionOp::Process) {
25682576
cleanTimers(action.slot, record);
25692577
}
2578+
if (shouldConsume) {
2579+
ref.get<DataRelayer>().releaseSlot(action.slot);
2580+
}
25702581
O2_SIGNPOST_END(device, aid, "device", "Done processing action on slot %lu for action %{public}s", action.slot.index, fmt::format("{}", action.op).c_str());
25712582
}
25722583
O2_SIGNPOST_END(device, sid, "device", "Start processing ready actions");

Framework/Core/src/DataProcessingHelpers.cxx

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -393,15 +393,14 @@ void DataProcessingHelpers::cleanForwardedMessages(std::span<fair::mq::MessagePt
393393
}
394394

395395
auto DataProcessingHelpers::routeForwardedMessageSet(FairMQDeviceProxy& proxy,
396-
std::vector<std::vector<fair::mq::MessagePtr>>& currentSetOfInputs,
396+
std::vector<std::span<fair::mq::MessagePtr>>& currentSetOfInputs,
397397
const bool copyByDefault, bool consume) -> std::vector<fair::mq::Parts>
398398
{
399399
// we collect all messages per forward in a map and send them together
400400
std::vector<fair::mq::Parts> forwardedParts(proxy.getNumForwardChannels());
401401

402402
for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) {
403-
auto span = std::span<fair::mq::MessagePtr>(currentSetOfInputs[ii]);
404-
routeForwardedMessages(proxy, span, forwardedParts, copyByDefault, consume);
403+
routeForwardedMessages(proxy, currentSetOfInputs[ii], forwardedParts, copyByDefault, consume);
405404
}
406405
return forwardedParts;
407406
};

Framework/Core/src/DataRelayer.cxx

Lines changed: 33 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -412,22 +412,20 @@ void DataRelayer::pruneCache(TimesliceSlot slot, OnDropCallback onDrop)
412412
ref = mContext](TimesliceSlot slot) {
413413
if (onDrop) {
414414
auto oldestPossibleTimeslice = index.getOldestPossibleOutput();
415-
// State of the computation
416-
std::vector<std::vector<fair::mq::MessagePtr>> dropped(numInputTypes);
415+
// Build spans over the cache entries (no copy or move of message ownership).
416+
// The spans are valid for the duration of the onDrop call; entries are
417+
// cleared below after the callback returns.
418+
std::vector<std::span<fair::mq::MessagePtr>> droppedSpans(numInputTypes);
417419
for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
418420
auto cacheId = slot.index * numInputTypes + ai;
419421
cachedStateMetrics[cacheId] = CacheEntryStatus::RUNNING;
420-
// TODO: in the original implementation of the cache, there have been only two messages per entry,
421-
// check if the 2 above corresponds to the number of messages.
422-
if (!cache[cacheId].empty()) {
423-
dropped[ai] = std::move(cache[cacheId]);
424-
}
422+
droppedSpans[ai] = cache[cacheId];
425423
}
426-
bool anyDropped = std::any_of(dropped.begin(), dropped.end(), [](auto& m) { return !m.empty(); });
424+
bool anyDropped = std::any_of(droppedSpans.begin(), droppedSpans.end(), [](auto& s) { return !s.empty(); });
427425
if (anyDropped) {
428426
O2_SIGNPOST_ID_GENERATE(aid, data_relayer);
429427
O2_SIGNPOST_EVENT_EMIT(data_relayer, aid, "pruneCache", "Dropping stuff from slot %zu with timeslice %zu", slot.index, oldestPossibleTimeslice.timeslice.value);
430-
onDrop(slot, dropped, oldestPossibleTimeslice);
428+
onDrop(slot, droppedSpans, oldestPossibleTimeslice);
431429
}
432430
}
433431
assert(cache.empty() == false);
@@ -886,58 +884,33 @@ void DataRelayer::updateCacheStatus(TimesliceSlot slot, CacheEntryStatus oldStat
886884
}
887885
}
888886

889-
std::vector<std::vector<fair::mq::MessagePtr>> DataRelayer::consumeAllInputsForTimeslice(TimesliceSlot slot)
887+
std::vector<std::span<fair::mq::MessagePtr>> DataRelayer::consumeAllInputsForTimeslice(TimesliceSlot slot)
890888
{
891889
std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
892-
893890
const auto numInputTypes = mDistinctRoutesIndex.size();
894-
// State of the computation
895-
std::vector<std::vector<fair::mq::MessagePtr>> messages(numInputTypes);
896-
auto& cache = mCache;
897-
auto& index = mTimesliceIndex;
898-
899-
// Nothing to see here, this is just to make the outer loop more understandable.
900-
auto jumpToCacheEntryAssociatedWith = [](TimesliceSlot) {
901-
return;
902-
};
903-
904-
// We move ownership so that the cache can be reused once the computation is
905-
// finished. We mark the given cache slot invalid, so that it can be reused
906-
// This means we can still handle old messages if there is still space in the
907-
// cache where to put them.
908-
auto moveHeaderPayloadToOutput = [&messages,
909-
&cachedStateMetrics = mCachedStateMetrics,
910-
&cache, &index, &numInputTypes](TimesliceSlot s, size_t arg) {
911-
auto cacheId = s.index * numInputTypes + arg;
912-
cachedStateMetrics[cacheId] = CacheEntryStatus::RUNNING;
913-
// TODO: in the original implementation of the cache, there have been only two messages per entry,
914-
// check if the 2 above corresponds to the number of messages.
915-
if (!cache[cacheId].empty()) {
916-
messages[arg] = std::move(cache[cacheId]);
917-
}
918-
index.markAsInvalid(s);
919-
};
920-
921-
// An invalid set of arguments is a set of arguments associated to an invalid
922-
// timeslice, so I can simply do that. I keep the assertion there because in principle
923-
// we should have dispatched the timeslice already!
924-
// FIXME: what happens when we have enough timeslices to hit the invalid one?
925-
auto invalidateCacheFor = [&numInputTypes, &index, &cache](TimesliceSlot s) {
926-
for (size_t ai = s.index * numInputTypes, ae = ai + numInputTypes; ai != ae; ++ai) {
927-
assert(std::accumulate(cache[ai].begin(), cache[ai].end(), true, [](bool result, auto const& element) { return result && element.get() == nullptr; }));
928-
cache[ai].clear();
929-
}
930-
index.markAsInvalid(s);
931-
};
932-
933-
// Outer loop here.
934-
jumpToCacheEntryAssociatedWith(slot);
935-
for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
936-
moveHeaderPayloadToOutput(slot, ai);
891+
// Move message vectors from mCache into mConsumedCache (same NxM layout) so that:
892+
// - mCache entries are immediately empty and the slot can be reused by relay()
893+
// - the messages remain alive in mConsumedCache until releaseSlot() is called
894+
// relay() never touches mConsumedCache, so the returned spans are safe to use
895+
// concurrently with new relay() calls into this (now-invalid) slot.
896+
std::vector<std::span<fair::mq::MessagePtr>> spans(numInputTypes);
897+
for (size_t ai = 0; ai < numInputTypes; ++ai) {
898+
auto cacheId = slot.index * numInputTypes + ai;
899+
mCachedStateMetrics[cacheId] = CacheEntryStatus::RUNNING;
900+
mConsumedCache[cacheId] = std::move(mCache[cacheId]);
901+
spans[ai] = mConsumedCache[cacheId];
937902
}
938-
invalidateCacheFor(slot);
903+
mTimesliceIndex.markAsInvalid(slot);
904+
return spans;
905+
}
939906

940-
return messages;
907+
void DataRelayer::releaseSlot(TimesliceSlot slot)
908+
{
909+
// No lock needed: relay() only touches mCache, never mConsumedCache.
910+
const auto numInputTypes = mDistinctRoutesIndex.size();
911+
for (size_t ai = slot.index * numInputTypes, ae = ai + numInputTypes; ai != ae; ++ai) {
912+
mConsumedCache[ai].clear();
913+
}
941914
}
942915

943916
std::vector<std::vector<fair::mq::MessagePtr>> DataRelayer::consumeExistingInputsForTimeslice(TimesliceSlot slot)
@@ -991,6 +964,9 @@ void DataRelayer::clear()
991964
for (auto& cache : mCache) {
992965
cache.clear();
993966
}
967+
for (auto& consumed : mConsumedCache) {
968+
consumed.clear();
969+
}
994970
for (size_t s = 0; s < mTimesliceIndex.size(); ++s) {
995971
mTimesliceIndex.markAsInvalid(TimesliceSlot{s});
996972
}
@@ -1024,6 +1000,7 @@ void DataRelayer::publishMetrics()
10241000
// maybe misleading to have the allocation in a function primarily for
10251001
// metrics publishing, do better in setPipelineLength?
10261002
mCache.resize(numInputTypes * mTimesliceIndex.size());
1003+
mConsumedCache.resize(mCache.size());
10271004
auto& states = mContext.get<DataProcessingStates>();
10281005

10291006
mCachedStateMetrics.resize(mCache.size());

0 commit comments

Comments
 (0)