Thinking in Java---线程通信+三种方式实现生产者消费

发布时间:2017-09-02 15:43:32
Thinking in Java---线程通信+三种方式实现生产者消费者问题

前面讲过线程之间的同步问题;同步问题主要是为了保证对共享资源的并发访问不会出错,主要的思想是一次只让一个线程去访问共享资源,我们是通过加锁的方法实现。但是有时候我们还需要安排几个线程的执行次序,而在系统内部线程的调度是透明的,没有办法准确的控制线程的切换。所以Java提供了一种机制来保证线程之间的协调运行,这也就是我们所说的线程调度。在下面我们会介绍三种用于线程通信的方式,并且每种方式都会使用生产者消费者问题进行检验。

一。使用Object类提供的线程通信机制
Object类提供了wait(),notify(),notifyAll()三个方法进行线程通信。这三个方法都必须要由同步监视器对象来调用,具体由分为以下两种情况:
1)对于使用synchronized修饰的同步方法,因为该类的默认实例(this)就是同步监视器,所以可以在同步方法中直接调用这三个方法。
2)对于使用synchronized修饰的同步代码块,同步监视器是synchronized后面括号里的对象,所以必须使用该对象调用这三个方法。
也就是说,这三个方法只能用于synchronized做同步的线程通信。对着三个方法的具体解释如下:
wait():导致当前线程等待,直到其它线程调用该同步监视器的notify()或notifyAll()方法来唤醒该线程。该wait()方法还可以传入一个时间参数,这时候等到指定时间后就会自动苏醒。调用wait()方法的当前线程会释放对该同步监视器的锁定。
notify():唤醒在此同步监视器上等待的单个线程。如果当前有多个线程在等待,则随机选择一个。注意只有当前线程放弃对该同步监视器的锁定以后(使用了wait()方法),才可以执行被唤醒的线程。
notifyAll():唤醒在此同步监视器上等待的所有线程。同样只要在当前线程放弃对同步监视器的锁定之后,才可以执行被唤醒的线程。

使用这种通信机制模拟的生产者消费者问题如下:

package lkl1; ///生产者消费者中对应的缓冲区 //生产者可以向缓冲区中加入数据,消费者可以消耗掉缓冲区中的数据 //注意到缓冲区是限定了大小的,所以使用循环队列的思想进行模拟 public class Buffer { //根据循环队列的思想,如果out==in,则表示当前缓冲区为空,不可以进行消费 //如果(in+1)%n==out,则表示当前缓冲区为满,不可以进行生产(这样会浪费一个空间) private int n; ///缓冲区大小 private int num; //当前元素个数 //定义一个大小为n的缓冲区 private int buffer[]; //表示当前可以放置数据的位置,初始为0 private int in=0; //表示当前可以读取数据的位置,初始为0 private int out=0; Buffer(int n){ this.n=n; buffer=new int[n]; num=0; } //下面是生产和消费的方法 //生产操作,向缓冲区中加入一个元素x public synchronized void product(int x){ try{ if((in+1)%n==out){ wait(); //如果缓冲区已满,则阻塞当前线程 } else{ buffer[in]=x; in=(in+1)%n; System.out.println(Thread.currentThread().getName()+生产一个元素: +x); num++; System.out.println(当前元素个数为: +num); notifyAll(); //唤醒等待当前同步资源监视器的线程 } } catch(InterruptedException ex){ ex.printStackTrace(); } } ///消费操作,一次取出一个元素 public synchronized void comsumer(){ try{ if(in==out){ //如果缓冲区为空,阻塞当前线程 wait(); } else{ int xx=buffer[out]; out=(out+1)%n; num--; System.out.println(Thread.currentThread().getName()+消费了一个元素: +xx); System.out.println(当前元素个数为: +num); notifyAll(); } } catch(InterruptedException ex){ ex.printStackTrace(); } } } package lkl1; import java.util.Random; //生产者线程 //会不断的往缓冲区中加入元素 public class Product extends Thread{ //当前线程操作的缓冲区对象 private Buffer buffer; private Random rand; Product(){} Product(Buffer buffer){ this.buffer=buffer; rand=new Random(); } public void run(){ while(true){ //向缓冲区中添加一个随机数 buffer.product(rand.nextInt(100)); } } } package lkl1; //生产者线程 public class Consumer extends Thread{ private Buffer buffer; Consumer(){} Consumer(Buffer buffer){ this.buffer=buffer; } public void run(){ while(true){ //每次都消耗掉缓冲区中的一个元素 buffer.comsumer(); } } } package lkl1; //测试 public class BufferTest { public static void main(String[] args){ Buffer buffer = new Buffer(10); //一个生产者,多个消费者 new Product(buffer).start(); new Consumer(buffer).start(); new Consumer(buffer).start(); } }

