线程池吞并业务异常

2023-07-06
0 441

有这样一个场景:前端请求创建了一个批处理任务,请求会返回任务的进度,然后前端会根据进度定时重新获取最新进度,直到进度为1。现在出现了一个问题:在进度没有达到1的情况下,前端一直在发请求获取最新进度,但是后端返回的进度信息却一直不更新了。

一、伪代码

public class TaskTest {

    private static final ExecutorService executor = Executors.newSingleThreadExecutor();

    @RequestMapping("/detection")
    public Object detection(List<Integer> serviceIds) {
        return doDetection(serviceIds);
    }
    
    @RequestMapping("getRate")
    public Object getTaskRate(String taskId){
       //  get rate from cache by taskId
        return rate;
    }


    String doDetection(List<Integer> serviceIds){
        executor.submit(new DetectionTask(serviceIds));
        String taskId = ""; // generate taskId cached
        return taskId;
    }

    static class DetectionTask implements Runnable{
        List<Integer> serviceIds;

        public DetectionTask(List<Integer> serviceIds) {
            this.serviceIds = serviceIds;
        }

        @Override
        public void run() {
            // todo
            List<ServiceInfo> serviceInfos = xxService.findServiceInfoByIds(serviceIds);
            for (ServiceInfo serviceInfo : serviceInfos) {
                if (serviceInfo.type.equals(ServiceInfo.ServiceType.WMS)){
                    // todo
                }
            }
        }
    }

    static class ServiceInfo{
        String name;
        ServiceType type;
        // ...

        enum ServiceType {
            MAP,
            REST,
            WMS,
            OTHERS
        }
    }
}

就是这样一份代码,在测试时出现了问题,有一定几率后端返回的进度会卡在一个小于1的值,导致前端一直发请求。

二、调试

在代码的调试过程中,发现第35行代码小概率会卡住,也就是if (serviceInfo.type.equals(ServiceInfo.ServiceType.WMS))这个条件判断,第一反应会不会是抛了异常?经过与数据库数据的比对之后,应该就是这里的问题。同时又来了新的问题,既然这里抛了空指针,为何日志中一点信息没有?想了下,这是在线程池中执行的任务,所以猜测会不会是异常被吞了?带着这个疑问去看了下源代码。

三、源代码

在代码中使用的是线程池的submit方法,所以跟进去看一下。

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
    /**
     * Returns a {@code RunnableFuture} for the given runnable and default
     * value.
     *
     * @param runnable the runnable task being wrapped
     * @param value the default value for the returned future
     * @param <T> the type of the given value
     * @return a {@code RunnableFuture} which, when run, will run the
     * underlying runnable and which, as a {@code Future}, will yield
     * the given value as its result and provide for cancellation of
     * the underlying task
     * @since 1.6
     */
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }
可以看到,在线程池的submit方法中,将要执行的任务包装成了一个FutureTask�,而在FutureTask中对异常做了处理:
    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes
    /**
     * Causes this future to report an {@link ExecutionException}
     * with the given throwable as its cause, unless this future has
     * already been set or has been cancelled.
     *
     * <p>This method is invoked internally by the {@link #run} method
     * upon failure of the computation.
     *
     * @param t the cause of failure
     */
    protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }

    public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

第31行,c.call(),会调用我们自己的业务代码,而我们的业务代码中抛了异常,在FutureTask中可以看到是被捕获了,异常经由37行setException方法被保存在了outcome中,而在调用get()方法获取返回值时,会抛出这个异常:

    /**
     * @throws CancellationException {@inheritDoc}
     */
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }
	private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

所以使用submit方法时业务代码抛出的异常才没有任务日志显示,因为被捕获处理了。

四、execute()

看完了submit()方法,再来看一下另外一个经常使用的方法execute(),直接跟进源代码:

public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}
execute方法最直观的区别就是没有返回值!
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

没啥好看的,就调用了一下线程的start方法,没有catch。
所以使用execute方法在业务代码发生异常时是会直接抛出的。