生产者-消费者模式的使用场景:多个线程之间需要共享数据,并且生产数据与消费数据的速度可能不同,或者各线程的处理性能存在差异。
本例使用 JDK 并发包(java.util.concurrent)中的 BlockingQueue 作为生产者与消费者之间的共享缓冲区。
package concurrent;import java.util.concurrent.BlockingQueue;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicInteger;public class Producer implements Runnable { private BlockingQueuequeue; private static AtomicInteger count = new AtomicInteger(); public Producer(BlockingQueue queue) { this.queue = queue; } @Override public void run() { // TODO Auto-generated method stub while (true) { Thing thing = new Thing(count.incrementAndGet()); try { Thread.sleep(1000);// 模拟处理前期准备 if (!queue.offer(thing, 100, TimeUnit.MILLISECONDS)) { System.out.println("Fail to put " + thing.toString()); } else { System.out.println("Put" + thing.toString()); } } catch (InterruptedException e) { e.printStackTrace(); } } }}
package concurrent;import java.util.concurrent.BlockingQueue;public class Consumer implements Runnable { private BlockingQueuequeue; public Consumer(BlockingQueue queue) { this.queue = queue; } @Override public void run() { while (true) { try { Thing t = queue.take(); if (t != null) { System.out.println("Get " + t.toString()); Thread.sleep(100);// 模拟处理得到的数据 } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }}
package concurrent;public class Thing {// 被生产和消费的东西 private final int id; public Thing(int id) { this.id = id; } @Override public String toString() { return "Thing " + id; }}
package concurrent;import java.util.concurrent.BlockingQueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.LinkedBlockingQueue;public class Main { public static void main(String[] args) { // TODO Auto-generated method stub BlockingQueuequeue = new LinkedBlockingQueue (20); Producer p1 = new Producer(queue); Producer p2 = new Producer(queue); Producer p3 = new Producer(queue); Consumer c1 = new Consumer(queue); Consumer c2 = new Consumer(queue); Consumer c3 = new Consumer(queue); ExecutorService executor = Executors.newCachedThreadPool(); executor.submit(p1); executor.submit(p2); executor.submit(p3); executor.submit(c1); executor.submit(c2); executor.submit(c3); }}