• Main Page
  • Modules
  • Classes
  • Files
  • File List

D:/Projekt/ECF_trunk/ECF/Communicator.cpp

00001 #include "ECF_base.h"
00002 #include <string.h>
00003 
00004 namespace Comm
00005 {
00006 
00007 Communicator::Communicator()
00008 {
00009     bInitialized_ = false;
00010     idleTime_ = sendTime_ = recvTime_ = compTime_ = packTime_ = unpackTime_ = 0;
00011     logLevel_ = 4;
00012     sendCnt_ = recvCnt_ = 0;
00013 }
00014 
00015 
00016 bool Communicator::initialize(StateP state, int argc, char** argv)
00017 {
00018     idleTime_ = sendTime_ = recvTime_ = compTime_ = packTime_ = unpackTime_ = 0;
00019 
00020     // multiple runs
00021     if(bInitialized_) {
00022         beginTime_ = lastTime_ = MPI::Wtime();
00023         ECF_LOG(state_, 2, "Process " + uint2str(mpiGlobalRank_) + " of " 
00024             + uint2str(mpiGlobalSize_) + " on " + processorName_);
00025         return true;
00026     }
00027 
00028     state_ = state;
00029 
00030     MPI::Init(argc, argv);
00031 
00032     mpiGlobalSize_ = MPI::COMM_WORLD.Get_size();
00033     mpiGlobalRank_ = MPI::COMM_WORLD.Get_rank();
00034 
00035     demeComm_ = MPI::COMM_WORLD.Dup();
00036     mpiSize_ = demeComm_.Get_size();
00037     mpiRank_ = demeComm_.Get_rank();
00038     frameworkComm_ = MPI::COMM_WORLD.Dup();
00039 
00040     char *pp, processor_name[MPI_MAX_PROCESSOR_NAME];
00041     pp = &processor_name[0];
00042     int namelen;
00043     MPI_Get_processor_name(processor_name, &namelen);
00044     processorName_ = processor_name;
00045 
00046     beginTime_ = lastTime_ = MPI::Wtime();
00047 
00048     ECF_LOG(state_, 2, "Process " + uint2str(mpiGlobalRank_) + " of " 
00049         + uint2str(mpiGlobalSize_) + " on " + processorName_);
00050 
00051     bInitialized_ = true;
00052 
00053     return true;
00054 }
00055 
00056 
00057 // zove se na pocetku prije evolucije
00058 // stvara se posebni komunikator za svaki deme (lokalni kontekst za algoritam)
00059 uint Communicator::createDemeCommunicator(uint nDemes)
00060 {
00061     // TODO: parametrize deme distribution among processes
00062     uint myColor = mpiGlobalRank_ % nDemes;
00063     demeMasters.resize(nDemes);
00064     for(uint i = 0; i < nDemes; i++)
00065         demeMasters[i] = i;
00066 
00067     demeComm_ = MPI::COMM_WORLD.Split(myColor, mpiGlobalRank_);
00068     mpiSize_ = demeComm_.Get_size();
00069     mpiRank_ = demeComm_.Get_rank();
00070 
00071     std::stringstream log;
00072     log << "Global process " << mpiGlobalRank_ << " joined deme communicator with index ";
00073     log << myColor << " (local rank: " << mpiRank_ << " of " << mpiSize_ << ")";
00074     ECF_LOG(state_, 2, log.str());
00075 
00076     return myColor; // deme index
00077 }
00078 
00079 
00080 bool Communicator::finalize()
00081 {
00082     if(!bInitialized_)
00083         return true;
00084 
00085     endTime_ = MPI::Wtime();
00086 
00087     std::stringstream times;
00088     times << "Process " << mpiGlobalRank_ << ": total MPI time: " << endTime_ - beginTime_;
00089     times << ", COMP: " << compTime_;
00090     times << ", IDLE: " << idleTime_;
00091     times << ", SEND: " << sendTime_;
00092     times << ", RECV: " << recvTime_;
00093     times << ", PACK: " << packTime_;
00094     times << ", UNPACK: " << unpackTime_ << std::endl;
00095 
00096     // collect and log process times at process 0
00097     if(mpiGlobalRank_ == 0) {
00098         std::string message;
00099         MPI::Status status;
00100 
00101         for(uint iProcess = 1; iProcess < mpiGlobalSize_; iProcess++) {
00102             frameworkComm_.Probe(iProcess, T_FINAL, status);
00103             uint size = status.Get_count(MPI::CHAR);
00104             message.resize(size);
00105             frameworkComm_.Recv(&message[0], size, MPI::CHAR, iProcess, T_FINAL, status);
00106 
00107             times << message;
00108         }
00109         ECF_LOG(state_, 2, times.str());
00110         state_->getLogger()->saveTo(true);
00111     }
00112     else {
00113         std::string message = times.str();
00114         frameworkComm_.Send(&message[0], (int) message.size(), MPI::CHAR, 0, T_FINAL);
00115     }
00116 
00117     if(!state_->getBatchMode())
00118         MPI::Finalize();
00119 
00120     return true;
00121 }
00122 
00127 double Communicator::time(enum timing T)
00128 {
00129     currentTime_ = MPI::Wtime();
00130     double elapsed = currentTime_ - lastTime_;
00131     lastTime_ = currentTime_;
00132 
00133     switch(T) {
00134     case IDLE:
00135         idleTime_ += elapsed;
00136         break;
00137     case COMP:
00138         compTime_ += elapsed;
00139         break;
00140     case SEND:
00141         sendTime_ += elapsed;
00142         break;
00143     case RECV:
00144         recvTime_ += elapsed;
00145         break;
00146     case PACK:
00147         packTime_ += elapsed;
00148         break;
00149     case UNPACK:
00150         unpackTime_ += elapsed;
00151         break;
00152     }
00153 
00154     return 1000 * elapsed;
00155 }
00156 
00157 
00158 uint Communicator::getDemeMaster(uint iDeme)
00159 {
00160     return demeMasters[iDeme];
00161 }
00162 
00163 
00164 uint Communicator::getLastSource()
00165 {
00166     return status_.Get_source();
00167 }
00168 
00169 
00170 void Communicator::synchronize()
00171 {
00172     MPI::COMM_WORLD.Barrier();
00173 }
00174 
00175 
00176 // provjerava (trenutno) ima li neka pristigla poruka
00177 bool Communicator::messageWaiting(uint iProcess, uint tag)
00178 {
00179     return demeComm_.Iprobe(iProcess, tag, status_);
00180 }
00181 
00182 
00183 bool Communicator::sendControlMessage(uint iProcess, int control)
00184 {
00185     demeComm_.Send(&control, sizeof(int), MPI::BYTE, iProcess, T_CONTROL);
00186     return true;
00187 }
00188 
00189 
00190 int Communicator::recvControlMessage(uint iProcess)
00191 {
00192     int control;
00193     demeComm_.Recv(&control, sizeof(int), MPI::BYTE, iProcess, T_CONTROL);
00194     return control;
00195 }
00196 
00197 
00198 bool Communicator::sendTerminateMessage(uint iProcess, bool termination)
00199 {
00200     ECF_LOG(state_, logLevel_, "Sending terminate message to process " + uint2str(iProcess));
00201     uint tag = (termination == true) ? T_TERMINATE : T_CONTINUE;
00202     frameworkComm_.Send(&termination, sizeof(bool), MPI::BYTE, iProcess, tag);
00203     return true;
00204 }
00205 
00206 
00207 bool Communicator::recvTerminateMessage(uint iProcess)
00208 {
00209     bool termination;
00210     frameworkComm_.Recv(&termination, sizeof(bool), MPI::BYTE, iProcess, MPI::ANY_TAG, controlStatus_);
00211     if(controlStatus_.Get_tag() == T_TERMINATE)
00212         termination = true;
00213     if(controlStatus_.Get_tag() == T_CONTINUE)
00214         termination = false;
00215     return termination;
00216 }
00217 
00218 
00219 bool Communicator::checkTerminationMessage(uint master)
00220 {
00221     if(frameworkComm_.Iprobe(master, MPI::ANY_TAG, controlStatus_)) {
00222         if(controlStatus_.Get_tag() == T_TERMINATE) {
00223             return true;
00224         }
00225         else 
00226             recvTerminateMessage(master);
00227     }
00228     return false;
00229 }
00230 
00231 
00232 // salje prvih nIndividuals jedinki iz zadanog vektora (sve ako je nInvididuals == 0)
00233 // svakoj jedinki se dodaje njen indeks u _deme_ (ne iz vektora pool!)
00234 bool Communicator::sendIndividuals(std::vector<IndividualP> pool, uint iProcess, uint nIndividuals)
00235 {
00236     time(COMP);
00237 
00238     XMLNode xAll, xIndividual;
00239     xAll = XMLNode::createXMLTopNode("Pack");
00240 
00241     if(nIndividuals == 0)
00242         nIndividuals = (uint) pool.size();
00243     xAll.addAttribute("size", uint2str(nIndividuals).c_str());
00244 
00245     for(uint ind = 0; ind < nIndividuals; ind++) {
00246         pool[ind]->write(xIndividual);
00247         xIndividual.addAttribute("i", uint2str(pool[ind]->index).c_str());
00248         xIndividual.addAttribute("c", uint2str(pool[ind]->cid).c_str());
00249         xAll.addChild(xIndividual);
00250     }
00251     char *message = xAll.createXMLString(0);
00252     //std::string message = m;
00253 
00254     double createTime = time(PACK);
00255 
00256     //demeComm_.Send(message.data(), (int) message.length(), MPI::CHAR, iProcess, 0);
00257     demeComm_.Send(message, (int) strlen(message) + 1, MPI::CHAR, iProcess, T_DEFAULT);
00258     
00259     double sendTime = time(SEND);
00260 
00261     std::stringstream log;
00262     log << "sent " << nIndividuals << " individuals, " << strlen(message) << " bytes (P: " << createTime << " | S: " << sendTime << ")";
00263     ECF_LOG(state_, logLevel_, log.str());
00264 
00265     freeXMLString(message);
00266 
00267     return true;
00268 }
00269 
00270 
00271 // isto kao i gornja fja, samo sa frameworkComm_
00272 bool Communicator::sendIndividualsGlobal(std::vector<IndividualP> pool, uint iProcess, uint nIndividuals)
00273 {
00274     time(COMP);
00275 
00276     XMLNode xAll, xIndividual;
00277     xAll = XMLNode::createXMLTopNode("Pack");
00278 
00279     if(nIndividuals == 0)
00280         nIndividuals = (uint) pool.size();
00281     xAll.addAttribute("size", uint2str(nIndividuals).c_str());
00282 
00283     for(uint ind = 0; ind < nIndividuals; ind++) {
00284         pool[ind]->write(xIndividual);
00285         xIndividual.addAttribute("i", uint2str(pool[ind]->index).c_str());
00286         xIndividual.addAttribute("c", uint2str(pool[ind]->cid).c_str());
00287         xAll.addChild(xIndividual);
00288     }
00289     char *message = xAll.createXMLString(0);
00290 
00291     double createTime = time(PACK);
00292 
00293     frameworkComm_.Send(message, (int) strlen(message) + 1, MPI::CHAR, iProcess, T_DEFAULT);
00294 
00295     double sendTime = time(SEND);
00296 
00297     std::stringstream log;
00298     log << "sent " << nIndividuals << " individuals (global), " << strlen(message) << " bytes (P: " << createTime << " | S: " << sendTime << ")";
00299     ECF_LOG(state_, logLevel_, log.str());
00300 
00301     freeXMLString(message);
00302 
00303     return true;
00304 }
00305 
00306 
00307 // prima jednike na mjesto postojecih u deme objektu
00308 // cita se indeks iz poruke i jedinka se stavlja na odgovarajuce mjesto u deme vektoru
00309 uint Communicator::recvDemeIndividuals(std::vector<IndividualP>& deme, uint iProcess)
00310 {
00311     XMLNode xAll, xIndividual;
00312     MPI::Status status;
00313     std::string message;
00314 
00315     time(COMP);
00316 
00317     demeComm_.Probe(iProcess, MPI::ANY_TAG, status);
00318 
00319     double idle = time(IDLE);
00320 
00321     uint length = status.Get_count(MPI::CHAR);
00322     message.resize(length + 1);
00323     demeComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_DEFAULT, status);
00324 
00325     double recv = time(RECV);
00326 
00327     xAll = XMLNode::parseString(message.c_str(), "Pack");
00328     uint nIndividuals = atoi(xAll.getAttribute("size"));
00329     uint index, cid;
00330     for(uint i = 0; i < nIndividuals; i++) {
00331         //xIndividual = xAll.getChildNode("Individual", i);
00332         xIndividual = xAll.getChildNode(i); // ovo bi trebalo biti brze...?
00333         index = atoi(xIndividual.getAttributeValue(1));
00334         cid = atoi(xIndividual.getAttributeValue(2));
00335         deme[index]->read(xIndividual);
00336         deme[index]->cid = cid;
00337     }
00338     status_ = status;
00339 
00340     double read = time(UNPACK);
00341 
00342     std::stringstream log;
00343     log << "received " << nIndividuals << " individuals, " << length << " bytes (";
00344     log << "I: " << idle << " | R: " << recv << " | U: " << read << ")";
00345     ECF_LOG(state_, logLevel_, log.str());
00346 
00347     return nIndividuals;
00348 }
00349 
00350 
00351 // prima jedinke u novi vektor
00352 // ujedno stvara i inicijalizira pristigle jedinke i stvara njihov fitness
00353 std::vector<IndividualP> Communicator::recvIndividuals(uint iProcess)
00354 {
00355     XMLNode xAll, xIndividual;
00356     MPI::Status status;
00357     std::string message;
00358     std::vector<IndividualP> pack;
00359 
00360     time(COMP);
00361 
00362     demeComm_.Probe(iProcess, MPI::ANY_TAG, status);
00363 
00364     double idle = time(IDLE);
00365 
00366     uint length = status.Get_count(MPI::CHAR);
00367     message.resize(length + 1);
00368     demeComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_DEFAULT, status);
00369 
00370     double recv = time(RECV);
00371 
00372     xAll = XMLNode::parseString(message.c_str(), "Pack");
00373     uint nIndividuals = atoi(xAll.getAttribute("size"));
00374     uint index, cid;
00375     for(uint i = 0; i < nIndividuals; i++) {
00376         //xIndividual = xAll.getChildNode("Individual", i);
00377         xIndividual = xAll.getChildNode(i); // ovo bi trebalo biti brze...?
00378         pack.push_back((IndividualP) new Individual(state_));
00379         index = atoi(xIndividual.getAttributeValue(1));
00380         cid = atoi(xIndividual.getAttributeValue(2));
00381 
00382         pack[i]->index = index;
00383         pack[i]->cid = cid;
00384         pack[i]->fitness = (FitnessP) state_->getFitnessObject()->copy();
00385         pack[i]->read(xIndividual);
00386     }
00387     status_ = status;
00388 
00389     double read = time(UNPACK);
00390 
00391     std::stringstream log;
00392     log << "received " << nIndividuals << " individuals, " << length << " bytes (";
00393     log << "I: " << idle << " | R: " << recv << " | U: " << read << ")";
00394     ECF_LOG(state_, logLevel_, log.str());
00395 
00396     return pack;
00397 }
00398 
00399 
00400 // isto kao gornja fja, samo uz frameworkComm_
00401 std::vector<IndividualP> Communicator::recvIndividualsGlobal(uint iProcess)
00402 {
00403     XMLNode xAll, xIndividual;
00404     MPI::Status status;
00405     std::string message;
00406     std::vector<IndividualP> pack;
00407 
00408     time(COMP);
00409 
00410     frameworkComm_.Probe(iProcess, T_DEFAULT, status);
00411 
00412     double idle = time(IDLE);
00413 
00414     uint length = status.Get_count(MPI::CHAR);
00415     message.resize(length);
00416     frameworkComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_DEFAULT, status);
00417 
00418     double recv = time(RECV);
00419 
00420     xAll = XMLNode::parseString(message.c_str(), "Pack");
00421     uint nIndividuals = atoi(xAll.getAttribute("size"));
00422     uint index, cid;
00423     for(uint i = 0; i < nIndividuals; i++) {
00424         xIndividual = xAll.getChildNode(i);
00425         pack.push_back((IndividualP) new Individual(state_));
00426         index = atoi(xIndividual.getAttributeValue(1));
00427         cid = atoi(xIndividual.getAttributeValue(2));
00428 
00429         pack[i]->index = index;
00430         pack[i]->cid = cid;
00431         pack[i]->fitness = (FitnessP) state_->getFitnessObject()->copy();
00432         pack[i]->read(xIndividual);
00433     }
00434     status_ = status;
00435 
00436     double read = time(UNPACK);
00437 
00438     std::stringstream log;
00439     log << "received " << nIndividuals << " individuals, " << length << " bytes (";
00440     log << "I: " << idle << " | R: " << recv << " | U: " << read << ")";
00441     ECF_LOG(state_, logLevel_, log.str());
00442 
00443     return pack;
00444 }
00445 
00446 
00447 // prima jedinke u postojeci vektor jedinki (bez obzira na indekse)
00448 // velicina vektora se _povecava_ na broj primljenih jedinki
00449 // ako je jedinki manje, vektor se _ne smanjuje_
00450 // vraca broj primljenih jedinki
00451 uint Communicator::recvReplaceIndividuals(std::vector<IndividualP>& pool, uint iProcess)
00452 {
00453     XMLNode xAll, xIndividual;
00454     MPI::Status status;
00455 
00456     time(COMP);
00457 
00458     demeComm_.Probe(iProcess, MPI::ANY_TAG, status);
00459 
00460     double idle = time(IDLE);
00461 
00462     uint length = status.Get_count(MPI::CHAR);
00463     char *message = new char[length + 1];
00464     demeComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_DEFAULT, status);
00465     status_ = status;
00466 
00467     double recv = time(RECV);
00468 
00469     xAll = XMLNode::parseString(message, "Pack");
00470     uint nIndividuals = atoi(xAll.getAttribute("size"));
00471     uint poolSize = (uint) pool.size();
00472 
00473     if(poolSize < nIndividuals) {
00474         pool.resize(nIndividuals);
00475         for(uint i = poolSize; i < nIndividuals; i++) {
00476             pool[i] = (IndividualP) new Individual(state_);
00477             pool[i]->fitness = (FitnessP) state_->getFitnessObject()->copy();
00478         }
00479     }
00480 
00481     uint index, cid;
00482     for(uint i = 0; i < nIndividuals; i++) {
00483         xIndividual = xAll.getChildNode(i); // ovo bi trebalo biti brze...?
00484         index = atoi(xIndividual.getAttributeValue(1));
00485         cid = atoi(xIndividual.getAttributeValue(2));
00486         pool[i]->index = index;
00487         pool[i]->cid = cid;
00488         pool[i]->read(xIndividual);
00489     }
00490 
00491     double read = time(UNPACK);
00492 
00493     std::stringstream log;
00494     log << "received " << nIndividuals << " individuals, " << length << " bytes (I: " << idle << " | R: " << recv << " | U: " << read << ")";
00495     ECF_LOG(state_, logLevel_, log.str());
00496 
00497     delete [] message;
00498 
00499     return nIndividuals;
00500 }
00501 
00502 
00503 // salje fitnese jedinki iz zadanog vektora
00504 // fitnesi se pakiraju kao jedinke bez genotipa (atribut "i" je prvi po redu!)
00505 // svakoj jedinki se dodaje njen indeks iz _deme_ (ne iz vektora pool!)
00506 // ako je nIndividuals != 0, salje se samo toliko prvih iz vektora
00507 bool Communicator::sendFitness(std::vector<IndividualP> pool, uint iProcess, uint nIndividuals)
00508 {
00509     time(COMP);
00510 
00511     XMLNode xAll, xIndividual, xFitness;
00512     xAll = XMLNode::createXMLTopNode("Pack");
00513 
00514     if(nIndividuals == 0)
00515         nIndividuals = (uint) pool.size();
00516     xAll.addAttribute("size", uint2str(nIndividuals).c_str());
00517 
00518     for(uint ind = 0; ind < nIndividuals; ind++) {
00519         xIndividual = XMLNode::createXMLTopNode("Individual");
00520         xIndividual.addAttribute("i", uint2str(pool[ind]->index).c_str());
00521         xIndividual.addAttribute("c", uint2str(pool[ind]->cid).c_str());
00522 
00523         pool[ind]->fitness->write(xFitness);
00524         xIndividual.addChild(xFitness);
00525         xAll.addChild(xIndividual);
00526     }
00527     char *message = xAll.createXMLString(0);
00528 
00529     double pack = time(PACK);
00530 
00531     demeComm_.Send(message, (int) strlen(message) + 1, MPI::CHAR, iProcess, T_DEFAULT);
00532 
00533     double send = time(SEND);
00534 
00535     std::stringstream log;
00536     log << "sent " << nIndividuals << " fitness objects, " << strlen(message) << " bytes (P: " << pack << " | S: " << send << ")";
00537     ECF_LOG(state_, logLevel_, log.str());
00538 
00539     freeXMLString(message);
00540 
00541     return true;
00542 }
00543 
00544 
00545 // prima fitnese na mjesto postojecih u deme objektu
00546 // fitnesi zu zapakirani u jedinke bez genotipa (atribut "i" je prvi po redu!)
00547 // cita se indeks iz poruke i fitnes jedinke se stavlja na odgovarajuce mjesto u deme vektoru
00548 uint Communicator::recvDemeFitness(std::vector<IndividualP>& deme, uint iProcess)
00549 {
00550     XMLNode xAll, xIndividual, xFitness;
00551     MPI::Status status;
00552     std::string message;
00553 
00554     time(COMP);
00555 
00556     demeComm_.Probe(iProcess, MPI::ANY_TAG, status);
00557 
00558     double idle = time(IDLE);
00559 
00560     uint length = status.Get_count(MPI::CHAR);
00561     message.resize(length);
00562     demeComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_DEFAULT, status);
00563 
00564     double recv = time(RECV);
00565 
00566     xAll = XMLNode::parseString(message.c_str(), "Pack");
00567     uint nIndividuals = atoi(xAll.getAttribute("size"));
00568     uint index, cid;
00569     for(uint i = 0; i < nIndividuals; i++) {
00570         xIndividual = xAll.getChildNode(i); // ovo bi trebalo biti brze...?
00571         index = atoi(xIndividual.getAttributeValue(0));
00572         cid = atoi(xIndividual.getAttributeValue(1));
00573         xFitness = xIndividual.getChildNode(0);
00574         deme[index]->fitness->read(xFitness);
00575         deme[index]->fitness->cid = cid;
00576 
00577     }
00578     status_ = status;
00579 
00580     double read = time(UNPACK);
00581 
00582     std::stringstream log;
00583     log << "received " << nIndividuals << " fitnes objects, " << length << " bytes (I: " << idle << " | R: " << recv << " | U: " << read << ")";
00584     ECF_LOG(state_, logLevel_, log.str());
00585 
00586     return nIndividuals;
00587 }
00588 
00589 
00590 // prima fitnese u vektor fitnesa
00591 // fitnesi dolaze zapakirani kao jedinke bez genotipa (atribut "i" je prvi po redu!)
00592 // fitnesi se stavljaju u vektor (prvi argument) na mjesto koje pise kao indeks jedinke u poruci
00593 // funkcija vraca vektor indeksa jedinki primljenih fitnesa
00594 std::vector<uint> Communicator::recvFitnessVector(std::vector<IndividualP>& deme, uint iProcess)
00595 {
00596     XMLNode xAll, xIndividual, xFitness;
00597     MPI::Status status;
00598     std::string message;
00599     std::vector<uint> indices;
00600 
00601     time(COMP);
00602 
00603     demeComm_.Probe(iProcess, MPI::ANY_TAG, status);
00604 
00605     double idle = time(IDLE);
00606 
00607     uint length = status.Get_count(MPI::CHAR);
00608     message.resize(length);
00609     demeComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_DEFAULT, status);
00610 
00611     double recv = time(RECV);
00612 
00613     xAll = XMLNode::parseString(message.c_str(), "Pack");
00614     uint nIndividuals = atoi(xAll.getAttribute("size"));
00615     uint index, cid;
00616     for(uint i = 0; i < nIndividuals; i++) {
00617         xIndividual = xAll.getChildNode(i);
00618         index = atoi(xIndividual.getAttributeValue(0));
00619         cid = atoi(xIndividual.getAttributeValue(1));
00620         xFitness = xIndividual.getChildNode(0);
00621 
00622         deme[index]->fitness->read(xFitness);
00623         deme[index]->fitness->cid = cid;
00624         indices.push_back(index);
00625     }
00626     status_ = status;
00627 
00628     double read = time(UNPACK);
00629 
00630     std::stringstream log;
00631     log << "received " << nIndividuals << " fitnes objects, " << length << " bytes (I: " << idle << " | R: " << recv << " | U: " << read << ")";
00632     ECF_LOG(state_, logLevel_, log.str());
00633 
00634     return indices;
00635 }
00636 
00637 
00638 // salje vektor vrijednosti u globalnom komunikatoru
00639 bool Communicator::sendValuesGlobal(std::vector<double> values, uint iProcess)
00640 {
00641     time(COMP);
00642 
00643     frameworkComm_.Send(&values[0], (int) values.size(), MPI::DOUBLE, iProcess, T_VALUES);
00644 
00645     time(SEND);
00646 
00647     std::stringstream log;
00648     log << "sent " << values.size() << " doubles";
00649     ECF_LOG(state_, logLevel_, log.str());
00650 
00651     return true;
00652 }
00653 
00654 
00655 // prima vektor vrijednosti u globalnom komunikatoru
00656 std::vector<double> Communicator::recvValuesGlobal(uint iProcess)
00657 {
00658     std::vector<double> values;
00659     MPI::Status status;
00660 
00661     time(COMP);
00662 
00663     frameworkComm_.Probe(iProcess, T_VALUES, status);
00664 
00665     double idle = time(IDLE);
00666 
00667     uint size = status.Get_count(MPI::DOUBLE);
00668     values.resize(size);
00669     frameworkComm_.Recv(&values[0], size, MPI::DOUBLE, iProcess, T_VALUES, status);
00670 
00671     double recv = time(RECV);
00672 
00673     std::stringstream log;
00674     log << "received " << values.size() << " doubles";
00675     ECF_LOG(state_, logLevel_, log.str());
00676 
00677     return values;
00678 }
00679 
00680 
00681 // prima pristigle log poruke od bilo kojih procesa u globalnom komunikatoru
00682 std::string Communicator::recvLogsGlobal()
00683 {
00684     std::string logs = "", message;
00685     uint logCount = 0;
00686     MPI::Status status;
00687 
00688     time(COMP);
00689 
00690     while(frameworkComm_.Iprobe(MPI::ANY_SOURCE, T_LOGS, status)) {
00691         uint iProcess = status.Get_source();
00692         uint length = status.Get_count(MPI::CHAR);
00693         message.resize(length);
00694         frameworkComm_.Recv(&message[0], length, MPI::CHAR, iProcess, T_LOGS, status);
00695         logs += message;
00696         logCount++;
00697     }
00698 
00699     time(RECV);
00700 
00701     std::stringstream log;
00702     log << "received " << logCount << " logs";
00703     ECF_LOG(state_, logLevel_, log.str());
00704 
00705     return logs;
00706 }
00707 
00708 
00709 // salje log poruke u globalnom komunikatoru
00710 bool Communicator::sendLogsGlobal(std::string logs, uint iProcess, bool blocking)
00711 {
00712     time(COMP);
00713 
00714     MPI::Request request;
00715 
00716     if(blocking)
00717         frameworkComm_.Send(&logs[0], (int) logs.size(), MPI::CHAR, iProcess, T_LOGS);
00718     else
00719         request = frameworkComm_.Isend(&logs[0], (int) logs.size(), MPI::CHAR, iProcess, T_LOGS);
00720 
00721     time(SEND);
00722 
00723     std::stringstream log;
00724     log << "sent " << logs.size() << " log chars";
00725     ECF_LOG(state_, logLevel_, log.str());
00726 
00727     return true;
00728 }
00729 
00730 
00731 // slanje bilo kakvih podataka u globalnom komunikatoru
00732 bool Communicator::sendDataGlobal(voidP data, uint size, uint iProcess)
00733 {
00734     time(COMP);
00735 
00736     frameworkComm_.Send(data.get(), size, MPI::BYTE, iProcess, T_DATA);
00737 
00738     time(SEND);
00739 
00740     std::stringstream log;
00741     log << "sent " << size << " bytes";
00742     ECF_LOG(state_, logLevel_, log.str());
00743 
00744     return true;
00745 }
00746 
00747 
00748 // primanje bilo kakvih podataka u globalnom komunikatoru
00749 voidP Communicator::recvDataGlobal(uint iProcess)
00750 {
00751     MPI::Status status;
00752     voidP data;
00753 
00754     time(COMP);
00755 
00756     frameworkComm_.Probe(iProcess, T_DATA, status);
00757 
00758     time(IDLE);
00759 
00760     uint size = status.Get_count(MPI::BYTE);
00761     data = (voidP) new char[size / sizeof(char) + 1];
00762     frameworkComm_.Recv(data.get(), size, MPI::BYTE, iProcess, T_DATA, status);
00763 
00764     time(RECV);
00765 
00766     std::stringstream log;
00767     log << "received " << size << " bytes";
00768     ECF_LOG(state_, logLevel_, log.str());
00769 
00770     return data;
00771 }
00772 
00773 } // namespace

Generated on Thu Oct 6 2011 13:41:00 for ECF by  doxygen 1.7.1