总结一下分布式ID的一般实现方案
ID生成系统的需求
- 1.属于基础功能,需要绝对的高可用、高QPS、低延迟
- 2.id必须全局唯一
- 3.趋势递增(总体上id是递增的)、绝对顺序递增(下一个id必须大于上一次生成的id)
一些不合适的方案:
- 数据库自增id:性能依赖数据库的性能,分库分表不适用
- uuid:太占用空间,无顺序
雪花算法
雪花算法的原理
- 1位标识符:始终是0,由于long基本类型在Java中是带符号的,最高位是符号位,正数是0,负数是1,所以id一般是正数,最高位是0。
- 41位时间戳:41位时间截不是存储当前时间的时间截,而是存储时间截的差值(当前时间截 - 开始时间截)得到的值,这里的的开始时间截,一般是我们的id生成器开始使用的时间,由我们程序来指定的。
- 10位机器标识码:可以部署在1024个节点,如果机器分机房(IDC)部署,这10位可以由 5位机房ID + 5位机器ID 组成。
- 12位序列:毫秒内的计数,12位的计数顺序号支持每个节点每毫秒(同一机器,同一时间截)产生2^12=4096个ID序号。
java实现:
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import java.net.Inet4Address;
import java.net.UnknownHostException;
/**
* Twitter_Snowflake<br>
* SnowFlake的结构如下(每部分用-分开):<br>
* 0 - 0000000000 0000000000 0000000000 0000000000 0 - 00000 - 00000 - 000000000000 <br>
* 1位标识,由于long基本类型在Java中是带符号的,最高位是符号位,正数是0,负数是1,所以id一般是正数,最高位是0<br>
* 41位时间截(毫秒级),注意,41位时间截不是存储当前时间的时间截,而是存储时间截的差值(当前时间截 - 开始时间截得到的值)
* 这里的的开始时间截,一般是我们的id生成器开始使用的时间,由我们程序来指定的(如下下面程序IdWorker类的startTime属性)。
* 41位的时间截,可以使用69年,年T = (1L << 41) / (1000L * 60 * 60 * 24 * 365) = 69<br>
* 10位的数据机器位,可以部署在1024个节点,包括5位datacenterId和5位workerId<br>
* 12位序列,毫秒内的计数,12位的计数顺序号支持每个节点每毫秒(同一机器,同一时间截)产生4096个ID序号<br>
* 加起来刚好64位,为一个Long型。<br>
* SnowFlake的优点是,整体上按照时间自增排序,并且整个分布式系统内不会产生ID碰撞(由数据中心ID和机器ID作区分),并且效率较高,
* 经测试,SnowFlake每秒能够产生26万ID左右。
*/
public class SnowflakeIdWorker {
// ==============================Fields===========================================
/** 开始时间截 (2015-01-01) */
private final long twepoch = 1489111610226L;
/** 机器id所占的位数 */
private final long workerIdBits = 5L;
/** 数据标识id所占的位数 */
private final long dataCenterIdBits = 5L;
/** 支持的最大机器id,结果是31 (这个移位算法可以很快的计算出几位二进制数所能表示的最大十进制数) */
private final long maxWorkerId = -1L ^ (-1L << workerIdBits);
/** 支持的最大数据标识id,结果是31 */
private final long maxDataCenterId = -1L ^ (-1L << dataCenterIdBits);
/** 序列在id中占的位数 */
private final long sequenceBits = 12L;
/** 机器ID向左移12位 */
private final long workerIdShift = sequenceBits;
/** 数据标识id向左移17位(12+5) */
private final long dataCenterIdShift = sequenceBits + workerIdBits;
/** 时间截向左移22位(5+5+12) */
private final long timestampLeftShift = sequenceBits + workerIdBits + dataCenterIdBits;
/** 生成序列的掩码,这里为4095 (0b111111111111=0xfff=4095) */
private final long sequenceMask = -1L ^ (-1L << sequenceBits);
/** 工作机器ID(0~31) */
private long workerId;
/** 数据中心ID(0~31) */
private long dataCenterId;
/** 毫秒内序列(0~4095) */
private long sequence = 0L;
/** 上次生成ID的时间截 */
private long lastTimestamp = -1L;
private static SnowflakeIdWorker idWorker;
static {
long workerId = getWorkId();
long dataCenterId = getDataCenterId();
idWorker = new SnowflakeIdWorker(workerId,dataCenterId);
}
//==============================Constructors=====================================
/**
* 构造函数
* @param workerId 工作ID (0~31)
* @param dataCenterId 数据中心ID (0~31)
*/
public SnowflakeIdWorker(long workerId, long dataCenterId) {
if (workerId > maxWorkerId || workerId < 0) {
throw new IllegalArgumentException(String.format("workerId can't be greater than %d or less than 0", maxWorkerId));
}
if (dataCenterId > maxDataCenterId || dataCenterId < 0) {
throw new IllegalArgumentException(String.format("dataCenterId can't be greater than %d or less than 0", maxDataCenterId));
}
this.workerId = workerId;
this.dataCenterId = dataCenterId;
}
// ==============================Methods==========================================
/**
* 获得下一个ID (该方法是线程安全的)
* @return SnowflakeId
*/
public synchronized long nextId() {
long timestamp = timeGen();
//如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常
if (timestamp < lastTimestamp) {
throw new RuntimeException(
String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
}
//如果是同一时间生成的,则进行毫秒内序列
if (lastTimestamp == timestamp) {
sequence = (sequence + 1) & sequenceMask;
//毫秒内序列溢出
if (sequence == 0) {
//阻塞到下一个毫秒,获得新的时间戳
timestamp = tilNextMillis(lastTimestamp);
}
}
//时间戳改变,毫秒内序列重置
else {
sequence = 0L;
}
//上次生成ID的时间截
lastTimestamp = timestamp;
//移位并通过或运算拼到一起组成64位的ID
return ((timestamp - twepoch) << timestampLeftShift) //
| (dataCenterId << dataCenterIdShift) //
| (workerId << workerIdShift) //
| sequence;
}
/**
* 阻塞到下一个毫秒,直到获得新的时间戳
* @param lastTimestamp 上次生成ID的时间截
* @return 当前时间戳
*/
protected long tilNextMillis(long lastTimestamp) {
long timestamp = timeGen();
while (timestamp <= lastTimestamp) {
timestamp = timeGen();
}
return timestamp;
}
/**
* 返回以毫秒为单位的当前时间
* @return 当前时间(毫秒)
*/
protected long timeGen() {
return System.currentTimeMillis();
}
private static Long getWorkId(){
try {
String hostAddress = Inet4Address.getLocalHost().getHostAddress();
int[] ints = StringUtils.toCodePoints(hostAddress);
int sums = 0;
for(int b : ints){
sums += b;
}
return (long)(sums % 32);
} catch (UnknownHostException e) {
// 如果获取失败,则使用随机数备用
return RandomUtils.nextLong(0,31);
}
}
private static Long getDataCenterId(){
int[] ints = StringUtils.toCodePoints(SystemUtils.getHostName());
int sums = 0;
for (int i: ints) {
sums += i;
}
return (long)(sums % 32);
}
public static Long generateId(){
long id = idWorker.nextId();
return id;
}
//==============================Test=============================================
/** 测试 */
public static void main(String[] args) {
System.out.println(System.currentTimeMillis());
long startTime = System.nanoTime();
for (int i = 0; i < 50000; i++) {
long id = SnowflakeIdWorker.generateId();
System.out.println(id);
}
System.out.println((System.nanoTime()-startTime)/1000000+"ms");
}
}
问题
上面的代码1ms最多可以生成4096个序列号,并且在程序不重启的情况下,内存中存储了上一次的服务器时间戳,如果服务器时间发生回拨,应用会报错。但是如果碰到服务器重启了并且时间回拨了,那程序启动之后无法得知服务器重启之前的时间戳,所以运行正常,但是生成的id会与重启之前的id重复。
为了解决这个问题,美团Leaf-snowflake引入了zookeeper,服务器周期性(3s以上)向zookeeper上报服务器时间,应用启动的时候校验zk上的时间戳与本地的时间戳,如果发现服务器时钟回拨则告警。
TDDL
tddl这个项目目前已经不维护了,但是里面的分布式id生成的策略简单易用,基本可以满足一般项目的分布式id生成需求。它的原理也比较简单,通过数据库表的乐观锁来获取一段id放在内存中,每次通过nextValue来从内存中取id,用完再次向数据库请求一段新的id。
优点:
- 不同机器内存中放的id段不一样,但是可以满足id整体趋势递增的需求。
- 内存缓存id,生成性能高
缺点:
- 强依赖DB
- ID号码不够随机,外部可以猜测出单量等信息
与tddl类似的方案是美团的Leaf-segment方案。
一张表
--表
CREATE TABLE `biz_sequence` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键id',
`biz_name` varchar(45) NOT NULL DEFAULT '' COMMENT '业务名称',
`current_value` bigint(20) NOT NULL DEFAULT '0' COMMENT '当前最大值',
`is_deleted` tinyint(1) NOT NULL DEFAULT '0' COMMENT '是否删除',
`gmt_create` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`creator` varchar(15) NOT NULL DEFAULT '' COMMENT '创建人',
`gmt_modified` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
`modifier` varchar(15) NOT NULL DEFAULT '' COMMENT '修改人',
PRIMARY KEY (`id`),
KEY `idx_biz_name` (`biz_name`)
) ENGINE=InnoDB COMMENT='数据序列表';
生成mybatis对象
import lombok.Data;
import javax.persistence.Column;
import javax.persistence.Id;
import javax.persistence.Table;
import java.io.Serializable;
import java.util.Date;
@Table(name = "biz_sequence")
@Data
public class BizSequence implements Serializable {
/**
* 主键
*/
@Id
private Long id;
/**
* 业务名称
*/
@Column(name = "biz_name")
private String bizName;
/**
* 当前最大值
*/
@Column(name = "current_value")
private Long currentValue;
/**
* 是否删除
*/
@Column(name = "is_deleted")
private Integer isDeleted;
/**
* 创建时间
*/
@Column(name = "gmt_create")
private Date gmtCreate;
/**
* 创建人
*/
private String creator;
/**
* 修改时间
*/
@Column(name = "gmt_modified")
private Date gmtModified;
/**
* 修改人
*/
private String modifier;
private static final long serialVersionUID = 1L;
}
// mybatis mapper
public interface BizSequenceMapper extends Mapper<BizSequence> {
}
<!-- mybatis xml -->
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.xxx.BizSequenceMapper">
<resultMap id="BaseResultMap" type="com.xxx.entity.BizSequence">
<id column="id" jdbcType="BIGINT" property="id"/>
<result column="biz_name" jdbcType="VARCHAR" property="bizName"/>
<result column="current_value" jdbcType="BIGINT" property="currentValue"/>
<result column="is_deleted" jdbcType="BIT" property="isDeleted"/>
<result column="gmt_create" jdbcType="TIMESTAMP" property="gmtCreate"/>
<result column="creator" jdbcType="VARCHAR" property="creator"/>
<result column="gmt_modified" jdbcType="TIMESTAMP" property="gmtModified"/>
<result column="modifier" jdbcType="VARCHAR" property="modifier"/>
</resultMap>
</mapper>
核心代码
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
@Component
@Scope("prototype")
public class DefaultSequence implements DefaultSequenceGateway {
private final Lock lock = new ReentrantLock();
private volatile SequenceRange currentRange;
@Autowired
private SequenceDao sequenceDao;
public DefaultSequence() {
}
@Override
public long nextValue(String bizName) throws SequenceException {
if (this.currentRange == null) {
this.lock.lock();
try {
if (this.currentRange == null) {
this.currentRange = this.sequenceDao.nextRange(bizName);
}
} finally {
this.lock.unlock();
}
}
long value = this.currentRange.getAndIncrement();
if (value == -1L) {
this.lock.lock();
try {
do {
if (this.currentRange.isOver()) {
this.currentRange = this.sequenceDao.nextRange(bizName);
}
value = this.currentRange.getAndIncrement();
} while (value == -1L);
} finally {
this.lock.unlock();
}
}
if (value < 0L) {
throw new SequenceException("Sequence value overflow, value = " + value);
} else {
return value;
}
}
public SequenceDao getSequenceDao() {
return this.sequenceDao;
}
public void setSequenceDao(SequenceDao sequenceDao) {
this.sequenceDao = sequenceDao;
}
}
public interface DefaultSequenceGateway {
long nextValue(String bizName) throws Exception;
}
@Component
public class DefaultSequenceDao implements SequenceDao {
private static final Logger logger = LoggerFactory.getLogger(DefaultSequenceDao.class);
private static final int MIN_STEP = 1;
private static final int MAX_STEP = 100000;
private static final int DEFAULT_STEP = 1000;
private static final int DEFAULT_RETRY_TIMES = 150;
private static final long DELTA = 100000000L;
private final int step = 1000;
private int retryTimes = 150;
@Autowired
private BizSequenceMapper bizSequenceMapper;
public DefaultSequenceDao() {
}
@Override
public SequenceRange nextRange(String name) throws SequenceException {
if (name == null) {
throw new IllegalArgumentException("序列名称不能为空");
} else {
for (int i = 0; i < this.retryTimes + 1; ++i) {
long oldValue;
long newValue;
//select value from tablename = 'tablename' where biz ='bizName'
Example getExample = new Example(BizSequence.class);
getExample.createCriteria().andEqualTo("bizName", name);
List<BizSequence> bizSequenceList = bizSequenceMapper.selectByExampleAndRowBounds(getExample, new RowBounds(0, 1));
oldValue = bizSequenceList.get(0).getCurrentValue();
StringBuilder message;
if (oldValue < 0L) {
message = new StringBuilder();
message.append("Sequence value cannot be less than zero, value = ").append(oldValue);
message.append(", please check table ");
throw new SequenceException(message.toString());
}
if (oldValue > 9223372036754775807L) {
message = new StringBuilder();
message.append("Sequence value overflow, value = ").append(oldValue);
message.append(", please check table ");
throw new SequenceException(message.toString());
}
newValue = oldValue + (long) this.step;
SequenceRange var11;
// update tableName set value ='value' ,gmtModified = 'gmtModified' where bizName ='bizName' and value = 'value'
Example updateExample = new Example(BizSequence.class);
BizSequence bizSequence = bizSequenceList.get(0);
bizSequence.setCurrentValue(newValue);
bizSequence.setGmtModified(new java.util.Date());
Example.Criteria criteria = updateExample.createCriteria();
criteria.andEqualTo("bizName", name);
criteria.andEqualTo("currentValue", oldValue);
int affectedRows = bizSequenceMapper.updateByExampleSelective(bizSequence, updateExample);
if (affectedRows == 0) {
continue;
}
var11 = new SequenceRange(oldValue + 1L, newValue);
return var11;
}
throw new SequenceException("Retried too many times, retryTimes = " + this.retryTimes);
}
}
public int getStep() {
return this.step;
}
}
public interface SequenceDao {
SequenceRange nextRange(String var1) throws SequenceException;
}
public class SequenceException extends Exception {
private static final long serialVersionUID = 1L;
public SequenceException() {
}
public SequenceException(String message) {
super(message);
}
public SequenceException(String message, Throwable cause) {
super(message, cause);
}
public SequenceException(Throwable cause) {
super(cause);
}
}
public class SequenceRange {
private final long min;
private final long max;
private final AtomicLong value;
private volatile boolean over = false;
public SequenceRange(long min, long max) {
this.min = min;
this.max = max;
this.value = new AtomicLong(min);
}
public long getAndIncrement() {
long currentValue = this.value.getAndIncrement();
if (currentValue > this.max) {
this.over = true;
return -1L;
} else {
return currentValue;
}
}
public long getMin() {
return this.min;
}
public long getMax() {
return this.max;
}
public boolean isOver() {
return this.over;
}
}