• Main Page
  • Modules
  • Classes
  • Files
  • File List

D:/Projekt/ECF_trunk/ECF/Algorithm.cpp

00001 #include "ECF_base.h"
00002 
00003 #ifdef _MPI
00004 
// Process id of the master: the peer that worker processes send their results
// to and receive their jobs from.
static const int MASTER = 0;
00006 
00007 
00008 void Algorithm::registerParallelParameters(StateP state)
00009 {
00010     std::string *type = new std::string("eval");
00011     state->getRegistry()->registerEntry("parallel.type", (voidP) type, ECF::STRING);
00012 
00013     uint* jobSize = new uint(10);
00014     state->getRegistry()->registerEntry("parallel.jobsize", (voidP) jobSize, ECF::UINT);
00015 
00016     uint *sync = new uint(0);
00017     state->getRegistry()->registerEntry("parallel.sync", (voidP) sync, ECF::UINT);
00018 }
00019 
00020 
00021 bool Algorithm::initializeParallel(StateP state)
00022 {
00023     state_ = state;
00024     comm_ = state->getCommunicator();
00025     selBestOp = static_cast<SelBestOpP> (new SelBestOp);
00026     totalEvaluations_ = wastedEvaluations_ = 0;
00027     bImplicitEvaluation_ = bImplicitMutation_ = false;
00028 
00029     // initialize protected members for population consistency
00030     uint demeSize = state->getPopulation()->getLocalDeme()->getSize();
00031     storedInds_.resize(demeSize);
00032     sentInds_.resize(demeSize);
00033     isConsistent_.resize(demeSize);
00034 
00035     if(!state_->getRegistry()->isModified("parallel.type"))
00036         bImplicitParallel_ = false;
00037     else
00038         bImplicitParallel_ = true;
00039 
00040     if(this->isParallel() && bImplicitParallel_) {
00041         ECF_LOG_ERROR(state_, "Error: implicit parallelization possible only with sequential algorithm!");
00042         throw "";
00043     }
00044 
00045     if((this->isParallel() || bImplicitParallel_)) {
00046         if(state->getPopulation()->getNoDemes() > state->getCommunicator()->getCommGlobalSize()) { 
00047             ECF_LOG_ERROR(state, "Error: number of processes must be equal or greater than the number of demes in parallel EA!");
00048             throw "";
00049         }
00050     }
00051     else if(state->getPopulation()->getNoDemes() != state->getCommunicator()->getCommGlobalSize()) {
00052         ECF_LOG_ERROR(state, "Error: number of processes must equal the number of demes in EA with sequential algorithm!");
00053         throw "";
00054     }
00055 
00056     if(!bImplicitParallel_)
00057         return true;
00058 
00059     voidP sptr = state->getRegistry()->getEntry("parallel.type");
00060     std::string parallelType = *((std::string*)sptr.get());
00061 
00062     if(parallelType == "eval") {
00063         bImplicitEvaluation_ = true;
00064     }
00065     else if(parallelType == "mut") {
00066         bImplicitMutation_ = true;
00067     }
00068     else {
00069         ECF_LOG_ERROR(state, "Error: unkown implicit parallelization mode!");
00070         throw "";
00071     }
00072 
00073     sptr = state->getRegistry()->getEntry("parallel.sync");
00074     bSynchronized_ = ((*((uint*) sptr.get())) != 0);
00075 
00076     voidP jobSizeP = state->getRegistry()->getEntry("parallel.jobsize");
00077     jobSize_ = *((uint*) jobSizeP.get());
00078 
00079     if(jobSize_ > state->getPopulation()->at(0)->getSize()) {
00080         ECF_LOG_ERROR(state, "Error: parallel.jobsize must be less or equal to the number of individuals!");
00081         throw "";
00082     }
00083 
00084     return true;
00085 }
00086 
00087 
00088 // create deme-sized vector of initialized individuals for receiving
00089 void Algorithm::initializeImplicit(StateP state)
00090 {
00091     for(uint i = 0; i < state->getPopulation()->at(0)->size(); i++) {
00092         IndividualP ind = (IndividualP) new Individual(state);
00093         ind->fitness = (FitnessP) state->getFitnessObject()->copy();
00094         demeCopy_.push_back(ind);
00095         requestIds_.push_back(0);
00096         requestMutIds_.push_back(0);
00097     }
00098     stored_.resize(state->getPopulation()->at(0)->size());
00099     currentBest_ = selBestOp->select(*(state->getPopulation()->at(0)));
00100 }
00101 
00102 
00103 bool Algorithm::advanceGeneration(StateP state)
00104 {
00105     // implicitly parallel: worker processes go to work
00106     if(state->getCommunicator()->getCommRank() != 0 && bImplicitParallel_)
00107         return implicitParallelOperate(state);
00108     // otherwise, run the algorithm
00109     else
00110         return advanceGeneration(state, state->getPopulation()->at(0));
00111 }
00112 
00113 
00114 void Algorithm::implicitEvaluate(IndividualP ind)
00115 {
00116     totalEvaluations_++;
00117     if(requestIds_[ind->index] > 0) {   // if already on evaluation
00118         wastedEvaluations_++;
00119         return;
00120     }
00121 
00122     DemeP workingDeme = state_->getPopulation()->getLocalDeme();
00123 
00124     if(!isMember(ind, requests_)) {
00125         requests_.push_back(ind);
00126         requestIds_[ind->index] = 1;    // fitness wanted
00127 
00128         // synchronous implicit evaluation: individual to be evaluated is removed from the deme
00129         // upon return, the individual is put at the end of deme
00130         // individual's index is _NOT_ its place in deme vector anymore!
00131         if(bSynchronized_) {
00132             stored_[ind->index] = ind;
00133             removeFrom(ind, *workingDeme);
00134         }
00135     }
00136 
00137     if(requests_.size() < jobSize_)
00138         return;
00139 
00140     if(comm_->messageWaiting()) {
00141         ECF_LOG(state_, 4, "Receiving evaluated individuals");
00142         std::vector<uint> indices = comm_->recvFitnessVector(demeCopy_, Comm::ANY_PROCESS);
00143         for(uint i = 0; i < indices.size(); i++) {
00144             if(requestIds_[indices[i]] > 0) {   // only if needs evaluation
00145                 if(bSynchronized_) {    // return individual to deme
00146                     stored_[indices[i]]->fitness = demeCopy_[indices[i]]->fitness;
00147                     workingDeme->push_back(stored_[indices[i]]);
00148                 }
00149                 else
00150                     workingDeme->at(indices[i])->fitness = demeCopy_[indices[i]]->fitness;
00151                 requestIds_[indices[i]] = 0;
00152             }
00153         }
00154 
00155         uint iWorker = comm_->getLastSource();
00156         comm_->sendIndividuals(requests_, iWorker);
00157         requests_.resize(0);
00158     }
00159     else {
00160         ECF_LOG(state_, 4, "Evaluating locally...");
00161         IndividualP ind = requests_.back();
00162         ind->fitness = evalOp_->evaluate(ind);
00163         requestIds_[ind->index] = 0;
00164         if(bSynchronized_) {    // return individual to deme
00165             workingDeme->push_back(ind);
00166         }
00167         requests_.pop_back();
00168     }
00169 
00170 }
00171 
00172 
00173 uint Algorithm::implicitMutate(IndividualP ind)
00174 {
00175     DemeP workingDeme = state_->getPopulation()->getLocalDeme();
00176 
00177     if(!isMember(ind, requestsMut_)) {
00178         requestsMut_.push_back(ind);
00179         requestMutIds_[ind->index] = 1; // mutation needed
00180 
00181         if(bSynchronized_) {    // remove from deme
00182             removeFrom(ind, *workingDeme);
00183         }
00184     }
00185     if(requestsMut_.size() < jobSize_)
00186         return 0;
00187 
00188     if(comm_->messageWaiting()) {
00189         ECF_LOG(state_, 4, "Receiving mutated individuals");
00190         uint received = comm_->recvReplaceIndividuals(receivedMut_, Comm::ANY_PROCESS);
00191         currentBest_ = selBestOp->select(*workingDeme);
00192 
00193         for(uint i = 0; i < received; i++) {
00194             // if synchronized, return individual to deme
00195             if(bSynchronized_) {
00196                 IndividualP newInd = (IndividualP) receivedMut_[i]->copy();
00197                 newInd->index = receivedMut_[i]->index;
00198                 workingDeme->push_back(newInd);
00199             }
00200             // replace individual only if mutation is needed
00201             // also: only if not replacing the current best
00202             else {
00203                 if(requestMutIds_[receivedMut_[i]->index] > 0
00204                     && (workingDeme->at(receivedMut_[i]->index) != currentBest_ 
00205                     || receivedMut_[i]->fitness->isBetterThan(currentBest_->fitness))) {
00206                         IndividualP newInd = (IndividualP) receivedMut_[i]->copy();
00207                         workingDeme->replace(receivedMut_[i]->index, newInd);
00208                 }
00209             }
00210             requestMutIds_[receivedMut_[i]->index] = 0;
00211         }
00212 
00213         uint iWorker = comm_->getLastSource();
00214         comm_->sendIndividuals(requestsMut_, iWorker);
00215 
00216         requestsMut_.resize(0);
00217         return received;
00218     }
00219     else {
00220         ECF_LOG(state_, 4, "Mutating locally...");
00221         std::vector<IndividualP> pool;
00222         pool.push_back(requestsMut_.back());
00223         requestMutIds_[requestsMut_.back()->index] = 0;
00224         requestsMut_.pop_back();
00225 
00226         if(bSynchronized_) {    // return individual to deme
00227             workingDeme->push_back(pool[0]);
00228         }
00229         uint mutated = mutation_->mutation(pool);
00230         return mutated;
00231     }
00232 }
00233 
00234 
00235 // implicit parallelization: worker processes
00236 bool Algorithm::implicitParallelOperate(StateP state)
00237 {
00238     ECF_LOG(state, 4, "Worker process initiating.");
00239 
00240     if(bImplicitEvaluation_) {
00241         myJob_.resize(0);
00242         comm_->sendFitness(myJob_, MASTER);
00243         uint myJobSize = 1;
00244 
00245         // while individuals to evaluate
00246         while(myJobSize != 0) {
00247 
00248             myJobSize = comm_->recvReplaceIndividuals(myJob_, MASTER);
00249             for(uint i = 0; i < myJobSize; i++)
00250                 myJob_[i]->fitness = evalOp_->evaluate(myJob_[i]);
00251 
00252             comm_->sendFitness(myJob_, MASTER, myJobSize);
00253         }
00254     }
00255 
00256     // implicit mutation + evaluation
00257     else if(bImplicitMutation_) {
00258         std::vector<IndividualP> mutationPool;
00259         myJob_.resize(0);
00260         comm_->sendIndividuals(myJob_, MASTER);
00261         uint myJobSize = 1;
00262 
00263         // while individuals to mutate
00264         while(myJobSize != 0) {
00265 
00266             myJobSize = comm_->recvReplaceIndividuals(myJob_, MASTER);
00267 
00268             mutationPool.resize(myJobSize);
00269             for(uint i = 0; i < myJobSize; i++) {
00270                 mutationPool[i] = myJob_[i];
00271             }
00272 
00273             mutation_->mutation(mutationPool);
00274 
00275             for(uint i = 0; i < myJobSize; i++)
00276                 if(!mutationPool[i]->fitness->isValid())
00277                     mutationPool[i]->fitness = evalOp_->evaluate(mutationPool[i]);
00278 
00279             comm_->sendIndividuals(myJob_, MASTER, myJobSize);
00280         }
00281     }
00282 
00283     return true;
00284 }
00285 
00286 
00287 void Algorithm::bcastTermination(StateP state)
00288 {
00289     if(bImplicitParallel_) {
00290         // master: if termination not true, do nothing; otherwise, send workers empty jobs
00291         if(state->getCommunicator()->getCommRank() == 0 && state->getTerminateCond()) {
00292             std::vector<IndividualP> empty;
00293             for(uint iWorker = 1; iWorker < state->getCommunicator()->getCommSize(); iWorker++) {
00294                 state->getCommunicator()->sendIndividuals(empty, iWorker);
00295             }
00296         }
00297         // worker: empty job received, set terminate for local process
00298         else if(state->getCommunicator()->getCommRank() != 0)
00299             state->setTerminateCond();
00300 
00301         return;
00302     }
00303 
00304     // explicitly parallel algorithm (synchronous)
00305     if(state->getCommunicator()->getCommRank() == 0) {
00306         for(uint i = 1; i < comm_->getCommSize(); i++)
00307             comm_->sendControlMessage(i, state->getTerminateCond());
00308     }
00309     else {
00310         if(comm_->recvControlMessage(0))
00311             state->setTerminateCond();
00312     }
00313 }
00314 
00315 
00316 void Algorithm::replaceWith(IndividualP oldInd, IndividualP newInd)
00317 {
00318     if(!bSynchronized_) {
00319         state_->getPopulation()->at(0)->replace(oldInd->index, newInd);
00320         return;
00321     }
00322 
00323     DemeP workingDeme = state_->getPopulation()->at(0);
00324 
00325     for(uint i = 0; i < workingDeme->size(); i++) {
00326         if(workingDeme->at(i)->index == oldInd->index) {
00327             workingDeme->at(i) = newInd;
00328             newInd->index = oldInd->index;
00329             return;
00330         }
00331     }
00332 }
00333 
00334 
00335 bool Algorithm::initializePopulation(StateP state)
00336 {
00337     CommunicatorP comm = state->getCommunicator();
00338     DemeP myDeme = state->getPopulation()->at(0);
00339     uint demeSize = (uint) myDeme->size();
00340 
00341     isConsistent_.assign(isConsistent_.size(), true);
00342 
00343     if(comm->getCommSize() == 1) {
00344         for(uint iInd = 0; iInd < demeSize; iInd++) {
00345             IndividualP ind = myDeme->at(iInd);
00346             state_->getContext()->evaluatedIndividual = ind;
00347             ECF_LOG(state_, 5, "Evaluating individual: " + ind->toString());
00348             ind->fitness = evalOp_->evaluate(ind);
00349             ECF_LOG(state_, 5, "New fitness: " + ind->fitness->toString());
00350             state_->increaseEvaluations();
00351         }
00352         return true;
00353     }
00354 
00355     if(comm->getCommRank() == 0) {
00356         uint jobsPerProcess = 4;
00357         uint jobSize = 1 + demeSize / comm->getCommSize() / jobsPerProcess;
00358         uint remaining = demeSize;
00359         std::vector<IndividualP> job;
00360 
00361         for(uint iInd = 0; iInd < demeSize; ) {
00362             for(uint i = 0; i < jobSize && iInd < demeSize; i++, iInd++) {
00363                 myDeme->at(iInd)->fitness = (FitnessP) state->getFitnessObject()->copy();
00364                 job.push_back(myDeme->at(iInd));
00365             }
00366             remaining -= comm->recvDemeFitness(*myDeme, MPI::ANY_SOURCE);
00367             uint iWorker = comm->getLastSource();
00368             comm->sendIndividuals(job, iWorker);
00369             job.resize(0);
00370         }
00371 
00372         int remainingWorkers = comm->getCommSize() - 1;
00373         while(remaining > 0 || remainingWorkers > 0) {
00374             remaining -= comm->recvDemeFitness(*myDeme, MPI::ANY_SOURCE);
00375 
00376             uint iWorker = comm->getLastSource();
00377             comm->sendIndividuals(job, iWorker);
00378             remainingWorkers--;
00379         }
00380 
00381         state->increaseEvaluations(demeSize);
00382     }
00383 
00384     else {
00385         std::vector<IndividualP> myJob_;
00386         myJob_.resize(0);
00387         comm->sendFitness(myJob_, MASTER);
00388 
00389         uint myJobSize;
00390         myJobSize = comm->recvReplaceIndividuals(myJob_, MASTER);
00391 
00392         // while individuals to evaluate
00393         while(myJobSize > 0) {
00394             for(uint i = 0; i < myJobSize; i++) {
00395                 state_->getContext()->evaluatedIndividual = myJob_[i];
00396                 ECF_LOG(state_, 5, "Evaluating individual: " + myJob_[i]->toString());
00397                 myJob_[i]->fitness = evalOp_->evaluate(myJob_[i]);
00398                 ECF_LOG(state_, 5, "New fitness: " + myJob_[i]->fitness->toString());
00399             }
00400 
00401             comm->sendFitness(myJob_, MASTER, myJobSize);
00402             myJobSize = comm->recvReplaceIndividuals(myJob_, MASTER);
00403         }
00404     }
00405 
00406     return true;
00407 }
00408 
00409 
00410 // called before genotype change
00411 void Algorithm::storeIndividual(IndividualP ind)
00412 {
00413     if(isConsistent_[ind->index]) {
00414         storedInds_[ind->index] = (IndividualP) ind->copy();
00415     }
00416     isConsistent_[ind->index] = false;
00417 }
00418 
00419 
00420 // called after individual evaluation
00421 void Algorithm::setConsistency(IndividualP ind)
00422 {
00423     isConsistent_[ind->index] = true;
00424     storedInds_[ind->index]->fitness = (FitnessP) ind->fitness->copy();
00425 }
00426 
00427 
00428 // called when sending individuals for evaluation
00429 void Algorithm::storeGenotypes(std::vector<IndividualP>& pool)
00430 {
00431     for(uint ind = 0; ind < pool.size(); ind++) {
00432         uint index = pool[ind]->index;
00433 
00434         // find first available slot for individual
00435         uint cid = 0;
00436         while(cid < sentInds_[index].size()) {
00437             if(!sentInds_[index][cid])
00438                 break;
00439             cid++;
00440         }
00441         if(cid == sentInds_[index].size())
00442             sentInds_[index].resize(cid + 1);
00443 
00444         // assign cid, copy individual
00445         pool[ind]->cid = cid;
00446         sentInds_[index][cid] = (IndividualP) pool[ind]->copy();
00447     }
00448 }
00449 
00450 
00451 // called when fitness is restored
00452 void Algorithm::restoreIndividuals(std::vector<uint> received)
00453 {
00454     DemeP myDeme = state_->getPopulation()->getLocalDeme();
00455 
00456     for(uint ind = 0; ind < received.size(); ind++) {
00457         uint index = received[ind];
00458         uint cid = myDeme->at(index)->fitness->cid;
00459 
00460         // restore genotype
00461         if(isConsistent_[index] == false) {
00462             sentInds_[index][cid]->fitness = myDeme->at(index)->fitness;
00463             replaceWith(myDeme->at(index), sentInds_[index][cid]);
00464 
00465             isConsistent_[index] = true;
00466             storedInds_[index]->fitness = (FitnessP) myDeme->at(index)->fitness->copy();
00467         }
00468         // or reject new fitness
00469         else {
00470             myDeme->at(index)->fitness = (FitnessP) storedInds_[index]->fitness->copy();
00471         }
00472         sentInds_[index][cid].reset();
00473     }
00474 }
00475 
00476 
00477 // called at evolution end for fitness consistency
00478 void Algorithm::restorePopulation()
00479 {
00480     DemeP myDeme = state_->getPopulation()->getLocalDeme();
00481     uint demeSize = myDeme->getSize();
00482 
00483     for(uint ind = 0; ind < demeSize; ind++) {
00484         if(!isConsistent_[ind]) {
00485             (*myDeme)[ind] = (IndividualP) storedInds_[ind]->copy();
00486         }
00487     }
00488 }
00489 
00490 
00491 #else   // non _MPI
00492 
00493 
00494 bool Algorithm::initializePopulation(StateP state)
00495 {
00496     for(uint iDeme = 0; iDeme < state->getPopulation()->size(); iDeme++)
00497         for(uint iInd = 0; iInd < state->getPopulation()->at(iDeme)->size(); iInd++)
00498             evaluate(state->getPopulation()->at(iDeme)->at(iInd));
00499     return true;
00500 }
00501 
00502 #endif  // _MPI

Generated on Fri Jul 5 2013 09:34:22 for ECF by  doxygen 1.7.1