-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmpiports.cpp
More file actions
225 lines (192 loc) · 7.69 KB
/
mpiports.cpp
File metadata and controls
225 lines (192 loc) · 7.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
#include <mpi.h>
#include <cassert>
#include <cstddef>
#include <fstream>
#include <list>
#include <map>
#include <numeric>
#include <string>
#include <vector>
#include <boost/filesystem.hpp>
#include "common.hpp"
#include "EventUtils.hpp"
#include "programoptions.hpp"
#include "logging.hpp"
#include "prettyprint.hpp"
using namespace EventTimings;
namespace mp {

/// Publishes a port name so that the connecting participant can find it.
///
/// The destination depends on options.pubType / options.commType:
///  - file   + many   : <publishDirectory>/A-<rank>.address, rank = getCommRank()
///  - file   + single : <publishDirectory>/intercomm.address
///  - server          : written under the name "mpiports" via writePort
/// Any other combination publishes nothing.
///
/// @param options   Run options; only pubType, commType and publishDirectory are read.
///                  Taken by const reference to avoid copying the options object.
/// @param portName  The port name to publish.
void publishPort(Options const & options, std::string const & portName)
{
  if (options.pubType == server) {
    writePort(std::string("mpiports"), portName);
  }
  else if (options.pubType == file) {
    if (options.commType == many) {
      // NOTE(review): rank is taken from getCommRank()'s default communicator;
      // lookupPort() callers must pass a rank numbered the same way — confirm.
      int const rank = getCommRank();
      writePort(options.publishDirectory / ("A-" + std::to_string(rank) + ".address"), portName);
    }
    else if (options.commType == single) {
      writePort(options.publishDirectory / "intercomm.address", portName);
    }
  }
}

/// Reads back a port name published by publishPort().
///
/// Mirrors publishPort():
///  - file   + many   : <publishDirectory>/A-<remoteRank>.address (remoteRank must be >= 0)
///  - file   + single : <publishDirectory>/intercomm.address
///  - server          : read under the name "mpiports" via readPort
///
/// @param options     Run options; only pubType, commType and publishDirectory are read.
/// @param remoteRank  Rank whose port to read; only used (and required) for file + many.
/// @return The port name, or an empty string if no combination matched.
std::string lookupPort(Options const & options, int remoteRank = -1)
{
  std::string portName;
  if (options.pubType == server) {
    portName = readPort(std::string("mpiports"));
  }
  else if (options.pubType == file) {
    if (options.commType == many) {
      assert(remoteRank >= 0);
      portName = readPort(options.publishDirectory / ("A-" + std::to_string(remoteRank) + ".address"));
    }
    else if (options.commType == single) {
      portName = readPort(options.publishDirectory / "intercomm.address");
    }
  }
  return portName;
}

}
/// Benchmarks MPI port-based connection setup between two participants, A and B.
///
/// A opens and publishes ports and accepts connections; B looks the ports up and
/// connects. Afterwards B sends vectors of 500, 2000 and 4000 doubles to each of
/// its peers on A, options.rounds times per size. Every phase is wrapped in an
/// EventTimings Event and the collected timings are printed at the end.
///
/// With options.split, MPI_COMM_WORLD is halved: the lower half of the ranks
/// becomes A and the upper half B. Otherwise options.participant is kept.
///
/// options.commType selects the connection layout:
///  - single: one intercommunicator per participant, accepted/connected over
///            commWorld; only rank 0 publishes and looks up the port.
///  - many:   one intercommunicator per connected rank pair, each over
///            MPI_COMM_SELF. Each A rank with incoming peers opens a single port
///            and accepts all of them on it; each B rank connects once per peer.
int main(int argc, char **argv)
{
  using namespace mp;
  MPI_Init(&argc, &argv);
  auto options = getOptions(argc, argv);
  logging::init(options.debug);
  // MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_RETURN);

  int rank = getCommRank(MPI_COMM_WORLD);
  int size = getCommSize(MPI_COMM_WORLD);

  MPI_Comm commWorld;
  if (options.split) {
    int const color = rank < size/2 ? 0 : 1; // split the group into halves
    MPI_Comm_split(MPI_COMM_WORLD, color, rank, &commWorld); // use rank as ordering, so order is preserved
    options.participant = color == 0 ? A : B; // overwrite participant
  }
  else {
    MPI_Comm_dup(MPI_COMM_WORLD, &commWorld);
  }

  // From here on, rank and size are relative to this participant's communicator.
  rank = getCommRank(commWorld);
  size = getCommSize(commWorld);

  auto syncComm = createSyncIcomm(options.participant, options.publishDirectory, commWorld); // First barrier
  EventRegistry::instance().initialize(options.participant == A ? "A" : "B", options.runName, commWorld);

  // Intercommunicators to the other participant:
  // key 0 in single mode, remote rank in many mode.
  std::map<int, MPI_Comm> comms;

  // ============================== BEGIN COMPUTE CONNECTIONS
  Event _determineNumCon("Compute connections", true);
  std::vector<int> comRanks;
  if (options.participant == A)
    comRanks = invertGetRanks(options.peers, size, rank); // who connects to me?
  else if (options.participant == B)
    comRanks = getRanks(options.peers, size, rank); // to whom do I connect?
  _determineNumCon.stop(true);
  // ============================== END COMPUTE CONNECTIONS

  // In many mode an A rank only needs a port if at least one B rank connects to it.
  bool const hasPeers = not comRanks.empty();

  if (options.participant == A)
    removeDir(options.publishDirectory, commWorld); // Remove directory, followed by a barrier

  // ============================== BEGIN PUBLISH
  Event _publish("Publish", true);
  if (options.participant == A) {
    if (options.commType == single and rank == 0)
      publishPort(options, openPort());
    // Exactly one port per rank: publishPort writes every port of this rank to the
    // same address, so further ports would overwrite each other, never be accepted
    // on and never be closed. All incoming peers are accepted on this one port.
    if (options.commType == many and hasPeers)
      publishPort(options, openPort());
  }
  _publish.stop(true);
  // ============================== END PUBLISH
  DEBUG << "Finished publishing";

  if (rank == 0) MPI_Barrier(syncComm);

  // ============================== BEGIN LOOKUP
  std::list<std::string> portNames;
  Event _lookup("Lookup", true);
  if (options.commType == single and rank == 0)
    portNames.push_back(lookupPort(options)); // Single: There is only one port
  if (options.commType == many and options.participant == A and hasPeers)
    portNames.push_back(lookupPort(options, rank)); // A receives connections, only my port
  if (options.commType == many and options.participant == B) {
    for (auto r : comRanks) // All the ports we connect to, in comRanks order
      portNames.push_back(lookupPort(options, r));
  }
  _lookup.stop(true);
  // ============================== END LOOKUP
  DEBUG << "Finished lookup";

  if (rank == 0) MPI_Barrier(syncComm);

  INFO << "Starting connecting on " << size << " ranks.";
  Event _connect("Connect", true);
  std::string portName;
  if (options.participant == A) { // receives connections
    if (options.commType == single) {
      if (rank == 0)
        portName = portNames.front(); // the port name is only significant at the root
      DEBUG << "Accepting connection on " << portName;
      MPI_Comm_accept(portName.c_str(), MPI_INFO_NULL, 0, commWorld, &comms[0]);
      DEBUG << "Received connection on " << portName;
    }
    if (options.commType == many and hasPeers) {
      portName = portNames.front();
      // One accept per expected peer. The peer's identity comes from the rank it
      // sends right after connecting, not from the order of comRanks.
      for (std::size_t i = 0; i < comRanks.size(); ++i) {
        MPI_Comm icomm;
        DEBUG << "Accepting connection on " << portName;
        MPI_Comm_accept(portName.c_str(), MPI_INFO_NULL, 0, MPI_COMM_SELF, &icomm);
        DEBUG << "Accepted connection on " << portName;
        DEBUG << "icomm size = " << getRemoteCommSize(icomm);
        int connectedRank = -1;
        MPI_Recv(&connectedRank, 1, MPI_INT, 0, MPI_ANY_TAG, icomm, MPI_STATUS_IGNORE);
        DEBUG << "Received rank number " << connectedRank;
        comms[connectedRank] = icomm;
      }
    }
  }
  if (options.participant == B) { // connects to the intercomms
    if (options.commType == single) {
      if (rank == 0)
        portName = portNames.front(); // the port name is only significant at the root
      INFO << "Connecting to " << portName;
      MPI_Comm_connect(portName.c_str(), MPI_INFO_NULL, 0, commWorld, &comms[0]);
      DEBUG << "Connected to " << portName;
    }
    if (options.commType == many) {
      // portNames holds one port per entry of comRanks, in the same order.
      for (auto r : comRanks) {
        MPI_Comm icomm;
        portName = portNames.front();
        portNames.pop_front();
        INFO << "Connecting to rank " << r << " on " << portName;
        MPI_Comm_connect(portName.c_str(), MPI_INFO_NULL, 0, MPI_COMM_SELF, &icomm);
        DEBUG << "icomm size = " << getRemoteCommSize(icomm);
        DEBUG << "Connected to rank " << r << " on " << portName;
        MPI_Send(&rank, 1, MPI_INT, 0, 0, icomm); // identify ourselves to the accepting A rank
        comms[r] = icomm;
      }
    }
  }
  _connect.stop(true);
  // ==============================

  INFO << "Starting data exchange";
  for (auto vecSize : {500, 2000, 4000}) {
    std::vector<double> dataVec(vecSize);
    std::iota(dataVec.begin(), dataVec.end(), 0.5);
    int const count = static_cast<int>(dataVec.size());
    if (rank == 0)
      MPI_Barrier(syncComm);
    Event _dataexchange("Data " + std::to_string(vecSize), true);
    for (int round = 0; round < options.rounds; ++round) {
      for (auto r : comRanks) {
        // single: one shared intercomm, address the remote rank directly.
        // many:   one intercomm per peer, its remote group holds only rank 0.
        int const actualRank = options.commType == single ? r : 0;
        MPI_Comm const actualComm = options.commType == single ? comms[0] : comms[r];
        if (options.participant == A)
          MPI_Recv(dataVec.data(), count, MPI_DOUBLE, actualRank, MPI_ANY_TAG, actualComm, MPI_STATUS_IGNORE);
        if (options.participant == B)
          MPI_Send(dataVec.data(), count, MPI_DOUBLE, actualRank, 0, actualComm);
      }
    }
    _dataexchange.stop(true);
  }

  EventRegistry::instance().finalize();
  EventRegistry::instance().printAll();
  MPI_Finalize();
  return 0;
}