00001 #include "ECF_base.h"
00002 #include <string.h>
00003
00004 namespace Comm
00005 {
00006
00007 #ifdef _MPI
00008
00009 Communicator::Communicator()
00010 {
00011 bInitialized_ = false;
00012 idleTime_ = sendTime_ = recvTime_ = compTime_ = packTime_ = unpackTime_ = 0;
00013 logLevel_ = 4;
00014 sendCnt_ = recvCnt_ = 0;
00015 }
00016
00017
00018 bool Communicator::initialize(StateP state, int argc, char** argv)
00019 {
00020 idleTime_ = sendTime_ = recvTime_ = compTime_ = packTime_ = unpackTime_ = 0;
00021
00022
00023 if(bInitialized_) {
00024 beginTime_ = lastTime_ = MPI::Wtime();
00025 ECF_LOG(state_, 2, "Process " + uint2str(mpiGlobalRank_) + " of "
00026 + uint2str(mpiGlobalSize_) + " on " + processorName_);
00027 return true;
00028 }
00029
00030 state_ = state;
00031
00032 MPI::Init(argc, argv);
00033
00034 mpiGlobalSize_ = MPI::COMM_WORLD.Get_size();
00035 mpiGlobalRank_ = MPI::COMM_WORLD.Get_rank();
00036
00037 demeComm_ = MPI::COMM_WORLD.Dup();
00038 mpiSize_ = demeComm_.Get_size();
00039 mpiRank_ = demeComm_.Get_rank();
00040
00041 frameworkComm_ = MPI::COMM_WORLD.Dup();
00042
00043 char *pp, processor_name[MPI_MAX_PROCESSOR_NAME];
00044 pp = &processor_name[0];
00045 int namelen;
00046 MPI_Get_processor_name(processor_name, &namelen);
00047 processorName_ = processor_name;
00048
00049 beginTime_ = lastTime_ = MPI::Wtime();
00050
00051 ECF_LOG(state_, 2, "Process " + uint2str(mpiGlobalRank_) + " of "
00052 + uint2str(mpiGlobalSize_) + " on " + processorName_);
00053
00054 bInitialized_ = true;
00055
00056 return true;
00057 }
00058
00059
00060
00061
00062 uint Communicator::createDemeCommunicator(uint nDemes)
00063 {
00064
00065 uint myColor = mpiGlobalRank_ % nDemes;
00066 demeMasters.resize(nDemes);
00067 for(uint i = 0; i < nDemes; i++)
00068 demeMasters[i] = i;
00069
00070 demeComm_ = MPI::COMM_WORLD.Split(myColor, mpiGlobalRank_);
00071 mpiSize_ = demeComm_.Get_size();
00072 mpiRank_ = demeComm_.Get_rank();
00073
00074 std::stringstream log;
00075 log << "Global process " << mpiGlobalRank_ << " joined deme communicator with index ";
00076 log << myColor << " (local rank: " << mpiRank_ << " of " << mpiSize_ << ")";
00077 ECF_LOG(state_, 2, log.str());
00078
00079 return myColor;
00080 }
00081
00082
00083 bool Communicator::finalize()
00084 {
00085 if(!bInitialized_)
00086 return true;
00087
00088 endTime_ = MPI::Wtime();
00089
00090 std::stringstream times;
00091 times << "Process " << mpiGlobalRank_ << ": total MPI time: " << endTime_ - beginTime_;
00092 times << ", COMP: " << compTime_;
00093 times << ", IDLE: " << idleTime_;
00094 times << ", SEND: " << sendTime_;
00095 times << ", RECV: " << recvTime_;
00096 times << ", PACK: " << packTime_;
00097 times << ", UNPACK: " << unpackTime_ << std::endl;
00098
00099
00100 if(mpiGlobalRank_ == 0) {
00101 std::string message;
00102 MPI::Status status;
00103
00104 for(uint iProcess = 1; iProcess < mpiGlobalSize_; iProcess++) {
00105 frameworkComm_.Probe(iProcess, T_FINAL, status);
00106 uint size = status.Get_count(MPI::CHAR);
00107 message.resize(size);
00108 frameworkComm_.Recv(&message[0], size, MPI::CHAR, iProcess, T_FINAL, status);
00109
00110 times << message;
00111 }
00112 ECF_LOG(state_, 2, times.str());
00113 state_->getLogger()->saveTo(true);
00114 }
00115 else {
00116 std::string message = times.str();
00117 frameworkComm_.Send(&message[0], (int) message.size(), MPI::CHAR, 0, T_FINAL);
00118 }
00119
00120 if(!state_->getBatchMode())
00121 MPI::Finalize();
00122
00123 return true;
00124 }
00125
00130 double Communicator::time(enum timing T)
00131 {
00132 currentTime_ = MPI::Wtime();
00133 double elapsed = currentTime_ - lastTime_;
00134 lastTime_ = currentTime_;
00135
00136 switch(T) {
00137 case IDLE:
00138 idleTime_ += elapsed;
00139 break;
00140 case COMP:
00141 compTime_ += elapsed;
00142 break;
00143 case SEND:
00144 sendTime_ += elapsed;
00145 break;
00146 case RECV:
00147 recvTime_ += elapsed;
00148 break;
00149 case PACK:
00150 packTime_ += elapsed;
00151 break;
00152 case UNPACK:
00153 unpackTime_ += elapsed;
00154 break;
00155 }
00156
00157 return 1000 * elapsed;
00158 }
00159
00160
00161 uint Communicator::getDemeMaster(uint iDeme)
00162 {
00163 return demeMasters[iDeme];
00164 }
00165
00166
00167 uint Communicator::getLastSource()
00168 {
00169 return status_.Get_source();
00170 }
00171
00172
00173 void Communicator::synchronize()
00174 {
00175 MPI::COMM_WORLD.Barrier();
00176 }
00177
00178
00179
00180 bool Communicator::messageWaiting(uint iProcess, uint tag)
00181 {
00182 return demeComm_.Iprobe(iProcess, tag, status_);
00183 }
00184
00185
00186 bool Communicator::sendControlMessage(uint iProcess, int control)
00187 {
00188 demeComm_.Send(&control, sizeof(int), MPI::BYTE, iProcess, T_CONTROL);
00189 return true;
00190 }
00191
00192
00193 int Communicator::recvControlMessage(uint iProcess)
00194 {
00195 int control;
00196 demeComm_.Recv(&control, sizeof(int), MPI::BYTE, iProcess, T_CONTROL);
00197 return control;
00198 }
00199
00200
00201 bool Communicator::sendTerminateMessage(uint iProcess, bool termination)
00202 {
00203 ECF_LOG(state_, logLevel_, "Sending terminate message to process " + uint2str(iProcess));
00204 uint tag = (termination == true) ? T_TERMINATE : T_CONTINUE;
00205 frameworkComm_.Send(&termination, sizeof(bool), MPI::BYTE, iProcess, tag);
00206 return true;
00207 }
00208
00209
00210 bool Communicator::recvTerminateMessage(uint iProcess)
00211 {
00212 bool termination;
00213 frameworkComm_.Recv(&termination, sizeof(bool), MPI::BYTE, iProcess, MPI::ANY_TAG, controlStatus_);
00214 if(controlStatus_.Get_tag() == T_TERMINATE)
00215 termination = true;
00216 if(controlStatus_.Get_tag() == T_CONTINUE)
00217 termination = false;
00218 return termination;
00219 }
00220
00221
00222 bool Communicator::checkTerminationMessage(uint master)
00223 {
00224 if(frameworkComm_.Iprobe(master, MPI::ANY_TAG, controlStatus_)) {
00225 if(controlStatus_.Get_tag() == T_TERMINATE) {
00226 return true;
00227 }
00228 else
00229 recvTerminateMessage(master);
00230 }
00231 return false;
00232 }
00233
00234
00235
00236
00237 bool Communicator::sendIndividuals(std::vector<IndividualP> pool, uint iProcess, uint nIndividuals)
00238 {
00239 time(COMP);
00240
00241 XMLNode xAll, xIndividual;
00242 xAll = XMLNode::createXMLTopNode("Pack");
00243
00244 if(nIndividuals == 0)
00245 nIndividuals = (uint) pool.size();
00246 xAll.addAttribute("size", uint2str(nIndividuals).c_str());
00247
00248 for(uint ind = 0; ind < nIndividuals; ind++) {
00249 pool[ind]->write(xIndividual);
00250 xIndividual.addAttribute("i", uint2str(pool[ind]->index).c_str());
00251 xIndividual.addAttribute("c", uint2str(pool[ind]->cid).c_str());
00252 xAll.addChild(xIndividual);
00253 }
00254 char *message = xAll.createXMLString(0);
00255
00256
00257 double createTime = time(PACK);
00258
00259
00260 demeComm_.Send(message, (int) strlen(message) + 1, MPI::CHAR, iProcess, T_DEFAULT);
00261
00262 double sendTime = time(SEND);
00263
00264 std::stringstream log;
00265 log << "sent " << nIndividuals << " individuals, " << strlen(message) << " bytes (P: " << createTime << " | S: " << sendTime << ")";
00266 ECF_LOG(state_, logLevel_, log.str());
00267
00268 freeXMLString(message);
00269
00270 return true;
00271 }
00272
00273
00274
00275 bool Communicator::sendIndividualsGlobal(std::vector<IndividualP> pool, uint iProcess, uint nIndividuals)
00276 {
00277 time(COMP);
00278
00279 XMLNode xAll, xIndividual;
00280 xAll = XMLNode::createXMLTopNode("Pack");
00281
00282 if(nIndividuals == 0)
00283 nIndividuals = (uint) pool.size();
00284 xAll.addAttribute("size", uint2str(nIndividuals).c_str());
00285
00286 for(uint ind = 0; ind < nIndividuals; ind++) {
00287 pool[ind]->write(xIndividual);
00288 xIndividual.addAttribute("i", uint2str(pool[ind]->index).c_str());
00289 xIndividual.addAttribute("c", uint2str(pool[ind]->cid).c_str());
00290 xAll.addChild(xIndividual);
00291 }
00292 char *message = xAll.createXMLString(0);
00293
00294 double createTime = time(PACK);
00295
00296 frameworkComm_.Send(message, (int) strlen(message) + 1, MPI::CHAR, iProcess, T_DEFAULT);
00297
00298 double sendTime = time(SEND);
00299
00300 std::stringstream log;
00301 log << "sent " << nIndividuals << " individuals (global), " << strlen(message) << " bytes (P: " << createTime << " | S: " << sendTime << ")";
00302 ECF_LOG(state_, logLevel_, log.str());
00303
00304 freeXMLString(message);
00305
00306 return true;
00307 }
00308
00309
00310
00311
00312 uint Communicator::recvDemeIndividuals(std::vector<IndividualP>& deme, uint iProcess)
00313 {
00314 XMLNode xAll, xIndividual;
00315 MPI::Status status;
00316 std::string message;
00317
00318 time(COMP);
00319
00320 demeComm_.Probe(iProcess, MPI::ANY_TAG, status);
00321
00322 double idle = time(IDLE);
00323
00324 uint length = status.Get_count(MPI::CHAR);
00325 message.resize(length + 1);
00326 demeComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_DEFAULT, status);
00327
00328 double recv = time(RECV);
00329
00330 xAll = XMLNode::parseString(message.c_str(), "Pack");
00331 uint nIndividuals = atoi(xAll.getAttribute("size"));
00332 uint index, cid;
00333 for(uint i = 0; i < nIndividuals; i++) {
00334
00335 xIndividual = xAll.getChildNode(i);
00336 index = atoi(xIndividual.getAttributeValue(1));
00337 cid = atoi(xIndividual.getAttributeValue(2));
00338 deme[index]->read(xIndividual);
00339 deme[index]->cid = cid;
00340 }
00341 status_ = status;
00342
00343 double read = time(UNPACK);
00344
00345 std::stringstream log;
00346 log << "received " << nIndividuals << " individuals, " << length << " bytes (";
00347 log << "I: " << idle << " | R: " << recv << " | U: " << read << ")";
00348 ECF_LOG(state_, logLevel_, log.str());
00349
00350 return nIndividuals;
00351 }
00352
00353
00354
00355
00356 std::vector<IndividualP> Communicator::recvIndividuals(uint iProcess)
00357 {
00358 XMLNode xAll, xIndividual;
00359 MPI::Status status;
00360 std::string message;
00361 std::vector<IndividualP> pack;
00362
00363 time(COMP);
00364
00365 demeComm_.Probe(iProcess, MPI::ANY_TAG, status);
00366
00367 double idle = time(IDLE);
00368
00369 uint length = status.Get_count(MPI::CHAR);
00370 message.resize(length + 1);
00371 demeComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_DEFAULT, status);
00372
00373 double recv = time(RECV);
00374
00375 xAll = XMLNode::parseString(message.c_str(), "Pack");
00376 uint nIndividuals = atoi(xAll.getAttribute("size"));
00377 uint index, cid;
00378 for(uint i = 0; i < nIndividuals; i++) {
00379
00380 xIndividual = xAll.getChildNode(i);
00381 pack.push_back((IndividualP) new Individual(state_));
00382 index = atoi(xIndividual.getAttributeValue(1));
00383 cid = atoi(xIndividual.getAttributeValue(2));
00384
00385 pack[i]->index = index;
00386 pack[i]->cid = cid;
00387 pack[i]->fitness = (FitnessP) state_->getFitnessObject()->copy();
00388 pack[i]->read(xIndividual);
00389 }
00390 status_ = status;
00391
00392 double read = time(UNPACK);
00393
00394 std::stringstream log;
00395 log << "received " << nIndividuals << " individuals, " << length << " bytes (";
00396 log << "I: " << idle << " | R: " << recv << " | U: " << read << ")";
00397 ECF_LOG(state_, logLevel_, log.str());
00398
00399 return pack;
00400 }
00401
00402
00403
00404 std::vector<IndividualP> Communicator::recvIndividualsGlobal(uint iProcess)
00405 {
00406 XMLNode xAll, xIndividual;
00407 MPI::Status status;
00408 std::string message;
00409 std::vector<IndividualP> pack;
00410
00411 time(COMP);
00412
00413 frameworkComm_.Probe(iProcess, T_DEFAULT, status);
00414
00415 double idle = time(IDLE);
00416
00417 uint length = status.Get_count(MPI::CHAR);
00418 message.resize(length);
00419 frameworkComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_DEFAULT, status);
00420
00421 double recv = time(RECV);
00422
00423 xAll = XMLNode::parseString(message.c_str(), "Pack");
00424 uint nIndividuals = atoi(xAll.getAttribute("size"));
00425 uint index, cid;
00426 for(uint i = 0; i < nIndividuals; i++) {
00427 xIndividual = xAll.getChildNode(i);
00428 pack.push_back((IndividualP) new Individual(state_));
00429 index = atoi(xIndividual.getAttributeValue(1));
00430 cid = atoi(xIndividual.getAttributeValue(2));
00431
00432 pack[i]->index = index;
00433 pack[i]->cid = cid;
00434 pack[i]->fitness = (FitnessP) state_->getFitnessObject()->copy();
00435 pack[i]->read(xIndividual);
00436 }
00437 status_ = status;
00438
00439 double read = time(UNPACK);
00440
00441 std::stringstream log;
00442 log << "received " << nIndividuals << " individuals, " << length << " bytes (";
00443 log << "I: " << idle << " | R: " << recv << " | U: " << read << ")";
00444 ECF_LOG(state_, logLevel_, log.str());
00445
00446 return pack;
00447 }
00448
00449
00450
00451
00452
00453
00454 uint Communicator::recvReplaceIndividuals(std::vector<IndividualP>& pool, uint iProcess)
00455 {
00456 XMLNode xAll, xIndividual;
00457 MPI::Status status;
00458
00459 time(COMP);
00460
00461 demeComm_.Probe(iProcess, MPI::ANY_TAG, status);
00462
00463 double idle = time(IDLE);
00464
00465 uint length = status.Get_count(MPI::CHAR);
00466 char *message = new char[length + 1];
00467 demeComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_DEFAULT, status);
00468 status_ = status;
00469
00470 double recv = time(RECV);
00471
00472 xAll = XMLNode::parseString(message, "Pack");
00473 uint nIndividuals = atoi(xAll.getAttribute("size"));
00474 uint poolSize = (uint) pool.size();
00475
00476 if(poolSize < nIndividuals) {
00477 pool.resize(nIndividuals);
00478 for(uint i = poolSize; i < nIndividuals; i++) {
00479 pool[i] = (IndividualP) new Individual(state_);
00480 pool[i]->fitness = (FitnessP) state_->getFitnessObject()->copy();
00481 }
00482 }
00483
00484 uint index, cid;
00485 for(uint i = 0; i < nIndividuals; i++) {
00486 xIndividual = xAll.getChildNode(i);
00487 index = atoi(xIndividual.getAttributeValue(1));
00488 cid = atoi(xIndividual.getAttributeValue(2));
00489 pool[i]->index = index;
00490 pool[i]->cid = cid;
00491 pool[i]->read(xIndividual);
00492 }
00493
00494 double read = time(UNPACK);
00495
00496 std::stringstream log;
00497 log << "received " << nIndividuals << " individuals, " << length << " bytes (I: " << idle << " | R: " << recv << " | U: " << read << ")";
00498 ECF_LOG(state_, logLevel_, log.str());
00499
00500 delete [] message;
00501
00502 return nIndividuals;
00503 }
00504
00505
00506
00507
00508
00509
00510 bool Communicator::sendFitness(std::vector<IndividualP> pool, uint iProcess, uint nIndividuals)
00511 {
00512 time(COMP);
00513
00514 XMLNode xAll, xIndividual, xFitness;
00515 xAll = XMLNode::createXMLTopNode("Pack");
00516
00517 if(nIndividuals == 0)
00518 nIndividuals = (uint) pool.size();
00519 xAll.addAttribute("size", uint2str(nIndividuals).c_str());
00520
00521 for(uint ind = 0; ind < nIndividuals; ind++) {
00522 xIndividual = XMLNode::createXMLTopNode("Individual");
00523 xIndividual.addAttribute("i", uint2str(pool[ind]->index).c_str());
00524 xIndividual.addAttribute("c", uint2str(pool[ind]->cid).c_str());
00525
00526 pool[ind]->fitness->write(xFitness);
00527 xIndividual.addChild(xFitness);
00528 xAll.addChild(xIndividual);
00529 }
00530 char *message = xAll.createXMLString(0);
00531
00532 double pack = time(PACK);
00533
00534 demeComm_.Send(message, (int) strlen(message) + 1, MPI::CHAR, iProcess, T_DEFAULT);
00535
00536 double send = time(SEND);
00537
00538 std::stringstream log;
00539 log << "sent " << nIndividuals << " fitness objects, " << strlen(message) << " bytes (P: " << pack << " | S: " << send << ")";
00540 ECF_LOG(state_, logLevel_, log.str());
00541
00542 freeXMLString(message);
00543
00544 return true;
00545 }
00546
00547
00548
00549
00550
00551 uint Communicator::recvDemeFitness(std::vector<IndividualP>& deme, uint iProcess)
00552 {
00553 XMLNode xAll, xIndividual, xFitness;
00554 MPI::Status status;
00555 std::string message;
00556
00557 time(COMP);
00558
00559 demeComm_.Probe(iProcess, MPI::ANY_TAG, status);
00560
00561 double idle = time(IDLE);
00562
00563 uint length = status.Get_count(MPI::CHAR);
00564 message.resize(length);
00565 demeComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_DEFAULT, status);
00566
00567 double recv = time(RECV);
00568
00569 xAll = XMLNode::parseString(message.c_str(), "Pack");
00570 uint nIndividuals = atoi(xAll.getAttribute("size"));
00571 uint index, cid;
00572 for(uint i = 0; i < nIndividuals; i++) {
00573 xIndividual = xAll.getChildNode(i);
00574 index = atoi(xIndividual.getAttributeValue(0));
00575 cid = atoi(xIndividual.getAttributeValue(1));
00576 xFitness = xIndividual.getChildNode(0);
00577 deme[index]->fitness->read(xFitness);
00578 deme[index]->fitness->cid = cid;
00579
00580 }
00581 status_ = status;
00582
00583 double read = time(UNPACK);
00584
00585 std::stringstream log;
00586 log << "received " << nIndividuals << " fitnes objects, " << length << " bytes (I: " << idle << " | R: " << recv << " | U: " << read << ")";
00587 ECF_LOG(state_, logLevel_, log.str());
00588
00589 return nIndividuals;
00590 }
00591
00592
00593
00594
00595
00596
00597 std::vector<uint> Communicator::recvFitnessVector(std::vector<IndividualP>& deme, uint iProcess)
00598 {
00599 XMLNode xAll, xIndividual, xFitness;
00600 MPI::Status status;
00601 std::string message;
00602 std::vector<uint> indices;
00603
00604 time(COMP);
00605
00606 demeComm_.Probe(iProcess, MPI::ANY_TAG, status);
00607
00608 double idle = time(IDLE);
00609
00610 uint length = status.Get_count(MPI::CHAR);
00611 message.resize(length);
00612 demeComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_DEFAULT, status);
00613
00614 double recv = time(RECV);
00615
00616 xAll = XMLNode::parseString(message.c_str(), "Pack");
00617 uint nIndividuals = atoi(xAll.getAttribute("size"));
00618 uint index, cid;
00619 for(uint i = 0; i < nIndividuals; i++) {
00620 xIndividual = xAll.getChildNode(i);
00621 index = atoi(xIndividual.getAttributeValue(0));
00622 cid = atoi(xIndividual.getAttributeValue(1));
00623 xFitness = xIndividual.getChildNode(0);
00624
00625 deme[index]->fitness->read(xFitness);
00626 deme[index]->fitness->cid = cid;
00627 indices.push_back(index);
00628 }
00629 status_ = status;
00630
00631 double read = time(UNPACK);
00632
00633 std::stringstream log;
00634 log << "received " << nIndividuals << " fitnes objects, " << length << " bytes (I: " << idle << " | R: " << recv << " | U: " << read << ")";
00635 ECF_LOG(state_, logLevel_, log.str());
00636
00637 return indices;
00638 }
00639
00640
00641
00642 bool Communicator::sendValuesGlobal(std::vector<double> values, uint iProcess)
00643 {
00644 time(COMP);
00645
00646 frameworkComm_.Send(&values[0], (int) values.size(), MPI::DOUBLE, iProcess, T_VALUES);
00647
00648 time(SEND);
00649
00650 std::stringstream log;
00651 log << "sent " << values.size() << " doubles";
00652 ECF_LOG(state_, logLevel_, log.str());
00653
00654 return true;
00655 }
00656
00657
00658
00659 std::vector<double> Communicator::recvValuesGlobal(uint iProcess)
00660 {
00661 std::vector<double> values;
00662 MPI::Status status;
00663
00664 time(COMP);
00665
00666 frameworkComm_.Probe(iProcess, T_VALUES, status);
00667
00668 double idle = time(IDLE);
00669
00670 uint size = status.Get_count(MPI::DOUBLE);
00671 values.resize(size);
00672 frameworkComm_.Recv(&values[0], size, MPI::DOUBLE, iProcess, T_VALUES, status);
00673
00674 double recv = time(RECV);
00675
00676 std::stringstream log;
00677 log << "received " << values.size() << " doubles";
00678 ECF_LOG(state_, logLevel_, log.str());
00679
00680 return values;
00681 }
00682
00683
00684
00685 std::string Communicator::recvLogsGlobal()
00686 {
00687 std::string logs = "", message;
00688 uint logCount = 0;
00689 MPI::Status status;
00690
00691 time(COMP);
00692
00693 while(frameworkComm_.Iprobe(MPI::ANY_SOURCE, T_LOGS, status)) {
00694 uint iProcess = status.Get_source();
00695 uint length = status.Get_count(MPI::CHAR);
00696 message.resize(length);
00697 frameworkComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_LOGS, status);
00698 logs += message;
00699 logCount++;
00700 }
00701
00702 time(RECV);
00703
00704 std::stringstream log;
00705 log << "received " << logCount << " logs";
00706 ECF_LOG(state_, logLevel_, log.str());
00707
00708 return logs;
00709 }
00710
00711
00712
00713 bool Communicator::sendLogsGlobal(std::string logs, uint iProcess, bool blocking)
00714 {
00715 time(COMP);
00716
00717 MPI::Request request;
00718
00719 if(blocking)
00720 frameworkComm_.Send(&logs[0], (int) logs.size(), MPI::CHAR, iProcess, T_LOGS);
00721 else
00722 request = frameworkComm_.Isend(&logs[0], (int) logs.size(), MPI::CHAR, iProcess, T_LOGS);
00723
00724 time(SEND);
00725
00726 std::stringstream log;
00727 log << "sent " << logs.size() << " log chars";
00728 ECF_LOG(state_, logLevel_, log.str());
00729
00730 return true;
00731 }
00732
00733
00734
00735 bool Communicator::sendDataGlobal(voidP data, uint size, uint iProcess)
00736 {
00737 time(COMP);
00738
00739 frameworkComm_.Send(data.get(), size, MPI::BYTE, iProcess, T_DATA);
00740
00741 time(SEND);
00742
00743 std::stringstream log;
00744 log << "sent " << size << " bytes";
00745 ECF_LOG(state_, logLevel_, log.str());
00746
00747 return true;
00748 }
00749
00750
00751
00752 voidP Communicator::recvDataGlobal(uint iProcess)
00753 {
00754 MPI::Status status;
00755 voidP data;
00756
00757 time(COMP);
00758
00759 frameworkComm_.Probe(iProcess, T_DATA, status);
00760
00761 time(IDLE);
00762
00763 uint size = status.Get_count(MPI::BYTE);
00764 data = (voidP) new char[size / sizeof(char) + 1];
00765 frameworkComm_.Recv(data.get(), size, MPI::BYTE, iProcess, T_DATA, status);
00766
00767 time(RECV);
00768
00769 std::stringstream log;
00770 log << "received " << size << " bytes";
00771 ECF_LOG(state_, logLevel_, log.str());
00772
00773 return data;
00774 }
00775
00776 #endif
00777
00778 }