您的位置:首页 > 文旅 > 旅游 > Redis 入门到精通2

Redis 入门到精通2

2024/12/22 13:00:45 来源:https://blog.csdn.net/2301_80011650/article/details/141714614  浏览:    关键词:Redis 入门到精通2

目录

  • 一、发布订阅模式
  • 二、消息队列 Stream
  • 三、详细示例
  • 补充:

一、发布订阅模式

  1. 概念:

    • 发布订阅(Publish/Subscribe)模式是一种消息通信模式,在这种模式中,发送者(发布者)不是直接将消息发送给特定的接收者(订阅者),而是将消息发布到一个特定的频道(channel),而对这个频道感兴趣的订阅者会接收到消息。
  2. 特点:

    • 解耦发布者和订阅者:发布者不需要知道有哪些订阅者,订阅者也不需要知道消息是由谁发布的。
    • 异步通信:发布者和订阅者可以在不同的时间和地点进行操作,消息会在合适的时候被传递。
    • 多对多通信:多个发布者可以向同一个频道发布消息,多个订阅者可以订阅同一个频道。
  3. 命令及示例:

    • SUBSCRIBE channel [channel...]:订阅一个或多个频道。

      SUBSCRIBE news
      

      输出:等待接收消息,当有发布者向“news”频道发布消息时,会显示消息内容。

    • PUBLISH channel message:向指定频道发布消息。

      PUBLISH news "Breaking news!"
      

      输出:返回订阅者的数量,表示有多少个订阅者接收到了这条消息。

  4. 应用场景:

    • 实时通知:例如在社交网络中,当一个用户发布了新的状态,其他关注该用户的用户可以通过订阅特定的频道来接收通知。
    • 日志系统:可以将不同类型的日志发布到不同的频道,让订阅者根据需要进行处理。

二、消息队列 Stream

  1. 概念:

    • Redis Stream 是 Redis 5.0 引入的一种新的数据类型,用于实现消息队列。它提供了一种可靠的、持久化的、可扩展的消息队列解决方案。
  2. 特点:

    • 消息持久化:Stream 中的消息可以持久化到磁盘,即使 Redis 服务器重启,消息也不会丢失。
    • 消费者组:可以将多个消费者组成一个消费者组,每个消费者组可以独立地消费消息,实现负载均衡和高可用性。
    • 消息确认:消费者在处理完消息后需要进行确认,确保消息被正确处理。如果消费者在处理消息过程中出现故障,消息会重新分配给其他消费者进行处理。
  3. 命令及示例:

    • XADD stream [MAXLEN [~] count] *|ID field value [field value...]:向 Stream 中添加消息。

      XADD mystream * name "John" age 30
      

      输出:返回生成的消息 ID。

    • XREAD [COUNT count] [BLOCK milliseconds] STREAMS stream1 [stream2...] ID [ID...]:读取 Stream 中的消息。

      XREAD BLOCK 0 STREAMS mystream $
      

      输出:显示读取到的消息内容,格式为一个数组,包含消息 ID 和字段值。

    • XGROUP CREATE stream groupname [MKSTREAM] [ID id]:创建消费者组。

      XGROUP CREATE mystream mygroup $
      

      输出:OK

    • XREADGROUP GROUP groupname consumer [COUNT count] [BLOCK milliseconds] STREAMS stream [stream...] ID [ID...]:消费者从消费者组中读取消息。

      XREADGROUP GROUP mygroup consumer1 COUNT 1 STREAMS mystream >
      

      输出:显示读取到的消息内容,格式为一个数组,包含消息 ID 和字段值。

    • XACK stream groupname id [id...]:消费者确认处理完消息。

      XACK mystream mygroup message_id
      

      输出:确认的消息数量。

  4. 应用场景:

    • 分布式任务队列:可以将任务作为消息添加到 Stream 中,然后由多个消费者从不同的节点上进行处理。
    • 事件驱动架构:可以将各种事件发布到 Stream 中,让订阅者根据事件进行相应的处理。

三、详细示例

一、发布订阅模式

  1. 引入依赖:
    • 如果使用 Maven,在pom.xml中添加以下依赖:
<dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>4.4.0</version>
</dependency>
  1. 发布者代码:
