目录
一、SynchronousQueue基础概念
主要特点
使用场景
示例代码
二、SynchronousQueue深入了解
1 SynchronousQueue介绍
2 SynchronousQueue核心属性
3 SynchronousQueue的TransferQueue源码
3.1 QNode源码信息
3.2 transfer方法实现
3.3 tansfer方法流程图
一、SynchronousQueue基础概念
SynchronousQueue
是Java并发包java.util.concurrent
中的一种特殊的BlockingQueue实现类。它并不像其他的队列那样拥有固定的容量大小,而是仅仅充当生产者和消费者之间的“传递”作用。当一个元素被放入队列时,必须立即有一个消费者来获取它,否则生产者的线程将会阻塞。同样地,如果试图从队列中取出一个元素,那么必须立即有一个生产者来放入一个元素,否则消费者的线程也会被阻塞。
主要特点
-
无缓冲:
SynchronousQueue
不存储元素,它仅仅作为一个传递元素的场所。 -
生产者消费者模式:
SynchronousQueue
非常适合用于实现生产者-消费者模式,其中生产者产生的元素必须立即被消费者消费掉。 -
线程阻塞:如果生产者尝试向队列中插入元素,但没有消费者来接收,则生产者的线程会被阻塞;反之亦然。
使用场景
SynchronousQueue
适用于如下几种场景:
-
需要立即处理数据的情况,不能有任何延迟。
-
不希望在队列中保留任何数据,而是希望尽快传递给下一个处理者。
-
需要在两个线程之间直接传递数据。
示例代码
下面是一个简单的使用SynchronousQueue
的例子:
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
public class SynchronousQueueExample {public static void main(String[] args) {SynchronousQueue<Integer> queue = new SynchronousQueue<>();// 生产者线程Thread producer = new Thread(() -> {try {System.out.println("Producer: Adding element to the queue");queue.put(42); // 生产者放入数据System.out.println("Producer: Element added");} catch (InterruptedException e) {e.printStackTrace();}});// 消费者线程Thread consumer = new Thread(() -> {try {Integer value = queue.take(); // 消费者获取数据System.out.println("Consumer: Got " + value + " from the queue");} catch (InterruptedException e) {e.printStackTrace();}});producer.start();consumer.start();try {producer.join();consumer.join();} catch (InterruptedException e) {e.printStackTrace();}}
}
在这个例子中,生产者线程尝试向队列中放入一个整数42
,而消费者线程则尝试从中取出数据。由于SynchronousQueue
的特点,生产者线程只有在消费者线程成功取出数据后才能继续执行。
总之,SynchronousQueue
是一个非常有用的工具,特别是在需要即时通信和传递数据的场景中。然而,由于其无缓冲的特性,使用时需要特别注意同步和线程安全问题。
二、SynchronousQueue深入了解
1 SynchronousQueue介绍
SynchronousQueue这个阻塞队列和其他的阻塞队列有很大的区别
在咱们的概念中,队列肯定是要存储数据的,但是SynchronousQueue不会存储数据的
SynchronousQueue队列中,他不存储数据,存储生产者或者是消费者
当存储一个生产者到SynchronousQueue队列中之后,生产者会阻塞(看你调用的方法)
生产者最终会有几种结果:
-
如果在阻塞期间有消费者来匹配,生产者就会将绑定的消息交给消费者
-
生产者得等阻塞结果,或者不允许阻塞,那么就直接失败
-
生产者在阻塞期间,如果线程中断,直接告辞。
同理,消费者和生产者的效果是一样。
生产者和消费者的数据是直接传递的,不会经过SynchronousQueue。
SynchronousQueue是不会存储数据的。
经过阻塞队列的学习:
生产者:
-
offer():生产者在放到SynchronousQueue的同时,如果有消费者在等待消息,直接配对。如果没有消费者在等待消息,这里直接返回,告辞。
-
offer(time,unit):生产者在放到SynchronousQueue的同时,如果有消费者在等待消息,直接配对。如果没有消费者在等待消息,阻塞time时间,如果还没有,告辞。
-
put():生产者在放到SynchronousQueue的同时,如果有消费者在等待消息,直接配对。如果没有,死等。
消费者:poll(),poll(time,unit),take()。道理和上面的生产者一致。
测试效果:
public static void main(String[] args) throws InterruptedException {// 因为当前队列不存在数据,没有长度的概念。SynchronousQueue queue = new SynchronousQueue();String msg = "消息!";/*new Thread(() -> {// b = false:代表没有消费者来拿boolean b = false;try {b = queue.offer(msg,1, TimeUnit.SECONDS);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(b);}).start();Thread.sleep(100);new Thread(() -> {System.out.println(queue.poll());}).start();*/new Thread(() -> {try {System.out.println(queue.poll(1, TimeUnit.SECONDS));} catch (InterruptedException e) {e.printStackTrace();}}).start();Thread.sleep(100);new Thread(() -> {queue.offer(msg);}).start();
}
2 SynchronousQueue核心属性
进到SynchronousQueue类的内部后,发现了一个内部类,Transferer,内部提供了一个transfer的方法
abstract static class Transferer<E> {abstract E transfer(E e, boolean timed, long nanos);
}
当前这个类中提供的transfer方法,就是生产者和消费者在调用读写数据时要用到的核心方法。
生产者在调用上述的transfer方法时,第一个参数e会正常传递数据
消费者在调用上述的transfer方法时,第一个参数e会传递null
SynchronousQueue针对抽象类Transferer做了几种实现。
一共看到了两种实现方式:
-
TransferStack
-
TransferQueue
这两种类继承了Transferer抽象类,在构建SynchronousQueue时,会指定使用哪种子类
// 到底采用哪种实现,需要把对应的对象存放到这个属性中
private transient volatile Transferer<E> transferer;
// 采用无参时,会调用下述方法,再次调用有参构造传入false
public SynchronousQueue() {this(false);
}
// 调用的是当前的有参构造,fair代表公平还是不公平
public SynchronousQueue(boolean fair) {// 如果是公平,采用Queue,如果是不公平,采用Stacktransferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
TransferQueue的特点
代码查看效果
public static void main(String[] args) throws InterruptedException {// 因为当前队列不存在数据,没有长度的概念。SynchronousQueue queue = new SynchronousQueue(true);SynchronousQueue queue = new SynchronousQueue(false);new Thread(() -> {try {queue.put("生1");} catch (InterruptedException e) {e.printStackTrace();}}).start();new Thread(() -> {try {queue.put("生2");} catch (InterruptedException e) {e.printStackTrace();}}).start();new Thread(() -> {try {queue.put("生3");} catch (InterruptedException e) {e.printStackTrace();}}).start();Thread.sleep(100);new Thread(() -> {System.out.println("消1:" + queue.poll());}).start();Thread.sleep(100);new Thread(() -> {System.out.println("消2:" + queue.poll());}).start();Thread.sleep(100);new Thread(() -> {System.out.println("消3:" + queue.poll());}).start();
}
3 SynchronousQueue的TransferQueue源码
为了查看清除SynchronousQueue的TransferQueue源码,需要从两点开始查看源码信息
3.1 QNode源码信息
static final class QNode {// 当前节点可以获取到next节点volatile QNode next; // item在不同情况下效果不同// 生产者:有数据// 消费者:为nullvolatile Object item; // 当前线程volatile Thread waiter; // 当前属性是区分消费者和生产者的属性final boolean isData;// 最终生产者需要将item交给消费者// 最终消费者需要获取生产者的item// 省略了大量提供的CAS操作....
}
3.2 transfer方法实现
// 当前方法是TransferQueue的核心内容
// e:传递的数据
// timed:false,代表无限阻塞,true,代表阻塞nacos时间
E transfer(E e, boolean timed, long nanos) {// 当前QNode是要封装当前生产者或者消费者的信息QNode s = null; // isData == true:代表是生产者// isData == false:代表是消费者boolean isData = (e != null);// 死循环for (;;) {// 获取尾节点和头结点QNode t = tail;QNode h = head;// 为了避免TransferQueue还没有初始化,这边做一个健壮性判断if (t == null || h == null) continue; // 如果满足h == t 条件,说明当前队列没有生产者或者消费者,为空// 如果有节点,同时当前节点和队列节点属于同一种角色。// if中的逻辑是进到队列if (h == t || t.isData == isData) { // ===================在判断并发问题==========================// 拿到尾节点的nextQNode tn = t.next;// 如果t不为尾节点,进来说明有其他线程并发修改了tailif (t != tail) // 重新走for循环 continue;// tn如果为不null,说明前面有线程并发,添加了一个节点if (tn != null) { // 直接帮助那个并发线程修改tail的指向 advanceTail(t, tn);// 重新走for循环 continue;}// 获取当前线程是否可以阻塞// 如果timed为true,并且阻塞的时间小于等于0// 不需要匹配,直接告辞!!!if (timed && nanos <= 0) return null;// 如果可以阻塞,将当前需要插入到队列的QNode构建出来if (s == null)s = new QNode(e, isData);// 基于CAS操作,将tail节点的next设置为当前线程if (!t.casNext(null, s)) // 如果进到if,说明修改失败,重新执行for循环修改 continue;// CAS操作成功,直接替换tail的指向advanceTail(t, s); // 如果进到队列中了,挂起线程,要么等生产者,要么等消费者。// x是返回替换后的数据Object x = awaitFulfill(s, e, timed, nanos);// 如果元素和节点相等,说明节点取消了if (x == s) { // 清空当前节点,将上一个节点的next指向当前节点的next,直接告辞 clean(t, s);return null;}// 判断当前节点是否还在队列中if (!s.isOffList()) { // 将当前节点设置为headadvanceHead(t, s); // 如果 x != null, 如果拿到了数据,说明我是消费者if (x != null) // 将当前节点的item设置为自己 s.item = s;// 线程置位nulls.waiter = null;}// 返回数据return (x != null) ? (E)x : e;} // 匹配队列中的橘色else { // 拿到head的next,作为要匹配的节点 QNode m = h.next; // 做并发判断,如果头节点,尾节点,或者head.next发生了变化,这边要重新走for循环if (t != tail || m == null || h != head)continue; // 没并发问题,可以拿数据// 拿到m节点的item作为x。Object x = m.item;// 如果isData == (x != null)满足,说明当前出现了并发问题,避免并发消费出现坑if (isData == (x != null) || // 如果排队的节点取消,就会讲当前QNode中的item指向QNodex == m || // 如果前面两个都没满足,可以交换数据了。 // 如果交换失败,说明有并发问题,!m.casItem(x, e)) { // 重新设置head节点,并且再走一次循环 advanceHead(h, m); continue;}// 替换headadvanceHead(h, m); // 唤醒head.next中的线程LockSupport.unpark(m.waiter);// 这边匹配好了,数据也交换了,直接返回// 如果 x != null,说明队列中是生产者,当前是消费者,这边直接返回x具体数据// 反之,队列中是消费者,当前是生产者,直接返回自己的数据return (x != null) ? (E)x : e;}}
}