基于Actuator的可修改配置的线程池监控 您所在的位置:网站首页 武汉酒店用品城 基于Actuator的可修改配置的线程池监控

基于Actuator的可修改配置的线程池监控

#基于Actuator的可修改配置的线程池监控| 来源: 网络整理| 查看: 265

1.概要

之前公司因为使用线程池习惯不好,导致线程池负载负载过高。触发了拒绝策略,导致大量任务丢失。而并没有对这个情况进行监控,导致业务出现故障之后才发现抛出了拒绝异常。所以有必要对大量使用线程池的项目进行监控,并且最好能在不停机的情况下对线程池的参数进行修改,由此我们可以用线程池的hook方法去对线程池的状态进行埋点,并且通过Actuator做可视化监控,自定义Endpoint去修改线程池内部参数,实现可以动态修改线程池参数。

2.实现 1.导入Maven依赖 org.springframework.boot spring-boot-starter-actuator 2.编写ThreadPoolMonitor.java监控类 import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Date; import java.util.List; import java.util.Objects; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; /** * 继承ThreadPoolExecutor类,覆盖了shutdown(), shutdownNow(), beforeExecute() 和 afterExecute() * 方法来统计线程池的执行情况 */ public class ThreadPoolMonitor extends ThreadPoolExecutor { private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolMonitor.class); /** * 保存任务开始执行的时间,当任务结束时,用任务结束时间减去开始时间计算任务执行时间 */ private final ConcurrentHashMap startTimes; /** * 线程池名称,一般以业务名称命名,方便区分 */ private final String poolName; private long totalDiff; /** * 调用父类的构造方法,并初始化HashMap和线程池名称 * * @param corePoolSize 线程池核心线程数 * @param maximumPoolSize 线程池最大线程数 * @param keepAliveTime 线程的最大空闲时间 * @param unit 空闲时间的单位 * @param workQueue 保存被提交任务的队列 * @param poolName 线程池名称 */ public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, String poolName) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new EventThreadFactory(poolName), poolName); } /** * 调用父类的构造方法,并初始化HashMap和线程池名称 * * @param corePoolSize 线程池核心线程数 * @param maximumPoolSize 线程池最大线程数 * @param keepAliveTime 线程的最大空闲时间 * @param unit 空闲时间的单位 * @param workQueue 保存被提交任务的队列 * @param threadFactory 线程工厂 * @param poolName 线程池名称 */ public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, String poolName) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); this.startTimes = new ConcurrentHashMap(); this.poolName = poolName; } /** * 线程池延迟关闭时(等待线程池里的任务都执行完毕),统计线程池情况 */ @Override public void shutdown() { // 统计已执行任务、正在执行任务、未执行任务数量 LOGGER.info("{} Going to shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}", this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size()); super.shutdown(); } /** * 线程池立即关闭时,统计线程池情况 */ @Override public List shutdownNow() { // 统计已执行任务、正在执行任务、未执行任务数量 LOGGER.info("{} Going to immediately shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}", this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size()); return super.shutdownNow(); } /** * 任务执行之前,记录任务开始时间 */ @Override protected void beforeExecute(Thread t, Runnable r) { startTimes.put(String.valueOf(r.hashCode()), new Date()); } /** * 任务执行之后,计算任务结束时间 */ @Override protected void afterExecute(Runnable r, Throwable t) { Date startDate = startTimes.remove(String.valueOf(r.hashCode())); Date finishDate = new Date(); long diff = finishDate.getTime() - startDate.getTime(); totalDiff += diff; // 统计任务耗时、初始线程数、核心线程数、正在执行的任务数量、 // 已完成任务数量、任务总数、队列里缓存的任务数量、池中存在的最大线程数、 // 最大允许的线程数、线程空闲时间、线程池是否关闭、线程池是否终止 LOGGER.info("{}-pool-monitor: " + "Duration: {} ms, PoolSize: {}, CorePoolSize: {}, Active: {}, " + "Completed: {}, Task: {}, Queue: {}, LargestPoolSize: {}, " + "MaximumPoolSize: {}, KeepAliveTime: {}, isShutdown: {}, isTerminated: {}", this.poolName, diff, this.getPoolSize(), this.getCorePoolSize(), this.getActiveCount(), this.getCompletedTaskCount(), this.getTaskCount(), this.getQueue().size(), this.getLargestPoolSize(), this.getMaximumPoolSize(), this.getKeepAliveTime(TimeUnit.MILLISECONDS), this.isShutdown(), this.isTerminated()); } /** * 生成线程池所用的线程,只是改写了线程池默认的线程工厂,传入线程池名称,便于问题追踪 */ static class EventThreadFactory implements ThreadFactory { private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; /** * 初始化线程工厂 * * @param poolName 线程池名称 */ EventThreadFactory(String poolName) { SecurityManager s = System.getSecurityManager(); group = Objects.nonNull(s) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = poolName + "-pool-" + POOL_NUMBER.getAndIncrement() + "-thread-"; } @Override public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) { t.setDaemon(false); } if (t.getPriority() != Thread.NORM_PRIORITY) { t.setPriority(Thread.NORM_PRIORITY); } return t; } } public long getTotalDiff() { return totalDiff; } } 3.实现ResizeableBlockingQueue.java可变队列

这里我们直接修改LinkedBlockingQueue的代码,把capacity去掉final,变成一个可变参数。再新增get和set方法。

/** * The type Resizeable blocking queue. * * @param the type parameter */ public class ResizeableBlockingQueue extends AbstractQueue implements BlockingQueue, java.io.Serializable { private static final long serialVersionUID = -1232131234709194L; /* * 基于LinkedBlockingQueue 实现的一个可变队列容量的阻塞队列 * * */ /** * The type Node. * * @param the type parameter */ static class Node { E item; Node next; Node(E x) { item = x; } } private int capacity; private final AtomicInteger count = new AtomicInteger(); transient Node head; private transient Node last; private final ReentrantLock takeLock = new ReentrantLock(); private final Condition notEmpty = takeLock.newCondition(); private final ReentrantLock putLock = new ReentrantLock(); private final Condition notFull = putLock.newCondition(); /** * Gets capacity. * * @return the capacity */ public int getCapacity() { return capacity; } /** * Sets capacity. * * @param capacity the capacity */ public void setCapacity(int capacity) { this.capacity = capacity; } private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } } private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } } private void enqueue(Node node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; last = last.next = node; } private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; Node h = head; Node first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; } /** * Fully lock. */ void fullyLock() { putLock.lock(); takeLock.lock(); } /** * Fully unlock. */ void fullyUnlock() { takeLock.unlock(); putLock.unlock(); } /** * Instantiates a new Resizeable blocking queue. */ public ResizeableBlockingQueue() { this(Integer.MAX_VALUE); } /** * Instantiates a new Resizeable blocking queue. * * @param capacity the capacity */ public ResizeableBlockingQueue(int capacity) { if (capacity this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; for (E e : c) { if (e == null) { throw new NullPointerException(); } if (n == capacity) { throw new IllegalStateException("Queue full"); } enqueue(new Node(e)); ++n; } count.set(n); } finally { putLock.unlock(); } } // this doc comment is overridden to remove the reference to collections // greater in size than Integer.MAX_VALUE @Override public int size() { return count.get(); } // this doc comment is a modified copy of the inherited doc comment, // without the reference to unlimited queues. @Override public int remainingCapacity() { return capacity - count.get(); } @Override public void put(E e) throws InterruptedException { if (e == null) { throw new NullPointerException(); } // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; Node node = new Node(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { /* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. */ while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 putLock.unlock(); } if (c == 0) { signalNotEmpty(); } } @Override public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) { throw new NullPointerException(); } long nanos = unit.toNanos(timeout); int c = -1; final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { if (nanos notFull.signal(); } } finally { putLock.unlock(); } if (c == 0) { signalNotEmpty(); } return true; } @Override public boolean offer(E e) { if (e == null) { throw new NullPointerException(); } final AtomicInteger count = this.count; if (count.get() == capacity) { return false; } int c = -1; Node node = new Node(e); final ReentrantLock putLock = this.putLock; putLock.lock(); try { if (count.get() notFull.signal(); } } } finally { putLock.unlock(); } if (c == 0) { signalNotEmpty(); } return c >= 0; } @Override public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) { notEmpty.signal(); } } finally { takeLock.unlock(); } if (c == capacity) { signalNotFull(); } return x; } @Override public E poll(long timeout, TimeUnit unit) throws InterruptedException { E x = null; int c = -1; long nanos = unit.toNanos(timeout); final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { if (nanos notEmpty.signal(); } } finally { takeLock.unlock(); } if (c == capacity) { signalNotFull(); } return x; } @Override public E poll() { final AtomicInteger count = this.count; if (count.get() == 0) { return null; } E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { if (count.get() > 0) { x = dequeue(); c = count.getAndDecrement(); if (c > 1) { notEmpty.signal(); } } } finally { takeLock.unlock(); } if (c == capacity) { signalNotFull(); } return x; } @Override public E peek() { if (count.get() == 0) { return null; } final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { Node first = head.next; if (first == null) { return null; } else { return first.item; } } finally { takeLock.unlock(); } } void unlink(Node p, Node trail) { // assert isFullyLocked(); // p.next is not changed, to allow iterators that are // traversing p to maintain their weak-consistency guarantee. p.item = null; trail.next = p.next; if (last == p) { last = trail; } if (count.getAndDecrement() == capacity) { notFull.signal(); } } @Override public boolean remove(Object o) { if (o == null) { return false; } fullyLock(); try { for (Node trail = head, p = trail.next; p != null; trail = p, p = p.next) { if (o.equals(p.item)) { unlink(p, trail); return true; } } return false; } finally { fullyUnlock(); } } @Override public boolean contains(Object o) { if (o == null) { return false; } fullyLock(); try { for (Node p = head.next; p != null; p = p.next) { if (o.equals(p.item)) { return true; } } return false; } finally { fullyUnlock(); } } @Override public Object[] toArray() { fullyLock(); try { int size = count.get(); Object[] a = new Object[size]; int k = 0; for (Node p = head.next; p != null; p = p.next) { a[k++] = p.item; } return a; } finally { fullyUnlock(); } } @Override @SuppressWarnings("unchecked") public T[] toArray(T[] a) { fullyLock(); try { int size = count.get(); if (a.length a[k++] = (T)p.item; } if (a.length > k) { a[k] = null; } return a; } finally { fullyUnlock(); } } @Override public String toString() { fullyLock(); try { Node p = head.next; if (p == null) { return "[]"; } StringBuilder sb = new StringBuilder(); sb.append('['); for (;;) { E e = p.item; sb.append(e == this ? "(this Collection)" : e); p = p.next; if (p == null) { return sb.append(']').toString(); } sb.append(',').append(' '); } } finally { fullyUnlock(); } } @Override public void clear() { fullyLock(); try { for (Node p, h = head; (p = h.next) != null; h = p) { h.next = h; p.item = null; } head = last; // assert head.item == null && head.next == null; if (count.getAndSet(0) == capacity) { notFull.signal(); } } finally { fullyUnlock(); } } @Override public int drainTo(Collection c) { return drainTo(c, Integer.MAX_VALUE); } @Override public int drainTo(Collection c, int maxElements) { if (c == null) { throw new NullPointerException(); } if (c == this) { throw new IllegalArgumentException(); } if (maxElements int n = Math.min(maxElements, count.get()); // count.get provides visibility to first n Nodes Node h = head; int i = 0; try { while (i // Restore invariants even if c.add() threw if (i > 0) { // assert h.item == null; head = h; signalNotFull = (count.getAndAdd(-i) == capacity); } } } finally { takeLock.unlock(); if (signalNotFull) { signalNotFull(); } } } @Override public Iterator iterator() { return new Itr(); } private class Itr implements Iterator { /* * Basic weakly-consistent iterator. At all times hold the next * item to hand out so that if hasNext() reports true, we will * still have it to return even if lost race with a take etc. */ private Node current; private Node lastRet; private E currentElement; Itr() { fullyLock(); try { current = head.next; if (current != null) { currentElement = current.item; } } finally { fullyUnlock(); } } @Override public boolean hasNext() { return current != null; } private Node nextNode(Node p) { for (;;) { Node s = p.next; if (s == p) { return head.next; } if (s == null || s.item != null) { return s; } p = s; } } @Override public E next() { fullyLock(); try { if (current == null) { throw new NoSuchElementException(); } E x = currentElement; lastRet = current; current = nextNode(current); currentElement = (current == null) ? null : current.item; return x; } finally { fullyUnlock(); } } @Override public void remove() { if (lastRet == null) { throw new IllegalStateException(); } fullyLock(); try { Node node = lastRet; lastRet = null; for (Node trail = head, p = trail.next; p != null; trail = p, p = p.next) { if (p == node) { unlink(p, trail); break; } } } finally { fullyUnlock(); } } } /** * The type Lbq spliterator. * * @param the type parameter */ static final class LBQSpliterator implements Spliterator { static final int MAX_BATCH = 1 return est; } @Override public Spliterator trySplit() { Node h; final ResizeableBlockingQueue q = this.queue; int b = batch; int n = (b = MAX_BATCH) ? MAX_BATCH : b + 1; if (!exhausted && ((h = current) != null || (h = q.head.next) != null) && h.next != null) { Object[] a = new Object[n]; int i = 0; Node p = current; q.fullyLock(); try { if (p != null || (p = q.head.next) != null) { do { if ((a[i] = p.item) != null) { ++i; } } while ((p = p.next) != null && i est = 0L; exhausted = true; } else if ((est -= i) batch = i; return Spliterators.spliterator (a, 0, i, Spliterator.ORDERED | Spliterator.NONNULL | Spliterator.CONCURRENT); } } return null; } @Override public void forEachRemaining(Consumer action) { if (action == null) { throw new NullPointerException(); } final ResizeableBlockingQueue q = this.queue; if (!exhausted) { exhausted = true; Node p = current; do { E e = null; q.fullyLock(); try { if (p == null) { p = q.head.next; } while (p != null) { e = p.item; p = p.next; if (e != null) { break; } } } finally { q.fullyUnlock(); } if (e != null) { action.accept(e); } } while (p != null); } } @Override public boolean tryAdvance(Consumer action) { if (action == null) { throw new NullPointerException(); } final ResizeableBlockingQueue q = this.queue; if (!exhausted) { E e = null; q.fullyLock(); try { if (current == null) { current = q.head.next; } while (current != null) { e = current.item; current = current.next; if (e != null) { break; } } } finally { q.fullyUnlock(); } if (current == null) { exhausted = true; } if (e != null) { action.accept(e); return true; } } return false; } @Override public int characteristics() { return Spliterator.ORDERED | Spliterator.NONNULL | Spliterator.CONCURRENT; } } public Spliterator spliterator() { return new LBQSpliterator(this); } private void writeObject(java.io.ObjectOutputStream s) throws java.io.IOException { fullyLock(); try { // Write out any hidden stuff, plus capacity s.defaultWriteObject(); // Write out all elements in the proper order. for (Node p = head.next; p != null; p = p.next) { s.writeObject(p.item); } // Use trailing null as sentinel s.writeObject(null); } finally { fullyUnlock(); } } private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { // Read in capacity, and any hidden stuff s.defaultReadObject(); count.set(0); last = head = new Node(null); // Read in all elements and place in queue for (;;) { @SuppressWarnings("unchecked") E item = (E)s.readObject(); if (item == null) { break; } add(item); } } } 4.实现ThreadPoolUtil.java

