本文共 7085 字,大约阅读时间需要 23 分钟。
可以在对中对元素进行配对和交换的线程的同步点。每个线程将条目上的某个方法呈现给 方法,与伙伴线程进行匹配,并且在返回时接收其伙伴的对象。Exchanger 可能被视为 的双向形式。Exchanger 可能在应用程序(比如遗传算法和管道设计)中很有用。
内存一致性效果:对于通过 Exchanger 成功交换对象的每对线程,每个线程中在 exchange() 之前的操作 从另一线程中相应的 exchange() 返回的后续操作。
package com.mylearn.thread.tools; import java.util.concurrent.Exchanger; import java.util.concurrent.LinkedBlockingQueue; /** * Created by IntelliJ IDEA. * User: yingkuohao * Date: 13-11-11 * Time: 下午5:18 * CopyRight:360buy * Descrption: * Exchanger是一个双方栅栏,各方在栅栏位置上交换数据,当两方执行不对称的操作时,Exchanger会非常有用,例如当一个线程向 * 缓冲区写入数据,另一个线程从缓冲区读取数据。这些线程可以使用Exchanger来汇合并把满的缓冲区与空的缓冲区交换。当两个 * 线程通过Exchanger交换对象时,这种交换就把这两个对象安全地发布给另一方。 * 数据交换的时机取决于应用程序的响应需求。最简单的方案是,当缓冲区被填满时,由填充任务进行交换,当缓冲区为空时, * 由清空任务进行交换。这样会把需要交换的次数降至最低,但如果新数据的到达率不可预测,那么一些数据的处理过程就会延迟。 * 另一个方法是,不仅当缓冲区将被填满时交换,并且当缓冲区被填充到一定程度并保持一段时间后,也进行交换。 * To change this template use File | Settings | File Templates. */ public class ExchangerTest { public static void main(String args[]) { LinkedBlockingQueue emptyQueue = new LinkedBlockingQueue(10); //生产者队列 LinkedBlockingQueue fullQueue = new LinkedBlockingQueue(10); //消费者队列 Exchanger<LinkedBlockingQueue> exchanger = new Exchanger<LinkedBlockingQueue>(); try { //初始化一些值到fullQueue fullQueue.put("inital1"); fullQueue.put("inital2"); fullQueue.put("inital3"); } catch (InterruptedException e) { e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. } Thread product = new Thread(new Product(emptyQueue, exchanger)); //生产者线程负责生产 Thread consumer = new Thread(new Consumer(fullQueue, exchanger)); //消费者线程负责消费 product.start(); consumer.start(); // 当生产者队列满,消费者队列为空的时候,两者交换,生产者变为空,消费者队列变为满,相当于一个批量操作。 } static class Product implements Runnable { LinkedBlockingQueue emptyQueue; Exchanger<LinkedBlockingQueue> exchanger; Product(LinkedBlockingQueue emptyQueue, Exchanger<LinkedBlockingQueue> exchanger) { this.emptyQueue = emptyQueue; this.exchanger = exchanger; } public void run() { while (emptyQueue != null) { try { emptyQueue.put("product1"); System.out.println(Thread.currentThread().getName() + "成功放入一个商品"); if (emptyQueue.size() == 10) { System.out.println(Thread.currentThread().getName() + "begin change: emptyQueue.size=" + emptyQueue.size()); Thread.sleep(1000); emptyQueue = exchanger.exchange(emptyQueue); System.out.println(Thread.currentThread().getName() + "after change: emptyQueue.size=" + emptyQueue.size()); } } catch (InterruptedException e) { e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. } } } } static class Consumer implements Runnable { LinkedBlockingQueue fullQueue; Exchanger<LinkedBlockingQueue> exchanger; Consumer(LinkedBlockingQueue fullQueue, Exchanger<LinkedBlockingQueue> exchanger) { this.fullQueue = fullQueue; this.exchanger = exchanger; } public void run() { while (fullQueue != null) { try { Object object = fullQueue.take(); System.out.println(Thread.currentThread().getName() + "成功取到一个商品" + object); if (fullQueue.isEmpty()) { System.out.println(Thread.currentThread().getName() + "begin change: fullQueue.size=" + fullQueue.size()); Thread.sleep(1000); fullQueue = exchanger.exchange(fullQueue); System.out.println(Thread.currentThread().getName() + "after change: fullQueue.size=" + fullQueue.size()); } } catch (InterruptedException e) { e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. } } } } } |
结果:
Thread-0成功放入一个商品 Thread-1成功取到一个商品inital1 Thread-0成功放入一个商品 Thread-1成功取到一个商品inital2 Thread-0成功放入一个商品 Thread-1成功取到一个商品inital3 Thread-0成功放入一个商品 Thread-1begin change: fullQueue.size=0 Thread-0成功放入一个商品 Thread-0成功放入一个商品 Thread-0成功放入一个商品 Thread-0成功放入一个商品 Thread-0成功放入一个商品 Thread-0成功放入一个商品 Thread-0begin change: emptyQueue.size=10 Thread-0after change: emptyQueue.size=0 Thread-0成功放入一个商品 Thread-0成功放入一个商品 Thread-0成功放入一个商品 Thread-0成功放入一个商品 Thread-0成功放入一个商品 Thread-1after change: fullQueue.size=10 Thread-0成功放入一个商品 Thread-1成功取到一个商品product1 Thread-0成功放入一个商品 Thread-1成功取到一个商品product1 Thread-0成功放入一个商品 Thread-1成功取到一个商品product1 Thread-0成功放入一个商品 Thread-1成功取到一个商品product1 Thread-0成功放入一个商品 Thread-1成功取到一个商品product1 Thread-0begin change: emptyQueue.size=10 Thread-1成功取到一个商品product1 Thread-1成功取到一个商品product1 Thread-1成功取到一个商品product1 Thread-1成功取到一个商品product1 Thread-1成功取到一个商品product1 Thread-1begin change: fullQueue.size=0 Thread-1after change: fullQueue.size=10 Thread-0after change: emptyQueue.size=0 Thread-0成功放入一个商品 Thread-0成功放入一个商品 Thread-0成功放入一个商品 Thread-0成功放入一个商品 Thread-1成功取到一个商品product1 Thread-1成功取到一个商品product1 Thread-0成功放入一个商品 Thread-0成功放入一个商品 Thread-1成功取到一个商品product1 Thread-1成功取到一个商品product1 Thread-0成功放入一个商品 Thread-0成功放入一个商品 Thread-1成功取到一个商品product1 Thread-1成功取到一个商品product1 Thread-0成功放入一个商品 Thread-0成功放入一个商品 Thread-0begin change: emptyQueue.size=10 Thread-1成功取到一个商品product1 Thread-1成功取到一个商品product1 Thread-1成功取到一个商品product1 Thread-1成功取到一个商品product1 Thread-1begin change: fullQueue.size=0 |
可见,当生产者把队列放满后,就等待消费者的队列
参考:
在上面的模型中,我们假定一个空的栈(Stack),栈顶(Top)当然是没有元素的。同时我们假定一个数据结构Node,包含一个要交换的元素E和一个要填充的“洞”Node。这时线程T1携带节点node1进入栈(cas_push),当然这是CAS操作,这样栈顶就不为空了。线程T2携带节点node2进入栈,发现栈里面已经有元素了node1,同时发现node1的hold(Node)为空,于是将自己(node2)填充到node1的hold中(cas_fill)。然后将元素node1从栈中弹出(cas_take)。这样线程T1就得到了node1.hold.item也就是node2的元素e2,线程T2就得到了node1.item也就是e1,从而达到了交换的目的。
算法描述就是下图展示的内容。
JDK 5就是采用类似的思想实现的Exchanger。JDK 6以后为了支持多线程多对象同时Exchanger了就进行了改造(为了支持更好的并发),采用ConcurrentHashMap的思想,将Stack分割成很多的片段(或者说插槽Slot),线程Id(Thread.getId())hash相同的落在同一个Slot上,这样在默认32个Slot上就有很好的吞吐量。当然会根据机器CPU内核的数量有一定的优化,有兴趣的可以去了解下Exchanger的源码。
public V exchange(V x) throws InterruptedException { if (!Thread.interrupted()) { Object v = doExchange(x == null? NULL_ITEM : x, false, 0); if (v == NULL_ITEM) return null; if (v != CANCEL) return (V)v; Thread.interrupted(); // Clear interrupt status on IE throw } throw new InterruptedException(); } |
private Object doExchange(Object item, boolean timed, long nanos) { Node me = new Node(item); // Create in case occupying int index = hashIndex(); // Index of current slot int fails = 0; // Number of CAS failures for (;;) { Object y; // Contents of current slot Slot slot = arena[index]; if (slot == null) // Lazily initialize slots createSlot(index); // Continue loop to reread else if ((y = slot.get()) != null && // Try to fulfill slot.compareAndSet(y, null)) { Node you = (Node)y; // Transfer item if (you.compareAndSet(null, item)) { LockSupport.unpark(you.waiter); return you.item; } // Else cancelled; continue } else if (y == null && // Try to occupy slot.compareAndSet(null, me)) { if (index == 0) // Blocking wait for slot 0 return timed? awaitNanos(me, slot, nanos): await(me, slot); Object v = spinWait(me, slot); // Spin wait for non-0 if (v != CANCEL) return v; me = new Node(item); // Throw away cancelled node int m = max.get(); if (m > (index >>>= 1)) // Decrease index max.compareAndSet(m, m - 1); // Maybe shrink table } else if (++fails > 1) { // Allow 2 fails on 1st slot int m = max.get(); if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1)) index = m + 1; // Grow on 3rd failed slot else if (--index < 0) index = m; // Circularly traverse } } } |
没怎么看懂这段代码,待研究。。