一、异常场景
如下表所示,redis集群6个节点分别部署在3台机器上,每台机器分布一个主节点和一个从节点
机器 | redis主节点端口 | redis从节点端口 |
主机1 | 7001 | 7004 |
主机2 | 7003 | 7006 |
主机3 | 7005 | 7002 |
将主机1关闭,redis集群发生主从切换,从节点7006升级为主节点,redis集群状态为ok
但是客户端程序报如下错误:Redis command timed out
二、原因分析
SpringBoot2.X版本开始Redis默认的连接池都是采用的Lettuce。当节点发生改变后,Letture默认是不会刷新节点拓扑的
三、解决方案
方案一:
使用letture方式连接redis,需要设置开启刷新节点拓扑
方案二:
改用jedis方式连接redis,使用jedis客户端的服务可以在主从切换后15秒恢复
方案1和方案2实现方式见
RedisCluster集群模式下master宕机主从切换期间Lettuce连接Redis无法使用报错Redis command timed out的问题
以上两种方案在主从切换期间都有 短暂的时间 是程序不可用redis连接的,可能会导致程序业务数据丢失。方案三可解决此问题
方案三:
捕获程序中redis操作方法的连接异常,有异常就重新初始化连接工厂,直到连接可用
1、Redis连接工厂配置类
package com.chenly.conf;
import cn.hutool.core.util.StrUtil;
import com.chenly.utils.TimeUtils;
import io.lettuce.core.ClientOptions;
import io.lettuce.core.TimeoutOptions;
import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.annotation.Order;
import org.springframework.core.env.MapPropertySource;
import org.springframework.data.redis.connection.RedisClusterConfiguration;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
/**
* redis 配置类
* @author chenly
* @date 2022/11/29 11:34
* @version v1.0
* @see {@link org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration}
*/
@Configuration
@Order(value = 1)
@Primary
public class RedisConfig {
//最大活跃数
@Value("${spring.redis.jedis.pool.max-active:8}")
private int maxActive;
//最大等待时间
@Value("${spring.redis.jedis.pool.max-wait:-1ms}")
private String maxWait;
//最大核心线程数
@Value("${spring.redis.jedis.pool.max-idle:8}")
private int maxIdle;
//最小核心线程数
@Value("${spring.redis.jedis.pool.min-idle:0}")
private int minIdle;
//redis连接的超时时长
@Value("${spring.redis.timeout:5}")
private String timeOut;
//redis连接的库
@Value("${spring.redis.database:0}")
private int database;
//节点配置
@Value("${spring.redis.cluster.nodes:#{null}}")
private String nodes;
//最大连接转移数
@Value("${spring.redis.cluster.max-redirects:3}")
private int maxRedirects;
//单节点情况下redis的ip
@Value("${spring.redis.host:#{null}}")
private String host;
//单节点情况下redis的端口
@Value("${spring.redis.port:#{null}}")
private Integer port;
//redis的连接密码
@Value("${spring.redis.password:#{null}}")
private String password;
public GenericObjectPoolConfig<?> genericObjectPoolConfig() {
GenericObjectPoolConfig<?> config = new GenericObjectPoolConfig<>();
config.setMaxTotal(maxActive);
config.setMaxIdle(maxIdle);
config.setMinIdle(minIdle);
config.setMaxWaitMillis(TimeUtils.parseUnitTimeToMillis(maxWait));
return config;
}
/**
* 连接配置
* @return
*/
public LettuceConnectionFactory connectionFactory() {
Map<String, Object> source = new HashMap<String, Object>();
RedisClusterConfiguration redisClusterConfiguration;
RedisStandaloneConfiguration redisStandaloneConfiguration;
//开启 自适应集群拓扑刷新和周期拓扑刷新
ClusterTopologyRefreshOptions clusterTopologyRefreshOptions = ClusterTopologyRefreshOptions.builder()
// 开启全部自适应刷新
.enableAllAdaptiveRefreshTriggers() // 开启自适应刷新,自适应刷新不开启,Redis集群变更时将会导致连接异常
// 自适应刷新超时时间(默认30秒)
.adaptiveRefreshTriggersTimeout(Duration.ofSeconds(30)) //默认关闭开启后时间为30秒
// 开周期刷新
.enablePeriodicRefresh(Duration.ofSeconds(30)) // 默认关闭开启后时间为60秒 ClusterTopologyRefreshOptions.DEFAULT_REFRESH_PERIOD 60 .enablePeriodicRefresh(Duration.ofSeconds(2)) = .enablePeriodicRefresh().refreshPeriod(Duration.ofSeconds(2))
.build();
// https://github.com/lettuce-io/lettuce-core/wiki/Client-Options
ClientOptions clientOptions = ClusterClientOptions.builder()
//.topologyRefreshOptions(clusterTopologyRefreshOptions)
//redis命令超时时间,超时后才会使用新的拓扑信息重新建立连接
//.timeoutOptions(TimeoutOptions.enabled(Duration.ofSeconds(10)))
.build();
// LettuceClientConfiguration clientConfig = LettuceClientConfiguration.builder()
// .clientOptions(clientOptions)
// .build();
LettuceClientConfiguration clientConfig= LettucePoolingClientConfiguration.builder()
.poolConfig(genericObjectPoolConfig())
.clientOptions(clientOptions)
.commandTimeout(Duration.ofMillis(TimeUtils.parseUnitTimeToMillis(timeOut))) //默认RedisURI.DEFAULT_TIMEOUT 60
.build();
//集群模式
if(nodes !=null){
source.put("spring.redis.cluster.nodes", nodes);
source.put("spring.redis.cluster.max-redirects", maxRedirects);
redisClusterConfiguration = new RedisClusterConfiguration(new MapPropertySource("RedisClusterConfiguration", source));
if(!StrUtil.isEmpty(password)){
redisClusterConfiguration.setPassword(password);
}
//根据配置和客户端配置创建连接工厂
LettuceConnectionFactory lettuceConnectionFactory = new
LettuceConnectionFactory(redisClusterConfiguration,clientConfig);
return lettuceConnectionFactory;
}else{
//单机模式
redisStandaloneConfiguration = new RedisStandaloneConfiguration(host,port);
redisStandaloneConfiguration.setDatabase(database);
if(!StrUtil.isEmpty(password)){
redisStandaloneConfiguration.setPassword(password);
}
//根据配置和客户端配置创建连接
LettuceConnectionFactory lettuceConnectionFactory = new
LettuceConnectionFactory(redisStandaloneConfiguration,clientConfig);
// lettuceConnectionFactory .afterPropertiesSet();
return lettuceConnectionFactory;
}
}
}
2、函数式接口
package com.chenly.utils;
/**
* @author: chenly
* @date: 2022-11-29 16:55
* @description:
* @version: 1.0
*/
@FunctionalInterface
public interface MyFunctionalInterface {
//定义一个抽象方法
public abstract Object method();
}
3、redis操作类
package com.montnets.emp.utils;
import com.montnets.emp.conf.RedisConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.*;
import org.springframework.stereotype.Component;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
* @初创作者: chenly
* @创建时间: 2022/11/29 15:51
*/
@Component
@Slf4j
public class RedisUtil {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private RedisConfig redisConfig;
//重试次数
@Value(value = "${ums.redis.retry.times:10}")
private int retryCount = 10;
//重试休眠时间,单位:毫秒
private int retrySleepTime = 3000;
/**
* @author :chenly
* 更新redisTemplate :处理集群宕机恢复后程序不恢复问题
*/
private synchronized StringRedisTemplate refreshRedisTemplate() {
LettuceConnectionFactory connectionFactory = redisConfig.connectionFactory();
connectionFactory.afterPropertiesSet();
redisTemplate.setConnectionFactory(connectionFactory);
return redisTemplate;
}
/**
* redis操作
* 使用函数式接口,
* @author :chenly
* @param myInter 函数式接口
* @param logContent 日志描述
* @return
*/
public Object redisOperation(MyFunctionalInterface myInter,String logContent){
try{
return myInter.method();
}catch (Exception e){
log.warn(logContent,e.getMessage());
return redisRetry(myInter,logContent);
}
}
/**
* 异常重试
* 使用函数式接口
* @author :chenly
* @param myInter 函数式接口
* @param logContent 日志描述
* @return
*/
public Object redisRetry(MyFunctionalInterface myInter, String logContent){
Object object = null;
for(int i=0;i <=retryCount;i++){
synchronized (this) {
try {
object = myInter.method();
log.info("第{}次重试: "+logContent,(i+1),"成功");
return object;
} catch (Exception e) {
if (i >= retryCount) {
log.error("第{}次重试: "+logContent, (i+1),"异常:"+e.getMessage());
throw e;
}
log.warn("第{}次重试: "+logContent,(i+1), "异常:"+e.getMessage());
//更新redisTemplate
redisTemplate = refreshRedisTemplate();
}
}
try {
TimeUnit.MILLISECONDS.sleep(retrySleepTime);
} catch (InterruptedException e1) {
log.warn("休眠异常", e1.getMessage());
}
}
return null;
}
/**
* 删除key
* 使用函数式接口
* @param key
*/
public void delete(String key) {
String logContent = "redis操作: key:" + key + ", delete执行结果:{}";
redisOperation(() -> {
redisTemplate.delete(key);
return null;
}, logContent);
}
/**
* 设置指定 key 的值
* @param key
* @param value
*/
public void set(String key, String value) {
String logContent = "【redis】set操作: key:"+key+", value:"+value+",执行结果:{}";
redisOperation(()->{
redisTemplate.opsForValue().set(key, value);
return null;
},logContent);
}
/**
* 获取指定 key 的值
* @param key
* @return
*/
public String get(String key) {
String logContent = "【redis】get操作: key:"+key+",执行结果:{}";
return (String) redisOperation(()->{
return redisTemplate.opsForValue().get(key);
},logContent);
}
}
4、测试结果:
redis集群主从切换期间,程序有redis操作,会异常重试2~3次,即可恢复正常使用
原文连接:https://www.cnblogs.com/kiko2014551511/p/17044494.html
- 本文固定链接: http://www.jiagou.cc/602/
- 转载请注明: 摘星怪 于 架构迷 发表