每个 Redisson 对象实例都会有一个与之对应的 Redis 数据实例,可以通过调用 getName 方法来取得 Redis 数据实例的名称(key)。
一、通用对象桶(Object Bucket)
Redisson 的分布式 RBucket ,Java 对象是一种通用对象桶可以用来存放任类型的对象。
RBucket<Object> bucket = redisson.getBucket("test:bucket:key:001");
bucket.set(new Object());
Object obj = bucket.get();
bucket.trySet(new Object());
bucket.compareAndSet(new Object(), new OObject());
bucket.getAndSet(new Object());
还可以通过 RBuckets 接口实现批量操作多个 RBucket 对象
RBuckets buckets = redisson.getBuckets();
List<RBucket<V>> foundBuckets = buckets.find("test:bucket:key*");
Map<String, V> loadedBuckets = buckets.get("test:bucket:key:001", "test:bucket:key:002", "test:bucket:key:003");
Map<String, Object> map = new HashMap<>();
map.put("myBucket1", new MyObject());
map.put("myBucket2", new MyObject());
// 利用Redis的事务特性,同时保存所有的通用对象桶,如果任意一个通用对象桶已经存在则放弃保存其他所有数据。
buckets.trySet(map);
// 同时保存全部通用对象桶。
buckets.set(map);
单元测试
/*** 获取缓存** @param key key* @param clazz 类型* @param <T> 类型*/public <T> T get(String key, Class<T> clazz) {RBucket<T> rBucket = redissonClient().getBucket(key);return clazz.cast(rBucket.get());}
@Testpublic void test_deleteRBucket_multi() {String redisKey1 = "test:deleteRBucket:multi:1";String redisValue1 = primaryRedissonHelper.get(redisKey1, String.class);Assertions.assertNull(redisValue1);primaryRedissonHelper.set(redisKey1, "test_deleteRBucket_single_multi_1", 30);redisValue1 = primaryRedissonHelper.get(redisKey1, String.class);Assertions.assertNotNull(redisValue1);Assertions.assertEquals("test_deleteRBucket_single_multi_1", redisValue1);String redisKey2 = "test_deleteRBucket_multi_2";String redisValue2 = primaryRedissonHelper.get(redisKey2, String.class);Assertions.assertNull(redisValue2);primaryRedissonHelper.set(redisKey2, "test_deleteRBucket_single_multi_2", 30);redisValue2 = primaryRedissonHelper.get(redisKey2, String.class);Assertions.assertNotNull(redisValue2);Assertions.assertEquals("test_deleteRBucket_single_multi_2", redisValue2);primaryRedissonHelper.deleteBucket(redisKey1, redisKey2);redisValue1 = primaryRedissonHelper.get(redisKey1, String.class);Assertions.assertNull(redisValue1);redisValue2 = primaryRedissonHelper.get(redisKey2, String.class);Assertions.assertNull(redisValue2);}
二、二进制流(Binary Stream)
Redisson 的分布式 RBinaryStream
,Java 对象同时提供了 InputStream
接口和 OutputStream
接口的实现。流的最大容量受 Redis 主节点的内存大小限制。
RBinaryStream stream = redisson.getBinaryStream("test:stream:001");
byte[] content = ...
stream.set(content);
InputStream is = stream.getInputStream();
byte[] readBuffer = new byte[512];
is.read(readBuffer);
OutputStream os = stream.getOuputStream();
byte[] contentToWrite = ...
os.write(contentToWrite);
三、地理空间对象桶(Geospatial Bucket)
Redisson 的分布式RGeo,
Java 对象是一种专门用来储存与地理位置有关的对象桶。
RGeo<String> geo = redisson.getGeo("test:geo:001");
geo.add(new GeoEntry(13.361389, 38.115556, "Palermo"),new GeoEntry(15.087269, 37.502669, "Catania"));
geo.addAsync(37.618423, 55.751244, "Moscow");
Double distance = geo.dist("Palermo", "Catania", GeoUnit.METERS);
geo.hashAsync("Palermo", "Catania");
Map<String, GeoPosition> positions = geo.pos("test2", "Palermo", "test3", "Catania", "test1");
List<String> cities = geo.radius(15, 37, 200, GeoUnit.KILOMETERS);
Map<String, GeoPosition> citiesWithPositions = geo.radiusWithPosition(15, 37, 200, GeoUnit.KILOMETERS);
四、原子长整型(AtomicLong)
Redisson 的分布式整长形 RAtomicLong 对象和 Java 中的 java.util.concurrent.atomic.AtomicLong
对象类似。
RAtomicLong atomicLong = redisson.getAtomicLong("test:long:001");
atomicLong.set(3);
atomicLong.incrementAndGet();
atomicLong.get();
五、原子双精度浮点(AtomicDouble)
Redisson 还提供了分布式原子双精度浮点 RAtomicDouble,弥补了 Java 自身的不足。
RAtomicDouble atomicDouble = redisson.getAtomicDouble("test:atomicDouble:001");
atomicDouble.set(2.81);
atomicDouble.addAndGet(4.11);
atomicDouble.get();
六、话题(订阅分发)(RTopic)
Redisson 的分布式话题 (RTopic
)、反射式(Reactive)和 RxJava2 标准的接口。
RTopic topic = redisson.getTopic("test:topic:001");
topic.addListener(SomeObject.class, new MessageListener<SomeObject>() {@Overridepublic void onMessage(String channel, SomeObject message) {//...}
});
// 在其他线程或JVM节点
RTopic topic = redisson.getTopic("test:topic:001");
long clientsReceivedMessage = topic.publish(new SomeObject());
在 Redis 节点故障转移(主从切换)或断线重连以后,所有的话题监听器将自动完成话题的重新订阅。
七、整长型累加器(LongAdder)
基于 Redis 的 Redisson 分布式整长型累加器(LongAdder)采用了与 java.util.concurrent.atomic.LongAdder
类似的接口。通过利用客户端内置的 LongAdder 对象,为分布式环境下递增和递减操作提供了很高得性能。据统计其性能最高比分布式 AtomicLong 对象快 12000 倍。完美适用于分布式统计计量场景。
RLongAdder atomicLong = redisson.getLongAdder("test:atomicLong:001");
atomicLong.add(12);
atomicLong.increment();
atomicLong.decrement();
atomicLong.sum();
当不再使用整长型累加器对象的时候应该自行手动销毁,如果 Redisson 对象被关闭(shutdown)了,则不用手动销毁。
RLongAdder atomicLong = ...
atomicLong.destroy();
八、分布式锁(RLock)
/*** 一直尝试获取锁,直到获取到为止** @param lockName 锁名称* @param leaseTime 使用时间* @param maxAttempts 最大尝试次数*/public void tryLock(String lockName, long leaseTime, int maxAttempts) {RLock lock = getLock(lockName);int attempt = 0;try {while (attempt < maxAttempts) {boolean isLocked = lock.tryLock(1, leaseTime, TimeUnit.SECONDS);if (isLocked) {log.info("tryLock, isLocked=true, lockName={}, leaseTime={}", lockName, leaseTime);return;}Thread.sleep(20);attempt++;}log.warn("tryLock, failed to acquire lock after {} attempts, lockName={}, leaseTime={}", maxAttempts, lockName, leaseTime);} catch (InterruptedException e) {Thread.currentThread().interrupt(); // 重新设置中断状态log.warn("tryLock interrupted, lockName={}, leaseTime={}", lockName, leaseTime);}}
测试
@Testpublic void test_tryLock() throws InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(10);// 用于等待所有任务完成int count = 10;final CountDownLatch latch = new CountDownLatch(count);for (int i = 0; i < count; i++) {executorService.execute(() -> {String lockName = "test:lock:002";try {// 添加最大尝试次数primaryRedissonHelper.tryLock(lockName, 10, 10000);RLock rLock = primaryRedissonHelper.getLock(lockName);Assertions.assertTrue(rLock != null && rLock.isLocked());Thread.sleep(3000L);} catch (InterruptedException e) {log.warn("test_tryLock {}", ExceptionUtils.getStackTrace(e));} finally {// 无论是否成功,都减少计数latch.countDown();primaryRedissonHelper.unLock(lockName);}});}// 等待所有任务完成latch.await();executorService.shutdown();// 等待线程池完全关闭log.info("executorService.awaitTermination {}", executorService.awaitTermination(1, TimeUnit.MINUTES));}