Data-JPack
view release on metacpan or search on metacpan
share/js/01-workerpool.js view on Meta::CPAN
}
}
//Stub class providing functional access to worker
/**
 * Main-thread proxy around one dedicated Worker.
 *
 * The worker script is generated from the source text of `bootstrap`, which is
 * invoked inside the worker with the JSON-encoded `parameters`. Calls are
 * serialised: at most one "callFunction" message is in flight at a time and
 * further calls wait in `requestQueue` until the worker replies with a
 * "callFunctionReturn" message.
 */
class WorkerTemplate{
  /**
   * @param {Object} parameters - JSON-serialisable settings. `parameters.id`
   *   is stored as `this.id`; the whole object is passed to `bootstrap` as a
   *   JSON string.
   */
  constructor(parameters){
    this.id=parameters.id;
    // The JSON text is embedded in generated source, so it has to be encoded
    // as a JavaScript string literal. A second JSON.stringify does exactly
    // that; wrapping it in bare single quotes breaks on ', \ and \" sequences.
    const payload=JSON.stringify(JSON.stringify(parameters));
    const code=`(${bootstrap.toString()})(${payload})`;
    const blob=new Blob([code],{type:"text/javascript"});
    this.worker=new Worker(window.URL.createObjectURL(blob));
    this.worker.addEventListener("message",(e)=>this.fromWorker(e));
    // Request bookkeeping.
    this.isBusy=false;
    this.requestQueue=[];
    this.responseQueue=[]; // should only ever hold 0 or 1 elements
  }

  /**
   * Handles a message event coming from the worker.
   * A "callFunctionReturn" message resolves the in-flight request with the
   * event itself and starts the next queued request; other commands are ignored.
   * @param {MessageEvent} e - event whose `data.cmd` selects the action.
   */
  fromWorker(e){
    switch(e.data.cmd){
      case "callFunctionReturn": {
        const pending=this.responseQueue.shift();
        if(pending===undefined){
          // A reply with no request in flight: there is nothing to settle.
          break;
        }
        pending.resolver(e);
        this.isBusy=false;
        this._executeNext(); // keep draining the queue
        break;
      }
      default:
        break;
    }
  }

  /**
   * Queues a call for the worker.
   * @param {string} name - value sent as the `name` field of the message.
   * @param {*} arg - value sent as the `arg` field of the message.
   * @param {Array} transfer - passed as the second argument of `postMessage`.
   * @returns {Promise} resolves with the worker's reply event; rejects when
   *   the message could not be posted.
   */
  callFunction(name, arg, transfer){
    let resolver;
    let rejecter;
    const promise=new Promise((resolve, reject)=>{
      resolver=resolve;
      rejecter=reject;
    });
    this.requestQueue.push({name,arg,transfer,resolver,rejecter});
    this._executeNext();
    return promise;
  }

  /**
   * Posts the next queued request if the worker is idle.
   * When `postMessage` throws, the request is rejected with
   * `{data:{name:"error",result:undefined},error}` (where `error` is the
   * caught exception) and the following queued request is attempted.
   */
  _executeNext(){
    if(this.isBusy || this.requestQueue.length===0){
      return;
    }
    this.isBusy=true;
    const request=this.requestQueue.shift();
    this.responseQueue.push(request);
    try{
      this.worker.postMessage({cmd:"callFunction",name:request.name,arg:request.arg},request.transfer);
    }
    catch(err){
      // The message never left, so no reply will arrive for it: take the
      // request back out, surface the real cause, and free the worker.
      this.responseQueue.pop();
      request.rejecter({data:{name:"error",result:undefined},error:err});
      this.isBusy=false;
      this._executeNext(); // keep draining the queue
    }
  }
}
class WorkerPool {
constructor(count){
this.availableQueue=[];
this.pool=[];
this.callQueue=[];
this.callMap={};
this.addWorkers(count);
this.callID=0;
this.existingFunctions=[];
}
addWorkers(count){
//Generate pool of workers
let w;
let id=this.pool.length;
for(let i=0;i<count;i++){
w=new WorkerTemplate({id:id+i});
this.pool.push(w);
this.availableQueue.push(w);
w.manager=this;
}
}
addScriptBody(string){
//console.log("Add script body");
//console.log(string);
let p=[];
for(let i=0;i<this.pool.length;i++){
p.push(this.pool[i].callFunction("importScript",string,[]));
}
return Promise.all(p);
}
addFunctions(refs,names){
//console.log("Add functions");
//console.log(refs);
let p=[];
if(names===undefined){
names=new Array(refs.length);
}
//console.log(refs,names);
for(let i=0;i<refs.length;i++){
p.push(this.addFunction(refs[i],names[i]));
}
return Promise.all(p);
}
//Directly calls each worker to add the function.
addFunction(ref,name,apiFlag){
( run in 1.762 second using v1.01-cache-2.11-cpan-39bf76dae61 )