Skip to content
On this page

并发控制

简单 asyncPool

实现了一个异步池(asyncPool)函数,它可以处理一个包含大量异步任务的任务数组。这个函数的主要目的是为了限制并发执行的任务数量,从而提高性能。

js
async function asyncPool(poolLimit, array, iteratorFn) {
  // 首先,创建一个空数组来保存所有的 Promise 对象
  const ret = [] // 保存所有的 Promise 对象
  const executing = [] // 保存正在执行的 Promise 对象
  // 使用 for 循环遍历数组
  for (const item of array) {
    // 使用 Promise.resolve() 将参数封装成 Promise 对象
    const p = Promise.resolve().then(() => iteratorFn(item, array)) //
    ret.push(p) // 上行是异步执行的,所以保存新的异步任务
    //  // 当poolLimit值小于或等于总任务个数时,进行并发控制
    if (poolLimit <= array.length) {
      // 控制并行
      //  // 当任务完成后,从正在执行的任务数组中移除已完成的任务
      const e = p.then(() => executing.splice(executing.indexOf(e), 1))
      executing.push(e) // e为正在执行的异步
      if (executing.length >= poolLimit) {
        await Promise.race(executing) // 等待板块的任务执行完成
      }
    }
  }

  return Promise.all(ret)
}

实现原理:

  1. 首先,这个函数接受三个参数:poolLimit(最大并发任务数)、array(任务数组)和 iteratorFn(任务执行函数)。
  2. 它创建了一个空数组 ret 来保存所有的 Promise 对象,以及一个空数组 executing 来保存正在执行的 Promise 对象。
  3. 使用 for 循环遍历任务数组,并将每个任务封装成一个 Promise 对象。
  4. 当 poolLimit 值小于或等于总任务个数时,进行并发控制。
  5. 当任务完成后,从正在执行的任务数组中移除已完成的任务。
  6. 当正在执行的任务数组中的任务数量超过 poolLimit 时,等待这些任务执行完成。
  7. 最后,返回一个 Promise 对象,当所有任务完成后,这个 Promise 对象会 resolve。 用途:

这个函数可以用于处理大量异步任务,从而提高性能。例如,在网页加载时,需要处理一系列 DOM 操作和 ajax 请求,使用 asyncPool 函数可以并发执行这些任务,从而提高加载速度。

使用示例:

js
const timeout = i =>
  new Promise(resolve =>
    setTimeout(() => {
      console.log(i)
      resolve(i)
    }, i)
  )
// 当然,limit <= 0 的时候 我们可以理解为只允许一个请求存在
asyncPool(2, [8000, 5000, 3000, 2000], timeout).then(res => {
  console.log(res)
})

/*
5000
8000
3000
2000
[ 8000, 5000, 3000, 2000 ]
*/

asyncPool 另一种写法

js
function asyncPoolE7(poolLimit, array, iteratorFn) {
  let i = 0
  const ret = [] // 存储所有的异步任务
  const executing = [] // 存储正在执行的异步任务
  const enqueue = function () {
    if (i === array.length) {
      return Promise.resolve()
    }
    const item = array[i++] // 获取新的任务项
    const p = Promise.resolve().then(() => iteratorFn(item, array))
    ret.push(p)

    let r = Promise.resolve()

    // 当poolLimit值小于或等于总任务个数时,进行并发控制
    if (poolLimit <= array.length) {
      // 当任务完成后,从正在执行的任务数组中移除已完成的任务
      const e = p.then(() => executing.splice(executing.indexOf(e), 1))
      executing.push(e)
      if (executing.length >= poolLimit) {
        r = Promise.race(executing)
      }
    }

    // 正在执行任务列表 中较快的任务执行完成之后,才会从array数组中获取新的待办任务
    return r.then(() => enqueue())
  }
  return enqueue().then(() => Promise.all(ret))
}

任务队列控制并发

js
class SuperTask {
  constructor(paralleCount = 2) {
    this.tasks = []
    this.paralleCount = paralleCount // 并发数量
    this.runningCount = 0 // 正在运行的任务数量
  }

  /**
   * 添加任务
   * @param {*} task
   */
  add(task) {
    return new Promise((resolve, reject) => {
      this.tasks.push({ task, resolve, reject })
      this._run()
    })
  }

  /**
   * 执行任务
   */
  _run() {
    while (this.runningCount < this.paralleCount && this.tasks.length > 0) {
      const { task, resolve, reject } = this.tasks.shift()
      this.runningCount++
      task()
        .then(resolve, reject)
        .finally(() => {
          this.runningCount--
          this._run()
        })
    }
  }
}

实现了一个名为SuperTask的类,它可以管理多个并发任务。它的实现原理是将任务添加到tasks数组中,然后通过_run方法执行这些任务。_run方法会根据配置的paralleCount参数来同时执行最多paralleCount个任务,以确保并发执行。当所有任务执行完成后,会自动执行下一个任务,直到所有任务都完成。

主要功能:

  1. 添加任务:通过add方法添加一个新的任务,每个任务都是一个 Promise,可以在 Promise 的resolvereject方法中处理任务的结果和错误。
  2. 执行任务:_run方法负责执行任务,它会根据paralleCount参数来同时执行最多paralleCount个任务。
  3. 自动执行下一个任务:当所有任务执行完成后,会自动执行下一个任务,直到所有任务都完成。

