From f82f28035f0f702649f6926d6c734775519ce397 Mon Sep 17 00:00:00 2001 From: Daniel Roberts ddanielr Date: Sat, 10 Jan 2026 03:43:44 +0000 Subject: [PATCH 1/5] Manager terminates zlocks after max halts When the manager is gathering per-table info it will attempt to use the existing tserver connection to request table information. If an exception is thrown during this request then the manager will record the failure by computing an entry in the badServers map. When a tserver is unable to be reached by the manager for long periods of time, the manager will attempt to communicate with the tserver 3 times before attempting a halt action. This commit adds a new property for the maximum number of halt requests, which allows the manager to delete a tserver's zlock once that many halt requests have been attempted. --- .../apache/accumulo/core/conf/Property.java | 4 +++ .../core/fate/zookeeper/ServiceLock.java | 3 ++ .../org/apache/accumulo/manager/Manager.java | 35 +++++++++++++++++-- 3 files changed, 39 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 17edff8d2ea..af389b6dd50 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -450,6 +450,10 @@ public enum Property { "The number of threads used to run fault-tolerant executions (FATE)." + " These are primarily table operations like merge.", "1.4.3"), + MANAGER_MAX_TSERVER_HALTS("manager.max.tservers.halts", "0", PropertyType.COUNT, + "Allows the manager to force tserver halting by setting the max number of attempted tserver halt " + + " requests before deleting the tserver's zlock. 
A value of zero (default) disables this feature.", + "2.1.5"), @Deprecated(since = "2.1.0") MANAGER_REPLICATION_SCAN_INTERVAL("manager.replication.status.scan.interval", "30s", PropertyType.TIMEDURATION, diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ServiceLock.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ServiceLock.java index 42e57206e58..45d04c71cde 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ServiceLock.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ServiceLock.java @@ -723,6 +723,9 @@ public long getSessionId() throws KeeperException, InterruptedException { /** * This method will delete all server locks for a given path according the predicate conditions. * + * @param zk zookeeper instance + * @param zPath can be a path directly to a host or a general path like @{link + * org.apache.accumulo.core.Constants.ZTSERVERS} or a resource group * @param hostPortPredicate conditional predicate for determining if the lock should be removed. 
* @param messageOutput function for setting where the output from the lockPath goes * @param dryRun allows lock format validation and the messageOutput to be sent without actually diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 1bb15169914..904556e3170 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -195,6 +195,8 @@ public class Manager extends AbstractServer implements LiveTServerSet.Listener, final AuditedSecurityOperation security; final Map badServers = Collections.synchronizedMap(new HashMap<>()); + final Map haltedServers = + Collections.synchronizedMap(new HashMap<>()); final Set serversToShutdown = Collections.synchronizedSet(new HashSet<>()); final Migrations migrations = new Migrations(); final EventCoordinator nextEvent = new EventCoordinator(); @@ -1150,6 +1152,8 @@ private List checkMigrationSanity(Set current, long start = System.currentTimeMillis(); final SortedMap result = new ConcurrentSkipListMap<>(); final RateLimiter shutdownServerRateLimiter = RateLimiter.create(MAX_SHUTDOWNS_PER_SEC); + final int maxTserverHalts = getConfiguration().getCount(Property.MANAGER_MAX_TSERVER_HALTS); + final boolean forceHaltingEnabled = maxTserverHalts != 0; for (TServerInstance serverInstance : currentServers) { final TServerInstance server = serverInstance; if (threads == 0) { @@ -1190,15 +1194,31 @@ private List checkMigrationSanity(Set current, > MAX_BAD_STATUS_COUNT) { if (shutdownServerRateLimiter.tryAcquire()) { log.warn("attempting to stop {}", server); + if (forceHaltingEnabled + && (haltedServers.computeIfAbsent(server, s -> new AtomicInteger(0)) + .incrementAndGet() > maxTserverHalts)) { + log.warn("tserver {} is not responding to halt requests, deleting zlock", server); + var zk = getContext().getZooReaderWriter(); + var iid = 
getContext().getInstanceID(); + String tserversPath = Constants.ZROOT + "/" + iid + Constants.ZTSERVERS; + try { + ServiceLock.deleteLocks(zk, tserversPath, server.getHostAndPort()::equals, + log::info, false); + } catch (KeeperException | InterruptedException e) { + log.error("Failed to delete zlock for server {}", server); + } + haltedServers.remove(server); + } try { TServerConnection connection2 = tserverSet.getConnection(server); if (connection2 != null) { connection2.halt(managerLock); } } catch (TTransportException e1) { - // ignore: it's probably down + // ignore: it's probably down so log the exception at trace + log.trace("error attempting to halt tablet server {}", server, e1); } catch (Exception e2) { - log.info("error talking to troublesome tablet server", e2); + log.info("error talking to troublesome tablet server {}", server, e2); } } else { log.warn("Unable to shutdown {} as over the shutdown limit of {} per minute", server, @@ -1225,6 +1245,12 @@ private List checkMigrationSanity(Set current, badServers.keySet().retainAll(currentServers); badServers.keySet().removeAll(info.keySet()); } + + synchronized (haltedServers) { + haltedServers.keySet().retainAll(currentServers); + haltedServers.keySet().removeAll(info.keySet()); + } + log.debug(String.format("Finished gathering information from %d of %d servers in %.2f seconds", info.size(), currentServers.size(), (System.currentTimeMillis() - start) / 1000.)); @@ -1727,6 +1753,7 @@ public void update(LiveTServerSet current, Set deleted, } serversToShutdown.removeAll(deleted); badServers.keySet().removeAll(deleted); + haltedServers.keySet().removeAll(deleted); // clear out any bad server with the same host/port as a new server synchronized (badServers) { cleanListByHostAndPort(badServers.keySet(), deleted, added); @@ -1734,7 +1761,9 @@ public void update(LiveTServerSet current, Set deleted, synchronized (serversToShutdown) { cleanListByHostAndPort(serversToShutdown, deleted, added); } - + synchronized 
(haltedServers) { + cleanListByHostAndPort(haltedServers.keySet(), deleted, added); + } migrations.removeServers(deleted); nextEvent.event("There are now %d tablet servers", current.size()); } From 3da2750b31fe98085eab60ff29bcdf955bafaf62 Mon Sep 17 00:00:00 2001 From: Daniel Roberts Date: Mon, 12 Jan 2026 11:32:43 -0500 Subject: [PATCH 2/5] Update server/manager/src/main/java/org/apache/accumulo/manager/Manager.java Co-authored-by: Dave Marion --- .../src/main/java/org/apache/accumulo/manager/Manager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 904556e3170..b815bacf3ae 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -1205,7 +1205,7 @@ private List checkMigrationSanity(Set current, ServiceLock.deleteLocks(zk, tserversPath, server.getHostAndPort()::equals, log::info, false); } catch (KeeperException | InterruptedException e) { - log.error("Failed to delete zlock for server {}", server); + log.error("Failed to delete zlock for server {}", server, e); } haltedServers.remove(server); } From c35454cb3ea751587780a0be18a6c05b732c84c6 Mon Sep 17 00:00:00 2001 From: Daniel Roberts ddanielr Date: Mon, 12 Jan 2026 19:16:02 +0000 Subject: [PATCH 3/5] Moves traditional rpc halt to else statement Moves the rpc halt request into an else statement so an RPC halt request is not attempted on a tserver without a zlock. Moved the server removal from the halted map into the try section so it doesn't get removed if the zlock removal failed. 
--- .../org/apache/accumulo/manager/Manager.java | 24 ++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index b815bacf3ae..5637d045e81 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -1204,21 +1204,23 @@ private List checkMigrationSanity(Set current, try { ServiceLock.deleteLocks(zk, tserversPath, server.getHostAndPort()::equals, log::info, false); + haltedServers.remove(server); + badServers.remove(server); } catch (KeeperException | InterruptedException e) { log.error("Failed to delete zlock for server {}", server, e); } - haltedServers.remove(server); - } - try { - TServerConnection connection2 = tserverSet.getConnection(server); - if (connection2 != null) { - connection2.halt(managerLock); + } else { + try { + TServerConnection connection2 = tserverSet.getConnection(server); + if (connection2 != null) { + connection2.halt(managerLock); + } + } catch (TTransportException e1) { + // ignore: it's probably down so log the exception at trace + log.trace("error attempting to halt tablet server {}", server, e1); + } catch (Exception e2) { + log.info("error talking to troublesome tablet server {}", server, e2); } - } catch (TTransportException e1) { - // ignore: it's probably down so log the exception at trace - log.trace("error attempting to halt tablet server {}", server, e1); - } catch (Exception e2) { - log.info("error talking to troublesome tablet server {}", server, e2); } } else { log.warn("Unable to shutdown {} as over the shutdown limit of {} per minute", server, From 9ed3f76d4963222208b5084723b7dba71035a246 Mon Sep 17 00:00:00 2001 From: Daniel Roberts ddanielr Date: Mon, 12 Jan 2026 19:51:54 +0000 Subject: [PATCH 4/5] Apply PR naming feedback Changes the naming for the 
property and some vars to better describe intent. --- .../apache/accumulo/core/conf/Property.java | 2 +- .../org/apache/accumulo/manager/Manager.java | 25 ++++++++++--------- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index af389b6dd50..4f3fbe8ccfe 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -450,7 +450,7 @@ public enum Property { "The number of threads used to run fault-tolerant executions (FATE)." + " These are primarily table operations like merge.", "1.4.3"), - MANAGER_MAX_TSERVER_HALTS("manager.max.tservers.halts", "0", PropertyType.COUNT, + MANAGER_TSERVER_HALT_ATTEMPTS("manager.tservers.halt.attempts", "0", PropertyType.COUNT, "Allows the manager to force tserver halting by setting the max number of attempted tserver halt " + " requests before deleting the tserver's zlock. 
A value of zero (default) disables this feature.", "2.1.5"), diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 5637d045e81..04eadd54be0 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -195,7 +195,7 @@ public class Manager extends AbstractServer implements LiveTServerSet.Listener, final AuditedSecurityOperation security; final Map badServers = Collections.synchronizedMap(new HashMap<>()); - final Map haltedServers = + final Map tserverHaltRpcAttempts = Collections.synchronizedMap(new HashMap<>()); final Set serversToShutdown = Collections.synchronizedSet(new HashSet<>()); final Migrations migrations = new Migrations(); @@ -1152,8 +1152,9 @@ private List checkMigrationSanity(Set current, long start = System.currentTimeMillis(); final SortedMap result = new ConcurrentSkipListMap<>(); final RateLimiter shutdownServerRateLimiter = RateLimiter.create(MAX_SHUTDOWNS_PER_SEC); - final int maxTserverHalts = getConfiguration().getCount(Property.MANAGER_MAX_TSERVER_HALTS); - final boolean forceHaltingEnabled = maxTserverHalts != 0; + final int maxTserverRpcHaltAttempts = + getConfiguration().getCount(Property.MANAGER_TSERVER_HALT_ATTEMPTS); + final boolean forceHaltingEnabled = maxTserverRpcHaltAttempts != 0; for (TServerInstance serverInstance : currentServers) { final TServerInstance server = serverInstance; if (threads == 0) { @@ -1195,8 +1196,8 @@ private List checkMigrationSanity(Set current, if (shutdownServerRateLimiter.tryAcquire()) { log.warn("attempting to stop {}", server); if (forceHaltingEnabled - && (haltedServers.computeIfAbsent(server, s -> new AtomicInteger(0)) - .incrementAndGet() > maxTserverHalts)) { + && (tserverHaltRpcAttempts.computeIfAbsent(server, s -> new AtomicInteger(0)) + .incrementAndGet() > maxTserverRpcHaltAttempts)) { 
log.warn("tserver {} is not responding to halt requests, deleting zlock", server); var zk = getContext().getZooReaderWriter(); var iid = getContext().getInstanceID(); @@ -1204,7 +1205,7 @@ private List checkMigrationSanity(Set current, try { ServiceLock.deleteLocks(zk, tserversPath, server.getHostAndPort()::equals, log::info, false); - haltedServers.remove(server); + tserverHaltRpcAttempts.remove(server); badServers.remove(server); } catch (KeeperException | InterruptedException e) { log.error("Failed to delete zlock for server {}", server, e); @@ -1248,9 +1249,9 @@ private List checkMigrationSanity(Set current, badServers.keySet().removeAll(info.keySet()); } - synchronized (haltedServers) { - haltedServers.keySet().retainAll(currentServers); - haltedServers.keySet().removeAll(info.keySet()); + synchronized (tserverHaltRpcAttempts) { + tserverHaltRpcAttempts.keySet().retainAll(currentServers); + tserverHaltRpcAttempts.keySet().removeAll(info.keySet()); } log.debug(String.format("Finished gathering information from %d of %d servers in %.2f seconds", @@ -1755,7 +1756,7 @@ public void update(LiveTServerSet current, Set deleted, } serversToShutdown.removeAll(deleted); badServers.keySet().removeAll(deleted); - haltedServers.keySet().removeAll(deleted); + tserverHaltRpcAttempts.keySet().removeAll(deleted); // clear out any bad server with the same host/port as a new server synchronized (badServers) { cleanListByHostAndPort(badServers.keySet(), deleted, added); @@ -1763,8 +1764,8 @@ public void update(LiveTServerSet current, Set deleted, synchronized (serversToShutdown) { cleanListByHostAndPort(serversToShutdown, deleted, added); } - synchronized (haltedServers) { - cleanListByHostAndPort(haltedServers.keySet(), deleted, added); + synchronized (tserverHaltRpcAttempts) { + cleanListByHostAndPort(tserverHaltRpcAttempts.keySet(), deleted, added); } migrations.removeServers(deleted); nextEvent.event("There are now %d tablet servers", current.size()); From 
5826af2d1a3ae8400f88d9030d41bf56b5bc7512 Mon Sep 17 00:00:00 2001 From: Daniel Roberts ddanielr Date: Mon, 12 Jan 2026 22:01:44 +0000 Subject: [PATCH 5/5] Switches halt logic to being time based Switches the halt logic to be time-based since number of attempts is based on general.rpc.timeout. --- .../apache/accumulo/core/conf/Property.java | 5 ++- .../org/apache/accumulo/manager/Manager.java | 37 ++++++++++++++++--- 2 files changed, 35 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 4f3fbe8ccfe..41616f09d8b 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -450,8 +450,9 @@ public enum Property { "The number of threads used to run fault-tolerant executions (FATE)." + " These are primarily table operations like merge.", "1.4.3"), - MANAGER_TSERVER_HALT_ATTEMPTS("manager.tservers.halt.attempts", "0", PropertyType.COUNT, - "Allows the manager to force tserver halting by setting the max number of attempted tserver halt " + MANAGER_TSERVER_HALT_DURATION("manager.tservers.halt.grace.period", "0", + PropertyType.TIMEDURATION, + "Allows the manager to force tserver halting by setting the max duration of time spent attempting to halt a tserver " + " requests before deleting the tserver's zlock. 
A value of zero (default) disables this feature.", "2.1.5"), @Deprecated(since = "2.1.0") diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 04eadd54be0..e92f310b0dd 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -109,6 +109,7 @@ import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.HostAndPort; import org.apache.accumulo.core.util.Retry; +import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.manager.metrics.BalancerMetrics; @@ -195,7 +196,7 @@ public class Manager extends AbstractServer implements LiveTServerSet.Listener, final AuditedSecurityOperation security; final Map badServers = Collections.synchronizedMap(new HashMap<>()); - final Map tserverHaltRpcAttempts = + final Map tserverHaltRpcAttempts = Collections.synchronizedMap(new HashMap<>()); final Set serversToShutdown = Collections.synchronizedSet(new HashSet<>()); final Migrations migrations = new Migrations(); @@ -1143,6 +1144,30 @@ private List checkMigrationSanity(Set current, } + /** + * This class tracks details about the haltRPCs used + */ + private static class GracefulHaltTimer { + + Duration maxHaltGraceDuration; + Timer timer; + + public GracefulHaltTimer(AccumuloConfiguration config) { + timer = null; + maxHaltGraceDuration = + Duration.ofMillis(config.getTimeInMillis(Property.MANAGER_TSERVER_HALT_DURATION)); + } + + public void startTimer() { + timer = Timer.startNew(); + } + + public boolean shouldForceHalt() { + return maxHaltGraceDuration.toMillis() != 0 && timer != null + && timer.hasElapsed(maxHaltGraceDuration); + } + } + private SortedMap gatherTableInformation(Set currentServers) { final long rpcTimeout = 
getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT); @@ -1153,7 +1178,7 @@ private List checkMigrationSanity(Set current, final SortedMap result = new ConcurrentSkipListMap<>(); final RateLimiter shutdownServerRateLimiter = RateLimiter.create(MAX_SHUTDOWNS_PER_SEC); final int maxTserverRpcHaltAttempts = - getConfiguration().getCount(Property.MANAGER_TSERVER_HALT_ATTEMPTS); + getConfiguration().getCount(Property.MANAGER_TSERVER_HALT_DURATION); final boolean forceHaltingEnabled = maxTserverRpcHaltAttempts != 0; for (TServerInstance serverInstance : currentServers) { final TServerInstance server = serverInstance; @@ -1195,9 +1220,9 @@ private List checkMigrationSanity(Set current, > MAX_BAD_STATUS_COUNT) { if (shutdownServerRateLimiter.tryAcquire()) { log.warn("attempting to stop {}", server); - if (forceHaltingEnabled - && (tserverHaltRpcAttempts.computeIfAbsent(server, s -> new AtomicInteger(0)) - .incrementAndGet() > maxTserverRpcHaltAttempts)) { + var gracefulHaltTimer = tserverHaltRpcAttempts.computeIfAbsent(server, + s -> new GracefulHaltTimer(getConfiguration())); + if (gracefulHaltTimer.shouldForceHalt()) { log.warn("tserver {} is not responding to halt requests, deleting zlock", server); var zk = getContext().getZooReaderWriter(); var iid = getContext().getInstanceID(); @@ -1221,6 +1246,8 @@ private List checkMigrationSanity(Set current, log.trace("error attempting to halt tablet server {}", server, e1); } catch (Exception e2) { log.info("error talking to troublesome tablet server {}", server, e2); + } finally { + gracefulHaltTimer.startTimer(); } } } else {