diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 17edff8d2ea..41616f09d8b 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -450,6 +450,11 @@ public enum Property { "The number of threads used to run fault-tolerant executions (FATE)." + " These are primarily table operations like merge.", "1.4.3"), + MANAGER_TSERVER_HALT_DURATION("manager.tservers.halt.grace.period", "0", + PropertyType.TIMEDURATION, + "Allows the manager to force tserver halting by setting the max duration of time spent attempting to halt a tserver " + + " requests before deleting the tserver's zlock. A value of zero (default) disables this feature.", + "2.1.5"), @Deprecated(since = "2.1.0") MANAGER_REPLICATION_SCAN_INTERVAL("manager.replication.status.scan.interval", "30s", PropertyType.TIMEDURATION, diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ServiceLock.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ServiceLock.java index 42e57206e58..45d04c71cde 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ServiceLock.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ServiceLock.java @@ -723,6 +723,9 @@ public long getSessionId() throws KeeperException, InterruptedException { /** * This method will delete all server locks for a given path according the predicate conditions. * + * @param zk zookeeper instance + * @param zPath can be a path directly to a host or a general path like @{link + * org.apache.accumulo.core.Constants.ZTSERVERS} or a resource group * @param hostPortPredicate conditional predicate for determining if the lock should be removed. * @param messageOutput function for setting where the output from the lockPath goes * @param dryRun allows lock format validation and the messageOutput to be sent without actually diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 1bb15169914..e92f310b0dd 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -109,6 +109,7 @@ import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.HostAndPort; import org.apache.accumulo.core.util.Retry; +import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.manager.metrics.BalancerMetrics; @@ -195,6 +196,8 @@ public class Manager extends AbstractServer implements LiveTServerSet.Listener, final AuditedSecurityOperation security; final Map badServers = Collections.synchronizedMap(new HashMap<>()); + final Map tserverHaltRpcAttempts = + Collections.synchronizedMap(new HashMap<>()); final Set serversToShutdown = Collections.synchronizedSet(new HashSet<>()); final Migrations migrations = new Migrations(); final EventCoordinator nextEvent = new EventCoordinator(); @@ -1141,6 +1144,30 @@ private List checkMigrationSanity(Set current, } + /** + * This class tracks details about the haltRPCs used + */ + private static class GracefulHaltTimer { + + Duration maxHaltGraceDuration; + Timer timer; + + public GracefulHaltTimer(AccumuloConfiguration config) { + timer = null; + maxHaltGraceDuration = + Duration.ofMillis(config.getTimeInMillis(Property.MANAGER_TSERVER_HALT_DURATION)); + } + + public void startTimer() { + timer = Timer.startNew(); + } + + public boolean shouldForceHalt() { + return maxHaltGraceDuration.toMillis() != 0 && timer != null + && timer.hasElapsed(maxHaltGraceDuration); + } + } + private SortedMap gatherTableInformation(Set currentServers) { final long rpcTimeout = getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT); @@ -1150,6 +1177,9 @@ private List checkMigrationSanity(Set current, long start = System.currentTimeMillis(); final SortedMap result = new ConcurrentSkipListMap<>(); final RateLimiter shutdownServerRateLimiter = RateLimiter.create(MAX_SHUTDOWNS_PER_SEC); + final int maxTserverRpcHaltAttempts = + getConfiguration().getCount(Property.MANAGER_TSERVER_HALT_DURATION); + final boolean forceHaltingEnabled = maxTserverRpcHaltAttempts != 0; for (TServerInstance serverInstance : currentServers) { final TServerInstance server = serverInstance; if (threads == 0) { @@ -1190,15 +1220,35 @@ private List checkMigrationSanity(Set current, > MAX_BAD_STATUS_COUNT) { if (shutdownServerRateLimiter.tryAcquire()) { log.warn("attempting to stop {}", server); - try { - TServerConnection connection2 = tserverSet.getConnection(server); - if (connection2 != null) { - connection2.halt(managerLock); + var gracefulHaltTimer = tserverHaltRpcAttempts.computeIfAbsent(server, + s -> new GracefulHaltTimer(getConfiguration())); + if (gracefulHaltTimer.shouldForceHalt()) { + log.warn("tserver {} is not responding to halt requests, deleting zlock", server); + var zk = getContext().getZooReaderWriter(); + var iid = getContext().getInstanceID(); + String tserversPath = Constants.ZROOT + "/" + iid + Constants.ZTSERVERS; + try { + ServiceLock.deleteLocks(zk, tserversPath, server.getHostAndPort()::equals, + log::info, false); + tserverHaltRpcAttempts.remove(server); + badServers.remove(server); + } catch (KeeperException | InterruptedException e) { + log.error("Failed to delete zlock for server {}", server, e); + } + } else { + try { + TServerConnection connection2 = tserverSet.getConnection(server); + if (connection2 != null) { + connection2.halt(managerLock); + } + } catch (TTransportException e1) { + // ignore: it's probably down so log the exception at trace + log.trace("error attempting to halt tablet server {}", server, e1); + } catch (Exception e2) { + log.info("error talking to troublesome tablet server {}", server, e2); + } finally { + gracefulHaltTimer.startTimer(); } - } catch (TTransportException e1) { - // ignore: it's probably down - } catch (Exception e2) { - log.info("error talking to troublesome tablet server", e2); } } else { log.warn("Unable to shutdown {} as over the shutdown limit of {} per minute", server, @@ -1225,6 +1275,12 @@ private List checkMigrationSanity(Set current, badServers.keySet().retainAll(currentServers); badServers.keySet().removeAll(info.keySet()); } + + synchronized (tserverHaltRpcAttempts) { + tserverHaltRpcAttempts.keySet().retainAll(currentServers); + tserverHaltRpcAttempts.keySet().removeAll(info.keySet()); + } + log.debug(String.format("Finished gathering information from %d of %d servers in %.2f seconds", info.size(), currentServers.size(), (System.currentTimeMillis() - start) / 1000.)); @@ -1727,6 +1783,7 @@ public void update(LiveTServerSet current, Set deleted, } serversToShutdown.removeAll(deleted); badServers.keySet().removeAll(deleted); + tserverHaltRpcAttempts.keySet().removeAll(deleted); // clear out any bad server with the same host/port as a new server synchronized (badServers) { cleanListByHostAndPort(badServers.keySet(), deleted, added); @@ -1734,7 +1791,9 @@ public void update(LiveTServerSet current, Set deleted, synchronized (serversToShutdown) { cleanListByHostAndPort(serversToShutdown, deleted, added); } - + synchronized (tserverHaltRpcAttempts) { + cleanListByHostAndPort(tserverHaltRpcAttempts.keySet(), deleted, added); + } migrations.removeServers(deleted); nextEvent.event("There are now %d tablet servers", current.size()); }