#include "ECF_base.h"
#include <string.h>
#include <list>
00003
00004 namespace Comm
00005 {
00006
00007 Communicator::Communicator()
00008 {
00009 bInitialized_ = false;
00010 idleTime_ = sendTime_ = recvTime_ = compTime_ = packTime_ = unpackTime_ = 0;
00011 logLevel_ = 4;
00012 sendCnt_ = recvCnt_ = 0;
00013 }
00014
00015
00016 bool Communicator::initialize(StateP state, int argc, char** argv)
00017 {
00018 idleTime_ = sendTime_ = recvTime_ = compTime_ = packTime_ = unpackTime_ = 0;
00019
00020
00021 if(bInitialized_) {
00022 beginTime_ = lastTime_ = MPI::Wtime();
00023 ECF_LOG(state_, 2, "Process " + uint2str(mpiGlobalRank_) + " of "
00024 + uint2str(mpiGlobalSize_) + " on " + processorName_);
00025 return true;
00026 }
00027
00028 state_ = state;
00029
00030 MPI::Init(argc, argv);
00031
00032 mpiGlobalSize_ = MPI::COMM_WORLD.Get_size();
00033 mpiGlobalRank_ = MPI::COMM_WORLD.Get_rank();
00034
00035 demeComm_ = MPI::COMM_WORLD.Dup();
00036 mpiSize_ = demeComm_.Get_size();
00037 mpiRank_ = demeComm_.Get_rank();
00038
00039 frameworkComm_ = MPI::COMM_WORLD.Dup();
00040
00041 char *pp, processor_name[MPI_MAX_PROCESSOR_NAME];
00042 pp = &processor_name[0];
00043 int namelen;
00044 MPI_Get_processor_name(processor_name, &namelen);
00045 processorName_ = processor_name;
00046
00047 beginTime_ = lastTime_ = MPI::Wtime();
00048
00049 ECF_LOG(state_, 2, "Process " + uint2str(mpiGlobalRank_) + " of "
00050 + uint2str(mpiGlobalSize_) + " on " + processorName_);
00051
00052 bInitialized_ = true;
00053
00054 return true;
00055 }
00056
00057
00058
00059
00060 uint Communicator::createDemeCommunicator(uint nDemes)
00061 {
00062
00063 uint myColor = mpiGlobalRank_ % nDemes;
00064 demeMasters.resize(nDemes);
00065 for(uint i = 0; i < nDemes; i++)
00066 demeMasters[i] = i;
00067
00068 demeComm_ = MPI::COMM_WORLD.Split(myColor, mpiGlobalRank_);
00069 mpiSize_ = demeComm_.Get_size();
00070 mpiRank_ = demeComm_.Get_rank();
00071
00072 std::stringstream log;
00073 log << "Global process " << mpiGlobalRank_ << " joined deme communicator with index ";
00074 log << myColor << " (local rank: " << mpiRank_ << " of " << mpiSize_ << ")";
00075 ECF_LOG(state_, 2, log.str());
00076
00077 return myColor;
00078 }
00079
00080
00081 bool Communicator::finalize()
00082 {
00083 if(!bInitialized_)
00084 return true;
00085
00086 endTime_ = MPI::Wtime();
00087
00088 std::stringstream times;
00089 times << "Process " << mpiGlobalRank_ << ": total MPI time: " << endTime_ - beginTime_;
00090 times << ", COMP: " << compTime_;
00091 times << ", IDLE: " << idleTime_;
00092 times << ", SEND: " << sendTime_;
00093 times << ", RECV: " << recvTime_;
00094 times << ", PACK: " << packTime_;
00095 times << ", UNPACK: " << unpackTime_ << std::endl;
00096
00097
00098 if(mpiGlobalRank_ == 0) {
00099 std::string message;
00100 MPI::Status status;
00101
00102 for(uint iProcess = 1; iProcess < mpiGlobalSize_; iProcess++) {
00103 frameworkComm_.Probe(iProcess, T_FINAL, status);
00104 uint size = status.Get_count(MPI::CHAR);
00105 message.resize(size);
00106 frameworkComm_.Recv(&message[0], size, MPI::CHAR, iProcess, T_FINAL, status);
00107
00108 times << message;
00109 }
00110 ECF_LOG(state_, 2, times.str());
00111 state_->getLogger()->saveTo(true);
00112 }
00113 else {
00114 std::string message = times.str();
00115 frameworkComm_.Send(&message[0], (int) message.size(), MPI::CHAR, 0, T_FINAL);
00116 }
00117
00118 if(!state_->getBatchMode())
00119 MPI::Finalize();
00120
00121 return true;
00122 }
00123
00128 double Communicator::time(enum timing T)
00129 {
00130 currentTime_ = MPI::Wtime();
00131 double elapsed = currentTime_ - lastTime_;
00132 lastTime_ = currentTime_;
00133
00134 switch(T) {
00135 case IDLE:
00136 idleTime_ += elapsed;
00137 break;
00138 case COMP:
00139 compTime_ += elapsed;
00140 break;
00141 case SEND:
00142 sendTime_ += elapsed;
00143 break;
00144 case RECV:
00145 recvTime_ += elapsed;
00146 break;
00147 case PACK:
00148 packTime_ += elapsed;
00149 break;
00150 case UNPACK:
00151 unpackTime_ += elapsed;
00152 break;
00153 }
00154
00155 return 1000 * elapsed;
00156 }
00157
00158
00159 uint Communicator::getDemeMaster(uint iDeme)
00160 {
00161 return demeMasters[iDeme];
00162 }
00163
00164
00165 uint Communicator::getLastSource()
00166 {
00167 return status_.Get_source();
00168 }
00169
00170
00171 void Communicator::synchronize()
00172 {
00173 MPI::COMM_WORLD.Barrier();
00174 }
00175
00176
00177
00178 bool Communicator::messageWaiting(uint iProcess, uint tag)
00179 {
00180 return demeComm_.Iprobe(iProcess, tag, status_);
00181 }
00182
00183
00184 bool Communicator::sendControlMessage(uint iProcess, int control)
00185 {
00186 demeComm_.Send(&control, sizeof(int), MPI::BYTE, iProcess, T_CONTROL);
00187 return true;
00188 }
00189
00190
00191 int Communicator::recvControlMessage(uint iProcess)
00192 {
00193 int control;
00194 demeComm_.Recv(&control, sizeof(int), MPI::BYTE, iProcess, T_CONTROL);
00195 return control;
00196 }
00197
00198
00199 bool Communicator::sendTerminateMessage(uint iProcess, bool termination)
00200 {
00201 ECF_LOG(state_, logLevel_, "Sending terminate message to process " + uint2str(iProcess));
00202 uint tag = (termination == true) ? T_TERMINATE : T_CONTINUE;
00203 frameworkComm_.Send(&termination, sizeof(bool), MPI::BYTE, iProcess, tag);
00204 return true;
00205 }
00206
00207
00208 bool Communicator::recvTerminateMessage(uint iProcess)
00209 {
00210 bool termination;
00211 frameworkComm_.Recv(&termination, sizeof(bool), MPI::BYTE, iProcess, MPI::ANY_TAG, controlStatus_);
00212 if(controlStatus_.Get_tag() == T_TERMINATE)
00213 termination = true;
00214 if(controlStatus_.Get_tag() == T_CONTINUE)
00215 termination = false;
00216 return termination;
00217 }
00218
00219
00220 bool Communicator::checkTerminationMessage(uint master)
00221 {
00222 if(frameworkComm_.Iprobe(master, MPI::ANY_TAG, controlStatus_)) {
00223 if(controlStatus_.Get_tag() == T_TERMINATE) {
00224 return true;
00225 }
00226 else
00227 recvTerminateMessage(master);
00228 }
00229 return false;
00230 }
00231
00232
00233
00234
00235 bool Communicator::sendIndividuals(std::vector<IndividualP> pool, uint iProcess, uint nIndividuals)
00236 {
00237 time(COMP);
00238
00239 XMLNode xAll, xIndividual;
00240 xAll = XMLNode::createXMLTopNode("Pack");
00241
00242 if(nIndividuals == 0)
00243 nIndividuals = (uint) pool.size();
00244 xAll.addAttribute("size", uint2str(nIndividuals).c_str());
00245
00246 for(uint ind = 0; ind < nIndividuals; ind++) {
00247 pool[ind]->write(xIndividual);
00248 xIndividual.addAttribute("i", uint2str(pool[ind]->index).c_str());
00249 xIndividual.addAttribute("c", uint2str(pool[ind]->cid).c_str());
00250 xAll.addChild(xIndividual);
00251 }
00252 char *message = xAll.createXMLString(0);
00253
00254
00255 double createTime = time(PACK);
00256
00257
00258 demeComm_.Send(message, (int) strlen(message) + 1, MPI::CHAR, iProcess, T_DEFAULT);
00259
00260 double sendTime = time(SEND);
00261
00262 std::stringstream log;
00263 log << "sent " << nIndividuals << " individuals, " << strlen(message) << " bytes (P: " << createTime << " | S: " << sendTime << ")";
00264 ECF_LOG(state_, logLevel_, log.str());
00265
00266 freeXMLString(message);
00267
00268 return true;
00269 }
00270
00271
00272
00273 bool Communicator::sendIndividualsGlobal(std::vector<IndividualP> pool, uint iProcess, uint nIndividuals)
00274 {
00275 time(COMP);
00276
00277 XMLNode xAll, xIndividual;
00278 xAll = XMLNode::createXMLTopNode("Pack");
00279
00280 if(nIndividuals == 0)
00281 nIndividuals = (uint) pool.size();
00282 xAll.addAttribute("size", uint2str(nIndividuals).c_str());
00283
00284 for(uint ind = 0; ind < nIndividuals; ind++) {
00285 pool[ind]->write(xIndividual);
00286 xIndividual.addAttribute("i", uint2str(pool[ind]->index).c_str());
00287 xIndividual.addAttribute("c", uint2str(pool[ind]->cid).c_str());
00288 xAll.addChild(xIndividual);
00289 }
00290 char *message = xAll.createXMLString(0);
00291
00292 double createTime = time(PACK);
00293
00294 frameworkComm_.Send(message, (int) strlen(message) + 1, MPI::CHAR, iProcess, T_DEFAULT);
00295
00296 double sendTime = time(SEND);
00297
00298 std::stringstream log;
00299 log << "sent " << nIndividuals << " individuals (global), " << strlen(message) << " bytes (P: " << createTime << " | S: " << sendTime << ")";
00300 ECF_LOG(state_, logLevel_, log.str());
00301
00302 freeXMLString(message);
00303
00304 return true;
00305 }
00306
00307
00308
00309
00310 uint Communicator::recvDemeIndividuals(std::vector<IndividualP>& deme, uint iProcess)
00311 {
00312 XMLNode xAll, xIndividual;
00313 MPI::Status status;
00314 std::string message;
00315
00316 time(COMP);
00317
00318 demeComm_.Probe(iProcess, MPI::ANY_TAG, status);
00319
00320 double idle = time(IDLE);
00321
00322 uint length = status.Get_count(MPI::CHAR);
00323 message.resize(length + 1);
00324 demeComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_DEFAULT, status);
00325
00326 double recv = time(RECV);
00327
00328 xAll = XMLNode::parseString(message.c_str(), "Pack");
00329 uint nIndividuals = atoi(xAll.getAttribute("size"));
00330 uint index, cid;
00331 for(uint i = 0; i < nIndividuals; i++) {
00332
00333 xIndividual = xAll.getChildNode(i);
00334 index = atoi(xIndividual.getAttributeValue(1));
00335 cid = atoi(xIndividual.getAttributeValue(2));
00336 deme[index]->read(xIndividual);
00337 deme[index]->cid = cid;
00338 }
00339 status_ = status;
00340
00341 double read = time(UNPACK);
00342
00343 std::stringstream log;
00344 log << "received " << nIndividuals << " individuals, " << length << " bytes (";
00345 log << "I: " << idle << " | R: " << recv << " | U: " << read << ")";
00346 ECF_LOG(state_, logLevel_, log.str());
00347
00348 return nIndividuals;
00349 }
00350
00351
00352
00353
00354 std::vector<IndividualP> Communicator::recvIndividuals(uint iProcess)
00355 {
00356 XMLNode xAll, xIndividual;
00357 MPI::Status status;
00358 std::string message;
00359 std::vector<IndividualP> pack;
00360
00361 time(COMP);
00362
00363 demeComm_.Probe(iProcess, MPI::ANY_TAG, status);
00364
00365 double idle = time(IDLE);
00366
00367 uint length = status.Get_count(MPI::CHAR);
00368 message.resize(length + 1);
00369 demeComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_DEFAULT, status);
00370
00371 double recv = time(RECV);
00372
00373 xAll = XMLNode::parseString(message.c_str(), "Pack");
00374 uint nIndividuals = atoi(xAll.getAttribute("size"));
00375 uint index, cid;
00376 for(uint i = 0; i < nIndividuals; i++) {
00377
00378 xIndividual = xAll.getChildNode(i);
00379 pack.push_back((IndividualP) new Individual(state_));
00380 index = atoi(xIndividual.getAttributeValue(1));
00381 cid = atoi(xIndividual.getAttributeValue(2));
00382
00383 pack[i]->index = index;
00384 pack[i]->cid = cid;
00385 pack[i]->fitness = (FitnessP) state_->getFitnessObject()->copy();
00386 pack[i]->read(xIndividual);
00387 }
00388 status_ = status;
00389
00390 double read = time(UNPACK);
00391
00392 std::stringstream log;
00393 log << "received " << nIndividuals << " individuals, " << length << " bytes (";
00394 log << "I: " << idle << " | R: " << recv << " | U: " << read << ")";
00395 ECF_LOG(state_, logLevel_, log.str());
00396
00397 return pack;
00398 }
00399
00400
00401
00402 std::vector<IndividualP> Communicator::recvIndividualsGlobal(uint iProcess)
00403 {
00404 XMLNode xAll, xIndividual;
00405 MPI::Status status;
00406 std::string message;
00407 std::vector<IndividualP> pack;
00408
00409 time(COMP);
00410
00411 frameworkComm_.Probe(iProcess, T_DEFAULT, status);
00412
00413 double idle = time(IDLE);
00414
00415 uint length = status.Get_count(MPI::CHAR);
00416 message.resize(length);
00417 frameworkComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_DEFAULT, status);
00418
00419 double recv = time(RECV);
00420
00421 xAll = XMLNode::parseString(message.c_str(), "Pack");
00422 uint nIndividuals = atoi(xAll.getAttribute("size"));
00423 uint index, cid;
00424 for(uint i = 0; i < nIndividuals; i++) {
00425 xIndividual = xAll.getChildNode(i);
00426 pack.push_back((IndividualP) new Individual(state_));
00427 index = atoi(xIndividual.getAttributeValue(1));
00428 cid = atoi(xIndividual.getAttributeValue(2));
00429
00430 pack[i]->index = index;
00431 pack[i]->cid = cid;
00432 pack[i]->fitness = (FitnessP) state_->getFitnessObject()->copy();
00433 pack[i]->read(xIndividual);
00434 }
00435 status_ = status;
00436
00437 double read = time(UNPACK);
00438
00439 std::stringstream log;
00440 log << "received " << nIndividuals << " individuals, " << length << " bytes (";
00441 log << "I: " << idle << " | R: " << recv << " | U: " << read << ")";
00442 ECF_LOG(state_, logLevel_, log.str());
00443
00444 return pack;
00445 }
00446
00447
00448
00449
00450
00451
00452 uint Communicator::recvReplaceIndividuals(std::vector<IndividualP>& pool, uint iProcess)
00453 {
00454 XMLNode xAll, xIndividual;
00455 MPI::Status status;
00456
00457 time(COMP);
00458
00459 demeComm_.Probe(iProcess, MPI::ANY_TAG, status);
00460
00461 double idle = time(IDLE);
00462
00463 uint length = status.Get_count(MPI::CHAR);
00464 char *message = new char[length + 1];
00465 demeComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_DEFAULT, status);
00466 status_ = status;
00467
00468 double recv = time(RECV);
00469
00470 xAll = XMLNode::parseString(message, "Pack");
00471 uint nIndividuals = atoi(xAll.getAttribute("size"));
00472 uint poolSize = (uint) pool.size();
00473
00474 if(poolSize < nIndividuals) {
00475 pool.resize(nIndividuals);
00476 for(uint i = poolSize; i < nIndividuals; i++) {
00477 pool[i] = (IndividualP) new Individual(state_);
00478 pool[i]->fitness = (FitnessP) state_->getFitnessObject()->copy();
00479 }
00480 }
00481
00482 uint index, cid;
00483 for(uint i = 0; i < nIndividuals; i++) {
00484 xIndividual = xAll.getChildNode(i);
00485 index = atoi(xIndividual.getAttributeValue(1));
00486 cid = atoi(xIndividual.getAttributeValue(2));
00487 pool[i]->index = index;
00488 pool[i]->cid = cid;
00489 pool[i]->read(xIndividual);
00490 }
00491
00492 double read = time(UNPACK);
00493
00494 std::stringstream log;
00495 log << "received " << nIndividuals << " individuals, " << length << " bytes (I: " << idle << " | R: " << recv << " | U: " << read << ")";
00496 ECF_LOG(state_, logLevel_, log.str());
00497
00498 delete [] message;
00499
00500 return nIndividuals;
00501 }
00502
00503
00504
00505
00506
00507
00508 bool Communicator::sendFitness(std::vector<IndividualP> pool, uint iProcess, uint nIndividuals)
00509 {
00510 time(COMP);
00511
00512 XMLNode xAll, xIndividual, xFitness;
00513 xAll = XMLNode::createXMLTopNode("Pack");
00514
00515 if(nIndividuals == 0)
00516 nIndividuals = (uint) pool.size();
00517 xAll.addAttribute("size", uint2str(nIndividuals).c_str());
00518
00519 for(uint ind = 0; ind < nIndividuals; ind++) {
00520 xIndividual = XMLNode::createXMLTopNode("Individual");
00521 xIndividual.addAttribute("i", uint2str(pool[ind]->index).c_str());
00522 xIndividual.addAttribute("c", uint2str(pool[ind]->cid).c_str());
00523
00524 pool[ind]->fitness->write(xFitness);
00525 xIndividual.addChild(xFitness);
00526 xAll.addChild(xIndividual);
00527 }
00528 char *message = xAll.createXMLString(0);
00529
00530 double pack = time(PACK);
00531
00532 demeComm_.Send(message, (int) strlen(message) + 1, MPI::CHAR, iProcess, T_DEFAULT);
00533
00534 double send = time(SEND);
00535
00536 std::stringstream log;
00537 log << "sent " << nIndividuals << " fitness objects, " << strlen(message) << " bytes (P: " << pack << " | S: " << send << ")";
00538 ECF_LOG(state_, logLevel_, log.str());
00539
00540 freeXMLString(message);
00541
00542 return true;
00543 }
00544
00545
00546
00547
00548
00549 uint Communicator::recvDemeFitness(std::vector<IndividualP>& deme, uint iProcess)
00550 {
00551 XMLNode xAll, xIndividual, xFitness;
00552 MPI::Status status;
00553 std::string message;
00554
00555 time(COMP);
00556
00557 demeComm_.Probe(iProcess, MPI::ANY_TAG, status);
00558
00559 double idle = time(IDLE);
00560
00561 uint length = status.Get_count(MPI::CHAR);
00562 message.resize(length);
00563 demeComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_DEFAULT, status);
00564
00565 double recv = time(RECV);
00566
00567 xAll = XMLNode::parseString(message.c_str(), "Pack");
00568 uint nIndividuals = atoi(xAll.getAttribute("size"));
00569 uint index, cid;
00570 for(uint i = 0; i < nIndividuals; i++) {
00571 xIndividual = xAll.getChildNode(i);
00572 index = atoi(xIndividual.getAttributeValue(0));
00573 cid = atoi(xIndividual.getAttributeValue(1));
00574 xFitness = xIndividual.getChildNode(0);
00575 deme[index]->fitness->read(xFitness);
00576 deme[index]->fitness->cid = cid;
00577
00578 }
00579 status_ = status;
00580
00581 double read = time(UNPACK);
00582
00583 std::stringstream log;
00584 log << "received " << nIndividuals << " fitnes objects, " << length << " bytes (I: " << idle << " | R: " << recv << " | U: " << read << ")";
00585 ECF_LOG(state_, logLevel_, log.str());
00586
00587 return nIndividuals;
00588 }
00589
00590
00591
00592
00593
00594
00595 std::vector<uint> Communicator::recvFitnessVector(std::vector<IndividualP>& deme, uint iProcess)
00596 {
00597 XMLNode xAll, xIndividual, xFitness;
00598 MPI::Status status;
00599 std::string message;
00600 std::vector<uint> indices;
00601
00602 time(COMP);
00603
00604 demeComm_.Probe(iProcess, MPI::ANY_TAG, status);
00605
00606 double idle = time(IDLE);
00607
00608 uint length = status.Get_count(MPI::CHAR);
00609 message.resize(length);
00610 demeComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_DEFAULT, status);
00611
00612 double recv = time(RECV);
00613
00614 xAll = XMLNode::parseString(message.c_str(), "Pack");
00615 uint nIndividuals = atoi(xAll.getAttribute("size"));
00616 uint index, cid;
00617 for(uint i = 0; i < nIndividuals; i++) {
00618 xIndividual = xAll.getChildNode(i);
00619 index = atoi(xIndividual.getAttributeValue(0));
00620 cid = atoi(xIndividual.getAttributeValue(1));
00621 xFitness = xIndividual.getChildNode(0);
00622
00623 deme[index]->fitness->read(xFitness);
00624 deme[index]->fitness->cid = cid;
00625 indices.push_back(index);
00626 }
00627 status_ = status;
00628
00629 double read = time(UNPACK);
00630
00631 std::stringstream log;
00632 log << "received " << nIndividuals << " fitnes objects, " << length << " bytes (I: " << idle << " | R: " << recv << " | U: " << read << ")";
00633 ECF_LOG(state_, logLevel_, log.str());
00634
00635 return indices;
00636 }
00637
00638
00639
00640 bool Communicator::sendValuesGlobal(std::vector<double> values, uint iProcess)
00641 {
00642 time(COMP);
00643
00644 frameworkComm_.Send(&values[0], (int) values.size(), MPI::DOUBLE, iProcess, T_VALUES);
00645
00646 time(SEND);
00647
00648 std::stringstream log;
00649 log << "sent " << values.size() << " doubles";
00650 ECF_LOG(state_, logLevel_, log.str());
00651
00652 return true;
00653 }
00654
00655
00656
00657 std::vector<double> Communicator::recvValuesGlobal(uint iProcess)
00658 {
00659 std::vector<double> values;
00660 MPI::Status status;
00661
00662 time(COMP);
00663
00664 frameworkComm_.Probe(iProcess, T_VALUES, status);
00665
00666 double idle = time(IDLE);
00667
00668 uint size = status.Get_count(MPI::DOUBLE);
00669 values.resize(size);
00670 frameworkComm_.Recv(&values[0], size, MPI::DOUBLE, iProcess, T_VALUES, status);
00671
00672 double recv = time(RECV);
00673
00674 std::stringstream log;
00675 log << "received " << values.size() << " doubles";
00676 ECF_LOG(state_, logLevel_, log.str());
00677
00678 return values;
00679 }
00680
00681
00682
00683 std::string Communicator::recvLogsGlobal()
00684 {
00685 std::string logs = "", message;
00686 uint logCount = 0;
00687 MPI::Status status;
00688
00689 time(COMP);
00690
00691 while(frameworkComm_.Iprobe(MPI::ANY_SOURCE, T_LOGS, status)) {
00692 uint iProcess = status.Get_source();
00693 uint length = status.Get_count(MPI::CHAR);
00694 message.resize(length);
00695 frameworkComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_LOGS, status);
00696 logs += message;
00697 logCount++;
00698 }
00699
00700 time(RECV);
00701
00702 std::stringstream log;
00703 log << "received " << logCount << " logs";
00704 ECF_LOG(state_, logLevel_, log.str());
00705
00706 return logs;
00707 }
00708
00709
00710
00711 bool Communicator::sendLogsGlobal(std::string logs, uint iProcess, bool blocking)
00712 {
00713 time(COMP);
00714
00715 MPI::Request request;
00716
00717 if(blocking)
00718 frameworkComm_.Send(&logs[0], (int) logs.size(), MPI::CHAR, iProcess, T_LOGS);
00719 else
00720 request = frameworkComm_.Isend(&logs[0], (int) logs.size(), MPI::CHAR, iProcess, T_LOGS);
00721
00722 time(SEND);
00723
00724 std::stringstream log;
00725 log << "sent " << logs.size() << " log chars";
00726 ECF_LOG(state_, logLevel_, log.str());
00727
00728 return true;
00729 }
00730
00731
00732
00733 bool Communicator::sendDataGlobal(voidP data, uint size, uint iProcess)
00734 {
00735 time(COMP);
00736
00737 frameworkComm_.Send(data.get(), size, MPI::BYTE, iProcess, T_DATA);
00738
00739 time(SEND);
00740
00741 std::stringstream log;
00742 log << "sent " << size << " bytes";
00743 ECF_LOG(state_, logLevel_, log.str());
00744
00745 return true;
00746 }
00747
00748
00749
00750 voidP Communicator::recvDataGlobal(uint iProcess)
00751 {
00752 MPI::Status status;
00753 voidP data;
00754
00755 time(COMP);
00756
00757 frameworkComm_.Probe(iProcess, T_DATA, status);
00758
00759 time(IDLE);
00760
00761 uint size = status.Get_count(MPI::BYTE);
00762 data = (voidP) new char[size / sizeof(char) + 1];
00763 frameworkComm_.Recv(data.get(), size, MPI::BYTE, iProcess, T_DATA, status);
00764
00765 time(RECV);
00766
00767 std::stringstream log;
00768 log << "received " << size << " bytes";
00769 ECF_LOG(state_, logLevel_, log.str());
00770
00771 return data;
00772 }
00773
00774 }