From a0b0e3d8d5d4051d155d0a6ac7d9fdefcbf096eb Mon Sep 17 00:00:00 2001 From: HashEngineering Date: Wed, 11 Feb 2026 21:40:05 -0800 Subject: [PATCH 1/2] fix: improve CoinJoinManager message processor --- .../coinjoin/utils/CoinJoinManager.java | 46 +++++++++++++------ 1 file changed, 32 insertions(+), 14 deletions(-) diff --git a/core/src/main/java/org/bitcoinj/coinjoin/utils/CoinJoinManager.java b/core/src/main/java/org/bitcoinj/coinjoin/utils/CoinJoinManager.java index 7477db337..58c02e434 100644 --- a/core/src/main/java/org/bitcoinj/coinjoin/utils/CoinJoinManager.java +++ b/core/src/main/java/org/bitcoinj/coinjoin/utils/CoinJoinManager.java @@ -71,6 +71,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -82,6 +83,7 @@ public class CoinJoinManager { private static final Logger log = LoggerFactory.getLogger(CoinJoinManager.class); + public static final String MESSAGE_PROCESSOR = "CoinJoin-MessageProcessor"; private final ArrayList wallets = Lists.newArrayList(); private final Context context; public final HashMap coinJoinClientManagers; @@ -97,8 +99,7 @@ public class CoinJoinManager { private RequestKeyParameter requestKeyParameter; private RequestDecryptedKey requestDecryptedKey; private final ScheduledExecutorService scheduledExecutorService; - private final ExecutorService messageProcessingExecutor = Executors.newFixedThreadPool(5, - new ContextPropagatingThreadFactory("CoinJoin-MessageProcessor")); + private ExecutorService messageProcessingExecutor = null; protected final ReentrantLock lock = Threading.lock("coinjoin-manager"); private boolean finishCurrentSessions = false; @@ -193,6 +194,8 @@ public void start() { Context.propagate(context); schedule = scheduledExecutorService.scheduleWithFixedDelay( maintenanceRunnable, 1, 1, TimeUnit.SECONDS); + messageProcessingExecutor = Executors.newFixedThreadPool(5, + new ContextPropagatingThreadFactory(MESSAGE_PROCESSOR)); } public void stop() { @@ -213,17 +216,20 @@ public void stop() { finishCurrentSessions = false; // Shutdown the message processing executor - messageProcessingExecutor.shutdown(); - try { - if (!messageProcessingExecutor.awaitTermination(10, TimeUnit.SECONDS)) { - log.warn("CoinJoin message processing executor did not terminate in time, forcing shutdown"); + if (messageProcessingExecutor != null) { + messageProcessingExecutor.shutdown(); + try { + if (!messageProcessingExecutor.awaitTermination(10, TimeUnit.SECONDS)) { + log.warn("CoinJoin message processing executor did not terminate in time, forcing shutdown"); + messageProcessingExecutor.shutdownNow(); + } + } catch (InterruptedException e) { + log.warn("Interrupted while waiting for message processing executor to terminate", e); messageProcessingExecutor.shutdownNow(); + Thread.currentThread().interrupt(); } - } catch (InterruptedException e) { - log.warn("Interrupted while waiting for message processing executor to terminate", e); - messageProcessingExecutor.shutdownNow(); - Thread.currentThread().interrupt(); } + messageProcessingExecutor = null; } finally { lock.unlock(); } @@ -233,6 +239,11 @@ public boolean isRunning() { return schedule != null && !schedule.isCancelled(); } + private boolean isMessageProcessorRunning() { + return messageProcessingExecutor != null && !messageProcessingExecutor.isShutdown() && + !messageProcessingExecutor.isTerminated(); + } + public void initMasternodeGroup(AbstractBlockChain blockChain) { this.blockChain = blockChain; masternodeGroup = new MasternodeGroup(context, blockChain, masternodeListManager); @@ -281,8 +292,9 @@ public void close() { masternodeGroup.removePreMessageReceivedEventListener(preMessageReceivedEventListener); } // Ensure executor is shut down - if (!messageProcessingExecutor.isShutdown()) { + if (messageProcessingExecutor != null && !messageProcessingExecutor.isShutdown()) { messageProcessingExecutor.shutdown(); + messageProcessingExecutor = null; } } @@ -507,9 +519,15 @@ public void processTransaction(Transaction tx) { public final PreMessageReceivedEventListener preMessageReceivedEventListener = (peer, m) -> { if (m instanceof CoinJoinQueue) { // Offload DSQueue message processing to thread pool to avoid blocking network I/O thread - messageProcessingExecutor.execute(() -> { - processMessage(peer, m); - }); + if (isMessageProcessorRunning()) { + try { + messageProcessingExecutor.execute(() -> { + processMessage(peer, m); + }); + } catch (RejectedExecutionException e) { + // swallow because this is being stopped + } + } // Return null as dsq meessages are only processed above return null; } else if (isCoinJoinMessage(m)) { From 40464c9a2bc364de25d29a63fa88335c9da3c81d Mon Sep 17 00:00:00 2001 From: HashEngineering Date: Wed, 18 Feb 2026 14:08:57 -0800 Subject: [PATCH 2/2] fix: minor fixes to eliminate race conditions --- .../coinjoin/utils/CoinJoinManager.java | 24 ++++++++++++------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/org/bitcoinj/coinjoin/utils/CoinJoinManager.java b/core/src/main/java/org/bitcoinj/coinjoin/utils/CoinJoinManager.java index 95faf8017..0216ccb0f 100644 --- a/core/src/main/java/org/bitcoinj/coinjoin/utils/CoinJoinManager.java +++ b/core/src/main/java/org/bitcoinj/coinjoin/utils/CoinJoinManager.java @@ -99,7 +99,7 @@ public class CoinJoinManager { private RequestKeyParameter requestKeyParameter; private RequestDecryptedKey requestDecryptedKey; private final ScheduledExecutorService scheduledExecutorService; - private ExecutorService messageProcessingExecutor = null; + private volatile ExecutorService messageProcessingExecutor = null; protected final ReentrantLock lock = Threading.lock("coinjoin-manager"); private boolean finishCurrentSessions = false; @@ -292,9 +292,18 @@ public void close() { masternodeGroup.removePreMessageReceivedEventListener(preMessageReceivedEventListener); } // Ensure executor is shut down - if (messageProcessingExecutor != null && !messageProcessingExecutor.isShutdown()) { - messageProcessingExecutor.shutdown(); - messageProcessingExecutor = null; + ExecutorService execToStop = null; + lock.lock(); + try { + if (messageProcessingExecutor != null && !messageProcessingExecutor.isShutdown()) { + execToStop = messageProcessingExecutor; + messageProcessingExecutor = null; + } + } finally { + lock.unlock(); + } + if (execToStop != null) { + execToStop.shutdown(); } } @@ -519,11 +528,10 @@ public void processTransaction(Transaction tx) { public final PreMessageReceivedEventListener preMessageReceivedEventListener = (peer, m) -> { if (m instanceof CoinJoinQueue) { // Offload DSQueue message processing to thread pool to avoid blocking network I/O thread - if (isMessageProcessorRunning()) { + ExecutorService exec = messageProcessingExecutor; + if (exec != null) { try { - messageProcessingExecutor.execute(() -> { - processMessage(peer, m); - }); + exec.execute(() -> processMessage(peer, m)); } catch (RejectedExecutionException e) { // swallow because this is being stopped }