项目中有一个需求,spring boot项目连接postgres数据库的地址,是存储在etcd当中的,在程序启动后,当etcd中的地址变化时,需要程序去连接新的postgres地址。
1. 修改Datasource定义,改为使用DynamicPGDataSource,它是一个自定义类,集成了
AbstractRoutingDataSource
@Primary@Beanpublic DataSource druidDataSource() {DruidDataSource druidDataSource = DruidDataSourceBuilder.create().build();druidDataSource.setUrl(url);druidDataSource.setUsername(username);druidDataSource.setPassword(password);druidDataSource.setDriverClassName(driverClassName);druidDataSource.setInitialSize(druidInitSize);// 初始化连接数druidDataSource.setMinIdle(druidMinIdle); // 最小连接数druidDataSource.setMaxActive(druidMaxActive);// 最大连接数druidDataSource.setPoolPreparedStatements(true);// 开启缓存preparedStatementdruidDataSource.setUseGlobalDataSourceStat(true);// 开启Druid提供的3s慢SQL监控Properties properties = new Properties();properties.put("druid.stat.mergeSql", true);properties.put("druid.stat.slowSqlMillis", 3000);druidDataSource.setConnectProperties(properties);try {druidDataSource.setFilters("stat,wall");druidDataSource.init();} catch (SQLException e) {log.error("构建数据库连接池异常,异常原因:{}", e);throw new RuntimeException(e);}DynamicPGDataSource dynamicPGDataSource = new DynamicPGDataSource();Map<Object, Object> targetDataSources = new HashMap<>();targetDataSources.put(ConnectInfo.currentPGIP, druidDataSource);dynamicPGDataSource.setTargetDataSources(targetDataSources);dynamicPGDataSource.setDefaultTargetDataSource(druidDataSource);return dynamicPGDataSource;}
2. 类定义
public class DynamicPGDataSource extends AbstractRoutingDataSource {@Overrideprotected Object determineCurrentLookupKey() {return DataSourceContext.getPGDataSource();}
}
public class DataSourceContext {private static String pgDataSource;public static void setPGDataSource(String ds) {pgDataSource = ds;}public static String getPGDataSource() {return pgDataSource;}
}
3. 定义re方法,当监听到数据源IP更改之后,去切换连接到新的数据源,并且关闭老的数据源连接。
public void refreshPGDataSource(String ip) {if(ConnectInfo.currentPGIP.equals(ip)) {log.info("currentPGIP equals ip, not operate, ip:{}", ip);}DynamicPGDataSource dynamicPGDataSource = ((DynamicPGDataSource)druidDataSource);Field field = null;try {field = AbstractRoutingDataSource.class.getDeclaredField("targetDataSources");} catch (NoSuchFieldException e) {e.printStackTrace();}field.setAccessible(true);// 获取当前的 targetDataSourcesMap<Object, Object> currentDataSources = null;try {currentDataSources = (Map<Object, Object>) field.get(dynamicPGDataSource);if(!currentDataSources.containsKey(ip)) {DruidDataSource druidDataSource = DruidDataSourceBuilder.create().build();String address = String.format("jdbc:postgresql://%s:5432/dsgdb?characterEncoding=utf-8&useSSL=false", ip);druidDataSource.setUrl(address);druidDataSource.setUsername(username);druidDataSource.setPassword(password);druidDataSource.setDriverClassName(driverClassName);druidDataSource.setInitialSize(20);// 初始化连接数druidDataSource.setMinIdle(10); // 最小连接数druidDataSource.setMaxActive(100);// 最大连接数druidDataSource.setPoolPreparedStatements(true);// 开启缓存preparedStatementdruidDataSource.setUseGlobalDataSourceStat(true);// 开启Druid提供的3s慢SQL监控Properties properties = new Properties();properties.put("druid.stat.mergeSql", true);properties.put("druid.stat.slowSqlMillis", 3000);druidDataSource.setConnectProperties(properties);druidDataSource.setFilters("stat,wall");currentDataSources.put(ip, druidDataSource);field.set(dynamicPGDataSource, currentDataSources);dynamicPGDataSource.afterPropertiesSet();}DataSourceContext.setPGDataSource(ip);ConnectInfo.currentPGIP = ip;// 关闭无用连接Iterator<Map.Entry<Object, Object>> iterator = currentDataSources.entrySet().iterator();while (iterator.hasNext()) {Map.Entry<Object, Object> entry = iterator.next();if(!ip.equals(entry.getKey())) {DruidDataSource dataSource = (DruidDataSource)currentDataSources.get(entry.getKey());dataSource.close();iterator.remove();}}} catch (IllegalAccessException | SQLException e) {e.printStackTrace();}}