线程通信-任务队列TaskQueue

线程?

嗯~我写的文章针对的不是小白用户,所以概念不做赘述。单线程有单线程的好处,多线程有多线程的好处,单线程业务单一不容易出错,多线程分摊压力,并行运行,良好利用CPU,也能防止某代码块崩溃而发生服务器整体崩溃。就拿个很直白的例子来讲,有些业务会阻塞很久导致其他业务等待,但是其他业务又不是必须等待这个阻塞业务,可以是异步查看的,这种情况我们就用多线程。再举个形象的例子,在游戏服务器中,不同地图的玩家要求他们的数据是同步且顺序执行的,但是不同地图之间的玩家又互不影响流程,为了保证同个地图玩家的交互没有数据问题(比如战斗伤害,交易等)就要使用多线程。还有个例子,比如有个排行榜要统计玩家数据进行排名,或者说网络游戏随时会保存玩家数据,数据库操作又是耗时的,所以用线程把业务分开来处理。

生产者消费者模型

字面意思,一群人负责生产东西,然后投递到某个地方,另一群人就来拿东西处理。这是一个典型的异步模型,利用一个中间商来暂存货物,实现生产者与消费者的解耦。跟现实中没什么两样,你买东西到零售店,零售店去批发商品,然而你并不关心他怎么拿到这些商品,你也不用立即去购买商品,因为东西都放在零售店,你可以随意安排你的时间来获取商品。  其实核心目的就是异步。

线程通信

不同之间的线程怎么通信?通过生产者消费者模型就可以解决。需要交互的线程都有一个各自的队列,队列来保证运行顺序先进先出。可以把这个线程比喻为一个指令执行器,生产者就是负责投递指令的其他线程(也可以是他自己),消费者就是线程自己下面的各种逻辑,根据不同的指令对应到这些逻辑上面。我肯定会拿游戏服务器来举例子,比如地图线程,玩家会在地图里打怪、走路等各种操作,谁发送指令呢,肯定是客户端了,他发送了这些指令过来,服务器检索玩家正处于哪个地图,就会把这些指令投入哪个地图的任务队列中去。地图线程每时每刻在做着任务出队的操作,并指派相应逻辑来处理。或许保存玩家数据的场景更直接更具体一点,其他线程把玩家数据发送给保存队列,保存队列按照一定规则取出然后存入数据库中。

任务队列

我们把这种线程封装一下就叫做任务队列,他们就像一个个任务执行器一样不停地工作。总结一下API,非常简单,需要一个add接口,一个run方法就搞定了,add接口负责投入任务,run方法不停循环取出任务然后处理。在选用队列的结构上,如果没有Doug Lea大神的话,jdk1.5之前我们会使用LinkedList之类的,但是LinkedList之类的都是非线程安全的,消费不说,肯定是本线程,但是投入的口子上就不一定了,大部分情况都是其他线程。那么我们选择加锁,run方法取任务的时候也做锁操作,可能还可以加一个double-check来防止锁带来的性能无用开销,然后利用Object自身的wait和notify来睡眠和唤醒线程,run方法发现没有任务了,就睡眠,add方法添加进任务了就唤醒线程。然而锁毕竟是一个很慢的技术,并行和串行本来就是矛盾的,但是有时候不得不保证顺序,锁就是强制在某个需要同步的地方,禁止代码往下走的东西。

public void run() {
    while (!stop) {
        if (queue.isEmpty()) {
            wait();
        }
        Object obj = queue.poll();
        //do obj
    }
}

public void add(Object obj) {
    queue.push(obj);
    notify();
}

类似上面的操作,是伪代码哈,锁也没有加的,看看就行。

在jdk1.5和之后,我们可以使用LinkedBlockingQueue来代替这个,具体原理可以百度。LinkedBlockingQueue的take方法如果遇到没有可出队列的元素,他会阻塞有元素为止。而他的offer操作也是线程安全的。所以我们就只用关心一个while(true)然后里面不停的执行逻辑就可以了,这里贴下代码,感觉这个东西很简单,说不了个什么名堂。

@ThreadSafe
public class LinkedBlockingTaskQueue extends AbstractTaskQueue implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(LinkedBlockingTaskQueue.class);
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final SingletonThreadFactory threadFactory;
    private boolean start = false;
    private Proc2<Runnable, Throwable> exception;

    public static LinkedBlockingTaskQueue create(String threadName) {
        return new LinkedBlockingTaskQueue(threadName);
    }


    private LinkedBlockingTaskQueue(String threadName) {
        threadFactory = new SingletonThreadFactory() {
            @Override
            public String name() {
                return threadName;
            }
        };
        threadFactory.newThread(this);
        threadFactory.thread().start();
    }


    public LinkedBlockingTaskQueue exception(Proc2<Runnable, Throwable> exception) {
        this.exception = exception;
        return this;
    }

    @Override
    public void run() {
        start = true;
        while (start || !queue.isEmpty()) {
            Runnable take = null;
            try {
                take = queue.take();
                take.run();
            } catch (Exception e) {
                LOGGER.error("invoke error", e);
                Procs.invoke(exception, take, e);
            }
        }
    }


    @Override
    public void shutdown() {
        start = false;
    }

    @Override
    public Thread thread() {
        return threadFactory.thread();
    }

    @Override
    public void execute(Runnable command) {
        Conditions.notNull(command, "command");
        if (thread() == Thread.currentThread()) {
            command.run();
        }
        Conditions.args(queue.offer(command), "add command failed, command:%s", command.getClass().getName());
    }
}

