#include "ECF_base.h"
#include <string.h>
#include <list>
#include <utility>
00003
00004 namespace Comm
00005 {
00006
00007 Communicator::Communicator()
00008 {
00009 bInitialized_ = false;
00010 idleTime_ = sendTime_ = recvTime_ = compTime_ = packTime_ = unpackTime_ = 0;
00011 logLevel_ = 4;
00012 sendCnt_ = recvCnt_ = 0;
00013 }
00014
00015
00016 bool Communicator::initialize(StateP state, int argc, char** argv)
00017 {
00018 idleTime_ = sendTime_ = recvTime_ = compTime_ = packTime_ = unpackTime_ = 0;
00019
00020
00021 if(bInitialized_) {
00022 beginTime_ = lastTime_ = MPI::Wtime();
00023 ECF_LOG(state_, 2, "Process " + uint2str(mpiGlobalRank_) + " of "
00024 + uint2str(mpiGlobalSize_) + " on " + processorName_);
00025 return true;
00026 }
00027
00028 state_ = state;
00029
00030 MPI::Init(argc, argv);
00031
00032 mpiGlobalSize_ = MPI::COMM_WORLD.Get_size();
00033 mpiGlobalRank_ = MPI::COMM_WORLD.Get_rank();
00034
00035 demeComm_ = MPI::COMM_WORLD.Dup();
00036 mpiSize_ = demeComm_.Get_size();
00037 mpiRank_ = demeComm_.Get_rank();
00038 frameworkComm_ = MPI::COMM_WORLD.Dup();
00039
00040 char *pp, processor_name[MPI_MAX_PROCESSOR_NAME];
00041 pp = &processor_name[0];
00042 int namelen;
00043 MPI_Get_processor_name(processor_name, &namelen);
00044 processorName_ = processor_name;
00045
00046 beginTime_ = lastTime_ = MPI::Wtime();
00047
00048 ECF_LOG(state_, 2, "Process " + uint2str(mpiGlobalRank_) + " of "
00049 + uint2str(mpiGlobalSize_) + " on " + processorName_);
00050
00051 bInitialized_ = true;
00052
00053 return true;
00054 }
00055
00056
00057
00058
00059 uint Communicator::createDemeCommunicator(uint nDemes)
00060 {
00061
00062 uint myColor = mpiGlobalRank_ % nDemes;
00063 demeMasters.resize(nDemes);
00064 for(uint i = 0; i < nDemes; i++)
00065 demeMasters[i] = i;
00066
00067 demeComm_ = MPI::COMM_WORLD.Split(myColor, mpiGlobalRank_);
00068 mpiSize_ = demeComm_.Get_size();
00069 mpiRank_ = demeComm_.Get_rank();
00070
00071 std::stringstream log;
00072 log << "Global process " << mpiGlobalRank_ << " joined deme communicator with index ";
00073 log << myColor << " (local rank: " << mpiRank_ << " of " << mpiSize_ << ")";
00074 ECF_LOG(state_, 2, log.str());
00075
00076 return myColor;
00077 }
00078
00079
00080 bool Communicator::finalize()
00081 {
00082 if(!bInitialized_)
00083 return true;
00084
00085 endTime_ = MPI::Wtime();
00086
00087 std::stringstream times;
00088 times << "Process " << mpiGlobalRank_ << ": total MPI time: " << endTime_ - beginTime_;
00089 times << ", COMP: " << compTime_;
00090 times << ", IDLE: " << idleTime_;
00091 times << ", SEND: " << sendTime_;
00092 times << ", RECV: " << recvTime_;
00093 times << ", PACK: " << packTime_;
00094 times << ", UNPACK: " << unpackTime_ << std::endl;
00095
00096
00097 if(mpiGlobalRank_ == 0) {
00098 std::string message;
00099 MPI::Status status;
00100
00101 for(uint iProcess = 1; iProcess < mpiGlobalSize_; iProcess++) {
00102 frameworkComm_.Probe(iProcess, T_FINAL, status);
00103 uint size = status.Get_count(MPI::CHAR);
00104 message.resize(size);
00105 frameworkComm_.Recv(&message[0], size, MPI::CHAR, iProcess, T_FINAL, status);
00106
00107 times << message;
00108 }
00109 ECF_LOG(state_, 2, times.str());
00110 state_->getLogger()->saveTo(true);
00111 }
00112 else {
00113 std::string message = times.str();
00114 frameworkComm_.Send(&message[0], (int) message.size(), MPI::CHAR, 0, T_FINAL);
00115 }
00116
00117 if(!state_->getBatchMode())
00118 MPI::Finalize();
00119
00120 return true;
00121 }
00122
00127 double Communicator::time(enum timing T)
00128 {
00129 currentTime_ = MPI::Wtime();
00130 double elapsed = currentTime_ - lastTime_;
00131 lastTime_ = currentTime_;
00132
00133 switch(T) {
00134 case IDLE:
00135 idleTime_ += elapsed;
00136 break;
00137 case COMP:
00138 compTime_ += elapsed;
00139 break;
00140 case SEND:
00141 sendTime_ += elapsed;
00142 break;
00143 case RECV:
00144 recvTime_ += elapsed;
00145 break;
00146 case PACK:
00147 packTime_ += elapsed;
00148 break;
00149 case UNPACK:
00150 unpackTime_ += elapsed;
00151 break;
00152 }
00153
00154 return 1000 * elapsed;
00155 }
00156
00157
00158 uint Communicator::getDemeMaster(uint iDeme)
00159 {
00160 return demeMasters[iDeme];
00161 }
00162
00163
00164 uint Communicator::getLastSource()
00165 {
00166 return status_.Get_source();
00167 }
00168
00169
00170 void Communicator::synchronize()
00171 {
00172 MPI::COMM_WORLD.Barrier();
00173 }
00174
00175
00176
00177 bool Communicator::messageWaiting(uint iProcess, uint tag)
00178 {
00179 return demeComm_.Iprobe(iProcess, tag, status_);
00180 }
00181
00182
00183 bool Communicator::sendControlMessage(uint iProcess, int control)
00184 {
00185 demeComm_.Send(&control, sizeof(int), MPI::BYTE, iProcess, T_CONTROL);
00186 return true;
00187 }
00188
00189
00190 int Communicator::recvControlMessage(uint iProcess)
00191 {
00192 int control;
00193 demeComm_.Recv(&control, sizeof(int), MPI::BYTE, iProcess, T_CONTROL);
00194 return control;
00195 }
00196
00197
00198 bool Communicator::sendTerminateMessage(uint iProcess, bool termination)
00199 {
00200 ECF_LOG(state_, logLevel_, "Sending terminate message to process " + uint2str(iProcess));
00201 uint tag = (termination == true) ? T_TERMINATE : T_CONTINUE;
00202 frameworkComm_.Send(&termination, sizeof(bool), MPI::BYTE, iProcess, tag);
00203 return true;
00204 }
00205
00206
00207 bool Communicator::recvTerminateMessage(uint iProcess)
00208 {
00209 bool termination;
00210 frameworkComm_.Recv(&termination, sizeof(bool), MPI::BYTE, iProcess, MPI::ANY_TAG, controlStatus_);
00211 if(controlStatus_.Get_tag() == T_TERMINATE)
00212 termination = true;
00213 if(controlStatus_.Get_tag() == T_CONTINUE)
00214 termination = false;
00215 return termination;
00216 }
00217
00218
00219 bool Communicator::checkTerminationMessage(uint master)
00220 {
00221 if(frameworkComm_.Iprobe(master, MPI::ANY_TAG, controlStatus_)) {
00222 if(controlStatus_.Get_tag() == T_TERMINATE) {
00223 return true;
00224 }
00225 else
00226 recvTerminateMessage(master);
00227 }
00228 return false;
00229 }
00230
00231
00232
00233
00234 bool Communicator::sendIndividuals(std::vector<IndividualP> pool, uint iProcess, uint nIndividuals)
00235 {
00236 time(COMP);
00237
00238 XMLNode xAll, xIndividual;
00239 xAll = XMLNode::createXMLTopNode("Pack");
00240
00241 if(nIndividuals == 0)
00242 nIndividuals = (uint) pool.size();
00243 xAll.addAttribute("size", uint2str(nIndividuals).c_str());
00244
00245 for(uint ind = 0; ind < nIndividuals; ind++) {
00246 pool[ind]->write(xIndividual);
00247 xIndividual.addAttribute("i", uint2str(pool[ind]->index).c_str());
00248 xIndividual.addAttribute("c", uint2str(pool[ind]->cid).c_str());
00249 xAll.addChild(xIndividual);
00250 }
00251 char *message = xAll.createXMLString(0);
00252
00253
00254 double createTime = time(PACK);
00255
00256
00257 demeComm_.Send(message, (int) strlen(message) + 1, MPI::CHAR, iProcess, T_DEFAULT);
00258
00259 double sendTime = time(SEND);
00260
00261 std::stringstream log;
00262 log << "sent " << nIndividuals << " individuals, " << strlen(message) << " bytes (P: " << createTime << " | S: " << sendTime << ")";
00263 ECF_LOG(state_, logLevel_, log.str());
00264
00265 freeXMLString(message);
00266
00267 return true;
00268 }
00269
00270
00271
00272 bool Communicator::sendIndividualsGlobal(std::vector<IndividualP> pool, uint iProcess, uint nIndividuals)
00273 {
00274 time(COMP);
00275
00276 XMLNode xAll, xIndividual;
00277 xAll = XMLNode::createXMLTopNode("Pack");
00278
00279 if(nIndividuals == 0)
00280 nIndividuals = (uint) pool.size();
00281 xAll.addAttribute("size", uint2str(nIndividuals).c_str());
00282
00283 for(uint ind = 0; ind < nIndividuals; ind++) {
00284 pool[ind]->write(xIndividual);
00285 xIndividual.addAttribute("i", uint2str(pool[ind]->index).c_str());
00286 xIndividual.addAttribute("c", uint2str(pool[ind]->cid).c_str());
00287 xAll.addChild(xIndividual);
00288 }
00289 char *message = xAll.createXMLString(0);
00290
00291 double createTime = time(PACK);
00292
00293 frameworkComm_.Send(message, (int) strlen(message) + 1, MPI::CHAR, iProcess, T_DEFAULT);
00294
00295 double sendTime = time(SEND);
00296
00297 std::stringstream log;
00298 log << "sent " << nIndividuals << " individuals (global), " << strlen(message) << " bytes (P: " << createTime << " | S: " << sendTime << ")";
00299 ECF_LOG(state_, logLevel_, log.str());
00300
00301 freeXMLString(message);
00302
00303 return true;
00304 }
00305
00306
00307
00308
00309 uint Communicator::recvDemeIndividuals(std::vector<IndividualP>& deme, uint iProcess)
00310 {
00311 XMLNode xAll, xIndividual;
00312 MPI::Status status;
00313 std::string message;
00314
00315 time(COMP);
00316
00317 demeComm_.Probe(iProcess, MPI::ANY_TAG, status);
00318
00319 double idle = time(IDLE);
00320
00321 uint length = status.Get_count(MPI::CHAR);
00322 message.resize(length + 1);
00323 demeComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_DEFAULT, status);
00324
00325 double recv = time(RECV);
00326
00327 xAll = XMLNode::parseString(message.c_str(), "Pack");
00328 uint nIndividuals = atoi(xAll.getAttribute("size"));
00329 uint index, cid;
00330 for(uint i = 0; i < nIndividuals; i++) {
00331
00332 xIndividual = xAll.getChildNode(i);
00333 index = atoi(xIndividual.getAttributeValue(1));
00334 cid = atoi(xIndividual.getAttributeValue(2));
00335 deme[index]->read(xIndividual);
00336 deme[index]->cid = cid;
00337 }
00338 status_ = status;
00339
00340 double read = time(UNPACK);
00341
00342 std::stringstream log;
00343 log << "received " << nIndividuals << " individuals, " << length << " bytes (";
00344 log << "I: " << idle << " | R: " << recv << " | U: " << read << ")";
00345 ECF_LOG(state_, logLevel_, log.str());
00346
00347 return nIndividuals;
00348 }
00349
00350
00351
00352
00353 std::vector<IndividualP> Communicator::recvIndividuals(uint iProcess)
00354 {
00355 XMLNode xAll, xIndividual;
00356 MPI::Status status;
00357 std::string message;
00358 std::vector<IndividualP> pack;
00359
00360 time(COMP);
00361
00362 demeComm_.Probe(iProcess, MPI::ANY_TAG, status);
00363
00364 double idle = time(IDLE);
00365
00366 uint length = status.Get_count(MPI::CHAR);
00367 message.resize(length + 1);
00368 demeComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_DEFAULT, status);
00369
00370 double recv = time(RECV);
00371
00372 xAll = XMLNode::parseString(message.c_str(), "Pack");
00373 uint nIndividuals = atoi(xAll.getAttribute("size"));
00374 uint index, cid;
00375 for(uint i = 0; i < nIndividuals; i++) {
00376
00377 xIndividual = xAll.getChildNode(i);
00378 pack.push_back((IndividualP) new Individual(state_));
00379 index = atoi(xIndividual.getAttributeValue(1));
00380 cid = atoi(xIndividual.getAttributeValue(2));
00381
00382 pack[i]->index = index;
00383 pack[i]->cid = cid;
00384 pack[i]->fitness = (FitnessP) state_->getFitnessObject()->copy();
00385 pack[i]->read(xIndividual);
00386 }
00387 status_ = status;
00388
00389 double read = time(UNPACK);
00390
00391 std::stringstream log;
00392 log << "received " << nIndividuals << " individuals, " << length << " bytes (";
00393 log << "I: " << idle << " | R: " << recv << " | U: " << read << ")";
00394 ECF_LOG(state_, logLevel_, log.str());
00395
00396 return pack;
00397 }
00398
00399
00400
00401 std::vector<IndividualP> Communicator::recvIndividualsGlobal(uint iProcess)
00402 {
00403 XMLNode xAll, xIndividual;
00404 MPI::Status status;
00405 std::string message;
00406 std::vector<IndividualP> pack;
00407
00408 time(COMP);
00409
00410 frameworkComm_.Probe(iProcess, T_DEFAULT, status);
00411
00412 double idle = time(IDLE);
00413
00414 uint length = status.Get_count(MPI::CHAR);
00415 message.resize(length);
00416 frameworkComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_DEFAULT, status);
00417
00418 double recv = time(RECV);
00419
00420 xAll = XMLNode::parseString(message.c_str(), "Pack");
00421 uint nIndividuals = atoi(xAll.getAttribute("size"));
00422 uint index, cid;
00423 for(uint i = 0; i < nIndividuals; i++) {
00424 xIndividual = xAll.getChildNode(i);
00425 pack.push_back((IndividualP) new Individual(state_));
00426 index = atoi(xIndividual.getAttributeValue(1));
00427 cid = atoi(xIndividual.getAttributeValue(2));
00428
00429 pack[i]->index = index;
00430 pack[i]->cid = cid;
00431 pack[i]->fitness = (FitnessP) state_->getFitnessObject()->copy();
00432 pack[i]->read(xIndividual);
00433 }
00434 status_ = status;
00435
00436 double read = time(UNPACK);
00437
00438 std::stringstream log;
00439 log << "received " << nIndividuals << " individuals, " << length << " bytes (";
00440 log << "I: " << idle << " | R: " << recv << " | U: " << read << ")";
00441 ECF_LOG(state_, logLevel_, log.str());
00442
00443 return pack;
00444 }
00445
00446
00447
00448
00449
00450
00451 uint Communicator::recvReplaceIndividuals(std::vector<IndividualP>& pool, uint iProcess)
00452 {
00453 XMLNode xAll, xIndividual;
00454 MPI::Status status;
00455
00456 time(COMP);
00457
00458 demeComm_.Probe(iProcess, MPI::ANY_TAG, status);
00459
00460 double idle = time(IDLE);
00461
00462 uint length = status.Get_count(MPI::CHAR);
00463 char *message = new char[length + 1];
00464 demeComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_DEFAULT, status);
00465 status_ = status;
00466
00467 double recv = time(RECV);
00468
00469 xAll = XMLNode::parseString(message, "Pack");
00470 uint nIndividuals = atoi(xAll.getAttribute("size"));
00471 uint poolSize = (uint) pool.size();
00472
00473 if(poolSize < nIndividuals) {
00474 pool.resize(nIndividuals);
00475 for(uint i = poolSize; i < nIndividuals; i++) {
00476 pool[i] = (IndividualP) new Individual(state_);
00477 pool[i]->fitness = (FitnessP) state_->getFitnessObject()->copy();
00478 }
00479 }
00480
00481 uint index, cid;
00482 for(uint i = 0; i < nIndividuals; i++) {
00483 xIndividual = xAll.getChildNode(i);
00484 index = atoi(xIndividual.getAttributeValue(1));
00485 cid = atoi(xIndividual.getAttributeValue(2));
00486 pool[i]->index = index;
00487 pool[i]->cid = cid;
00488 pool[i]->read(xIndividual);
00489 }
00490
00491 double read = time(UNPACK);
00492
00493 std::stringstream log;
00494 log << "received " << nIndividuals << " individuals, " << length << " bytes (I: " << idle << " | R: " << recv << " | U: " << read << ")";
00495 ECF_LOG(state_, logLevel_, log.str());
00496
00497 delete [] message;
00498
00499 return nIndividuals;
00500 }
00501
00502
00503
00504
00505
00506
00507 bool Communicator::sendFitness(std::vector<IndividualP> pool, uint iProcess, uint nIndividuals)
00508 {
00509 time(COMP);
00510
00511 XMLNode xAll, xIndividual, xFitness;
00512 xAll = XMLNode::createXMLTopNode("Pack");
00513
00514 if(nIndividuals == 0)
00515 nIndividuals = (uint) pool.size();
00516 xAll.addAttribute("size", uint2str(nIndividuals).c_str());
00517
00518 for(uint ind = 0; ind < nIndividuals; ind++) {
00519 xIndividual = XMLNode::createXMLTopNode("Individual");
00520 xIndividual.addAttribute("i", uint2str(pool[ind]->index).c_str());
00521 xIndividual.addAttribute("c", uint2str(pool[ind]->cid).c_str());
00522
00523 pool[ind]->fitness->write(xFitness);
00524 xIndividual.addChild(xFitness);
00525 xAll.addChild(xIndividual);
00526 }
00527 char *message = xAll.createXMLString(0);
00528
00529 double pack = time(PACK);
00530
00531 demeComm_.Send(message, (int) strlen(message) + 1, MPI::CHAR, iProcess, T_DEFAULT);
00532
00533 double send = time(SEND);
00534
00535 std::stringstream log;
00536 log << "sent " << nIndividuals << " fitness objects, " << strlen(message) << " bytes (P: " << pack << " | S: " << send << ")";
00537 ECF_LOG(state_, logLevel_, log.str());
00538
00539 freeXMLString(message);
00540
00541 return true;
00542 }
00543
00544
00545
00546
00547
00548 uint Communicator::recvDemeFitness(std::vector<IndividualP>& deme, uint iProcess)
00549 {
00550 XMLNode xAll, xIndividual, xFitness;
00551 MPI::Status status;
00552 std::string message;
00553
00554 time(COMP);
00555
00556 demeComm_.Probe(iProcess, MPI::ANY_TAG, status);
00557
00558 double idle = time(IDLE);
00559
00560 uint length = status.Get_count(MPI::CHAR);
00561 message.resize(length);
00562 demeComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_DEFAULT, status);
00563
00564 double recv = time(RECV);
00565
00566 xAll = XMLNode::parseString(message.c_str(), "Pack");
00567 uint nIndividuals = atoi(xAll.getAttribute("size"));
00568 uint index, cid;
00569 for(uint i = 0; i < nIndividuals; i++) {
00570 xIndividual = xAll.getChildNode(i);
00571 index = atoi(xIndividual.getAttributeValue(0));
00572 cid = atoi(xIndividual.getAttributeValue(1));
00573 xFitness = xIndividual.getChildNode(0);
00574 deme[index]->fitness->read(xFitness);
00575 deme[index]->fitness->cid = cid;
00576
00577 }
00578 status_ = status;
00579
00580 double read = time(UNPACK);
00581
00582 std::stringstream log;
00583 log << "received " << nIndividuals << " fitnes objects, " << length << " bytes (I: " << idle << " | R: " << recv << " | U: " << read << ")";
00584 ECF_LOG(state_, logLevel_, log.str());
00585
00586 return nIndividuals;
00587 }
00588
00589
00590
00591
00592
00593
00594 std::vector<uint> Communicator::recvFitnessVector(std::vector<IndividualP>& deme, uint iProcess)
00595 {
00596 XMLNode xAll, xIndividual, xFitness;
00597 MPI::Status status;
00598 std::string message;
00599 std::vector<uint> indices;
00600
00601 time(COMP);
00602
00603 demeComm_.Probe(iProcess, MPI::ANY_TAG, status);
00604
00605 double idle = time(IDLE);
00606
00607 uint length = status.Get_count(MPI::CHAR);
00608 message.resize(length);
00609 demeComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_DEFAULT, status);
00610
00611 double recv = time(RECV);
00612
00613 xAll = XMLNode::parseString(message.c_str(), "Pack");
00614 uint nIndividuals = atoi(xAll.getAttribute("size"));
00615 uint index, cid;
00616 for(uint i = 0; i < nIndividuals; i++) {
00617 xIndividual = xAll.getChildNode(i);
00618 index = atoi(xIndividual.getAttributeValue(0));
00619 cid = atoi(xIndividual.getAttributeValue(1));
00620 xFitness = xIndividual.getChildNode(0);
00621
00622 deme[index]->fitness->read(xFitness);
00623 deme[index]->fitness->cid = cid;
00624 indices.push_back(index);
00625 }
00626 status_ = status;
00627
00628 double read = time(UNPACK);
00629
00630 std::stringstream log;
00631 log << "received " << nIndividuals << " fitnes objects, " << length << " bytes (I: " << idle << " | R: " << recv << " | U: " << read << ")";
00632 ECF_LOG(state_, logLevel_, log.str());
00633
00634 return indices;
00635 }
00636
00637
00638
00639 bool Communicator::sendValuesGlobal(std::vector<double> values, uint iProcess)
00640 {
00641 time(COMP);
00642
00643 frameworkComm_.Send(&values[0], (int) values.size(), MPI::DOUBLE, iProcess, T_VALUES);
00644
00645 time(SEND);
00646
00647 std::stringstream log;
00648 log << "sent " << values.size() << " doubles";
00649 ECF_LOG(state_, logLevel_, log.str());
00650
00651 return true;
00652 }
00653
00654
00655
00656 std::vector<double> Communicator::recvValuesGlobal(uint iProcess)
00657 {
00658 std::vector<double> values;
00659 MPI::Status status;
00660
00661 time(COMP);
00662
00663 frameworkComm_.Probe(iProcess, T_VALUES, status);
00664
00665 double idle = time(IDLE);
00666
00667 uint size = status.Get_count(MPI::DOUBLE);
00668 values.resize(size);
00669 frameworkComm_.Recv(&values[0], size, MPI::DOUBLE, iProcess, T_VALUES, status);
00670
00671 double recv = time(RECV);
00672
00673 std::stringstream log;
00674 log << "received " << values.size() << " doubles";
00675 ECF_LOG(state_, logLevel_, log.str());
00676
00677 return values;
00678 }
00679
00680
00681
00682 std::string Communicator::recvLogsGlobal()
00683 {
00684 std::string logs = "", message;
00685 uint logCount = 0;
00686 MPI::Status status;
00687
00688 time(COMP);
00689
00690 while(frameworkComm_.Iprobe(MPI::ANY_SOURCE, T_LOGS, status)) {
00691 uint iProcess = status.Get_source();
00692 uint length = status.Get_count(MPI::CHAR);
00693 message.resize(length);
00694 frameworkComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_LOGS, status);
00695 logs += message;
00696 logCount++;
00697 }
00698
00699 time(RECV);
00700
00701 std::stringstream log;
00702 log << "received " << logCount << " logs";
00703 ECF_LOG(state_, logLevel_, log.str());
00704
00705 return logs;
00706 }
00707
00708
00709
00710 bool Communicator::sendLogsGlobal(std::string logs, uint iProcess, bool blocking)
00711 {
00712 time(COMP);
00713
00714 MPI::Request request;
00715
00716 if(blocking)
00717 frameworkComm_.Send(&logs[0], (int) logs.size(), MPI::CHAR, iProcess, T_LOGS);
00718 else
00719 request = frameworkComm_.Isend(&logs[0], (int) logs.size(), MPI::CHAR, iProcess, T_LOGS);
00720
00721 time(SEND);
00722
00723 std::stringstream log;
00724 log << "sent " << logs.size() << " log chars";
00725 ECF_LOG(state_, logLevel_, log.str());
00726
00727 return true;
00728 }
00729
00730
00731
00732 bool Communicator::sendDataGlobal(voidP data, uint size, uint iProcess)
00733 {
00734 time(COMP);
00735
00736 frameworkComm_.Send(data.get(), size, MPI::BYTE, iProcess, T_DATA);
00737
00738 time(SEND);
00739
00740 std::stringstream log;
00741 log << "sent " << size << " bytes";
00742 ECF_LOG(state_, logLevel_, log.str());
00743
00744 return true;
00745 }
00746
00747
00748
00749 voidP Communicator::recvDataGlobal(uint iProcess)
00750 {
00751 MPI::Status status;
00752 voidP data;
00753
00754 time(COMP);
00755
00756 frameworkComm_.Probe(iProcess, T_DATA, status);
00757
00758 time(IDLE);
00759
00760 uint size = status.Get_count(MPI::BYTE);
00761 data = (voidP) new char[size / sizeof(char) + 1];
00762 frameworkComm_.Recv(data.get(), size, MPI::BYTE, iProcess, T_DATA, status);
00763
00764 time(RECV);
00765
00766 std::stringstream log;
00767 log << "received " << size << " bytes";
00768 ECF_LOG(state_, logLevel_, log.str());
00769
00770 return data;
00771 }
00772
00773 }