前言
在分库、分表之后,主键唯一ID的生成将不能基于数据库本身的自增方式生成,因为对于全局而言可能会重复,下面我们将构建一个外部服务统一提供 ID 的生成。
思路
我们采用一种基于数据库的分布式ID生成策略,每个节点维护一个本地的ID池,当节点的本地ID池耗尽时,再通过数据库去获取新的ID段,再放入 ID 池。
业务步长配置表
CREATE TABLE `sequence_step` (`id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键ID',`barrier` bigint(20) NOT NULL DEFAULT '1' COMMENT '当前序列值最大值',`step` int(11) NOT NULL DEFAULT '100' COMMENT '每次递增量',`sn` varchar(128) NOT NULL DEFAULT '' COMMENT '业务字段',`version` bigint(20) NOT NULL DEFAULT '1' COMMENT '乐观锁版本',`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间',PRIMARY KEY (`id`),UNIQUE KEY `uk_sn` (`sn`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COMMENT='全局ID生成表';
- barrier:目前应用程序已经获取的最大ID值(即 ID 池的最大值);
- step:每次获取时,递增的步长;
- sn:业务字段名称,全局唯一。
可以通过表来配置每个业务字段的 ID 生成策略,比如 user_id,初始值是 1,步长是 100。
第一次的 ID 区间为【1,101),初始值更新为 101,则 ID 池子的数字就是【1,101)
系统优先去 ID 池子去取数据,当 ID 池子消耗完了后,再请求数据库,拿到新的初始值,并生成数据。
第二次的 ID 区间为【101,201),初始值更新为 201
…
可以看到,如果区间设置的太小,访问数据库就很频繁;区间设置的太大,万一系统崩溃了,断续的间隔就比较大了。
代码实现
整体流程如下
- 根据 sn 获取锁对象;
- 根据 sn 去本地内存获取 ID 段,如果不为空,则返回;否则,进入下一步;
- 根据 sn 去数据库获取 ID 段,并放入内存 ID 池。
ID 段
@NotThreadSafe
public class IDSequence implements Serializable {private String sn;//此sequence的唯一标识private int step;private AtomicLong current;//当前值private long barrier = -1;//栅栏,当前sequence所在step中的最大值。public IDSequence() {}public IDSequence(String sn) {this.sn = sn;}public IDSequence(String sn, long start, long barrier, int step) {this.sn = sn;this.current = new AtomicLong(start);this.barrier = barrier;this.step = step;}public String getSn() {return sn;}public void setSn(String sn) {this.sn = sn;}public long getCurrent() {if (current == null) {return -1;}return current.get();}public boolean valid() {return current != null;}/*** 此方法将会在并发环境中执行,需要注意控制数据安全性** @return 0:表示超限,则需要重试* 负1:表示超限,需要重新fetch新的step,当前Sequence已不可用* 大于1:正常,此值可以使用。*/public long increment() {if (current == null) {return -1;}long i = current.getAndIncrement();//边界if (i >= barrier) {current = null;return -1;//强制上层重置}return i;}@Overridepublic String toString() {StringBuilder sb = new StringBuilder();sb.append("sn:").append(sn).append(",step:").append(step).append(",current:").append(current).append(",barrier:").append(barrier);return sb.toString();}
}
ID 生成器
public class StepBasedIDGenerator{protected static final int DEFAULT_RETRY = 6;protected Map<String, IDSequence> cachedPool = new HashMap<>();private static Map<String,Object> locks = new ConcurrentHashMap<>(); //业务字段对应的锁private DataSource dataSource;public void setDataSource(DataSource dataSource) {this.dataSource = dataSource;}/*** 获取锁对象* @param sn* @return*/private Object getLock(String sn) {return locks.computeIfAbsent(sn, k -> new Object());}private IDSequence getSequence(String sn) {IDSequence sequence = cachedPool.get(sn);//正常,返回if (sequence != null && sequence.valid()) {return sequence;}int i = 0;while (i < DEFAULT_RETRY) {sequence = fetch(sn);if (sequence != null) {cachedPool.put(sn, sequence);break;}i++;}return sequence;}/*** ID 生成器的入口*/public long next(String sn) {Object lock = getLock(sn);synchronized (lock) {IDSequence sequence = getSequence(sn);if (sequence == null || !sequence.valid()) {throw new IllegalStateException("sn:" + sn + ",cant fetch sequence!");}try {long v = sequence.increment();if (v > 0) {return v;}return next(sn);} catch (Exception e) {throw new RuntimeException("fetch nextStep error:", e);}}}private Connection getConnection() {// 数据库URL,用户名和密码String url = "";String username = "";String password = "";Connection conn = null;try {// 加载数据库驱动Class.forName("com.mysql.cj.jdbc.Driver");// 创建数据库连接conn = DriverManager.getConnection(url, username, password);} catch (ClassNotFoundException | SQLException e) {e.printStackTrace();}return conn;}protected IDSequence fetch(String sn) {Connection connection = null;PreparedStatement ps = null;try {connection = getConnection();connection.setAutoCommit(true);//普通操作connection.setReadOnly(false);ps = connection.prepareStatement("select barrier,step,`version` from `sequence_step` where `sn` = ? limit 1");ps.setString(1, sn);ResultSet rs = ps.executeQuery();if (!rs.next()) {throw new IllegalStateException(sn + " is not existed,So fetch sequence cant be executed! Please check it!");}long _b = rs.getLong(1);//barrierint _s = rs.getInt(2);//steplong _v = rs.getLong(3);//versionps.close();ps = connection.prepareStatement("update `sequence_step` set barrier = ?,update_time = ?,`version` = `version` + 1 where `sn` = ? and `version` = ?");long barrier = _b + _s;ps.setLong(1, barrier);ps.setDate(2, new Date(System.currentTimeMillis()));ps.setString(3, sn);ps.setLong(4, _v);int row = ps.executeUpdate();ps.close();if (row > 0) {//更新成功return new IDSequence(sn, _b, barrier, _s);}return null;} catch (Exception e) {throw new RuntimeException(e);} finally {try {if (ps != null) {ps.close();}if (connection != null) {connection.close();}} catch (Exception ex) {//}}}
}