DNDSR 0.1.0.dev1+gcd065ad
Distributed Numeric Data Structure for CFV
ArrayRedistributor.hpp
#pragma once
/// @file ArrayRedistributor.hpp
/// @brief Redistributes ArrayPair data across different MPI partitions using ArrayTransformer.
///
/// Enables reading HDF5 checkpoint data written with partition A (np=N) into
/// partition B (np=M) by using an "original index" (e.g., CGNS serial cell index)
/// as a partition-independent key.
///
/// The redistribution uses a two-step rendezvous pattern implemented via
/// temporary ArrayTransformer pull operations:
/// Step 1: Each rank reads an even slice of the H5 global data.
/// Step 2: A temporary ArrayTransformer pulls each rank's needed cells
///         (identified by origIndex) from whichever rank read them.

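Step 1's even slice can be sketched in isolation. This is a minimal stand-in assuming the usual balanced block split; `evenSplit` is an illustrative name, and the real helper is `EvenSplitRange` in Defines.hpp, whose exact signature is not shown here:

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <utility>

using index = std::int64_t; // matches DNDS::index (see Defines.hpp)

// Half-open [start, end) slice of nGlobal rows owned by `rank` when the rows
// are split as evenly as possible over nRanks readers: the first
// nGlobal % nRanks ranks each get one extra row.
std::pair<index, index> evenSplit(index nGlobal, int nRanks, int rank)
{
    index base = nGlobal / nRanks, rem = nGlobal % nRanks;
    index start = index(rank) * base + std::min<index>(rank, rem);
    index count = base + (rank < rem ? 1 : 0);
    return {start, start + count};
}
```

Consecutive ranks then own consecutive, gap-free slices of the global data, which is what lets the rendezvous below treat origIndex as a partition-independent key.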
#include "ArrayTransformer.hpp"
#include "SerializerBase.hpp"
#include <unordered_map>
#include <algorithm>
#include <numeric>

namespace DNDS
{

    // EvenSplitRange is defined in Defines.hpp

    /// @brief Redistributes ArrayPair data between different MPI partitions using ArrayTransformer.
    ///
    /// The caller provides:
    /// - readOrigIndex: the original indices for data this rank read from HDF5 (even-split)
    /// - readArray: the data this rank read (father of a temporary ArrayPair)
    /// - newOrigIndex: the original indices this rank needs (from current mesh's cell2cellOrig)
    ///
    /// The redistributor builds a temporary ArrayTransformer to pull needed data:
    /// 1. Build a global mapping on the "read" arrays (readArray serves as father).
    /// 2. For each entry in newOrigIndex, find which rank's read slice contains it, and
    ///    compute the corresponding global index in the read array's global space.
    /// 3. Use ArrayTransformer::createGhostMapping + pullOnce to fetch the data.
    /// 4. Copy the pulled data into the output array in newOrigIndex order.
    ///
    /// This works for fixed-row-size, dynamic-row-size, and CSR arrays alike, since
    /// ArrayTransformer handles all these layouts internally.

    /// @brief Builds the pulling index mapping for redistribution.
    ///
    /// Given the origIndex arrays from the even-split read (distributed across all ranks),
    /// and the newOrigIndex for this rank, determines which global indices in the
    /// read array space need to be pulled.
    ///
    /// Uses a 3-round MPI_Alltoallv rendezvous:
    /// Round 1: Send (origIdx, globalReadIdx) pairs to directory ranks.
    /// Round 2: Send query origIndex entries to directory ranks.
    /// Round 3: Directory ranks reply with resolved globalReadIdx.
    ///
    /// @par Memory scaling
    /// Temporary memory per rank is O(nLocal) where nLocal is the larger of
    /// readOrigIndex.size() and newOrigIndex.size(). Specifically:
    /// - 7 index vectors of size O(nLocal): sendBuf, recvBuf, querySendBuf,
    ///   queryRecvBuf, queryReplyBuf, replyRecvBuf, pullingIndexGlobal
    /// - 1 unordered_map<index,index> of size O(nGlobal/nRanks) for the directory
    /// - Total: ~7 × nLocal × sizeof(index) + O(nGlobal/nRanks) hash table overhead
    /// For a 100M-cell mesh with 8 ranks: ~12.5M entries × 8 B × 7 ≈ 700 MB per rank,
    /// plus ~200 MB for the directory hash table.
    ///
    /// @param mpi MPI communicator info
    /// @param readOrigIndex Original indices from the even-split read (this rank's slice)
    /// @param newOrigIndex Original indices this rank needs (from current partition)
    /// @param readGlobalMapping Global mapping of the read array (prefix-sum offsets)
    /// @return pullingIndexGlobal: vector of global read-array indices to pull
    ///         (ordered to match newOrigIndex)
    inline std::vector<index> BuildRedistributionPullingIndex(
        const MPIInfo &mpi,
        const std::vector<index> &readOrigIndex,
        const std::vector<index> &newOrigIndex,
        const ssp<GlobalOffsetsMapping> &readGlobalMapping)
    {
        // Build a distributed directory mapping origIdx -> global read index via
        // a rendezvous pattern using MPI_Alltoallv.
        //
        // Each rank holds readOrigIndex for its read slice. The global read index
        // for readOrigIndex[i] on rank r is: readGlobalMapping(r, i).
        //
        // Directory rank assignment: directoryRank(origIdx) = origIdx * nRanks / nGlobal
        // (block partitioning for good locality).
        //
        // Rounds (the 3-round rendezvous described above):
        // 1. Send (origIdx, globalReadIdx) pairs to directory ranks via Alltoallv.
        // 2. Send newOrigIndex queries to directory ranks via Alltoallv.
        // 3. Directory ranks look up and reply with globalReadIdx.

        const int nRanks = mpi.size;
        index nGlobal = readGlobalMapping->globalSize();
        DNDS_assert_info(nGlobal > 0, "Redistribution requires nGlobal > 0");
        auto directoryRank = [&](index origIdx) -> int
        {
            if (nGlobal == 0) return 0;
            return static_cast<int>(std::min(index(nRanks - 1), origIdx * index(nRanks) / nGlobal));
        };

        // Round 1: Send read entries to directory ranks via Alltoallv.
        // Each entry is (origIdx, globalReadIdx).

        // Count entries per directory rank
        std::vector<int> sendCounts(nRanks, 0);
        for (index i = 0; i < index(readOrigIndex.size()); i++)
        {
            int dr = directoryRank(readOrigIndex[i]);
            sendCounts[dr]++;
        }

        std::vector<int> sendDisps(nRanks + 1, 0);
        std::partial_sum(sendCounts.begin(), sendCounts.end(), sendDisps.begin() + 1);

        // Pack send buffers (origIdx, globalReadIdx) interleaved
        std::vector<index> sendBuf(index(sendDisps[nRanks]) * 2);
        std::vector<int> sendPos(nRanks, 0);
        for (index i = 0; i < index(readOrigIndex.size()); i++)
        {
            int dr = directoryRank(readOrigIndex[i]);
            index pos = sendDisps[dr] + sendPos[dr];
            sendBuf[pos * 2] = readOrigIndex[i];
            sendBuf[pos * 2 + 1] = readGlobalMapping->operator()(mpi.rank, i);
            sendPos[dr]++;
        }

        // Alltoall sizes
        std::vector<int> recvCounts(nRanks, 0);
        MPI_Alltoall(sendCounts.data(), 1, MPI_INT, recvCounts.data(), 1, MPI_INT, mpi.comm);

        std::vector<int> recvDisps(nRanks + 1, 0);
        std::partial_sum(recvCounts.begin(), recvCounts.end(), recvDisps.begin() + 1);

        // Alltoallv to send pairs
        // Multiply counts/disps by 2 for the interleaved pairs
        std::vector<int> sendCounts2(nRanks), sendDisps2(nRanks);
        std::vector<int> recvCounts2(nRanks), recvDisps2(nRanks);
        for (int r = 0; r < nRanks; r++)
        {
            sendCounts2[r] = sendCounts[r] * 2;
            sendDisps2[r] = sendDisps[r] * 2;
            recvCounts2[r] = recvCounts[r] * 2;
            recvDisps2[r] = recvDisps[r] * 2;
        }

        std::vector<index> recvBuf(index(recvDisps[nRanks]) * 2);
        MPI_Alltoallv(sendBuf.data(), sendCounts2.data(), sendDisps2.data(), DNDS_MPI_INDEX,
                      recvBuf.data(), recvCounts2.data(), recvDisps2.data(), DNDS_MPI_INDEX,
                      mpi.comm);

        // Build the directory lookup from the Round 1 data: origIdx -> globalReadIdx
        std::unordered_map<index, index> directoryMap;
        directoryMap.reserve(recvDisps[nRanks]);
        for (index i = 0; i < recvDisps[nRanks]; i++)
        {
            directoryMap[recvBuf[i * 2]] = recvBuf[i * 2 + 1];
        }

        // Round 2: Send queries from newOrigIndex to directory ranks.
        // Count queries per directory rank
        std::vector<int> querySendCounts(nRanks, 0);
        for (index i = 0; i < index(newOrigIndex.size()); i++)
        {
            int dr = directoryRank(newOrigIndex[i]);
            querySendCounts[dr]++;
        }

        std::vector<int> querySendDisps(nRanks + 1, 0);
        std::partial_sum(querySendCounts.begin(), querySendCounts.end(), querySendDisps.begin() + 1);

        // Pack query send buffer and record the order mapping
        std::vector<index> querySendBuf(querySendDisps[nRanks]);
        // queryOrderMap[pos_in_send] = index in newOrigIndex
        std::vector<index> queryOrderMap(querySendDisps[nRanks]);
        std::vector<int> queryPos(nRanks, 0);
        for (index i = 0; i < index(newOrigIndex.size()); i++)
        {
            int dr = directoryRank(newOrigIndex[i]);
            index pos = querySendDisps[dr] + queryPos[dr];
            querySendBuf[pos] = newOrigIndex[i];
            queryOrderMap[pos] = i;
            queryPos[dr]++;
        }

        // Alltoall query sizes
        std::vector<int> queryRecvCounts(nRanks, 0);
        MPI_Alltoall(querySendCounts.data(), 1, MPI_INT, queryRecvCounts.data(), 1, MPI_INT, mpi.comm);

        std::vector<int> queryRecvDisps(nRanks + 1, 0);
        std::partial_sum(queryRecvCounts.begin(), queryRecvCounts.end(), queryRecvDisps.begin() + 1);

        // Alltoallv queries
        std::vector<index> queryRecvBuf(queryRecvDisps[nRanks]);
        MPI_Alltoallv(querySendBuf.data(), querySendCounts.data(), querySendDisps.data(), DNDS_MPI_INDEX,
                      queryRecvBuf.data(), queryRecvCounts.data(), queryRecvDisps.data(), DNDS_MPI_INDEX,
                      mpi.comm);

        // Round 3: Directory ranks look up and reply with globalReadIdx.
        std::vector<index> queryReplyBuf(queryRecvDisps[nRanks]);
        for (index i = 0; i < queryRecvDisps[nRanks]; i++)
        {
            auto it = directoryMap.find(queryRecvBuf[i]);
            DNDS_assert_info(it != directoryMap.end(),
                             fmt::format("origIdx {} not found in directory on rank {}", queryRecvBuf[i], mpi.rank));
            queryReplyBuf[i] = it->second;
        }

        // Alltoallv replies back (reverse direction)
        std::vector<index> replyRecvBuf(querySendDisps[nRanks]);
        MPI_Alltoallv(queryReplyBuf.data(), queryRecvCounts.data(), queryRecvDisps.data(), DNDS_MPI_INDEX,
                      replyRecvBuf.data(), querySendCounts.data(), querySendDisps.data(), DNDS_MPI_INDEX,
                      mpi.comm);

        // Assemble pullingIndexGlobal in newOrigIndex order.
        std::vector<index> pullingIndexGlobal(newOrigIndex.size());
        for (index i = 0; i < querySendDisps[nRanks]; i++)
        {
            pullingIndexGlobal[queryOrderMap[i]] = replyRecvBuf[i];
        }

        return pullingIndexGlobal;
    }

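The three rounds above can be exercised in a single process by collapsing each MPI_Alltoallv into direct copies. The sketch below uses illustrative names and no MPI; it checks only the resolution logic: every queried origIndex resolves to the global position of that row in the concatenation of the per-rank read slices.

```cpp
#include <cassert>
#include <cstdint>
#include <unordered_map>
#include <vector>

using index = std::int64_t;

// Single-process model of BuildRedistributionPullingIndex: rank r's read slice
// starts at the prefix-sum offset of the slices before it, so the
// globalReadIdx for slice position i on rank r is offset[r] + i.
std::vector<index> resolvePullingIndex(
    const std::vector<std::vector<index>> &readOrigIndexPerRank,
    const std::vector<index> &newOrigIndex)
{
    // Round 1 collapses to filling one directory directly.
    std::unordered_map<index, index> directory; // origIdx -> globalReadIdx
    index offset = 0;
    for (const auto &slice : readOrigIndexPerRank)
    {
        for (index i = 0; i < index(slice.size()); i++)
            directory[slice[i]] = offset + i;
        offset += index(slice.size());
    }
    // Rounds 2 and 3 collapse to direct lookups, already in query order.
    std::vector<index> pulling(newOrigIndex.size());
    for (index i = 0; i < index(newOrigIndex.size()); i++)
        pulling[i] = directory.at(newOrigIndex[i]);
    return pulling;
}
```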
    /// @brief Redistributes an ArrayPair from a source partition to a target partition.
    ///
    /// Template-free core logic: given a "read" father array (from even-split H5 read)
    /// with known origIndex, and a target partition's newOrigIndex, uses
    /// ArrayTransformer to pull data from wherever it was read to where it's needed.
    ///
    /// @tparam TArray The ParArray type (e.g., ParArray<real, DynamicSize>)
    /// @param mpi MPI communicator info
    /// @param readFather Father array populated from even-split H5 read
    /// @param readOrigIndex Original indices for each row of readFather
    /// @param newOrigIndex Original indices this rank needs (from current partition)
    /// @param outFather Output father array, pre-allocated with correct size and row structure
    template <class TArray>
    void RedistributeArrayWithTransformer(
        const MPIInfo &mpi,
        ssp<TArray> readFather,
        const std::vector<index> &readOrigIndex,
        const std::vector<index> &newOrigIndex,
        ssp<TArray> outFather)
    {
        DNDS_assert(readFather);
        DNDS_assert(outFather);
        DNDS_assert(index(readOrigIndex.size()) == readFather->Size());

        // 1. Create global mapping for the read father array
        readFather->createGlobalMapping();
        auto readGlobalMapping = readFather->pLGlobalMapping;

        // 2. Build the pulling index: for each newOrigIndex[i], find the global read index
        std::vector<index> pullingIndexGlobal = BuildRedistributionPullingIndex(
            mpi, readOrigIndex, newOrigIndex, readGlobalMapping);

        // Save the original order before createGhostMapping sorts it
        std::vector<index> pullingIndexOrig(pullingIndexGlobal);

        // 3. Create a temporary son array and transformer to do the pull
        auto readSon = std::make_shared<TArray>(mpi);

        using TArrayTransformer = typename ArrayTransformerType<TArray>::Type;
        TArrayTransformer trans;
        trans.setFatherSon(readFather, readSon);
        trans.createFatherGlobalMapping(); // reuses the mapping we created above
        trans.createGhostMapping(pullingIndexGlobal);
        // NOTE: createGhostMapping sorts pullingIndexGlobal in-place!
        // After pull, readSon data is in sorted-global-index order, NOT newOrigIndex order.
        trans.createMPITypes();
        trans.pullOnce();

        // 4. Copy pulled data (in son) to outFather in the correct order.
        //    readSon is in sorted ghostIndex order. For each newOrigIndex[i],
        //    find pullingIndexOrig[i] in the sorted ghostIndex to get the son position.
        DNDS_assert(outFather->Size() == index(newOrigIndex.size()));

        // Build a lookup from sorted ghostIndex to son position
        const auto &ghostIndex = trans.pLGhostMapping->ghostIndex;
        std::unordered_map<index, index> globalIdx2SonPos;
        globalIdx2SonPos.reserve(ghostIndex.size());
        for (index j = 0; j < index(ghostIndex.size()); j++)
            globalIdx2SonPos[ghostIndex[j]] = j;

        // For CSR arrays, set the output row sizes from the pulled son data,
        // then compress, before copying element values.
        if constexpr (TArray::_dataLayout == CSR)
        {
            outFather->ResizeRowsAndCompress(
                [&](index i) -> rowsize
                {
                    auto it = globalIdx2SonPos.find(pullingIndexOrig[i]);
                    DNDS_assert(it != globalIdx2SonPos.end());
                    return readSon->RowSize(it->second);
                });
        }

        for (index i = 0; i < index(newOrigIndex.size()); i++)
        {
            index globalReadIdx = pullingIndexOrig[i];
            auto it = globalIdx2SonPos.find(globalReadIdx);
            DNDS_assert_info(it != globalIdx2SonPos.end(),
                             fmt::format("globalReadIdx {} not found in ghostIndex on rank {}", globalReadIdx, mpi.rank));
            index sonPos = it->second;
            outFather->CopyRowFrom(i, *readSon, sonPos);
        }
    }

}
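The ordering pitfall flagged in RedistributeArrayWithTransformer (createGhostMapping sorts its argument in place, so pulled son rows arrive in sorted-global-index order) can be isolated in a small sketch. sonPositionsInRequestOrder is an illustrative name, not part of this header:

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <unordered_map>
#include <vector>

using index = std::int64_t;

// Given the pulling indices in request (newOrigIndex) order, return, for each
// request i, the son row that holds it after the pull, mimicking
// createGhostMapping's in-place sort of the ghost index list.
std::vector<index> sonPositionsInRequestOrder(std::vector<index> pullingIndexGlobal)
{
    std::vector<index> pullingIndexOrig = pullingIndexGlobal; // save request order
    std::sort(pullingIndexGlobal.begin(), pullingIndexGlobal.end());

    std::unordered_map<index, index> globalIdx2SonPos;
    for (index j = 0; j < index(pullingIndexGlobal.size()); j++)
        globalIdx2SonPos[pullingIndexGlobal[j]] = j; // sorted global idx -> son row

    std::vector<index> sonPos(pullingIndexOrig.size());
    for (index i = 0; i < index(pullingIndexOrig.size()); i++)
        sonPos[i] = globalIdx2SonPos.at(pullingIndexOrig[i]);
    return sonPos;
}
```

Rows would then be copied out as outFather->CopyRowFrom(i, *readSon, sonPos[i]), restoring newOrigIndex order.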