博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
J.U.C之Exchanger
阅读量:2494 次
发布时间:2019-05-11

本文共 7085 字,大约阅读时间需要 23 分钟。

J.U.CExchanger

一、简介

可以在对中对元素进行配对和交换的线程的同步点。每个线程将条目上的某个方法呈现给  方法,与伙伴线程进行匹配,并且在返回时接收其伙伴的对象。Exchanger 可能被视为  的双向形式。Exchanger 可能在应用程序(比如遗传算法和管道设计)中很有用。 

内存一致性效果:对于通过 Exchanger 成功交换对象的每对线程,每个线程中在 exchange() 之前的操作  从另一线程中相应的 exchange() 返回的后续操作。

二、实例

package com.mylearn.thread.tools;

import java.util.concurrent.Exchanger;

import java.util.concurrent.LinkedBlockingQueue;

/**

 * Created by IntelliJ IDEA.

 * User: yingkuohao

 * Date: 13-11-11

 * Time: 下午5:18

 * CopyRight:360buy

 * Descrption:

 * Exchanger是一个双方栅栏,各方在栅栏位置上交换数据,当两方执行不对称的操作时,Exchanger会非常有用,例如当一个线程向

 * 缓冲区写入数据,另一个线程从缓冲区读取数据。这些线程可以使用Exchanger来汇合并把满的缓冲区与空的缓冲区交换。当两个

 * 线程通过Exchanger交换对象时,这种交换就把这两个对象安全地发布给另一方。

 *   数据交换的时机取决于应用程序的响应需求。最简单的方案是,当缓冲区被填满时,由填充任务进行交换,当缓冲区为空时,

 *   由清空任务进行交换。这样会把需要交换的次数降至最低,但如果新数据的到达率不可预测,那么一些数据的处理过程就会延迟。

 *   另一个方法是,不仅当缓冲区将被填满时交换,并且当缓冲区被填充到一定程度并保持一段时间后,也进行交换。

 * To change this template use File | Settings | File Templates.

 */

public class ExchangerTest {

    public static void main(String args[]) {

        LinkedBlockingQueue emptyQueue = new LinkedBlockingQueue(10);  //生产者队列

        LinkedBlockingQueue fullQueue = new LinkedBlockingQueue(10);    //消费者队列

        Exchanger<LinkedBlockingQueue> exchanger = new Exchanger<LinkedBlockingQueue>();

        try {

            //初始化一些值到fullQueue

            fullQueue.put("inital1");

            fullQueue.put("inital2");

            fullQueue.put("inital3");

        } catch (InterruptedException e) {

            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.

        }

        Thread product = new Thread(new Product(emptyQueue, exchanger));     //生产者线程负责生产

        Thread consumer = new Thread(new Consumer(fullQueue, exchanger));     //消费者线程负责消费

        product.start();

        consumer.start();

//        当生产者队列满,消费者队列为空的时候,两者交换,生产者变为空,消费者队列变为满,相当于一个批量操作。

    }

    static class Product implements Runnable {

        LinkedBlockingQueue emptyQueue;

        Exchanger<LinkedBlockingQueue> exchanger;

        Product(LinkedBlockingQueue emptyQueue, Exchanger<LinkedBlockingQueue> exchanger) {

            this.emptyQueue = emptyQueue;

            this.exchanger = exchanger;

        }

        public void run() {

            while (emptyQueue != null) {

                try {

                    emptyQueue.put("product1");

                    System.out.println(Thread.currentThread().getName() + "成功放入一个商品");

                    if (emptyQueue.size() == 10) {

                        System.out.println(Thread.currentThread().getName() + "begin change: emptyQueue.size=" + emptyQueue.size());

                        Thread.sleep(1000);

                        emptyQueue = exchanger.exchange(emptyQueue);

                        System.out.println(Thread.currentThread().getName() + "after change: emptyQueue.size=" + emptyQueue.size());

                    }

                } catch (InterruptedException e) {

                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.

                }

            }

        }

    }

    static class Consumer implements Runnable {

        LinkedBlockingQueue fullQueue;

        Exchanger<LinkedBlockingQueue> exchanger;

        Consumer(LinkedBlockingQueue fullQueue, Exchanger<LinkedBlockingQueue> exchanger) {

            this.fullQueue = fullQueue;

            this.exchanger = exchanger;

        }

        public void run() {

            while (fullQueue != null) {

                try {

                    Object object = fullQueue.take();

                    System.out.println(Thread.currentThread().getName() + "成功取到一个商品" + object);

                    if (fullQueue.isEmpty()) {

                        System.out.println(Thread.currentThread().getName() + "begin change: fullQueue.size=" + fullQueue.size());

                        Thread.sleep(1000);

                        fullQueue = exchanger.exchange(fullQueue);

                        System.out.println(Thread.currentThread().getName() + "after change: fullQueue.size=" + fullQueue.size());

                    }

                } catch (InterruptedException e) {

                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.

                }

            }

        }

    }

}

结果:

Thread-0成功放入一个商品

Thread-1成功取到一个商品inital1

Thread-0成功放入一个商品

Thread-1成功取到一个商品inital2

Thread-0成功放入一个商品

Thread-1成功取到一个商品inital3

Thread-0成功放入一个商品

Thread-1begin change: fullQueue.size=0

Thread-0成功放入一个商品

Thread-0成功放入一个商品

