管理线程间共享资源(以数据库连接池为例)
- 使用线程安全的数据结构
在 Node.js 中,可以借助
worker_threads
模块来实现多线程。对于数据库连接池这种共享资源,可以使用线程安全的数据结构来管理。例如,使用 Map
来存储连接池,并对其访问进行同步控制。
const { Worker, isMainThread, parentPort } = require('worker_threads');
const mysql = require('mysql2');
if (isMainThread) {
const connectionPool = mysql.createPool({
host: 'localhost',
user: 'root',
password: 'password',
database: 'test',
connectionLimit: 10
});
const worker = new Worker(__filename);
worker.on('message', (message) => {
console.log('Main thread received:', message);
});
worker.postMessage({ connectionPool });
} else {
parentPort.on('message', (message) => {
const { connectionPool } = message;
connectionPool.getConnection((err, connection) => {
if (err) throw err;
connection.query('SELECT 1 + 1 AS solution', (error, results, fields) => {
connection.release();
if (error) throw error;
parentPort.postMessage(`Query result: ${results[0].solution}`);
});
});
});
}
- 锁机制
为了防止多个线程同时访问和修改共享资源(如连接池中的连接),可以引入锁机制。虽然 Node.js 没有内置的锁,但可以通过
Atomics
和 SharedArrayBuffer
来模拟简单的锁。
const { Worker, isMainThread, parentPort, SharedArrayBuffer, Atomics } = require('worker_threads');
const mysql = require('mysql2');
// 模拟锁
class SimpleLock {
constructor() {
this.lock = new SharedArrayBuffer(1);
this.int32View = new Int32Array(this.lock);
this.int32View[0] = 0;
}
acquire() {
while (Atomics.compareExchange(this.int32View, 0, 0, 1) !== 0);
}
release() {
Atomics.store(this.int32View, 0, 0);
}
}
if (isMainThread) {
const connectionPool = mysql.createPool({
host: 'localhost',
user: 'root',
password: 'password',
database: 'test',
connectionLimit: 10
});
const lock = new SimpleLock();
const worker = new Worker(__filename);
worker.on('message', (message) => {
console.log('Main thread received:', message);
});
worker.postMessage({ connectionPool, lock });
} else {
parentPort.on('message', (message) => {
const { connectionPool, lock } = message;
lock.acquire();
connectionPool.getConnection((err, connection) => {
lock.release();
if (err) throw err;
connection.query('SELECT 1 + 1 AS solution', (error, results, fields) => {
connection.release();
if (error) throw error;
parentPort.postMessage(`Query result: ${results[0].solution}`);
});
});
});
}
优雅处理网络请求错误
- 网络超时处理
使用
Promise.race
和 setTimeout
来实现网络请求超时。
const fetch = require('node-fetch');
function fetchWithTimeout(url, options = {}, timeout = 5000) {
return Promise.race([
fetch(url, options),
new Promise((_, reject) => {
setTimeout(() => {
reject(new Error('Network request timed out'));
}, timeout);
})
]);
}
fetchWithTimeout('https://example.com', {}, 3000)
.then(response => response.text())
.then(data => console.log(data))
.catch(error => console.error('Error:', error.message));
- 请求被拒绝处理
在
fetch
或其他网络请求库的 catch
块中捕获请求被拒绝的错误(如 403 状态码等)。
fetch('https://example.com')
.then(response => {
if (!response.ok) {
throw new Error(`Request was rejected with status ${response.status}`);
}
return response.text();
})
.then(data => console.log(data))
.catch(error => console.error('Error:', error.message));
- 综合错误处理
结合上述两种情况,以及其他可能的网络请求错误,进行统一的错误处理。
function fetchWithErrorHandling(url, options = {}, timeout = 5000) {
return Promise.race([
fetch(url, options)
.then(response => {
if (!response.ok) {
throw new Error(`Request was rejected with status ${response.status}`);
}
return response.text();
}),
new Promise((_, reject) => {
setTimeout(() => {
reject(new Error('Network request timed out'));
}, timeout);
})
]);
}
fetchWithErrorHandling('https://example.com', {}, 3000)
.then(data => console.log(data))
.catch(error => console.error('Error:', error.message));