您的位置:首页 > 科技 > 能源 > 电子工程网账号_推广网站是什么意思_不用流量的地图导航软件_广告关键词

电子工程网账号_推广网站是什么意思_不用流量的地图导航软件_广告关键词

2024/11/18 9:43:30 来源:https://blog.csdn.net/randavy/article/details/142860280  浏览:    关键词:电子工程网账号_推广网站是什么意思_不用流量的地图导航软件_广告关键词
电子工程网账号_推广网站是什么意思_不用流量的地图导航软件_广告关键词

1. 适用场景

日常开发中,我们经常遇到这样的需求,在某个事件发生后,过一段时间做一个额外的动作,比如

  1. 拼单,如果2小时未能成单,取消拼单
  2. 下单,30分钟内未支付,取消订单
    之前的我们的做法通常是通过定时任务轮询,比如扫描创建时间是2小时之前,状态是未成功的拼单,然后做取消操作。这种方案存在的问题是:
  3. 扫描对数据库造成一定的压力
  4. 轮询的时间间隔会导致操作有一定的延迟
    延迟消息正是用来解决这类问题的银弹。

2. JDK实现

2.1 使用方式

JDK内部提供了DelayQueue队列和Delayed接口来实现延迟消息,我们先来看一个简单的Demo,我们会创建一个DelayMessage用来代表延迟消息,延迟消息需要实现Delayed接口

  1. getDealy,返回消息的延迟时间
  2. compareTo,为了让多个延迟消息排序,将时间最早的消息排到最前面
public class DelayMessage implements Delayed {private long expiredAtMs;private long delayMs;private String message;public DelayMessage(long delaySeconds, String message) {this.delayMs = delaySeconds * 1000;this.expiredAtMs = System.currentTimeMillis() + delayMs;this.message = message;}@Overridepublic long getDelay(TimeUnit unit) {long diff = expiredAtMs - System.currentTimeMillis();return unit.convert(diff, TimeUnit.MILLISECONDS);}@Overridepublic int compareTo(Delayed o) {long sTtl = getDelay(TimeUnit.MILLISECONDS);long oTtl = o.getDelay(TimeUnit.MILLISECONDS);return sTtl < oTtl ? -1 : (sTtl > oTtl ? 1 : 0);}public String getMessage() {return this.message;}
}

接着只需要创建消息队列,将延迟消息放入到队列中即可,然后创建一个线程来消费延迟队列即可

DelayQueue<DelayMessage> queue = new DelayQueue<>();
queue.put(new DelayMessage(1, "1s later"));
queue.put(new DelayMessage(60, "60s later"));
queue.put(new DelayMessage(120, "120s later"));ExecutorService es = Executors.newSingleThreadExecutor();
es.submit(() -> {try {while (true) {DelayMessage dm = queue.take();System.out.println(currentTimeInText() + "_" + dm.getMessage());}} catch (InterruptedException e) {throw new RuntimeException(e);}
});
2.2 实现原理

从DelayQueue的源码我们可以看到,整个DelayQueue的核心就在于3个点:

  1. 数据存储,基于PriorityQueue,通过Delayed的compareTo方法排序,即基于时间顺序
  2. 数据写入,offer/put方法
  3. 数据消费,take/poll方法
1. 数据写入
public boolean offer(E e) {final ReentrantLock lock = this.lock;lock.lock();try {q.offer(e);           // PriorityQueue写入if (q.peek() == e) {  // 如果刚刚写入的消息是最高优先级的(最早被消费的),唤醒在take()方法阻塞的线程leader = null;    // Leader-Follow Parttern,减少RaceCondition, http://www.cs.wustl.edu/~schmidt/POSA/POSA2/available.signal(); // 唤醒在take()阻塞的线程}return true;} finally {lock.unlock();}
}
2. 数据消费
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {E first = q.peek();if (first == null)available.await();  // 队列为空,阻塞,直到offer(e)被调用else {long delay = first.getDelay(NANOSECONDS);if (delay <= 0)     // 延迟时间到了,取出item供使用return q.poll();first = null; // don't retain ref while waitingif (leader != null)available.await();  // await释放锁,其他线程执行take(),如果leader != null有负责处理头部item的线程else {Thread thisThread = Thread.currentThread();  // 走到这说明头部元素暂无处理线程,将当前线程设定为处理线程leader = thisThread;try {available.awaitNanos(delay);  // 等待延迟时间后自动唤醒,重新进入循环,处理queue头部item} finally {if (leader == thisThread)leader = null;}}}}} finally {if (leader == null && q.peek() != null)available.signal();lock.unlock();}
}

代码很短,设计还是巧妙的,尤其是Leader-Follower模式的使用,在我们实现自己的组件时可以借鉴。

3. Redis实现

JDK实现的延迟队列已经能解决部分场景了,不过也存在两个明显的问题

  1. 队列数据没持久化,重启或进程崩溃都会导致数据丢失
  2. 不支持分布式,不能跨进程共享
3.1 消息队列

通过上面的JDK实现,我们已经能把Redis实现的延迟消息的逻辑猜的八九不离十了,假设我们用LIST存储,先通过LPUSH写入队列消息(message1、message2)

127.0.0.1:6379> LPUSH my_delay_queue message1
(integer) 1
127.0.0.1:6379> LPUSH my_delay_queue message2
(integer) 2
127.0.0.1:6379> LRANGE my_delay_queue 0 -1
1) "message2"
2) "message1"