二。使用Condition控制线程通信
前面我们讲同步方式的时候,除了synchronized关键字,还讲了可以使用Lock进行显示的加锁。在使用Lock对象时,是不存在隐式的同步监视器的,所以也就不能使用上面的线程通信方式了。其实在使用Lock对象来保证同步时,Java提供了一个Condition类来保持协调,使用Condition类可以让那些已经得到Lock对象却无法继续执行的线程释放Lock对象。
Condition提供了三个方法:await(),signal(),signallAll();这三个方法和Object对象的三个方法的基本用法是一样的。其实我们可以这样认为,Lock对象对应了我们上面讲的同步方法或同步代码块,而Condition对象对应了我们上面讲的同步监视器。还要注意的是,Condition实例被绑定在一个Lock对象上,要获得指定Lock的Condition实例,需要调用Lock对象的newCondtion()方法即可。
下面使用Lock和Condition的组合来实现生产者消费者问题。可以看到代码基本和上面是一样的。

package lkl1; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; ///生产者消费者中的缓冲区 //由一个数组代表,生产者可以向缓冲区中加入元素,消费者可以从缓冲区中取走元素 //如果缓冲区满,则生产者不能向缓冲区加入元素;如果缓冲区空,则消费者不能消费元素 //下面的程序中in表示生产者可以加入数据的位置,out表示消费者可以消费数据的位置 //in和out都会初始化为0,我们定义in==out表示缓冲区为空;(in+1)%n==out //表示缓冲区满,但是这种判满的方式是要浪费一个空间的。 //上个例子中使用了synchronized关键字保证对缓冲区的操作的同步。 //现在需要采用Lock和Condition类进行同步的控制. public class Buffer1 { private final Lock lock=new ReentrantLock(); private final Condition con=lock.newCondition(); private int n; private int buffer1[]; private int in; private int out; private int cnt; ///记录当前缓冲区中元素个数 Buffer1(){} Buffer1(int n){ this.n=n; buffer1=new int[n]; in=out=cnt=0; } //生产方法,加入元素x public void product(int x){ lock.lock(); //加锁 try{ if((in+1)%n==out){ //如果缓冲区满,则阻塞当前线程 con.await(); //con.signalAll(); } else{ buffer1[in]=x; in=(in+1)%n; cnt++; System.out.println(Thread.currentThread().getName()+向缓冲区中加入元素:+x); System.out.println(当前缓冲区中的元素个数为: +cnt); con.signalAll(); //唤醒其它线程 } } catch(InterruptedException ex){ ex.printStackTrace(); } finally{ ///使用finally语句保证锁能正确释放 lock.unlock(); } } //消费方法,取走缓冲区中的一个元素 public int consumer(){ int x=0; lock.lock(); try{ if(in==out){ //如果缓冲区空,则阻塞当前线程 con.await(); } else{ x=buffer1[out]; System.out.println(Thread.currentThread().getName()+消费元素: +x); out=(out+1)%n; cnt--; System.out.println(当前元素个数为: +cnt); con.signalAll(); //唤醒其它线程 } } catch(InterruptedException ex){ ex.printStackTrace(); } finally{ lock.unlock(); } return x; } } package lkl1; import java.util.Random; //消费者线程 public class Consumer1 extends Thread{ private Random rand=new Random(); private Buffer1 buffer1; //对应的缓冲区 Consumer1(Buffer1 buffer1){ this.buffer1=buffer1; } public void run(){ while(true){ buffer1.consumer(); try{ ///在消费者线程中加一个sleep语句,可以更好的体现线程之间的切换 sleep(50); } catch(Exception x){ x.printStackTrace(); } } } } package lkl1; import java.util.Random; //生产者线程 public class Product1 extends Thread{ private Random rand = new Random(); private Buffer1 buffer1; Product1(Buffer1 buffer1){ this.buffer1=buffer1; } public void run(){ while(true){ int x; x=rand.nextInt(100); buffer1.product(x); } } } package lkl1; ///Buffer1测试 //启动一个生产者线程,两个消费者线程 public class Buffer1Test { public static void main(String[] args) throws Exception{ Buffer1 buffer1 = new Buffer1(10); new Product1(buffer1).start(); new Consumer1(buffer1).start(); new Consumer1(buffer1).start(); } }

企业建站2800元起,携手武汉肥猫科技,做一个有见地的颜值派!更多优惠请戳:襄阳网站建设公司 http://https://www.jingchucn.com/zt/xiangyang_wangzhanjianshe/