编写线程池工具类,通过Util去创建线程池,并且用HashMap去指向创建的线程池,之后可以通过这个HashMap去获取线程池。

/** * The type Thread pool util. * 线程池工具类 */ @Component public class ThreadPoolUtil { /** * 通过Hash去指向创建的线程池,之后可以通过这个HashMap去获取线程池 */ private final HashMap threadPoolExecutorHashMap = new HashMap(); /** * Creat thread pool thread pool monitor. * * 可以自定义队列类型的构造器 * * @param corePoolSize the core pool size * @param maximumPoolSize the maximum pool size * @param keepAliveTime the keep alive time * @param unit the unit * @param workQueue the work queue * @param poolName the pool name * @return the thread pool monitor */ public ThreadPoolMonitor creatThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, String poolName) { ThreadPoolMonitor threadPoolExecutor = new ThreadPoolMonitor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, poolName); threadPoolExecutorHashMap.put(poolName, threadPoolExecutor); return threadPoolExecutor; } /** * Creat thread pool thread pool monitor. * * ResizeableBlockingQueue 里面修改了capacity参数 * 可以通过set方法去修改队列的大小 * 使用默认队列的构造器 * * @param corePoolSize the core pool size * @param maximumPoolSize the maximum pool size * @param keepAliveTime the keep alive time * @param unit the unit * @param queueSize the queue size * @param poolName the pool name * @return the thread pool monitor */ public ThreadPoolMonitor creatThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, int queueSize, String poolName) { ThreadPoolMonitor threadPoolExecutor = new ThreadPoolMonitor(corePoolSize, maximumPoolSize, keepAliveTime, unit, new ResizeableBlockingQueue(queueSize), poolName); threadPoolExecutorHashMap.put(poolName, threadPoolExecutor); return threadPoolExecutor; } /** * Gets thread pool executor hash map. * * @return the thread pool executor hash map */ public HashMap getThreadPoolExecutorHashMap() { return threadPoolExecutorHashMap; } } 5.实现线程池信息的实体类

