• Main Page
  • Modules
  • Classes
  • Files
  • File List

D:/Projekt/ECF_trunk/ECF/Algorithm.cpp

00001 #include "ECF_base.h"
00002 
00003 #ifdef _MPI
00004 
// Rank of the master process: the worker-side code in this file addresses its
// sendFitness / sendIndividuals / recvReplaceIndividuals calls to MASTER.
00005 const int MASTER = 0;
00006 
00007 
00008 void Algorithm::registerParallelParameters(StateP state)
00009 {
00010     std::string *type = new std::string("eval");
00011     state->getRegistry()->registerEntry("parallel.type", (voidP) type, ECF::STRING,
00012         "implicit parallelization method: eval - evaluation, mut - mutation + eval");
00013 
00014     uint* jobSize = new uint(10);
00015     state->getRegistry()->registerEntry("parallel.jobsize", (voidP) jobSize, ECF::UINT,
00016         "implicit parallelization jobsize (individuals per job)");
00017 
00018     uint *sync = new uint(0);
00019     state->getRegistry()->registerEntry("parallel.sync", (voidP) sync, ECF::UINT,
00020         "implicit parallelization synchronicity: 0 - async, 1 - sync ");
00021 }
00022 
00023 
00024 bool Algorithm::initializeParallel(StateP state)
00025 {
00026     state_ = state;
00027     comm_ = state->getCommunicator();
00028     selBestOp = static_cast<SelBestOpP> (new SelBestOp);
00029     totalEvaluations_ = wastedEvaluations_ = 0;
00030     bImplicitEvaluation_ = bImplicitMutation_ = false;
00031 
00032     // initialize protected members for population consistency
00033     uint demeSize = state->getPopulation()->getLocalDeme()->getSize();
00034     storedInds_.resize(demeSize);
00035     sentInds_.resize(demeSize);
00036     isConsistent_.resize(demeSize);
00037 
00038     if(!state_->getRegistry()->isModified("parallel.type"))
00039         bImplicitParallel_ = false;
00040     else
00041         bImplicitParallel_ = true;
00042 
00043     if(this->isParallel() && bImplicitParallel_) {
00044         ECF_LOG_ERROR(state_, "Error: implicit parallelization possible only with sequential algorithm!");
00045         throw "";
00046     }
00047 
00048     if((this->isParallel() || bImplicitParallel_)) {
00049         if(state->getPopulation()->getNoDemes() > state->getCommunicator()->getCommGlobalSize()) { 
00050             ECF_LOG_ERROR(state, "Error: number of processes must be equal or greater than the number of demes in parallel EA!");
00051             throw "";
00052         }
00053     }
00054     else if(state->getPopulation()->getNoDemes() != state->getCommunicator()->getCommGlobalSize()) {
00055         ECF_LOG_ERROR(state, "Error: number of processes must equal the number of demes in EA with sequential algorithm!");
00056         throw "";
00057     }
00058 
00059     if(!bImplicitParallel_)
00060         return true;
00061 
00062     voidP sptr = state->getRegistry()->getEntry("parallel.type");
00063     std::string parallelType = *((std::string*)sptr.get());
00064 
00065     if(parallelType == "eval") {
00066         bImplicitEvaluation_ = true;
00067     }
00068     else if(parallelType == "mut") {
00069         bImplicitMutation_ = true;
00070     }
00071     else {
00072         ECF_LOG_ERROR(state, "Error: unkown implicit parallelization mode!");
00073         throw "";
00074     }
00075 
00076     sptr = state->getRegistry()->getEntry("parallel.sync");
00077     bSynchronized_ = ((*((uint*) sptr.get())) != 0);
00078 
00079     voidP jobSizeP = state->getRegistry()->getEntry("parallel.jobsize");
00080     jobSize_ = *((uint*) jobSizeP.get());
00081 
00082     if(jobSize_ > state->getPopulation()->at(0)->getSize()) {
00083         ECF_LOG_ERROR(state, "Error: parallel.jobsize must be less or equal to the number of individuals!");
00084         throw "";
00085     }
00086 
00087     return true;
00088 }
00089 
00090 
00091 // create deme-sized vector of initialized individuals for receiving
00092 void Algorithm::initializeImplicit(StateP state)
00093 {
00094     for(uint i = 0; i < state->getPopulation()->at(0)->size(); i++) {
00095         IndividualP ind = (IndividualP) new Individual(state);
00096         ind->fitness = (FitnessP) state->getFitnessObject()->copy();
00097         demeCopy_.push_back(ind);
00098         requestIds_.push_back(0);
00099         requestMutIds_.push_back(0);
00100     }
00101     stored_.resize(state->getPopulation()->at(0)->size());
00102     currentBest_ = selBestOp->select(*(state->getPopulation()->at(0)));
00103 }
00104 
00105 
00106 bool Algorithm::advanceGeneration(StateP state)
00107 {
00108     // implicitly parallel: worker processes go to work
00109     if(state->getCommunicator()->getCommRank() != 0 && bImplicitParallel_)
00110         return implicitParallelOperate(state);
00111     // otherwise, run the algorithm
00112     else
00113         return advanceGeneration(state, state->getPopulation()->at(0));
00114 }
00115 
00116 
00117 void Algorithm::implicitEvaluate(IndividualP ind)
00118 {
00119     totalEvaluations_++;
00120     if(requestIds_[ind->index] > 0) {   // if already on evaluation
00121         wastedEvaluations_++;
00122         return;
00123     }
00124 
00125     DemeP workingDeme = state_->getPopulation()->getLocalDeme();
00126 
00127     if(!isMember(ind, requests_)) {
00128         requests_.push_back(ind);
00129         requestIds_[ind->index] = 1;    // fitness wanted
00130 
00131         // synchronous implicit evaluation: individual to be evaluated is removed from the deme
00132         // upon return, the individual is put at the end of deme
00133         // individual's index is _NOT_ its place in deme vector anymore!
00134         if(bSynchronized_) {
00135             stored_[ind->index] = ind;
00136             removeFrom(ind, *workingDeme);
00137         }
00138     }
00139 
00140     if(requests_.size() < jobSize_)
00141         return;
00142 
00143     if(comm_->messageWaiting()) {
00144         ECF_LOG(state_, 4, "Receiving evaluated individuals");
00145         std::vector<uint> indices = comm_->recvFitnessVector(demeCopy_, Comm::ANY_PROCESS);
00146         for(uint i = 0; i < indices.size(); i++) {
00147             if(requestIds_[indices[i]] > 0) {   // only if needs evaluation
00148                 if(bSynchronized_) {    // return individual to deme
00149                     stored_[indices[i]]->fitness = demeCopy_[indices[i]]->fitness;
00150                     workingDeme->push_back(stored_[indices[i]]);
00151                 }
00152                 else
00153                     workingDeme->at(indices[i])->fitness = demeCopy_[indices[i]]->fitness;
00154                 requestIds_[indices[i]] = 0;
00155             }
00156         }
00157 
00158         uint iWorker = comm_->getLastSource();
00159         comm_->sendIndividuals(requests_, iWorker);
00160         requests_.resize(0);
00161     }
00162     else {
00163         ECF_LOG(state_, 4, "Evaluating locally...");
00164         IndividualP ind = requests_.back();
00165         ind->fitness = evalOp_->evaluate(ind);
00166         requestIds_[ind->index] = 0;
00167         if(bSynchronized_) {    // return individual to deme
00168             workingDeme->push_back(ind);
00169         }
00170         requests_.pop_back();
00171     }
00172 
00173 }
00174 
00175 
00176 uint Algorithm::implicitMutate(IndividualP ind)
00177 {
00178     DemeP workingDeme = state_->getPopulation()->getLocalDeme();
00179 
00180     if(!isMember(ind, requestsMut_)) {
00181         requestsMut_.push_back(ind);
00182         requestMutIds_[ind->index] = 1; // mutation needed
00183 
00184         if(bSynchronized_) {    // remove from deme
00185             removeFrom(ind, *workingDeme);
00186         }
00187     }
00188     if(requestsMut_.size() < jobSize_)
00189         return 0;
00190 
00191     if(comm_->messageWaiting()) {
00192         ECF_LOG(state_, 4, "Receiving mutated individuals");
00193         uint received = comm_->recvReplaceIndividuals(receivedMut_, Comm::ANY_PROCESS);
00194         currentBest_ = selBestOp->select(*workingDeme);
00195 
00196         for(uint i = 0; i < received; i++) {
00197             // if synchronized, return individual to deme
00198             if(bSynchronized_) {
00199                 IndividualP newInd = (IndividualP) receivedMut_[i]->copy();
00200                 newInd->index = receivedMut_[i]->index;
00201                 workingDeme->push_back(newInd);
00202             }
00203             // replace individual only if mutation is needed
00204             // also: only if not replacing the current best
00205             else {
00206                 if(requestMutIds_[receivedMut_[i]->index] > 0
00207                     && (workingDeme->at(receivedMut_[i]->index) != currentBest_ 
00208                     || receivedMut_[i]->fitness->isBetterThan(currentBest_->fitness))) {
00209                         IndividualP newInd = (IndividualP) receivedMut_[i]->copy();
00210                         workingDeme->replace(receivedMut_[i]->index, newInd);
00211                 }
00212             }
00213             requestMutIds_[receivedMut_[i]->index] = 0;
00214         }
00215 
00216         uint iWorker = comm_->getLastSource();
00217         comm_->sendIndividuals(requestsMut_, iWorker);
00218 
00219         requestsMut_.resize(0);
00220         return received;
00221     }
00222     else {
00223         ECF_LOG(state_, 4, "Mutating locally...");
00224         std::vector<IndividualP> pool;
00225         pool.push_back(requestsMut_.back());
00226         requestMutIds_[requestsMut_.back()->index] = 0;
00227         requestsMut_.pop_back();
00228 
00229         if(bSynchronized_) {    // return individual to deme
00230             workingDeme->push_back(pool[0]);
00231         }
00232         uint mutated = mutation_->mutation(pool);
00233         return mutated;
00234     }
00235 }
00236 
00237 
00238 // implicit parallelization: worker processes
00239 bool Algorithm::implicitParallelOperate(StateP state)
00240 {
00241     ECF_LOG(state, 4, "Worker process initiating.");
00242 
00243     if(bImplicitEvaluation_) {
00244         myJob_.resize(0);
00245         comm_->sendFitness(myJob_, MASTER);
00246         uint myJobSize = 1;
00247 
00248         // while individuals to evaluate
00249         while(myJobSize != 0) {
00250 
00251             myJobSize = comm_->recvReplaceIndividuals(myJob_, MASTER);
00252             for(uint i = 0; i < myJobSize; i++)
00253                 myJob_[i]->fitness = evalOp_->evaluate(myJob_[i]);
00254 
00255             comm_->sendFitness(myJob_, MASTER, myJobSize);
00256         }
00257     }
00258 
00259     // implicit mutation + evaluation
00260     else if(bImplicitMutation_) {
00261         std::vector<IndividualP> mutationPool;
00262         myJob_.resize(0);
00263         comm_->sendIndividuals(myJob_, MASTER);
00264         uint myJobSize = 1;
00265 
00266         // while individuals to mutate
00267         while(myJobSize != 0) {
00268 
00269             myJobSize = comm_->recvReplaceIndividuals(myJob_, MASTER);
00270 
00271             mutationPool.resize(myJobSize);
00272             for(uint i = 0; i < myJobSize; i++) {
00273                 mutationPool[i] = myJob_[i];
00274             }
00275 
00276             mutation_->mutation(mutationPool);
00277 
00278             for(uint i = 0; i < myJobSize; i++)
00279                 if(!mutationPool[i]->fitness->isValid())
00280                     mutationPool[i]->fitness = evalOp_->evaluate(mutationPool[i]);
00281 
00282             comm_->sendIndividuals(myJob_, MASTER, myJobSize);
00283         }
00284     }
00285 
00286     return true;
00287 }
00288 
00289 
00290 void Algorithm::bcastTermination(StateP state)
00291 {
00292     if(bImplicitParallel_) {
00293         // master: if termination not true, do nothing; otherwise, send workers empty jobs
00294         if(state->getCommunicator()->getCommRank() == 0 && state->getTerminateCond()) {
00295             std::vector<IndividualP> empty;
00296             for(uint iWorker = 1; iWorker < state->getCommunicator()->getCommSize(); iWorker++) {
00297                 state->getCommunicator()->sendIndividuals(empty, iWorker);
00298             }
00299         }
00300         // worker: empty job received, set terminate for local process
00301         else if(state->getCommunicator()->getCommRank() != 0)
00302             state->setTerminateCond();
00303 
00304         return;
00305     }
00306 
00307     // explicitly parallel algorithm (synchronous)
00308     if(state->getCommunicator()->getCommRank() == 0) {
00309         for(uint i = 1; i < comm_->getCommSize(); i++)
00310             comm_->sendControlMessage(i, state->getTerminateCond());
00311     }
00312     else {
00313         if(comm_->recvControlMessage(0))
00314             state->setTerminateCond();
00315     }
00316 }
00317 
00318 
00319 void Algorithm::replaceWith(IndividualP oldInd, IndividualP newInd)
00320 {
00321     if(!bSynchronized_) {
00322         state_->getPopulation()->at(0)->replace(oldInd->index, newInd);
00323         return;
00324     }
00325 
00326     DemeP workingDeme = state_->getPopulation()->at(0);
00327 
00328     for(uint i = 0; i < workingDeme->size(); i++) {
00329         if(workingDeme->at(i)->index == oldInd->index) {
00330             workingDeme->at(i) = newInd;
00331             newInd->index = oldInd->index;
00332             return;
00333         }
00334     }
00335 }
00336 
00337 
00338 bool Algorithm::initializePopulation(StateP state)
00339 {
00340     CommunicatorP comm = state->getCommunicator();
00341     DemeP myDeme = state->getPopulation()->at(0);
00342     uint demeSize = (uint) myDeme->size();
00343 
00344     isConsistent_.assign(isConsistent_.size(), true);
00345 
00346     if(comm->getCommSize() == 1) {
00347         for(uint iInd = 0; iInd < demeSize; iInd++) {
00348             IndividualP ind = myDeme->at(iInd);
00349             state_->getContext()->evaluatedIndividual = ind;
00350             ECF_LOG(state_, 5, "Evaluating individual: " + ind->toString());
00351             ind->fitness = evalOp_->evaluate(ind);
00352             ECF_LOG(state_, 5, "New fitness: " + ind->fitness->toString());
00353             state_->increaseEvaluations();
00354         }
00355         return true;
00356     }
00357 
00358     if(comm->getCommRank() == 0) {
00359         uint jobsPerProcess = 4;
00360         uint jobSize = 1 + demeSize / comm->getCommSize() / jobsPerProcess;
00361         uint remaining = demeSize;
00362         std::vector<IndividualP> job;
00363 
00364         for(uint iInd = 0; iInd < demeSize; ) {
00365             for(uint i = 0; i < jobSize && iInd < demeSize; i++, iInd++) {
00366                 myDeme->at(iInd)->fitness = (FitnessP) state->getFitnessObject()->copy();
00367                 job.push_back(myDeme->at(iInd));
00368             }
00369             remaining -= comm->recvDemeFitness(*myDeme, MPI::ANY_SOURCE);
00370             uint iWorker = comm->getLastSource();
00371             comm->sendIndividuals(job, iWorker);
00372             job.resize(0);
00373         }
00374 
00375         int remainingWorkers = comm->getCommSize() - 1;
00376         while(remaining > 0 || remainingWorkers > 0) {
00377             remaining -= comm->recvDemeFitness(*myDeme, MPI::ANY_SOURCE);
00378 
00379             uint iWorker = comm->getLastSource();
00380             comm->sendIndividuals(job, iWorker);
00381             remainingWorkers--;
00382         }
00383 
00384         state->increaseEvaluations(demeSize);
00385     }
00386 
00387     else {
00388         std::vector<IndividualP> myJob_;
00389         myJob_.resize(0);
00390         comm->sendFitness(myJob_, MASTER);
00391 
00392         uint myJobSize;
00393         myJobSize = comm->recvReplaceIndividuals(myJob_, MASTER);
00394 
00395         // while individuals to evaluate
00396         while(myJobSize > 0) {
00397             for(uint i = 0; i < myJobSize; i++) {
00398                 state_->getContext()->evaluatedIndividual = myJob_[i];
00399                 ECF_LOG(state_, 5, "Evaluating individual: " + myJob_[i]->toString());
00400                 myJob_[i]->fitness = evalOp_->evaluate(myJob_[i]);
00401                 ECF_LOG(state_, 5, "New fitness: " + myJob_[i]->fitness->toString());
00402             }
00403 
00404             comm->sendFitness(myJob_, MASTER, myJobSize);
00405             myJobSize = comm->recvReplaceIndividuals(myJob_, MASTER);
00406         }
00407     }
00408 
00409     return true;
00410 }
00411 
00412 
00413 // called before genotype change
00414 void Algorithm::storeIndividual(IndividualP ind)
00415 {
00416     if(isConsistent_[ind->index]) {
00417         storedInds_[ind->index] = (IndividualP) ind->copy();
00418     }
00419     isConsistent_[ind->index] = false;
00420 }
00421 
00422 
00423 // called after individual evaluation
00424 void Algorithm::setConsistency(IndividualP ind)
00425 {
00426     isConsistent_[ind->index] = true;
00427     storedInds_[ind->index]->fitness = (FitnessP) ind->fitness->copy();
00428 }
00429 
00430 
00431 // called when sending individuals for evaluation
00432 void Algorithm::storeGenotypes(std::vector<IndividualP>& pool)
00433 {
00434     for(uint ind = 0; ind < pool.size(); ind++) {
00435         uint index = pool[ind]->index;
00436 
00437         // find first available slot for individual
00438         uint cid = 0;
00439         while(cid < sentInds_[index].size()) {
00440             if(!sentInds_[index][cid])
00441                 break;
00442             cid++;
00443         }
00444         if(cid == sentInds_[index].size())
00445             sentInds_[index].resize(cid + 1);
00446 
00447         // assign cid, copy individual
00448         pool[ind]->cid = cid;
00449         sentInds_[index][cid] = (IndividualP) pool[ind]->copy();
00450     }
00451 }
00452 
00453 
00454 // called when fitness is restored
00455 void Algorithm::restoreIndividuals(std::vector<uint> received)
00456 {
00457     DemeP myDeme = state_->getPopulation()->getLocalDeme();
00458 
00459     for(uint ind = 0; ind < received.size(); ind++) {
00460         uint index = received[ind];
00461         uint cid = myDeme->at(index)->fitness->cid;
00462 
00463         // restore genotype
00464         if(isConsistent_[index] == false) {
00465             sentInds_[index][cid]->fitness = myDeme->at(index)->fitness;
00466             replaceWith(myDeme->at(index), sentInds_[index][cid]);
00467 
00468             isConsistent_[index] = true;
00469             storedInds_[index]->fitness = (FitnessP) myDeme->at(index)->fitness->copy();
00470         }
00471         // or reject new fitness
00472         else {
00473             myDeme->at(index)->fitness = (FitnessP) storedInds_[index]->fitness->copy();
00474         }
00475         sentInds_[index][cid].reset();
00476     }
00477 }
00478 
00479 
00480 // called at evolution end for fitness consistency
00481 void Algorithm::restorePopulation()
00482 {
00483     DemeP myDeme = state_->getPopulation()->getLocalDeme();
00484     uint demeSize = myDeme->getSize();
00485 
00486     for(uint ind = 0; ind < demeSize; ind++) {
00487         if(!isConsistent_[ind]) {
00488             (*myDeme)[ind] = (IndividualP) storedInds_[ind]->copy();
00489         }
00490     }
00491 }
00492 
00493 
00494 #else   // non _MPI
00495 
00496 
00497 bool Algorithm::initializePopulation(StateP state)
00498 {
00499     for(uint iDeme = 0; iDeme < state->getPopulation()->size(); iDeme++)
00500         for(uint iInd = 0; iInd < state->getPopulation()->at(iDeme)->size(); iInd++)
00501             evaluate(state->getPopulation()->at(iDeme)->at(iInd));
00502     return true;
00503 }
00504 
00505 #endif  // _MPI

Generated on Tue Nov 4 2014 13:04:30 for ECF by  doxygen 1.7.1