diff --git a/core/src/main/java/org/bitcoinj/coinjoin/utils/CoinJoinManager.java b/core/src/main/java/org/bitcoinj/coinjoin/utils/CoinJoinManager.java index 443606543..0216ccb0f 100644 --- a/core/src/main/java/org/bitcoinj/coinjoin/utils/CoinJoinManager.java +++ b/core/src/main/java/org/bitcoinj/coinjoin/utils/CoinJoinManager.java @@ -83,6 +83,7 @@ public class CoinJoinManager { private static final Logger log = LoggerFactory.getLogger(CoinJoinManager.class); + public static final String MESSAGE_PROCESSOR = "CoinJoin-MessageProcessor"; private final ArrayList wallets = Lists.newArrayList(); private final Context context; public final HashMap coinJoinClientManagers; @@ -98,8 +99,7 @@ public class CoinJoinManager { private RequestKeyParameter requestKeyParameter; private RequestDecryptedKey requestDecryptedKey; private final ScheduledExecutorService scheduledExecutorService; - private final ExecutorService messageProcessingExecutor = Executors.newFixedThreadPool(5, - new ContextPropagatingThreadFactory("CoinJoin-MessageProcessor")); + private volatile ExecutorService messageProcessingExecutor = null; protected final ReentrantLock lock = Threading.lock("coinjoin-manager"); private boolean finishCurrentSessions = false; @@ -194,6 +194,8 @@ public void start() { Context.propagate(context); schedule = scheduledExecutorService.scheduleWithFixedDelay( maintenanceRunnable, 1, 1, TimeUnit.SECONDS); + messageProcessingExecutor = Executors.newFixedThreadPool(5, + new ContextPropagatingThreadFactory(MESSAGE_PROCESSOR)); } public void stop() { @@ -214,17 +216,20 @@ public void stop() { finishCurrentSessions = false; // Shutdown the message processing executor - messageProcessingExecutor.shutdown(); - try { - if (!messageProcessingExecutor.awaitTermination(10, TimeUnit.SECONDS)) { - log.warn("CoinJoin message processing executor did not terminate in time, forcing shutdown"); + if (messageProcessingExecutor != null) { + messageProcessingExecutor.shutdown(); + try { + if (!messageProcessingExecutor.awaitTermination(10, TimeUnit.SECONDS)) { + log.warn("CoinJoin message processing executor did not terminate in time, forcing shutdown"); + messageProcessingExecutor.shutdownNow(); + } + } catch (InterruptedException e) { + log.warn("Interrupted while waiting for message processing executor to terminate", e); messageProcessingExecutor.shutdownNow(); + Thread.currentThread().interrupt(); } - } catch (InterruptedException e) { - log.warn("Interrupted while waiting for message processing executor to terminate", e); - messageProcessingExecutor.shutdownNow(); - Thread.currentThread().interrupt(); } + messageProcessingExecutor = null; } finally { lock.unlock(); } @@ -234,6 +239,11 @@ public boolean isRunning() { return schedule != null && !schedule.isCancelled(); } + private boolean isMessageProcessorRunning() { + return messageProcessingExecutor != null && !messageProcessingExecutor.isShutdown() && + !messageProcessingExecutor.isTerminated(); + } + public void initMasternodeGroup(AbstractBlockChain blockChain) { this.blockChain = blockChain; masternodeGroup = new MasternodeGroup(context, blockChain, masternodeListManager); @@ -282,8 +292,18 @@ public void close() { masternodeGroup.removePreMessageReceivedEventListener(preMessageReceivedEventListener); } // Ensure executor is shut down - if (!messageProcessingExecutor.isShutdown()) { - messageProcessingExecutor.shutdown(); + ExecutorService execToStop = null; + lock.lock(); + try { + if (messageProcessingExecutor != null && !messageProcessingExecutor.isShutdown()) { + execToStop = messageProcessingExecutor; + messageProcessingExecutor = null; + } + } finally { + lock.unlock(); + } + if (execToStop != null) { + execToStop.shutdown(); } } @@ -508,12 +528,13 @@ public void processTransaction(Transaction tx) { public final PreMessageReceivedEventListener preMessageReceivedEventListener = (peer, m) -> { if (m instanceof CoinJoinQueue) { // Offload DSQueue message processing to thread pool to avoid blocking network I/O thread - try { - messageProcessingExecutor.execute(() -> { - processMessage(peer, m); - }); - } catch (RejectedExecutionException e) { - // Executor was shutdown - ignore, we're shutting down + ExecutorService exec = messageProcessingExecutor; + if (exec != null) { + try { + exec.execute(() -> processMessage(peer, m)); + } catch (RejectedExecutionException e) { + // swallow because this is being stopped + } } // Return null as dsq messages are only processed above return null;