Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 39 additions & 18 deletions core/src/main/java/org/bitcoinj/coinjoin/utils/CoinJoinManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
public class CoinJoinManager {

private static final Logger log = LoggerFactory.getLogger(CoinJoinManager.class);
public static final String MESSAGE_PROCESSOR = "CoinJoin-MessageProcessor";
private final ArrayList<WalletEx> wallets = Lists.newArrayList();
private final Context context;
public final HashMap<String, CoinJoinClientManager> coinJoinClientManagers;
Expand All @@ -98,8 +99,7 @@ public class CoinJoinManager {
private RequestKeyParameter requestKeyParameter;
private RequestDecryptedKey requestDecryptedKey;
private final ScheduledExecutorService scheduledExecutorService;
private final ExecutorService messageProcessingExecutor = Executors.newFixedThreadPool(5,
new ContextPropagatingThreadFactory("CoinJoin-MessageProcessor"));
private volatile ExecutorService messageProcessingExecutor = null;
protected final ReentrantLock lock = Threading.lock("coinjoin-manager");

private boolean finishCurrentSessions = false;
Expand Down Expand Up @@ -194,6 +194,8 @@ public void start() {
Context.propagate(context);
schedule = scheduledExecutorService.scheduleWithFixedDelay(
maintenanceRunnable, 1, 1, TimeUnit.SECONDS);
messageProcessingExecutor = Executors.newFixedThreadPool(5,
new ContextPropagatingThreadFactory(MESSAGE_PROCESSOR));
}

public void stop() {
Expand All @@ -214,17 +216,20 @@ public void stop() {
finishCurrentSessions = false;

// Shutdown the message processing executor
messageProcessingExecutor.shutdown();
try {
if (!messageProcessingExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
log.warn("CoinJoin message processing executor did not terminate in time, forcing shutdown");
if (messageProcessingExecutor != null) {
messageProcessingExecutor.shutdown();
try {
if (!messageProcessingExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
log.warn("CoinJoin message processing executor did not terminate in time, forcing shutdown");
messageProcessingExecutor.shutdownNow();
}
} catch (InterruptedException e) {
log.warn("Interrupted while waiting for message processing executor to terminate", e);
messageProcessingExecutor.shutdownNow();
Thread.currentThread().interrupt();
}
} catch (InterruptedException e) {
log.warn("Interrupted while waiting for message processing executor to terminate", e);
messageProcessingExecutor.shutdownNow();
Thread.currentThread().interrupt();
}
messageProcessingExecutor = null;
} finally {
lock.unlock();
}
Expand All @@ -234,6 +239,11 @@ public boolean isRunning() {
return schedule != null && !schedule.isCancelled();
}

private boolean isMessageProcessorRunning() {
return messageProcessingExecutor != null && !messageProcessingExecutor.isShutdown() &&
!messageProcessingExecutor.isTerminated();
}

public void initMasternodeGroup(AbstractBlockChain blockChain) {
this.blockChain = blockChain;
masternodeGroup = new MasternodeGroup(context, blockChain, masternodeListManager);
Expand Down Expand Up @@ -282,8 +292,18 @@ public void close() {
masternodeGroup.removePreMessageReceivedEventListener(preMessageReceivedEventListener);
}
// Ensure executor is shut down
if (!messageProcessingExecutor.isShutdown()) {
messageProcessingExecutor.shutdown();
ExecutorService execToStop = null;
lock.lock();
try {
if (messageProcessingExecutor != null && !messageProcessingExecutor.isShutdown()) {
execToStop = messageProcessingExecutor;
messageProcessingExecutor = null;
}
} finally {
lock.unlock();
}
if (execToStop != null) {
execToStop.shutdown();
}
}

Expand Down Expand Up @@ -508,12 +528,13 @@ public void processTransaction(Transaction tx) {
public final PreMessageReceivedEventListener preMessageReceivedEventListener = (peer, m) -> {
if (m instanceof CoinJoinQueue) {
// Offload DSQueue message processing to thread pool to avoid blocking network I/O thread
try {
messageProcessingExecutor.execute(() -> {
processMessage(peer, m);
});
} catch (RejectedExecutionException e) {
// Executor was shutdown - ignore, we're shutting down
ExecutorService exec = messageProcessingExecutor;
if (exec != null) {
try {
exec.execute(() -> processMessage(peer, m));
} catch (RejectedExecutionException e) {
// swallow because this is being stopped
}
}
// Return null as dsq messages are only processed above
return null;
Expand Down
Loading