Code indexing in gitaly is broken and leads to code not being visible to the user. We work on the issue with highest priority.

Skip to content
Snippets Groups Projects
Commit d18ab6c7 authored by frey_m's avatar frey_m
Browse files

add sampler worker

parent 89797089
No related branches found
No related tags found
No related merge requests found
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "Pilot/Poller.h" #include "Pilot/Poller.h"
#include "Pilot/Worker.h" #include "Pilot/Worker.h"
#include "Pilot/SWorker.h"
#include "Optimizer/Optimizer.h" #include "Optimizer/Optimizer.h"
#include "Util/Trace/Trace.h" #include "Util/Trace/Trace.h"
...@@ -263,10 +264,18 @@ private: ...@@ -263,10 +264,18 @@ private:
tmplfile = input_file_.substr(pos+1); tmplfile = input_file_.substr(pos+1);
pos = tmplfile.find("."); pos = tmplfile.find(".");
std::string simName = tmplfile.substr(0,pos); std::string simName = tmplfile.substr(0,pos);
boost::scoped_ptr< Worker<Sim_t> > w(
new Worker<Sim_t>(objectives_, constraints_, simName, if ( cmd_args_->getArg<bool>("sample", true, false) ) {
comm_->getBundle(), cmd_args_));
std::cout << "Sample Worker" << std::endl;
boost::scoped_ptr< SWorker<Sim_t> > w(new SWorker<Sim_t>(objectives_, constraints_, simName,
comm_->getBundle(), cmd_args_));
} else {
boost::scoped_ptr< Worker<Sim_t> > w(new Worker<Sim_t>(objectives_, constraints_, simName,
comm_->getBundle(), cmd_args_));
}
std::cout << "Stop Worker.." << std::endl; std::cout << "Stop Worker.." << std::endl;
} }
......
#ifndef __SWORKER_H__
#define __SWORKER_H__
#include <iostream>
#include "boost/smart_ptr.hpp"
#include "Pilot/Poller.h"
#include "Comm/types.h"
#include "Util/Types.h"
#include "Util/MPIHelper.h"
#include "Util/CmdArguments.h"
#include "Simulation/Simulation.h"
/**
* \class Worker
* \brief A worker MPI entity consists of a processor group that runs a
* simulation of type Sim_t. The main loop in run() accepts new jobs from the
* master process runs the simulation and reports back the results.
*
* @see Pilot
* @see Poller
* @see MPIHelper.h
*
* @tparam Sim_T type of simulation to run
*/
template <class Sim_t>
class SWorker : protected Poller {
public:
SWorker(Expressions::Named_t objectives, Expressions::Named_t constraints,
std::string simName, Comm::Bundle_t comms, CmdArguments_t args)
: Poller(comms.worker)
, cmd_args_(args)
{
objectives_ = objectives;
constraints_ = constraints;
simulation_name_ = simName;
pilot_rank_ = comms.master_local_pid;
is_idle_ = true;
coworker_comm_ = comms.coworkers;
leader_pid_ = 0;
int my_local_pid = 0;
MPI_Comm_rank(coworker_comm_, &my_local_pid);
MPI_Comm_size(coworker_comm_, &num_coworkers_);
// distinction between leader and coworkers
if(my_local_pid == leader_pid_)
run();
else
runCoWorker();
}
~SWorker()
{}
private:
typedef boost::scoped_ptr<Sim_t> SimPtr_t;
bool is_idle_;
MPI_Comm coworker_comm_;
Expressions::Named_t objectives_;
Expressions::Named_t constraints_;
/// coworkers simply wait on a job broadcast from the leader and then
/// start a simulation..
void runCoWorker() {
MPI_Request stop_req;
size_t stop_value = 0;
MPI_Irecv(&stop_value, 1, MPI_UNSIGNED_LONG, leader_pid_,
MPI_ANY_TAG, coworker_comm_, &stop_req);
is_running_ = true;
while(is_running_) {
//FIXME: bcast blocks after our leader stopped working
// Either we create a new class implementing a coworker in the
// same manner as the worker (poll loop). Anyway there is no way
// around removing the Bcast and adding another tag in the poll
// loop above in order to be able to exit cleanly.
if(stop_req != MPI_REQUEST_NULL) {
MPI_Status status;
int flag = 0;
MPI_Test(&stop_req, &flag, &status);
if(flag) {
if(status.MPI_TAG == MPI_COWORKER_NEW_JOB_TAG) {
Param_t params;
MPI_Bcast_params(params, leader_pid_, coworker_comm_);
try {
SimPtr_t sim(new Sim_t(objectives_, constraints_,
params, simulation_name_, coworker_comm_,
cmd_args_));
sim->run();
} catch(OptPilotException &ex) {
std::cout << "Exception while running simulation: "
<< ex.what() << std::endl;
}
MPI_Irecv(&stop_value, 1, MPI_UNSIGNED_LONG, leader_pid_,
MPI_ANY_TAG, coworker_comm_, &stop_req);
}
if(status.MPI_TAG == MPI_STOP_TAG) {
is_running_ = false;
break;
}
}
}
}
}
protected:
int leader_pid_;
int num_coworkers_;
int pilot_rank_;
std::string simulation_name_;
CmdArguments_t cmd_args_;
/// notify coworkers of incoming broadcast
void notifyCoWorkers(int tag) {
for(int i=0; i < num_coworkers_; i++) {
if(i == leader_pid_) continue;
size_t dummy = 0;
MPI_Send(&dummy, 1, MPI_UNSIGNED_LONG, i, tag, coworker_comm_);
}
}
void setupPoll() {
size_t dummy = 1;
MPI_Send(&dummy, 1, MPI_UNSIGNED_LONG, pilot_rank_,
MPI_WORKER_STATUSUPDATE_TAG, comm_m);
}
void prePoll()
{}
void postPoll()
{}
void onStop() {
if(num_coworkers_ > 1)
notifyCoWorkers(MPI_STOP_TAG);
}
bool onMessage(MPI_Status status, size_t recv_value) override {
std::cout << "SWorker::onMessage" << std::endl;
if(status.MPI_TAG == MPI_WORK_JOBID_TAG) {
is_idle_ = false;
size_t job_id = recv_value;
// get new job
Param_t params;
MPI_Recv_params(params, (size_t)pilot_rank_, comm_m);
// and forward to coworkers (if any)
if(num_coworkers_ > 1) {
notifyCoWorkers(MPI_COWORKER_NEW_JOB_TAG);
MPI_Bcast_params(params, leader_pid_, coworker_comm_);
}
//XXX we need to know if we want EVAL or DERIVATIVE
//reqVarContainer_t reqVars;
//MPI_Recv_reqvars(reqVars, (size_t)pilot_rank_, comm_m);
try {
SimPtr_t sim(new Sim_t(objectives_,
constraints_,
params,
simulation_name_,
coworker_comm_,
cmd_args_));
// run simulation in a "blocking" fashion
sim->run();
} catch(OptPilotException &ex) {
std::cout << "Exception while running simulation: "
<< ex.what() << std::endl;
}
MPI_Send(&job_id, 1, MPI_UNSIGNED_LONG, pilot_rank_,
MPI_WORKER_FINISHED_TAG, comm_m);
size_t dummy = 0;
MPI_Recv(&dummy, 1, MPI_UNSIGNED_LONG, pilot_rank_,
MPI_WORKER_FINISHED_ACK_TAG, comm_m, &status);
is_idle_ = true;
return true;
} else {
std::stringstream os;
os << "Unexpected MPI_TAG: " << status.MPI_TAG;
std::cout << "(Worker) Error: " << os.str() << std::endl;
throw OptPilotException("Worker::onMessage", os.str());
}
}
};
#endif
...@@ -320,6 +320,8 @@ void OpalSimulation::run() { ...@@ -320,6 +320,8 @@ void OpalSimulation::run() {
void OpalSimulation::collectResults() { void OpalSimulation::collectResults() {
std::cout << "collectResults" << std::endl;
// clear old solutions // clear old solutions
requestedVars_.clear(); requestedVars_.clear();
......
...@@ -117,7 +117,6 @@ void SampleCmd::execute() { ...@@ -117,7 +117,6 @@ void SampleCmd::execute() {
// prepare function dictionary and add all available functions in // prepare function dictionary and add all available functions in
// expressions // expressions
functionDictionary_t funcs; functionDictionary_t funcs;
client::function::type ff;
////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////
...@@ -205,6 +204,10 @@ void SampleCmd::execute() { ...@@ -205,6 +204,10 @@ void SampleCmd::execute() {
setenv("FIELDMAPS", dir.c_str(), 1); setenv("FIELDMAPS", dir.c_str(), 1);
} }
std::string argument = "--sample=1";
arguments.push_back(argument);
*gmsg << endl; *gmsg << endl;
for (size_t i = 0; i < arguments.size(); ++ i) { for (size_t i = 0; i < arguments.size(); ++ i) {
......
...@@ -124,6 +124,8 @@ private: ...@@ -124,6 +124,8 @@ private:
/// current generation /// current generation
int act_sample_m; int act_sample_m;
int done_sample_m;
enum State { enum State {
SUBMIT, SUBMIT,
STOP STOP
......
...@@ -62,8 +62,9 @@ Sampler/*<SO>*/::~Sampler() ...@@ -62,8 +62,9 @@ Sampler/*<SO>*/::~Sampler()
/*template< template <class> class SO >*/ /*template< template <class> class SO >*/
void Sampler/*<SO>*/::initialize() { void Sampler/*<SO>*/::initialize() {
nsamples_m = args_->getArg<int>("nsamples", true); nsamples_m = args_->getArg<int>("nsamples", true) - 1;
act_sample_m = 0; act_sample_m = 0;
done_sample_m = 0;
curState_m = SUBMIT; curState_m = SUBMIT;
gid = 0; gid = 0;
...@@ -109,98 +110,36 @@ bool Sampler/*<SO>*/::onMessage(MPI_Status status, size_t length) { ...@@ -109,98 +110,36 @@ bool Sampler/*<SO>*/::onMessage(MPI_Status status, size_t length) {
std::cout << "OPT_NEW_JOB_TAG" << std::endl; std::cout << "OPT_NEW_JOB_TAG" << std::endl;
dispatch_forward_solves(); dispatch_forward_solves();
// individuals_m. return true;
// size_t buf_size = length;
// size_t pilot_rank = status.MPI_SOURCE;
//
// char *buffer = new char[buf_size];
// MPI_Recv(buffer, buf_size, MPI_CHAR, pilot_rank,
// MPI_EXCHANGE_SOL_STATE_RES_TAG, comms_.opt, &status);
//
// SolutionState_t new_states;
// std::istringstream is(buffer);
// boost::archive::text_iarchive ia(is);
// ia >> new_states;
// delete[] buffer;
//
// std::set<unsigned int> new_state_ids;
// foreach(individual ind, new_states) {
//
// // only insert individual if not already in population
// if(variator_m->population()->isRepresentedInPopulation(ind.genes))
// continue;
//
// boost::shared_ptr<individual> new_ind(new individual);
// new_ind->genes = ind.genes;
// new_ind->objectives = ind.objectives;
//
// //XXX: can we pass more than lambda_m files to selector?
// unsigned int id =
// variator_m->population()->add_individual(new_ind);
// finishedBuffer_m.push(id);
//
// }
// return true;
} }
/*
case REQUEST_FINISHED: { case REQUEST_FINISHED: {
std::cout << "REQUEST_FINISHED" << std::endl; std::cout << "REQUEST_FINISHED" << std::endl;
// unsigned int jid = static_cast<unsigned int>(length); unsigned int jid = static_cast<unsigned int>(length);
// typename std::map<size_t, boost::shared_ptr<individual> >::iterator it; typename std::map<size_t, boost::shared_ptr<individual> >::iterator it;
// it = jobmapping_m.find(jid); it = jobmapping_m.find(jid);
//
// if(it == jobmapping_m.end()) {
// dump << "\t |-> NOT FOUND!" << std::endl;
// std::cout << "NON-EXISTING JOB with ID = " << jid << std::endl;
// throw OptPilotException("Sampler::onMessage",
// "non-existing job");
// }
//
// boost::shared_ptr<individual> ind = it->second;
// jobmapping_m.erase(it);
//
// reqVarContainer_t res;
// MPI_Recv_reqvars(res, status.MPI_SOURCE, comms_.opt);
//
// ind->objectives.clear();
//
// //XXX: check order of genes
// reqVarContainer_t::iterator itr;
// std::map<std::string, double> vars;
// for(itr = res.begin(); itr != res.end(); itr++) {
// // mark invalid if expression could not be evaluated or constraint does not hold
// if(!itr->second.is_valid || (itr->second.value.size() > 1 && !itr->second.value[0])) {
// std::ostringstream dump;
// if (!itr->second.is_valid) {
// dump << "invalid individual, objective or constraint\"" << itr->first
// << "\" failed to be evaluated correctly"
// << std::endl;
// } else {
// dump << "invalid individual, constraint \"" << itr->first
// << "\" failed to yield true; result: " << itr->second.value[1]
// << std::endl;
// }
// // variator_m->infeasible(ind);
// dispatch_forward_solves();
// return true;
// } else {
// // update objective value for valid objective
// if(itr->second.value.size() == 1)
// ind->objectives.push_back(itr->second.value[0]);
// }
// }
//
// finishedBuffer_m.push(jid);
if(it == jobmapping_m.end()) {
std::cout << "NON-EXISTING JOB with ID = " << jid << std::endl;
throw OptPilotException("Sampler::onMessage",
"non-existing job");
}
boost::shared_ptr<individual> ind = it->second;
jobmapping_m.erase(it);
done_sample_m++;
finishedBuffer_m.push(jid);
return true; return true;
} }
*/
default: { default: {
std::cout << "(Sampler) Error: unexpected MPI_TAG: " std::cout << "(Sampler) Error: unexpected MPI_TAG: "
<< status.MPI_TAG << std::endl; << status.MPI_TAG << std::endl;
...@@ -235,21 +174,15 @@ void Sampler::createNewIndividual_m() { ...@@ -235,21 +174,15 @@ void Sampler::createNewIndividual_m() {
boost::shared_ptr<Individual_t> ind = boost::shared_ptr<Individual_t>( new Individual_t(dNames)); boost::shared_ptr<Individual_t> ind = boost::shared_ptr<Individual_t>( new Individual_t(dNames));
std::cout << "HI 1" << std::endl;
for (uint i = 0; i < samplingOp_m.size(); ++i) { for (uint i = 0; i < samplingOp_m.size(); ++i) {
samplingOp_m[i]->create(ind, i); samplingOp_m[i]->create(ind, i);
} }
std::cout << "HI 2" << std::endl;
ind->id = gid++; ind->id = gid++;
individuals_m.push(ind); individuals_m.push(ind);
std::cout << "Done." << std::endl;
} }
...@@ -267,13 +200,11 @@ void Sampler::runStateMachine() { ...@@ -267,13 +200,11 @@ void Sampler::runStateMachine() {
case SUBMIT: { case SUBMIT: {
std::cout << "SUBMIT" << std::endl;
if ( act_sample_m == nsamples_m ) { if ( done_sample_m == nsamples_m) {
curState_m = STOP; curState_m = STOP;
} else { } else {
act_sample_m++;
// if(parent_queue_.size() > 0) { // if(parent_queue_.size() > 0) {
// std::vector<unsigned int> parents(parent_queue_.begin(), // std::vector<unsigned int> parents(parent_queue_.begin(),
...@@ -292,7 +223,12 @@ void Sampler::runStateMachine() { ...@@ -292,7 +223,12 @@ void Sampler::runStateMachine() {
// // feed results back to the selector. // // feed results back to the selector.
// //FIXME: variate works on staging set!!! // //FIXME: variate works on staging set!!!
// variator_m->variate(parents); // variator_m->variate(parents);
dispatch_forward_solves();
if ( act_sample_m != nsamples_m ) {
std::cout << "SUBMIT" << std::endl;
act_sample_m++;
dispatch_forward_solves();
}
// //
// curState_m = Variate; // curState_m = Variate;
// } // }
...@@ -328,14 +264,10 @@ void Sampler::dispatch_forward_solves() { ...@@ -328,14 +264,10 @@ void Sampler::dispatch_forward_solves() {
individuals_m.pop(); individuals_m.pop();
std::cout << "hello 1" << std::endl;
Param_t params; Param_t params;
DVarContainer_t::iterator itr; DVarContainer_t::iterator itr;
size_t i = 0; size_t i = 0;
std::cout << "hello 2" << std::endl;
for(itr = dvars_m.begin(); itr != dvars_m.end(); itr++, i++) { for(itr = dvars_m.begin(); itr != dvars_m.end(); itr++, i++) {
params.insert( params.insert(
std::pair<std::string, double> std::pair<std::string, double>
...@@ -343,23 +275,15 @@ void Sampler::dispatch_forward_solves() { ...@@ -343,23 +275,15 @@ void Sampler::dispatch_forward_solves() {
ind->genes[i])); ind->genes[i]));
} }
std::cout << "hello 3" << std::endl;
size_t jid = static_cast<size_t>(ind->id); size_t jid = static_cast<size_t>(ind->id);
int pilot_rank = comms_.master_local_pid; int pilot_rank = comms_.master_local_pid;
std::cout << "hello 4" << std::endl;
// now send the request to the pilot // now send the request to the pilot
MPI_Send(&jid, 1, MPI_UNSIGNED_LONG, pilot_rank, OPT_NEW_JOB_TAG, comms_.opt); MPI_Send(&jid, 1, MPI_UNSIGNED_LONG, pilot_rank, OPT_NEW_JOB_TAG, comms_.opt);
MPI_Send_params(params, pilot_rank, comms_.opt); MPI_Send_params(params, pilot_rank, comms_.opt);
std::cout << "hello 5" << std::endl;
jobmapping_m.insert( jobmapping_m.insert(
std::pair<size_t, boost::shared_ptr<individual> >(jid, ind)); std::pair<size_t, boost::shared_ptr<individual> >(jid, ind));
} }
std::cout << "Done." << std::endl;
} }
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment