Node.js数据库操作指南 💾
引言
数据库操作是Node.js应用开发中的关键环节。本文将深入探讨Node.js数据库操作的实现方案，包括连接管理、查询优化、事务处理等方面，帮助开发者构建高效可靠的数据访问层。
数据库操作概述
Node.js数据库操作主要包括以下方面:
- 连接管理:连接池、故障恢复、负载均衡
- 查询处理:SQL构建、参数绑定、结果映射
- 事务管理:事务控制、隔离级别、一致性保证
- 性能优化:查询优化、缓存策略、批量操作
- 安全防护:SQL注入防护、访问控制、数据加密
数据库操作实现
数据库管理器
// 数据库管理器
/**
 * Process-wide singleton that owns one connection pool per database node
 * ("master" plus any named replicas) and exposes query, transaction and
 * batch-insert helpers on top of them.
 *
 * `Pool` / `PoolClient` are not declared in this file; the calls used here
 * (`connect`, `release`, `on`, `end`, `idleTimeoutMillis`, ...) match the
 * node-postgres API. NOTE(review): confirm the import when wiring this up.
 */
class DatabaseManager {
  private static instance: DatabaseManager;

  /**
   * Upper bound on bind parameters per statement; the PostgreSQL wire
   * protocol transmits the parameter count as a 16-bit integer.
   */
  private static readonly MAX_BIND_PARAMS = 65535;

  /** Plain or schema-qualified SQL identifier (`users`, `public.users`). */
  private static readonly IDENTIFIER_PATTERN =
    /^[\p{L}_][\p{L}\p{N}_$]*(\.[\p{L}_][\p{L}\p{N}_$]*)*$/u;

  /** Only the pool-tuning subset is needed before `init` supplies the rest. */
  private config: Pick<
    DatabaseConfig,
    'maxConnections' | 'idleTimeout' | 'connectionTimeout' | 'retryAttempts'
  >;
  private pools: Map<string, Pool>;
  private metrics: DatabaseMetrics;

  private constructor() {
    this.pools = new Map();
    this.metrics = {
      queries: 0,
      errors: 0,
      connections: 0,
      transactions: 0
    };
    this.config = {
      maxConnections: 10,
      idleTimeout: 30000,
      connectionTimeout: 5000,
      retryAttempts: 3
    };
  }

  /** Returns the shared instance, creating it on first use. */
  static getInstance(): DatabaseManager {
    if (!DatabaseManager.instance) {
      DatabaseManager.instance = new DatabaseManager();
    }
    return DatabaseManager.instance;
  }

  /**
   * Merges `config` over the built-in defaults and opens the "master" pool
   * followed by one pool per entry in `config.slaves`.
   *
   * @throws whatever the driver throws when a pool's probe connection fails.
   */
  async init(config: DatabaseConfig): Promise<void> {
    this.config = { ...this.config, ...config };

    await this.createPool('master', {
      host: config.masterHost,
      port: config.masterPort,
      database: config.database,
      user: config.user,
      password: config.password
    });

    if (config.slaves) {
      for (const [name, slave] of Object.entries(config.slaves)) {
        await this.createPool(name, {
          host: slave.host,
          port: slave.port,
          database: config.database,
          user: config.user,
          password: config.password
        });
      }
    }
  }

  /**
   * Creates a pool, verifies it with one probe connection and registers it
   * under `name`. A pool that fails the probe is ended and never registered.
   */
  private async createPool(name: string, config: PoolConfig): Promise<void> {
    const pool = new Pool({
      ...config,
      max: this.config.maxConnections,
      idleTimeoutMillis: this.config.idleTimeout,
      connectionTimeoutMillis: this.config.connectionTimeout
    });

    // Listen before the first connection so errors raised during start-up
    // are handled instead of surfacing as unhandled 'error' events.
    this.attachPoolListeners(name, pool);

    try {
      const client = await pool.connect();
      client.release();
      console.log(`Database pool ${name} created successfully`);
    } catch (error) {
      console.error(`Failed to create database pool ${name}:`, error);
      try {
        await pool.end();
      } catch (endError) {
        // The connection failure is the error worth reporting to the caller.
        console.error(`Failed to discard database pool ${name}:`, endError);
      }
      throw error;
    }

    this.pools.set(name, pool);
  }

  /** Wires the connection and error counters for a single pool. */
  private attachPoolListeners(name: string, pool: Pool): void {
    pool.on('connect', () => {
      this.metrics.connections++;
      console.log(`New connection in pool ${name}`);
    });
    pool.on('error', error => {
      this.metrics.errors++;
      console.error(`Pool ${name} error:`, error);
    });
  }

  /**
   * Runs a parameterized statement and returns its rows.
   *
   * Every failure is retried, with a wait of 2^attempt seconds between
   * attempts, up to `retryAttempts` total attempts (at least one).
   *
   * @param sql statement text with positional placeholders
   * @param params values bound to the placeholders
   * @param options `useReplica` routes the statement to a replica pool
   * @throws Error when the selected pool is not registered
   * @throws DatabaseError once every attempt has failed
   */
  async query<T>(
    sql: string,
    params: unknown[] = [],
    options: QueryOptions = {}
  ): Promise<T> {
    const poolName = options.useReplica ? this.selectReplica() : 'master';
    const pool = this.pools.get(poolName);
    if (!pool) {
      throw new Error(`Database pool ${poolName} not found`);
    }

    // A non-positive setting still means "run the statement once".
    const maxAttempts = Math.max(1, this.config.retryAttempts);

    for (let attempt = 1; attempt <= maxAttempts; attempt++) {
      // Scoped to the attempt: a failed connect() must never release the
      // client that an earlier attempt already handed back.
      let client: PoolClient | undefined;
      let failure: unknown;

      try {
        client = await pool.connect();
        const startTime = Date.now();
        const result = await client.query(sql, params);
        const duration = Date.now() - startTime;
        this.metrics.queries++;
        this.logQuery(sql, params, duration);
        return result.rows as T;
      } catch (error) {
        failure = error;
      } finally {
        client?.release();
      }

      // Only reached on failure; the connection is already back in the pool,
      // so the backoff below does not hold it.
      this.metrics.errors++;
      if (attempt === maxAttempts) {
        throw new DatabaseError(
          'Query failed after max retry attempts',
          sql,
          params,
          failure instanceof Error ? failure : new Error(String(failure))
        );
      }
      await this.wait(2 ** attempt * 1000);
    }

    throw new Error('Unexpected query execution path');
  }

  /**
   * Runs `callback` between BEGIN and COMMIT on one master connection and
   * rolls back if it throws.
   *
   * @returns the value produced by `callback`
   * @throws the error raised by `callback` or by BEGIN/COMMIT
   */
  async transaction<T>(
    callback: (client: PoolClient) => Promise<T>
  ): Promise<T> {
    const pool = this.pools.get('master');
    if (!pool) {
      throw new Error('Master database pool not found');
    }

    const client = await pool.connect();
    try {
      await client.query('BEGIN');
      this.metrics.transactions++;
      const result = await callback(client);
      await client.query('COMMIT');
      return result;
    } catch (error) {
      try {
        await client.query('ROLLBACK');
      } catch (rollbackError) {
        // Keep the original failure visible; a rollback error must not mask it.
        console.error('Failed to roll back transaction:', rollbackError);
      }
      throw error;
    } finally {
      client.release();
    }
  }

  /**
   * Inserts `records` into `table` with multi-row parameterized INSERTs.
   *
   * Columns come from the keys of the first record; each value is looked up
   * by column name, and a missing or `undefined` value is sent as NULL.
   * Each batch is a separate statement, so a failure part-way through leaves
   * the earlier batches inserted.
   *
   * @param options `batchSize` rows per statement (default 1000)
   * @throws TypeError when a record is not an object
   * @throws Error when the table or a column is not a plain identifier
   */
  async batchInsert<T>(
    table: string,
    records: T[],
    options: BatchOptions = {}
  ): Promise<void> {
    const rows = records.map(record => this.toRow(record));
    const firstRow = rows[0];
    if (firstRow === undefined) {
      return; // nothing to insert
    }

    const columns = Object.keys(firstRow);
    if (columns.length === 0) {
      throw new Error('batchInsert records must have at least one column');
    }
    this.assertIdentifier(table);
    columns.forEach(column => this.assertIdentifier(column));

    const requested = options.batchSize;
    const batchSize =
      requested !== undefined && Number.isInteger(requested) && requested > 0
        ? requested
        : 1000;
    // Keep every statement under the bind-parameter ceiling.
    const maxRows = Math.max(
      1,
      Math.floor(DatabaseManager.MAX_BIND_PARAMS / columns.length)
    );

    for (const batch of this.chunk(rows, Math.min(batchSize, maxRows))) {
      const { sql, params } = this.buildBatchInsert(table, columns, batch);
      await this.query(sql, params);
    }
  }

