00001 #include "ECF_base.h"
00002
00003 #ifdef _MPI
00004
// Rank of the process that worker processes exchange jobs and results with in this file.
const int MASTER(0);
00006
00007
00008 void Algorithm::registerParallelParameters(StateP state)
00009 {
00010 std::string *type = new std::string("eval");
00011 state->getRegistry()->registerEntry("parallel.type", (voidP) type, ECF::STRING,
00012 "implicit parallelization method: eval - evaluation, mut - mutation + eval");
00013
00014 uint* jobSize = new uint(10);
00015 state->getRegistry()->registerEntry("parallel.jobsize", (voidP) jobSize, ECF::UINT,
00016 "implicit parallelization jobsize (individuals per job)");
00017
00018 uint *sync = new uint(0);
00019 state->getRegistry()->registerEntry("parallel.sync", (voidP) sync, ECF::UINT,
00020 "implicit parallelization synchronicity: 0 - async, 1 - sync ");
00021 }
00022
00023
00024 bool Algorithm::initializeParallel(StateP state)
00025 {
00026 state_ = state;
00027 comm_ = state->getCommunicator();
00028 selBestOp = static_cast<SelBestOpP> (new SelBestOp);
00029 totalEvaluations_ = wastedEvaluations_ = 0;
00030 bImplicitEvaluation_ = bImplicitMutation_ = false;
00031
00032
00033 uint demeSize = state->getPopulation()->getLocalDeme()->getSize();
00034 storedInds_.resize(demeSize);
00035 sentInds_.resize(demeSize);
00036 isConsistent_.resize(demeSize);
00037
00038 if(!state_->getRegistry()->isModified("parallel.type"))
00039 bImplicitParallel_ = false;
00040 else
00041 bImplicitParallel_ = true;
00042
00043 if(this->isParallel() && bImplicitParallel_) {
00044 ECF_LOG_ERROR(state_, "Error: implicit parallelization possible only with sequential algorithm!");
00045 throw "";
00046 }
00047
00048 if((this->isParallel() || bImplicitParallel_)) {
00049 if(state->getPopulation()->getNoDemes() > state->getCommunicator()->getCommGlobalSize()) {
00050 ECF_LOG_ERROR(state, "Error: number of processes must be equal or greater than the number of demes in parallel EA!");
00051 throw "";
00052 }
00053 }
00054 else if(state->getPopulation()->getNoDemes() != state->getCommunicator()->getCommGlobalSize()) {
00055 ECF_LOG_ERROR(state, "Error: number of processes must equal the number of demes in EA with sequential algorithm!");
00056 throw "";
00057 }
00058
00059 if(!bImplicitParallel_)
00060 return true;
00061
00062 voidP sptr = state->getRegistry()->getEntry("parallel.type");
00063 std::string parallelType = *((std::string*)sptr.get());
00064
00065 if(parallelType == "eval") {
00066 bImplicitEvaluation_ = true;
00067 }
00068 else if(parallelType == "mut") {
00069 bImplicitMutation_ = true;
00070 }
00071 else {
00072 ECF_LOG_ERROR(state, "Error: unkown implicit parallelization mode!");
00073 throw "";
00074 }
00075
00076 sptr = state->getRegistry()->getEntry("parallel.sync");
00077 bSynchronized_ = ((*((uint*) sptr.get())) != 0);
00078
00079 voidP jobSizeP = state->getRegistry()->getEntry("parallel.jobsize");
00080 jobSize_ = *((uint*) jobSizeP.get());
00081
00082 if(jobSize_ > state->getPopulation()->at(0)->getSize()) {
00083 ECF_LOG_ERROR(state, "Error: parallel.jobsize must be less or equal to the number of individuals!");
00084 throw "";
00085 }
00086
00087 return true;
00088 }
00089
00090
00091
00092 void Algorithm::initializeImplicit(StateP state)
00093 {
00094 for(uint i = 0; i < state->getPopulation()->at(0)->size(); i++) {
00095 IndividualP ind = (IndividualP) new Individual(state);
00096 ind->fitness = (FitnessP) state->getFitnessObject()->copy();
00097 demeCopy_.push_back(ind);
00098 requestIds_.push_back(0);
00099 requestMutIds_.push_back(0);
00100 }
00101 stored_.resize(state->getPopulation()->at(0)->size());
00102 currentBest_ = selBestOp->select(*(state->getPopulation()->at(0)));
00103 }
00104
00105
00106 bool Algorithm::advanceGeneration(StateP state)
00107 {
00108
00109 if(state->getCommunicator()->getCommRank() != 0 && bImplicitParallel_)
00110 return implicitParallelOperate(state);
00111
00112 else
00113 return advanceGeneration(state, state->getPopulation()->at(0));
00114 }
00115
00116
00117 void Algorithm::implicitEvaluate(IndividualP ind)
00118 {
00119 totalEvaluations_++;
00120 if(requestIds_[ind->index] > 0) {
00121 wastedEvaluations_++;
00122 return;
00123 }
00124
00125 DemeP workingDeme = state_->getPopulation()->getLocalDeme();
00126
00127 if(!isMember(ind, requests_)) {
00128 requests_.push_back(ind);
00129 requestIds_[ind->index] = 1;
00130
00131
00132
00133
00134 if(bSynchronized_) {
00135 stored_[ind->index] = ind;
00136 removeFrom(ind, *workingDeme);
00137 }
00138 }
00139
00140 if(requests_.size() < jobSize_)
00141 return;
00142
00143 if(comm_->messageWaiting()) {
00144 ECF_LOG(state_, 4, "Receiving evaluated individuals");
00145 std::vector<uint> indices = comm_->recvFitnessVector(demeCopy_, Comm::ANY_PROCESS);
00146 for(uint i = 0; i < indices.size(); i++) {
00147 if(requestIds_[indices[i]] > 0) {
00148 if(bSynchronized_) {
00149 stored_[indices[i]]->fitness = demeCopy_[indices[i]]->fitness;
00150 workingDeme->push_back(stored_[indices[i]]);
00151 }
00152 else
00153 workingDeme->at(indices[i])->fitness = demeCopy_[indices[i]]->fitness;
00154 requestIds_[indices[i]] = 0;
00155 }
00156 }
00157
00158 uint iWorker = comm_->getLastSource();
00159 comm_->sendIndividuals(requests_, iWorker);
00160 requests_.resize(0);
00161 }
00162 else {
00163 ECF_LOG(state_, 4, "Evaluating locally...");
00164 IndividualP ind = requests_.back();
00165 ind->fitness = evalOp_->evaluate(ind);
00166 requestIds_[ind->index] = 0;
00167 if(bSynchronized_) {
00168 workingDeme->push_back(ind);
00169 }
00170 requests_.pop_back();
00171 }
00172
00173 }
00174
00175
00176 uint Algorithm::implicitMutate(IndividualP ind)
00177 {
00178 DemeP workingDeme = state_->getPopulation()->getLocalDeme();
00179
00180 if(!isMember(ind, requestsMut_)) {
00181 requestsMut_.push_back(ind);
00182 requestMutIds_[ind->index] = 1;
00183
00184 if(bSynchronized_) {
00185 removeFrom(ind, *workingDeme);
00186 }
00187 }
00188 if(requestsMut_.size() < jobSize_)
00189 return 0;
00190
00191 if(comm_->messageWaiting()) {
00192 ECF_LOG(state_, 4, "Receiving mutated individuals");
00193 uint received = comm_->recvReplaceIndividuals(receivedMut_, Comm::ANY_PROCESS);
00194 currentBest_ = selBestOp->select(*workingDeme);
00195
00196 for(uint i = 0; i < received; i++) {
00197
00198 if(bSynchronized_) {
00199 IndividualP newInd = (IndividualP) receivedMut_[i]->copy();
00200 newInd->index = receivedMut_[i]->index;
00201 workingDeme->push_back(newInd);
00202 }
00203
00204
00205 else {
00206 if(requestMutIds_[receivedMut_[i]->index] > 0
00207 && (workingDeme->at(receivedMut_[i]->index) != currentBest_
00208 || receivedMut_[i]->fitness->isBetterThan(currentBest_->fitness))) {
00209 IndividualP newInd = (IndividualP) receivedMut_[i]->copy();
00210 workingDeme->replace(receivedMut_[i]->index, newInd);
00211 }
00212 }
00213 requestMutIds_[receivedMut_[i]->index] = 0;
00214 }
00215
00216 uint iWorker = comm_->getLastSource();
00217 comm_->sendIndividuals(requestsMut_, iWorker);
00218
00219 requestsMut_.resize(0);
00220 return received;
00221 }
00222 else {
00223 ECF_LOG(state_, 4, "Mutating locally...");
00224 std::vector<IndividualP> pool;
00225 pool.push_back(requestsMut_.back());
00226 requestMutIds_[requestsMut_.back()->index] = 0;
00227 requestsMut_.pop_back();
00228
00229 if(bSynchronized_) {
00230 workingDeme->push_back(pool[0]);
00231 }
00232 uint mutated = mutation_->mutation(pool);
00233 return mutated;
00234 }
00235 }
00236
00237
00238
00239 bool Algorithm::implicitParallelOperate(StateP state)
00240 {
00241 ECF_LOG(state, 4, "Worker process initiating.");
00242
00243 if(bImplicitEvaluation_) {
00244 myJob_.resize(0);
00245 comm_->sendFitness(myJob_, MASTER);
00246 uint myJobSize = 1;
00247
00248
00249 while(myJobSize != 0) {
00250
00251 myJobSize = comm_->recvReplaceIndividuals(myJob_, MASTER);
00252 for(uint i = 0; i < myJobSize; i++)
00253 myJob_[i]->fitness = evalOp_->evaluate(myJob_[i]);
00254
00255 comm_->sendFitness(myJob_, MASTER, myJobSize);
00256 }
00257 }
00258
00259
00260 else if(bImplicitMutation_) {
00261 std::vector<IndividualP> mutationPool;
00262 myJob_.resize(0);
00263 comm_->sendIndividuals(myJob_, MASTER);
00264 uint myJobSize = 1;
00265
00266
00267 while(myJobSize != 0) {
00268
00269 myJobSize = comm_->recvReplaceIndividuals(myJob_, MASTER);
00270
00271 mutationPool.resize(myJobSize);
00272 for(uint i = 0; i < myJobSize; i++) {
00273 mutationPool[i] = myJob_[i];
00274 }
00275
00276 mutation_->mutation(mutationPool);
00277
00278 for(uint i = 0; i < myJobSize; i++)
00279 if(!mutationPool[i]->fitness->isValid())
00280 mutationPool[i]->fitness = evalOp_->evaluate(mutationPool[i]);
00281
00282 comm_->sendIndividuals(myJob_, MASTER, myJobSize);
00283 }
00284 }
00285
00286 return true;
00287 }
00288
00289
00290 void Algorithm::bcastTermination(StateP state)
00291 {
00292 if(bImplicitParallel_) {
00293
00294 if(state->getCommunicator()->getCommRank() == 0 && state->getTerminateCond()) {
00295 std::vector<IndividualP> empty;
00296 for(uint iWorker = 1; iWorker < state->getCommunicator()->getCommSize(); iWorker++) {
00297 state->getCommunicator()->sendIndividuals(empty, iWorker);
00298 }
00299 }
00300
00301 else if(state->getCommunicator()->getCommRank() != 0)
00302 state->setTerminateCond();
00303
00304 return;
00305 }
00306
00307
00308 if(state->getCommunicator()->getCommRank() == 0) {
00309 for(uint i = 1; i < comm_->getCommSize(); i++)
00310 comm_->sendControlMessage(i, state->getTerminateCond());
00311 }
00312 else {
00313 if(comm_->recvControlMessage(0))
00314 state->setTerminateCond();
00315 }
00316 }
00317
00318
00319 void Algorithm::replaceWith(IndividualP oldInd, IndividualP newInd)
00320 {
00321 if(!bSynchronized_) {
00322 state_->getPopulation()->at(0)->replace(oldInd->index, newInd);
00323 return;
00324 }
00325
00326 DemeP workingDeme = state_->getPopulation()->at(0);
00327
00328 for(uint i = 0; i < workingDeme->size(); i++) {
00329 if(workingDeme->at(i)->index == oldInd->index) {
00330 workingDeme->at(i) = newInd;
00331 newInd->index = oldInd->index;
00332 return;
00333 }
00334 }
00335 }
00336
00337
00338 bool Algorithm::initializePopulation(StateP state)
00339 {
00340 CommunicatorP comm = state->getCommunicator();
00341 DemeP myDeme = state->getPopulation()->at(0);
00342 uint demeSize = (uint) myDeme->size();
00343
00344 isConsistent_.assign(isConsistent_.size(), true);
00345
00346 if(comm->getCommSize() == 1) {
00347 for(uint iInd = 0; iInd < demeSize; iInd++) {
00348 IndividualP ind = myDeme->at(iInd);
00349 state_->getContext()->evaluatedIndividual = ind;
00350 ECF_LOG(state_, 5, "Evaluating individual: " + ind->toString());
00351 ind->fitness = evalOp_->evaluate(ind);
00352 ECF_LOG(state_, 5, "New fitness: " + ind->fitness->toString());
00353 state_->increaseEvaluations();
00354 }
00355 return true;
00356 }
00357
00358 if(comm->getCommRank() == 0) {
00359 uint jobsPerProcess = 4;
00360 uint jobSize = 1 + demeSize / comm->getCommSize() / jobsPerProcess;
00361 uint remaining = demeSize;
00362 std::vector<IndividualP> job;
00363
00364 for(uint iInd = 0; iInd < demeSize; ) {
00365 for(uint i = 0; i < jobSize && iInd < demeSize; i++, iInd++) {
00366 myDeme->at(iInd)->fitness = (FitnessP) state->getFitnessObject()->copy();
00367 job.push_back(myDeme->at(iInd));
00368 }
00369 remaining -= comm->recvDemeFitness(*myDeme, MPI::ANY_SOURCE);
00370 uint iWorker = comm->getLastSource();
00371 comm->sendIndividuals(job, iWorker);
00372 job.resize(0);
00373 }
00374
00375 int remainingWorkers = comm->getCommSize() - 1;
00376 while(remaining > 0 || remainingWorkers > 0) {
00377 remaining -= comm->recvDemeFitness(*myDeme, MPI::ANY_SOURCE);
00378
00379 uint iWorker = comm->getLastSource();
00380 comm->sendIndividuals(job, iWorker);
00381 remainingWorkers--;
00382 }
00383
00384 state->increaseEvaluations(demeSize);
00385 }
00386
00387 else {
00388 std::vector<IndividualP> myJob_;
00389 myJob_.resize(0);
00390 comm->sendFitness(myJob_, MASTER);
00391
00392 uint myJobSize;
00393 myJobSize = comm->recvReplaceIndividuals(myJob_, MASTER);
00394
00395
00396 while(myJobSize > 0) {
00397 for(uint i = 0; i < myJobSize; i++) {
00398 state_->getContext()->evaluatedIndividual = myJob_[i];
00399 ECF_LOG(state_, 5, "Evaluating individual: " + myJob_[i]->toString());
00400 myJob_[i]->fitness = evalOp_->evaluate(myJob_[i]);
00401 ECF_LOG(state_, 5, "New fitness: " + myJob_[i]->fitness->toString());
00402 }
00403
00404 comm->sendFitness(myJob_, MASTER, myJobSize);
00405 myJobSize = comm->recvReplaceIndividuals(myJob_, MASTER);
00406 }
00407 }
00408
00409 return true;
00410 }
00411
00412
00413
00414 void Algorithm::storeIndividual(IndividualP ind)
00415 {
00416 if(isConsistent_[ind->index]) {
00417 storedInds_[ind->index] = (IndividualP) ind->copy();
00418 }
00419 isConsistent_[ind->index] = false;
00420 }
00421
00422
00423
00424 void Algorithm::setConsistency(IndividualP ind)
00425 {
00426 isConsistent_[ind->index] = true;
00427 storedInds_[ind->index]->fitness = (FitnessP) ind->fitness->copy();
00428 }
00429
00430
00431
00432 void Algorithm::storeGenotypes(std::vector<IndividualP>& pool)
00433 {
00434 for(uint ind = 0; ind < pool.size(); ind++) {
00435 uint index = pool[ind]->index;
00436
00437
00438 uint cid = 0;
00439 while(cid < sentInds_[index].size()) {
00440 if(!sentInds_[index][cid])
00441 break;
00442 cid++;
00443 }
00444 if(cid == sentInds_[index].size())
00445 sentInds_[index].resize(cid + 1);
00446
00447
00448 pool[ind]->cid = cid;
00449 sentInds_[index][cid] = (IndividualP) pool[ind]->copy();
00450 }
00451 }
00452
00453
00454
00455 void Algorithm::restoreIndividuals(std::vector<uint> received)
00456 {
00457 DemeP myDeme = state_->getPopulation()->getLocalDeme();
00458
00459 for(uint ind = 0; ind < received.size(); ind++) {
00460 uint index = received[ind];
00461 uint cid = myDeme->at(index)->fitness->cid;
00462
00463
00464 if(isConsistent_[index] == false) {
00465 sentInds_[index][cid]->fitness = myDeme->at(index)->fitness;
00466 replaceWith(myDeme->at(index), sentInds_[index][cid]);
00467
00468 isConsistent_[index] = true;
00469 storedInds_[index]->fitness = (FitnessP) myDeme->at(index)->fitness->copy();
00470 }
00471
00472 else {
00473 myDeme->at(index)->fitness = (FitnessP) storedInds_[index]->fitness->copy();
00474 }
00475 sentInds_[index][cid].reset();
00476 }
00477 }
00478
00479
00480
00481 void Algorithm::restorePopulation()
00482 {
00483 DemeP myDeme = state_->getPopulation()->getLocalDeme();
00484 uint demeSize = myDeme->getSize();
00485
00486 for(uint ind = 0; ind < demeSize; ind++) {
00487 if(!isConsistent_[ind]) {
00488 (*myDeme)[ind] = (IndividualP) storedInds_[ind]->copy();
00489 }
00490 }
00491 }
00492
00493
00494 #else // non _MPI
00495
00496
00497 bool Algorithm::initializePopulation(StateP state)
00498 {
00499 for(uint iDeme = 0; iDeme < state->getPopulation()->size(); iDeme++)
00500 for(uint iInd = 0; iInd < state->getPopulation()->at(iDeme)->size(); iInd++)
00501 evaluate(state->getPopulation()->at(iDeme)->at(iInd));
00502 return true;
00503 }
00504
00505 #endif // _MPI