Skip to content

前端模拟线程池的实现

js
/**
 * 功能描述:
 * 线程池大小为 3 ,有 ABCDE 5 个任务,其中 任务X 执行 fail 时候需要重试,最大重试 3 次。需要等 ABCDE 所有执行完成才能返回。
 *
 * 异步任务队列,可能有很多个,比如 1000 个,全部同时执行是不现实的。
 * 所以需要按线程池的思路进行拆分,每个线程池单独进行处理。这样如果有个别进程失败,就直接返回了。就不用所有任务都执行。
 *
 * 实现思路:
 * 1. 线程池拆分单独处理。
 * 2. 如有任务需要重试,那么生成任务的时候,就要生成可重试的任务。如果具体执行过程中再去判断重试,就比较复杂了。
 */

/**
 * 异步任务线程池封装
 * @param {*} poolLimit 线程池大小
 * @param {*} taskList 异步任务队列
 */
async function asyncPool(poolLimit = 3, taskList = []) {
  return new Promise((resolve, reject) => {
    /**
     * 把任务队列,根据单个线程池限制进行拆分处理,如:
     * [A, B, C, D, E] => [[A, B, C], [D, E]]
     */
    const poolTaskList = [];
    taskList.forEach(async (item, index) => {
      if (index % poolLimit === 0) {
        poolTaskList.push([]);
      }
      poolTaskList[poolTaskList.length - 1].push(item);
    });

    /**
     * 遍历线程池列表,进行分别异步处理
     */
    let completeNum = 0; // 已执行线程池的数量
    poolTaskList.forEach(async (poolList, index) => {
      console.log(`第${index}个线程池,线程池任务数量:${poolList.length}`);

      let currentResultFail = null;
      try {
        await Promise.all(poolList);
      }
      catch (err) {
        currentResultFail = err;
      }
      completeNum++;

      console.log(`第${index}个线程池执行结果 || `, currentResultFail ? 'fail' : 'ok');

      if (currentResultFail || completeNum === poolTaskList.length) {
        if (currentResultFail) {
          return reject(currentResultFail);
        }
        else {
          return resolve(true);
        }
      }
    });
  });
}

/**
 * 重试请求封装
 */
class Retry {
  times = 3; // 重试次数
  ms = 3000; // 延迟执行时间
  callback = null; // 回调方法

  /**
   * 构造函数
   * @param {*} times 重试次数
   * @param {*} ms 重试延迟执行时间
   * @param {*} callback 失败回调,目前仅打印日志
   */
  constructor(times, ms, callback) {
    this.times = times;
    this.ms = ms;
    this.callback = callback;
  }

  /**
   * 延迟执行
   * @param ms 毫秒
   */
  static delay(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
  }

  /**
   * 执行重试
   * @param asyncTask 异步任务
   * @param times 执行次数
   */
  async work(asyncTask, times) {
    times = times || 1;
    try {
      await asyncTask();
      console.log('times:', times);
      return Promise.resolve(true);
    }
    catch (err) {
      this.callback && this.callback(times, err);
      // 如果失败没有到重试上限,继续重试
      if (times < this.times) {
        await Retry.delay(this.ms);
        await this.work(asyncTask, times + 1);
      }
      else {
        // 否则返回错误结果
        return Promise.reject(err);
      }
    }
  }
}

/**
 * 重试异步任务封装
 * @param {*} asyncTask 异步任务
 * @param {*} retryTimeLimit 重试次数限制
 * @param {*} retryTimeDelay 重试延迟时间
 */
async function asyncTaskRetry(asyncTask, retryTimeLimit = 3, retryTimeDelay = 3000) {
  // 生成重试实例,回调函数中可打印具体哪个任务在重试,以便跟踪问题,这里忽略
  const retry = new Retry(retryTimeLimit, retryTimeDelay, (count, err) => {
    console.log(`第${count}次重试 || err:`, err);
  });

  return retry.work(asyncTask);
}

/** *******  以下为测试代码  */

let retryNum = 0;
/**
 * 测试异步任务队列
 */
const taskList = [
  new Promise((resolve) => {
    setTimeout(() => {
      resolve(1);
    }, 100);
  }),
  new Promise((resolve) => {
    setTimeout(() => {
      resolve(2);
    }, 200);
  }),
  new Promise((resolve) => {
    setTimeout(() => {
      resolve(3);
    }, 300);
  }),
  new Promise((resolve) => {
    setTimeout(() => {
      resolve(4);
    }, 400);
  }),
  new Promise((resolve) => {
    setTimeout(() => {
      resolve(5);
    }, 500);
  }),
  // 直接返回成功实例
  asyncTaskRetry(() => {
    return new Promise((resolve) => {
      setTimeout(() => {
        resolve(6);
      }, 600);
    });
  }),
  // 重试返回成功实例
  asyncTaskRetry(() => {
    return new Promise((resolve, reject) => {
      console.log('执行次数:', retryNum);
      if (retryNum === 1) {
        setTimeout(() => {
          resolve(7);
        }, 700);
      }
      else {
        setTimeout(() => {
          reject(new Error('7'));
          retryNum++;
        }, 700);
      }
    });
  }),
  // 失败重试实例
  asyncTaskRetry(() => {
    return new Promise((resolve, reject) => {
      setTimeout(() => {
        reject(new Error('8'));
      }, 800);
    });
  }),
];

/**
 * 调用看下测试结果
 */
asyncPool(3, taskList)
  .then((result) => {
    console.log('asyncPool result || ok:', result);
  })
  .catch((err) => {
    console.error('asyncPool result || err:', err);
  });

Released under the MIT License.