diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java index de69d97bb79aa..06a8ab2462b96 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java @@ -24,6 +24,7 @@ import java.util.NavigableMap; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; import java.util.function.Predicate; import org.apache.bookkeeper.common.annotation.InterfaceAudience; import org.apache.bookkeeper.common.annotation.InterfaceStability; @@ -76,6 +77,8 @@ public interface ManagedLedger { * data entry to be persisted * @return the Position at which the entry has been inserted * @throws ManagedLedgerException + * @apiNote This method is not guaranteed to be thread safe. The caller should be responsible to call all + * {@link ManagedLedger#asyncAddEntry} and {@link ManagedLedger#addEntry} overloads in order. */ Position addEntry(byte[] data) throws InterruptedException, ManagedLedgerException; @@ -88,6 +91,8 @@ public interface ManagedLedger { * numberOfMessages of entry * @return the Position at which the entry has been inserted * @throws ManagedLedgerException + * @apiNote This method is not guaranteed to be thread safe. The caller should be responsible to call all + * {@link ManagedLedger#asyncAddEntry} and {@link ManagedLedger#addEntry} overloads in order. */ Position addEntry(byte[] data, int numberOfMessages) throws InterruptedException, ManagedLedgerException; @@ -102,6 +107,8 @@ public interface ManagedLedger { * callback object * @param ctx * opaque context + * @apiNote This method is not guaranteed to be thread safe. The caller should be responsible to call all + * {@link ManagedLedger#asyncAddEntry} and {@link ManagedLedger#addEntry} overloads in order. */ void asyncAddEntry(byte[] data, AddEntryCallback callback, Object ctx); @@ -116,6 +123,8 @@ public interface ManagedLedger { * number of bytes * @return the Position at which the entry has been inserted * @throws ManagedLedgerException + * @apiNote This method is not guaranteed to be thread safe. The caller should be responsible to call all + * {@link ManagedLedger#asyncAddEntry} and {@link ManagedLedger#addEntry} overloads in order. */ Position addEntry(byte[] data, int offset, int length) throws InterruptedException, ManagedLedgerException; @@ -132,6 +141,8 @@ public interface ManagedLedger { * number of bytes * @return the Position at which the entry has been inserted * @throws ManagedLedgerException + * @apiNote This method is not guaranteed to be thread safe. The caller should be responsible to call all + * {@link ManagedLedger#asyncAddEntry} and {@link ManagedLedger#addEntry} overloads in order. */ Position addEntry(byte[] data, int numberOfMessages, int offset, int length) throws InterruptedException, ManagedLedgerException; @@ -150,6 +161,8 @@ Position addEntry(byte[] data, int numberOfMessages, int offset, int length) thr * callback object * @param ctx * opaque context + * @apiNote This method is not guaranteed to be thread safe. The caller should be responsible to call all + * {@link ManagedLedger#asyncAddEntry} and {@link ManagedLedger#addEntry} overloads in order. */ void asyncAddEntry(byte[] data, int offset, int length, AddEntryCallback callback, Object ctx); @@ -169,6 +182,8 @@ Position addEntry(byte[] data, int numberOfMessages, int offset, int length) thr * callback object * @param ctx * opaque context + * @apiNote This method is not guaranteed to be thread safe. The caller should be responsible to call all + * {@link ManagedLedger#asyncAddEntry} and {@link ManagedLedger#addEntry} overloads in order. */ void asyncAddEntry(byte[] data, int numberOfMessages, int offset, int length, AddEntryCallback callback, Object ctx); @@ -184,6 +199,8 @@ void asyncAddEntry(byte[] data, int numberOfMessages, int offset, int length, Ad * callback object * @param ctx * opaque context + * @apiNote This method is not guaranteed to be thread safe. The caller should be responsible to call all + * {@link ManagedLedger#asyncAddEntry} and {@link ManagedLedger#addEntry} overloads in order. */ void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback, Object ctx); @@ -199,6 +216,8 @@ void asyncAddEntry(byte[] data, int numberOfMessages, int offset, int length, Ad * callback object * @param ctx * opaque context + * @apiNote This method is not guaranteed to be thread safe. The caller should be responsible to call all + * {@link ManagedLedger#asyncAddEntry} and {@link ManagedLedger#addEntry} overloads in order. */ void asyncAddEntry(ByteBuf buffer, int numberOfMessages, AddEntryCallback callback, Object ctx); @@ -733,4 +752,13 @@ default CompletableFuture getLastDispatchablePosition(final Predicate< } Position getFirstPosition(); + + /** + * In the internal implementation of a managed ledger, it is reasonable to use a single-thread executor to execute + * tasks that need to be performed in order. By exposing this internal executor, the caller can synchronize its + * custom tasks, as well as the internal tasks. + */ + default Executor getExecutor() { + return Runnable::run; + } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 7426059e576f6..0a9be319bbb54 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -798,26 +798,21 @@ public void asyncAddEntry(ByteBuf buffer, int numberOfMessages, AddEntryCallback log.debug("[{}] asyncAddEntry size={} state={}", name, buffer.readableBytes(), state); } - // retain buffer in this thread + // The buffer will be queued in `pendingAddEntries` and might be polled later in a different thread. However, + // the caller could release it after this method returns. To ensure the buffer is not released when it's polled, + // increase the reference count, which should be decreased by `OpAddEntry`'s methods later. buffer.retain(); - - // Jump to specific thread to avoid contention from writers writing from different threads final var addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, numberOfMessages, callback, ctx, currentLedgerTimeoutTriggered); var added = false; try { - // Use synchronized to ensure if `addOperation` is added to queue and fails later, it will be the first - // element in `pendingAddEntries`. - synchronized (this) { - if (managedLedgerInterceptor != null) { - managedLedgerInterceptor.beforeAddEntry(addOperation, addOperation.getNumberOfMessages()); - } - final var state = STATE_UPDATER.get(this); - beforeAddEntryToQueue(state); - pendingAddEntries.add(addOperation); - added = true; - afterAddEntryToQueue(state, addOperation); + if (managedLedgerInterceptor != null) { + managedLedgerInterceptor.beforeAddEntry(addOperation, addOperation.getNumberOfMessages()); } + beforeAddEntryToQueue(); + pendingAddEntries.add(addOperation); + added = true; + afterAddEntryToQueue(addOperation); } catch (Throwable throwable) { if (!added) { addOperation.failed(ManagedLedgerException.getManagedLedgerException(throwable)); @@ -825,7 +820,8 @@ public void asyncAddEntry(ByteBuf buffer, int numberOfMessages, AddEntryCallback } } - protected void beforeAddEntryToQueue(State state) throws ManagedLedgerException { + protected void beforeAddEntryToQueue() throws ManagedLedgerException { + final var state = STATE_UPDATER.get(this); if (state.isFenced()) { throw new ManagedLedgerFencedException(); } @@ -836,7 +832,9 @@ protected void beforeAddEntryToQueue(State state) throws ManagedLedgerException } } - protected void afterAddEntryToQueue(State state, OpAddEntry addOperation) throws ManagedLedgerException { + // TODO: does this method really need to be synchronized? + protected synchronized void afterAddEntryToQueue(OpAddEntry addOperation) throws ManagedLedgerException { + final var state = STATE_UPDATER.get(this); if (state == State.ClosingLedger || state == State.CreatingLedger) { // We don't have a ready ledger to write into // We are waiting for a new ledger to be created @@ -895,22 +893,6 @@ protected void afterFailedAddEntry(int numOfMessages) { managedLedgerInterceptor.afterFailedAddEntry(numOfMessages); } - protected boolean beforeAddEntry(OpAddEntry addOperation) { - // if no interceptor, just return true to make sure addOperation will be initiate() - if (managedLedgerInterceptor == null) { - return true; - } - try { - managedLedgerInterceptor.beforeAddEntry(addOperation, addOperation.getNumberOfMessages()); - return true; - } catch (Exception e) { - addOperation.failed( - new ManagedLedgerInterceptException("Interceptor managed ledger before add to bookie failed.")); - log.error("[{}] Failed to intercept adding an entry to bookie.", name, e); - return false; - } - } - @Override public void readyToCreateNewLedger() { // only set transition state to ClosedLedger if current state is WriteFailed diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java index bae6cd66d2825..5a94714a45ab9 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java @@ -223,14 +223,14 @@ private void initLastConfirmedEntry() { } @Override - protected void beforeAddEntryToQueue(State state) throws ManagedLedgerException { - if (state != State.LedgerOpened) { + protected void beforeAddEntryToQueue() throws ManagedLedgerException { + if (STATE_UPDATER.get(this) != State.LedgerOpened) { throw new ManagedLedgerException("Managed ledger is not opened"); } } @Override - protected void afterAddEntryToQueue(State state, OpAddEntry addOperation) throws ManagedLedgerException { + protected void afterAddEntryToQueue(OpAddEntry addOperation) throws ManagedLedgerException { if (addOperation.getCtx() == null || !(addOperation.getCtx() instanceof Position position)) { pendingAddEntries.poll(); throw new ManagedLedgerException("Illegal addOperation context object."); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java index 574ed2f325136..78e38c49091fe 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java @@ -275,7 +275,10 @@ public void verifyAsyncReadEntryUsingCache() throws Exception { // wait for all threads to be ready to start at once barrier.await(); while (!done.get()) { - Position position = ledger.addEntry("entry".getBytes()); + Position position; + synchronized (this) { + position = ledger.addEntry("entry".getBytes()); + } positions.add(position); Thread.sleep(1); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index ed05e47ed38e3..4c14be66b327c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -671,8 +671,21 @@ public void updateSubscribeRateLimiter() { } private void asyncAddEntry(ByteBuf headersAndPayload, PublishContext publishContext) { - ledger.asyncAddEntry(headersAndPayload, - (int) publishContext.getNumberOfMessages(), this, publishContext); + // Retain the buffer in advance to avoid the buffer might have been released when it's passed to `asyncAddEntry` + final var buffer = headersAndPayload.retain(); + try { + ledger.getExecutor().execute(() -> { + try { + ledger.asyncAddEntry(buffer, (int) publishContext.getNumberOfMessages(), this, publishContext); + } finally { + buffer.release(); + } + }); + } catch (Exception e) { + buffer.release(); // decrease the reference count retained at the beginning of this method + buffer.release(); // release the original headersAndPayload since it won't be used anymore + publishContext.completed(e, -1L, -1L); + } } public void asyncReadEntry(Position position, AsyncCallbacks.ReadEntryCallback callback, Object ctx) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 92b767104f6cf..051eb455481a4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -63,6 +63,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -72,6 +73,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -296,16 +298,36 @@ public void testCreateTopicMLFailure() { @Test public void testPublishMessage() throws Exception { - + // Only allow at most 1 pending task + final var mlIOExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1)); doAnswer(invocationOnMock -> { final ByteBuf payload = (ByteBuf) invocationOnMock.getArguments()[0]; final AddEntryCallback callback = (AddEntryCallback) invocationOnMock.getArguments()[2]; final Topic.PublishContext ctx = (Topic.PublishContext) invocationOnMock.getArguments()[3]; - callback.addComplete(PositionFactory.LATEST, payload, ctx); + mlIOExecutor.execute(() -> { + callback.addComplete(PositionFactory.LATEST, payload, ctx); + payload.release(); + }); return null; }).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), anyInt(), any(AddEntryCallback.class), any()); + doReturn(mlIOExecutor).when(ledgerMock).getExecutor(); PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); + verifyMessagePublish(topic, true); + + mlIOExecutor.execute(() -> { + try { + Thread.sleep(30000); + } catch (InterruptedException ignored) { + } + }); + mlIOExecutor.execute(() -> {}); // make the work queue full + verifyMessagePublish(topic, false); + + mlIOExecutor.shutdown(); + } + + private void verifyMessagePublish(PersistentTopic topic, boolean success) throws Exception { long lastMaxReadPositionMovedForwardTimestamp = topic.getLastMaxReadPositionMovedForwardTimestamp(); /* @@ -315,27 +337,35 @@ public void testPublishMessage() throws Exception { */ ByteBuf payload = Unpooled.wrappedBuffer("content".getBytes()); - final CountDownLatch latch = new CountDownLatch(1); - + final var future = new CompletableFuture(); final Topic.PublishContext publishContext = new Topic.PublishContext() { @Override public void completed(Exception e, long ledgerId, long entryId) { - assertEquals(ledgerId, PositionFactory.LATEST.getLedgerId()); - assertEquals(entryId, PositionFactory.LATEST.getEntryId()); - latch.countDown(); + if (e == null) { + future.complete(PositionFactory.create(ledgerId, entryId)); + } else { + future.completeExceptionally(e); + } } @Override public void setMetadataFromEntryData(ByteBuf entryData) { // This method must be invoked before `completed` - assertEquals(latch.getCount(), 1); + assertFalse(future.isDone()); assertEquals(entryData.array(), payload.array()); } }; topic.publishMessage(payload, publishContext); - assertTrue(latch.await(1, TimeUnit.SECONDS)); - assertTrue(topic.getLastMaxReadPositionMovedForwardTimestamp() > lastMaxReadPositionMovedForwardTimestamp); + try { + assertEquals(future.get(3, TimeUnit.SECONDS), PositionFactory.LATEST); + assertTrue(topic.getLastMaxReadPositionMovedForwardTimestamp() > lastMaxReadPositionMovedForwardTimestamp); + assertTrue(success); + } catch (ExecutionException ignored) { + assertFalse(success); + } + + Awaitility.await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> assertEquals(payload.refCnt(), 0)); } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index b1c99940827c8..030b2b53d75c6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -64,6 +64,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -2922,6 +2923,7 @@ private void setupMLAsyncCallbackMocks() { cursorMock = mock(ManagedCursor.class); doReturn(new ArrayList<>()).when(ledgerMock).getCursors(); doReturn(new ManagedLedgerConfig()).when(ledgerMock).getConfig(); + doReturn((Executor) Runnable::run).when(ledgerMock).getExecutor(); // call openLedgerComplete with ledgerMock on ML factory asyncOpen doAnswer((Answer) invocationOnMock -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java index 5b1c78574b462..b09dff63a8b36 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageDuplicationTest.java @@ -41,6 +41,7 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedger; @@ -252,6 +253,7 @@ public void testIsDuplicateWithFailure() { ManagedLedger managedLedger = mock(ManagedLedger.class); MessageDeduplication messageDeduplication = spy(new MessageDeduplication(pulsarService, mock(PersistentTopic.class), managedLedger)); doReturn(true).when(messageDeduplication).isEnabled(); + doReturn((Executor) Runnable::run).when(managedLedger).getExecutor(); EventLoopGroup eventLoopGroup = mock(EventLoopGroup.class);