介绍
安装
从官方网址下载稳定版5.0.6本并解压
yum install gcc
cd /usr/local/redis/redis-5.0.6
make MALLOC=libc
cd src && make install
# 可以直接启动
./redis-server
后台启动&开机启动
# 后台启动
cd /usr/local/redis/redis-5.0.6
vim redis.conf
---------------------------------------
daemonize yes
protected-mode no
# bind 127.0.0.1 #注释掉,否则只能本机访问
---------------------------------------
./src/redis-server /usr/local/redis/redis-5.0.6/redis.conf
ps -aux|grep redis
# 设置开机启动
mkdir /etc/redis
cp redis.conf /etc/redis/6379.conf
cp /usr/local/redis/redis-5.0.6/utils/redis_init_script /etc/init.d/redisd
cd /etc/init.d/
chkconfig redisd on
service redisd start
service redisd stop
单线程
redis单线程指的是网络请求模块使用了一个线程(所以不需考虑并发安全性),即一个线程处理所有网络请求,其他模块仍用了多个线程。
redis内部实现采用epoll,采用了epoll+自己实现的简单的事件框架。epoll中的读、写、关闭、连接都转化成了事件,然后利用epoll的多路复用特性,绝不在io上浪费一点时间 这3个条件不是相互独立的,特别是第一条,如果请求都是耗时的,采用单线程吞吐量及性能可想而知了。应该说redis为特殊的场景选择了合适的技术方案。
数据类型
-
string
-
list
-
hash
-
set
-
zset有序集合,底层实现之一为跳跃表(集合中元素较大,或者元素较多的情况下使用),***redis zset底层实现原理
压缩列表(ziplist)
跳跃表(skiplist)
常用命令
# 阻塞删除
del
# 对于大的key,使用unlink来异步删除
# 可以使用unlink来替代del,小key跟del的效率一样,大key效率更高
unlink
# 生产使用`keys *`命令会导致服务器卡顿
keys
# 使用scan替代keys命令,它不会阻塞线程,并且提供分页查询的功能
# 注意集群模式下JedisCluster仅支持hash-tag模式的scan操作,如果没有hash-tag则只能遍历clusterNode列表每个机器进行scan
scan 0 match test* count 1000
//测试redis集群中使用scan进行key模糊查询
@Bean
public JedisCluster getJedisCluster() {
String[] serverArray = redisConfig.getNodes().split(",");
Set<HostAndPort> nodes = new HashSet<HostAndPort>();
try {
for (String ipPort : serverArray) {
String[] ipPortPair = ipPort.split(":");
nodes.add(new HostAndPort(ipPortPair[0].trim(), Integer.valueOf(ipPortPair[1].trim())));
}
} catch (Exception e) {
}
return new JedisCluster(nodes, redisConfig.getTimeout(), redisConfig.getSoTimeout(),
redisConfig.getMaxAttempts(), redisConfig.getPassword(), new GenericObjectPoolConfig());
}
@Autowired
public JedisCluster jedisCluster;
public void scanTest() {
ScanParams scanParams = new ScanParams().match("testkey*").count(1000);
Set<String> allKeys = new HashSet<>();
for (JedisPool pool : jedisCluster.getClusterNodes().values()) {
String cur = ScanParams.SCAN_POINTER_START;
do {
try (Jedis jedis = pool.getResource()) {
ScanResult<String> scanResult = jedis.scan(cur, scanParams);
allKeys.addAll(scanResult.getResult());
cur = scanResult.getStringCursor();
}
if (allKeys.size() >= 1000) {
break;
}
} while (!cur.equals(ScanParams.SCAN_POINTER_START));
if (allKeys.size() >= 1000) {
break;
}
}
allKeys.stream().forEach(System.out::println);
}
RESP协议
Redis是以行来划分,每行以\r\n行结束。每一行都有一个消息头,消息头共分为5种分别如下:
-
(
+
) 表示一个正确的状态信息,具体信息是当前行+后面的字符。 -
(
-
) 表示一个错误信息,具体信息是当前行-后面的字符。 -
(
*
) 表示消息体总共有多少行,不包括当前行,*后面是具体的行数。 -
(
$
) 表示下一行数据长度,不包括换行符长度\r\n,$后面则是对应的长度的数据。 -
(
:
) 表示返回一个数值,:后面是相应的数字节符。
# 可以查看保存在AOF文件中的命令内容
cat appendonly.aof
gossip协议
Redis Cluster中计算故障转移超时时间是server.cluster_node_timeout*2,因此如果有节点下线,就能够收到大部分集群节点发送来的下线报告
过期策略
Redis采用的过期策略
惰性删除+定期删除
- 惰性删除流程
- 在进行get或setnx等操作时,先检查key是否过期,
- 若过期,删除key,然后执行相应操作;
- 若没过期,直接执行相应操作
- 定期删除流程(简单而言,对指定个数个库的每一个库随机删除小于等于指定个数个过期key)
- 遍历每个数据库(就是redis.conf中配置的”database”数量,默认为16)
- 检查当前库中的指定个数个key(默认是每个库检查20个key,注意相当于该循环执行20次,循环体时下边的描述)
- 如果当前库中没有一个key设置了过期时间,直接执行下一个库的遍历
- 随机获取一个设置了过期时间的key,检查该key是否过期,如果过期,删除key
- 判断定期删除操作是否已经达到指定时长,若已经达到,直接退出定期删除。
- 检查当前库中的指定个数个key(默认是每个库检查20个key,注意相当于该循环执行20次,循环体时下边的描述)
- 遍历每个数据库(就是redis.conf中配置的”database”数量,默认为16)
注意:
- 对于定期删除,在程序中有一个全局变量current_db来记录下一个将要遍历的库,假设有16个库,我们这一次定期删除遍历了10个,那此时的current_db就是11,下一次定期删除就从第11个库开始遍历,假设current_db等于15了,那么之后遍历就再从0号库开始(此时current_db==0)
RDB对过期key的处理
过期key对RDB没有任何影响
- 从内存数据库持久化数据到RDB文件
- 持久化key之前,会检查是否过期,过期的key不进入RDB文件
- 从RDB文件恢复数据到内存数据库
- 数据载入数据库之前,会对key先进行过期检查,如果过期,不导入数据库(主库情况)
AOF对过期key的处理
过期key对AOF没有任何影响
- 从内存数据库持久化数据到AOF文件:
- 当key过期后,还没有被删除,此时进行执行持久化操作(该key是不会进入aof文件的,因为没有发生修改命令)
- 当key过期后,在发生删除操作时,程序会向aof文件追加一条del命令(在将来的以aof文件恢复数据的时候该过期的键就会被删掉)
- AOF重写
- 重写时,会先判断key是否过期,已过期的key不会重写到aof文件
数据迁移
离线迁移
通过RDB或者aof文件
- RDB,快,目的库必须开启才能用
步骤:
源库 执行 BGSAVE
- copy 源库的.rdb文件至目标库
- 重启目标库
- AOF,慢,源库和目的库必须都开启才能用
步骤: 1.将源库的aof文件拷贝到目标库的数据目录中 (两种方法) redis-cli -p 6380 –pipe < ./appendonly.aof
- 重启
建议采用RDB
脚本迁移
单库–>单库 脚本,性能差,不建议使用
#coding=utf-8
import redis
redis_from = redis.StrictRedis(host='127.0.0.1', port=6379, db=10)
redis_to = redis.StrictRedis(host='127.0.0.1', port=6379, db=0)
if __name__ == '__main__':
cnt = 0
for k in redis_from.keys():
data_type = redis_from.type(k)
if data_type == 'string':
v = redis_from.get(k)
redis_to.set(k, v)
elif data_type == 'list':
values = redis_from.lrange(k, 0, -1)
redis_to.lpush(k, values)
elif data_type == 'set':
values = redis_from.smembers(k)
redis_to.sadd(k, values)
elif data_type == 'hash':
keys = redis_from.hkeys(k)
for key in keys:
value = redis_from.hget(k, key)
redis_to.hset(k, key, value)
else:
print 'not known type'
cnt = cnt + 1
print 'total', cnt
开源工具
redis-shake
主从同步迁移
步骤:
- redis-cli 进入客户端
- 连接主,
slaveof ip port
- info查看是否复制成功
- 断开主,
slavof no one
分布式一致性算法RAFT
Sentinel哨兵模式集群leader选举使用的是raft协议。
Redis Sentinel的选举Leader原理及源码解析
一致性hash算法
持久化
RDB
# 进入redis客户端
redis-cli
# 查看RDB文件名称,在redis.conf中可以配置
config get dbfilename
# 查看工作目录,RDB文件和AOF文件会保存在这个目录下面
config get dir
# 查看最近一次持久化的信息
info persistence
在一定条件下会触发生成快照RDB文件:
-
客户端执行命令save和bgsave会生成快照;
# save命令会造成所有客户端请求都阻塞 save info persistence # bgsave(background save)会folk一个子线程来生成RDB,在创建子线程的过程中也是阻塞的,子线程生成新RDB文件之后替换老的文件 bgsave info persistence
-
根据配置文件(redis.conf)中save m n规则进行自动快照; 在指定的m秒内,redis中有n个键发生改变,则自动触发bgsave,可以配置多个,有一个规则命中则触发;
# RDB相关配置 save m n #配置快照(rdb)促发规则,格式:save <seconds> <changes> #save 900 1 900秒内至少有1个key被改变则做一次快照 #save 300 10 300秒内至少有300个key被改变则做一次快照 #save 60 10000 60秒内至少有10000个key被改变则做一次快照 #关闭该规则使用svae “” dbfilename dump.rdb #rdb持久化存储数据库文件名,默认为dump.rdb stop-write-on-bgsave-error yes #yes代表当使用bgsave命令持久化出错时候停止写RDB快照文件,no表明忽略错误继续写文件。 rdbchecksum yes #在写入文件和读取文件时是否开启rdb文件检查,检查是否有无损坏,如果在启动是检查发现损坏,则停止启动。 dir "/etc/redis" #数据文件存放目录,rdb快照文件和aof文件都会存放至该目录,请确保有写权限 rdbcompression yes #是否开启RDB文件压缩,该功能可以节约磁盘空间
-
主从复制时,从库全量复制同步主库数据,此时主库会执行bgsave命令进行快照;
-
客户端执行数据库清空命令FLUSHALL时候,触发快照;
-
客户端执行shutdown关闭redis时,触发快照;
AOF
# 查看是否开启AOF
config get appendonly
# 开启AOF
config set appendonly yes
# 将配置同步到配置文件
config rewrite
-
第一阶段,以RESP协议的方式追加写命令到缓冲区aof_buf,缓冲区可以减少磁盘I/O次数
-
第二阶段,将aof_buf缓冲区的命令写入到文件
# 根据这个配置来决定写磁盘方式,no:系统大约30s自动写磁盘,不好控制;always:每个写操作都写一次磁盘;everysec:默认方案,每秒写一次磁盘 config get appendfsync
-
文件重写
redis 随着命令不断的写入数据,aof 文件会越来越大。为了解决这个问题,redis 引入了 aof 文件重写机制,以便压缩 aof 体积 ,更小的 aof 文件可以更快的被 redis 加载。为什么为变小?
- 重复、无效的命令不写入文件
- 过期的数据不写入文件
- 多条命令合并写入,例如
RPUSH list1 a RPUSH list1 b
合并为RPUSH list1 a b
注意:重写aof时不是拿旧的aof文件来重写,而是基于redis现有的数据生成一份新的aof文件
重写的过程如下:
重写触发条件:
* 手动触发,执行bgrewriteaof
命令
* 自动触发
```bash
# 超过配置的最小文件大小时才可以重写
config get auto-aof-rewrite-min-size
# 超过上一次重写后的文件大小的一定百分比之后才可以重写
config get auto-aof-rewrite-percentage
```
# AOF相关配置
auto-aof-rewrite-min-size 64mb
#AOF文件最小重写大小,只有当AOF文件大小大于该值时候才可能重写,4.0默认配置64mb。
auto-aof-rewrite-percentage 100
#并且当前AOF文件大小和最后一次重写后的大小之间的比率等于或者等于指定的增长百分比,如100代表当前AOF文件是上次重写的两倍时候才重写。
appendfsync everysec
#no:不使用fsync方法同步,而是交给操作系统write函数去执行同步操作,在linux操作系统中大约每30秒刷一次缓冲。这种情况下,缓冲区数据同步不可控,并且在大量的写操作下,aof_buf缓冲区会堆积会越来越严重,一旦redis出现故障,数据就会丢失
#always:表示每次有写操作都调用fsync方法强制内核将数据写入到aof文件。这种情况下由于每次写命令都写到了文件中, 虽然数据比较安全,但是因为每次写操作都会同步到AOF文件中,所以在性能上会有影响,同时由于频繁的IO操作,硬盘的使用寿命会降低。
#everysec:数据将使用调用操作系统write写入文件,并使用fsync每秒一次从内核刷新到磁盘。 这是折中的方案,兼顾性能和数据安全,所以redis默认推荐使用该配置。
aof-load-truncated yes
#当redis突然运行崩溃时,会出现aof文件被截断的情况,Redis可以在发生这种情况时退出并加载错误,以下选项控制此行为。
#如果aof-load-truncated设置为yes,则加载截断的AOF文件,Redis服务器启动发出日志以通知用户该事件。
#如果该选项设置为no,则服务将中止并显示错误并停止启动。当该选项设置为no时,用户需要在重启之前使用“redis-check-aof”使用程序修复AOF文件再进行启动。
appendonly no
#yes开启AOF,no关闭AOF
appendfilename appendonly.aof
#指定AOF文件名,4.0无法通过config set 设置,只能通过修改配置文件设置。
dir /etc/redis
#RDB文件和AOF文件存放目录
混合持久化
# 查看是否开启混合持久化,redis5默认开启
config get aof-use-rdb-preamble
# 开启混合持久化
config set aof-use-rdb-preamble yes
# 配置信息同步到配置文件redis.conf
config rewrite
# 通过bgrewriteaof命令也可以手动执行持久化操作
bgrewriteaof
三种持久化方式的优缺点
RDB
优点:
- RDB 是一个非常紧凑(compact)的文件,体积小,因此在传输速度上比较快,因此适合灾难恢复。
- RDB 可以最大化 Redis 的性能:父进程在保存 RDB 文件时唯一要做的就是
fork
出一个子进程,然后这个子进程就会处理接下来的所有保存工作,父进程无须执行任何磁盘 I/O 操作。 - RDB 在恢复大数据集时的速度比 AOF 的恢复速度要快。
缺点:
- RDB是一个快照过程,无法完整的保存所有数据,尤其在数据量比较大时候,一旦出现故障丢失的数据将更多。
- 当redis中数据集比较大时候,RDB由于RDB方式需要对数据进行完成拷贝并生成快照文件,fork的子进程会耗CPU,并且数据越大,RDB快照生成会越耗时。
- RDB文件是特定的格式,阅读性差,由于格式固定,可能存在不兼容情况。
AOF
优点:
- 数据更完整,秒级数据丢失(取决于设置fsync策略)。
- 兼容性较高,由于是基于redis通讯协议而形成的命令追加方式,无论何种版本的redis都兼容,再者aof文件是明文的,可阅读性较好。
缺点:
- 数据文件体积较大,即使有重写机制,但是在相同的数据集情况下,AOF文件通常比RDB文件大。
- 相对RDB方式,AOF速度慢于RDB,并且在数据量大时候,恢复速度AOF速度也是慢于RDB。
- 由于频繁地将命令同步到文件中,AOF持久化对性能的影响相对RDB较大,但是对于我们来说是可以接受的。
混合持久化
优点:
- 混合持久化结合了RDB持久化 和 AOF 持久化的优点, 由于绝大部分都是RDB格式,加载速度快,同时结合AOF,增量的数据以AOF方式保存了,数据更少的丢失。
缺点:
- 兼容性差,一旦开启了混合持久化,在4.0之前版本都不识别该aof文件,同时由于前部分是RDB格式,阅读性较差
集群架构模式
单机
主从复制(master-slave)
slave分摊master的读压力
优点:
master/slave 角色
master/slave 数据相同
降低 master 读压力在转交从库
问题:
无法保证高可用
没有解决 master 写的压力
哨兵模式(sentinel)
监控(Monitoring): Sentinel 会不断地检查你的主服务器和从服务器是否运作正常。
提醒(Notification): 当被监控的某个 Redis 服务器出现问题时, Sentinel 可以通过 API 向管理员或者其他应用程序发送通知。
自动故障迁移(Automatic failover): 当一个主服务器不能正常工作时, Sentinel 会开始一次自动故障迁移操作。
优点:
保证高可用
监控各个节点
自动故障迁移
缺点:
- 主从模式,切换需要时间丢数据
- 没有解决 master 写的压力
# 开启哨兵的两种方式,哨兵是一种特殊的redis实例
redis-sentinel /path/to/sentinel.conf
redis-server /path/to/sentinel.conf --sentinel
sentinel monitor <master-name> <ip> <port> <quorum>
# 这个配置表达的是 哨兵节点定期监控 名字叫做 <master-name> 并且 IP 为 <ip> 端口号为 <port> 的主节点。<quorum> 表示的是哨兵判断主节点是否发生故障的票数。也就是说如果我们将<quorum>设置为2就代表至少要有两个哨兵认为主节点故障了,才算这个主节点是客观下线的了,一般是设置为sentinel节点数的一半加一。
sentinel down-after-milliseconds <master-name> <times>
# 每个哨兵节点会定期发送ping命令来判断Redis节点和其余的哨兵节点是否是可达的,如果超过了配置的<times>时间没有收到pong回复,就主观判断节点是不可达的,<times>的单位为毫秒。
sentinel parallel-syncs <master-name> <nums>
# 当哨兵节点都认为主节点故障时,哨兵投票选出的leader会进行故障转移,选出新的主节点,原来的从节点们会向新的主节点发起复制,这个配置就是控制在故障转移之后,每次可以向新的主节点发起复制的节点的个数,最多为<nums>个,因为如果不加控制会对主节点的网络和磁盘IO资源很大的开销。
sentinel failover-timeout <master-name> <times>
# 这个代表哨兵进行故障转移时如果超过了配置的<times>时间就表示故障转移超时失败。
sentinel auth-pass <master-name> <password>
# 如果主节点设置了密码,则需要这个配置,否则哨兵无法对主节点进行监控。
集群
twemproxy集群
Twemproxy,是推特开源的,它最大的缺点就是无法平滑的扩缩容,而Codis解决了Twemproxy扩缩容的问题,而且兼容了Twemproxy,它是由豌豆荚开源的,和Twemproxy都是代理模式。
特点:
多种 hash 算法:MD5、CRC16、CRC32、CRC32a、hsieh、murmur、Jenkins
支持失败节点自动删除
后端 Sharding 分片逻辑对业务透明,业务方的读写方式和操作单个 Redis 一致
缺点:
- 增加了新的 proxy,需要维护其高可用
- failover 逻辑需要自己实现,其本身不能支持故障的自动转移可扩展性差,进行扩缩容都需要手动干预
codis集群(推荐)
兼容twemproxy
redis cluster集群(推荐)
基于官方的 redis cluster 的方案,官方的教程:Redis集群教程。
优点:
1、无中心架构(不存在哪个节点影响性能瓶颈),少了 proxy 层。
2、数据按照 slot 存储分布在多个节点,节点间数据共享,可动态调整数据分布。
3、可扩展性,可线性扩展到 1000 个节点,节点可动态添加或删除。
4、高可用性,部分节点不可用时,集群仍可用。通过增加 Slave 做备份数据副本
5、实现故障自动 failover,节点之间通过 gossip 协议交换状态信息,用投票机制完成 Slave到 Master 的角色提升。
缺点:
1、资源隔离性较差,容易出现相互影响的情况。
2、数据通过异步复制,不保证数据的强一致性
redis cluster默认是不支持slave节点读或者写的,跟我们手动基于replication搭建的主从架构不一样的,如果想通过扩展slave节点来增加吞吐量,那可以直接增加master节点就可以了,master节点本来就是可以任意扩展的
使用场景
分布式锁
利用setnx、set函数加锁
核心思想就是使用redis的setnx
(set if not exist)命令,多线程同时执行setnx
命令时,只有一个线程可以获取返回值1。
-
问题1,如果一个线程加锁成功了但是没有解锁,其它线程就永远无法加锁了,所以需要给这个锁加上一个超时失效时间,也就是再执行
expire
命令。但是这样一来setnx
,expire
两个命令就无法保证操作的原子性了,setnx
有可能成功了,但是expire
却失败了,同样会有死锁的问题。- 为了解决这个加锁并设置超时时间的原子性问题,我们使用
set
命令并且给它传递NX
、PX
参数的方式。
- 为了解决这个加锁并设置超时时间的原子性问题,我们使用
-
问题2,如果加锁之后业务逻辑执行时间太长,导致锁超时失效了,第二个线程获取到锁也进入了,这个时候怎么办?
-
解决方法1,精简业务逻辑只保留必须的部分,减少超时几率,或者延长超时时间
-
解决方法2,在加锁成功后启动另外一个线程,当前requestId对应的锁快到期时检查redis是否已解锁,如果没有,说明业务逻辑还没有执行完成,则发送一段lua脚本给redis延长锁的超时时间
-- 伪代码 if redis.call("get",KEYS[1]) == ARGV[1] then redis.call("set",KEYS[1],ex=3000)
-
-
问题3,如果想实现可重入锁怎么做?
- 加锁失败时获取锁的值,如果值=当前线程上一次设置的值则进入业务代码块
if setnx == 0 if get(key) == my_request_id //重入 else //不可重入 else
示例分布式锁代码:
//jedis实现的redis分布式锁
public class RedisLock {
private static final String LOCK_SUCCESS = "OK";
private static final String SET_IF_NOT_EXIST = "NX";
private static final String SET_WITH_EXPIRE_TIME = "PX";
private static final Long RELEASE_SUCCESS = 1L;
/**
* 默认锁过期时间
*/
public static final int DEFAULT_EXPIRE_MILLIS = 10 * 1000;
/**
* 默认获取锁的超时时间
*/
public static final int DEFAULT_ACQUIRE_TIMEOUT_MILLIS = 5 * 1000;
/**
* 默认申请锁的重试间隔时间
*/
public static final int DEFAULT_ACQUIRE_RETRY_MILLIS = 100;
/**
* 尝试获取分布式锁,获取锁失败则不等待,直接返回false
*
* @param lockKey 锁
* @param requestId 请求标识,可传递AppName
* @return 是否获取成功
*/
public static boolean lockWithoutWaiting(String lockKey, String requestId) {
return lockWithoutWaiting(lockKey, requestId, DEFAULT_EXPIRE_MILLIS);
}
/**
* 尝试获取分布式锁,获取锁失败则不等待,直接返回false
*
* @param lockKey 锁
* @param requestId 请求标识,可传递AppName
* @param expireMilliSeconds 过期时间
* @return 是否获取成功
*/
public static boolean lockWithoutWaiting(String lockKey, String requestId, int expireMilliSeconds) {
if (expireMilliSeconds <= 0) {
expireMilliSeconds = DEFAULT_EXPIRE_MILLIS;
}
Jedis jedis = RedisPoolManager.getInstance().getJedis();
try {
String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireMilliSeconds);
return LOCK_SUCCESS.equals(result);
}catch (Exception e){
LoggerManager.getInstance().error("加锁失败, 异常message:"+e.getMessage(),e);
}finally {
RedisPoolManager.getInstance().releaseJedis(jedis);
}
return false;
}
/**
* 获取锁,如果锁被占用则阻塞等待解锁,默认5s等待超时,如果获取到锁则锁过期时间默认10s
* @param lockKey 锁
* @param requestId 请求标识,可传递AppName
* @return 等待超时会返回false
*/
public static boolean lock(String lockKey, String requestId) {
return lock(lockKey, requestId, DEFAULT_ACQUIRE_TIMEOUT_MILLIS, DEFAULT_EXPIRE_MILLIS);
}
/**
* 获取锁,如果锁被占用则阻塞等待解锁
* @param lockKey 锁
* @param requestId 请求标识,可传递AppName
* @param acquireTimeoutMilliSeconds 获取锁超时时间, 默认5s
* @param expireMilliSeconds 锁过期时间, 默认10s
* @return
*/
public static boolean lock(String lockKey, String requestId, int acquireTimeoutMilliSeconds, int expireMilliSeconds) {
boolean locked = false;
if (acquireTimeoutMilliSeconds <= 0) {
acquireTimeoutMilliSeconds = DEFAULT_ACQUIRE_TIMEOUT_MILLIS;
}
if (expireMilliSeconds <= 0) {
expireMilliSeconds = DEFAULT_EXPIRE_MILLIS;
}
try {
while (acquireTimeoutMilliSeconds >= 0) {
if (lockWithoutWaiting(lockKey, requestId, expireMilliSeconds)) {
locked = true;
break;
}
acquireTimeoutMilliSeconds -= DEFAULT_ACQUIRE_RETRY_MILLIS;
if (acquireTimeoutMilliSeconds > 0) {
Thread.sleep(DEFAULT_ACQUIRE_RETRY_MILLIS);
}
}
} catch (Exception e) {
LoggerManager.getInstance().error("获取分布式锁失败, 异常message:"+e.getMessage(), e);
}
return locked;
}
/**
* 释放分布式锁
*
* @param lockKey 锁
* @param requestId 请求标识,可传递AppName
* @return 是否释放成功
*/
public static boolean unLock(String lockKey, String requestId) {
Jedis jedis = RedisPoolManager.getInstance().getJedis();
try {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
return RELEASE_SUCCESS.equals(result);
}catch (Exception e){
LoggerManager.getInstance().error("解锁失败, 异常message:"+e.getMessage(),e);
}finally {
RedisPoolManager.getInstance().releaseJedis(jedis);
}
return false;
}
}
public class RedisPoolManager {
private JedisPool pool;
private volatile static RedisPoolManager instance = new RedisPoolManager();
private void initedRedisPoolManager() {
try {
JedisPoolConfig config = new JedisPoolConfig();
//最大连接数
config.setMaxTotal(RedisConfigManager.MAX_TOTAL);
//闲置最大连接数
config.setMaxIdle(RedisConfigManager.MAX_IDLE);
//到达最大连接数后,调用者阻塞时间
config.setMaxWaitMillis(RedisConfigManager.MAX_WAIT_MILLIS);
pool = new JedisPool(config, RedisConfigManager.HOST, RedisConfigManager.PORT, RedisConfigManager.TIMEOUT, RedisConfigManager.PWD, RedisConfigManager.DATABASE);
} catch (Exception ex) {
LoggerManager.getInstance().error("Redis连接池创建失败",ex);
}
}
private RedisPoolManager() {
initedRedisPoolManager();
}
public static RedisPoolManager getInstance() {
if (instance == null) {
synchronized (RedisPoolManager.class) {
if (instance == null) {
instance = new RedisPoolManager();
}
}
}
return instance;
}
public Jedis getJedis() {
return pool.getResource();
}
public void releaseJedisPool() {
if (pool != null && !pool.isClosed()) {
pool.close();
}
}
public void releaseJedis(Jedis jedis) {
if (jedis != null) {
jedis.close();
}
}
public void releaseJedis(Jedis jedis, Exception e) {
if (jedis != null) {
jedis.close();
}
if (e != null) {
e.printStackTrace();
}
}
}
Redission组件加锁(hset)
http://www.imooc.com/article/284859
消息队列
Redis中列表List类型是按照插入顺序排序的字符串链表,和数据结构中的普通链表一样,可以在头部left和尾部right添加新的元素。插入时如果键不存在Redis将为该键创建一个新的链表。如果链表中所有元素均被删除,那么该键也会被删除。Redis的列表List可以包含的最大元素数量为4294967295,从元素插入和删除的效率来看,在链表的两头插入或删除元素非常高效。
生产-消费
lpush/rpush, lpop/rpop
# 从队列头部/尾部推送多个值
lpush mylist a b c d
rpush mylist2 a b c d
# 从队列头部/尾部取出所有的值
lrange mylist 0 -1
lrange mylist2 0 -1
# 从队列头部/尾部弹出一个值
lpop mylist
rpop mylist2
blpop/brpop
# 阻塞监听多个队列
brpop mylist1 mylist2 0
发布-订阅
1)PUBLISH channel msg 将信息 message 发送到指定的频道 channel
2)SUBSCRIBE channel [channel …] 订阅频道,可以同时订阅多个频道
# 向channel:1发送消息hi
publish channel:1 hi
# 订阅channel:1
subscribe channel:1
3)UNSUBSCRIBE [channel …] 取消订阅指定的频道, 如果不指定频道,则会取消订阅所有频道
4)PSUBSCRIBE pattern [pattern …]
按照模式通配符(pattern)订阅频道,通配符中?
表示1个占位符,*
表示任意个占位符(包括0),?*
表示1个以上占位符。使用psubscribe命令可以重复订阅同一个频道,如客户端执行了psubscribe c? c?*。这时向c1发布消息客户端会接受到两条消息。
# 订阅以下三种类型的channel
psubscribe b* c? d?*
# 测试发布以下消息之后订阅者收到的消息
publish b m1
publish b1 m1
publish b11 m1
publish c m1
publish c1 m1
publish c11 m1
publish d m1
publish d1 m1
publish d11 m1
5)PUNSUBSCRIBE [pattern [pattern …]] 退订指定的规则, 如果没有参数则会退订所有规则,使用punsubscribe只能退订通过psubscribe命令订阅的规则,不会影响直接通过subscribe命令订阅的频道;同样unsubscribe命令也不会影响通过psubscribe命令订阅的规则。另外需要注意punsubscribe命令退订某个规则时不会将其中的通配符展开,而是进行严格的字符串匹配,所以punsubscribe * 无法退订c规则,而是必须使用punsubscribe c才可以退订。
6)PUBSUB subcommand [argument [argument …]] 查看订阅与发布系统状态
redis vs rabbitmq
- 3.1 可靠消费
Redis: 没有相应的机制保证消息的消费,当消费者消费失败的时候,消息体丢失,需要手动处理;
BRPOPLPUSH LIST1 ANOTHER_LIST TIMEOUT
RabbitMQ: 具有消息消费确认,即使消费者消费失败,也会自动使消息体返回原队列,同时可全程持久化,保证消息体被正确消费.
- 3.2 可靠发布
Reids: 不提供,需自行实现;
RabbitMQ: 具有发布确认功能,保证消息被发布到服务器.
- 3.3 高可用
Redis: 采用主从模式,读写分离,但是故障转移还没有非常完善的官方解决方案.
RabbitMQ: 集群采用磁盘、内存节点,任意单点故障都不会影响整个队列的操作.
- 3.4 持久化
Redis: 将整个Redis实例持久化到磁盘;
RabbitMQ: 队列.消息,都可以选择是否持久化.
- 3.5 消费者负载均衡
Redis: 不提供,需自行实现;
RabbitMQ: 根据消费者情况,进行消息的均衡分发.
- 3.6 队列监控
Redis: 不提供,需自行实现;
RabbitMQ: 后台可以监控某个队列的所有信息,(内存,磁盘,消费者,生产者,速率等).
- 3.7 流量控制
Redis: 不提供,需自行实现;
RabbitMQ: 服务器过载的情况,对生产者速率会进行限制,保证服务可靠性.
-
3.8 出入队性能
-
3.9 应用场景分析
Redis: 轻量级,高并发,延迟敏感 即时数据分析、秒杀计数器、缓存等;
RabbitMQ: 重量级,高并发,异步 批量数据异步处理、并行任务串行化,高负载任务的负载均衡等
缓存穿透、缓存雪崩
- 缓存穿透
一般的缓存系统,都是按照key去缓存查询,如果不存在对应的value,就应该去后端系统查找(比如DB)。一些恶意的请求会故意查询不存在的key,请求量很大,就会对后端系统造成很大的压力。这就叫做缓存穿透。
如何避免?
1:对查询结果为空的情况也进行缓存,缓存时间设置短一点,或者该key对应的数据insert了之后清理缓存。
2:对一定不存在的key进行过滤。可以把所有的可能存在的key放到一个大的Bitmap中,查询时通过该bitmap过滤,例如布隆过滤器。
- 缓存雪崩
当缓存服务器重启或者大量缓存集中在某一个时间段失效,这样在失效的时候,会给后端系统带来很大压力。导致系统崩溃。
如何避免?
1:在缓存失效后,通过加锁或者队列来控制读数据库写缓存的线程数量。比如对某个key只允许一个线程查询数据和写缓存,其他线程等待。
2:做二级缓存,A1为原始缓存,A2为拷贝缓存,A1失效时,可以访问A2,A1缓存失效时间设置为短期,A2设置为长期
3:不同的key,设置不同的过期时间,让缓存失效的时间点尽量均匀。
Redis常见性能问题和解决方案
-
Master最好不要做任何持久化工作,如RDB内存快照和AOF日志文件;(Master写内存快照,save命令调度rdbSave函数,会阻塞主线程的工作,当快照比较大时对性能影响是非常大的,会间断性暂停服务,所以Master最好不要写内存快照;AOF文件过大会影响Master重启的恢复速度)
-
如果数据比较重要,某个Slave开启AOF备份数据,策略设置为每秒同步一次
-
为了主从复制的速度和连接的稳定性,Master和Slave最好在同一个局域网内
-
尽量避免在压力很大的主库上增加从库
-
主从复制不要用图状结构,用单向链表结构更为稳定,即:Master <- Slave1 <- Slave2 <- Slave3…;这样的结构方便解决单点故障问题,实现Slave对Master的替换。如果Master挂了,可以立刻启用Slave1做Master,其他不变。
-
同时开启RDB、AOF,重启redis的时候使用哪种方式来恢复数据?只会加载AOF,数据完整性高
QPS
在命令窗口输入info stats
,查看如下信息:
instantaneous_ops_per_sec:100
这个值是redis每100ms采样一次,采样16次,也就是1.6s得到的操作次数
缓存命中率
在命令窗口输入info
,查看如下两个值:
keyspace_hits:37184284
keyspace_misses:2342519
缓存命中率 = keyspace_hits / (keyspace_hits + keyspace_misses)
影响缓存命中率的因素
1. 业务场景
缓存适合“读多写少”的业务场景,反之,使用缓存的意义不大,命中率会很低。缓存时间越长,命中率会越高。时效性要求越低,就越适合缓存。
- 更新策略
缓存的粒度越小,命中率会越高。举个实际的例子说明:
当缓存单个对象的时候(例如:单个用户信息),只有当该对象对应的数据发生变化时,我们才需要更新缓存或者让移除缓存。而当缓存一个集合的时候(例如:所有用户数据),其中任何一个对象对应的数据发生变化时,都需要更新或移除缓存。
- 清理策略
对于持续运行的服务器来说, 服务器需要定期对自身的资源和状态进行必要的检查和整理,有三种不同的删除策略,定时删除会短时间内占用大量cpu,惰性删除会在一段时间内浪费内存,所以定期删除是一个折中的办法。redis使用的是惰性删除+定期删除的组合方式。
(1)定时清理。在设置键的过期时间时,创建一个回调事件,当过期时间达到时,由时间处理器自动执行键的删除操作。
(2)惰性清理。键过期了就过期了,不管。当读/写一个已经过期的key时,会触发惰性删除策略,直接删除掉这个过期key
(3)定期清理。每隔一段时间,对expires字典进行检查,删除里面的过期键。
- 缓存容量
缓存的容量有限,则容易引起缓存失效和被淘汰。
- 缓存故障
缓存节点故障,也会引起缓存失效,业内比较典型的做法就是通过一致性Hash算法,或者通过节点冗余的方式。
参考
https://www.cnblogs.com/jasontec/p/9699242.html
***Redis详解(十三)—— Redis布隆过滤器,利用redis及guava实现布隆过滤器