文章目录
- 什么是并发控制
- 为什么需要控制并发
- 并发控制的实现
- 1. 简单的并发控制
- 2. 需要更细粒度控制
- 3. 性能和资源敏感型应用
什么是并发控制
简单来说,并发控制就是当我们有多个任务需要同时执行,但出于性能、资源或业务逻辑的限制,我们不能让它们全部同时开始,而是需要一种机制来管理这些任务的执行,确保它们能够有序、高效地完成
例如:
当我们使用Promise来处理异步请求时,可能会遇到需要同时向服务器发送多个请求的情况
。但是,如果同时发送的请求数量过多,可能会导致服务器压力过大,响应变慢,甚至导致请求失败
。这时,我们就需要用到 并发控制
。
我们可以设置一个并发限制数
,然后通过一个队列来管理待执行的任务。当队列中的任务数量小于并发限制数时
,我们可以直接执行任务
;当队列中的任务数量达到并发限制数
时,我们就需要等待某个任务完成
,然后再从队列中取出下一个任务执行。
通过这种方式,我们可以有效地控制并发量,避免过多的异步操作同时执行,从而保护服务器资源,提高系统的稳定性和响应速度。
为什么需要控制并发
- 避免数据竞争:当两个或多个任务同时访问同一数据资源,并且至少有一个任务在修改数据时,就可能发生数据竞争。如果不正确地处理,可能会导致数据损坏。
- 确保数据一致性:在数据库和文件系统中,需要确保事务或操作序列能够保持数据的一致性状态。
- 提高资源利用率:合理控制并发可以提高系统资源的利用率,避免资源浪费。
- 系统稳定性:过多的并发可能会导致系统过载,影响系统稳定性和响应速度。
并发控制的实现
选择最佳控制并发的方法时,需要考虑具体场景和需求。以下是一些常见场景和推荐方法:
1. 简单的并发控制
只需要简单地限制并发执行的任务数量
,使用第三方库如p-limit是最简单和最直接的方法
。这种方法易于理解和实现,且不需要自己管理复杂的并发逻辑。
const pLimit = require('p-limit');const limit = pLimit(5); // 限制并发为5const promises = arr.map(item => limit(() => asyncOperation(item)));
const results = await Promise.all(promises);
2. 需要更细粒度控制
需要更细粒度的控制,比如在并发执行中添加超时、重试逻辑或者更复杂的错误处理
,那么实现自己的并发控制逻辑可能是更好的选择。你可以使用Promise和async/await结合队列来实现这一点
//定义了一个Queue 类,这个类用于管理并发任务,接受一个参数【最大并发任务数】
class Queue {constructor(limit) {this.limit = limit;//最大并发数this.running = 0;//任务队列this.queue = [];}
//addTask方法用于向控制器添加一个新的任务。每个任务都是一个返回Promise的函数。addTask方法返回一个新的Promise,这个Promise会在任务完成时被解析或在任务失败时被拒绝。addTask(task) {return new Promise((resolve, reject) => {this.queue.push({ task, resolve, reject });// 将任务添加到队列中this.check(); // 尝试运行下一个任务});}
//check方法是一个异步方法,它检查当前正在运行的任务数是否小于最大并发数,并且队列中是否有待执行的任务。如果这两个条件都满足,它会从队列中取出一个任务并开始执行它async check() {// 如果达到最大并发数或队列为空,则不执行任何操作if (this.running >= this.limit || this.queue.length === 0) return;// 从队列中取出下一个任务const { task, resolve, reject } = this.queue.shift();// 增加正在运行的任务数this.running++;try {const result = await task(); // 执行任务并等待结果resolve(result); // 任务成功完成时解析返回的Promise} catch (error) {reject(error); // 任务失败时拒绝返回的Promise} finally {this.running--; // 无论成功还是失败,都减少正在运行的任务数this.check(); // 尝试运行下一个任务}}
}// 使用示例 最大并发数为3
const controller = new Queue (3);const asyncTasks = [() => fetch('/api/data1').then(response => response.json()),() => fetch('/api/data2').then(response => response.json()),() => fetch('/api/data3').then(response => response.json()),// ... 更多任务
];
// 向控制器添加任务,并收集返回的Promise
const results = asyncTasks.map(task => controller.addTask(task));
// 使用Promise.all等待所有任务完成
Promise.all(results).then(allResults => {console.log('All tasks completed:', allResults);}).catch(error => {console.error('Error:', error);});
3. 性能和资源敏感型应用
对于性能和资源非常敏感的应用,你可能需要更精细地控制任务的执行,以避免资源竞争和过载
。在这种情况下,使用Promise.all结合Promise.race的方法可以提供更多的灵活性。
async function limitedConcurrency(promises, limit) {let running = 0;//跟踪当前正在运行的任务数let i = 0;//用于遍历任务数组的索引const resultPromises = [];//存储当前正在执行的Promise对象的数组while (i < promises.length) {//在每次循环中,检查running是否小于limit,如果是,则从任务数组中取出下一个任务并开始执行(即通过调用任务函数来获取Promise对象,并将其添加到resultPromises数组中),然后增加running的值。if (running < limit) {running++;resultPromises.push(promises[i++]());}
//如果running达到了limit,则使用await Promise.race(resultPromises)来等待resultPromises数组中的任意一个Promise完成if (running >= limit) {const result = await Promise.race(resultPromises);resultPromises.splice(resultPromises.indexOf(result), 1);running--;}}return Promise.all(resultPromises);
}// 定义异步任务数组
const asyncTasks = [() => fetch('/api/data1').then(response => response.json()),() => fetch('/api/data2').then(response => response.json()),() => fetch('/api/data3').then(response => response.json()),// ... 可以添加更多任务
];
// 使用并发控制函数执行异步任务
(async () => {try {const results = await limitedConcurrency(asyncTasks, 3); // 限制并发数为3console.log('All tasks completed:', results);} catch (error) {console.error('Error occurred:', error);}
})();