1、接口
/*** 持久化缓存接口* @author binbin.hou* @since 0.0.7* @param <K> key* @param <V> value*/
public interface ICachePersist<K, V> {/*** 持久化缓存信息* @param cache 缓存* @since 0.0.7*/void persist(final ICache<K, V> cache);}
接口和 rdb 的保持一致
2、注解定义
/*** 缓存拦截器* @author binbin.hou* @since 0.0.5*/
@Documented
@Inherited
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface CacheInterceptor {/*** 操作是否需要 append to file,默认为 false* 主要针对 cache 内容有变更的操作,不包括查询操作。* 包括删除,添加,过期等操作。* @return 是否* @since 0.0.10*/boolean aof() default false;}
我们在原来的 @CacheInterceptor
注解中添加 aof 属性,用于指定是否对操作开启 aof 模式。
3、AOF 持久化拦截实现
/*** AOF 持久化明细* @author binbin.hou* @since 0.0.10*/
public class PersistAofEntry {/*** 参数信息* @since 0.0.10*/private Object[] params;/*** 方法名称* @since 0.0.10*/private String methodName;//getter & setter &toString
}
这里我们只需要方法名,和参数对象。
暂时实现的简单一些即可。
4、持久化拦截器
public class CacheInterceptorAof<K,V> implements ICacheInterceptor<K, V> {private static final Log log = LogFactory.getLog(CacheInterceptorAof.class);@Overridepublic void before(ICacheInterceptorContext<K,V> context) {}@Overridepublic void after(ICacheInterceptorContext<K,V> context) {// 持久化类ICache<K,V> cache = context.cache();ICachePersist<K,V> persist = cache.persist();if(persist instanceof CachePersistAof) {CachePersistAof<K,V> cachePersistAof = (CachePersistAof<K,V>) persist;String methodName = context.method().getName();PersistAofEntry aofEntry = PersistAofEntry.newInstance();aofEntry.setMethodName(methodName);aofEntry.setParams(context.params());String json = JSON.toJSONString(aofEntry);// 直接持久化log.debug("AOF 开始追加文件内容:{}", json);cachePersistAof.append(json);log.debug("AOF 完成追加文件内容:{}", json);}}}
我们定义拦截器,当 cache 中定义的持久化类为 CachePersistAof
时,将操作的信息放入到 CachePersistAof 的 buffer 列表中。
5、拦截器调用
//3. AOF 追加
final ICachePersist cachePersist = cache.persist();
if(cacheInterceptor.aof() && (cachePersist instanceof CachePersistAof)) {if(before) {persistInterceptors.before(interceptorContext);} else {persistInterceptors.after(interceptorContext);}
}
当 AOF 的注解属性为 true 时,调用上述拦截器即可。
这里为了避免浪费,只有当持久化类为 AOF 模式时,才进行调用。
6、持久化类实现
/*** 缓存持久化-AOF 持久化模式* @author binbin.hou* @since 0.0.10*/
public class CachePersistAof<K,V> extends CachePersistAdaptor<K,V> {private static final Log log = LogFactory.getLog(CachePersistAof.class);/*** 缓存列表* @since 0.0.10*/private final List<String> bufferList = new ArrayList<>();/*** 数据持久化路径* @since 0.0.10*/private final String dbPath;public CachePersistAof(String dbPath) {this.dbPath = dbPath;}/*** 持久化* key长度 key+value* 第一个空格,获取 key 的长度,然后截取* @param cache 缓存*/@Overridepublic void persist(ICache<K, V> cache) {log.info("开始 AOF 持久化到文件");// 1. 创建文件if(!FileUtil.exists(dbPath)) {FileUtil.createFile(dbPath);}// 2. 持久化追加到文件中FileUtil.append(dbPath, bufferList);// 3. 清空 buffer 列表bufferList.clear();log.info("完成 AOF 持久化到文件");}@Overridepublic long delay() {return 1;}@Overridepublic long period() {return 1;}@Overridepublic TimeUnit timeUnit() {return TimeUnit.SECONDS;}/*** 添加文件内容到 buffer 列表中* @param json json 信息* @since 0.0.10*/public void append(final String json) {if(StringUtil.isNotEmpty(json)) {bufferList.add(json);}}}
实现一个 Buffer 列表,用于每次拦截器直接顺序添加。
持久化的实现也比较简单,追加到文件之后,直接清空 buffer 列表即可。
7、持久化测试
ICache<String, String> cache = CacheBs.<String,String>newInstance().persist(CachePersists.<String, String>aof("1.aof")).build();
cache.put("1", "1");
cache.expire("1", 10);
cache.remove("2");
TimeUnit.SECONDS.sleep(1);[DEBUG] [2020-10-02 12:20:41.979] [main] [c.g.h.c.c.s.i.a.CacheInterceptorAof.after] - AOF 开始追加文件内容:{"methodName":"put","params":["1","1"]}
[DEBUG] [2020-10-02 12:20:41.980] [main] [c.g.h.c.c.s.i.a.CacheInterceptorAof.after] - AOF 完成追加文件内容:{"methodName":"put","params":["1","1"]}
[DEBUG] [2020-10-02 12:20:41.982] [main] [c.g.h.c.c.s.i.a.CacheInterceptorAof.after] - AOF 开始追加文件内容:{"methodName":"expireAt","params":["1",1601612441990]}
[DEBUG] [2020-10-02 12:20:41.982] [main] [c.g.h.c.c.s.i.a.CacheInterceptorAof.after] - AOF 完成追加文件内容:{"methodName":"expireAt","params":["1",1601612441990]}
[DEBUG] [2020-10-02 12:20:41.984] [main] [c.g.h.c.c.s.i.a.CacheInterceptorAof.after] - AOF 开始追加文件内容:{"methodName":"remove","params":["2"]}
[DEBUG] [2020-10-02 12:20:41.984] [main] [c.g.h.c.c.s.i.a.CacheInterceptorAof.after] - AOF 完成追加文件内容:{"methodName":"remove","params":["2"]}
[DEBUG] [2020-10-02 12:20:42.088] [pool-1-thread-1] [c.g.h.c.c.s.l.r.CacheRemoveListener.listen] - Remove key: 1, value: 1, type: expire
[INFO] [2020-10-02 12:20:42.789] [pool-2-thread-1] [c.g.h.c.c.s.p.InnerCachePersist.run] - 开始持久化缓存信息
[INFO] [2020-10-02 12:20:42.789] [pool-2-thread-1] [c.g.h.c.c.s.p.CachePersistAof.persist] - 开始 AOF 持久化到文件
[INFO] [2020-10-02 12:20:42.798] [pool-2-thread-1] [c.g.h.c.c.s.p.CachePersistAof.persist] - 完成 AOF 持久化到文件
[INFO] [2020-10-02 12:20:42.799] [pool-2-thread-1] [c.g.h.c.c.s.p.InnerCachePersist.run] - 完成持久化缓存信息
8、AOF 加载实现
@Override
public void load(ICache<K, V> cache) {List<String> lines = FileUtil.readAllLines(dbPath);log.info("[load] 开始处理 path: {}", dbPath);if(CollectionUtil.isEmpty(lines)) {log.info("[load] path: {} 文件内容为空,直接返回", dbPath);return;}for(String line : lines) {if(StringUtil.isEmpty(line)) {continue;}// 执行// 简单的类型还行,复杂的这种反序列化会失败PersistAofEntry entry = JSON.parseObject(line, PersistAofEntry.class);final String methodName = entry.getMethodName();final Object[] objects = entry.getParams();final Method method = METHOD_MAP.get(methodName);// 反射调用ReflectMethodUtil.invoke(cache, method, objects);}
}/*** 方法缓存** 暂时比较简单,直接通过方法判断即可,不必引入参数类型增加复杂度。* @since 0.0.10*/
private static final Map<String, Method> METHOD_MAP = new HashMap<>();
static {Method[] methods = Cache.class.getMethods();for(Method method : methods){CacheInterceptor cacheInterceptor = method.getAnnotation(CacheInterceptor.class);if(cacheInterceptor != null) {// 暂时if(cacheInterceptor.aof()) {String methodName = method.getName();METHOD_MAP.put(methodName, method);}}}
}
9、测试
ICache<String, String> cache = CacheBs.<String,String>newInstance().load(CacheLoads.<String, String>aof("default.aof")).build();Assert.assertEquals(1, cache.size());
System.out.println(cache.keySet());