实现线程池信息的实体类用来EndPoint返回数据

ThreadPoolDetailInfo.java

/** * The type Thread pool detail info. */ public class ThreadPoolDetailInfo { private String threadPoolName; private Integer poolSize; private Integer corePoolSize; private Integer largestPoolSize; private Integer maximumPoolSize; private long completedTaskCount; private Integer active; private long task; private long keepAliveTime; private String activePercent; private Integer queueCapacity; private Integer queueSize; private long avgDiff; /** * Instantiates a new Thread pool detail info. * * @param threadPoolName the thread pool name * @param poolSize the pool size * @param corePoolSize the core pool size * @param largestPoolSize the largest pool size * @param maximumPoolSize the maximum pool size * @param completedTaskCount the completed task count * @param active the active * @param task the task * @param keepAliveTime the keep alive time * @param activePercent the active percent * @param queueCapacity the queue capacity * @param queueSize the queue size * @param avgDiff the avg diff */ public ThreadPoolDetailInfo(String threadPoolName, Integer poolSize, Integer corePoolSize, Integer largestPoolSize, Integer maximumPoolSize, long completedTaskCount, Integer active, long task, long keepAliveTime, String activePercent, Integer queueCapacity, Integer queueSize, long avgDiff) { this.threadPoolName = threadPoolName; this.poolSize = poolSize; this.corePoolSize = corePoolSize; this.largestPoolSize = largestPoolSize; this.maximumPoolSize = maximumPoolSize; this.completedTaskCount = completedTaskCount; this.active = active; this.task = task; this.keepAliveTime = keepAliveTime; this.activePercent = activePercent; this.queueCapacity = queueCapacity; this.queueSize = queueSize; this.avgDiff = avgDiff; } /** * Gets thread pool name. * * @return the thread pool name */ public String getThreadPoolName() { return threadPoolName; } /** * Sets thread pool name. * * @param threadPoolName the thread pool name */ public void setThreadPoolName(String threadPoolName) { this.threadPoolName = threadPoolName; } /** * Gets pool size. * * @return the pool size */ public Integer getPoolSize() { return poolSize; } /** * Sets pool size. * * @param poolSize the pool size */ public void setPoolSize(Integer poolSize) { this.poolSize = poolSize; } /** * Gets core pool size. * * @return the core pool size */ public Integer getCorePoolSize() { return corePoolSize; } /** * Sets core pool size. * * @param corePoolSize the core pool size */ public void setCorePoolSize(Integer corePoolSize) { this.corePoolSize = corePoolSize; } /** * Gets largest pool size. * * @return the largest pool size */ public Integer getLargestPoolSize() { return largestPoolSize; } /** * Sets largest pool size. * * @param largestPoolSize the largest pool size */ public void setLargestPoolSize(Integer largestPoolSize) { this.largestPoolSize = largestPoolSize; } /** * Gets maximum pool size. * * @return the maximum pool size */ public Integer getMaximumPoolSize() { return maximumPoolSize; } /** * Sets maximum pool size. * * @param maximumPoolSize the maximum pool size */ public void setMaximumPoolSize(Integer maximumPoolSize) { this.maximumPoolSize = maximumPoolSize; } /** * Gets completed task count. * * @return the completed task count */ public long getCompletedTaskCount() { return completedTaskCount; } /** * Sets completed task count. * * @param completedTaskCount the completed task count */ public void setCompletedTaskCount(long completedTaskCount) { this.completedTaskCount = completedTaskCount; } /** * Gets active. * * @return the active */ public Integer getActive() { return active; } /** * Sets active. * * @param active the active */ public void setActive(Integer active) { this.active = active; } /** * Gets task. * * @return the task */ public long getTask() { return task; } /** * Sets task. * * @param task the task */ public void setTask(long task) { this.task = task; } /** * Gets keep alive time. * * @return the keep alive time */ public long getKeepAliveTime() { return keepAliveTime; } /** * Sets keep alive time. * * @param keepAliveTime the keep alive time */ public void setKeepAliveTime(long keepAliveTime) { this.keepAliveTime = keepAliveTime; } /** * Gets active percent. * * @return the active percent */ public String getActivePercent() { return activePercent; } /** * Sets active percent. * * @param activePercent the active percent */ public void setActivePercent(String activePercent) { this.activePercent = activePercent; } /** * Gets queue capacity. * * @return the queue capacity */ public Integer getQueueCapacity() { return queueCapacity; } /** * Sets queue capacity. * * @param queueCapacity the queue capacity */ public void setQueueCapacity(Integer queueCapacity) { this.queueCapacity = queueCapacity; } /** * Gets queue size. * * @return the queue size */ public Integer getQueueSize() { return queueSize; } /** * Sets queue size. * * @param queueSize the queue size */ public void setQueueSize(Integer queueSize) { this.queueSize = queueSize; } /** * Gets avg diff. * * @return the avg diff */ public long getAvgDiff() { return avgDiff; } /** * Sets avg diff. * * @param avgDiff the avg diff */ public void setAvgDiff(long avgDiff) { this.avgDiff = avgDiff; } }

