00001 #include "ECF_base.h"
00002
00003 #ifdef _MPI
00004
// Rank of the master process: workers in this file address their results
// to it and receive their jobs from it.
int const MASTER = 0;
00006
00007
00008 void Algorithm::registerParallelParameters(StateP state)
00009 {
00010 std::string *type = new std::string("eval");
00011 state->getRegistry()->registerEntry("parallel.type", (voidP) type, ECF::STRING);
00012
00013 uint* jobSize = new uint(10);
00014 state->getRegistry()->registerEntry("parallel.jobsize", (voidP) jobSize, ECF::UINT);
00015
00016 uint *sync = new uint(0);
00017 state->getRegistry()->registerEntry("parallel.sync", (voidP) sync, ECF::UINT);
00018 }
00019
00020
00021 bool Algorithm::initializeParallel(StateP state)
00022 {
00023 state_ = state;
00024 comm_ = state->getCommunicator();
00025 selBestOp = static_cast<SelBestOpP> (new SelBestOp);
00026 totalEvaluations_ = wastedEvaluations_ = 0;
00027 bImplicitEvaluation_ = bImplicitMutation_ = false;
00028
00029
00030 uint demeSize = state->getPopulation()->getLocalDeme()->getSize();
00031 storedInds_.resize(demeSize);
00032 sentInds_.resize(demeSize);
00033 isConsistent_.resize(demeSize);
00034
00035 if(!state_->getRegistry()->isModified("parallel.type"))
00036 bImplicitParallel_ = false;
00037 else
00038 bImplicitParallel_ = true;
00039
00040 if(this->isParallel() && bImplicitParallel_) {
00041 ECF_LOG_ERROR(state_, "Error: implicit parallelization possible only with sequential algorithm!");
00042 throw "";
00043 }
00044
00045 if((this->isParallel() || bImplicitParallel_)) {
00046 if(state->getPopulation()->getNoDemes() > state->getCommunicator()->getCommGlobalSize()) {
00047 ECF_LOG_ERROR(state, "Error: number of processes must be equal or greater than the number of demes in parallel EA!");
00048 throw "";
00049 }
00050 }
00051 else if(state->getPopulation()->getNoDemes() != state->getCommunicator()->getCommGlobalSize()) {
00052 ECF_LOG_ERROR(state, "Error: number of processes must equal the number of demes in EA with sequential algorithm!");
00053 throw "";
00054 }
00055
00056 if(!bImplicitParallel_)
00057 return true;
00058
00059 voidP sptr = state->getRegistry()->getEntry("parallel.type");
00060 std::string parallelType = *((std::string*)sptr.get());
00061
00062 if(parallelType == "eval") {
00063 bImplicitEvaluation_ = true;
00064 }
00065 else if(parallelType == "mut") {
00066 bImplicitMutation_ = true;
00067 }
00068 else {
00069 ECF_LOG_ERROR(state, "Error: unkown implicit parallelization mode!");
00070 throw "";
00071 }
00072
00073 sptr = state->getRegistry()->getEntry("parallel.sync");
00074 bSynchronized_ = ((*((uint*) sptr.get())) != 0);
00075
00076 voidP jobSizeP = state->getRegistry()->getEntry("parallel.jobsize");
00077 jobSize_ = *((uint*) jobSizeP.get());
00078
00079 if(jobSize_ > state->getPopulation()->at(0)->getSize()) {
00080 ECF_LOG_ERROR(state, "Error: parallel.jobsize must be less or equal to the number of individuals!");
00081 throw "";
00082 }
00083
00084 return true;
00085 }
00086
00087
00088
00089 void Algorithm::initializeImplicit(StateP state)
00090 {
00091 for(uint i = 0; i < state->getPopulation()->at(0)->size(); i++) {
00092 IndividualP ind = (IndividualP) new Individual(state);
00093 ind->fitness = (FitnessP) state->getFitnessObject()->copy();
00094 demeCopy_.push_back(ind);
00095 requestIds_.push_back(0);
00096 requestMutIds_.push_back(0);
00097 }
00098 stored_.resize(state->getPopulation()->at(0)->size());
00099 currentBest_ = selBestOp->select(*(state->getPopulation()->at(0)));
00100 }
00101
00102
00103 bool Algorithm::advanceGeneration(StateP state)
00104 {
00105
00106 if(state->getCommunicator()->getCommRank() != 0 && bImplicitParallel_)
00107 return implicitParallelOperate(state);
00108
00109 else
00110 return advanceGeneration(state, state->getPopulation()->at(0));
00111 }
00112
00113
00114 void Algorithm::implicitEvaluate(IndividualP ind)
00115 {
00116 totalEvaluations_++;
00117 if(requestIds_[ind->index] > 0) {
00118 wastedEvaluations_++;
00119 return;
00120 }
00121
00122 DemeP workingDeme = state_->getPopulation()->getLocalDeme();
00123
00124 if(!isMember(ind, requests_)) {
00125 requests_.push_back(ind);
00126 requestIds_[ind->index] = 1;
00127
00128
00129
00130
00131 if(bSynchronized_) {
00132 stored_[ind->index] = ind;
00133 removeFrom(ind, *workingDeme);
00134 }
00135 }
00136
00137 if(requests_.size() < jobSize_)
00138 return;
00139
00140 if(comm_->messageWaiting()) {
00141 ECF_LOG(state_, 4, "Receiving evaluated individuals");
00142 std::vector<uint> indices = comm_->recvFitnessVector(demeCopy_, Comm::ANY_PROCESS);
00143 for(uint i = 0; i < indices.size(); i++) {
00144 if(requestIds_[indices[i]] > 0) {
00145 if(bSynchronized_) {
00146 stored_[indices[i]]->fitness = demeCopy_[indices[i]]->fitness;
00147 workingDeme->push_back(stored_[indices[i]]);
00148 }
00149 else
00150 workingDeme->at(indices[i])->fitness = demeCopy_[indices[i]]->fitness;
00151 requestIds_[indices[i]] = 0;
00152 }
00153 }
00154
00155 uint iWorker = comm_->getLastSource();
00156 comm_->sendIndividuals(requests_, iWorker);
00157 requests_.resize(0);
00158 }
00159 else {
00160 ECF_LOG(state_, 4, "Evaluating locally...");
00161 IndividualP ind = requests_.back();
00162 ind->fitness = evalOp_->evaluate(ind);
00163 requestIds_[ind->index] = 0;
00164 if(bSynchronized_) {
00165 workingDeme->push_back(ind);
00166 }
00167 requests_.pop_back();
00168 }
00169
00170 }
00171
00172
00173 uint Algorithm::implicitMutate(IndividualP ind)
00174 {
00175 DemeP workingDeme = state_->getPopulation()->getLocalDeme();
00176
00177 if(!isMember(ind, requestsMut_)) {
00178 requestsMut_.push_back(ind);
00179 requestMutIds_[ind->index] = 1;
00180
00181 if(bSynchronized_) {
00182 removeFrom(ind, *workingDeme);
00183 }
00184 }
00185 if(requestsMut_.size() < jobSize_)
00186 return 0;
00187
00188 if(comm_->messageWaiting()) {
00189 ECF_LOG(state_, 4, "Receiving mutated individuals");
00190 uint received = comm_->recvReplaceIndividuals(receivedMut_, Comm::ANY_PROCESS);
00191 currentBest_ = selBestOp->select(*workingDeme);
00192
00193 for(uint i = 0; i < received; i++) {
00194
00195 if(bSynchronized_) {
00196 IndividualP newInd = (IndividualP) receivedMut_[i]->copy();
00197 newInd->index = receivedMut_[i]->index;
00198 workingDeme->push_back(newInd);
00199 }
00200
00201
00202 else {
00203 if(requestMutIds_[receivedMut_[i]->index] > 0
00204 && (workingDeme->at(receivedMut_[i]->index) != currentBest_
00205 || receivedMut_[i]->fitness->isBetterThan(currentBest_->fitness))) {
00206 IndividualP newInd = (IndividualP) receivedMut_[i]->copy();
00207 workingDeme->replace(receivedMut_[i]->index, newInd);
00208 }
00209 }
00210 requestMutIds_[receivedMut_[i]->index] = 0;
00211 }
00212
00213 uint iWorker = comm_->getLastSource();
00214 comm_->sendIndividuals(requestsMut_, iWorker);
00215
00216 requestsMut_.resize(0);
00217 return received;
00218 }
00219 else {
00220 ECF_LOG(state_, 4, "Mutating locally...");
00221 std::vector<IndividualP> pool;
00222 pool.push_back(requestsMut_.back());
00223 requestMutIds_[requestsMut_.back()->index] = 0;
00224 requestsMut_.pop_back();
00225
00226 if(bSynchronized_) {
00227 workingDeme->push_back(pool[0]);
00228 }
00229 uint mutated = mutation_->mutation(pool);
00230 return mutated;
00231 }
00232 }
00233
00234
00235
00236 bool Algorithm::implicitParallelOperate(StateP state)
00237 {
00238 ECF_LOG(state, 4, "Worker process initiating.");
00239
00240 if(bImplicitEvaluation_) {
00241 myJob_.resize(0);
00242 comm_->sendFitness(myJob_, MASTER);
00243 uint myJobSize = 1;
00244
00245
00246 while(myJobSize != 0) {
00247
00248 myJobSize = comm_->recvReplaceIndividuals(myJob_, MASTER);
00249 for(uint i = 0; i < myJobSize; i++)
00250 myJob_[i]->fitness = evalOp_->evaluate(myJob_[i]);
00251
00252 comm_->sendFitness(myJob_, MASTER, myJobSize);
00253 }
00254 }
00255
00256
00257 else if(bImplicitMutation_) {
00258 std::vector<IndividualP> mutationPool;
00259 myJob_.resize(0);
00260 comm_->sendIndividuals(myJob_, MASTER);
00261 uint myJobSize = 1;
00262
00263
00264 while(myJobSize != 0) {
00265
00266 myJobSize = comm_->recvReplaceIndividuals(myJob_, MASTER);
00267
00268 mutationPool.resize(myJobSize);
00269 for(uint i = 0; i < myJobSize; i++) {
00270 mutationPool[i] = myJob_[i];
00271 }
00272
00273 mutation_->mutation(mutationPool);
00274
00275 for(uint i = 0; i < myJobSize; i++)
00276 if(!mutationPool[i]->fitness->isValid())
00277 mutationPool[i]->fitness = evalOp_->evaluate(mutationPool[i]);
00278
00279 comm_->sendIndividuals(myJob_, MASTER, myJobSize);
00280 }
00281 }
00282
00283 return true;
00284 }
00285
00286
00287 void Algorithm::bcastTermination(StateP state)
00288 {
00289 if(bImplicitParallel_) {
00290
00291 if(state->getCommunicator()->getCommRank() == 0 && state->getTerminateCond()) {
00292 std::vector<IndividualP> empty;
00293 for(uint iWorker = 1; iWorker < state->getCommunicator()->getCommSize(); iWorker++) {
00294 state->getCommunicator()->sendIndividuals(empty, iWorker);
00295 }
00296 }
00297
00298 else if(state->getCommunicator()->getCommRank() != 0)
00299 state->setTerminateCond();
00300
00301 return;
00302 }
00303
00304
00305 if(state->getCommunicator()->getCommRank() == 0) {
00306 for(uint i = 1; i < comm_->getCommSize(); i++)
00307 comm_->sendControlMessage(i, state->getTerminateCond());
00308 }
00309 else {
00310 if(comm_->recvControlMessage(0))
00311 state->setTerminateCond();
00312 }
00313 }
00314
00315
00316 void Algorithm::replaceWith(IndividualP oldInd, IndividualP newInd)
00317 {
00318 if(!bSynchronized_) {
00319 state_->getPopulation()->at(0)->replace(oldInd->index, newInd);
00320 return;
00321 }
00322
00323 DemeP workingDeme = state_->getPopulation()->at(0);
00324
00325 for(uint i = 0; i < workingDeme->size(); i++) {
00326 if(workingDeme->at(i)->index == oldInd->index) {
00327 workingDeme->at(i) = newInd;
00328 newInd->index = oldInd->index;
00329 return;
00330 }
00331 }
00332 }
00333
00334
00335 bool Algorithm::initializePopulation(StateP state)
00336 {
00337 CommunicatorP comm = state->getCommunicator();
00338 DemeP myDeme = state->getPopulation()->at(0);
00339 uint demeSize = (uint) myDeme->size();
00340
00341 isConsistent_.assign(isConsistent_.size(), true);
00342
00343 if(comm->getCommSize() == 1) {
00344 for(uint iInd = 0; iInd < demeSize; iInd++) {
00345 IndividualP ind = myDeme->at(iInd);
00346 state_->getContext()->evaluatedIndividual = ind;
00347 ECF_LOG(state_, 5, "Evaluating individual: " + ind->toString());
00348 ind->fitness = evalOp_->evaluate(ind);
00349 ECF_LOG(state_, 5, "New fitness: " + ind->fitness->toString());
00350 state_->increaseEvaluations();
00351 }
00352 return true;
00353 }
00354
00355 if(comm->getCommRank() == 0) {
00356 uint jobsPerProcess = 4;
00357 uint jobSize = 1 + demeSize / comm->getCommSize() / jobsPerProcess;
00358 uint remaining = demeSize;
00359 std::vector<IndividualP> job;
00360
00361 for(uint iInd = 0; iInd < demeSize; ) {
00362 for(uint i = 0; i < jobSize && iInd < demeSize; i++, iInd++) {
00363 myDeme->at(iInd)->fitness = (FitnessP) state->getFitnessObject()->copy();
00364 job.push_back(myDeme->at(iInd));
00365 }
00366 remaining -= comm->recvDemeFitness(*myDeme, MPI::ANY_SOURCE);
00367 uint iWorker = comm->getLastSource();
00368 comm->sendIndividuals(job, iWorker);
00369 job.resize(0);
00370 }
00371
00372 int remainingWorkers = comm->getCommSize() - 1;
00373 while(remaining > 0 || remainingWorkers > 0) {
00374 remaining -= comm->recvDemeFitness(*myDeme, MPI::ANY_SOURCE);
00375
00376 uint iWorker = comm->getLastSource();
00377 comm->sendIndividuals(job, iWorker);
00378 remainingWorkers--;
00379 }
00380
00381 state->increaseEvaluations(demeSize);
00382 }
00383
00384 else {
00385 std::vector<IndividualP> myJob_;
00386 myJob_.resize(0);
00387 comm->sendFitness(myJob_, MASTER);
00388
00389 uint myJobSize;
00390 myJobSize = comm->recvReplaceIndividuals(myJob_, MASTER);
00391
00392
00393 while(myJobSize > 0) {
00394 for(uint i = 0; i < myJobSize; i++) {
00395 state_->getContext()->evaluatedIndividual = myJob_[i];
00396 ECF_LOG(state_, 5, "Evaluating individual: " + myJob_[i]->toString());
00397 myJob_[i]->fitness = evalOp_->evaluate(myJob_[i]);
00398 ECF_LOG(state_, 5, "New fitness: " + myJob_[i]->fitness->toString());
00399 }
00400
00401 comm->sendFitness(myJob_, MASTER, myJobSize);
00402 myJobSize = comm->recvReplaceIndividuals(myJob_, MASTER);
00403 }
00404 }
00405
00406 return true;
00407 }
00408
00409
00410
00411 void Algorithm::storeIndividual(IndividualP ind)
00412 {
00413 if(isConsistent_[ind->index]) {
00414 storedInds_[ind->index] = (IndividualP) ind->copy();
00415 }
00416 isConsistent_[ind->index] = false;
00417 }
00418
00419
00420
00421 void Algorithm::setConsistency(IndividualP ind)
00422 {
00423 isConsistent_[ind->index] = true;
00424 storedInds_[ind->index]->fitness = (FitnessP) ind->fitness->copy();
00425 }
00426
00427
00428
00429 void Algorithm::storeGenotypes(std::vector<IndividualP>& pool)
00430 {
00431 for(uint ind = 0; ind < pool.size(); ind++) {
00432 uint index = pool[ind]->index;
00433
00434
00435 uint cid = 0;
00436 while(cid < sentInds_[index].size()) {
00437 if(!sentInds_[index][cid])
00438 break;
00439 cid++;
00440 }
00441 if(cid == sentInds_[index].size())
00442 sentInds_[index].resize(cid + 1);
00443
00444
00445 pool[ind]->cid = cid;
00446 sentInds_[index][cid] = (IndividualP) pool[ind]->copy();
00447 }
00448 }
00449
00450
00451
00452 void Algorithm::restoreIndividuals(std::vector<uint> received)
00453 {
00454 DemeP myDeme = state_->getPopulation()->getLocalDeme();
00455
00456 for(uint ind = 0; ind < received.size(); ind++) {
00457 uint index = received[ind];
00458 uint cid = myDeme->at(index)->fitness->cid;
00459
00460
00461 if(isConsistent_[index] == false) {
00462 sentInds_[index][cid]->fitness = myDeme->at(index)->fitness;
00463 replaceWith(myDeme->at(index), sentInds_[index][cid]);
00464
00465 isConsistent_[index] = true;
00466 storedInds_[index]->fitness = (FitnessP) myDeme->at(index)->fitness->copy();
00467 }
00468
00469 else {
00470 myDeme->at(index)->fitness = (FitnessP) storedInds_[index]->fitness->copy();
00471 }
00472 sentInds_[index][cid].reset();
00473 }
00474 }
00475
00476
00477
00478 void Algorithm::restorePopulation()
00479 {
00480 DemeP myDeme = state_->getPopulation()->getLocalDeme();
00481 uint demeSize = myDeme->getSize();
00482
00483 for(uint ind = 0; ind < demeSize; ind++) {
00484 if(!isConsistent_[ind]) {
00485 (*myDeme)[ind] = (IndividualP) storedInds_[ind]->copy();
00486 }
00487 }
00488 }
00489
00490
00491 #else // non _MPI
00492
00493
00494 bool Algorithm::initializePopulation(StateP state)
00495 {
00496 for(uint iDeme = 0; iDeme < state->getPopulation()->size(); iDeme++)
00497 for(uint iInd = 0; iInd < state->getPopulation()->at(iDeme)->size(); iInd++)
00498 evaluate(state->getPopulation()->at(iDeme)->at(iInd));
00499 return true;
00500 }
00501
00502 #endif // _MPI