Promise的并发控制 - 从普通并发池到动态并发池_promise.race 并发控制-CSDN博客
/**Promise并发池,当有大量promise并发时,可以通过这个来限制并发数量
* @param taskList 任务列表
* @param max 最大并发数量
* @param oneFinishCallback 每个完成的回调,参数是当前完成的个数和执行结果,可以用来制作进度条
* @retrun 返回每个promise的结果,顺序和任务列表相同。 目前是成功和失败都会放入该结果
*/
export const promisePool = <T>(taskList: task<T>[], limit: number) => {
return new Promise<T[]>(async (resolve, reject) => {
try {
const length = taskList.length
/**当前并发池 */
const pool: Promise<T>[] = []
/**结果数组 */
const res = new Array<T>(length)
/**完成的数量 */
let count = 0
for (let i = 0; i < length; i++) {
const task = taskList[i]();
//promise结束的回调
const handler = (info: T) => {
pool.splice(pool.indexOf(task), 1) //任务执行完就删除
res[i] = info //不能使用res.push,否则不能保证结果顺序
count++
if (count === length) {
resolve(res)
}
}
task.then((data) => {
handler(data)
console.log(`第${i}个任务完成,结果为`, data);
}, (err) => {
handler(err)
console.log(`第${i}个任务失败,原因为`, err);
})
pool.push(task)
//如果到达了并发限制,就等到池子中任意一个结束
if (pool.length >= limit) {
await Promise.race(pool)
}
}
} catch (error) {
console.error('并发池出错', error);
reject(error)
}
})
}
/**Promise并发池 - 可终止 - 每次都创建一个实例,避免另一个池子的取消导致这个池子的取消 */
export class PromisePoolStatic<T, Err>{
/**是否取消。在循环中若发现这个变成了true,就会中断 */
private isStop = false
/**运行静态Promise并发池,当有大量promise并发时,可以通过这个来限制并发数量
* @param taskList 任务列表
* @param max 最大并发数量
* @retrun 返回每个promise的结果,顺序和任务列表相同。 目前是成功和失败都会放入该结果
*/
run = async (taskList: task<T>[], max: number) => {
return new Promise<Array<T | Err>>(async (resolve, reject) => {
type resType = T | Err
try {
this.isStop = false //开始的时候设为false
const length = taskList.length
const pool: Promise<resType>[] = []//并发池
let count = 0//当前结束了几个
const res = new Array<resType>(length)
for (let i = 0; i < length; i++) {
let task = taskList[i]();
if (this.isStop) return reject('并发池终止')
//成功和失败都要执行的函数
const handler = (_res: resType) => {
pool.splice(pool.indexOf(task), 1) //每当并发池跑完一个任务,从并发池删除个任务
res[i] = _res //放入结果数组
count++
if (count === length) {
return resolve(res)
}
}
task.then((data) => {
handler(data)
console.log(`第${i}个任务完成,结果为`, data);
}, (err) => {
handler(err)
console.log(`第${i}个任务失败,原因为`, err);
})
pool.push(task);
if (pool.length === max) {
//利用Promise.race方法来获得并发池中某任务完成的信号,当有任务完成才让程序继续执行,让循环把并发池塞满
await Promise.race(pool)
}
}
} catch (error) {
console.error('promise并发池出错', error);
reject(error)
}
})
}
/**停止并发池运行 */
stop = () => {
this.isStop = true
}
}
type resolve<T> = (value?: T | PromiseLike<T>) => void
type reject = (reason?: any) => void
/**装着任务和它的resolve与reject函数 */
type taskWithCallbacks<T> = { task: task<T>; resolve: resolve<T>; reject: reject }
/**动态并发池 */
export class PromisePoolDynamic<T> {
/**最大并发数量 */
private limit: number;
/**当前正在跑的数量 */
private runningCount: number;
/**等待队列 */
private queue: Array<taskWithCallbacks<T>>;
/**动态并发池 - 构造函数
* @param maxConcurrency 最大并发数量
*/
constructor(maxConcurrency: number) {
this.limit = maxConcurrency;
this.runningCount = 0;
this.queue = [];
}
/**添加任务
* @param task 任务,() => Promise<T>
* @returns 结果
*/
addTask(task: task<T>) {
//返回一个新的Promise实例,在任务完成前,会一直是pending状态
return new Promise<T>((resolve, reject) => {
const taskWithCallbacks = { task, resolve, reject } as taskWithCallbacks<T>;
if (this.runningCount < this.limit) {//并发数量没满则运行
console.log('任务添加:当前并发数', this.runningCount, '并发数量未满,直接运行');
this.runTask(taskWithCallbacks);
} else {//并发数量满则加入等待队列
console.log('任务添加:当前并发数', this.runningCount, '并发数量满,挂起等待');
this.queue.push(taskWithCallbacks);
}
});
}
/**运行任务
* @param taskWithCallback 带有resolve和reject的任务
*/
private runTask(taskWithCallback: taskWithCallbacks<T>) {
this.runningCount++;//当前并发数++
taskWithCallback.task()//从对象中取出任务执行
.then(result => {
this.runningCount--;
taskWithCallback.resolve(result);
console.log('任务完成', result, '当前并发数', this.runningCount);
this.checkQueue();
})
.catch(error => {
this.runningCount--;
taskWithCallback.reject(error);
this.checkQueue();
});
}
/**运行完成后,检查队列,看看是否有在等待的,有就取出第一个来运行 */
private checkQueue() {
if (this.queue.length > 0 && this.runningCount < this.limit) {
const nextTask = this.queue.shift()!;
console.log('并发池出现空位,取出等待队列的任务', nextTask);
this.runTask(nextTask);
}
}
}