  /**
   * Picks a replica pool uniformly at random; falls back to "master" when
   * no replica is registered.
   */
  private selectReplica(): string {
    const replicas = Array.from(this.pools.keys()).filter(
      name => name !== 'master'
    );
    if (replicas.length === 0) {
      return 'master';
    }
    const index = Math.floor(Math.random() * replicas.length);
    return replicas[index] ?? 'master';
  }

  /** Builds one multi-row INSERT with positional placeholders. */
  private buildBatchInsert(
    table: string,
    columns: string[],
    batch: Array<Record<string, unknown>>
  ): { sql: string; params: unknown[] } {
    const params: unknown[] = [];
    const tuples = batch.map(row => {
      const placeholders = columns.map(column => {
        params.push(row[column] ?? null);
        return `$${params.length}`;
      });
      return `(${placeholders.join(', ')})`;
    });

    return {
      sql: `INSERT INTO ${table} (${columns.join(', ')}) VALUES ${tuples.join(', ')}`,
      params
    };
  }

  /** Narrows an arbitrary record to a keyed object, rejecting non-objects. */
  private toRow(record: unknown): Record<string, unknown> {
    if (typeof record !== 'object' || record === null) {
      throw new TypeError('batchInsert records must be objects');
    }
    return record as Record<string, unknown>;
  }

  /** Rejects names that would change the statement when interpolated. */
  private assertIdentifier(name: string): void {
    if (!DatabaseManager.IDENTIFIER_PATTERN.test(name)) {
      throw new Error(`Invalid SQL identifier: ${name}`);
    }
  }

  /** Splits `array` into consecutive slices of at most `size` elements. */
  private chunk<T>(array: T[], size: number): T[][] {
    if (!(size > 0)) {
      throw new RangeError('Chunk size must be a positive number');
    }
    const chunks: T[][] = [];
    for (let i = 0; i < array.length; i += size) {
      chunks.push(array.slice(i, i + size));
    }
    return chunks;
  }

  /** Resolves after `ms` milliseconds. */
  private wait(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }

  /** Logs the statement, its bound values and the elapsed time. */
  private logQuery(sql: string, params: unknown[], duration: number): void {
    console.log('Query executed:', {
      sql,
      params,
      duration: `${duration}ms`
    });
  }

  /** Returns a snapshot copy of the counters. */
  getMetrics(): DatabaseMetrics {
    return { ...this.metrics };
  }

  /**
   * Ends every pool and forgets it, so later calls fail with "pool not found"
   * instead of using an ended pool. All pools are attempted even if one
   * fails; the first failure is rethrown afterwards.
   */
  async close(): Promise<void> {
    const failures: unknown[] = [];
    for (const [name, pool] of this.pools.entries()) {
      try {
        await pool.end();
        console.log(`Database pool ${name} closed`);
      } catch (error) {
        failures.push(error);
        console.error(`Failed to close database pool ${name}:`, error);
      }
    }
    this.pools.clear();
    if (failures.length > 0) {
      throw failures[0];
    }
  }
}
// Query builder
/**
 * Fluent builder for `SELECT * FROM <table>` statements with positional
 * (`$1`, `$2`, ...) placeholders.
 *
 * Only condition values are bound as parameters. Table names, column names,
 * operators, join conditions and ORDER BY columns are interpolated verbatim
 * and must never come from untrusted input.
 */
class QueryBuilder {
  private readonly table: string;
  private readonly conditions: string[] = [];
  private readonly parameters: unknown[] = [];
  private readonly orderClauses: string[] = [];
  private readonly joinClauses: string[] = [];
  private limitValue?: number;
  private offsetValue?: number;

  constructor(table: string) {
    this.table = table;
  }

  /**
   * Adds a condition. When conditions already exist it is joined with AND,
   * so repeated `where()` calls produce valid SQL.
   */
  where(column: string, operator: string, value: unknown): this {
    return this.addCondition('AND', column, operator, value);
  }