通过RPOPLPUSH,从队列取出待消费的消息,并暂存到临时队列(my_delay_queue)中

127.0.0.1:6379> RPOPLPUSH my_delay_queue my_delay_queue_temp
"message1"
127.0.0.1:6379> LRANGE my_delay_queue_temp 0 -1
1) "message1"
127.0.0.1:6379> LRANGE my_delay_queue 0 -1
1) "message2"

这是在程序代码中消费message1,如果消费成功,从临时队列中删除消息

127.0.0.1:6379> LREM my_delay_queue_temp 1 message1
(integer) 1

最终队列的状态是,delayQueue中只剩message2,临时队列中为空

127.0.0.1:6379> LRANGE my_delay_queue_temp 0 -1
(empty array)
127.0.0.1:6379> LRANGE my_delay_queue 0 -1
1) "message2"
3.2 延迟队列

用LIST只能实现FIFO,要想实现基于时间的优先级,需要改用ZSET来存储数据,用时间做时间戳

127.0.0.1:6379> ZADD s_delay_queue 1728625236 message0
127.0.0.1:6379> ZADD s_delay_queue 1728625256 message0
127.0.0.1:6379> ZADD s_delay_queue 1728625256 message2
127.0.0.1:6379> ZADD s_delay_queue 1728625266 message3127.0.0.1:6379> ZRANGE s_delay_queue 0 -1 WITHSCORES
1) "message0"
2) "1728625236"
3) "message1"
4) "1728625256"
5) "message2"
6) "1728625256"
7) "message3"
8) "1728625266"

通过使用ZRANGEBYSCORE获取延迟时间已经到的item

127.0.0.1:6379> ZRANGEBYSCORE s_delay_queue 0 1728625256
1) "message0"
2) "message1"
3) "message2"

ZSET并没有提供RPOPLPUSH的命令,我们使用Lua脚本来模拟这个操作,这段lua接受两个KEY,一个ARGV

  1. KEYS[1],表示ZSET的名字
  2. KEYS[2],表示LIST的名字
  3. ARGV[1],表示SCORE的范围截至时间
local elements = redis.call('ZRANGEBYSCORE', KEYS[1], 0, ARGV[1])
if #elements > 0 thenfor i, element in ipairs(elements) doredis.call('LPUSH', KEYS[2], element)redis.call('ZREM', KEYS[1], element)end
end
return elements

然后是通过EVAL执行这段Lua,这里我们从ZSET(s_delay_queue)读取score <= 1728625237的item,返回并暂存到LIST(s_delay_queue_temp)中,模拟了RPOPLPUSH的操作

127.0.0.1:6379> EVAL "local elements = redis.call('ZRANGEBYSCORE', KEYS[1], 0, ARGV[1]) if #elements > 0 then for i, element in ipairs(elements) do redis.call('LPUSH', KEYS[2], element) redis.call('ZREM', KEYS[1], element) end end return elements" 2 s_delay_queue s_delay_queue_temp 1728625237
1) "message0"

剩下的逻辑基本上和[[基于Redis的延迟队列#3.1 消息队列]]一样,在程序中消费message,成功之后删除s_delay_queue_temp中的数据。我们需要做的是在程序中定时的执行这段Lua脚本,并且实现类似DelayQueue的逻辑,支持阻塞的take()操作,以及消费失败时的错误处理,显然要处理的错误细节并不少。

3.3 Redisson实现
1. 数据结构

Redisson封装了基于Redis的延迟消息实现,我们来看一个使用的Redisson延迟队列的demo

Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
RedissonClient redisson = Redisson.create(config);RBlockingQueue<String> blockingQueue = redisson.getBlockingQueue("delayBlockingQueue");
RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(blockingQueue);delayedQueue.offer("message1", 1, TimeUnit.MINUTES);
delayedQueue.offer("message2", 5, TimeUnit.MINUTES);
delayedQueue.offer("message3", 10, TimeUnit.MINUTES);
delayedQueue.offer("message4", 15, TimeUnit.MINUTES);ExecutorService es = Executors.newSingleThreadExecutor();
es.submit(() -> {while (true) {String data = blockingQueue.poll(60, TimeUnit.SECONDS);if (data != null) {System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ":" + data);}}
});

Redisson的实现比[[#3.2 延迟队列]]要负责一点,它内部构建了4个数据结构。通过Redis的命令查看,我们能看到3个KEY

127.0.0.1:6379> KEYS *
2) "delayBlockingQueue"
4) "redisson_delay_queue:{delayBlockingQueue}"
6) "redisson_delay_queue_timeout:{delayBlockingQueue}"
  1. delayBlockingQueue是我们创建RBlockingQueue时指定的名称,用来存储延迟时间到期,但尚未被处理的任务
  2. redisson_delay_queue_timeout:{delayBlockingQueue},类型是zset,记录延迟任务和时间
  3. redisson_delay_queue:{delayBlockingQueue},类型是list,记录任务列表,保持任务的顺序
    通过TYPE命令,我们能查看他们的数据类型
