Data-JPack

 view release on metacpan or  search on metacpan

share/js/01-workerpool.js  view on Meta::CPAN

	}
}



/**
 * Stub class providing functional access to a single worker.
 *
 * The worker source is built from the `bootstrap` function (defined outside
 * this class) and invoked with the JSON-encoded constructor parameters.
 * Calls are serialised: only one request is posted to the worker at a time,
 * the rest wait in `requestQueue` until the previous reply has arrived.
 */
class WorkerTemplate{
	/**
	 * @param {Object} parameters - JSON-serialisable settings handed to the
	 *   bootstrap function as a JSON string; `parameters.id` is kept as
	 *   `this.id`.
	 */
	constructor(parameters){
		this.id=parameters.id;
		const encoded=JSON.stringify(parameters);

		// Embed the JSON text as a JavaScript string literal. Stringifying a
		// second time escapes quotes, backslashes and line breaks, so the
		// worker receives exactly `encoded` whatever the parameters contain
		// (wrapping it in bare single quotes broke on `'` and on any `\`).
		const code="("+bootstrap.toString()+")("+JSON.stringify(encoded)+")";

		// NOTE(review): the object URL is never revoked; confirm whether it
		// can safely be released once the worker has started.
		const blob=new Blob([code],{type:"text/javascript"});
		this.worker=new Worker(window.URL.createObjectURL(blob));
		this.worker.addEventListener("message",(e)=>this.fromWorker(e));

		//Setup a queue for requests
		this.isBusy=false;
		this.requestQueue=[];
		this.responseQueue=[];//this should only ever have 0 or 1 elements
	}

	/**
	 * Handles a message event coming back from the worker.
	 *
	 * A "callFunctionReturn" message resolves the outstanding request with the
	 * whole event, marks the worker as idle and posts the next queued request.
	 * Any other command is ignored.
	 *
	 * @param {MessageEvent} e - event whose `data.cmd` selects the action.
	 */
	fromWorker(e){
		switch(e.data.cmd){
			case "callFunctionReturn": {
				const pending=this.responseQueue.shift();
				if(pending===undefined){
					// A reply with nothing outstanding: report it instead of
					// failing with a TypeError inside the event handler.
					console.warn(`Worker ${this.id}: unexpected callFunctionReturn`);
					break;
				}
				pending.resolver(e);
				this.isBusy=false;
				this._executeNext();	//Trigger the queue again
				break;
			}
			default:
				break;
		}
	}

	/**
	 * Queues a function call for the worker.
	 *
	 * @param {string} name - sent to the worker as the `name` of the call.
	 * @param {*} arg - sent to the worker as the `arg` of the call.
	 * @param {Array} transfer - passed as the second argument of
	 *   `postMessage`.
	 * @returns {Promise} resolved with the worker's reply event; rejected with
	 *   `{data:{name:"error", result:undefined}, error}` when the request
	 *   could not be posted.
	 */
	callFunction(name, arg, transfer){
		let resolver;
		let rejecter;
		const promise=new Promise((resolve, reject)=>{
			resolver=resolve;
			rejecter=reject;
		});
		this.requestQueue.push({name,arg,transfer,resolver,rejecter});
		this._executeNext();
		return promise;
	}

	/**
	 * Posts the next queued request if the worker is idle.
	 *
	 * A request that cannot be posted is rejected and the following one is
	 * tried straight away. This is a loop rather than a recursive call, so a
	 * run of unsendable requests does not grow the call stack.
	 */
	_executeNext(){
		while((!this.isBusy) && (this.requestQueue.length>0)){
			const request=this.requestQueue.shift();
			this.isBusy=true;
			this.responseQueue.push(request);
			try {
				this.worker.postMessage({cmd:"callFunction",name:request.name, arg:request.arg },request.transfer);
			}
			catch(err){
				// Could not send message, probably a bad argument. Undo the
				// bookkeeping for this request and mark the worker idle again.
				this.responseQueue.pop();
				this.isBusy=false;
				// Same rejection shape as before, plus the real exception so
				// the caller can see why the request was refused.
				request.rejecter({data:{name:"error", result:undefined}, error:err});
			}
		}
	}
}


class WorkerPool {
	constructor(count){
		this.availableQueue=[];
		this.pool=[];
		this.callQueue=[];
		this.callMap={};
		this.addWorkers(count);
		this.callID=0;
		this.existingFunctions=[];
	}

	addWorkers(count){
		//Generate pool of workers
		let w;
		let id=this.pool.length;
		for(let i=0;i<count;i++){
			w=new WorkerTemplate({id:id+i});
			this.pool.push(w);
			this.availableQueue.push(w);
			w.manager=this;
		}
	}

	addScriptBody(string){
    //console.log("Add script body");
    //console.log(string);
		let p=[];
		for(let i=0;i<this.pool.length;i++){
			p.push(this.pool[i].callFunction("importScript",string,[]));
		}
		return Promise.all(p);
		
	}
	addFunctions(refs,names){
    //console.log("Add functions");
    //console.log(refs);
		let p=[];
		if(names===undefined){
			names=new Array(refs.length);
		}
		//console.log(refs,names);
		for(let i=0;i<refs.length;i++){
			p.push(this.addFunction(refs[i],names[i]));
		}
		return Promise.all(p);
	}
	//Directly calls each worker to add the function.
	addFunction(ref,name,apiFlag){



( run in 1.762 second using v1.01-cache-2.11-cpan-39bf76dae61 )