import redis.clients.jedis.Jedis;public class RedisPublisher {public static void main(String[] args) {// 创建一个 Jedis 实例,连接到本地 Redis 服务器,端口为 6379Jedis jedis = new Jedis("localhost", 6379);// 向名为 "news" 的频道发布消息 "Breaking news! Java example."jedis.publish("news", "Breaking news! Java example.");// 关闭 Jedis 连接jedis.close();}
}
  1. 订阅者代码:
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;public class RedisSubscriber {public static void main(String[] args) {// 创建一个 Jedis 实例,连接到本地 Redis 服务器,端口为 6379Jedis jedis = new Jedis("localhost", 6379);// 创建一个 JedisPubSub 实例用于处理订阅的消息JedisPubSub pubSub = new JedisPubSub() {// 当接收到消息时,这个方法会被调用@Overridepublic void onMessage(String channel, String message) {// 打印接收到的消息,包括频道名和消息内容System.out.println("Received from channel " + channel + ": " + message);}};// 订阅名为 "news" 的频道,开始接收消息jedis.subscribe(pubSub, "news");}
}

二、消息队列 Stream

  1. 引入依赖:与发布订阅模式相同。

  2. 生产者代码:

import redis.clients.jedis.Jedis;public class RedisStreamProducer {public static void main(String[] args) {// 创建一个 Jedis 实例,连接到本地 Redis 服务器,端口为 6379Jedis jedis = new Jedis("localhost", 6379);// 定义 Stream 的名称String streamName = "mystream";// 使用 xadd 方法向 Stream 添加消息,"*"表示让 Redis 自动生成消息 IDString messageId = jedis.xadd(streamName, "*", "message", "Hello from Java Stream example.");// 打印添加的消息的 IDSystem.out.println("Added message with ID: " + messageId);// 关闭 Jedis 连接jedis.close();}
}
  1. 消费者组创建代码:
import redis.clients.jedis.Jedis;public class RedisStreamCreateGroup {public static void main(String[] args) {// 创建一个 Jedis 实例,连接到本地 Redis 服务器,端口为 6379Jedis jedis = new Jedis("localhost", 6379);// 定义 Stream 的名称String streamName = "mystream";// 定义消费者组的名称String groupName = "mygroup";// 使用 xgroupCreate 方法创建消费者组,"0"表示从 Stream 的开头开始消费,true 表示如果 Stream 不存在则创建jedis.xgroupCreate(streamName, groupName, "0", true);// 关闭 Jedis 连接jedis.close();}
}
  1. 消费者代码:
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;import java.util.List;
import java.util.Map;public class RedisStreamConsumer {public static void main(String[] args) {// 创建一个 Jedis 实例,连接到本地 Redis 服务器,端口为 6379Jedis jedis = new Jedis("localhost", 6379);// 定义 Stream 的名称String streamName = "mystream";// 定义消费者组的名称String groupName = "mygroup";// 定义消费者的名称String consumerName = "consumer1";while (true) {// 使用 xreadGroup 方法从消费者组中读取消息,StreamEntryID.UNRECEIVED_ENTRY 表示从未读取过的消息开始,1 表示读取一条消息,"mystream"是要读取的 Stream 的名称List<StreamEntry> messages = jedis.xreadGroup(groupName, consumerName, StreamEntryID.UNRECEIVED_ENTRY, 1, "mystream");if (messages!= null &&!messages.isEmpty()) {// 获取第一条消息StreamEntry entry = messages.get(0);// 获取消息的字段Map<String, String> fields = entry.getFields();// 打印接收到的消息内容System.out.println("Received message: " + fields.get("message"));}}}
}

补充:

一、生产者

  1. 定义:

    • 生产者是负责生成数据或任务的实体。在不同的场景下,生产者可以是一个线程、一个进程或者一个独立的系统。
  2. 作用:

    • 生产者的主要任务是创建数据项或任务,并将其放入一个共享的数据结构或队列中,供其他实体(消费者)使用。
  3. 示例:

    • 在一个工厂生产线上,生产产品的工人可以被视为生产者。他们不断地制造产品,并将其放置在传送带上,等待后续的加工或包装环节(消费者)进行处理。

二、消费者

  1. 定义:

    • 消费者是负责处理数据或任务的实体。与生产者类似,消费者也可以是一个线程、一个进程或者一个独立的系统。
  2. 作用:

    • 消费者从共享的数据结构或队列中获取数据项或任务,并对其进行处理。处理的方式可以根据具体的应用场景而定,例如进行计算、存储、发送到其他系统等。
  3. 示例:

    • 在一个超市的仓库中,负责将货物从仓库中取出并摆放到货架上的员工可以被视为消费者。他们从仓库(共享的数据结构)中获取货物(数据项),并将其放置到货架上供顾客购买(处理数据项)。

在使用生产者和消费者模型时,通常需要考虑以下几个问题:

  1. 同步:生产者和消费者需要进行同步,以确保数据的正确生产和消费。例如,当队列已满时,生产者需要等待消费者取出数据后才能继续生产;当队列为空时,消费者需要等待生产者生产数据后才能继续消费。
  2. 互斥:在对共享数据结构进行操作时,需要确保生产者和消费者之间的互斥访问,以避免数据的不一致性。
  3. 并发:生产者和消费者可以在不同的线程或进程中同时运行,以提高系统的效率和吞吐量。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com