• Main Page
  • Modules
  • Classes
  • Files
  • File List

D:/Projekt/ECF_trunk/ECF/Communicator.cpp

00001 #include "ECF_base.h"
00002 #include <string.h>
00003 
00004 namespace Comm
00005 {
00006 
00007 #ifdef _MPI
00008 
00009 Communicator::Communicator()
00010 {
00011     bInitialized_ = false;
00012     idleTime_ = sendTime_ = recvTime_ = compTime_ = packTime_ = unpackTime_ = 0;
00013     logLevel_ = 4;
00014     sendCnt_ = recvCnt_ = 0;
00015 }
00016 
00017 
00018 bool Communicator::initialize(StateP state, int argc, char** argv)
00019 {
00020     idleTime_ = sendTime_ = recvTime_ = compTime_ = packTime_ = unpackTime_ = 0;
00021 
00022     // multiple runs
00023     if(bInitialized_) {
00024         beginTime_ = lastTime_ = MPI::Wtime();
00025         ECF_LOG(state_, 2, "Process " + uint2str(mpiGlobalRank_) + " of "
00026             + uint2str(mpiGlobalSize_) + " on " + processorName_);
00027         return true;
00028     }
00029 
00030     state_ = state;
00031 
00032     MPI::Init(argc, argv);
00033 
00034     mpiGlobalSize_ = MPI::COMM_WORLD.Get_size();
00035     mpiGlobalRank_ = MPI::COMM_WORLD.Get_rank();
00036 
00037     demeComm_ = MPI::COMM_WORLD.Dup();
00038     mpiSize_ = demeComm_.Get_size();
00039     mpiRank_ = demeComm_.Get_rank();
00040 
00041     frameworkComm_ = MPI::COMM_WORLD.Dup();
00042 
00043     char *pp, processor_name[MPI_MAX_PROCESSOR_NAME];
00044     pp = &processor_name[0];
00045     int namelen;
00046     MPI_Get_processor_name(processor_name, &namelen);
00047     processorName_ = processor_name;
00048 
00049     beginTime_ = lastTime_ = MPI::Wtime();
00050 
00051     ECF_LOG(state_, 2, "Process " + uint2str(mpiGlobalRank_) + " of "
00052         + uint2str(mpiGlobalSize_) + " on " + processorName_);
00053 
00054     bInitialized_ = true;
00055 
00056     return true;
00057 }
00058 
00059 
00060 // zove se na pocetku prije evolucije
00061 // stvara se posebni komunikator za svaki deme (lokalni kontekst za algoritam)
00062 uint Communicator::createDemeCommunicator(uint nDemes)
00063 {
00064     // TODO: parametrize deme distribution among processes
00065     uint myColor = mpiGlobalRank_ % nDemes;
00066     demeMasters.resize(nDemes);
00067     for(uint i = 0; i < nDemes; i++)
00068         demeMasters[i] = i;
00069 
00070     demeComm_ = MPI::COMM_WORLD.Split(myColor, mpiGlobalRank_);
00071     mpiSize_ = demeComm_.Get_size();
00072     mpiRank_ = demeComm_.Get_rank();
00073 
00074     std::stringstream log;
00075     log << "Global process " << mpiGlobalRank_ << " joined deme communicator with index ";
00076     log << myColor << " (local rank: " << mpiRank_ << " of " << mpiSize_ << ")";
00077     ECF_LOG(state_, 2, log.str());
00078 
00079     return myColor; // deme index
00080 }
00081 
00082 
00083 bool Communicator::finalize()
00084 {
00085     if(!bInitialized_)
00086         return true;
00087 
00088     endTime_ = MPI::Wtime();
00089 
00090     std::stringstream times;
00091     times << "Process " << mpiGlobalRank_ << ": total MPI time: " << endTime_ - beginTime_;
00092     times << ", COMP: " << compTime_;
00093     times << ", IDLE: " << idleTime_;
00094     times << ", SEND: " << sendTime_;
00095     times << ", RECV: " << recvTime_;
00096     times << ", PACK: " << packTime_;
00097     times << ", UNPACK: " << unpackTime_ << std::endl;
00098 
00099     // collect and log process times at process 0
00100     if(mpiGlobalRank_ == 0) {
00101         std::string message;
00102         MPI::Status status;
00103 
00104         for(uint iProcess = 1; iProcess < mpiGlobalSize_; iProcess++) {
00105             frameworkComm_.Probe(iProcess, T_FINAL, status);
00106             uint size = status.Get_count(MPI::CHAR);
00107             message.resize(size);
00108             frameworkComm_.Recv(&message[0], size, MPI::CHAR, iProcess, T_FINAL, status);
00109 
00110             times << message;
00111         }
00112         ECF_LOG(state_, 2, times.str());
00113         state_->getLogger()->saveTo(true);
00114     }
00115     else {
00116         std::string message = times.str();
00117         frameworkComm_.Send(&message[0], (int) message.size(), MPI::CHAR, 0, T_FINAL);
00118     }
00119 
00120     if(!state_->getBatchMode())
00121         MPI::Finalize();
00122 
00123     return true;
00124 }
00125 
00130 double Communicator::time(enum timing T)
00131 {
00132     currentTime_ = MPI::Wtime();
00133     double elapsed = currentTime_ - lastTime_;
00134     lastTime_ = currentTime_;
00135 
00136     switch(T) {
00137     case IDLE:
00138         idleTime_ += elapsed;
00139         break;
00140     case COMP:
00141         compTime_ += elapsed;
00142         break;
00143     case SEND:
00144         sendTime_ += elapsed;
00145         break;
00146     case RECV:
00147         recvTime_ += elapsed;
00148         break;
00149     case PACK:
00150         packTime_ += elapsed;
00151         break;
00152     case UNPACK:
00153         unpackTime_ += elapsed;
00154         break;
00155     }
00156 
00157     return 1000 * elapsed;
00158 }
00159 
00160 
00161 uint Communicator::getDemeMaster(uint iDeme)
00162 {
00163     return demeMasters[iDeme];
00164 }
00165 
00166 
00167 uint Communicator::getLastSource()
00168 {
00169     return status_.Get_source();
00170 }
00171 
00172 
00173 void Communicator::synchronize()
00174 {
00175     MPI::COMM_WORLD.Barrier();
00176 }
00177 
00178 
00179 // provjerava (trenutno) ima li neka pristigla poruka
00180 bool Communicator::messageWaiting(uint iProcess, uint tag)
00181 {
00182     return demeComm_.Iprobe(iProcess, tag, status_);
00183 }
00184 
00185 
00186 bool Communicator::sendControlMessage(uint iProcess, int control)
00187 {
00188     demeComm_.Send(&control, sizeof(int), MPI::BYTE, iProcess, T_CONTROL);
00189     return true;
00190 }
00191 
00192 
00193 int Communicator::recvControlMessage(uint iProcess)
00194 {
00195     int control;
00196     demeComm_.Recv(&control, sizeof(int), MPI::BYTE, iProcess, T_CONTROL);
00197     return control;
00198 }
00199 
00200 
00201 bool Communicator::sendTerminateMessage(uint iProcess, bool termination)
00202 {
00203     ECF_LOG(state_, logLevel_, "Sending terminate message to process " + uint2str(iProcess));
00204     uint tag = (termination == true) ? T_TERMINATE : T_CONTINUE;
00205     frameworkComm_.Send(&termination, sizeof(bool), MPI::BYTE, iProcess, tag);
00206     return true;
00207 }
00208 
00209 
00210 bool Communicator::recvTerminateMessage(uint iProcess)
00211 {
00212     bool termination;
00213     frameworkComm_.Recv(&termination, sizeof(bool), MPI::BYTE, iProcess, MPI::ANY_TAG, controlStatus_);
00214     if(controlStatus_.Get_tag() == T_TERMINATE)
00215         termination = true;
00216     if(controlStatus_.Get_tag() == T_CONTINUE)
00217         termination = false;
00218     return termination;
00219 }
00220 
00221 
00222 bool Communicator::checkTerminationMessage(uint master)
00223 {
00224     if(frameworkComm_.Iprobe(master, MPI::ANY_TAG, controlStatus_)) {
00225         if(controlStatus_.Get_tag() == T_TERMINATE) {
00226             return true;
00227         }
00228         else
00229             recvTerminateMessage(master);
00230     }
00231     return false;
00232 }
00233 
00234 
00235 // salje prvih nIndividuals jedinki iz zadanog vektora (sve ako je nInvididuals == 0)
00236 // svakoj jedinki se dodaje njen indeks u _deme_ (ne iz vektora pool!)
00237 bool Communicator::sendIndividuals(std::vector<IndividualP> pool, uint iProcess, uint nIndividuals)
00238 {
00239     time(COMP);
00240 
00241     XMLNode xAll, xIndividual;
00242     xAll = XMLNode::createXMLTopNode("Pack");
00243 
00244     if(nIndividuals == 0)
00245         nIndividuals = (uint) pool.size();
00246     xAll.addAttribute("size", uint2str(nIndividuals).c_str());
00247 
00248     for(uint ind = 0; ind < nIndividuals; ind++) {
00249         pool[ind]->write(xIndividual);
00250         xIndividual.addAttribute("i", uint2str(pool[ind]->index).c_str());
00251         xIndividual.addAttribute("c", uint2str(pool[ind]->cid).c_str());
00252         xAll.addChild(xIndividual);
00253     }
00254     char *message = xAll.createXMLString(0);
00255     //std::string message = m;
00256 
00257     double createTime = time(PACK);
00258 
00259     //demeComm_.Send(message.data(), (int) message.length(), MPI::CHAR, iProcess, 0);
00260     demeComm_.Send(message, (int) strlen(message) + 1, MPI::CHAR, iProcess, T_DEFAULT);
00261 
00262     double sendTime = time(SEND);
00263 
00264     std::stringstream log;
00265     log << "sent " << nIndividuals << " individuals, " << strlen(message) << " bytes (P: " << createTime << " | S: " << sendTime << ")";
00266     ECF_LOG(state_, logLevel_, log.str());
00267 
00268     freeXMLString(message);
00269 
00270     return true;
00271 }
00272 
00273 
00274 // isto kao i gornja fja, samo sa frameworkComm_
00275 bool Communicator::sendIndividualsGlobal(std::vector<IndividualP> pool, uint iProcess, uint nIndividuals)
00276 {
00277     time(COMP);
00278 
00279     XMLNode xAll, xIndividual;
00280     xAll = XMLNode::createXMLTopNode("Pack");
00281 
00282     if(nIndividuals == 0)
00283         nIndividuals = (uint) pool.size();
00284     xAll.addAttribute("size", uint2str(nIndividuals).c_str());
00285 
00286     for(uint ind = 0; ind < nIndividuals; ind++) {
00287         pool[ind]->write(xIndividual);
00288         xIndividual.addAttribute("i", uint2str(pool[ind]->index).c_str());
00289         xIndividual.addAttribute("c", uint2str(pool[ind]->cid).c_str());
00290         xAll.addChild(xIndividual);
00291     }
00292     char *message = xAll.createXMLString(0);
00293 
00294     double createTime = time(PACK);
00295 
00296     frameworkComm_.Send(message, (int) strlen(message) + 1, MPI::CHAR, iProcess, T_DEFAULT);
00297 
00298     double sendTime = time(SEND);
00299 
00300     std::stringstream log;
00301     log << "sent " << nIndividuals << " individuals (global), " << strlen(message) << " bytes (P: " << createTime << " | S: " << sendTime << ")";
00302     ECF_LOG(state_, logLevel_, log.str());
00303 
00304     freeXMLString(message);
00305 
00306     return true;
00307 }
00308 
00309 
00310 // prima jednike na mjesto postojecih u deme objektu
00311 // cita se indeks iz poruke i jedinka se stavlja na odgovarajuce mjesto u deme vektoru
00312 uint Communicator::recvDemeIndividuals(std::vector<IndividualP>& deme, uint iProcess)
00313 {
00314     XMLNode xAll, xIndividual;
00315     MPI::Status status;
00316     std::string message;
00317 
00318     time(COMP);
00319 
00320     demeComm_.Probe(iProcess, MPI::ANY_TAG, status);
00321 
00322     double idle = time(IDLE);
00323 
00324     uint length = status.Get_count(MPI::CHAR);
00325     message.resize(length + 1);
00326     demeComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_DEFAULT, status);
00327 
00328     double recv = time(RECV);
00329 
00330     xAll = XMLNode::parseString(message.c_str(), "Pack");
00331     uint nIndividuals = atoi(xAll.getAttribute("size"));
00332     uint index, cid;
00333     for(uint i = 0; i < nIndividuals; i++) {
00334         //xIndividual = xAll.getChildNode("Individual", i);
00335         xIndividual = xAll.getChildNode(i); // ovo bi trebalo biti brze...?
00336         index = atoi(xIndividual.getAttributeValue(1));
00337         cid = atoi(xIndividual.getAttributeValue(2));
00338         deme[index]->read(xIndividual);
00339         deme[index]->cid = cid;
00340     }
00341     status_ = status;
00342 
00343     double read = time(UNPACK);
00344 
00345     std::stringstream log;
00346     log << "received " << nIndividuals << " individuals, " << length << " bytes (";
00347     log << "I: " << idle << " | R: " << recv << " | U: " << read << ")";
00348     ECF_LOG(state_, logLevel_, log.str());
00349 
00350     return nIndividuals;
00351 }
00352 
00353 
00354 // prima jedinke u novi vektor
00355 // ujedno stvara i inicijalizira pristigle jedinke i stvara njihov fitness
00356 std::vector<IndividualP> Communicator::recvIndividuals(uint iProcess)
00357 {
00358     XMLNode xAll, xIndividual;
00359     MPI::Status status;
00360     std::string message;
00361     std::vector<IndividualP> pack;
00362 
00363     time(COMP);
00364 
00365     demeComm_.Probe(iProcess, MPI::ANY_TAG, status);
00366 
00367     double idle = time(IDLE);
00368 
00369     uint length = status.Get_count(MPI::CHAR);
00370     message.resize(length + 1);
00371     demeComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_DEFAULT, status);
00372 
00373     double recv = time(RECV);
00374 
00375     xAll = XMLNode::parseString(message.c_str(), "Pack");
00376     uint nIndividuals = atoi(xAll.getAttribute("size"));
00377     uint index, cid;
00378     for(uint i = 0; i < nIndividuals; i++) {
00379         //xIndividual = xAll.getChildNode("Individual", i);
00380         xIndividual = xAll.getChildNode(i); // ovo bi trebalo biti brze...?
00381         pack.push_back((IndividualP) new Individual(state_));
00382         index = atoi(xIndividual.getAttributeValue(1));
00383         cid = atoi(xIndividual.getAttributeValue(2));
00384 
00385         pack[i]->index = index;
00386         pack[i]->cid = cid;
00387         pack[i]->fitness = (FitnessP) state_->getFitnessObject()->copy();
00388         pack[i]->read(xIndividual);
00389     }
00390     status_ = status;
00391 
00392     double read = time(UNPACK);
00393 
00394     std::stringstream log;
00395     log << "received " << nIndividuals << " individuals, " << length << " bytes (";
00396     log << "I: " << idle << " | R: " << recv << " | U: " << read << ")";
00397     ECF_LOG(state_, logLevel_, log.str());
00398 
00399     return pack;
00400 }
00401 
00402 
00403 // isto kao gornja fja, samo uz frameworkComm_
00404 std::vector<IndividualP> Communicator::recvIndividualsGlobal(uint iProcess)
00405 {
00406     XMLNode xAll, xIndividual;
00407     MPI::Status status;
00408     std::string message;
00409     std::vector<IndividualP> pack;
00410 
00411     time(COMP);
00412 
00413     frameworkComm_.Probe(iProcess, T_DEFAULT, status);
00414 
00415     double idle = time(IDLE);
00416 
00417     uint length = status.Get_count(MPI::CHAR);
00418     message.resize(length);
00419     frameworkComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_DEFAULT, status);
00420 
00421     double recv = time(RECV);
00422 
00423     xAll = XMLNode::parseString(message.c_str(), "Pack");
00424     uint nIndividuals = atoi(xAll.getAttribute("size"));
00425     uint index, cid;
00426     for(uint i = 0; i < nIndividuals; i++) {
00427         xIndividual = xAll.getChildNode(i);
00428         pack.push_back((IndividualP) new Individual(state_));
00429         index = atoi(xIndividual.getAttributeValue(1));
00430         cid = atoi(xIndividual.getAttributeValue(2));
00431 
00432         pack[i]->index = index;
00433         pack[i]->cid = cid;
00434         pack[i]->fitness = (FitnessP) state_->getFitnessObject()->copy();
00435         pack[i]->read(xIndividual);
00436     }
00437     status_ = status;
00438 
00439     double read = time(UNPACK);
00440 
00441     std::stringstream log;
00442     log << "received " << nIndividuals << " individuals, " << length << " bytes (";
00443     log << "I: " << idle << " | R: " << recv << " | U: " << read << ")";
00444     ECF_LOG(state_, logLevel_, log.str());
00445 
00446     return pack;
00447 }
00448 
00449 
00450 // prima jedinke u postojeci vektor jedinki (bez obzira na indekse)
00451 // velicina vektora se _povecava_ na broj primljenih jedinki
00452 // ako je jedinki manje, vektor se _ne smanjuje_
00453 // vraca broj primljenih jedinki
00454 uint Communicator::recvReplaceIndividuals(std::vector<IndividualP>& pool, uint iProcess)
00455 {
00456     XMLNode xAll, xIndividual;
00457     MPI::Status status;
00458 
00459     time(COMP);
00460 
00461     demeComm_.Probe(iProcess, MPI::ANY_TAG, status);
00462 
00463     double idle = time(IDLE);
00464 
00465     uint length = status.Get_count(MPI::CHAR);
00466     char *message = new char[length + 1];
00467     demeComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_DEFAULT, status);
00468     status_ = status;
00469 
00470     double recv = time(RECV);
00471 
00472     xAll = XMLNode::parseString(message, "Pack");
00473     uint nIndividuals = atoi(xAll.getAttribute("size"));
00474     uint poolSize = (uint) pool.size();
00475 
00476     if(poolSize < nIndividuals) {
00477         pool.resize(nIndividuals);
00478         for(uint i = poolSize; i < nIndividuals; i++) {
00479             pool[i] = (IndividualP) new Individual(state_);
00480             pool[i]->fitness = (FitnessP) state_->getFitnessObject()->copy();
00481         }
00482     }
00483 
00484     uint index, cid;
00485     for(uint i = 0; i < nIndividuals; i++) {
00486         xIndividual = xAll.getChildNode(i); // ovo bi trebalo biti brze...?
00487         index = atoi(xIndividual.getAttributeValue(1));
00488         cid = atoi(xIndividual.getAttributeValue(2));
00489         pool[i]->index = index;
00490         pool[i]->cid = cid;
00491         pool[i]->read(xIndividual);
00492     }
00493 
00494     double read = time(UNPACK);
00495 
00496     std::stringstream log;
00497     log << "received " << nIndividuals << " individuals, " << length << " bytes (I: " << idle << " | R: " << recv << " | U: " << read << ")";
00498     ECF_LOG(state_, logLevel_, log.str());
00499 
00500     delete [] message;
00501 
00502     return nIndividuals;
00503 }
00504 
00505 
00506 // salje fitnese jedinki iz zadanog vektora
00507 // fitnesi se pakiraju kao jedinke bez genotipa (atribut "i" je prvi po redu!)
00508 // svakoj jedinki se dodaje njen indeks iz _deme_ (ne iz vektora pool!)
00509 // ako je nIndividuals != 0, salje se samo toliko prvih iz vektora
00510 bool Communicator::sendFitness(std::vector<IndividualP> pool, uint iProcess, uint nIndividuals)
00511 {
00512     time(COMP);
00513 
00514     XMLNode xAll, xIndividual, xFitness;
00515     xAll = XMLNode::createXMLTopNode("Pack");
00516 
00517     if(nIndividuals == 0)
00518         nIndividuals = (uint) pool.size();
00519     xAll.addAttribute("size", uint2str(nIndividuals).c_str());
00520 
00521     for(uint ind = 0; ind < nIndividuals; ind++) {
00522         xIndividual = XMLNode::createXMLTopNode("Individual");
00523         xIndividual.addAttribute("i", uint2str(pool[ind]->index).c_str());
00524         xIndividual.addAttribute("c", uint2str(pool[ind]->cid).c_str());
00525 
00526         pool[ind]->fitness->write(xFitness);
00527         xIndividual.addChild(xFitness);
00528         xAll.addChild(xIndividual);
00529     }
00530     char *message = xAll.createXMLString(0);
00531 
00532     double pack = time(PACK);
00533 
00534     demeComm_.Send(message, (int) strlen(message) + 1, MPI::CHAR, iProcess, T_DEFAULT);
00535 
00536     double send = time(SEND);
00537 
00538     std::stringstream log;
00539     log << "sent " << nIndividuals << " fitness objects, " << strlen(message) << " bytes (P: " << pack << " | S: " << send << ")";
00540     ECF_LOG(state_, logLevel_, log.str());
00541 
00542     freeXMLString(message);
00543 
00544     return true;
00545 }
00546 
00547 
00548 // prima fitnese na mjesto postojecih u deme objektu
00549 // fitnesi zu zapakirani u jedinke bez genotipa (atribut "i" je prvi po redu!)
00550 // cita se indeks iz poruke i fitnes jedinke se stavlja na odgovarajuce mjesto u deme vektoru
00551 uint Communicator::recvDemeFitness(std::vector<IndividualP>& deme, uint iProcess)
00552 {
00553     XMLNode xAll, xIndividual, xFitness;
00554     MPI::Status status;
00555     std::string message;
00556 
00557     time(COMP);
00558 
00559     demeComm_.Probe(iProcess, MPI::ANY_TAG, status);
00560 
00561     double idle = time(IDLE);
00562 
00563     uint length = status.Get_count(MPI::CHAR);
00564     message.resize(length);
00565     demeComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_DEFAULT, status);
00566 
00567     double recv = time(RECV);
00568 
00569     xAll = XMLNode::parseString(message.c_str(), "Pack");
00570     uint nIndividuals = atoi(xAll.getAttribute("size"));
00571     uint index, cid;
00572     for(uint i = 0; i < nIndividuals; i++) {
00573         xIndividual = xAll.getChildNode(i); // ovo bi trebalo biti brze...?
00574         index = atoi(xIndividual.getAttributeValue(0));
00575         cid = atoi(xIndividual.getAttributeValue(1));
00576         xFitness = xIndividual.getChildNode(0);
00577         deme[index]->fitness->read(xFitness);
00578         deme[index]->fitness->cid = cid;
00579 
00580     }
00581     status_ = status;
00582 
00583     double read = time(UNPACK);
00584 
00585     std::stringstream log;
00586     log << "received " << nIndividuals << " fitnes objects, " << length << " bytes (I: " << idle << " | R: " << recv << " | U: " << read << ")";
00587     ECF_LOG(state_, logLevel_, log.str());
00588 
00589     return nIndividuals;
00590 }
00591 
00592 
00593 // prima fitnese u vektor fitnesa
00594 // fitnesi dolaze zapakirani kao jedinke bez genotipa (atribut "i" je prvi po redu!)
00595 // fitnesi se stavljaju u vektor (prvi argument) na mjesto koje pise kao indeks jedinke u poruci
00596 // funkcija vraca vektor indeksa jedinki primljenih fitnesa
00597 std::vector<uint> Communicator::recvFitnessVector(std::vector<IndividualP>& deme, uint iProcess)
00598 {
00599     XMLNode xAll, xIndividual, xFitness;
00600     MPI::Status status;
00601     std::string message;
00602     std::vector<uint> indices;
00603 
00604     time(COMP);
00605 
00606     demeComm_.Probe(iProcess, MPI::ANY_TAG, status);
00607 
00608     double idle = time(IDLE);
00609 
00610     uint length = status.Get_count(MPI::CHAR);
00611     message.resize(length);
00612     demeComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_DEFAULT, status);
00613 
00614     double recv = time(RECV);
00615 
00616     xAll = XMLNode::parseString(message.c_str(), "Pack");
00617     uint nIndividuals = atoi(xAll.getAttribute("size"));
00618     uint index, cid;
00619     for(uint i = 0; i < nIndividuals; i++) {
00620         xIndividual = xAll.getChildNode(i);
00621         index = atoi(xIndividual.getAttributeValue(0));
00622         cid = atoi(xIndividual.getAttributeValue(1));
00623         xFitness = xIndividual.getChildNode(0);
00624 
00625         deme[index]->fitness->read(xFitness);
00626         deme[index]->fitness->cid = cid;
00627         indices.push_back(index);
00628     }
00629     status_ = status;
00630 
00631     double read = time(UNPACK);
00632 
00633     std::stringstream log;
00634     log << "received " << nIndividuals << " fitnes objects, " << length << " bytes (I: " << idle << " | R: " << recv << " | U: " << read << ")";
00635     ECF_LOG(state_, logLevel_, log.str());
00636 
00637     return indices;
00638 }
00639 
00640 
00641 // salje vektor vrijednosti u globalnom komunikatoru
00642 bool Communicator::sendValuesGlobal(std::vector<double> values, uint iProcess)
00643 {
00644     time(COMP);
00645 
00646     frameworkComm_.Send(&values[0], (int) values.size(), MPI::DOUBLE, iProcess, T_VALUES);
00647 
00648     time(SEND);
00649 
00650     std::stringstream log;
00651     log << "sent " << values.size() << " doubles";
00652     ECF_LOG(state_, logLevel_, log.str());
00653 
00654     return true;
00655 }
00656 
00657 
00658 // prima vektor vrijednosti u globalnom komunikatoru
00659 std::vector<double> Communicator::recvValuesGlobal(uint iProcess)
00660 {
00661     std::vector<double> values;
00662     MPI::Status status;
00663 
00664     time(COMP);
00665 
00666     frameworkComm_.Probe(iProcess, T_VALUES, status);
00667 
00668     double idle = time(IDLE);
00669 
00670     uint size = status.Get_count(MPI::DOUBLE);
00671     values.resize(size);
00672     frameworkComm_.Recv(&values[0], size, MPI::DOUBLE, iProcess, T_VALUES, status);
00673 
00674     double recv = time(RECV);
00675 
00676     std::stringstream log;
00677     log << "received " << values.size() << " doubles";
00678     ECF_LOG(state_, logLevel_, log.str());
00679 
00680     return values;
00681 }
00682 
00683 
00684 // prima pristigle log poruke od bilo kojih procesa u globalnom komunikatoru
00685 std::string Communicator::recvLogsGlobal()
00686 {
00687     std::string logs = "", message;
00688     uint logCount = 0;
00689     MPI::Status status;
00690 
00691     time(COMP);
00692 
00693     while(frameworkComm_.Iprobe(MPI::ANY_SOURCE, T_LOGS, status)) {
00694         uint iProcess = status.Get_source();
00695         uint length = status.Get_count(MPI::CHAR);
00696         message.resize(length);
00697         frameworkComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_LOGS, status);
00698         logs += message;
00699         logCount++;
00700     }
00701 
00702     time(RECV);
00703 
00704     std::stringstream log;
00705     log << "received " << logCount << " logs";
00706     ECF_LOG(state_, logLevel_, log.str());
00707 
00708     return logs;
00709 }
00710 
00711 
00712 // salje log poruke u globalnom komunikatoru
00713 bool Communicator::sendLogsGlobal(std::string logs, uint iProcess, bool blocking)
00714 {
00715     time(COMP);
00716 
00717     MPI::Request request;
00718 
00719     if(blocking)
00720         frameworkComm_.Send(&logs[0], (int) logs.size(), MPI::CHAR, iProcess, T_LOGS);
00721     else
00722         request = frameworkComm_.Isend(&logs[0], (int) logs.size(), MPI::CHAR, iProcess, T_LOGS);
00723 
00724     time(SEND);
00725 
00726     std::stringstream log;
00727     log << "sent " << logs.size() << " log chars";
00728     ECF_LOG(state_, logLevel_, log.str());
00729 
00730     return true;
00731 }
00732 
00733 
00734 // slanje bilo kakvih podataka u globalnom komunikatoru
00735 bool Communicator::sendDataGlobal(voidP data, uint size, uint iProcess)
00736 {
00737     time(COMP);
00738 
00739     frameworkComm_.Send(data.get(), size, MPI::BYTE, iProcess, T_DATA);
00740 
00741     time(SEND);
00742 
00743     std::stringstream log;
00744     log << "sent " << size << " bytes";
00745     ECF_LOG(state_, logLevel_, log.str());
00746 
00747     return true;
00748 }
00749 
00750 
00751 // primanje bilo kakvih podataka u globalnom komunikatoru
00752 voidP Communicator::recvDataGlobal(uint iProcess)
00753 {
00754     MPI::Status status;
00755     voidP data;
00756 
00757     time(COMP);
00758 
00759     frameworkComm_.Probe(iProcess, T_DATA, status);
00760 
00761     time(IDLE);
00762 
00763     uint size = status.Get_count(MPI::BYTE);
00764     data = (voidP) new char[size / sizeof(char) + 1];
00765     frameworkComm_.Recv(data.get(), size, MPI::BYTE, iProcess, T_DATA, status);
00766 
00767     time(RECV);
00768 
00769     std::stringstream log;
00770     log << "received " << size << " bytes";
00771     ECF_LOG(state_, logLevel_, log.str());
00772 
00773     return data;
00774 }
00775 
00776 #endif
00777 
00778 } // namespace

Generated on Tue Nov 4 2014 13:04:30 for ECF by  doxygen 1.7.1