并发控制
简单 asyncPool
实现了一个异步池(asyncPool)函数,它可以处理一个包含大量异步任务的任务数组。这个函数的主要目的是为了限制并发执行的任务数量,从而提高性能。
js
async function asyncPool(poolLimit, array, iteratorFn) {
// 首先,创建一个空数组来保存所有的 Promise 对象
const ret = [] // 保存所有的 Promise 对象
const executing = [] // 保存正在执行的 Promise 对象
// 使用 for 循环遍历数组
for (const item of array) {
// 使用 Promise.resolve() 将参数封装成 Promise 对象
const p = Promise.resolve().then(() => iteratorFn(item, array)) //
ret.push(p) // 上行是异步执行的,所以保存新的异步任务
// // 当poolLimit值小于或等于总任务个数时,进行并发控制
if (poolLimit <= array.length) {
// 控制并行
// // 当任务完成后,从正在执行的任务数组中移除已完成的任务
const e = p.then(() => executing.splice(executing.indexOf(e), 1))
executing.push(e) // e为正在执行的异步
if (executing.length >= poolLimit) {
await Promise.race(executing) // 等待板块的任务执行完成
}
}
}
return Promise.all(ret)
}
实现原理:
- 首先,这个函数接受三个参数:poolLimit(最大并发任务数)、array(任务数组)和 iteratorFn(任务执行函数)。
- 它创建了一个空数组 ret 来保存所有的 Promise 对象,以及一个空数组 executing 来保存正在执行的 Promise 对象。
- 使用 for 循环遍历任务数组,并将每个任务封装成一个 Promise 对象。
- 当 poolLimit 值小于或等于总任务个数时,进行并发控制。
- 当任务完成后,从正在执行的任务数组中移除已完成的任务。
- 当正在执行的任务数组中的任务数量超过 poolLimit 时,等待这些任务执行完成。
- 最后,返回一个 Promise 对象,当所有任务完成后,这个 Promise 对象会 resolve。 用途:
这个函数可以用于处理大量异步任务,从而提高性能。例如,在网页加载时,需要处理一系列 DOM 操作和 ajax 请求,使用 asyncPool 函数可以并发执行这些任务,从而提高加载速度。
使用示例:
js
const timeout = i =>
new Promise(resolve =>
setTimeout(() => {
console.log(i)
resolve(i)
}, i)
)
// 当然,limit <= 0 的时候 我们可以理解为只允许一个请求存在
asyncPool(2, [8000, 5000, 3000, 2000], timeout).then(res => {
console.log(res)
})
/*
5000
8000
3000
2000
[ 8000, 5000, 3000, 2000 ]
*/
asyncPool 另一种写法
js
function asyncPoolE7(poolLimit, array, iteratorFn) {
let i = 0
const ret = [] // 存储所有的异步任务
const executing = [] // 存储正在执行的异步任务
const enqueue = function () {
if (i === array.length) {
return Promise.resolve()
}
const item = array[i++] // 获取新的任务项
const p = Promise.resolve().then(() => iteratorFn(item, array))
ret.push(p)
let r = Promise.resolve()
// 当poolLimit值小于或等于总任务个数时,进行并发控制
if (poolLimit <= array.length) {
// 当任务完成后,从正在执行的任务数组中移除已完成的任务
const e = p.then(() => executing.splice(executing.indexOf(e), 1))
executing.push(e)
if (executing.length >= poolLimit) {
r = Promise.race(executing)
}
}
// 正在执行任务列表 中较快的任务执行完成之后,才会从array数组中获取新的待办任务
return r.then(() => enqueue())
}
return enqueue().then(() => Promise.all(ret))
}
任务队列控制并发
js
class SuperTask {
constructor(paralleCount = 2) {
this.tasks = []
this.paralleCount = paralleCount // 并发数量
this.runningCount = 0 // 正在运行的任务数量
}
/**
* 添加任务
* @param {*} task
*/
add(task) {
return new Promise((resolve, reject) => {
this.tasks.push({ task, resolve, reject })
this._run()
})
}
/**
* 执行任务
*/
_run() {
while (this.runningCount < this.paralleCount && this.tasks.length > 0) {
const { task, resolve, reject } = this.tasks.shift()
this.runningCount++
task()
.then(resolve, reject)
.finally(() => {
this.runningCount--
this._run()
})
}
}
}
实现了一个名为SuperTask的类,它可以管理多个并发任务。它的实现原理是将任务添加到tasks数组中,然后通过_run方法执行这些任务。_run方法会根据配置的paralleCount参数来同时执行最多paralleCount个任务,以确保并发执行。当所有任务执行完成后,会自动执行下一个任务,直到所有任务都完成。
主要功能:
- 添加任务:通过
add方法添加一个新的任务,每个任务都是一个 Promise,可以在 Promise 的resolve和reject方法中处理任务的结果和错误。 - 执行任务:
_run方法负责执行任务,它会根据paralleCount参数来同时执行最多paralleCount个任务。 - 自动执行下一个任务:当所有任务执行完成后,会自动执行下一个任务,直到所有任务都完成。
注意事项:
SuperTask类的实例化需要提供paralleCount参数,默认值为 2。这个参数决定了同时执行的任务数量,too large a number may cause performance issues.- 添加任务的方法
add返回一个 Promise,可以在 Promise 的then和catch方法中处理任务的结果和错误。 _run方法会不断检查tasks数组中是否有任务可以执行,如果tasks数组为空,它会自动等待任务添加到数组中。
使用示例
javascript
function timeout(time) {
return new Promise((resolve, reject) => {
setTimeout(() => {
resolve()
}, time)
})
}
const superTask = new SuperTask()
function addTask(time, name) {
superTask
.add(() => timeout(time))
.then(result => {
console.log(`任务${name}完成`)
})
}
addTask(2000, 1)
addTask(2000, 2)
addTask(2000, 3)
addTask(2000, 4)
控制并发,并且重试
javascript
function sendRequest(requestList, limits, callback, retryTimes =0) {
// 定义执行队列,表示所有待执行的任务
const taskQueue = [];
// 定义开始时能执行的并发数
const maxConcurrentNum = Math.min(limits, requestList.length);
// 定义放在allSettled的所有promise
let allPromises = [];
// 当前并发数
let concurrentCount = 0;
// 包裹promise,并且将相关信息重新包装放入请求队列
const wrapPromise = (requestItem)=>{
return new Promise((resolve,reject)=>{
// 构建执行队列
taskQueue.push({
requestFn:requestItem, // 请求函数放到此处
resolve,
reject,
remainRetryTime:retryTimes // 剩余重试次数
})
})
};
// 启动初次能执行的任务
const runTaskNeeded = () => {
let i = 0
// 启动当前的任务
while(i < maxConcurrentNum){
i++
runTask()
}
};
// 取出任务并推送到执行器
const runTask = () => {
const task = taskQueue.shift()
task && runner(task)
};
// 执行器,这里去执行任务
const runner = async (task) => {
const {
requestFn,
resolve,
reject,
remainRetryTime
} = task;
try {
// 并发数 +1
concurrentCount++
// 执行任务
const res = await requestFn()
// 拿到结果,直接结束
resolve(res)
} catch (error) {
// 判断还有无重试次数
if(remainRetryTime > 0){
// 重新放回队列,注意这样并不会影响allSettled结果的顺序
taskQueue.push(task)
// 剩余重试次数-1
task.remainRetryTime --
}else {
// 没有剩余次数则直接结束
reject(error)
}
}finally{
// 并发数-1
concurrentCount--
// 捞起下一个任务
picker()
}
};
// 捞起下一个任务
const picker = () => {
if(concurrentCount < limits && taskQueue.length > 0 ){
// 继续执行任务
runTask()
}
};
// 初始化,构建执行队列以及包裹promise
const init = ()=>{
for(let requestItem of requestList){
const wrappedPromise = wrapPromise(requestItem)
// 构建包裹promise的数组,用于allSettled
allPromises.push(wrappedPromise)
}
}
// 开始执行函数
const start = ()=>{
init()
runTaskNeeded()
}
// 开始
start()
// allSettled用来获取结果
Promise.allSettled(allPromises).then(callback,callback)
}
这段代码实现了一个基于 Promise 的异步请求发送函数sendRequest。它的用途是处理一个包含多个请求的列表,这些请求需要按照一定的并发限制来执行。函数允许定义请求失败后的重试次数。以下是对于这段代码的解释:
实现原理: 使用了一个名为
taskQueue的队列来存储待执行的任务。当所有任务都入队后,函数会根据并发限制来启动最多limits个并发任务。在任务执行过程中,如果发生异常,会将其重新放入队列中,并递减重试次数。当所有任务都执行完毕后,函数会调用传入的callback函数来处理结果。用途: 这个函数可以用于处理 HTTP 或异步操作时的并发请求,以便在满足一定的并发限制条件下,尽可能快速地完成所有请求。
注意事项:
- 函数参数
requestList是一个数组,其中包含需要执行的请求函数。这些函数应该返回一个 Promise 对象。 - 参数
limits是一个正整数,表示可以同时执行的最大并发任务数。 - 参数
callback是一个函数,用于处理所有任务执行完毕后的结果。这个函数可以有多个参数,分别表示成功执行的任务结果数组和失败执行的任务结果数组。 - 参数
retryTimes是一个可选参数,用于定义每个请求失败后的重试次数。默认值为 0,表示不重试。
- 函数参数
javascript
// 示例
const requestList = [
() =>
new Promise((resolve, reject) => {
setTimeout(() => {
resolve('请求1成功')
}, 1000)
}),
() =>
new Promise((resolve, reject) => {
setTimeout(() => {
resolve('请求2成功')
}, 1000)
}),
() =>
new Promise((resolve, reject) => {
setTimeout(() => {
reject('请求3失败')
}, 1000)
}),
() =>
new Promise((resolve, reject) => {
setTimeout(() => {
resolve('请求4成功')
}, 1000)
}),
]
const limits = 2
sendRequest(requestList, limits, (result1, result2) => {
console.log('成功请求:', result1)
console.log('失败请求:', result2)
})