  /** Adds a condition joined to the previous ones with AND. */
  andWhere(column: string, operator: string, value: unknown): this {
    return this.addCondition('AND', column, operator, value);
  }

  /** Adds a condition joined to the previous ones with OR. */
  orWhere(column: string, operator: string, value: unknown): this {
    return this.addCondition('OR', column, operator, value);
  }

  /** Adds an inner JOIN clause. */
  join(table: string, condition: string): this {
    this.joinClauses.push(`JOIN ${table} ON ${condition}`);
    return this;
  }

  /** Adds a LEFT JOIN clause. */
  leftJoin(table: string, condition: string): this {
    this.joinClauses.push(`LEFT JOIN ${table} ON ${condition}`);
    return this;
  }

  /** Adds an ORDER BY term; terms are emitted in call order. */
  orderBy(column: string, direction: 'ASC' | 'DESC' = 'ASC'): this {
    this.orderClauses.push(`${column} ${direction}`);
    return this;
  }

  /**
   * Sets the LIMIT.
   * @throws RangeError when `value` is not a finite, non-negative number
   */
  limit(value: number): this {
    this.limitValue = QueryBuilder.checkedCount('limit', value);
    return this;
  }

  /**
   * Sets the OFFSET.
   * @throws RangeError when `value` is not a finite, non-negative number
   */
  offset(value: number): this {
    this.offsetValue = QueryBuilder.checkedCount('offset', value);
    return this;
  }

  /**
   * Assembles the statement.
   * @returns the SQL text and a copy of the bound values, in placeholder order
   */
  build(): { sql: string; params: any[] } {
    let sql = `SELECT * FROM ${this.table}`;

    if (this.joinClauses.length > 0) {
      sql += ' ' + this.joinClauses.join(' ');
    }
    if (this.conditions.length > 0) {
      sql += ' WHERE ' + this.conditions.join(' ');
    }
    if (this.orderClauses.length > 0) {
      sql += ' ORDER BY ' + this.orderClauses.join(', ');
    }
    if (this.limitValue !== undefined) {
      sql += ` LIMIT ${this.limitValue}`;
    }
    if (this.offsetValue !== undefined) {
      sql += ` OFFSET ${this.offsetValue}`;
    }

    // Copy so callers cannot mutate the builder's internal state.
    return { sql, params: [...this.parameters] };
  }

  /** Appends one condition, preceded by `connector` unless it is the first. */
  private addCondition(
    connector: 'AND' | 'OR',
    column: string,
    operator: string,
    value: unknown
  ): this {
    if (this.conditions.length > 0) {
      this.conditions.push(connector);
    }
    this.parameters.push(value);
    this.conditions.push(`${column} ${operator} $${this.parameters.length}`);
    return this;
  }

  /**
   * LIMIT/OFFSET are interpolated into the SQL text, so anything that is not
   * a finite, non-negative number is rejected at runtime.
   */
  private static checkedCount(label: string, value: number): number {
    if (!Number.isFinite(value) || value < 0) {
      throw new RangeError(`${label} must be a finite, non-negative number`);
    }
    return value;
  }
}
// Database error class
/**
 * Error that carries the failing statement, its bound values and the
 * underlying error alongside the message.
 */
class DatabaseError extends Error {
  /** SQL text of the statement that failed. */
  public sql: string;
  /** Values that were bound to the statement. */
  public params: any[];
  /** The error that caused this failure. */
  public originalError: Error;

  constructor(
    message: string,
    sql: string,
    params: any[],
    originalError: Error
  ) {
    super(message);
    this.sql = sql;
    this.params = params;
    this.originalError = originalError;
    this.name = 'DatabaseError';
  }
}
// Interface definitions
/** Settings accepted by `DatabaseManager.init`. */
interface DatabaseConfig {
  /** Host of the primary ("master") node. */
  masterHost: string;
  /** Port of the primary node. */
  masterPort: number;
  /** Database name, used for the primary and every replica. */
  database: string;
  /** Login user, used for the primary and every replica. */
  user: string;
  /** Login password, used for the primary and every replica. */
  password: string;
  /** Passed to each pool as `max`. */
  maxConnections: number;
  /** Passed to each pool as `idleTimeoutMillis`. */
  idleTimeout: number;
  /** Passed to each pool as `connectionTimeoutMillis`. */
  connectionTimeout: number;
  /** Bound on the attempt loop in `DatabaseManager.query`. */
  retryAttempts: number;
  /** Replica endpoints keyed by pool name. */
  slaves?: { [name: string]: SlaveConfig };
}

/** Network endpoint of one replica. */
interface SlaveConfig {
  host: string;
  port: number;
}

/** Endpoint plus credentials used to open one connection pool. */
interface PoolConfig extends SlaveConfig {
  database: string;
  user: string;
  password: string;
}

/** Per-call options for `DatabaseManager.query`. */
interface QueryOptions {
  /** When true, the statement is routed to a replica pool. */
  useReplica?: boolean;
}

/** Options for `DatabaseManager.batchInsert`. */
interface BatchOptions {
  /** Rows per INSERT statement. */
  batchSize?: number;
}

/** Counters maintained by `DatabaseManager`. */
interface DatabaseMetrics {
  queries: number;
  errors: number;
  connections: number;
  transactions: number;
}
// Usage example
/**
 * End-to-end walkthrough: initialise the manager, run a direct query, a
 * builder-generated query, a two-statement transaction and a batch insert,
 * then print the metrics.
 *
 * The pools are closed in `finally`, so a failure in any step does not leave
 * connections open.
 */
async function main(): Promise<void> {
  const dbManager = DatabaseManager.getInstance();

  try {
    // Example values only; real deployments should load credentials from
    // the environment or a secret store rather than source code.
    await dbManager.init({
      masterHost: 'localhost',
      masterPort: 5432,
      database: 'myapp',
      user: 'postgres',
      password: 'secret',
      maxConnections: 10,
      idleTimeout: 30000,
      connectionTimeout: 5000,
      retryAttempts: 3,
      slaves: {
        slave1: { host: 'slave1.example.com', port: 5432 },
        slave2: { host: 'slave2.example.com', port: 5432 }
      }
    });

    // Direct parameterized query.
    const users = await dbManager.query<User[]>(
      'SELECT * FROM users WHERE age > $1',
      [18]
    );
    console.log('Adult users:', users.length);

    // Same kind of query assembled with the builder.
    const queryBuilder = new QueryBuilder('users')
      .where('age', '>', 18)
      .andWhere('status', '=', 'active')
      .orderBy('created_at', 'DESC')
      .limit(10);
    const { sql, params } = queryBuilder.build();
    const result = await dbManager.query<User[]>(sql, params);
    console.log('Recent active users:', result.length);

    // Both updates commit together or not at all.
    await dbManager.transaction(async client => {
      await client.query(
        'UPDATE users SET balance = balance - $1 WHERE id = $2',
        [100, 1]
      );
      await client.query(
        'UPDATE users SET balance = balance + $1 WHERE id = $2',
        [100, 2]
      );
    });

    // Batch insert.
    const newUsers = [
      { name: 'User 1', age: 25 },
      { name: 'User 2', age: 30 }
    ];
    await dbManager.batchInsert('users', newUsers, { batchSize: 1000 });

    const metrics = dbManager.getMetrics();
    console.log('Database metrics:', metrics);
  } finally {
    await dbManager.close();
  }
}

/** Row shape of the `users` table as used by the example queries. */
interface User {
  id: number;
  name: string;
  age: number;
  status: string;
  created_at: Date;
}

main().catch(console.error);
最佳实践与建议
- 连接管理
  - 使用连接池
  - 配置最大连接数
  - 设置连接超时
  - 实现故障转移
- 查询优化
  - 使用参数化查询
  - 优化查询语句
  - 合理使用索引
  - 避免N+1查询
- 事务处理
  - 合理设置隔离级别
  - 控制事务范围
  - 处理并发访问
  - 实现事务补偿
- 性能优化
  - 实现读写分离
  - 使用批量操作
  - 配置查询缓存
  - 监控性能指标
总结
Node.js数据库操作需要考虑以下方面:
- 连接管理和故障处理
- 查询优化和性能调优
- 事务管理和数据一致性
- 安全防护和访问控制
- 监控和维护支持
通过合理的数据库操作实现，可以提高应用的性能和可靠性。
学习资源
- 数据库设计模式
- SQL优化技巧
- 事务处理机制
- 性能优化指南
- 安全最佳实践
如果你觉得这篇文章有帮助,欢迎点赞收藏,也期待在评论区看到你的想法和建议!👇
终身学习,共同成长。
咱们下一期见
💻