DNDSR 0.2.1
Distributed Numeric Data Structure for CFV
Loading...
Searching...
No Matches
ArrayRedistributor.hpp
Go to the documentation of this file.
1#pragma once
2/// @file ArrayRedistributor.hpp
3/// @brief Redistributes ArrayPair data across different MPI partitions using ArrayTransformer.
4///
5/// Enables reading HDF5 checkpoint data written with partition A (np=N) into
6/// partition B (np=M) by using an "original index" (e.g., CGNS serial cell index)
7/// as a partition-independent key.
8///
9/// The redistribution uses a two-step rendezvous pattern implemented via
10/// temporary ArrayTransformer pull operations:
11/// Step 1: Each rank reads an even slice of the H5 global data.
12/// Step 2: A temporary ArrayTransformer pulls each rank's needed cells
13/// (identified by origIndex) from whichever rank read them.
14
15#include "ArrayTransformer.hpp"
17#include <unordered_map>
18#include <algorithm>
19#include <numeric>
20
21namespace DNDS
22{
23
24 // EvenSplitRange is defined in Defines.hpp
25
26 /// @brief Redistributes ArrayPair data between different MPI partitions using ArrayTransformer.
27 ///
28 /// The caller provides:
29 /// - readOrigIndex: the original indices for data this rank read from HDF5 (even-split)
30 /// - readArray: the data this rank read (father of a temporary ArrayPair)
31 /// - newOrigIndex: the original indices this rank needs (from current mesh's cell2cellOrig)
32 ///
33 /// The redistributor builds a temporary ArrayTransformer to pull needed data:
34 /// 1. Build a global mapping on the "read" arrays (readArray serves as father).
35 /// 2. For each entry in newOrigIndex, find which rank's read slice contains it, and
36 /// compute the corresponding global index in the read array's global space.
37 /// 3. Use ArrayTransformer::createGhostMapping + pullOnce to fetch the data.
38 /// 4. Copy the pulled data into the output array in newOrigIndex order.
39 ///
40 /// This works for fixed-row-size, dynamic-row-size, and CSR arrays alike, since
41 /// ArrayTransformer handles all these layouts internally.
42
43 /// @brief Builds the pulling index mapping for redistribution.
44 ///
45 /// Given the origIndex arrays from the even-split read (distributed across all ranks),
46 /// and the newOrigIndex for this rank, determines which global indices in the
47 /// read array space need to be pulled.
48 ///
49 /// Uses a 3-round MPI_Alltoallv rendezvous:
50 /// Round 1: Send (origIdx, globalReadIdx) pairs to directory ranks.
51 /// Round 2: Send query origIndex entries to directory ranks.
52 /// Round 3: Directory ranks reply with resolved globalReadIdx.
53 ///
54 /// @par Memory scaling
55 /// Temporary memory per rank is O(nLocal) where nLocal is the larger of
56 /// readOrigIndex.size() and newOrigIndex.size(). Specifically:
57 /// - 7 index vectors of size O(nLocal): sendBuf, recvBuf, querySendBuf,
58 /// queryRecvBuf, queryReplyBuf, replyRecvBuf, pullingIndexGlobal
59 /// - 1 unordered_map<index,index> of size O(nGlobal/nRanks) for the directory
60 /// - Total: ~7 × nLocal × sizeof(index) + O(nGlobal/nRanks) hash table overhead
61 /// For a 100M-cell mesh with 8 ranks: ~12.5M entries × 8 B × 7 ≈ 700 MB per rank,
62 /// plus ~200 MB for the directory hash table.
63 ///
64 /// @param mpi MPI communicator info
65 /// @param readOrigIndex Original indices from the even-split read (this rank's slice)
66 /// @param newOrigIndex Original indices this rank needs (from current partition)
67 /// @param readGlobalMapping Global mapping of the read array (prefix-sum offsets)
68 /// @return pullingIndexGlobal: vector of global read-array indices to pull
69 /// (ordered to match newOrigIndex)
70 inline std::vector<index> BuildRedistributionPullingIndex(
71 const MPIInfo &mpi,
72 const std::vector<index> &readOrigIndex,
73 const std::vector<index> &newOrigIndex,
74 const ssp<GlobalOffsetsMapping> &readGlobalMapping)
75 {
76 // Build a distributed directory mapping origIdx -> global read index via
77 // a rendezvous pattern using MPI_Alltoallv.
78 //
79 // Each rank holds readOrigIndex for its read slice. The global read index
80 // for readOrigIndex[i] on rank r is: readGlobalMapping(r, i).
81 //
82 // Directory rank assignment: directoryRank(origIdx) = origIdx * nRanks / nGlobal
83 // (block partitioning for good locality).
84 //
85 // Steps:
86 // 1. Send (origIdx, globalReadIdx) pairs to directory ranks via Alltoallv.
87 // 2. Send newOrigIndex queries to directory ranks via Alltoallv.
88 // 3. Directory ranks look up and reply with globalReadIdx.
89
90 const int nRanks = mpi.size;
91 index nGlobal = readGlobalMapping->globalSize();
92 DNDS_assert_info(nGlobal > 0, "Redistribution requires nGlobal > 0");
93 auto directoryRank = [&](index origIdx) -> int
94 {
95 if (nGlobal == 0)
96 return 0;
97 return static_cast<int>(std::min(index(nRanks - 1), origIdx * index(nRanks) / nGlobal));
98 };
99
100 // Step 3: Send read entries to directory via Alltoallv.
101 // Each entry is (origIdx, globalReadIdx).
102
103 // Count entries per directory rank
104 std::vector<int> sendCounts(nRanks, 0);
105 for (long i : readOrigIndex)
106 {
107 int dr = directoryRank(i);
108 sendCounts[dr]++;
109 }
110
111 std::vector<int> sendDisps(nRanks + 1, 0);
112 std::partial_sum(sendCounts.begin(), sendCounts.end(), sendDisps.begin() + 1);
113
114 // Pack send buffers (origIdx, globalReadIdx) interleaved
115 std::vector<index> sendBuf(index(sendDisps[nRanks]) * 2);
116 std::vector<int> sendPos(nRanks, 0);
117 for (index i = 0; i < index(readOrigIndex.size()); i++)
118 {
119 int dr = directoryRank(readOrigIndex[i]);
120 index pos = sendDisps[dr] + sendPos[dr];
121 sendBuf[pos * 2] = readOrigIndex[i];
122 sendBuf[pos * 2 + 1] = readGlobalMapping->operator()(mpi.rank, i);
123 sendPos[dr]++;
124 }
125
126 // Alltoall sizes
127 std::vector<int> recvCounts(nRanks, 0);
128 MPI_Alltoall(sendCounts.data(), 1, MPI_INT, recvCounts.data(), 1, MPI_INT, mpi.comm);
129
130 std::vector<int> recvDisps(nRanks + 1, 0);
131 std::partial_sum(recvCounts.begin(), recvCounts.end(), recvDisps.begin() + 1);
132
133 // Alltoallv to send pairs
134 // Multiply counts/disps by 2 for the interleaved pairs
135 std::vector<int> sendCounts2(nRanks), sendDisps2(nRanks);
136 std::vector<int> recvCounts2(nRanks), recvDisps2(nRanks);
137 for (int r = 0; r < nRanks; r++)
138 {
139 sendCounts2[r] = sendCounts[r] * 2;
140 sendDisps2[r] = sendDisps[r] * 2;
141 recvCounts2[r] = recvCounts[r] * 2;
142 recvDisps2[r] = recvDisps[r] * 2;
143 }
144
145 std::vector<index> recvBuf(index(recvDisps[nRanks]) * 2);
146 MPI_Alltoallv(sendBuf.data(), sendCounts2.data(), sendDisps2.data(), DNDS_MPI_INDEX,
147 recvBuf.data(), recvCounts2.data(), recvDisps2.data(), DNDS_MPI_INDEX,
148 mpi.comm);
149
150 // Step 4: Build directory lookup: origIdx -> globalReadIdx
151 std::unordered_map<index, index> directoryMap;
152 directoryMap.reserve(recvDisps[nRanks]);
153 for (index i = 0; i < recvDisps[nRanks]; i++)
154 {
155 directoryMap[recvBuf[i * 2]] = recvBuf[i * 2 + 1];
156 }
157
158 // Step 5: Send queries from newOrigIndex to directory, get back globalReadIdx.
159 // Count queries per directory rank
160 std::vector<int> querySendCounts(nRanks, 0);
161 for (long i : newOrigIndex)
162 {
163 int dr = directoryRank(i);
164 querySendCounts[dr]++;
165 }
166
167 std::vector<int> querySendDisps(nRanks + 1, 0);
168 std::partial_sum(querySendCounts.begin(), querySendCounts.end(), querySendDisps.begin() + 1);
169
170 // Pack query send buffer and record the order mapping
171 std::vector<index> querySendBuf(querySendDisps[nRanks]);
172 // orderMap[pos_in_send] = index in newOrigIndex
173 std::vector<index> queryOrderMap(querySendDisps[nRanks]);
174 std::vector<int> queryPos(nRanks, 0);
175 for (index i = 0; i < index(newOrigIndex.size()); i++)
176 {
177 int dr = directoryRank(newOrigIndex[i]);
178 index pos = querySendDisps[dr] + queryPos[dr];
179 querySendBuf[pos] = newOrigIndex[i];
180 queryOrderMap[pos] = i;
181 queryPos[dr]++;
182 }
183
184 // Alltoall query sizes
185 std::vector<int> queryRecvCounts(nRanks, 0);
186 MPI_Alltoall(querySendCounts.data(), 1, MPI_INT, queryRecvCounts.data(), 1, MPI_INT, mpi.comm);
187
188 std::vector<int> queryRecvDisps(nRanks + 1, 0);
189 std::partial_sum(queryRecvCounts.begin(), queryRecvCounts.end(), queryRecvDisps.begin() + 1);
190
191 // Alltoallv queries
192 std::vector<index> queryRecvBuf(queryRecvDisps[nRanks]);
193 MPI_Alltoallv(querySendBuf.data(), querySendCounts.data(), querySendDisps.data(), DNDS_MPI_INDEX,
194 queryRecvBuf.data(), queryRecvCounts.data(), queryRecvDisps.data(), DNDS_MPI_INDEX,
195 mpi.comm);
196
197 // Step 6: Directory ranks look up and reply with globalReadIdx.
198 std::vector<index> queryReplyBuf(queryRecvDisps[nRanks]);
199 for (index i = 0; i < queryRecvDisps[nRanks]; i++)
200 {
201 auto it = directoryMap.find(queryRecvBuf[i]);
202 DNDS_assert_info(it != directoryMap.end(),
203 fmt::format("origIdx {} not found in directory on rank {}", queryRecvBuf[i], mpi.rank));
204 queryReplyBuf[i] = it->second;
205 }
206
207 // Alltoallv replies back (reverse direction)
208 std::vector<index> replyRecvBuf(querySendDisps[nRanks]);
209 MPI_Alltoallv(queryReplyBuf.data(), queryRecvCounts.data(), queryRecvDisps.data(), DNDS_MPI_INDEX,
210 replyRecvBuf.data(), querySendCounts.data(), querySendDisps.data(), DNDS_MPI_INDEX,
211 mpi.comm);
212
213 // Step 7: Build pullingIndexGlobal in newOrigIndex order.
214 std::vector<index> pullingIndexGlobal(newOrigIndex.size());
215 for (index i = 0; i < querySendDisps[nRanks]; i++)
216 {
217 pullingIndexGlobal[queryOrderMap[i]] = replyRecvBuf[i];
218 }
219
220 return pullingIndexGlobal;
221 }
222
223 /// @brief Redistributes an ArrayPair from a source partition to a target partition.
224 ///
225 /// Template-free core logic: given a "read" father array (from even-split H5 read)
226 /// with known origIndex, and a target partition's newOrigIndex, uses
227 /// ArrayTransformer to pull data from wherever it was read to where it's needed.
228 ///
229 /// @tparam TArray The ParArray type (e.g., ParArray<real, DynamicSize>)
230 /// @param mpi MPI communicator info
231 /// @param readFather Father array populated from even-split H5 read
232 /// @param readOrigIndex Original indices for each row of readFather
233 /// @param newOrigIndex Original indices this rank needs (from current partition)
234 /// @param outFather Output father array, pre-allocated with correct size and row structure
235 template <class TArray>
237 const MPIInfo &mpi,
238 ssp<TArray> readFather,
239 const std::vector<index> &readOrigIndex,
240 const std::vector<index> &newOrigIndex,
241 ssp<TArray> outFather)
242 {
243 DNDS_assert(readFather);
244 DNDS_assert(outFather);
245 DNDS_assert(index(readOrigIndex.size()) == readFather->Size());
246
247 // 1. Create global mapping for the read father array
248 readFather->createGlobalMapping();
249 auto readGlobalMapping = readFather->pLGlobalMapping;
250
251 // 2. Build the pulling index: for each newOrigIndex[i], find the global read index
252 std::vector<index> pullingIndexGlobal = BuildRedistributionPullingIndex(
253 mpi, readOrigIndex, newOrigIndex, readGlobalMapping);
254
255 // Save the original order before createGhostMapping sorts it
256 std::vector<index> pullingIndexOrig(pullingIndexGlobal);
257
258 // 3. Create a temporary son array and transformer to do the pull
259 auto readSon = std::make_shared<TArray>(mpi);
260
261 using TArrayTransformer = typename ArrayTransformerType<TArray>::Type;
262 TArrayTransformer trans;
263 trans.setFatherSon(readFather, readSon);
264 trans.createFatherGlobalMapping(); // reuses the mapping we created above
265 trans.createGhostMapping(pullingIndexGlobal);
266 // NOTE: createGhostMapping sorts pullingIndexGlobal in-place!
267 // After pull, readSon data is in sorted-global-index order, NOT newOrigIndex order.
268 trans.createMPITypes();
269 trans.pullOnce();
270
271 // 4. Copy pulled data (in son) to outFather in the correct order.
272 // readSon is in sorted ghostIndex order. For each newOrigIndex[i],
273 // find pullingIndexOrig[i] in the sorted ghostIndex to get the son position.
274 DNDS_assert(outFather->Size() == index(newOrigIndex.size()));
275
276 // Build a lookup from sorted ghostIndex to son position
277 const auto &ghostIndex = trans.pLGhostMapping->ghostIndex;
278 std::unordered_map<index, index> globalIdx2SonPos;
279 globalIdx2SonPos.reserve(ghostIndex.size());
280 for (index j = 0; j < index(ghostIndex.size()); j++)
281 globalIdx2SonPos[ghostIndex[j]] = j;
282
283 // For CSR arrays, set the output row sizes from the pulled son data,
284 // then compress, before copying element values.
285 if constexpr (TArray::_dataLayout == CSR)
286 {
287 outFather->ResizeRowsAndCompress(
288 [&](index i) -> rowsize
289 {
290 auto it = globalIdx2SonPos.find(pullingIndexOrig[i]);
291 DNDS_assert(it != globalIdx2SonPos.end());
292 return readSon->RowSize(it->second);
293 });
294 }
295
296 for (index i = 0; i < index(newOrigIndex.size()); i++)
297 {
298 index globalReadIdx = pullingIndexOrig[i];
299 auto it = globalIdx2SonPos.find(globalReadIdx);
300 DNDS_assert_info(it != globalIdx2SonPos.end(),
301 fmt::format("globalReadIdx {} not found in ghostIndex on rank {}", globalReadIdx, mpi.rank));
302 index sonPos = it->second;
303 outFather->CopyRowFrom(i, *readSon, sonPos);
304 }
305 }
306
307}
ParArray (MPI-aware array) and ArrayTransformer (ghost/halo communication).
#define DNDS_assert_info(expr, info)
Debug-only assertion with an extra std::string info message.
Definition Errors.hpp:117
#define DNDS_assert(expr)
Debug-only assertion (compiled out when DNDS_NDEBUG is defined). Prints the expression + file/line + ...
Definition Errors.hpp:112
Base types and abstract interface for array serialization.
Ghost-communication engine for a father / son ParArray pair.
void setFatherSon(const t_pArray &n_father, const t_pArray &n_son)
Attach father and son arrays. First setup step.
the host side operators are provided as implemented
const MPI_Datatype DNDS_MPI_INDEX
MPI datatype matching index (= MPI_INT64_T).
Definition MPI.hpp:106
int32_t rowsize
Row-width / per-row element-count type (signed 32-bit).
Definition Defines.hpp:114
@ CSR
Compressed Sparse Row (flat buffer + row-start index).
int64_t index
Global row / DOF index type (signed 64-bit; handles multi-billion-cell meshes).
Definition Defines.hpp:112
std::vector< index > BuildRedistributionPullingIndex(const MPIInfo &mpi, const std::vector< index > &readOrigIndex, const std::vector< index > &newOrigIndex, const ssp< GlobalOffsetsMapping > &readGlobalMapping)
Redistributes ArrayPair data between different MPI partitions using ArrayTransformer.
std::shared_ptr< T > ssp
Shortened alias for std::shared_ptr used pervasively in DNDSR.
Definition Defines.hpp:143
void RedistributeArrayWithTransformer(const MPIInfo &mpi, ssp< TArray > readFather, const std::vector< index > &readOrigIndex, const std::vector< index > &newOrigIndex, ssp< TArray > outFather)
Redistributes an ArrayPair from a source partition to a target partition.
Lightweight bundle of an MPI communicator and the calling rank's coordinates.
Definition MPI.hpp:231
ArrayTransformer< DNDS::real, DynamicSize > trans
for(DNDS::index g=0;g< son->Size();g++) for(DNDS j< dynCols;j++) { DNDS::index ghostGlobal=trans.pLGhostMapping-> ghostIndex[g]
tVec r(NCells)