diff --git a/CHANGES.md b/CHANGES.md index 4fd1859d4c..2a797ed4cc 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -28,6 +28,7 @@ Release Notes. * Add Caffeine plugin as optional. * Add Undertow 2.1.7.final+ worker thread pool metrics. * Support for tracking in spring gateway versions 4.1.2 and above. +* Fix `ConsumeDriver` running status concurrency issues. All issues and pull requests are [here](https://github.com/apache/skywalking/milestone/222?closed=1) diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java index a6d808fead..853c7fc3bc 100644 --- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java +++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java @@ -27,7 +27,7 @@ * Pool of consumers

Created by wusheng on 2016/10/25. */ public class ConsumeDriver implements IDriver { - private boolean running; + private volatile boolean running; private ConsumerThread[] consumerThreads; private Channels channels; private ReentrantLock lock; @@ -88,6 +88,9 @@ public void begin(Channels channels) { } lock.lock(); try { + if (running) { + return; + } this.allocateBuffer2Thread(); for (ConsumerThread consumerThread : consumerThreads) { consumerThread.start(); @@ -124,8 +127,14 @@ private void allocateBuffer2Thread() { @Override public void close(Channels channels) { + if (!running) { + return; + } lock.lock(); try { + if (!running) { + return; + } this.running = false; for (ConsumerThread consumerThread : consumerThreads) { consumerThread.shutdown();