ThreadPoolInfo.java

/** * The type Thread pool info. */ public class ThreadPoolInfo { private String threadPoolName; private int corePoolSize; private int maximumPoolSize; private String queueType; private int queueCapacity; /** * Instantiates a new Thread pool info. * * @param threadPoolName the thread pool name * @param corePoolSize the core pool size * @param maximumPoolSize the maximum pool size * @param queueType the queue type * @param queueCapacity the queue capacity */ public ThreadPoolInfo(String threadPoolName, int corePoolSize, int maximumPoolSize, String queueType, int queueCapacity) { this.threadPoolName = threadPoolName; this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.queueType = queueType; this.queueCapacity = queueCapacity; } /** * Gets thread pool name. * * @return the thread pool name */ public String getThreadPoolName() { return threadPoolName; } /** * Sets thread pool name. * * @param threadPoolName the thread pool name */ public void setThreadPoolName(String threadPoolName) { this.threadPoolName = threadPoolName; } /** * Gets core pool size. * * @return the core pool size */ public int getCorePoolSize() { return corePoolSize; } /** * Sets core pool size. * * @param corePoolSize the core pool size */ public void setCorePoolSize(int corePoolSize) { this.corePoolSize = corePoolSize; } /** * Gets maximum pool size. * * @return the maximum pool size */ public int getMaximumPoolSize() { return maximumPoolSize; } /** * Sets maximum pool size. * * @param maximumPoolSize the maximum pool size */ public void setMaximumPoolSize(int maximumPoolSize) { this.maximumPoolSize = maximumPoolSize; } /** * Gets queue type. * * @return the queue type */ public String getQueueType() { return queueType; } /** * Sets queue type. * * @param queueType the queue type */ public void setQueueType(String queueType) { this.queueType = queueType; } /** * Gets capacity. * * @return the capacity */ public int getqueueCapacity() { return queueCapacity; } /** * Sets capacity. * * @param queueCapacity the queue capacity */ public void setqueueCapacity(int queueCapacity) { this.queueCapacity = queueCapacity; } } 6.编写EndPoint