Thread-0成功放入一个商品

Thread-0成功放入一个商品

Thread-0成功放入一个商品

Thread-0成功放入一个商品

Thread-0begin change: emptyQueue.size=10

Thread-0after change: emptyQueue.size=0

Thread-0成功放入一个商品

Thread-0成功放入一个商品

Thread-0成功放入一个商品

Thread-0成功放入一个商品

Thread-0成功放入一个商品

Thread-1after change: fullQueue.size=10

Thread-0成功放入一个商品

Thread-1成功取到一个商品product1

Thread-0成功放入一个商品

Thread-1成功取到一个商品product1

Thread-0成功放入一个商品

Thread-1成功取到一个商品product1

Thread-0成功放入一个商品

Thread-1成功取到一个商品product1

Thread-0成功放入一个商品

Thread-1成功取到一个商品product1

Thread-0begin change: emptyQueue.size=10

Thread-1成功取到一个商品product1

Thread-1成功取到一个商品product1

Thread-1成功取到一个商品product1

Thread-1成功取到一个商品product1

Thread-1成功取到一个商品product1

Thread-1begin change: fullQueue.size=0

Thread-1after change: fullQueue.size=10

Thread-0after change: emptyQueue.size=0

Thread-0成功放入一个商品

Thread-0成功放入一个商品

Thread-0成功放入一个商品

Thread-0成功放入一个商品

Thread-1成功取到一个商品product1

Thread-1成功取到一个商品product1

Thread-0成功放入一个商品

Thread-0成功放入一个商品

Thread-1成功取到一个商品product1

Thread-1成功取到一个商品product1

Thread-0成功放入一个商品

Thread-0成功放入一个商品

Thread-1成功取到一个商品product1

Thread-1成功取到一个商品product1

Thread-0成功放入一个商品

Thread-0成功放入一个商品

Thread-0begin change: emptyQueue.size=10

Thread-1成功取到一个商品product1

Thread-1成功取到一个商品product1

Thread-1成功取到一个商品product1

Thread-1成功取到一个商品product1

Thread-1begin change: fullQueue.size=0

可见,当生产者把队列放满后,就等待消费者的队列

三、原理

参考:

在上面的模型中,我们假定一个空的栈(Stack),栈顶(Top)当然是没有元素的。同时我们假定一个数据结构Node,包含一个要交换的元素E和一个要填充的Node。这时线程T1携带节点node1进入栈(cas_push),当然这是CAS操作,这样栈顶就不为空了。线程T2携带节点node2进入栈,发现栈里面已经有元素了node1,同时发现node1holdNode)为空,于是将自己(node2)填充到node1hold中(cas_fill)。然后将元素node1从栈中弹出(cas_take)。这样线程T1就得到了node1.hold.item也就是node2的元素e2,线程T2就得到了node1.item也就是e1,从而达到了交换的目的。

算法描述就是下图展示的内容。

JDK 5就是采用类似的思想实现的ExchangerJDK 6以后为了支持多线程多对象同时Exchanger了就进行了改造(为了支持更好的并发),采用ConcurrentHashMap的思想,将Stack分割成很多的片段(或者说插槽Slot),线程IdThread.getId()hash相同的落在同一个Slot上,这样在默认32Slot上就有很好的吞吐量。当然会根据机器CPU内核的数量有一定的优化,有兴趣的可以去了解下Exchanger的源码。

四、源码

    public V exchange(V x) throws InterruptedException {

        if (!Thread.interrupted()) {

            Object v = doExchange(x == null? NULL_ITEM : x, false, 0);

            if (v == NULL_ITEM)

                return null;

            if (v != CANCEL)

                return (V)v;

            Thread.interrupted(); // Clear interrupt status on IE throw

        }

        throw new InterruptedException();

    }

   private Object doExchange(Object item, boolean timed, long nanos) {

        Node me = new Node(item);                 // Create in case occupying

        int index = hashIndex();                  // Index of current slot

        int fails = 0;                            // Number of CAS failures

        for (;;) {

            Object y;                             // Contents of current slot

            Slot slot = arena[index];

            if (slot == null)                     // Lazily initialize slots

                createSlot(index);                // Continue loop to reread

            else if ((y = slot.get()) != null &&  // Try to fulfill

                     slot.compareAndSet(y, null)) {

                Node you = (Node)y;               // Transfer item

                if (you.compareAndSet(null, item)) {

                    LockSupport.unpark(you.waiter);

                    return you.item;

                }                                 // Else cancelled; continue

            }

            else if (y == null &&                 // Try to occupy

                     slot.compareAndSet(null, me)) {

                if (index == 0)                   // Blocking wait for slot 0

                    return timed? awaitNanos(me, slot, nanos): await(me, slot);

                Object v = spinWait(me, slot);    // Spin wait for non-0

                if (v != CANCEL)

                    return v;

                me = new Node(item);              // Throw away cancelled node

                int m = max.get();

                if (m > (index >>>= 1))           // Decrease index

                    max.compareAndSet(m, m - 1);  // Maybe shrink table

            }

            else if (++fails > 1) {               // Allow 2 fails on 1st slot

                int m = max.get();

                if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1))

                    index = m + 1;                // Grow on 3rd failed slot

                else if (--index < 0)

                    index = m;                    // Circularly traverse

            }

        }

    }

没怎么看懂这段代码,待研究。。

你可能感兴趣的文章