Java高并发编程案例 您所在的位置:网站首页 java多线程应用场景例子 Java高并发编程案例

Java高并发编程案例

2024-07-15 14:55| 来源: 网络整理| 查看: 265

文章目录 synchronized关键字对象加锁修饰方法锁定静态方法同步和非同步方法同时调用脏读可重入锁异常释放锁同步监视器变化 volatile线程之间的可见性对比synchronizedAtomXXX淘宝面试题 Reentrantlock生产者--消费者使用wait+notify使用Lock+Condition ThreadLocal同步容器ConcurrentMapCopyOnWriteArrayListConcurrentLinkedQueueBlockingQueueLinkedBlockingQueueArrayBlockingQueue DelayQueue 线程池ExecutorCallableExecutorsThreadPoolFuture并行计算CachedPoolSingleThreadPoolScheduledPoolWorkStealingPoolForkJoinPool ParallelStream

synchronized关键字 对象加锁

对某个对象加锁,互斥锁(一个线程拿到其它就拿不到)

public class T { private int count = 10; private Object o = new Object(); public void m() { synchronized(o) { //任何线程要执行下面的代码,必须先拿到o的锁 count--; System.out.println(Thread.currentThread().getName() + " count = " + count); } } }

若每次都要建一个Object来当锁太麻烦,直接拿自身对象当锁即可。

public class T { private int count = 10; public void m() { synchronized(this) { //任何线程要执行下面的代码,必须先拿到this的锁 count--; System.out.println(Thread.currentThread().getName() + " count = " + count); } } } 修饰方法

直接用synchronized修饰方法等同于锁定自身对象

public class T { private int count = 10; public synchronized void m() { //等同于在方法的代码执行时要synchronized(this) count--; System.out.println(Thread.currentThread().getName() + " count = " + count); } } 锁定静态方法 public class T { private static int count = 10; public synchronized static void m() { //这里等同于synchronized(top.tjtulong.T.class) count--; System.out.println(Thread.currentThread().getName() + " count = " + count); } public static void mm() { synchronized(T.class) { //考虑一下这里写synchronized(this)是否可以? count --; } } }

一个synchronized代码块的代码是一个原子操作。

同步和非同步方法同时调用 public class T { public synchronized void m1() { System.out.println(Thread.currentThread().getName() + " m1 start..."); try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " m1 end"); } public void m2() { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " m2 "); } public static void main(String[] args) { T t = new T(); /*new Thread(()->t.m1(), "t1").start(); new Thread(()->t.m2(), "t2").start();*/ new Thread(t::m1, "t1").start(); new Thread(t::m2, "t2").start(); /* new Thread(new Runnable() { @Override public void run() { t.m1(); } }); */ } }

输出结果:

t1 m1 start... t2 m2 t1 m1 end

可见同步和非同步方法可以同时调用。

脏读

只对写加锁而不对读加锁会造成脏读!

public class Account { String name; double balance; public synchronized void set(String name, double balance) { this.name = name; try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } this.balance = balance; } public /*synchronized*/ double getBalance(String name) { return this.balance; } public void setBalance(double balance) { this.balance = balance; } public static void main(String[] args) { Account a = new Account(); new Thread(() -> a.set("zhangsan", 100.0)).start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } a.setBalance(50.0); System.out.println(a.getBalance("zhangsan")); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(a.getBalance("zhangsan")); } } 可重入锁 public class T { synchronized void m1() { System.out.println("m1 start"); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } m2(); } synchronized void m2() { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("m2"); } }

m2()方法可以执行

一个同步方法可以调用另外一个同步方法,一个线程已经拥有某个对象的锁,再次申请的时候仍然会得到该对象的锁,也就是说synchronized获得的锁是可重入的。

异常释放锁 public class T { int count = 0; synchronized void m() { System.out.println(Thread.currentThread().getName() + " start"); while (true) { count++; System.out.println(Thread.currentThread().getName() + " count = " + count); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } if (count == 5) { int i = 1 / 0; //此处抛出异常,锁将被释放,要想不被释放,可以在这里进行catch,然后让循环继续 System.out.println(i); } } } public static void main(String[] args) { T t = new T(); Runnable r = new Runnable() { @Override public void run() { t.m(); } }; new Thread(r, "t1").start(); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(r, "t2").start(); } }

输出结果:

t1 start t1 count = 1 t1 count = 2 t1 count = 3 t1 count = 4 t1 count = 5 t2 start Exception in thread "t1" java.lang.ArithmeticException: / by zero t2 count = 6 at top.tjtulong.demo8.T.m(T.java:21) at top.tjtulong.demo8.T$1.run(T.java:33) at java.lang.Thread.run(Thread.java:748) t2 count = 7 t2 count = 8 t2 count = 9 t2 count = 10 t2 count = 11

程序在执行过程中,如果出现异常,默认情况锁会被释放,所以在并发处理的过程中,有异常要多加小心,不然可能会发生不一致的情况。比如,在一个web app处理过程中,多个servlet线程共同访问同一个资源,这时如果异常处理不合适,在第一个线程中抛出异常,其他线程就会进入同步代码区,有可能会访问到异常产生时的数据,因此要非常小心的处理同步业务逻辑中的异常。

同步监视器变化

锁定某对象o,如果o的属性发生改变,不影响锁的使用;但是如果o变成另外一个对象,则锁定的对象发生改变,应该避免将锁定对象的引用变成另外的对象。

public class T { Object o = new Object(); void m() { synchronized(o) { while(true) { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); } } } public static void main(String[] args) { T t = new T(); //启动第一个线程 new Thread(t::m, "t1").start(); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } //创建第二个线程 Thread t2 = new Thread(t::m, "t2"); t.o = new Object(); //锁对象发生改变,所以t2线程得以执行,如果注释掉这句话,线程2将永远得不到执行机会 t2.start(); } }

注:不要用字符串常量作为锁定对象,因为这样其实锁定的是同一个对象。这种情况还会发生比较诡异的现象,比如你用到了一个类库,在该类库中代码锁定了字符串“Hello”,但是你读不到源码,所以你在自己的代码中也锁定了"Hello",这时候就有可能发生非常诡异的死锁阻塞,因为你的程序和你用到的类库不经意间使用了同一把锁。

volatile 线程之间的可见性 public class T { //对比一下有无volatile的情况下,整个程序运行结果的区别 /*volatile*/ boolean running = true; void m() { System.out.println("m start"); while (running) { } System.out.println("m end!"); } public static void main(String[] args) { T t = new T(); new Thread(t::m, "t1").start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } t.running = false; } }

如果running变量不用volatile修饰的话,程序将不会停止。

volatile 关键字,使一个变量在多个线程间可见。A、B线程都用到一个变量,java默认是A线程中保留一份copy,这样如果B线程修改了该变量,则A线程未必知道。但使用volatile关键字,会让所有线程都会读到变量的修改值。

在上面的代码中,running是存在于堆内存的t对象中,当线程t1开始运行的时候,会把running值从内存中读到t1线程的工作区,在运行过程中直接使用这个copy,并不会每次都去读取堆内存,这样当主线程修改running的值之后,t1线程感知不到,所以不会停止运行;而使用volatile,将会强制所有线程都去堆内存中读取running的值。

volatile并不能保证多个线程共同修改running变量时所带来的不一致问题,也就是说volatile不能替代synchronized。

对比synchronized public class T { volatile int count = 0; void m() { for(int i=0; i threads.add(new Thread(t::m, "thread-"+i)); } threads.forEach((o)->o.start()); threads.forEach((o)->{ try { o.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); System.out.println(t.count); } }

输出结果必定小于100000.

这是由于volatile并不能保证多个线程共同修改running变量时所带来的不一致问题(++不是原子性操作),也就是说volatile不能替代synchronized。

enter description here

synchronized void m() { for (int i = 0; i //添加volatile,使t2能够得到通知 volatile List lists = new ArrayList(); public void add(Object o) { lists.add(o); } public int size() { return lists.size(); } public static void main(String[] args) { MyContainer4 c = new MyContainer4(); final Object lock = new Object(); new Thread(() -> { synchronized(lock) { System.out.println("t2启动"); if(c.size() != 5) { try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("t2 结束"); //通知t1继续执行 lock.notify(); } }, "t2").start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e1) { e1.printStackTrace(); } new Thread(() -> { System.out.println("t1启动"); synchronized(lock) { for(int i=0; i lock.notify(); //释放锁,让t2得以执行 try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } } }, "t1").start(); } }

sleep() 不释放锁 wait() 释放锁 notify() 不释放锁

方法二:使用Latch(门闩) 替代wait和notify来进行通知。好处是通信方式简单,同时也可以指定等待时间。使用await和countdown方法替代wait和notify,CountDownLatch不涉及锁定,当count的值为零时当前线程继续运行。

当不涉及同步,只是涉及线程通信的时候,用synchronized + wait/notify就显得太重了,这时应该考虑countdownlatch/cyclicbarrier/semaphore。

public class MyContainer5 { // 添加volatile,使t2能够得到通知 volatile List lists = new ArrayList(); public void add(Object o) { lists.add(o); } public int size() { return lists.size(); } public static void main(String[] args) { MyContainer5 c = new MyContainer5(); CountDownLatch latch = new CountDownLatch(1); new Thread(() -> { System.out.println("t2启动"); if (c.size() != 5) { try { latch.await(); TimeUnit.SECONDS.sleep(5); //也可以指定等待时间 //latch.await(5000, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("t2 结束"); }, "t2").start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e1) { e1.printStackTrace(); } new Thread(() -> { System.out.println("t1启动"); for (int i = 0; i // 打开门闩,让t2得以执行 latch.countDown(); } try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } }, "t1").start(); } } Reentrantlock

使用Reentrantlock可以替代synchronize完成同样的功能。需要注意的是,必须要必须要必须要手动释放锁。使用syn锁定的话如果遇到异常,jvm会自动释放锁,但是lock必须手动释放锁,因此经常在finally中进行锁的释放。

class X{ //定义锁对象 private final ReentrantLock lock=new ReentrantLock(); //定义需要保证线程安全的方法 public void m(){ //加锁 lock.lock(); try{ //...method body } //使用finally块来保证释放锁 finally{ lock.unlock(); } } }

使用Reentrantlock可以进行尝试锁定tryLock,这样无法锁定,或者在指定时间内无法锁定,线程可以决定是否继续等待。

public class ReentrantLock3 { Lock lock = new ReentrantLock(); void m1() { try { lock.lock(); for (int i = 0; i e.printStackTrace(); } finally { lock.unlock(); } } /** * 使用tryLock进行尝试锁定,不管锁定与否,方法都将继续执行 * 可以根据tryLock的返回值来判定是否锁定 * 也可以指定tryLock的时间,由于tryLock(time)抛出异常,所以要注意unclock的处理,必须放到finally中 */ void m2() { /* boolean locked = lock.tryLock(); System.out.println("m2 ..." + locked); if(locked) lock.unlock(); */ boolean locked = false; try { locked = lock.tryLock(5, TimeUnit.SECONDS); System.out.println("m2 ..." + locked); } catch (InterruptedException e) { e.printStackTrace(); } finally { if(locked) lock.unlock(); } } public static void main(String[] args) { ReentrantLock3 rl = new ReentrantLock3(); new Thread(rl::m1).start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(rl::m2).start(); } }

使用ReentrantLock还可以调用lockInterruptibly方法,可以对线程interrupt方法做出响应,在一个线程等待锁的过程中,可以被打断。

ReentrantLock还可以指定为公平锁,即谁等的时间长,谁得到锁,synchronized为非公平锁,不在乎一个线程已经等待多长时间。

public class ReentrantLock5 extends Thread { private static ReentrantLock lock=new ReentrantLock(true); //参数为true表示为公平锁,请对比输出结果 public void run() { for(int i=0; i System.out.println(Thread.currentThread().getName()+"获得锁"); }finally{ lock.unlock(); } } } public static void main(String[] args) { ReentrantLock5 rl=new ReentrantLock5(); Thread th1=new Thread(rl); Thread th2=new Thread(rl); th1.start(); th2.start(); } }

输出结果为:

Thread-1获得锁 Thread-2获得锁 Thread-1获得锁 Thread-2获得锁 Thread-1获得锁 Thread-2获得锁 Thread-1获得锁 Thread-2获得锁 生产者–消费者

写一个固定容量同步容器,拥有put和get方法,以及getCount方法,能够支持2个生产者线程以及10个消费者线程的阻塞调用。

使用wait+notify public class MyContainer1 { final private LinkedList lists = new LinkedList(); final private int MAX = 10; //最多10个元素 private int count = 0; public synchronized void put(T t) { while (lists.size() == MAX) { //想想为什么用while而不是用if? try { this.wait(); //effective java } catch (InterruptedException e) { e.printStackTrace(); } } lists.add(t); ++count; this.notifyAll(); //通知消费者线程进行消费 } public synchronized T get() { T t = null; while (lists.size() == 0) { try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } t = lists.removeFirst(); count--; this.notifyAll(); //通知生产者进行生产 return t; } public static void main(String[] args) { MyContainer1 c = new MyContainer1(); //启动消费者线程 for (int i = 0; i for (int j = 0; j e.printStackTrace(); } //启动生产者线程 for (int i = 0; i for (int j = 0; j try { lock.lock(); while(lists.size() == MAX) { //想想为什么用while而不是用if? producer.await(); } lists.add(t); ++count; consumer.signalAll(); //通知消费者线程进行消费 } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public T get() { T t = null; try { lock.lock(); while(lists.size() == 0) { consumer.await(); } t = lists.removeFirst(); count --; producer.signalAll(); //通知生产者进行生产 } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } return t; } public static void main(String[] args) { MyContainer2 c = new MyContainer2(); //启动消费者线程 for(int i=0; i for(int j=0; j e.printStackTrace(); } //启动生产者线程 for(int i=0; i for(int j=0; j new Thread(()->{ try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(tl.get()); }).start(); new Thread(()->{ try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } tl.set(new Person()); }).start(); } static class Person { String name = "zhangsan"; } } 同步容器 ConcurrentMap

多线程情况下用ConcurrentHashMap代替HashMap; ConcurrentHashMap效率要比Hashtable高; ConcurrentSkipListMap支持高并发且排序; Collections.sychronizedXXX可以将非线程安全的容器变为线程安全的容器。

public class T01_ConcurrentMap { public static void main(String[] args) { //Map map = new ConcurrentHashMap(); //Map map = new ConcurrentSkipListMap(); //高并发并且排序 //Map map = new Hashtable(); Map map = new HashMap(); //Collections.synchronizedXXX //TreeMap Random r = new Random(); Thread[] ths = new Thread[100]; CountDownLatch latch = new CountDownLatch(ths.length); long start = System.currentTimeMillis(); for(int i=0; i for(int j=0; jt.start()); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } long end = System.currentTimeMillis(); System.out.println(end - start); } } CopyOnWriteArrayList

写的速度很慢但读的速度很快。

ConcurrentLinkedQueue

内部加锁的队列

public class T04_ConcurrentQueue { public static void main(String[] args) { Queue strs = new ConcurrentLinkedQueue(); for (int i = 0; i static BlockingQueue strs = new LinkedBlockingQueue(); static Random r = new Random(); public static void main(String[] args) { new Thread(() -> { for (int i = 0; i strs.put("a" + i); //如果满了,就会等待 TimeUnit.MILLISECONDS.sleep(r.nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } } }, "p1").start(); for (int i = 0; i for (;;) { try { System.out.println(Thread.currentThread().getName() + " take -" + strs.take()); //如果空了,就会等待 } catch (InterruptedException e) { e.printStackTrace(); } } }, "c" + i).start(); } } } ArrayBlockingQueue public class T06_ArrayBlockingQueue { static BlockingQueue strs = new ArrayBlockingQueue(10); static Random r = new Random(); public static void main(String[] args) throws InterruptedException { for (int i = 0; i static BlockingQueue tasks = new DelayQueue(); static Random r = new Random(); static class MyTask implements Delayed { long runningTime; MyTask(long rt) { this.runningTime = rt; } @Override public int compareTo(Delayed o) { if(this.getDelay(TimeUnit.MILLISECONDS) o.getDelay(TimeUnit.MILLISECONDS)) return 1; else return 0; } @Override public long getDelay(TimeUnit unit) { return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } @Override public String toString() { return "" + runningTime; } } public static void main(String[] args) throws InterruptedException { long now = System.currentTimeMillis(); MyTask t1 = new MyTask(now + 1000); MyTask t2 = new MyTask(now + 2000); MyTask t3 = new MyTask(now + 1500); MyTask t4 = new MyTask(now + 2500); MyTask t5 = new MyTask(now + 500); tasks.put(t1); tasks.put(t2); tasks.put(t3); tasks.put(t4); tasks.put(t5); System.out.println(tasks); for(int i=0; i public static void main(String[] args) { new T01_MyExecutor().execute(()->System.out.println("hello executor")); } @Override public void execute(Runnable command) { //new Thread(command).run(); command.run(); } } Callable

Callable接口提供了一个call()方法可以作为线程执行体,但call()方法比run()方法功能更强大:call()方法可以有返回值且call()方法可以声明抛出异常。

Executors

操作Exector的工具类。

ThreadPool

线程池,维护着一个任务队列和一个完成队列。

public class T05_ThreadPool { public static void main(String[] args) throws InterruptedException { ExecutorService service = Executors.newFixedThreadPool(5); //execute submit for (int i = 0; i try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); //java.util.concurrent.ThreadPoolExecutor@3b9a45b3[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0] }); } System.out.println(service); service.shutdown(); System.out.println(service.isTerminated());//是否执行完false System.out.println(service.isShutdown());//是否关闭true System.out.println(service); //java.util.concurrent.ThreadPoolExecutor@3b9a45b3[Shutting down, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0] TimeUnit.SECONDS.sleep(5); System.out.println(service.isTerminated());//true System.out.println(service.isShutdown());//true System.out.println(service); //java.util.concurrent.ThreadPoolExecutor@3b9a45b3[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6] } } Future

相当于Callable方法未来要返回的值。

public class T06_Future { public static void main(String[] args) throws InterruptedException, ExecutionException { FutureTask task = new FutureTask(()->{ TimeUnit.MILLISECONDS.sleep(500); return 1000; }); //new Callable () { Integer call();} new Thread(task).start(); System.out.println(task.get()); //阻塞返回1000 //******************************* ExecutorService service = Executors.newFixedThreadPool(5); Future f = service.submit(()->{ TimeUnit.MILLISECONDS.sleep(500); return 1; }); System.out.println(f.get());//1 System.out.println(f.isDone());//true } } 并行计算

计算1-200000之间有多少个质数。

一般起线程数 > CPU核数

public class T07_ParallelComputing { public static void main(String[] args) throws InterruptedException, ExecutionException { long start = System.currentTimeMillis(); getPrime(1, 200000); long end = System.currentTimeMillis(); System.out.println(end - start); final int cpuCoreNum = 4; ExecutorService service = Executors.newFixedThreadPool(cpuCoreNum); MyTask t1 = new MyTask(1, 80000); //1-5 5-10 10-15 15-20 MyTask t2 = new MyTask(80001, 130000); MyTask t3 = new MyTask(130001, 170000); MyTask t4 = new MyTask(170001, 200000); Future f1 = service.submit(t1); Future f2 = service.submit(t2); Future f3 = service.submit(t3); Future f4 = service.submit(t4); start = System.currentTimeMillis(); f1.get(); f2.get(); f3.get(); f4.get(); end = System.currentTimeMillis(); System.out.println(end - start); } static class MyTask implements Callable { int startPos, endPos; MyTask(int s, int e) { this.startPos = s; this.endPos = e; } @Override public List call() throws Exception { List r = getPrime(startPos, endPos); return r; } } static boolean isPrime(int num) { for(int i=2; i List results = new ArrayList(); for(int i=start; i public static void main(String[] args) throws InterruptedException { ExecutorService service = Executors.newCachedThreadPool(); System.out.println(service); //java.util.concurrent.ThreadPoolExecutor@677327b6[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] for (int i = 0; i try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); }); } System.out.println(service); //java.util.concurrent.ThreadPoolExecutor@677327b6[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0] TimeUnit.SECONDS.sleep(80); System.out.println(service); //java.util.concurrent.ThreadPoolExecutor@677327b6[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 2] } } SingleThreadPool

线程池中只有一个线程,保证任务顺序执行。

public class T09_SingleThreadPool { public static void main(String[] args) { ExecutorService service = Executors.newSingleThreadExecutor(); for(int i=0; i System.out.println(j + " " + Thread.currentThread().getName()); }); } } } ScheduledPool

以固定的频率执行任务

public class T10_ScheduledPool { public static void main(String[] args) { ScheduledExecutorService service = Executors.newScheduledThreadPool(4); service.scheduleAtFixedRate(()->{ try { TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); }, 0, 500, TimeUnit.MILLISECONDS); // 500 表示每隔0.5秒执行一次 } } WorkStealingPool

线程池中每隔线程都维护一个任务列表,当自身的任务列表执行完成后,会执行别的线程的任务列表。本质是ForkJoinPool线程池,所有的线程都是精灵线程。

public class T11_WorkStealingPool { public static void main(String[] args) throws IOException { ExecutorService service = Executors.newWorkStealingPool(); System.out.println(Runtime.getRuntime().availableProcessors()); service.execute(new R(1000)); service.execute(new R(2000)); service.execute(new R(2000)); service.execute(new R(2000)); //daemon service.execute(new R(2000)); //由于产生的是精灵线程(守护线程、后台线程),主线程不阻塞的话,看不到输出 System.in.read(); } static class R implements Runnable { int time; R(int t) { this.time = t; } @Override public void run() { try { TimeUnit.MILLISECONDS.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(time + " " + Thread.currentThread().getName()); } } } ForkJoinPool

Fork/Join框架就是在必要的情况下,将一个大任务进行拆分(fork)成若干个小任务(拆到不可再拆时),再将一个个的小任务运算的结果进行join汇总。 enter description here

import java.io.IOException; import java.util.Arrays; import java.util.Random; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; public class T12_ForkJoinPool { static int[] nums = new int[1000000]; static final int MAX_NUM = 50000; static Random r = new Random(); static { for(int i=0; i private static final long serialVersionUID = 1L; int start, end; AddTask(int s, int e) { start = s; end = e; } @Override protected Long compute() { if(end-start ForkJoinPool fjp = new ForkJoinPool(); AddTask task = new AddTask(0, nums.length); fjp.execute(task); long result = task.join(); System.out.println(result); //System.in.read(); } }

ps:多线程归并排序

各种线程池归根到底都是ThreadPoolExecutor类,指定起始线程、最大线程、存活时间及使用的队列。

ParallelStream

利用多线程访问数据流

public class T14_ParallelStreamAPI { public static void main(String[] args) { List nums = new ArrayList(); Random r = new Random(); for(int i=0; iisPrime(v)); long end = System.currentTimeMillis(); System.out.println(end - start); //使用parallel stream api start = System.currentTimeMillis(); nums.parallelStream().forEach(T14_ParallelStreamAPI::isPrime); end = System.currentTimeMillis(); System.out.println(end - start); } static boolean isPrime(int num) { for(int i=2; i


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有