Skip to content

Commit ddec811

Browse files
authored
KAFKA-19937: Introduced Shared ReaperThread for Persister / NetworkPartitionMetadataClient (#21842)
### Description This pull request addresses the redundant thread usage detailed in [KAFKA-19937](https://issues.apache.org/jira/browse/KAFKA-19937) affecting the `PersisterStateManager` and `NetworkPartitionMetadataClient` classes specifically. Presently each creates/manages its own separate `SystemTimerReaper` instances, but rely on identical timers with independent tasks. The changes proposed address this by introducing a new, sharable instance of the thread to reduce overhead. ### Key Changes - Updated `BrokerServer` to create a single shared `SystemTimerReaper` instance used by both `PersisterStateManager` and `NetworkPartitionMetadataClient`, with cleanup in the shutdown path after both components have been stopped. - Moved timer ownership to the caller for the affected classes to the caller (e.g., `PersisterStateManager.stop()` and `NetworkPartitionMetadataClient.close()` no longer close their injected timer, as lifecycle is managed by `BrokerServer`). - This specific timer ownership behavior is documented via JavaDocs for both `PersisterStateManager` and `NetworkPartitionMetadataClient` - Added null validation to the `SystemTimerReaper` constructor arguments. ### Tests and Verification Verified that all existing test suites still pass as expected and added the following to verify new behavior and usage related to the above changes: - Extended `SystemTimerReaperTest.java` to verify null validity and timer-sharing behavior (e.g., two consumers sharing a timer can both schedule and expire tasks independently). - Updated `PersisterStateManagerTest.java` to verify that `stop()` does not close the timer, consistent with the new caller-ownership contract. ### Reviewer(s) Tagging @AndrewJSchofield (initial reporter) Reviewers: Sushant Mahajan <smahajan@confluent.io>, Andrew Schofield <aschofield@confluent.io>
1 parent eef6cab commit ddec811

6 files changed

Lines changed: 61 additions & 13 deletions

File tree

core/src/main/scala/kafka/server/BrokerServer.scala

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ import org.apache.kafka.server.metrics.{ClientTelemetryExporterPlugin, KafkaYamm
5454
import org.apache.kafka.server.network.{EndpointReadyFutures, KafkaAuthorizerServerInfo}
5555
import org.apache.kafka.server.share.persister.{DefaultStatePersister, NoOpStatePersister, Persister, PersisterStateManager}
5656
import org.apache.kafka.server.share.session.ShareSessionCache
57-
import org.apache.kafka.server.util.timer.{SystemTimer, SystemTimerReaper}
57+
import org.apache.kafka.server.util.timer.{SystemTimer, SystemTimerReaper, Timer}
5858
import org.apache.kafka.server.util.{Deadline, FutureUtils, KafkaScheduler, NetworkPartitionMetadataClient, PartitionMetadataClient}
5959
import org.apache.kafka.server.{AssignmentsManager, BrokerFeatures, BrokerLifecycleManager, ClientMetricsManager, DefaultApiVersionManager, DelayedActionQueue, FetchManager, FetchSessionCacheShard, KRaftTopicCreator, NodeToControllerChannelManagerImpl, ProcessRole, RaftControllerNodeProvider}
6060
import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
@@ -168,6 +168,8 @@ class BrokerServer(
168168

169169
var persister: Persister = _
170170

171+
private var shareGroupTimer: Timer = _
172+
171173
private def maybeChangeStatus(from: ProcessStatus, to: ProcessStatus): Boolean = {
172174
lock.lock()
173175
try {
@@ -381,6 +383,9 @@ class BrokerServer(
381383
/* create share coordinator */
382384
shareCoordinator = createShareCoordinator()
383385

386+
/* create shared timer for share group components */
387+
shareGroupTimer = new SystemTimerReaper("share-group-reaper", new SystemTimer("share-group"))
388+
384389
/* create persister */
385390
persister = createShareStatePersister()
386391

@@ -655,7 +660,7 @@ class BrokerServer(
655660
),
656661
Time.SYSTEM,
657662
config.interBrokerListenerName(),
658-
new SystemTimerReaper("network-partition-metadata-client-reaper", new SystemTimer("network-partition-metadata-client"))
663+
shareGroupTimer
659664
)
660665
}
661666

@@ -734,10 +739,7 @@ class BrokerServer(
734739
NetworkUtils.buildNetworkClient("Persister", config, metrics, Time.SYSTEM, new LogContext(s"[Persister broker=${config.brokerId}]")),
735740
new ShareCoordinatorMetadataCacheHelperImpl(metadataCache, key => shareCoordinator.partitionFor(key), config.interBrokerListenerName),
736741
Time.SYSTEM,
737-
new SystemTimerReaper(
738-
"persister-state-manager-reaper",
739-
new SystemTimer("persister")
740-
)
742+
shareGroupTimer
741743
)
742744
)
743745
} else if (klass.getName.equals(classOf[NoOpStatePersister].getName)) {
@@ -887,6 +889,8 @@ class BrokerServer(
887889
if (persister != null)
888890
Utils.swallow(this.logger.underlying, () => persister.stop())
889891

892+
Utils.closeQuietly(shareGroupTimer, "share group timer")
893+
890894
if (lifecycleManager != null)
891895
Utils.swallow(this.logger.underlying, () => lifecycleManager.close())
892896

server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@
5454
import org.apache.kafka.common.requests.WriteShareGroupStateRequest;
5555
import org.apache.kafka.common.requests.WriteShareGroupStateResponse;
5656
import org.apache.kafka.common.utils.Time;
57-
import org.apache.kafka.common.utils.Utils;
5857
import org.apache.kafka.common.utils.internals.ExponentialBackoffManager;
5958
import org.apache.kafka.server.share.SharePartitionKey;
6059
import org.apache.kafka.server.util.InterBrokerSendThread;
@@ -127,6 +126,13 @@ public enum RPCType {
127126
UNKNOWN
128127
}
129128

129+
/**
130+
* Creates a new PersisterStateManager.
131+
*
132+
* <p>The caller retains ownership of the supplied {@link Timer} and is responsible for
133+
* closing it. {@link #stop()} will not close the timer, allowing it to be shared with
134+
* other components.
135+
*/
130136
public PersisterStateManager(KafkaClient client, ShareCoordinatorMetadataCacheHelper cacheHelper, Time time, Timer timer) {
131137
if (client == null) {
132138
throw new IllegalArgumentException("Kafkaclient must not be null.");
@@ -166,7 +172,6 @@ public void start() {
166172
public void stop() throws Exception {
167173
if (isStarted.compareAndSet(true, false)) {
168174
this.sender.shutdown();
169-
Utils.closeQuietly(this.timer, "PersisterStateManager timer");
170175
}
171176
}
172177

server-common/src/main/java/org/apache/kafka/server/util/timer/SystemTimerReaper.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
import org.apache.kafka.server.util.ShutdownableThread;
2020

21+
import java.util.Objects;
22+
2123
/**
2224
* SystemTimerReaper wraps a {@link Timer} and starts a reaper thread
2325
* to expire the tasks in the {@link Timer}.
@@ -44,8 +46,8 @@ public void doWork() {
4446
private final Reaper reaper;
4547

4648
public SystemTimerReaper(String reaperThreadName, Timer timer) {
47-
this.timer = timer;
48-
this.reaper = new Reaper(reaperThreadName);
49+
this.timer = Objects.requireNonNull(timer, "timer must not be null");
50+
this.reaper = new Reaper(Objects.requireNonNull(reaperThreadName, "reaperThreadName must not be null"));
4951
this.reaper.start();
5052
}
5153

server-common/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4827,7 +4827,8 @@ public void testPersisterStateManagerClose() {
48274827
psm.stop();
48284828

48294829
verify(client, times(1)).close();
4830-
verify(timer, times(1)).close();
4830+
// Timer lifecycle is the caller's responsibility, not PersisterStateManager's.
4831+
verify(timer, times(0)).close();
48314832
} catch (Exception e) {
48324833
fail("unexpected exception", e);
48334834
}

server-common/src/test/java/org/apache/kafka/server/util/timer/SystemTimerReaperTest.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424

2525
import java.util.concurrent.CompletableFuture;
2626

27+
import static org.junit.jupiter.api.Assertions.assertThrows;
28+
2729
public class SystemTimerReaperTest {
2830
private static class FutureTimerTask<T> extends TimerTask {
2931
CompletableFuture<T> future = new CompletableFuture<>();
@@ -67,4 +69,33 @@ public void testReaperClose() throws Exception {
6769
Mockito.verify(timer, Mockito.times(1)).close();
6870
TestUtils.waitForCondition(timerReaper::isShutdown, "reaper not shutdown");
6971
}
72+
73+
@Test
74+
public void testSharedTimerBetweenConsumers() throws Exception {
75+
try (Timer timer = new SystemTimerReaper("shared-reaper", new SystemTimer("shared-timer"))) {
76+
// Set up two independent consumer tasks to the same timer
77+
CompletableFuture<Void> consumer1Task = add(timer, 100L);
78+
CompletableFuture<Void> consumer2Task = add(timer, 200L);
79+
80+
TestUtils.assertFutureThrows(TimeoutException.class, consumer1Task);
81+
TestUtils.assertFutureThrows(TimeoutException.class, consumer2Task);
82+
83+
// After the first consumer's tasks have completed (simulating one consumer
84+
// stopping), the second consumer can still schedule and expire tasks as expected
85+
CompletableFuture<Void> consumer2LateTasks = add(timer, 100L);
86+
TestUtils.assertFutureThrows(TimeoutException.class, consumer2LateTasks);
87+
}
88+
}
89+
90+
@Test
91+
public void testRejectsNullName() {
92+
assertThrows(NullPointerException.class, () ->
93+
new SystemTimerReaper(null, Mockito.mock(Timer.class)));
94+
}
95+
96+
@Test
97+
public void testRejectsNullTimer() {
98+
assertThrows(NullPointerException.class, () ->
99+
new SystemTimerReaper("reaper", null));
100+
}
70101
}

server/src/main/java/org/apache/kafka/server/util/NetworkPartitionMetadataClient.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.apache.kafka.common.requests.ListOffsetsRequest;
3232
import org.apache.kafka.common.requests.ListOffsetsResponse;
3333
import org.apache.kafka.common.utils.Time;
34-
import org.apache.kafka.common.utils.Utils;
3534
import org.apache.kafka.common.utils.internals.ExponentialBackoffManager;
3635
import org.apache.kafka.metadata.MetadataCache;
3736
import org.apache.kafka.server.util.timer.Timer;
@@ -68,6 +67,13 @@ public class NetworkPartitionMetadataClient implements PartitionMetadataClient {
6867
private volatile SendThread sendThread;
6968
private final Timer timer;
7069

70+
/**
71+
* Creates a new NetworkPartitionMetadataClient.
72+
*
73+
* <p>The caller retains ownership of the supplied {@link Timer} and is responsible for
74+
* closing it. {@link #close()} will not close the timer, allowing it to be shared with
75+
* other components.
76+
*/
7177
public NetworkPartitionMetadataClient(MetadataCache metadataCache,
7278
Supplier<KafkaClient> networkClientSupplier,
7379
Time time, ListenerName listenerName, Timer timer) {
@@ -149,7 +155,6 @@ public Map<TopicPartition, CompletableFuture<OffsetResponse>> listLatestOffsets(
149155
public void close() {
150156
// Only close sendThread if it was initialized. Note, close is called only during broker shutdown, so need
151157
// for further synchronization here.
152-
Utils.closeQuietly(timer, "NetworkPartitionMetadataClient timer");
153158
if (!initialized.get()) {
154159
return;
155160
}

0 commit comments

Comments
 (0)