• Main Page
  • Modules
  • Classes
  • Files
  • File List

D:/Projekt/ECF_trunk/ECF/Communicator.cpp

00001 #include "ECF_base.h"
00002 #include <string.h>
00003 
00004 namespace Comm
00005 {
00006 
00007 Communicator::Communicator()
00008 {
00009     bInitialized_ = false;
00010     idleTime_ = sendTime_ = recvTime_ = compTime_ = packTime_ = unpackTime_ = 0;
00011     logLevel_ = 4;
00012     sendCnt_ = recvCnt_ = 0;
00013 }
00014 
00015 
00016 bool Communicator::initialize(StateP state, int argc, char** argv)
00017 {
00018     idleTime_ = sendTime_ = recvTime_ = compTime_ = packTime_ = unpackTime_ = 0;
00019 
00020     // multiple runs
00021     if(bInitialized_) {
00022         beginTime_ = lastTime_ = MPI::Wtime();
00023         ECF_LOG(state_, 2, "Process " + uint2str(mpiGlobalRank_) + " of " 
00024             + uint2str(mpiGlobalSize_) + " on " + processorName_);
00025         return true;
00026     }
00027 
00028     state_ = state;
00029 
00030     MPI::Init(argc, argv);
00031 
00032     mpiGlobalSize_ = MPI::COMM_WORLD.Get_size();
00033     mpiGlobalRank_ = MPI::COMM_WORLD.Get_rank();
00034 
00035     demeComm_ = MPI::COMM_WORLD.Dup();
00036     mpiSize_ = demeComm_.Get_size();
00037     mpiRank_ = demeComm_.Get_rank();
00038 
00039     frameworkComm_ = MPI::COMM_WORLD.Dup();
00040 
00041     char *pp, processor_name[MPI_MAX_PROCESSOR_NAME];
00042     pp = &processor_name[0];
00043     int namelen;
00044     MPI_Get_processor_name(processor_name, &namelen);
00045     processorName_ = processor_name;
00046 
00047     beginTime_ = lastTime_ = MPI::Wtime();
00048 
00049     ECF_LOG(state_, 2, "Process " + uint2str(mpiGlobalRank_) + " of " 
00050         + uint2str(mpiGlobalSize_) + " on " + processorName_);
00051 
00052     bInitialized_ = true;
00053 
00054     return true;
00055 }
00056 
00057 
00058 // zove se na pocetku prije evolucije
00059 // stvara se posebni komunikator za svaki deme (lokalni kontekst za algoritam)
00060 uint Communicator::createDemeCommunicator(uint nDemes)
00061 {
00062     // TODO: parametrize deme distribution among processes
00063     uint myColor = mpiGlobalRank_ % nDemes;
00064     demeMasters.resize(nDemes);
00065     for(uint i = 0; i < nDemes; i++)
00066         demeMasters[i] = i;
00067 
00068     demeComm_ = MPI::COMM_WORLD.Split(myColor, mpiGlobalRank_);
00069     mpiSize_ = demeComm_.Get_size();
00070     mpiRank_ = demeComm_.Get_rank();
00071 
00072     std::stringstream log;
00073     log << "Global process " << mpiGlobalRank_ << " joined deme communicator with index ";
00074     log << myColor << " (local rank: " << mpiRank_ << " of " << mpiSize_ << ")";
00075     ECF_LOG(state_, 2, log.str());
00076 
00077     return myColor; // deme index
00078 }
00079 
00080 
00081 bool Communicator::finalize()
00082 {
00083     if(!bInitialized_)
00084         return true;
00085 
00086     endTime_ = MPI::Wtime();
00087 
00088     std::stringstream times;
00089     times << "Process " << mpiGlobalRank_ << ": total MPI time: " << endTime_ - beginTime_;
00090     times << ", COMP: " << compTime_;
00091     times << ", IDLE: " << idleTime_;
00092     times << ", SEND: " << sendTime_;
00093     times << ", RECV: " << recvTime_;
00094     times << ", PACK: " << packTime_;
00095     times << ", UNPACK: " << unpackTime_ << std::endl;
00096 
00097     // collect and log process times at process 0
00098     if(mpiGlobalRank_ == 0) {
00099         std::string message;
00100         MPI::Status status;
00101 
00102         for(uint iProcess = 1; iProcess < mpiGlobalSize_; iProcess++) {
00103             frameworkComm_.Probe(iProcess, T_FINAL, status);
00104             uint size = status.Get_count(MPI::CHAR);
00105             message.resize(size);
00106             frameworkComm_.Recv(&message[0], size, MPI::CHAR, iProcess, T_FINAL, status);
00107 
00108             times << message;
00109         }
00110         ECF_LOG(state_, 2, times.str());
00111         state_->getLogger()->saveTo(true);
00112     }
00113     else {
00114         std::string message = times.str();
00115         frameworkComm_.Send(&message[0], (int) message.size(), MPI::CHAR, 0, T_FINAL);
00116     }
00117 
00118     if(!state_->getBatchMode())
00119         MPI::Finalize();
00120 
00121     return true;
00122 }
00123 
00128 double Communicator::time(enum timing T)
00129 {
00130     currentTime_ = MPI::Wtime();
00131     double elapsed = currentTime_ - lastTime_;
00132     lastTime_ = currentTime_;
00133 
00134     switch(T) {
00135     case IDLE:
00136         idleTime_ += elapsed;
00137         break;
00138     case COMP:
00139         compTime_ += elapsed;
00140         break;
00141     case SEND:
00142         sendTime_ += elapsed;
00143         break;
00144     case RECV:
00145         recvTime_ += elapsed;
00146         break;
00147     case PACK:
00148         packTime_ += elapsed;
00149         break;
00150     case UNPACK:
00151         unpackTime_ += elapsed;
00152         break;
00153     }
00154 
00155     return 1000 * elapsed;
00156 }
00157 
00158 
00159 uint Communicator::getDemeMaster(uint iDeme)
00160 {
00161     return demeMasters[iDeme];
00162 }
00163 
00164 
00165 uint Communicator::getLastSource()
00166 {
00167     return status_.Get_source();
00168 }
00169 
00170 
00171 void Communicator::synchronize()
00172 {
00173     MPI::COMM_WORLD.Barrier();
00174 }
00175 
00176 
00177 // provjerava (trenutno) ima li neka pristigla poruka
00178 bool Communicator::messageWaiting(uint iProcess, uint tag)
00179 {
00180     return demeComm_.Iprobe(iProcess, tag, status_);
00181 }
00182 
00183 
00184 bool Communicator::sendControlMessage(uint iProcess, int control)
00185 {
00186     demeComm_.Send(&control, sizeof(int), MPI::BYTE, iProcess, T_CONTROL);
00187     return true;
00188 }
00189 
00190 
00191 int Communicator::recvControlMessage(uint iProcess)
00192 {
00193     int control;
00194     demeComm_.Recv(&control, sizeof(int), MPI::BYTE, iProcess, T_CONTROL);
00195     return control;
00196 }
00197 
00198 
00199 bool Communicator::sendTerminateMessage(uint iProcess, bool termination)
00200 {
00201     ECF_LOG(state_, logLevel_, "Sending terminate message to process " + uint2str(iProcess));
00202     uint tag = (termination == true) ? T_TERMINATE : T_CONTINUE;
00203     frameworkComm_.Send(&termination, sizeof(bool), MPI::BYTE, iProcess, tag);
00204     return true;
00205 }
00206 
00207 
00208 bool Communicator::recvTerminateMessage(uint iProcess)
00209 {
00210     bool termination;
00211     frameworkComm_.Recv(&termination, sizeof(bool), MPI::BYTE, iProcess, MPI::ANY_TAG, controlStatus_);
00212     if(controlStatus_.Get_tag() == T_TERMINATE)
00213         termination = true;
00214     if(controlStatus_.Get_tag() == T_CONTINUE)
00215         termination = false;
00216     return termination;
00217 }
00218 
00219 
00220 bool Communicator::checkTerminationMessage(uint master)
00221 {
00222     if(frameworkComm_.Iprobe(master, MPI::ANY_TAG, controlStatus_)) {
00223         if(controlStatus_.Get_tag() == T_TERMINATE) {
00224             return true;
00225         }
00226         else 
00227             recvTerminateMessage(master);
00228     }
00229     return false;
00230 }
00231 
00232 
00233 // salje prvih nIndividuals jedinki iz zadanog vektora (sve ako je nInvididuals == 0)
00234 // svakoj jedinki se dodaje njen indeks u _deme_ (ne iz vektora pool!)
00235 bool Communicator::sendIndividuals(std::vector<IndividualP> pool, uint iProcess, uint nIndividuals)
00236 {
00237     time(COMP);
00238 
00239     XMLNode xAll, xIndividual;
00240     xAll = XMLNode::createXMLTopNode("Pack");
00241 
00242     if(nIndividuals == 0)
00243         nIndividuals = (uint) pool.size();
00244     xAll.addAttribute("size", uint2str(nIndividuals).c_str());
00245 
00246     for(uint ind = 0; ind < nIndividuals; ind++) {
00247         pool[ind]->write(xIndividual);
00248         xIndividual.addAttribute("i", uint2str(pool[ind]->index).c_str());
00249         xIndividual.addAttribute("c", uint2str(pool[ind]->cid).c_str());
00250         xAll.addChild(xIndividual);
00251     }
00252     char *message = xAll.createXMLString(0);
00253     //std::string message = m;
00254 
00255     double createTime = time(PACK);
00256 
00257     //demeComm_.Send(message.data(), (int) message.length(), MPI::CHAR, iProcess, 0);
00258     demeComm_.Send(message, (int) strlen(message) + 1, MPI::CHAR, iProcess, T_DEFAULT);
00259     
00260     double sendTime = time(SEND);
00261 
00262     std::stringstream log;
00263     log << "sent " << nIndividuals << " individuals, " << strlen(message) << " bytes (P: " << createTime << " | S: " << sendTime << ")";
00264     ECF_LOG(state_, logLevel_, log.str());
00265 
00266     freeXMLString(message);
00267 
00268     return true;
00269 }
00270 
00271 
00272 // isto kao i gornja fja, samo sa frameworkComm_
00273 bool Communicator::sendIndividualsGlobal(std::vector<IndividualP> pool, uint iProcess, uint nIndividuals)
00274 {
00275     time(COMP);
00276 
00277     XMLNode xAll, xIndividual;
00278     xAll = XMLNode::createXMLTopNode("Pack");
00279 
00280     if(nIndividuals == 0)
00281         nIndividuals = (uint) pool.size();
00282     xAll.addAttribute("size", uint2str(nIndividuals).c_str());
00283 
00284     for(uint ind = 0; ind < nIndividuals; ind++) {
00285         pool[ind]->write(xIndividual);
00286         xIndividual.addAttribute("i", uint2str(pool[ind]->index).c_str());
00287         xIndividual.addAttribute("c", uint2str(pool[ind]->cid).c_str());
00288         xAll.addChild(xIndividual);
00289     }
00290     char *message = xAll.createXMLString(0);
00291 
00292     double createTime = time(PACK);
00293 
00294     frameworkComm_.Send(message, (int) strlen(message) + 1, MPI::CHAR, iProcess, T_DEFAULT);
00295 
00296     double sendTime = time(SEND);
00297 
00298     std::stringstream log;
00299     log << "sent " << nIndividuals << " individuals (global), " << strlen(message) << " bytes (P: " << createTime << " | S: " << sendTime << ")";
00300     ECF_LOG(state_, logLevel_, log.str());
00301 
00302     freeXMLString(message);
00303 
00304     return true;
00305 }
00306 
00307 
00308 // prima jednike na mjesto postojecih u deme objektu
00309 // cita se indeks iz poruke i jedinka se stavlja na odgovarajuce mjesto u deme vektoru
00310 uint Communicator::recvDemeIndividuals(std::vector<IndividualP>& deme, uint iProcess)
00311 {
00312     XMLNode xAll, xIndividual;
00313     MPI::Status status;
00314     std::string message;
00315 
00316     time(COMP);
00317 
00318     demeComm_.Probe(iProcess, MPI::ANY_TAG, status);
00319 
00320     double idle = time(IDLE);
00321 
00322     uint length = status.Get_count(MPI::CHAR);
00323     message.resize(length + 1);
00324     demeComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_DEFAULT, status);
00325 
00326     double recv = time(RECV);
00327 
00328     xAll = XMLNode::parseString(message.c_str(), "Pack");
00329     uint nIndividuals = atoi(xAll.getAttribute("size"));
00330     uint index, cid;
00331     for(uint i = 0; i < nIndividuals; i++) {
00332         //xIndividual = xAll.getChildNode("Individual", i);
00333         xIndividual = xAll.getChildNode(i); // ovo bi trebalo biti brze...?
00334         index = atoi(xIndividual.getAttributeValue(1));
00335         cid = atoi(xIndividual.getAttributeValue(2));
00336         deme[index]->read(xIndividual);
00337         deme[index]->cid = cid;
00338     }
00339     status_ = status;
00340 
00341     double read = time(UNPACK);
00342 
00343     std::stringstream log;
00344     log << "received " << nIndividuals << " individuals, " << length << " bytes (";
00345     log << "I: " << idle << " | R: " << recv << " | U: " << read << ")";
00346     ECF_LOG(state_, logLevel_, log.str());
00347 
00348     return nIndividuals;
00349 }
00350 
00351 
00352 // prima jedinke u novi vektor
00353 // ujedno stvara i inicijalizira pristigle jedinke i stvara njihov fitness
00354 std::vector<IndividualP> Communicator::recvIndividuals(uint iProcess)
00355 {
00356     XMLNode xAll, xIndividual;
00357     MPI::Status status;
00358     std::string message;
00359     std::vector<IndividualP> pack;
00360 
00361     time(COMP);
00362 
00363     demeComm_.Probe(iProcess, MPI::ANY_TAG, status);
00364 
00365     double idle = time(IDLE);
00366 
00367     uint length = status.Get_count(MPI::CHAR);
00368     message.resize(length + 1);
00369     demeComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_DEFAULT, status);
00370 
00371     double recv = time(RECV);
00372 
00373     xAll = XMLNode::parseString(message.c_str(), "Pack");
00374     uint nIndividuals = atoi(xAll.getAttribute("size"));
00375     uint index, cid;
00376     for(uint i = 0; i < nIndividuals; i++) {
00377         //xIndividual = xAll.getChildNode("Individual", i);
00378         xIndividual = xAll.getChildNode(i); // ovo bi trebalo biti brze...?
00379         pack.push_back((IndividualP) new Individual(state_));
00380         index = atoi(xIndividual.getAttributeValue(1));
00381         cid = atoi(xIndividual.getAttributeValue(2));
00382 
00383         pack[i]->index = index;
00384         pack[i]->cid = cid;
00385         pack[i]->fitness = (FitnessP) state_->getFitnessObject()->copy();
00386         pack[i]->read(xIndividual);
00387     }
00388     status_ = status;
00389 
00390     double read = time(UNPACK);
00391 
00392     std::stringstream log;
00393     log << "received " << nIndividuals << " individuals, " << length << " bytes (";
00394     log << "I: " << idle << " | R: " << recv << " | U: " << read << ")";
00395     ECF_LOG(state_, logLevel_, log.str());
00396 
00397     return pack;
00398 }
00399 
00400 
00401 // isto kao gornja fja, samo uz frameworkComm_
00402 std::vector<IndividualP> Communicator::recvIndividualsGlobal(uint iProcess)
00403 {
00404     XMLNode xAll, xIndividual;
00405     MPI::Status status;
00406     std::string message;
00407     std::vector<IndividualP> pack;
00408 
00409     time(COMP);
00410 
00411     frameworkComm_.Probe(iProcess, T_DEFAULT, status);
00412 
00413     double idle = time(IDLE);
00414 
00415     uint length = status.Get_count(MPI::CHAR);
00416     message.resize(length);
00417     frameworkComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_DEFAULT, status);
00418 
00419     double recv = time(RECV);
00420 
00421     xAll = XMLNode::parseString(message.c_str(), "Pack");
00422     uint nIndividuals = atoi(xAll.getAttribute("size"));
00423     uint index, cid;
00424     for(uint i = 0; i < nIndividuals; i++) {
00425         xIndividual = xAll.getChildNode(i);
00426         pack.push_back((IndividualP) new Individual(state_));
00427         index = atoi(xIndividual.getAttributeValue(1));
00428         cid = atoi(xIndividual.getAttributeValue(2));
00429 
00430         pack[i]->index = index;
00431         pack[i]->cid = cid;
00432         pack[i]->fitness = (FitnessP) state_->getFitnessObject()->copy();
00433         pack[i]->read(xIndividual);
00434     }
00435     status_ = status;
00436 
00437     double read = time(UNPACK);
00438 
00439     std::stringstream log;
00440     log << "received " << nIndividuals << " individuals, " << length << " bytes (";
00441     log << "I: " << idle << " | R: " << recv << " | U: " << read << ")";
00442     ECF_LOG(state_, logLevel_, log.str());
00443 
00444     return pack;
00445 }
00446 
00447 
00448 // prima jedinke u postojeci vektor jedinki (bez obzira na indekse)
00449 // velicina vektora se _povecava_ na broj primljenih jedinki
00450 // ako je jedinki manje, vektor se _ne smanjuje_
00451 // vraca broj primljenih jedinki
00452 uint Communicator::recvReplaceIndividuals(std::vector<IndividualP>& pool, uint iProcess)
00453 {
00454     XMLNode xAll, xIndividual;
00455     MPI::Status status;
00456 
00457     time(COMP);
00458 
00459     demeComm_.Probe(iProcess, MPI::ANY_TAG, status);
00460 
00461     double idle = time(IDLE);
00462 
00463     uint length = status.Get_count(MPI::CHAR);
00464     char *message = new char[length + 1];
00465     demeComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_DEFAULT, status);
00466     status_ = status;
00467 
00468     double recv = time(RECV);
00469 
00470     xAll = XMLNode::parseString(message, "Pack");
00471     uint nIndividuals = atoi(xAll.getAttribute("size"));
00472     uint poolSize = (uint) pool.size();
00473 
00474     if(poolSize < nIndividuals) {
00475         pool.resize(nIndividuals);
00476         for(uint i = poolSize; i < nIndividuals; i++) {
00477             pool[i] = (IndividualP) new Individual(state_);
00478             pool[i]->fitness = (FitnessP) state_->getFitnessObject()->copy();
00479         }
00480     }
00481 
00482     uint index, cid;
00483     for(uint i = 0; i < nIndividuals; i++) {
00484         xIndividual = xAll.getChildNode(i); // ovo bi trebalo biti brze...?
00485         index = atoi(xIndividual.getAttributeValue(1));
00486         cid = atoi(xIndividual.getAttributeValue(2));
00487         pool[i]->index = index;
00488         pool[i]->cid = cid;
00489         pool[i]->read(xIndividual);
00490     }
00491 
00492     double read = time(UNPACK);
00493 
00494     std::stringstream log;
00495     log << "received " << nIndividuals << " individuals, " << length << " bytes (I: " << idle << " | R: " << recv << " | U: " << read << ")";
00496     ECF_LOG(state_, logLevel_, log.str());
00497 
00498     delete [] message;
00499 
00500     return nIndividuals;
00501 }
00502 
00503 
00504 // salje fitnese jedinki iz zadanog vektora
00505 // fitnesi se pakiraju kao jedinke bez genotipa (atribut "i" je prvi po redu!)
00506 // svakoj jedinki se dodaje njen indeks iz _deme_ (ne iz vektora pool!)
00507 // ako je nIndividuals != 0, salje se samo toliko prvih iz vektora
00508 bool Communicator::sendFitness(std::vector<IndividualP> pool, uint iProcess, uint nIndividuals)
00509 {
00510     time(COMP);
00511 
00512     XMLNode xAll, xIndividual, xFitness;
00513     xAll = XMLNode::createXMLTopNode("Pack");
00514 
00515     if(nIndividuals == 0)
00516         nIndividuals = (uint) pool.size();
00517     xAll.addAttribute("size", uint2str(nIndividuals).c_str());
00518 
00519     for(uint ind = 0; ind < nIndividuals; ind++) {
00520         xIndividual = XMLNode::createXMLTopNode("Individual");
00521         xIndividual.addAttribute("i", uint2str(pool[ind]->index).c_str());
00522         xIndividual.addAttribute("c", uint2str(pool[ind]->cid).c_str());
00523 
00524         pool[ind]->fitness->write(xFitness);
00525         xIndividual.addChild(xFitness);
00526         xAll.addChild(xIndividual);
00527     }
00528     char *message = xAll.createXMLString(0);
00529 
00530     double pack = time(PACK);
00531 
00532     demeComm_.Send(message, (int) strlen(message) + 1, MPI::CHAR, iProcess, T_DEFAULT);
00533 
00534     double send = time(SEND);
00535 
00536     std::stringstream log;
00537     log << "sent " << nIndividuals << " fitness objects, " << strlen(message) << " bytes (P: " << pack << " | S: " << send << ")";
00538     ECF_LOG(state_, logLevel_, log.str());
00539 
00540     freeXMLString(message);
00541 
00542     return true;
00543 }
00544 
00545 
00546 // prima fitnese na mjesto postojecih u deme objektu
00547 // fitnesi zu zapakirani u jedinke bez genotipa (atribut "i" je prvi po redu!)
00548 // cita se indeks iz poruke i fitnes jedinke se stavlja na odgovarajuce mjesto u deme vektoru
00549 uint Communicator::recvDemeFitness(std::vector<IndividualP>& deme, uint iProcess)
00550 {
00551     XMLNode xAll, xIndividual, xFitness;
00552     MPI::Status status;
00553     std::string message;
00554 
00555     time(COMP);
00556 
00557     demeComm_.Probe(iProcess, MPI::ANY_TAG, status);
00558 
00559     double idle = time(IDLE);
00560 
00561     uint length = status.Get_count(MPI::CHAR);
00562     message.resize(length);
00563     demeComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_DEFAULT, status);
00564 
00565     double recv = time(RECV);
00566 
00567     xAll = XMLNode::parseString(message.c_str(), "Pack");
00568     uint nIndividuals = atoi(xAll.getAttribute("size"));
00569     uint index, cid;
00570     for(uint i = 0; i < nIndividuals; i++) {
00571         xIndividual = xAll.getChildNode(i); // ovo bi trebalo biti brze...?
00572         index = atoi(xIndividual.getAttributeValue(0));
00573         cid = atoi(xIndividual.getAttributeValue(1));
00574         xFitness = xIndividual.getChildNode(0);
00575         deme[index]->fitness->read(xFitness);
00576         deme[index]->fitness->cid = cid;
00577 
00578     }
00579     status_ = status;
00580 
00581     double read = time(UNPACK);
00582 
00583     std::stringstream log;
00584     log << "received " << nIndividuals << " fitnes objects, " << length << " bytes (I: " << idle << " | R: " << recv << " | U: " << read << ")";
00585     ECF_LOG(state_, logLevel_, log.str());
00586 
00587     return nIndividuals;
00588 }
00589 
00590 
00591 // prima fitnese u vektor fitnesa
00592 // fitnesi dolaze zapakirani kao jedinke bez genotipa (atribut "i" je prvi po redu!)
00593 // fitnesi se stavljaju u vektor (prvi argument) na mjesto koje pise kao indeks jedinke u poruci
00594 // funkcija vraca vektor indeksa jedinki primljenih fitnesa
00595 std::vector<uint> Communicator::recvFitnessVector(std::vector<IndividualP>& deme, uint iProcess)
00596 {
00597     XMLNode xAll, xIndividual, xFitness;
00598     MPI::Status status;
00599     std::string message;
00600     std::vector<uint> indices;
00601 
00602     time(COMP);
00603 
00604     demeComm_.Probe(iProcess, MPI::ANY_TAG, status);
00605 
00606     double idle = time(IDLE);
00607 
00608     uint length = status.Get_count(MPI::CHAR);
00609     message.resize(length);
00610     demeComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_DEFAULT, status);
00611 
00612     double recv = time(RECV);
00613 
00614     xAll = XMLNode::parseString(message.c_str(), "Pack");
00615     uint nIndividuals = atoi(xAll.getAttribute("size"));
00616     uint index, cid;
00617     for(uint i = 0; i < nIndividuals; i++) {
00618         xIndividual = xAll.getChildNode(i);
00619         index = atoi(xIndividual.getAttributeValue(0));
00620         cid = atoi(xIndividual.getAttributeValue(1));
00621         xFitness = xIndividual.getChildNode(0);
00622 
00623         deme[index]->fitness->read(xFitness);
00624         deme[index]->fitness->cid = cid;
00625         indices.push_back(index);
00626     }
00627     status_ = status;
00628 
00629     double read = time(UNPACK);
00630 
00631     std::stringstream log;
00632     log << "received " << nIndividuals << " fitnes objects, " << length << " bytes (I: " << idle << " | R: " << recv << " | U: " << read << ")";
00633     ECF_LOG(state_, logLevel_, log.str());
00634 
00635     return indices;
00636 }
00637 
00638 
00639 // salje vektor vrijednosti u globalnom komunikatoru
00640 bool Communicator::sendValuesGlobal(std::vector<double> values, uint iProcess)
00641 {
00642     time(COMP);
00643 
00644     frameworkComm_.Send(&values[0], (int) values.size(), MPI::DOUBLE, iProcess, T_VALUES);
00645 
00646     time(SEND);
00647 
00648     std::stringstream log;
00649     log << "sent " << values.size() << " doubles";
00650     ECF_LOG(state_, logLevel_, log.str());
00651 
00652     return true;
00653 }
00654 
00655 
00656 // prima vektor vrijednosti u globalnom komunikatoru
00657 std::vector<double> Communicator::recvValuesGlobal(uint iProcess)
00658 {
00659     std::vector<double> values;
00660     MPI::Status status;
00661 
00662     time(COMP);
00663 
00664     frameworkComm_.Probe(iProcess, T_VALUES, status);
00665 
00666     double idle = time(IDLE);
00667 
00668     uint size = status.Get_count(MPI::DOUBLE);
00669     values.resize(size);
00670     frameworkComm_.Recv(&values[0], size, MPI::DOUBLE, iProcess, T_VALUES, status);
00671 
00672     double recv = time(RECV);
00673 
00674     std::stringstream log;
00675     log << "received " << values.size() << " doubles";
00676     ECF_LOG(state_, logLevel_, log.str());
00677 
00678     return values;
00679 }
00680 
00681 
00682 // prima pristigle log poruke od bilo kojih procesa u globalnom komunikatoru
00683 std::string Communicator::recvLogsGlobal()
00684 {
00685     std::string logs = "", message;
00686     uint logCount = 0;
00687     MPI::Status status;
00688 
00689     time(COMP);
00690 
00691     while(frameworkComm_.Iprobe(MPI::ANY_SOURCE, T_LOGS, status)) {
00692         uint iProcess = status.Get_source();
00693         uint length = status.Get_count(MPI::CHAR);
00694         message.resize(length);
00695         frameworkComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_LOGS, status);
00696         logs += message;
00697         logCount++;
00698     }
00699 
00700     time(RECV);
00701 
00702     std::stringstream log;
00703     log << "received " << logCount << " logs";
00704     ECF_LOG(state_, logLevel_, log.str());
00705 
00706     return logs;
00707 }
00708 
00709 
00710 // salje log poruke u globalnom komunikatoru
00711 bool Communicator::sendLogsGlobal(std::string logs, uint iProcess, bool blocking)
00712 {
00713     time(COMP);
00714 
00715     MPI::Request request;
00716 
00717     if(blocking)
00718         frameworkComm_.Send(&logs[0], (int) logs.size(), MPI::CHAR, iProcess, T_LOGS);
00719     else
00720         request = frameworkComm_.Isend(&logs[0], (int) logs.size(), MPI::CHAR, iProcess, T_LOGS);
00721 
00722     time(SEND);
00723 
00724     std::stringstream log;
00725     log << "sent " << logs.size() << " log chars";
00726     ECF_LOG(state_, logLevel_, log.str());
00727 
00728     return true;
00729 }
00730 
00731 
00732 // slanje bilo kakvih podataka u globalnom komunikatoru
00733 bool Communicator::sendDataGlobal(voidP data, uint size, uint iProcess)
00734 {
00735     time(COMP);
00736 
00737     frameworkComm_.Send(data.get(), size, MPI::BYTE, iProcess, T_DATA);
00738 
00739     time(SEND);
00740 
00741     std::stringstream log;
00742     log << "sent " << size << " bytes";
00743     ECF_LOG(state_, logLevel_, log.str());
00744 
00745     return true;
00746 }
00747 
00748 
00749 // primanje bilo kakvih podataka u globalnom komunikatoru
00750 voidP Communicator::recvDataGlobal(uint iProcess)
00751 {
00752     MPI::Status status;
00753     voidP data;
00754 
00755     time(COMP);
00756 
00757     frameworkComm_.Probe(iProcess, T_DATA, status);
00758 
00759     time(IDLE);
00760 
00761     uint size = status.Get_count(MPI::BYTE);
00762     data = (voidP) new char[size / sizeof(char) + 1];
00763     frameworkComm_.Recv(data.get(), size, MPI::BYTE, iProcess, T_DATA, status);
00764 
00765     time(RECV);
00766 
00767     std::stringstream log;
00768     log << "received " << size << " bytes";
00769     ECF_LOG(state_, logLevel_, log.str());
00770 
00771     return data;
00772 }
00773 
00774 } // namespace

Generated on Fri Jul 5 2013 09:34:23 for ECF by  doxygen 1.7.1