注意事项:

  1. SuperTask类的实例化需要提供paralleCount参数,默认值为 2。这个参数决定了同时执行的任务数量,too large a number may cause performance issues.
  2. 添加任务的方法add返回一个 Promise,可以在 Promise 的thencatch方法中处理任务的结果和错误。
  3. _run方法会不断检查tasks数组中是否有任务可以执行,如果tasks数组为空,它会自动等待任务添加到数组中。

使用示例

javascript
function timeout(time) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      resolve()
    }, time)
  })
}

const superTask = new SuperTask()

function addTask(time, name) {
  superTask
    .add(() => timeout(time))
    .then(result => {
      console.log(`任务${name}完成`)
    })
}

addTask(2000, 1)
addTask(2000, 2)
addTask(2000, 3)
addTask(2000, 4)

控制并发,并且重试

javascript
function sendRequest(requestList, limits, callback, retryTimes =0) {

  // 定义执行队列,表示所有待执行的任务
  const taskQueue = [];

  // 定义开始时能执行的并发数
  const maxConcurrentNum = Math.min(limits, requestList.length);

  // 定义放在allSettled的所有promise
  let allPromises = [];

  // 当前并发数
  let concurrentCount = 0;

  // 包裹promise,并且将相关信息重新包装放入请求队列
  const wrapPromise = (requestItem)=>{
      return new Promise((resolve,reject)=>{
          // 构建执行队列
          taskQueue.push({
              requestFn:requestItem,  // 请求函数放到此处
              resolve,
              reject,
              remainRetryTime:retryTimes // 剩余重试次数
          })
      })
  };

  // 启动初次能执行的任务
  const runTaskNeeded = () => {
      let i = 0
      // 启动当前的任务
      while(i < maxConcurrentNum){
          i++
          runTask()
      }
  };

  // 取出任务并推送到执行器
  const runTask = () => {
      const task = taskQueue.shift()
      task && runner(task)
  };

  // 执行器,这里去执行任务
  const runner = async (task) => {
      const {
          requestFn,
          resolve,
          reject,
          remainRetryTime
      } = task;

      try {
          // 并发数 +1
          concurrentCount++
          // 执行任务
          const res = await requestFn()
          // 拿到结果,直接结束
          resolve(res)

      } catch (error) {
          // 判断还有无重试次数
          if(remainRetryTime > 0){
              // 重新放回队列,注意这样并不会影响allSettled结果的顺序
              taskQueue.push(task)
              // 剩余重试次数-1
              task.remainRetryTime --

          }else {
              // 没有剩余次数则直接结束
              reject(error)
          }

      }finally{
          // 并发数-1
          concurrentCount--
          // 捞起下一个任务
          picker()
      }
  };

  // 捞起下一个任务
  const picker = () => {
      if(concurrentCount < limits && taskQueue.length > 0 ){
          // 继续执行任务
          runTask()
      }
  };

  // 初始化,构建执行队列以及包裹promise
  const init = ()=>{
      for(let requestItem of requestList){
          const wrappedPromise = wrapPromise(requestItem)
          // 构建包裹promise的数组,用于allSettled
          allPromises.push(wrappedPromise)
      }
  }

  // 开始执行函数
  const start = ()=>{
      init()
      runTaskNeeded()
  }

  // 开始
  start()

  // allSettled用来获取结果
  Promise.allSettled(allPromises).then(callback,callback)
}

这段代码实现了一个基于 Promise 的异步请求发送函数sendRequest。它的用途是处理一个包含多个请求的列表,这些请求需要按照一定的并发限制来执行。函数允许定义请求失败后的重试次数。以下是对于这段代码的解释:

  1. 实现原理: 使用了一个名为taskQueue的队列来存储待执行的任务。当所有任务都入队后,函数会根据并发限制来启动最多limits个并发任务。在任务执行过程中,如果发生异常,会将其重新放入队列中,并递减重试次数。当所有任务都执行完毕后,函数会调用传入的callback函数来处理结果。

  2. 用途: 这个函数可以用于处理 HTTP 或异步操作时的并发请求,以便在满足一定的并发限制条件下,尽可能快速地完成所有请求。

  3. 注意事项:

    • 函数参数requestList是一个数组,其中包含需要执行的请求函数。这些函数应该返回一个 Promise 对象。
    • 参数limits是一个正整数,表示可以同时执行的最大并发任务数。
    • 参数callback是一个函数,用于处理所有任务执行完毕后的结果。这个函数可以有多个参数,分别表示成功执行的任务结果数组和失败执行的任务结果数组。
    • 参数retryTimes是一个可选参数,用于定义每个请求失败后的重试次数。默认值为 0,表示不重试。
javascript
// 示例
const requestList = [
  () =>
    new Promise((resolve, reject) => {
      setTimeout(() => {
        resolve('请求1成功')
      }, 1000)
    }),
  () =>
    new Promise((resolve, reject) => {
      setTimeout(() => {
        resolve('请求2成功')
      }, 1000)
    }),
  () =>
    new Promise((resolve, reject) => {
      setTimeout(() => {
        reject('请求3失败')
      }, 1000)
    }),
  () =>
    new Promise((resolve, reject) => {
      setTimeout(() => {
        resolve('请求4成功')
      }, 1000)
    }),
]

const limits = 2

sendRequest(requestList, limits, (result1, result2) => {
  console.log('成功请求:', result1)
  console.log('失败请求:', result2)
})

上次更新于: