Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public void testPreciseLimitation(String missingCase) throws Exception {
LedgerHandle currentLedger = ml.currentLedger;
LedgerHandle spyCurrentLedger = Mockito.spy(currentLedger);
ml.currentLedger = spyCurrentLedger;
Answer answer = invocation -> {
Answer<?> answer = invocation -> {
long firstEntry = (long) invocation.getArguments()[0];
log.info("reading entry: {}", firstEntry);
if (firstEntry == start1) {
Expand All @@ -112,7 +112,7 @@ public void testPreciseLimitation(String missingCase) throws Exception {
Object res = invocation.callRealMethod();
return res;
} else if (secondReadEntries.contains(firstEntry)) {
final CompletableFuture res = new CompletableFuture<>();
final CompletableFuture<Object> res = new CompletableFuture<>();
threadFactory.newThread(() -> {
try {
readCompleteSignal2.await();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1077,7 +1077,7 @@ void testConcurrentResetCursor() throws Exception {
final int messages = 100;
final int consumers = 5;

List<Future<AtomicBoolean>> futures = new ArrayList();
List<Future<AtomicBoolean>> futures = new ArrayList<>();
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newCachedThreadPool();
final CyclicBarrier barrier = new CyclicBarrier(consumers + 1);
Expand Down Expand Up @@ -2372,7 +2372,7 @@ void testReadEntriesOrWaitBlocking() throws Exception {
final int messages = 100;
final int consumers = 10;

List<Future<Void>> futures = new ArrayList();
List<Future<Void>> futures = new ArrayList<>();
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newCachedThreadPool();
final CyclicBarrier barrier = new CyclicBarrier(consumers + 1);
Expand Down Expand Up @@ -3115,7 +3115,7 @@ void testReplayEntries() throws Exception {
ledger.addEntry("entry4".getBytes(Encoding));

// 1. Replay empty position set should return empty entry set
Set<Position> positions = new HashSet();
Set<Position> positions = new HashSet<>();
assertTrue(c1.replayEntries(positions).isEmpty());

positions.add(p1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public void verifyConcurrentUsage() throws Exception {
final AtomicBoolean done = new AtomicBoolean();
final CyclicBarrier barrier = new CyclicBarrier(numProducers + numConsumers + 1);

List<Future<?>> futures = new ArrayList();
List<Future<?>> futures = new ArrayList<>();

for (int i = 0; i < numProducers; i++) {
futures.add(executor.submit(() -> {
Expand Down Expand Up @@ -266,7 +266,7 @@ public void verifyAsyncReadEntryUsingCache() throws Exception {
final AtomicBoolean done = new AtomicBoolean();
final CyclicBarrier barrier = new CyclicBarrier(numProducers + numConsumers + 1);

List<Future<?>> futures = new ArrayList();
List<Future<?>> futures = new ArrayList<>();
List<Position> positions = new CopyOnWriteArrayList<>();

for (int i = 0; i < numProducers; i++) {
Expand Down Expand Up @@ -375,12 +375,12 @@ public void testConcurrentMarkDelete() throws Exception {
mlConfig.setMetadataMaxEntriesPerLedger(10);
ManagedLedger ledger = factory.open("ml-markdelete-ledger", mlConfig);

final List<Position> addedEntries = new ArrayList();
final List<Position> addedEntries = new ArrayList<>();

int numCursors = 10;
final CyclicBarrier barrier = new CyclicBarrier(numCursors);

List<ManagedCursor> cursors = new ArrayList();
List<ManagedCursor> cursors = new ArrayList<>();
for (int i = 0; i < numCursors; i++) {
cursors.add(ledger.openCursor(String.format("c%d", i)));
}
Expand All @@ -390,7 +390,7 @@ public void testConcurrentMarkDelete() throws Exception {
addedEntries.add(pos);
}

List<Future<?>> futures = new ArrayList();
List<Future<?>> futures = new ArrayList<>();

for (ManagedCursor cursor : cursors) {
futures.add(executor.submit(() -> {
Expand Down Expand Up @@ -424,15 +424,15 @@ public void asyncMarkDeleteAndClose() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger" + testName, config);
ManagedCursor cursor = ledger.openCursor("c1");

List<Position> positions = new ArrayList();
List<Position> positions = new ArrayList<>();

for (int i = 0; i < 10; i++) {
Position p = ledger.addEntry("entry".getBytes());
positions.add(p);
}

final CountDownLatch counter = new CountDownLatch(positions.size());
final AtomicReference<Exception> gotException = new AtomicReference();
final AtomicReference<Exception> gotException = new AtomicReference<>();

for (Position p : positions) {
cursor.asyncDelete(p, new DeleteCallback() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1493,7 +1493,7 @@ public void getCursors() throws Exception {

c2.close();
ledger.deleteCursor("c2");
assertEquals(Sets.newHashSet(ledger.getCursors()), new HashSet());
assertEquals(Sets.newHashSet(ledger.getCursors()), new HashSet<>());
}

@Test
Expand Down Expand Up @@ -1621,15 +1621,15 @@ public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx)
public void ledgersList() throws Exception {
MetaStore store = factory.getMetaStore();

assertEquals(Sets.newHashSet(store.getManagedLedgers()), new HashSet());
assertEquals(Sets.newHashSet(store.getManagedLedgers()), new HashSet<>());
ManagedLedger ledger1 = factory.open("ledger1");
assertEquals(Sets.newHashSet(store.getManagedLedgers()), Sets.newHashSet("ledger1"));
ManagedLedger ledger2 = factory.open("ledger2");
assertEquals(Sets.newHashSet(store.getManagedLedgers()), Sets.newHashSet("ledger1", "ledger2"));
ledger1.delete();
assertEquals(Sets.newHashSet(store.getManagedLedgers()), Sets.newHashSet("ledger2"));
ledger2.delete();
assertEquals(Sets.newHashSet(store.getManagedLedgers()), new HashSet());
assertEquals(Sets.newHashSet(store.getManagedLedgers()), new HashSet<>());
}

@Test
Expand Down Expand Up @@ -3225,7 +3225,7 @@ public void testManagedLedgerWithCreateLedgerTimeOut() throws Exception {
latch.await(config.getMetadataOperationsTimeoutSeconds() + 2, TimeUnit.SECONDS);
assertEquals(response.get(), BKException.Code.TimeoutException);
assertTrue(ctxHolder.get() instanceof CompletableFuture);
CompletableFuture ledgerCreateHook = (CompletableFuture) ctxHolder.get();
CompletableFuture<?> ledgerCreateHook = (CompletableFuture<?>) ctxHolder.get();
assertTrue(ledgerCreateHook.isCompletedExceptionally());

ledger.close();
Expand Down Expand Up @@ -3650,7 +3650,7 @@ public void testManagedLedgerWithPlacementPolicyInCustomMetadata() throws Except
assertEquals(config.getProperties().get("key"), "value");
}

private void setFieldValue(Class clazz, Object classObj, String fieldName, Object fieldValue) throws Exception {
private void setFieldValue(Class<?> clazz, Object classObj, String fieldName, Object fieldValue) throws Exception {
Field field = clazz.getDeclaredField(fieldName);
field.setAccessible(true);
field.set(classObj, fieldValue);
Expand Down Expand Up @@ -4530,7 +4530,7 @@ private long calculatePendingTaskCount(OrderedScheduler orderedScheduler) {
BlockingQueue<Runnable> queue = WhiteboxImpl.getInternalState(boundedScheduledExecutorService, "queue");
for (Runnable r : queue) {
if (r instanceof FutureTask) {
FutureTask futureTask = (FutureTask) r;
FutureTask<?> futureTask = (FutureTask<?>) r;
if (!futureTask.isCancelled() && !futureTask.isDone()) {
taskCounter++;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ public void testBookkeeperFirstOffloadRead(String offloadType) throws Exception
}
config.setRetentionTime(0, TimeUnit.MILLISECONDS);
config.setRetentionSizeInMB(0);
CompletableFuture trimFuture = new CompletableFuture();
CompletableFuture<Void> trimFuture = new CompletableFuture<>();
ledger.trimConsumedLedgersInBackground(trimFuture);
trimFuture.join();
Awaitility.await().untilAsserted(() -> {
Expand Down Expand Up @@ -381,7 +381,7 @@ public CompletableFuture<Void> closeAsync() {

static class MockOffloadReadHandle implements ReadHandle, OffloadedLedgerHandle {
final long id;
final List<ByteBuf> entries = new ArrayList();
final List<ByteBuf> entries = new ArrayList<>();
final LedgerMetadata metadata;
long lastAccessTimestamp = System.currentTimeMillis();

Expand Down Expand Up @@ -413,7 +413,7 @@ public CompletableFuture<Void> closeAsync() {

@Override
public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry) {
List<LedgerEntry> readEntries = new ArrayList();
List<LedgerEntry> readEntries = new ArrayList<>();
for (long eid = firstEntry; eid <= lastEntry; eid++) {
ByteBuf buf = entries.get((int) eid).retainedSlice();
readEntries.add(LedgerEntryImpl.create(id, eid, buf.readableBytes(), buf));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ public void testCurrentLedgerSizeCorrectIfHasInterceptor() throws Exception {

// Registry interceptor.
ManagedLedgerConfig config = new ManagedLedgerConfig();
Set<ManagedLedgerPayloadProcessor> processors = new HashSet();
Set<ManagedLedgerPayloadProcessor> processors = new HashSet<>();
processors.add(new TestPayloadProcessor());
ManagedLedgerInterceptor interceptor = new ManagedLedgerInterceptorImpl(new HashSet(), processors);
ManagedLedgerInterceptor interceptor = new ManagedLedgerInterceptorImpl(new HashSet<>(), processors);
config.setManagedLedgerInterceptor(interceptor);
config.setMaxEntriesPerLedger(100);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,11 @@ public void testRecentJoinQueueIsInOrderAfterRewind() throws Exception {
LedgerHandle spyFirstLedger = spy(firstLedger);
CountDownLatch replyReadSignal = new CountDownLatch(1);
AtomicBoolean replayReadWasTriggered = new AtomicBoolean();
Answer answer = invocation -> {
Answer<?> answer = invocation -> {
long firstEntry = (long) invocation.getArguments()[0];
if (firstEntry == firstWaitingAckPos.getEntryId()) {
replayReadWasTriggered.set(true);
final CompletableFuture res = new CompletableFuture<>();
final CompletableFuture<Object> res = new CompletableFuture<>();
threadFactory.newThread(() -> {
try {
replyReadSignal.await();
Expand Down Expand Up @@ -246,6 +246,7 @@ public void testRecentJoinQueueIsInOrderAfterRewind() throws Exception {
consumerList.get(consumerList.size() - 1).join();

synchronized (dispatcher) {
@SuppressWarnings("rawtypes")
LinkedHashMap recentJoinedConsumers = dispatcher.getRecentlyJoinedConsumers();
assertTrue(verifyMapItemsAreInOrder(recentJoinedConsumers));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ public void testMainGenerateDocs() throws Exception {
ByteArrayOutputStream baoStream = new ByteArrayOutputStream();
System.setOut(new PrintStream(baoStream));

Class argumentsClass = Class.forName("org.apache.pulsar.PulsarBrokerStarter$StarterArguments");
Class<?> argumentsClass = Class.forName("org.apache.pulsar.PulsarBrokerStarter$StarterArguments");
PulsarBrokerStarter.main(new String[]{"-g"});

String message = baoStream.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public void testMainGenerateDocs() throws Exception {
ByteArrayOutputStream baoStream = new ByteArrayOutputStream();
System.setOut(new PrintStream(baoStream));

Class argumentsClass =
Class<?> argumentsClass =
Class.forName("org.apache.pulsar.PulsarClusterMetadataTeardown$Arguments");

PulsarClusterMetadataTeardown.main(new String[]{"-zk", "zk", "-g"});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public void testMainGenerateDocs() throws Exception {
ByteArrayOutputStream baoStream = new ByteArrayOutputStream();
System.setOut(new PrintStream(baoStream));

Class argumentsClass =
Class<?> argumentsClass =
Class.forName("org.apache.pulsar.PulsarVersionStarter$Arguments");

PulsarVersionStarter.main(new String[]{"-g"});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public void testMarkDeletedPositionCanForwardAfterTopicLedgerLost(boolean enable
ConsumerAndReceivedMessages consumerAndReceivedMessages1 =
waitConsumeAndAllMessages(topicName, subName, enabledBatch, false);
List<MessageIdImpl>[] messageIds = consumerAndReceivedMessages1.messageIds;
Consumer consumer = consumerAndReceivedMessages1.consumer;
Consumer<?> consumer = consumerAndReceivedMessages1.consumer;
MessageIdImpl individualPosition = messageIds[1].get(messageCountPerEntry - 1);
MessageIdImpl expectedMarkDeletedPosition =
new MessageIdImpl(messageIds[0].get(0).getLedgerId(), messageIds[0].get(0).getEntryId(), -1);
Expand Down Expand Up @@ -153,7 +153,7 @@ private void waitPersistentCursorLedger(String topicName, String subName, final

private List<MessageIdImpl>[] sendManyMessages(String topicName, int ledgerCount, int messageCountPerLedger,
int messageCountPerEntry) throws Exception {
List<MessageIdImpl>[] messageIds = new List[ledgerCount];
@SuppressWarnings({"unchecked", "rawtypes"}) List<MessageIdImpl>[] messageIds = new List[ledgerCount];
for (int i = 0; i < ledgerCount; i++){
admin.topics().unload(topicName);
if (messageCountPerEntry == 1) {
Expand Down Expand Up @@ -221,9 +221,9 @@ private ConsumerAndReceivedMessages waitConsumeAndAllMessages(String topicName,
final boolean enabledBatch,
boolean ack) throws Exception {
List<MessageIdImpl> messageIds = new ArrayList<>();
final Consumer consumer = createConsumer(topicName, subName, enabledBatch);
final Consumer<?> consumer = createConsumer(topicName, subName, enabledBatch);
while (true){
Message message = consumer.receive(5, TimeUnit.SECONDS);
Message<?> message = consumer.receive(5, TimeUnit.SECONDS);
if (message != null){
messageIds.add((MessageIdImpl) message.getMessageId());
if (ack) {
Expand All @@ -239,13 +239,14 @@ private ConsumerAndReceivedMessages waitConsumeAndAllMessages(String topicName,

@AllArgsConstructor
private static class ConsumerAndReceivedMessages {
private Consumer consumer;
private Consumer<?> consumer;
private List<MessageIdImpl>[] messageIds;
}

private List<MessageIdImpl>[] sortMessageId(List<MessageIdImpl> messageIds, boolean enabledBatch){
Map<Long, List<MessageIdImpl>> map = messageIds.stream().collect(Collectors.groupingBy(v -> v.getLedgerId()));
TreeMap<Long, List<MessageIdImpl>> sortedMap = new TreeMap<>(map);
@SuppressWarnings({"unchecked", "rawtypes"})
List<MessageIdImpl>[] res = new List[sortedMap.size()];
Iterator<Map.Entry<Long, List<MessageIdImpl>>> iterator = sortedMap.entrySet().iterator();
for (int i = 0; i < sortedMap.size(); i++){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2623,7 +2623,7 @@ public void testMaxSubscriptionsPerTopic() throws Exception {
final String topic = "persistent://testTenant/ns1/max-subscriptions-per-topic";

admin.topics().createPartitionedTopic(topic, 3);
Producer producer = pulsarClient.newProducer().topic(topic).create();
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
producer.close();

// create subscription
Expand Down Expand Up @@ -2662,9 +2662,9 @@ public void testMaxSubscriptionsPerTopic() throws Exception {
producer = pulsarClient.newProducer().topic(topic).create();
producer.close();

Consumer consumer1 = null;
Consumer consumer2 = null;
Consumer consumer3 = null;
Consumer<?> consumer1 = null;
Consumer<?> consumer2 = null;
Consumer<?> consumer3 = null;

try {
consumer1 = pulsarClient.newConsumer().subscriptionName("test-sub1").topic(topic).subscribe();
Expand Down Expand Up @@ -3268,14 +3268,14 @@ private AtomicInteger injectSchemaCheckCounterForTopic(String topicName) {
AbstractTopic topic = (AbstractTopic) topics.get(topicName).join().get();
AbstractTopic spyTopic = Mockito.spy(topic);
AtomicInteger counter = new AtomicInteger();
doAnswer(new Answer() {
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
counter.incrementAndGet();
return invocation.callRealMethod();
}
}).when(spyTopic).addSchema(any(SchemaData.class));
doAnswer(new Answer() {
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
counter.incrementAndGet();
Expand Down Expand Up @@ -3797,7 +3797,7 @@ private void testAnalyzeSubscriptionBacklogNotCauseStuck() throws Exception {
admin.topics().analyzeSubscriptionBacklog(topic, subscription, Optional.of(MessageIdImpl.earliest));
for (int i = 0; i < 10; i++) {
Awaitility.await().untilAsserted(() -> {
Message m = consumer.receive();
Message<?> m = consumer.receive();
assertNotNull(m);
consumer.acknowledge(m);
});
Expand Down Expand Up @@ -4162,7 +4162,7 @@ public void testOverridesNamespaceOffloadThreshold() throws Exception {
public void testDeletePatchyPartitionedTopic() throws Exception {
final String topic = BrokerTestUtil.newUniqueName(defaultNamespace + "/tp");
admin.topics().createPartitionedTopic(topic, 2);
Producer producer = pulsarClient.newProducer().topic(TopicName.get(topic).getPartition(0).toString())
Producer<byte[]> producer = pulsarClient.newProducer().topic(TopicName.get(topic).getPartition(0).toString())
.create();
// Mock a scenario that "-partition-1" has been removed due to topic GC.
pulsar.getBrokerService().getTopic(TopicName.get(topic).getPartition(1).toString(), false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ public void testDeadlockDetectionOverhead() {
class DummyProducerBuilder<T> extends ProducerBuilderImpl<T> {
// This is a dummy producer builder to test the health check timeout
// the producer constructed by this builder will not send any message
@SuppressWarnings("rawtypes")
public DummyProducerBuilder(PulsarClientImpl client, Schema schema) {
super(client, schema);
}
Expand Down
Loading
Loading