溧阳市文章资讯

Node.js使用mysql2 库批量更新(Bulk Update)多条数据的方案

2026-03-30 22:23:02 浏览次数:2
详细信息

使用 mysql2 库进行批量更新(Bulk Update)有几种常见的方案,以下是详细的实现方法:

方案一:使用 CASE WHEN 语句(推荐)

这是最高效的批量更新方式,单次 SQL 查询更新多条记录。

const mysql = require('mysql2/promise');

/**
 * Updates `column1` and `column2` of several `your_table` rows with a single
 * UPDATE statement built from `CASE id WHEN ? THEN ? ... END` expressions.
 *
 * Every id and value is passed as a bound parameter; nothing from `updates`
 * is interpolated into the SQL text.
 *
 * @param {Array<{id: *, value: *, otherValue: *}>} updates - Rows to update;
 *   `value` is written to `column1` and `otherValue` to `column2`.
 * @returns {Promise<object>} The first element of the array resolved by
 *   `connection.execute`, or `{ affectedRows: 0 }` when `updates` is empty.
 */
async function bulkUpdateWithCaseWhen(updates) {
  // An empty list would render `WHERE id IN ()`, which is not valid SQL.
  if (!updates || updates.length === 0) return { affectedRows: 0 };

  const connection = await mysql.createConnection({
    host: 'localhost',
    user: 'root',
    password: 'password',
    database: 'test_db'
  });

  try {
    // Simple CASE form: `CASE id WHEN ? THEN ?` compares `id` with each bound
    // value. Writing `WHEN id = ?` here would instead compare `id` with the
    // boolean result of `id = ?`.
    const whenClauses = updates.map(() => 'WHEN ? THEN ?').join(' ');
    const idPlaceholders = updates.map(() => '?').join(', ');

    const sql = `
      UPDATE your_table
      SET
        column1 = CASE id ${whenClauses} END,
        column2 = CASE id ${whenClauses} END
      WHERE id IN (${idPlaceholders})
    `;

    // Parameter order mirrors placeholder order:
    // column1 (id, value) pairs, column2 (id, otherValue) pairs, then the ids.
    const column1Params = [];
    const column2Params = [];
    const idParams = [];
    for (const { id, value, otherValue } of updates) {
      column1Params.push(id, value);
      column2Params.push(id, otherValue);
      idParams.push(id);
    }

    const [result] = await connection.execute(
      sql,
      [...column1Params, ...column2Params, ...idParams]
    );
    return result;
  } finally {
    await connection.end();
  }
}

// Usage example
const updates = [
  { id: 1, value: 100, otherValue: 'A' },
  { id: 2, value: 200, otherValue: 'B' },
  { id: 3, value: 300, otherValue: 'C' }
];

// Report the affected row count on success, or the error on failure.
bulkUpdateWithCaseWhen(updates)
  .then(function (result) {
    console.log('更新成功:', result.affectedRows);
  })
  .catch(function (err) {
    console.error('更新失败:', err);
  });

方案二:使用 INSERT ... ON DUPLICATE KEY UPDATE

适用于有唯一索引或主键的表:

/**
 * Inserts the given rows into `your_table`, overwriting `column1`, `column2`
 * and `column3` of rows whose key already exists
 * (`INSERT ... ON DUPLICATE KEY UPDATE`).
 *
 * @param {Array<{id: *, column1: *, column2: *, column3: *}>} updates - Rows
 *   to insert or update.
 * @returns {Promise<object>} The first element of the array resolved by
 *   `connection.query`, or `{ affectedRows: 0 }` when `updates` is empty.
 */
async function bulkUpdateOnDuplicate(updates) {
  // Nothing to write: skip the connection entirely instead of sending an
  // INSERT whose VALUES list is empty.
  if (!updates || updates.length === 0) return { affectedRows: 0 };

  const connection = await mysql.createConnection({
    host: 'localhost',
    user: 'root',
    password: 'password',
    database: 'test_db'
  });

  try {
    // Bulk insert-or-update; the single `?` receives the nested row array.
    const sql = `
      INSERT INTO your_table (id, column1, column2, column3)
      VALUES ?
      ON DUPLICATE KEY UPDATE
        column1 = VALUES(column1),
        column2 = VALUES(column2),
        column3 = VALUES(column3)
    `;

    // One inner array per row, in the same order as the column list above.
    const values = updates.map(update => [
      update.id,
      update.column1,
      update.column2,
      update.column3
    ]);

    const [result] = await connection.query(sql, [values]);
    return result;
  } finally {
    await connection.end();
  }
}

// Usage example
const data = [
  { id: 1, column1: 'value1', column2: 100, column3: 'A' },
  { id: 2, column1: 'value2', column2: 200, column3: 'B' },
  { id: 3, column1: 'value3', column2: 300, column3: 'C' }
];

// Handle both outcomes so a failed update does not surface as an
// unhandled promise rejection.
bulkUpdateOnDuplicate(data)
  .then(result => console.log('更新成功:', result.affectedRows))
  .catch(err => console.error('更新失败:', err));

方案三:使用事务 + 循环更新(小批量数据)

/**
 * Updates `column1` and `column2` of each given row with one UPDATE per row,
 * all inside a single transaction: either every statement is committed or
 * the transaction is rolled back and the failure is re-thrown.
 *
 * @param {Array<{id: *, column1: *, column2: *}>} updates - Rows to update.
 * @returns {Promise<void>} Resolves after the commit.
 * @throws Re-throws the error that caused the transaction to fail.
 */
async function bulkUpdateWithTransaction(updates) {
  const connection = await mysql.createConnection({
    host: 'localhost',
    user: 'root',
    password: 'password',
    database: 'test_db'
  });

  try {
    await connection.beginTransaction();

    // Statements are awaited one at a time on the same connection.
    for (const update of updates) {
      await connection.execute(
        'UPDATE your_table SET column1 = ?, column2 = ? WHERE id = ?',
        [update.column1, update.column2, update.id]
      );
    }

    await connection.commit();
    console.log('批量更新完成');
  } catch (error) {
    try {
      await connection.rollback();
    } catch (rollbackError) {
      // A failing rollback must not replace the error that caused it;
      // report it and keep the original failure as the rejection reason.
      console.error('回滚失败:', rollbackError);
    }
    throw error;
  } finally {
    await connection.end();
  }
}

方案四:使用批量预处理语句

/**
 * Issues one parameterized UPDATE per row on a single connection and waits
 * for all of them with `Promise.all`.
 *
 * @param {Array<{id: *, name: *, age: *}>} updates - Rows to update.
 * @returns {Promise<Array>} The values resolved by each `connection.execute`
 *   call, in the order of `updates`.
 */
async function bulkUpdateWithPreparedStatements(updates) {
  const connection = await mysql.createConnection({
    host: 'localhost',
    user: 'root',
    password: 'password',
    database: 'test_db'
  });
  const statement = 'UPDATE your_table SET name = ?, age = ? WHERE id = ?';

  try {
    // Start every statement before awaiting any of them.
    const pending = [];
    for (const row of updates) {
      pending.push(connection.execute(statement, [row.name, row.age, row.id]));
    }

    return await Promise.all(pending);
  } finally {
    await connection.end();
  }
}

方案五:使用 VALUES 表值构造器(JOIN 派生表)批量更新(需 MySQL 8.0.19+)

/**
 * Updates `name` and `age` of several `your_table` rows by joining the table
 * against an inline `VALUES ROW(...), ROW(...)` derived table.
 *
 * All ids and values are passed as bound parameters. The `VALUES ROW(...)`
 * table value constructor and the `AS tmp(id, name, age)` column list are
 * MySQL 8.0.19+ syntax.
 *
 * @param {Array<{id: *, name: *, age: *}>} updates - Rows to update.
 * @returns {Promise<object>} The first element of the array resolved by
 *   `connection.execute`, or `{ affectedRows: 0 }` when `updates` is empty.
 */
async function bulkUpdateWithValues(updates) {
  // An empty list would render `VALUES ` with no rows, which is not valid SQL.
  if (!updates || updates.length === 0) return { affectedRows: 0 };

  const connection = await mysql.createConnection({
    host: 'localhost',
    user: 'root',
    password: 'password',
    database: 'test_db'
  });

  try {
    // MySQL's table value constructor requires each row to be wrapped in
    // ROW(...); a bare parenthesized tuple is rejected.
    const rowPlaceholders = updates.map(() => 'ROW(?, ?, ?)').join(', ');

    const params = [];
    for (const { id, name, age } of updates) {
      params.push(id, name, age);
    }

    const sql = `
      UPDATE your_table t
      JOIN (VALUES ${rowPlaceholders}) AS tmp(id, name, age)
        ON t.id = tmp.id
      SET t.name = tmp.name, t.age = tmp.age
    `;

    const [result] = await connection.execute(sql, params);
    return result;
  } finally {
    await connection.end();
  }
}

封装为通用工具类

/**
 * Pool-backed helper that performs bulk updates against an arbitrary table.
 *
 * Values are always sent as bound parameters; table and column names are
 * quoted with backticks before being placed into the SQL text.
 */
class BulkUpdater {
  /**
   * @param {object} config - Options passed unchanged to `mysql.createPool`.
   */
  constructor(config) {
    this.pool = mysql.createPool(config);
  }

  /**
   * Updates many rows with one UPDATE statement that uses a searched
   * `CASE WHEN key = ? THEN ? ... END` expression per column.
   *
   * The set of columns is taken from the keys of `updates[0]` (excluding
   * `whereField`); a column missing from a later row is written as NULL.
   *
   * @param {string} table - Target table name.
   * @param {Array<object>} updates - Rows; each must carry `whereField`.
   * @param {string} [whereField='id'] - Column used to match rows.
   * @returns {Promise<object>} The first element of the array resolved by
   *   `pool.execute`, or `{ affectedRows: 0 }` when there is nothing to do.
   */
  async bulkUpdateCaseWhen(table, updates, whereField = 'id') {
    if (!updates || updates.length === 0) return { affectedRows: 0 };

    const fields = Object.keys(updates[0]).filter(key => key !== whereField);
    // Rows that only carry the key column would render an empty SET clause.
    if (fields.length === 0) return { affectedRows: 0 };

    // `undefined` is sent as SQL NULL, matching what escapeValue() produced.
    const toParam = value => (value === undefined ? null : value);

    const keyColumn = this.escapeIdentifier(whereField);
    const whenClauses = updates
      .map(() => `WHEN ${keyColumn} = ? THEN ?`)
      .join(' ');
    const keyPlaceholders = updates.map(() => '?').join(', ');

    // Parameters are collected in placeholder order: for each column the
    // (key, value) pairs of every row, then the keys for the IN (...) list.
    const params = [];
    const setClauses = [];
    for (const field of fields) {
      for (const update of updates) {
        params.push(toParam(update[whereField]), toParam(update[field]));
      }
      setClauses.push(`${this.escapeIdentifier(field)} = CASE ${whenClauses} END`);
    }
    for (const update of updates) {
      params.push(toParam(update[whereField]));
    }

    const sql = `
      UPDATE ${this.escapeIdentifier(table)}
      SET ${setClauses.join(', ')}
      WHERE ${keyColumn} IN (${keyPlaceholders})
    `;

    const [result] = await this.pool.execute(sql, params);
    return result;
  }

  /**
   * Quotes a table or column name with backticks, doubling any backtick it
   * contains. Dot-separated names (`db.table`) are quoted part by part.
   *
   * @param {string} identifier - Unquoted identifier.
   * @returns {string} The backtick-quoted identifier.
   */
  escapeIdentifier(identifier) {
    return String(identifier)
      .split('.')
      .map(part => '`' + part.replace(/`/g, '``') + '`')
      .join('.');
  }

  /**
   * Renders a value as a SQL literal: strings are single-quoted with embedded
   * single quotes doubled, `null`/`undefined` become `NULL`, and anything
   * else is returned unchanged.
   *
   * Kept for backward compatibility; the update methods of this class bind
   * parameters instead. Only single quotes are handled here (backslashes are
   * left as-is), so do not use it to embed untrusted input in SQL text.
   *
   * @param {*} value - Value to render.
   * @returns {*} A SQL literal string, or the value itself for non-strings.
   */
  escapeValue(value) {
    if (typeof value === 'string') {
      return `'${value.replace(/'/g, "''")}'`;
    }
    if (value === null || value === undefined) {
      return 'NULL';
    }
    return value;
  }

  /**
   * Inserts rows and, for rows whose key already exists, overwrites every
   * column not listed in `uniqueFields`
   * (`INSERT ... ON DUPLICATE KEY UPDATE`).
   *
   * The column list is taken from the keys of `updates[0]`.
   *
   * @param {string} table - Target table name.
   * @param {Array<object>} updates - Rows to insert or update.
   * @param {string[]} [uniqueFields=['id']] - Columns excluded from the
   *   `ON DUPLICATE KEY UPDATE` list.
   * @returns {Promise<object>} The first element of the array resolved by
   *   `pool.query`, or `{ affectedRows: 0 }` when `updates` is empty.
   */
  async bulkUpdateOnDuplicate(table, updates, uniqueFields = ['id']) {
    if (!updates || updates.length === 0) return { affectedRows: 0 };

    const fields = Object.keys(updates[0]);
    // One inner array per row, in the same order as the column list.
    const values = updates.map(update =>
      fields.map(field => update[field])
    );

    const fieldList = fields
      .map(field => this.escapeIdentifier(field))
      .join(', ');
    const updateList = fields
      .filter(field => !uniqueFields.includes(field))
      .map(field => {
        const column = this.escapeIdentifier(field);
        return `${column} = VALUES(${column})`;
      })
      .join(', ');

    const sql = `
      INSERT INTO ${this.escapeIdentifier(table)} (${fieldList})
      VALUES ?
      ON DUPLICATE KEY UPDATE ${updateList}
    `;

    // The single `?` receives the nested row array.
    const [result] = await this.pool.query(sql, [values]);
    return result;
  }

  /**
   * Ends the underlying pool.
   *
   * @returns {Promise<void>}
   */
  async close() {
    await this.pool.end();
  }
}

// Usage example
const bulkUpdater = new BulkUpdater({
  host: 'localhost',
  user: 'root',
  password: 'password',
  database: 'test_db',
  waitForConnections: true,
  connectionLimit: 10,
  queueLimit: 0
});

// Top-level `await` is not available in a CommonJS module (this file loads
// mysql2 with `require`), so the calls run inside an async function.
async function main() {
  try {
    // Using CASE WHEN
    await bulkUpdater.bulkUpdateCaseWhen('users', [
      { id: 1, name: 'Alice', age: 25 },
      { id: 2, name: 'Bob', age: 30 }
    ]);

    // Using ON DUPLICATE KEY UPDATE
    await bulkUpdater.bulkUpdateOnDuplicate('users', [
      { id: 1, name: 'Alice', age: 26 },
      { id: 3, name: 'Charlie', age: 28 }
    ]);
  } finally {
    // Always release the pool, even when one of the updates fails.
    await bulkUpdater.close();
  }
}

main().catch(err => console.error('更新失败:', err));

性能优化建议

- 数据量较大时:分批处理,每批 1000-5000 条
- 使用连接池:避免频繁创建连接
- 添加索引:确保 WHERE 条件的字段有索引
- 监控性能:使用 EXPLAIN 分析查询
- 错误处理:添加重试机制和日志记录

注意事项

- SQL 注入防护:始终使用参数化查询或转义输入
- 数据类型:确保传递的值类型与数据库列类型匹配
- 事务一致性:需要原子性操作时使用事务
- 连接管理:及时释放连接资源
- 数据验证:更新前验证数据的有效性

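选择哪种方案取决于具体需求:需要用单条 SQL 高效更新已存在的记录时,使用 CASE WHEN;需要"存在则更新、不存在则插入"时,使用 INSERT ... ON DUPLICATE KEY UPDATE;数据量较小且需要原子性时,使用事务 + 循环更新。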

相关推荐