127.0.0.1:6379> TYPE redisson_delay_queue:{delayBlockingQueue}
list
127.0.0.1:6379> TYPE redisson_delay_queue_timeout:{delayBlockingQueue}
zset

此外Redission还创建了一个Channel,用来在delayQueue写入数据的时候做通知

127.0.0.1:6379> PUBSUB channels
1) "redisson_delay_queue_channel:{delayBlockingQueue}"
2. 数据写入

通过RDelayedQueue写入数据的时候,最终会调用offerAsync方法

public RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) {if (delay < 0) {throw new IllegalArgumentException("Delay can't be negative");}long delayInMs = timeUnit.toMillis(delay);long timeout = System.currentTimeMillis() + delayInMs;long randomId = ThreadLocalRandom.current().nextLong();return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,"local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);" + "redis.call('zadd', KEYS[2], ARGV[1], value);"                       // 写入  redisson_delay_queue_timeout:{delayBlockingQueue}+ "redis.call('rpush', KEYS[3], value);"                               // 写入  redisson_delay_queue:{delayBlockingQueue}// if new object added to queue head when publish its startTime // to all scheduler workers + "local v = redis.call('zrange', KEYS[2], 0, 0); "                    // 取时间戳最小的元素+ "if v[1] == value then "+ "redis.call('publish', KEYS[4], ARGV[1]); "                       // 如果新插入的元素是zset的第一个元素,做channel通知+ "end;",Arrays.<Object>asList(getName(), timeoutSetName, queueName, channelName), timeout, randomId, encode(e));
}
3. 数据消费

创建RDelayedQueue时,redisson创建了一个QueueTransferTask任务,负责从redisson_delay_queue_timeout:{delayBlockingQueue}将到期的数据迁移到delayBlockingQueue

protected RedissonDelayedQueue(QueueTransferService queueTransferService, Codec codec, final CommandAsyncExecutor commandExecutor, String name) {super(codec, commandExecutor, name);channelName = prefixName("redisson_delay_queue_channel", getName());queueName = prefixName("redisson_delay_queue", getName());timeoutSetName = prefixName("redisson_delay_queue_timeout", getName());QueueTransferTask task = new QueueTransferTask(commandExecutor.getConnectionManager()) {@Overrideprotected RFuture<Long> pushTaskAsync() {return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,"local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); " // 从redisson_delay_queue_timeout拿到期的任务+ "if #expiredValues > 0 then "+ "for i, v in ipairs(expiredValues) do "+ "local randomId, value = struct.unpack('dLc0', v);"+ "redis.call('rpush', KEYS[1], value);"      // 写入到 delayBlockingQueue+ "redis.call('lrem', KEYS[3], 1, v);"        // 从 redisson_delay_queue 删除+ "end; "+ "redis.call('zrem', KEYS[2], unpack(expiredValues));" // 从 redisson_delay_queue_timeout 删除+ "end; "// get startTime from scheduler queue head task+ "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "+ "if v[1] ~= nil then " // 如果最小时间戳的任务存在,返回它的时间戳+ "return v[2]; "+ "end "+ "return nil;",Arrays.<Object>asList(getName(), timeoutSetName, queueName),  // KEYS: delayBlockingQueue , redisson_delay_queue_timeout*、redisson_delay_queue*System.currentTimeMillis(), 100);}@Overrideprotected RTopic getTopic() {return new RedissonTopic(LongCodec.INSTANCE, commandExecutor, channelName);}};queueTransferService.schedule(queueName, task);this.queueTransferService = queueTransferService;
}
4. RBlockingQueue

通过[[#3. 数据消费]]的操作,redisson已经将到期的延迟任务写入到delayBlockingQueue了,剩下要做的就是用delayBlockingQueue实现阻塞队列了,核心代码在 RedissonBlockingQueue,其实实现很简单,我们来看下代码,take()方法实际只是执行了一个redis命令BLPOP

@Override
public V take() throws InterruptedException {return commandExecutor.getInterrupted(takeAsync());
}
@Override
public RFuture<V> takeAsync() {return commandExecutor.writeAsync(getName(), codec, RedisCommands.BLPOP_VALUE, getName(), 0);
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com