我们可以看看take的源码。

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

 

实际上里面他还是用了锁(ReentrantLock),只不过这个锁是轻量级的,原理还是上面说的,阻塞和唤醒的过程。但是我们可不可以再改进一下呢,不用锁,效率更高。这里我们就要说说大名鼎鼎的Disruptor了。

Disruptor

Disruptor利用的是一个环形队列(RingBuffer),这个环是由N个格子组成,每个格子的状态有两种,未被占用和被占用。Disruptor用一个序列(sequence)来表示当前运行到环的哪个位置了,有点像抽奖转盘,只不过这里移动的是指针。

em…如果觉得我画得太丑了的话,你可以尝试看下面这张图。

至于他怎么保证这个sequence指针是线程安全的,可以去百度一下Disruptor和了解一下CAS(Compare And Swap/Set)操作,CAS操作也可以参考一下AtomicInteger里面的代码。

下面我们就看看第二种任务队列

@ThreadSafe
public class DisruptorTaskQueue extends AbstractTaskQueue {
    private static final Logger LOGGER = LoggerFactory.getLogger(DisruptorTaskQueue.class);
    private final Disruptor<Holder<Runnable>> disruptor;
    private final SingletonThreadFactory threadFactory;
    private Proc3<Runnable, Throwable, Long> exception;

    public static DisruptorTaskQueue create(String threadName) {
        return new DisruptorTaskQueue(threadName);
    }


    public static DisruptorTaskQueue create(String threadName, int bufferSize) {
        return new DisruptorTaskQueue(threadName, bufferSize);
    }

    private DisruptorTaskQueue(String threadName) {
        this(threadName, 2 << 12);// 4096
    }

    /**
     * 构造函数
     *
     * @param threadName
     * @param bufferSize 指定RingBuffer的大小
     */
    @SuppressWarnings("unchecked")
    private DisruptorTaskQueue(String threadName, int bufferSize) {
        this.threadFactory = new SingletonThreadFactory() {

            @Override
            public String name() {
                return threadName;
            }
        };
        disruptor = new Disruptor<>(Holder::empty, bufferSize, threadFactory, ProducerType.MULTI,
                new BlockingWaitStrategy());
        disruptor.handleEventsWith((event, sequence, endOfBatch) -> {
            try {
                event.get().run();
            } catch (Exception e) {
                LOGGER.error("invoke handler error", e);
            } finally {
                event.set(null);
            }
        });
        // prevent Worker Threads from dying
        disruptor.setDefaultExceptionHandler(new ExceptionHandler<Holder<Runnable>>() {

            @Override
            public void handleEventException(Throwable ex, long sequence, Holder<Runnable> event) {
                LOGGER.error("sequence " + sequence + " error!", ex);
                Procs.invoke(exception, event.get(), ex, sequence);
            }

            @Override
            public void handleOnStartException(final Throwable ex) {
                LOGGER.error("Exception during onStart()", ex);
            }

            @Override
            public void handleOnShutdownException(final Throwable ex) {
                LOGGER.error("Exception during onShutdown()", ex);
            }
        });
        disruptor.start();
        LOGGER.info("thread " + threadFactory.name() + " start!");
    }


    /**
     * 错误处理
     *
     * @param exception
     * @return
     */
    public DisruptorTaskQueue exception(Proc3<Runnable, Throwable, Long> exception) {
        this.exception = exception;
        return this;
    }


    @Override
    public void execute(Runnable runnable) {
        Conditions.notNull(runnable, "command");
        if (thread() == Thread.currentThread()) {
            runnable.run();
        }
        disruptor.getRingBuffer().publishEvent((event, sequence) -> event.set(runnable));
    }

    @Override
    public Thread thread() {
        return threadFactory.thread();
    }

    @Override
    public void shutdown() {
        if (disruptor != null) {
            disruptor.shutdown();
            LOGGER.info("thread " + threadFactory.name() + " stop!");
        }
    }
}

虽然对于游戏来讲,Disruptor和LinkedBlockingQueue相差不大,但是总归是快一点。哦对了,这里提个醒,如果环大小的设置不当,小于了并发数量,就会出现异常。嗯,其实能想象到,你环的位置没给够,新来的任务放哪?占用还是阻塞?我再提一句,线程不是越多越好,线程的上下文切换也会带来开销,一个进程理想情况下业务线程不要超过10个,不然没得什么意义。

发表评论

电子邮件地址不会被公开。 必填项已用*标注