通过actuator里的@RestControllerEndpoint注解可以添加Endpoints接口。本质上是和@Endpoint,@WebEndpoint作用是一样的,都是为服务增加actuator 接口,方便管理运行中的服务。但是有一个明显的不同是,@RestControllerEndpoint只支持Http方式的访问,不支持JMX的访问。而且,端点的方法上面只支持@GetMapping,@PostMapping,@DeleteMapping,@RequestMapping等,而不支持@ReadOperation,@WriteOperation,@DeleteOperation。而且它返回的格式是:application/json。

由于我司的监控系统只支持json格式,实际上使用Metrics和Grafana去监控会更好。

/** * The type Thread pool endpoint. * * @author newrank */ @RestControllerEndpoint(id = "threadpool") @Component public class ThreadPoolEndpoint { @Autowired private ThreadPoolUtil threadPoolUtil; private static final ReentrantLock LOCK = new ReentrantLock(); private static final String RESIZEABLE_BLOCKING_QUEUE = "ResizeableBlockingQueue"; /** * getThreadPools * 获取当前所有线程池的线程名称 */ @GetMapping("getThreadPools") private List getThreadPools (){ List threadPools = new ArrayList(); if (!threadPoolUtil.getThreadPoolExecutorHashMap().isEmpty()){ for (Map.Entry entry : threadPoolUtil.getThreadPoolExecutorHashMap().entrySet()) { threadPools.add(entry.getKey()); } } return threadPools; } /** * 获取线程池可变参数信息 * @param threadPoolName * @return */ @GetMapping("getThreadPoolFixInfo") private ThreadPoolInfo getThreadPoolInfo(@RequestParam String threadPoolName){ if (threadPoolUtil.getThreadPoolExecutorHashMap().containsKey(threadPoolName)){ ThreadPoolMonitor threadPoolExecutor = threadPoolUtil.getThreadPoolExecutorHashMap().get(threadPoolName); int queueCapacity = 0; if (RESIZEABLE_BLOCKING_QUEUE.equals(threadPoolExecutor.getQueue().getClass().getSimpleName())){ ResizeableBlockingQueue queue = (ResizeableBlockingQueue) threadPoolExecutor.getQueue(); queueCapacity = queue.getCapacity(); } return new ThreadPoolInfo(threadPoolName,threadPoolExecutor.getCorePoolSize(),threadPoolExecutor.getMaximumPoolSize(), threadPoolExecutor.getQueue().getClass().getSimpleName(),queueCapacity); } return null; } /** * 修改线程池配置 * @param threadPoolInfo * @return */ @PostMapping("setThreadPoolFixInfo") private Boolean setThreadPoolInfo(@RequestBody ThreadPoolInfo threadPoolInfo){ if (threadPoolUtil.getThreadPoolExecutorHashMap().containsKey(threadPoolInfo.getThreadPoolName())){ LOCK.lock(); try { ThreadPoolMonitor threadPoolExecutor = threadPoolUtil.getThreadPoolExecutorHashMap().get(threadPoolInfo.getThreadPoolName()); threadPoolExecutor.setMaximumPoolSize(threadPoolInfo.getMaximumPoolSize()); threadPoolExecutor.setCorePoolSize(threadPoolInfo.getCorePoolSize()); if (RESIZEABLE_BLOCKING_QUEUE.equals(threadPoolExecutor.getQueue().getClass().getSimpleName())){ ResizeableBlockingQueue queue = (ResizeableBlockingQueue) threadPoolExecutor.getQueue(); queue.setCapacity(threadPoolInfo.getqueueCapacity()); } return true; }catch (Exception e){ e.printStackTrace(); return false; } finally { LOCK.unlock(); } } return false; } /** * 获取线程池监控信息 * @return */ @GetMapping("getThreadPoolListInfo") private List getThreadPoolListInfo(){ List detailInfoList = new ArrayList(); if (!threadPoolUtil.getThreadPoolExecutorHashMap().isEmpty()){ for (Map.Entry entry : threadPoolUtil.getThreadPoolExecutorHashMap().entrySet()) { ThreadPoolDetailInfo threadPoolDetailInfo = threadPoolInfo(entry.getValue(),entry.getKey()); detailInfoList.add(threadPoolDetailInfo); } } return detailInfoList; } /** * 组装线程池详情 * @param threadPool * @param threadPoolName * @return */ private ThreadPoolDetailInfo threadPoolInfo(ThreadPoolMonitor threadPool,String threadPoolName) { BigDecimal activeCount = new BigDecimal(threadPool.getActiveCount()); BigDecimal maximumPoolSize = new BigDecimal(threadPool.getMaximumPoolSize()); BigDecimal result =activeCount.divide(maximumPoolSize, 2, BigDecimal.ROUND_HALF_UP); NumberFormat numberFormat = NumberFormat.getPercentInstance(); numberFormat.setMaximumFractionDigits(2); int queueCapacity = 0; if (RESIZEABLE_BLOCKING_QUEUE.equals(threadPool.getQueue().getClass().getSimpleName())){ ResizeableBlockingQueue queue = (ResizeableBlockingQueue) threadPool.getQueue(); queueCapacity = queue.getCapacity(); } return new ThreadPoolDetailInfo(threadPoolName,threadPool.getPoolSize(), threadPool.getCorePoolSize(), threadPool.getLargestPoolSize(), threadPool.getMaximumPoolSize(), threadPool.getCompletedTaskCount(), threadPool.getActiveCount(),threadPool.getTaskCount(),threadPool.getKeepAliveTime(TimeUnit.MILLISECONDS), numberFormat.format(result.doubleValue()),queueCapacity,threadPool.getQueue().size(),threadPool.getTotalDiff()/threadPool.getTaskCount()); } } 7.使用线程池监控 注解 @Async("asyncExecutor") public void getTrendQuery(){ //do something } 直接使用public void test() { asyncExecutor.execute(()->{ //do something } ); 1. 查看线程详情 http://localhost/actuator/threadpool/getThreadPoolListInfo //GET请求

返回:

[ { "active": 0, //正在进行的任务数 "activePercent": "0%",//线程池负载 "completedTaskCount": 17, //完成的任务数 "corePoolSize": 16, //核心线程数 "keepAliveTime": 60000,//线程存活时间 "largestPoolSize": 16,//到达的最大线程数 "maximumPoolSize": 32, //最大线程数 "poolSize": 16,//当前线程数 "queueCapacity": 500,//队列长度 ps:如果不是ResizeableBlockingQueue 队列则默认为0 "task": 0, //任务总数 "queueSize":0,//队列中缓存的任务数量 "threadPoolName": "asyncExecutor" //线程池名称 } ] 2. 查看线程池参数 http://localhost/actuator/threadpool/getThreadPoolFixInfo?threadPoolName=asyncExecutor //GET请求

参数:

名称类型threadPoolNameString

返回:

{ "corePoolSize": 16, //核心线程数 "maximumPoolSize": 32, //最大线程数 "queueCapacity": 500, //队列大小 "queueType": "ResizeableBlockingQueue", //队列类型 "threadPoolName": "asyncExecutor" //线程池名称 } 3. 修改线程池参数 https://localhost/actuator/threadpool/setThreadPoolInfo //Post请求

参数:

名称类型备注threadPoolNameStringcorePoolSizeint可变maximumPoolSizeint可变queueCapacityint可变queueTypeString不可变

请求类型:json

返回: Boolean

以上完整代码在Github中

Github

个人博客

西西弗的石头

作者水平有限,若有错误遗漏,请指出。

参考文章

1.Java线程池实现原理及其在美团业务中的实践

2.Java并发(六)线程池监控



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有