From 3cf73c0ccb42b48aa81cd5412415ea2ab0e284d2 Mon Sep 17 00:00:00 2001 From: Dom Garguilo Date: Thu, 30 Oct 2025 15:43:37 -0400 Subject: [PATCH 1/7] Add unified port Property --- .../core/conf/AccumuloConfiguration.java | 32 +++-- .../apache/accumulo/core/conf/Property.java | 66 ++++----- .../accumulo/core/conf/PropertyType.java | 91 ++++++++----- .../core/conf/AccumuloConfigurationTest.java | 88 +++++------- .../core/conf/ConfigCheckUtilTest.java | 8 +- .../core/conf/DefaultConfigurationTest.java | 5 +- .../accumulo/core/conf/PropertyTest.java | 22 ++- .../MiniAccumuloConfigImpl.java | 18 +-- .../minicluster/MiniAccumuloClusterTest.java | 9 +- .../accumulo/server/rpc/TServerUtils.java | 125 ++++++------------ .../apache/accumulo/server/util/Admin.java | 20 ++- ...ccumuloConfigurationIsPropertySetTest.java | 16 ++- .../conf/RuntimeFixedPropertiesTest.java | 6 +- .../server/conf/SystemConfigurationTest.java | 34 ++--- .../conf/ZooBasedConfigurationTest.java | 5 +- .../conf/store/impl/ZooPropLoaderTest.java | 8 +- .../accumulo/server/rpc/TServerUtilsTest.java | 91 +++---------- .../apache/accumulo/compactor/Compactor.java | 5 +- .../accumulo/gc/SimpleGarbageCollector.java | 4 +- .../org/apache/accumulo/manager/Manager.java | 6 +- .../org/apache/accumulo/monitor/Monitor.java | 2 +- .../monitor/EmbeddedWebServerTest.java | 9 +- .../apache/accumulo/tserver/ScanServer.java | 7 +- .../apache/accumulo/tserver/TabletServer.java | 6 +- ...hriftServerBindsBeforeZooKeeperLockIT.java | 19 +-- .../ExternalCompactionTestUtils.java | 1 - .../conf/store/PropCacheCaffeineImplZkIT.java | 8 +- .../test/functional/GarbageCollectorIT.java | 2 +- .../GarbageCollectorTrashDefaultIT.java | 2 +- .../GarbageCollectorTrashEnabledIT.java | 2 +- ...llectorTrashEnabledWithCustomPolicyIT.java | 2 +- .../test/functional/GracefulShutdownIT.java | 1 - .../test/functional/ZombieTServer.java | 2 +- .../test/manager/SuspendedTabletsIT.java | 5 +- .../test/performance/NullTserver.java | 4 +- 35 files changed, 301 insertions(+), 430 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java b/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java index 158db00a009..ad4bb58d7e0 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java @@ -347,21 +347,27 @@ public IntStream getPortStream(Property property) { checkType(property, PropertyType.PORT); String portString = get(property); - try { + Preconditions.checkArgument(portString != null, "Port cannot be null."); + portString = portString.trim(); + Preconditions.checkArgument(!portString.isEmpty(), "Port cannot be empty."); + + if (portString.contains("-")) { + // value is a range, parse it as such return PortRange.parse(portString); - } catch (IllegalArgumentException e) { - try { - int port = Integer.parseInt(portString); - if (port == 0 || PortRange.VALID_RANGE.contains(port)) { - return IntStream.of(port); - } else { - log.error("Invalid port number {}; Using default {}", port, property.getDefaultValue()); - return IntStream.of(Integer.parseInt(property.getDefaultValue())); - } - } catch (NumberFormatException e1) { - throw new IllegalArgumentException("Invalid port syntax. Must be a single positive " - + "integers or a range (M-N) of positive integers"); + } + + // getting to this point means the value is a single port (not a range) + try { + int port = Integer.parseInt(portString); + if (port == 0 || PortRange.VALID_RANGE.contains(port)) { + return IntStream.of(port); } + log.error("Invalid port number: {}. Using default instead: {}", port, + property.getDefaultValue()); + return PortRange.parse(property.getDefaultValue().trim()); + } catch (NumberFormatException e) { + throw new IllegalArgumentException( + "Invalid port syntax. Must be a single positive integer or a range M-N.", e); } } diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index ed35c8683f3..1707cb1129a 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -95,6 +95,14 @@ public enum Property { The local IP address to which this server should bind for sending \ and receiving network traffic. If not set then the process binds to all addresses. """, "2.1.4"), + RPC_BIND_PORT("rpc.bind.port", "19000-19999", PropertyType.PORT, + """ + The port or range of ports servers attempt to bind for RPC traffic. Provide a single \ + value to target an exact port (will attempt higher ports if given port is already in use, \ + up to 1000 additional checks), or a range using formats like '19000-19999' to allow searching for \ + the first available port within that range. + """, + "4.0.0"), RPC_MAX_MESSAGE_SIZE("rpc.message.size.max", Integer.toString(Integer.MAX_VALUE), PropertyType.BYTES, "The maximum size of a message that can be received by a server.", "2.1.3"), @@ -395,8 +403,6 @@ was changed and it now can accept multiple class names. The metrics spi was intr // properties that are specific to manager server behavior MANAGER_PREFIX("manager.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of the manager server.", "2.1.0"), - MANAGER_CLIENTPORT("manager.port.client", "9999", PropertyType.PORT, - "The port used for handling client connections on the manager.", "1.3.5"), MANAGER_TABLET_BALANCER("manager.tablet.balancer", "org.apache.accumulo.core.spi.balancer.TableLoadBalancer", PropertyType.CLASSNAME, "The balancer class that accumulo will use to make tablet assignment and " @@ -585,11 +591,6 @@ Each key is the name of the pool (can be assigned any string). Each value is a J expiration time, will trigger a background refresh for future hits. \ Value must be less than 100%. Set to 0 will disable refresh. """, "2.1.3"), - SSERV_PORTSEARCH("sserver.port.search", "true", PropertyType.BOOLEAN, - "if the sserver.port.client ports are in use, search higher ports until one is available.", - "2.1.0"), - SSERV_CLIENTPORT("sserver.port.client", "9996", PropertyType.PORT, - "The port used for handling client connections on the tablet servers.", "2.1.0"), SSERV_MINTHREADS("sserver.server.threads.minimum", "20", PropertyType.COUNT, "The minimum number of threads to use to handle incoming requests.", "2.1.0"), SSERV_MINTHREADS_TIMEOUT("sserver.server.threads.timeout", "0s", PropertyType.TIMEDURATION, @@ -637,11 +638,6 @@ Each key is the name of the pool (can be assigned any string). Each value is a J "Specifies the size of the cache for RFile index blocks.", "1.3.5"), TSERV_SUMMARYCACHE_SIZE("tserver.cache.summary.size", "10%", PropertyType.MEMORY, "Specifies the size of the cache for summary data on each tablet server.", "2.0.0"), - TSERV_PORTSEARCH("tserver.port.search", "true", PropertyType.BOOLEAN, - "if the tserver.port.client ports are in use, search higher ports until one is available.", - "1.3.5"), - TSERV_CLIENTPORT("tserver.port.client", "9997", PropertyType.PORT, - "The port used for handling client connections on the tablet servers.", "1.3.5"), TSERV_TOTAL_MUTATION_QUEUE_MAX("tserver.total.mutation.queue.max", "5%", PropertyType.MEMORY, "The amount of memory used to store write-ahead-log mutations before flushing them.", "1.7.0"), @@ -846,8 +842,6 @@ Each key is the name of the pool (can be assigned any string). Each value is a J "Time between garbage collection cycles. In each cycle, old RFiles or write-ahead logs " + "no longer in use are removed from the filesystem.", "1.3.5"), - GC_PORT("gc.port.client", "9998", PropertyType.PORT, - "The listening port for the garbage collector's monitor service.", "1.3.5"), GC_DELETE_WAL_THREADS("gc.threads.delete.wal", "4", PropertyType.COUNT, "The number of threads used to delete write-ahead logs and recovery files.", "2.1.4"), GC_DELETE_THREADS("gc.threads.delete", "16", PropertyType.COUNT, @@ -864,8 +858,6 @@ Each key is the name of the pool (can be assigned any string). Each value is a J // properties that are specific to the monitor server behavior MONITOR_PREFIX("monitor.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of the monitor web server.", "1.3.5"), - MONITOR_PORT("monitor.port.client", "9995", PropertyType.PORT, - "The listening port for the monitor's http service.", "1.3.5"), MONITOR_SSL_KEYSTORE("monitor.ssl.keyStore", "", PropertyType.PATH, "The keystore for enabling monitor SSL.", "1.5.0"), @Sensitive @@ -1285,11 +1277,6 @@ start with the category prefix, followed by a scope (minc, majc, scan, \ + " should be cancelled. This checks for situations like was the tablet deleted (split " + " and merge do this), was the table deleted, was a user compaction canceled, etc.", "2.1.4"), - COMPACTOR_PORTSEARCH("compactor.port.search", "true", PropertyType.BOOLEAN, - "If the compactor.port.client ports are in use, search higher ports until one is available.", - "2.1.0"), - COMPACTOR_CLIENTPORT("compactor.port.client", "9133", PropertyType.PORT, - "The port used for handling client connections on the compactor servers.", "2.1.0"), COMPACTOR_MIN_JOB_WAIT_TIME("compactor.wait.time.job.min", "1s", PropertyType.TIMEDURATION, "The minimum amount of time to wait between checks for the next compaction job, backing off" + "exponentially until COMPACTOR_MAX_JOB_WAIT_TIME is reached.", @@ -1626,6 +1613,7 @@ public static boolean isValidTablePropertyKey(String key) { // RPC options RPC_BACKLOG, RPC_SSL_KEYSTORE_TYPE, RPC_SSL_TRUSTSTORE_TYPE, RPC_USE_JSSE, RPC_SSL_ENABLED_PROTOCOLS, RPC_SSL_CLIENT_PROTOCOL, RPC_SASL_QOP, RPC_MAX_MESSAGE_SIZE, + RPC_BIND_PORT, // INSTANCE options INSTANCE_ZK_HOST, INSTANCE_ZK_TIMEOUT, INSTANCE_SECRET, INSTANCE_SECURITY_AUTHENTICATOR, @@ -1644,17 +1632,16 @@ public static boolean isValidTablePropertyKey(String key) { // MANAGER options MANAGER_THREADCHECK, MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL, MANAGER_METADATA_SUSPENDABLE, MANAGER_STARTUP_TSERVER_AVAIL_MIN_COUNT, MANAGER_STARTUP_TSERVER_AVAIL_MAX_WAIT, - MANAGER_CLIENTPORT, MANAGER_MINTHREADS, MANAGER_MINTHREADS_TIMEOUT, - MANAGER_RECOVERY_WAL_EXISTENCE_CACHE_TIME, MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE, - MANAGER_TABLET_REFRESH_MINTHREADS, MANAGER_TABLET_REFRESH_MAXTHREADS, - MANAGER_TABLET_MERGEABILITY_INTERVAL, MANAGER_FATE_CONDITIONAL_WRITER_THREADS_MAX, + MANAGER_MINTHREADS, MANAGER_MINTHREADS_TIMEOUT, MANAGER_RECOVERY_WAL_EXISTENCE_CACHE_TIME, + MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE, MANAGER_TABLET_REFRESH_MINTHREADS, + MANAGER_TABLET_REFRESH_MAXTHREADS, MANAGER_TABLET_MERGEABILITY_INTERVAL, + MANAGER_FATE_CONDITIONAL_WRITER_THREADS_MAX, // SSERV options - SSERV_CACHED_TABLET_METADATA_REFRESH_PERCENT, SSERV_THREADCHECK, SSERV_CLIENTPORT, - SSERV_PORTSEARCH, SSERV_DATACACHE_SIZE, SSERV_INDEXCACHE_SIZE, SSERV_SUMMARYCACHE_SIZE, - SSERV_DEFAULT_BLOCKSIZE, SSERV_SCAN_REFERENCE_EXPIRATION_TIME, - SSERV_CACHED_TABLET_METADATA_EXPIRATION, SSERV_MINTHREADS, SSERV_MINTHREADS_TIMEOUT, - SSERV_WAL_SORT_MAX_CONCURRENT, SSERV_GROUP_NAME, + SSERV_CACHED_TABLET_METADATA_REFRESH_PERCENT, SSERV_THREADCHECK, SSERV_DATACACHE_SIZE, + SSERV_INDEXCACHE_SIZE, SSERV_SUMMARYCACHE_SIZE, SSERV_DEFAULT_BLOCKSIZE, + SSERV_SCAN_REFERENCE_EXPIRATION_TIME, SSERV_CACHED_TABLET_METADATA_EXPIRATION, + SSERV_MINTHREADS, SSERV_MINTHREADS_TIMEOUT, SSERV_WAL_SORT_MAX_CONCURRENT, SSERV_GROUP_NAME, // TSERV options TSERV_TOTAL_MUTATION_QUEUE_MAX, TSERV_WAL_MAX_SIZE, TSERV_WAL_MAX_AGE, @@ -1662,22 +1649,21 @@ public static boolean isValidTablePropertyKey(String key) { TSERV_WAL_TOLERATED_MAXIMUM_WAIT_DURATION, TSERV_MAX_IDLE, TSERV_SESSION_MAXIDLE, TSERV_SCAN_RESULTS_MAX_TIMEOUT, TSERV_MINC_MAXCONCURRENT, TSERV_THREADCHECK, TSERV_LOG_BUSY_TABLETS_COUNT, TSERV_LOG_BUSY_TABLETS_INTERVAL, TSERV_WAL_SORT_MAX_CONCURRENT, - TSERV_SLOW_FILEPERMIT_MILLIS, TSERV_WAL_BLOCKSIZE, TSERV_CLIENTPORT, TSERV_PORTSEARCH, - TSERV_DATACACHE_SIZE, TSERV_INDEXCACHE_SIZE, TSERV_SUMMARYCACHE_SIZE, TSERV_DEFAULT_BLOCKSIZE, - TSERV_MINTHREADS, TSERV_MINTHREADS_TIMEOUT, TSERV_NATIVEMAP_ENABLED, TSERV_MAXMEM, - TSERV_SCAN_MAX_OPENFILES, TSERV_ONDEMAND_UNLOADER_INTERVAL, TSERV_GROUP_NAME, + TSERV_SLOW_FILEPERMIT_MILLIS, TSERV_WAL_BLOCKSIZE, TSERV_DATACACHE_SIZE, + TSERV_INDEXCACHE_SIZE, TSERV_SUMMARYCACHE_SIZE, TSERV_DEFAULT_BLOCKSIZE, TSERV_MINTHREADS, + TSERV_MINTHREADS_TIMEOUT, TSERV_NATIVEMAP_ENABLED, TSERV_MAXMEM, TSERV_SCAN_MAX_OPENFILES, + TSERV_ONDEMAND_UNLOADER_INTERVAL, TSERV_GROUP_NAME, // GC options - GC_CANDIDATE_BATCH_SIZE, GC_CYCLE_START, GC_PORT, + GC_CANDIDATE_BATCH_SIZE, GC_CYCLE_START, // MONITOR options - MONITOR_PORT, MONITOR_SSL_KEYSTORETYPE, MONITOR_SSL_TRUSTSTORETYPE, - MONITOR_SSL_INCLUDE_PROTOCOLS, MONITOR_LOCK_CHECK_INTERVAL, MONITOR_ROOT_CONTEXT, + MONITOR_SSL_KEYSTORETYPE, MONITOR_SSL_TRUSTSTORETYPE, MONITOR_SSL_INCLUDE_PROTOCOLS, + MONITOR_LOCK_CHECK_INTERVAL, MONITOR_ROOT_CONTEXT, // COMPACTOR options - COMPACTOR_CANCEL_CHECK_INTERVAL, COMPACTOR_CLIENTPORT, COMPACTOR_THREADCHECK, - COMPACTOR_PORTSEARCH, COMPACTOR_MINTHREADS, COMPACTOR_MINTHREADS_TIMEOUT, - COMPACTOR_GROUP_NAME, + COMPACTOR_CANCEL_CHECK_INTERVAL, COMPACTOR_THREADCHECK, COMPACTOR_MINTHREADS, + COMPACTOR_MINTHREADS_TIMEOUT, COMPACTOR_GROUP_NAME, // COMPACTION_COORDINATOR options COMPACTION_COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL, diff --git a/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java b/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java index 27a61ef82a0..82be56a9842 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java @@ -31,7 +31,6 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.IntStream; -import java.util.stream.Stream; import org.apache.accumulo.core.fate.Fate; import org.apache.accumulo.core.file.rfile.RFile; @@ -96,13 +95,11 @@ public enum PropertyType { + " 'localhost:2000,www.example.com,10.10.1.1:500' and 'localhost'.\n" + "Examples of invalid host lists are '', ':1000', and 'localhost:80000'"), - PORT("port", - x -> Stream.of(new Bounds(1024, 65535), in(true, "0"), new PortRange("\\d{4,5}-\\d{4,5}")) - .anyMatch(y -> y.test(x)), - "An positive integer in the range 1024-65535 (not already in use or" - + " specified elsewhere in the configuration),\n" - + "zero to indicate any open ephemeral port, or a range of positive" - + " integers specified as M-N"), + PORT("port", new PortPredicate(), + "A positive integer in the range 1024-65535 (not already in use or specified elsewhere in the" + + " configuration), zero to indicate any open ephemeral port, or a range of positive" + + " integers expressed as M-N (inclusive) or using interval notation such as [M,N) or" + + " [M,N]."), COUNT("count", new Bounds(0, Integer.MAX_VALUE), "A non-negative integer in the range of 0-" + Integer.MAX_VALUE), @@ -423,43 +420,71 @@ public boolean test(final String input) { } - public static class PortRange extends Matches { - - public static final Range VALID_RANGE = Range.of(1024, 65535); - - public PortRange(final String pattern) { - super(pattern); - } + private static class PortPredicate implements Predicate { @Override public boolean test(final String input) { - if (super.test(input)) { + if (input == null) { + return true; + } + final String trimmed = input.trim(); + if (trimmed.isEmpty()) { + return false; + } + if ("0".equals(trimmed)) { + return true; + } + + try { + int port = Integer.parseInt(trimmed); + return PortRange.VALID_RANGE.contains(port); + } catch (NumberFormatException e) { try { - PortRange.parse(input); + PortRange.parse(trimmed); return true; - } catch (IllegalArgumentException e) { + } catch (IllegalArgumentException ex) { return false; } - } else { - return false; } } + } - public static IntStream parse(String portRange) { - int idx = portRange.indexOf('-'); - if (idx != -1) { - int low = Integer.parseInt(portRange.substring(0, idx)); - int high = Integer.parseInt(portRange.substring(idx + 1)); - if (!VALID_RANGE.contains(low) || !VALID_RANGE.contains(high) || low > high) { - throw new IllegalArgumentException( - "Invalid port range specified, only 1024 to 65535 supported."); - } - return IntStream.rangeClosed(low, high); + public static class PortRange { + + public static final Range VALID_RANGE = Range.of(1024, 65535); + + private PortRange() {} + + public static IntStream parse(String value) { + Objects.requireNonNull(value, "Port range cannot be null."); + value = value.trim(); + Preconditions.checkArgument(!value.isEmpty(), "Port range cannot be empty."); + Preconditions.checkArgument(value.contains("-"), + "Invalid port range, expected format like M-N."); + String[] parts = value.split("-", 2); + Preconditions.checkArgument(parts.length == 2, "Invalid port range, must use M-N notation."); + + int low; + int high; + try { + low = Integer.parseInt(parts[0].trim()); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid port value: " + parts[0], e); } - throw new IllegalArgumentException( - "Invalid port range specification, must use M-N notation."); - } + try { + high = Integer.parseInt(parts[1].trim()); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid port value: " + parts[1], e); + } + + Preconditions.checkArgument(VALID_RANGE.contains(low), + "Port range bounds must be 1024 to 65535. Got " + low); + Preconditions.checkArgument(VALID_RANGE.contains(high), + "Port range bounds must be 1024 to 65535 Got " + high); + Preconditions.checkArgument(high > low, "Upper bound must be >= lower bound."); + return IntStream.rangeClosed(low, high); + } } private static class ValidFateConfig implements Predicate { diff --git a/core/src/test/java/org/apache/accumulo/core/conf/AccumuloConfigurationTest.java b/core/src/test/java/org/apache/accumulo/core/conf/AccumuloConfigurationTest.java index cfcf2403068..b28a7036a1a 100644 --- a/core/src/test/java/org/apache/accumulo/core/conf/AccumuloConfigurationTest.java +++ b/core/src/test/java/org/apache/accumulo/core/conf/AccumuloConfigurationTest.java @@ -19,6 +19,7 @@ package org.apache.accumulo.core.conf; import static org.apache.accumulo.core.conf.Property.TABLE_ITERATOR_MINC_PREFIX; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotSame; @@ -59,80 +60,59 @@ public void testGetPropertyByString() { @Test public void testGetSinglePort() { - AccumuloConfiguration c = DefaultConfiguration.getInstance(); - ConfigurationCopy cc = new ConfigurationCopy(c); - cc.set(Property.TSERV_CLIENTPORT, "9997"); - int[] ports = cc.getPort(Property.TSERV_CLIENTPORT); + ConfigurationCopy cc = new ConfigurationCopy(DefaultConfiguration.getInstance()); + int expected = 25000; + cc.set(Property.RPC_BIND_PORT, Integer.toString(expected)); + int[] ports = cc.getPort(Property.RPC_BIND_PORT); assertEquals(1, ports.length); - assertEquals(9997, ports[0]); + assertEquals(expected, ports[0]); } @Test public void testGetAnyPort() { - AccumuloConfiguration c = DefaultConfiguration.getInstance(); - ConfigurationCopy cc = new ConfigurationCopy(c); - cc.set(Property.TSERV_CLIENTPORT, "0"); - int[] ports = cc.getPort(Property.TSERV_CLIENTPORT); + ConfigurationCopy cc = new ConfigurationCopy(DefaultConfiguration.getInstance()); + int expected = 0; + cc.set(Property.RPC_BIND_PORT, Integer.toString(expected)); + int[] ports = cc.getPort(Property.RPC_BIND_PORT); assertEquals(1, ports.length); - assertEquals(0, ports[0]); + assertEquals(expected, ports[0]); } @Test - public void testGetInvalidPort() { - AccumuloConfiguration c = DefaultConfiguration.getInstance(); - ConfigurationCopy cc = new ConfigurationCopy(c); - cc.set(Property.TSERV_CLIENTPORT, "1020"); - int[] ports = cc.getPort(Property.TSERV_CLIENTPORT); - assertEquals(1, ports.length); - assertEquals(Integer.parseInt(Property.TSERV_CLIENTPORT.getDefaultValue()), ports[0]); + public void testGetInvalidPortFallsBackToDefaultRange() { + ConfigurationCopy cc = new ConfigurationCopy(DefaultConfiguration.getInstance()); + cc.set(Property.RPC_BIND_PORT, "1020"); + int[] ports = cc.getPort(Property.RPC_BIND_PORT); + int[] defaultPorts = DefaultConfiguration.getInstance().getPort(Property.RPC_BIND_PORT); + assertArrayEquals(defaultPorts, ports); } @Test public void testGetPortRange() { - AccumuloConfiguration c = DefaultConfiguration.getInstance(); - ConfigurationCopy cc = new ConfigurationCopy(c); - cc.set(Property.TSERV_CLIENTPORT, "9997-9999"); - int[] ports = cc.getPort(Property.TSERV_CLIENTPORT); - assertEquals(3, ports.length); - assertEquals(9997, ports[0]); - assertEquals(9998, ports[1]); - assertEquals(9999, ports[2]); + ConfigurationCopy cc = new ConfigurationCopy(DefaultConfiguration.getInstance()); + cc.set(Property.RPC_BIND_PORT, "36000-36002"); + int[] ports = cc.getPort(Property.RPC_BIND_PORT); + assertArrayEquals(new int[] {36000, 36001, 36002}, ports); } @Test - public void testGetPortRangeInvalidLow() { - AccumuloConfiguration c = DefaultConfiguration.getInstance(); - ConfigurationCopy cc = new ConfigurationCopy(c); - cc.set(Property.TSERV_CLIENTPORT, "1020-1026"); - assertThrows(IllegalArgumentException.class, () -> { - int[] ports = cc.getPort(Property.TSERV_CLIENTPORT); - assertEquals(3, ports.length); - assertEquals(1024, ports[0]); - assertEquals(1025, ports[1]); - assertEquals(1026, ports[2]); - }); - } - - @Test - public void testGetPortRangeInvalidHigh() { - AccumuloConfiguration c = DefaultConfiguration.getInstance(); - ConfigurationCopy cc = new ConfigurationCopy(c); - cc.set(Property.TSERV_CLIENTPORT, "65533-65538"); - assertThrows(IllegalArgumentException.class, () -> { - int[] ports = cc.getPort(Property.TSERV_CLIENTPORT); - assertEquals(3, ports.length); - assertEquals(65533, ports[0]); - assertEquals(65534, ports[1]); - assertEquals(65535, ports[2]); - }); + public void testGetPortRangeInvalid() { + ConfigurationCopy cc = new ConfigurationCopy(DefaultConfiguration.getInstance()); + cc.set(Property.RPC_BIND_PORT, "1020-1026"); + var msg = + assertThrows(IllegalArgumentException.class, () -> cc.getPort(Property.RPC_BIND_PORT)); + assertTrue(msg.getMessage().startsWith("Port range bounds must be 1024 to 65535")); + + cc.set(Property.RPC_BIND_PORT, "65533-65538"); + msg = assertThrows(IllegalArgumentException.class, () -> cc.getPort(Property.RPC_BIND_PORT)); + assertTrue(msg.getMessage().startsWith("Port range bounds must be 1024 to 65535")); } @Test public void testGetPortInvalidSyntax() { - AccumuloConfiguration c = DefaultConfiguration.getInstance(); - ConfigurationCopy cc = new ConfigurationCopy(c); - cc.set(Property.TSERV_CLIENTPORT, "[65533,65538]"); - assertThrows(IllegalArgumentException.class, () -> cc.getPort(Property.TSERV_CLIENTPORT)); + ConfigurationCopy cc = new ConfigurationCopy(DefaultConfiguration.getInstance()); + cc.set(Property.RPC_BIND_PORT, "bad-port"); + assertThrows(IllegalArgumentException.class, () -> cc.getPort(Property.RPC_BIND_PORT)); } private static class TestConfiguration extends AccumuloConfiguration { diff --git a/core/src/test/java/org/apache/accumulo/core/conf/ConfigCheckUtilTest.java b/core/src/test/java/org/apache/accumulo/core/conf/ConfigCheckUtilTest.java index 6741407ba0f..cc8b9281ae8 100644 --- a/core/src/test/java/org/apache/accumulo/core/conf/ConfigCheckUtilTest.java +++ b/core/src/test/java/org/apache/accumulo/core/conf/ConfigCheckUtilTest.java @@ -36,7 +36,7 @@ public void setUp() { @Test public void testPass() { - m.put(Property.MANAGER_CLIENTPORT.getKey(), "9999"); + m.put(Property.RPC_BIND_PORT.getKey(), "19000"); m.put(Property.MANAGER_TABLET_BALANCER.getKey(), "org.apache.accumulo.server.manager.balancer.TableLoadBalancer"); m.put(Property.MANAGER_BULK_TIMEOUT.getKey(), "5m"); @@ -50,21 +50,21 @@ public void testPass_Empty() { @Test public void testPass_UnrecognizedValidProperty() { - m.put(Property.MANAGER_CLIENTPORT.getKey(), "9999"); + m.put(Property.RPC_BIND_PORT.getKey(), "19000"); m.put(Property.MANAGER_PREFIX.getKey() + "something", "abcdefg"); ConfigCheckUtil.validate(m.entrySet(), "test"); } @Test public void testPass_UnrecognizedProperty() { - m.put(Property.MANAGER_CLIENTPORT.getKey(), "9999"); + m.put(Property.RPC_BIND_PORT.getKey(), "19000"); m.put("invalid.prefix.value", "abcdefg"); ConfigCheckUtil.validate(m.entrySet(), "test"); } @Test public void testFail_Prefix() { - m.put(Property.MANAGER_CLIENTPORT.getKey(), "9999"); + m.put(Property.RPC_BIND_PORT.getKey(), "19000"); m.put(Property.MANAGER_PREFIX.getKey(), "oops"); assertThrows(ConfigCheckException.class, () -> ConfigCheckUtil.validate(m.entrySet(), "test")); } diff --git a/core/src/test/java/org/apache/accumulo/core/conf/DefaultConfigurationTest.java b/core/src/test/java/org/apache/accumulo/core/conf/DefaultConfigurationTest.java index 8ddecfbdf3d..c305ae49bdd 100644 --- a/core/src/test/java/org/apache/accumulo/core/conf/DefaultConfigurationTest.java +++ b/core/src/test/java/org/apache/accumulo/core/conf/DefaultConfigurationTest.java @@ -37,15 +37,14 @@ public void setUp() { @Test public void testGet() { - assertEquals(Property.MANAGER_CLIENTPORT.getDefaultValue(), c.get(Property.MANAGER_CLIENTPORT)); + assertEquals(Property.RPC_BIND_PORT.getDefaultValue(), c.get(Property.RPC_BIND_PORT)); } @Test public void testGetProperties() { Map p = new java.util.HashMap<>(); c.getProperties(p, x -> true); - assertEquals(Property.MANAGER_CLIENTPORT.getDefaultValue(), - p.get(Property.MANAGER_CLIENTPORT.getKey())); + assertEquals(Property.RPC_BIND_PORT.getDefaultValue(), p.get(Property.RPC_BIND_PORT.getKey())); assertFalse(p.containsKey(Property.MANAGER_PREFIX.getKey())); assertTrue(p.containsKey(Property.TSERV_DEFAULT_BLOCKSIZE.getKey())); } diff --git a/core/src/test/java/org/apache/accumulo/core/conf/PropertyTest.java b/core/src/test/java/org/apache/accumulo/core/conf/PropertyTest.java index 79d84444c7d..6221989364e 100644 --- a/core/src/test/java/org/apache/accumulo/core/conf/PropertyTest.java +++ b/core/src/test/java/org/apache/accumulo/core/conf/PropertyTest.java @@ -110,11 +110,23 @@ public void testPorts() { HashSet usedPorts = new HashSet<>(); for (Property prop : Property.values()) { if (prop.getType().equals(PropertyType.PORT)) { - int port = Integer.parseInt(prop.getDefaultValue()); - assertTrue(Property.isValidProperty(prop.getKey(), Integer.toString(port))); - assertFalse(usedPorts.contains(port), "Port already in use: " + port); - usedPorts.add(port); - assertTrue(port > 1023 && port < 65536, "Port out of range of valid ports: " + port); + String defaultValue = prop.getDefaultValue(); + try { + int port = Integer.parseInt(defaultValue); + assertTrue(Property.isValidProperty(prop.getKey(), Integer.toString(port))); + assertFalse(usedPorts.contains(port), "Port already in use: " + port); + usedPorts.add(port); + assertTrue(port > 1023 && port < 65536, "Port out of range of valid ports: " + port); + } catch (NumberFormatException e) { + int[] ports = PropertyType.PortRange.parse(defaultValue).toArray(); + assertTrue(ports.length > 0, "Port range must contain at least one port"); + for (int port : ports) { + assertTrue(PropertyType.PortRange.VALID_RANGE.contains(port), + "Port out of range of valid ports: " + port); + assertFalse(usedPorts.contains(port), "Port already in use: " + port); + usedPorts.add(port); + } + } } } } diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java index 620d4883b8f..82c65164124 100644 --- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java +++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java @@ -174,7 +174,6 @@ MiniAccumuloConfigImpl initialize() { // enable metrics reporting - by default will appear in standard log files. mergeProp(Property.GENERAL_MICROMETER_ENABLED.getKey(), "true"); - mergeProp(Property.TSERV_PORTSEARCH.getKey(), "true"); mergeProp(Property.TSERV_DATACACHE_SIZE.getKey(), "10M"); mergeProp(Property.TSERV_INDEXCACHE_SIZE.getKey(), "10M"); mergeProp(Property.TSERV_SUMMARYCACHE_SIZE.getKey(), "10M"); @@ -183,12 +182,7 @@ MiniAccumuloConfigImpl initialize() { mergeProp(Property.TSERV_NATIVEMAP_ENABLED.getKey(), "false"); mergeProp(Property.GC_CYCLE_DELAY.getKey(), "4s"); mergeProp(Property.GC_CYCLE_START.getKey(), "0s"); - mergePropWithRandomPort(Property.MANAGER_CLIENTPORT.getKey()); - mergePropWithRandomPort(Property.TSERV_CLIENTPORT.getKey()); - mergePropWithRandomPort(Property.MONITOR_PORT.getKey()); - mergePropWithRandomPort(Property.GC_PORT.getKey()); - - mergeProp(Property.COMPACTOR_PORTSEARCH.getKey(), "true"); + mergeProp(Property.RPC_BIND_PORT.getKey(), "0"); mergeProp(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE.getKey(), Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE.getDefaultValue()); @@ -285,16 +279,6 @@ private void mergeProp(String key, String value) { } } - /** - * Sets a given key with a random port for the value on the site config if it doesn't already - * exist. - */ - private void mergePropWithRandomPort(String key) { - if (!siteConfig.containsKey(key)) { - siteConfig.put(key, "0"); - } - } - /** * Calling this method is optional. If not set, defaults to 'miniInstance' * diff --git a/minicluster/src/test/java/org/apache/accumulo/minicluster/MiniAccumuloClusterTest.java b/minicluster/src/test/java/org/apache/accumulo/minicluster/MiniAccumuloClusterTest.java index 79a45ede151..16cc0267b79 100644 --- a/minicluster/src/test/java/org/apache/accumulo/minicluster/MiniAccumuloClusterTest.java +++ b/minicluster/src/test/java/org/apache/accumulo/minicluster/MiniAccumuloClusterTest.java @@ -205,12 +205,9 @@ public void testRandomPorts() throws Exception { try (var reader = Files.newBufferedReader(accumuloProps, UTF_8)) { config.read(reader); } - for (Property randomPortProp : new Property[] {Property.TSERV_CLIENTPORT, Property.MONITOR_PORT, - Property.MANAGER_CLIENTPORT, Property.GC_PORT}) { - String value = config.getString(randomPortProp.getKey()); - assertNotNull(value, "Found no value for " + randomPortProp); - assertEquals("0", value); - } + String value = config.getString(Property.RPC_BIND_PORT.getKey()); + assertNotNull(value, "Found no value for " + Property.RPC_BIND_PORT); + assertEquals("0", value); } @AfterAll diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java index 7d6544d110b..f855cbd94d7 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java @@ -28,13 +28,10 @@ import java.net.ServerSocket; import java.net.UnknownHostException; import java.util.Arrays; -import java.util.EnumSet; import java.util.HashSet; -import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; -import java.util.stream.Collectors; import java.util.stream.IntStream; import javax.net.ssl.SSLServerSocket; @@ -42,14 +39,12 @@ import org.apache.accumulo.core.cli.ConfigOpts; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.conf.PropertyType; import org.apache.accumulo.core.conf.PropertyType.PortRange; import org.apache.accumulo.core.data.InstanceId; import org.apache.accumulo.core.metrics.MetricsInfo; import org.apache.accumulo.core.rpc.SslConnectionParams; import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.rpc.UGIAssumingTransportFactory; -import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.server.ServerContext; import org.apache.hadoop.security.SaslRpcServer; @@ -79,6 +74,7 @@ */ public class TServerUtils { private static final Logger log = LoggerFactory.getLogger(TServerUtils.class); + private static final int SINGLE_PORT_FALLBACK_RANGE = 1000; /** * Static instance, passed to {@link ClientInfoProcessorFactory}, which will contain the client @@ -93,48 +89,49 @@ public class TServerUtils { * @return array of HostAndPort objects */ public static HostAndPort[] getHostAndPorts(String hostname, IntStream ports) { - return ports.mapToObj(port -> HostAndPort.fromParts(hostname, port)) - .toArray(HostAndPort[]::new); - } + int[] configuredPorts = ports.toArray(); + if (configuredPorts.length == 0) { + return new HostAndPort[0]; + } - /** - * - * @param config Accumulo configuration - * @return A Map object with reserved port numbers as keys and Property objects as values - */ - static Map getReservedPorts(AccumuloConfiguration config, - Property portProperty) { - return EnumSet.allOf(Property.class).stream() - .filter(p -> p.getType() == PropertyType.PORT && p != portProperty) - .flatMap(rp -> config.getPortStream(rp).mapToObj(portNum -> new Pair<>(portNum, rp))) - .filter(p -> p.getFirst() != 0).collect(Collectors.toMap(Pair::getFirst, Pair::getSecond)); + IntStream candidates; + if (configuredPorts.length == 1 && configuredPorts[0] > 0) { + int basePort = configuredPorts[0]; + int maxPort = PortRange.VALID_RANGE.getMaximum(); + int searchUpperBound = Math.min(maxPort, basePort + SINGLE_PORT_FALLBACK_RANGE); + IntStream fallback = basePort < searchUpperBound + ? IntStream.rangeClosed(basePort + 1, searchUpperBound) : IntStream.empty(); + candidates = IntStream.concat(IntStream.of(basePort), fallback); + } else { + candidates = Arrays.stream(configuredPorts); + } + + return candidates.mapToObj(port -> HostAndPort.fromParts(hostname, port)) + .toArray(HostAndPort[]::new); } /** - * Create a ServerAddress, at the given port, or higher, if that port is not available. Callers - * must start the ThriftServer after calling this method using - * {@code ServerAddress#startThriftServer(String)} + * Create a ServerAddress bound to one of the ports configured by {@code rpc.bind.port}. Callers + * must start the returned ThriftServer after calling this method using + * {@link ServerAddress#startThriftServer(String)}. * * @param context RPC configuration - * @param portHintProperty the port to attempt to open, can be zero, meaning "any available port" + * @param hostname host name to bind * @param processor the service to be started * @param serverName the name of the class that is providing the service - * @param portSearchProperty A boolean Property to control if port-search should be used, or null - * to disable * @param minThreadProperty A Property to control the minimum number of threads in the pool + * @param threadTimeOutProperty A Property to control thread timeout * @param timeBetweenThreadChecksProperty A Property to control the amount of time between checks * to resize the thread pool * @return the server object created, and the port actually used - * @throws UnknownHostException when we don't know our own address + * @throws UnknownHostException when no configured port could be bound or the host is unknown */ public static ServerAddress createThriftServer(ServerContext context, String hostname, - Property portHintProperty, TProcessor processor, String serverName, - Property portSearchProperty, Property minThreadProperty, Property threadTimeOutProperty, - Property timeBetweenThreadChecksProperty) throws UnknownHostException { + TProcessor processor, String serverName, Property minThreadProperty, + Property threadTimeOutProperty, Property timeBetweenThreadChecksProperty) + throws UnknownHostException { final AccumuloConfiguration config = context.getConfiguration(); - final IntStream portHint = config.getPortStream(portHintProperty); - int minThreads = 2; if (minThreadProperty != null) { minThreads = config.getCount(minThreadProperty); @@ -152,11 +149,6 @@ public static ServerAddress createThriftServer(ServerContext context, String hos long maxMessageSize = config.getAsBytes(Property.RPC_MAX_MESSAGE_SIZE); - boolean portSearch = false; - if (portSearchProperty != null) { - portSearch = config.getBoolean(portSearchProperty); - } - int backlog = config.getCount(Property.RPC_BACKLOG); final ThriftServerType serverType = context.getThriftServerType(); @@ -170,48 +162,21 @@ public static ServerAddress createThriftServer(ServerContext context, String hos // metrics mbean more than once TimedProcessor timedProcessor = new TimedProcessor(processor, context.getMetricsInfo()); - HostAndPort[] addresses = getHostAndPorts(hostname, portHint); + HostAndPort[] addresses = + getHostAndPorts(hostname, config.getPortStream(Property.RPC_BIND_PORT)); + if (addresses.length == 0) { + throw new UnknownHostException("No candidate ports defined by rpc.bind.port"); + } + try { return TServerUtils.createThriftServer(serverType, timedProcessor, context.getInstanceID(), serverName, minThreads, threadTimeOut, config, timeBetweenThreadChecks, maxMessageSize, context.getServerSslParams(), context.getSaslParams(), context.getClientTimeoutInMillis(), - backlog, portSearch, addresses); + backlog, addresses); } catch (TTransportException e) { - if (portSearch) { - // Build a list of reserved ports - as identified by properties of type PropertyType.PORT - Map reservedPorts = getReservedPorts(config, portHintProperty); - - HostAndPort last = addresses[addresses.length - 1]; - // Attempt to allocate a port outside of the specified port property - // Search sequentially over the next 1000 ports - for (int port = last.getPort() + 1; port < last.getPort() + 1001; port++) { - if (reservedPorts.containsKey(port)) { - log.debug("During port search, skipping reserved port {} - property {} ({})", port, - reservedPorts.get(port).getKey(), reservedPorts.get(port).getDescription()); - - continue; - } - - if (PortRange.VALID_RANGE.isBefore(port)) { - break; - } - try { - HostAndPort addr = HostAndPort.fromParts(hostname, port); - return TServerUtils.createThriftServer(serverType, timedProcessor, - context.getInstanceID(), serverName, minThreads, threadTimeOut, config, - timeBetweenThreadChecks, maxMessageSize, context.getServerSslParams(), - context.getSaslParams(), context.getClientTimeoutInMillis(), backlog, portSearch, - addr); - } catch (TTransportException tte) { - log.info("Unable to use port {}, retrying.", port); - } - } - log.error("Unable to start TServer", e); - throw new UnknownHostException("Unable to find a listen port"); - } else { - log.error("Unable to start TServer", e); - throw new UnknownHostException("Unable to find a listen port"); - } + log.error("Unable to start {} on {} using rpc.bind.port={}", serverName, hostname, + config.get(Property.RPC_BIND_PORT), e); + throw new UnknownHostException("Unable to bind to any configured rpc.bind.port values"); } } @@ -570,8 +535,7 @@ public static ServerAddress createThriftServer(final AccumuloConfiguration conf, ThriftServerType serverType, TProcessor processor, InstanceId instanceId, String serverName, int numThreads, long threadTimeOut, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, SaslServerConnectionParams saslParams, - long serverSocketTimeout, int backlog, MetricsInfo metricsInfo, boolean portSearch, - HostAndPort... addresses) { + long serverSocketTimeout, int backlog, MetricsInfo metricsInfo, HostAndPort... addresses) { if (serverType == ThriftServerType.SASL) { processor = updateSaslProcessor(serverType, processor); @@ -580,7 +544,7 @@ public static ServerAddress createThriftServer(final AccumuloConfiguration conf, try { return createThriftServer(serverType, new TimedProcessor(processor, metricsInfo), instanceId, serverName, numThreads, threadTimeOut, conf, timeBetweenThreadChecks, maxMessageSize, - sslParams, saslParams, serverSocketTimeout, backlog, portSearch, addresses); + sslParams, saslParams, serverSocketTimeout, backlog, addresses); } catch (TTransportException e) { throw new IllegalStateException(e); } @@ -597,8 +561,7 @@ private static ServerAddress createThriftServer(ThriftServerType serverType, TimedProcessor processor, InstanceId instanceId, String serverName, int numThreads, long threadTimeOut, final AccumuloConfiguration conf, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, SaslServerConnectionParams saslParams, - long serverSocketTimeout, int backlog, boolean portSearch, HostAndPort... addresses) - throws TTransportException { + long serverSocketTimeout, int backlog, HostAndPort... addresses) throws TTransportException { TProtocolFactory protocolFactory = ThriftUtil.serverProtocolFactory(instanceId); // This is presently not supported. It's hypothetically possible, I believe, to work, but it // would require changes in how the transports @@ -641,11 +604,7 @@ yield createNonBlockingServer(address, processor, protocolFactory, serverName, }; break; } catch (TTransportException e) { - if (portSearch) { - log.debug("Failed attempting to create server at {}. {}", address, e.getMessage()); - } else { - log.warn("Error attempting to create server at {}. Error: {}", address, e.getMessage()); - } + log.debug("Failed attempting to create server at {}. {}", address, e.getMessage()); } } if (serverAddress == null) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java index c3dd6474f5b..8fa42e0c9e4 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java @@ -785,19 +785,25 @@ private static void stopTabletServer(final ClientContext context, List s final ZooCache zc = context.getZooCache(); Set runningServers; + int[] configuredPorts = context.getConfiguration().getPort(Property.RPC_BIND_PORT); + int fallbackPort = configuredPorts.length == 0 ? 0 : configuredPorts[0]; + for (String server : servers) { runningServers = context.instanceOperations().getServers(ServerId.Type.TABLET_SERVER); if (runningServers.size() == 1 && !force) { log.info("Only 1 tablet server running. Not attempting shutdown of {}", server); return; } - for (int port : context.getConfiguration().getPort(Property.TSERV_CLIENTPORT)) { - HostAndPort address = AddressUtil.parseAddress(server, port); - final String finalServer = qualifyWithZooKeeperSessionId(context, zc, address.toString()); - log.info("Stopping server {}", finalServer); - ThriftClientTypes.MANAGER.executeVoid(context, client -> client - .shutdownTabletServer(TraceUtil.traceInfo(), context.rpcCreds(), finalServer, force)); - } + + int port = runningServers.stream() + .filter(si -> server.equals(si.toHostPortString()) || server.equals(si.getHost())) + .mapToInt(ServerId::getPort).findFirst().orElse(fallbackPort); + + HostAndPort address = AddressUtil.parseAddress(server, port); + final String finalServer = qualifyWithZooKeeperSessionId(context, zc, address.toString()); + log.info("Stopping server {}", finalServer); + ThriftClientTypes.MANAGER.executeVoid(context, client -> client + .shutdownTabletServer(TraceUtil.traceInfo(), context.rpcCreds(), finalServer, force)); } } diff --git a/server/base/src/test/java/org/apache/accumulo/server/conf/AccumuloConfigurationIsPropertySetTest.java b/server/base/src/test/java/org/apache/accumulo/server/conf/AccumuloConfigurationIsPropertySetTest.java index c450db3bf0f..45299a885f8 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/conf/AccumuloConfigurationIsPropertySetTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/conf/AccumuloConfigurationIsPropertySetTest.java @@ -20,10 +20,10 @@ import static java.util.stream.Collectors.toMap; import static java.util.stream.Collectors.toSet; -import static org.apache.accumulo.core.conf.Property.GC_PORT; import static org.apache.accumulo.core.conf.Property.INSTANCE_SECRET; import static org.apache.accumulo.core.conf.Property.INSTANCE_ZK_HOST; import static org.apache.accumulo.core.conf.Property.MANAGER_BULK_TIMEOUT; +import static org.apache.accumulo.core.conf.Property.RPC_BIND_PORT; import static org.apache.accumulo.core.conf.Property.TABLE_BLOOM_ENABLED; import static org.apache.accumulo.core.conf.Property.TABLE_BLOOM_SIZE; import static org.apache.accumulo.core.conf.Property.TABLE_DURABILITY; @@ -51,6 +51,7 @@ import java.util.Set; import java.util.UUID; import java.util.function.Predicate; +import java.util.stream.Collectors; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.ConfigurationCopy; @@ -135,7 +136,7 @@ private static Map setToMap(Set props) { @Test public void testConfigurationCopy() { - var shouldBeSet = Set.of(TABLE_BLOOM_SIZE, GC_PORT); + var shouldBeSet = Set.of(TABLE_BLOOM_SIZE, RPC_BIND_PORT); var shouldNotBeSet = Sets.difference(ALL_PROPERTIES, shouldBeSet); assertFalse(shouldNotBeSet.isEmpty()); @@ -208,7 +209,7 @@ public void testSiteConfiguration() throws IOException { @Test public void testSystemConfiguration() { - var setOnSystem = Set.of(GC_PORT, TSERV_SCAN_MAX_OPENFILES); + var setOnSystem = Set.of(RPC_BIND_PORT, TSERV_SCAN_MAX_OPENFILES); var sysProps = new VersionedProperties(1, Instant.now(), setToMap(setOnSystem)); expect(propStore.get(eq(sysPropKey))).andReturn(sysProps).once(); @@ -233,13 +234,16 @@ public void testSystemConfiguration() { // now set a few fixed properties on the parent to simulate a user setting a fixed property that // requires a restart var shouldBeSetMore = new HashSet(shouldBeSet); - Property.FIXED_PROPERTIES.stream().limit(5).forEach(p -> { + var additionalFixed = Property.FIXED_PROPERTIES.stream().filter(p -> !shouldBeSet.contains(p)) + .limit(5).collect(Collectors.toList()); + assertEquals(5, additionalFixed.size()); + additionalFixed.forEach(p -> { // use the default value so we can verify it's actually set, and not just looks set because it // has the same value as the default parent.set(p.getKey(), p.getDefaultValue()); shouldBeSetMore.add(p); }); - // make sure we actually added some + // make sure we actually added the expected number of new entries assertEquals(5, Sets.symmetricDifference(shouldBeSet, shouldBeSetMore).size()); // verify that the view of the configuration now includes the fixed properties, because we added @@ -293,7 +297,7 @@ public void testTableConfiguration() { @Test public void testZooBasedConfiguration() { - var setOnSystem = Set.of(GC_PORT); + var setOnSystem = Set.of(RPC_BIND_PORT); var sysProps = new VersionedProperties(1, Instant.now(), setToMap(setOnSystem)); expect(propStore.get(eq(sysPropKey))).andReturn(sysProps).once(); diff --git a/server/base/src/test/java/org/apache/accumulo/server/conf/RuntimeFixedPropertiesTest.java b/server/base/src/test/java/org/apache/accumulo/server/conf/RuntimeFixedPropertiesTest.java index 6029f3af937..62871360f0e 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/conf/RuntimeFixedPropertiesTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/conf/RuntimeFixedPropertiesTest.java @@ -18,7 +18,7 @@ */ package org.apache.accumulo.server.conf; -import static org.apache.accumulo.core.conf.Property.GC_PORT; +import static org.apache.accumulo.core.conf.Property.RPC_BIND_PORT; import static org.apache.accumulo.core.conf.Property.TSERV_NATIVEMAP_ENABLED; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -77,8 +77,8 @@ public void changedTest() { assertTrue(fixed.hasChanged(Map.of(TSERV_NATIVEMAP_ENABLED.getKey(), "true"))); // prop added - changed - assertTrue(fixed - .hasChanged(Map.of(TSERV_NATIVEMAP_ENABLED.getKey(), "false", GC_PORT.getKey(), "1234"))); + assertTrue(fixed.hasChanged( + Map.of(TSERV_NATIVEMAP_ENABLED.getKey(), "false", RPC_BIND_PORT.getKey(), "20000"))); // same - no change assertFalse(fixed.hasChanged(storedProps)); diff --git a/server/base/src/test/java/org/apache/accumulo/server/conf/SystemConfigurationTest.java b/server/base/src/test/java/org/apache/accumulo/server/conf/SystemConfigurationTest.java index a7533e899c5..3d99ac07609 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/conf/SystemConfigurationTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/conf/SystemConfigurationTest.java @@ -18,11 +18,10 @@ */ package org.apache.accumulo.server.conf; -import static org.apache.accumulo.core.conf.Property.GC_PORT; +import static org.apache.accumulo.core.conf.Property.RPC_BIND_PORT; import static org.apache.accumulo.core.conf.Property.TABLE_BLOOM_ENABLED; import static org.apache.accumulo.core.conf.Property.TABLE_BLOOM_SIZE; import static org.apache.accumulo.core.conf.Property.TABLE_DURABILITY; -import static org.apache.accumulo.core.conf.Property.TSERV_CLIENTPORT; import static org.apache.accumulo.core.conf.Property.TSERV_SCAN_MAX_OPENFILES; import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.createMock; @@ -33,7 +32,6 @@ import static org.easymock.EasyMock.reset; import static org.easymock.EasyMock.verify; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import java.time.Instant; @@ -81,7 +79,7 @@ public void initMocks() { var sysPropKey = SystemPropKey.of(); VersionedProperties sysProps = - new VersionedProperties(1, Instant.now(), Map.of(GC_PORT.getKey(), "1234", + new VersionedProperties(1, Instant.now(), Map.of(RPC_BIND_PORT.getKey(), "20000-20005", TSERV_SCAN_MAX_OPENFILES.getKey(), "19", TABLE_BLOOM_ENABLED.getKey(), "true")); expect(propStore.get(eq(sysPropKey))).andReturn(sysProps).once(); replay(propStore); @@ -108,14 +106,12 @@ public void testFromDefault() { public void testFromFixed() { var sysPropKey = SystemPropKey.of(); - assertEquals("9997", sysConfig.get(TSERV_CLIENTPORT)); // default - assertEquals("1234", sysConfig.get(GC_PORT)); // fixed sys config + assertEquals("20000-20005", sysConfig.get(RPC_BIND_PORT)); // fixed sys config assertEquals("19", sysConfig.get(TSERV_SCAN_MAX_OPENFILES)); // fixed sys config assertEquals("true", sysConfig.get(TABLE_BLOOM_ENABLED)); // sys config assertEquals(TABLE_BLOOM_SIZE.getDefaultValue(), sysConfig.get(TABLE_BLOOM_SIZE)); // default - assertFalse(sysConfig.isPropertySet(TSERV_CLIENTPORT)); // default - assertTrue(sysConfig.isPropertySet(GC_PORT)); // fixed sys config + assertTrue(sysConfig.isPropertySet(RPC_BIND_PORT)); // fixed sys config assertTrue(sysConfig.isPropertySet(TSERV_SCAN_MAX_OPENFILES)); // fixed sys config assertTrue(sysConfig.isPropertySet(TABLE_BLOOM_ENABLED)); // sys config assertTrue(sysConfig.isPropertySet(TABLE_BLOOM_SIZE)); // default @@ -123,7 +119,7 @@ public void testFromFixed() { reset(propStore); VersionedProperties sysUpdateProps = new VersionedProperties(2, Instant.now(), - Map.of(GC_PORT.getKey(), "3456", TSERV_SCAN_MAX_OPENFILES.getKey(), "27", + Map.of(RPC_BIND_PORT.getKey(), "21000-21005", TSERV_SCAN_MAX_OPENFILES.getKey(), "27", TABLE_BLOOM_ENABLED.getKey(), "false", TABLE_BLOOM_SIZE.getKey(), "2048")); expect(propStore.get(eq(sysPropKey))).andReturn(sysUpdateProps).anyTimes(); propStore.invalidate(sysPropKey); @@ -132,14 +128,12 @@ public void testFromFixed() { sysConfig.zkChangeEvent(sysPropKey); - assertEquals("9997", sysConfig.get(TSERV_CLIENTPORT)); // default - assertEquals("1234", sysConfig.get(GC_PORT)); // fixed sys config + assertEquals("20000-20005", sysConfig.get(RPC_BIND_PORT)); // fixed sys config assertEquals("19", sysConfig.get(TSERV_SCAN_MAX_OPENFILES)); // fixed sys config assertEquals("false", sysConfig.get(TABLE_BLOOM_ENABLED)); // sys config assertEquals("2048", sysConfig.get(TABLE_BLOOM_SIZE)); // default - assertFalse(sysConfig.isPropertySet(TSERV_CLIENTPORT)); // default - assertTrue(sysConfig.isPropertySet(GC_PORT)); // fixed sys config + assertTrue(sysConfig.isPropertySet(RPC_BIND_PORT)); // fixed sys config assertTrue(sysConfig.isPropertySet(TSERV_SCAN_MAX_OPENFILES)); // fixed sys config assertTrue(sysConfig.isPropertySet(TABLE_BLOOM_ENABLED)); // sys config assertTrue(sysConfig.isPropertySet(TABLE_BLOOM_SIZE)); // default @@ -152,7 +146,7 @@ public void testRGOverride() { var defaultRGPropKey = ResourceGroupPropKey.DEFAULT; var testRGPropKey = ResourceGroupPropKey.of(ResourceGroupId.of("test")); VersionedProperties sysProps = - new VersionedProperties(1, Instant.now(), Map.of(GC_PORT.getKey(), "1234", + new VersionedProperties(1, Instant.now(), Map.of(RPC_BIND_PORT.getKey(), "20000-20005", TSERV_SCAN_MAX_OPENFILES.getKey(), "19", TABLE_BLOOM_ENABLED.getKey(), "true")); expect(propStore.get(eq(sysPropKey))).andReturn(sysProps).atLeastOnce(); expect(propStore.get(eq(defaultRGPropKey))).andReturn(new VersionedProperties()).anyTimes(); @@ -169,14 +163,12 @@ public void testRGOverride() { sysConfig.zkChangeEvent(sysPropKey); sysConfig.zkChangeEvent(defaultRGPropKey); - assertEquals("9997", sysConfig.get(TSERV_CLIENTPORT)); // default - assertEquals("1234", sysConfig.get(GC_PORT)); // fixed sys config + assertEquals("20000-20005", sysConfig.get(RPC_BIND_PORT)); // fixed sys config assertEquals("19", sysConfig.get(TSERV_SCAN_MAX_OPENFILES)); // fixed sys config assertEquals("true", sysConfig.get(TABLE_BLOOM_ENABLED)); // sys config assertEquals("1048576", sysConfig.get(TABLE_BLOOM_SIZE)); // default - assertFalse(sysConfig.isPropertySet(TSERV_CLIENTPORT)); // default - assertTrue(sysConfig.isPropertySet(GC_PORT)); // fixed sys config + assertTrue(sysConfig.isPropertySet(RPC_BIND_PORT)); // fixed sys config assertTrue(sysConfig.isPropertySet(TSERV_SCAN_MAX_OPENFILES)); // fixed sys config assertTrue(sysConfig.isPropertySet(TABLE_BLOOM_ENABLED)); // sys config assertTrue(sysConfig.isPropertySet(TABLE_BLOOM_SIZE)); // default @@ -186,14 +178,12 @@ public void testRGOverride() { ResourceGroupConfiguration rgConfig = new ResourceGroupConfiguration(context, testRGPropKey, testSysConfig); - assertEquals("9997", rgConfig.get(TSERV_CLIENTPORT)); // default - assertEquals("1234", rgConfig.get(GC_PORT)); // fixed sys config + assertEquals("20000-20005", rgConfig.get(RPC_BIND_PORT)); // fixed sys config assertEquals("19", rgConfig.get(TSERV_SCAN_MAX_OPENFILES)); // fixed sys config assertEquals("false", rgConfig.get(TABLE_BLOOM_ENABLED)); // sys config assertEquals("4096", rgConfig.get(TABLE_BLOOM_SIZE)); // default - assertFalse(rgConfig.isPropertySet(TSERV_CLIENTPORT)); // default - assertTrue(rgConfig.isPropertySet(GC_PORT)); // fixed sys config + assertTrue(rgConfig.isPropertySet(RPC_BIND_PORT)); // fixed sys config assertTrue(rgConfig.isPropertySet(TSERV_SCAN_MAX_OPENFILES)); // fixed sys config assertTrue(rgConfig.isPropertySet(TABLE_BLOOM_ENABLED)); // sys config assertTrue(rgConfig.isPropertySet(TABLE_BLOOM_SIZE)); // default diff --git a/server/base/src/test/java/org/apache/accumulo/server/conf/ZooBasedConfigurationTest.java b/server/base/src/test/java/org/apache/accumulo/server/conf/ZooBasedConfigurationTest.java index 8855da58da8..07d2d91aeca 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/conf/ZooBasedConfigurationTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/conf/ZooBasedConfigurationTest.java @@ -19,8 +19,8 @@ package org.apache.accumulo.server.conf; import static java.util.concurrent.TimeUnit.MINUTES; -import static org.apache.accumulo.core.conf.Property.GC_PORT; import static org.apache.accumulo.core.conf.Property.MANAGER_BULK_TIMEOUT; +import static org.apache.accumulo.core.conf.Property.RPC_BIND_PORT; import static org.apache.accumulo.core.conf.Property.TABLE_BLOOM_ENABLED; import static org.apache.accumulo.core.conf.Property.TABLE_BLOOM_SIZE; import static org.apache.accumulo.core.conf.Property.TABLE_DURABILITY; @@ -46,6 +46,7 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.DefaultConfiguration; +import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.conf.SiteConfiguration; import org.apache.accumulo.core.data.InstanceId; import org.apache.accumulo.core.data.NamespaceId; @@ -167,7 +168,7 @@ public void getPropertiesTest() { // TABLE_SPLIT_THRESHOLD // read a fixed property from the system config - assertEquals("9998", zbc.get(GC_PORT)); + assertEquals(Property.RPC_BIND_PORT.getDefaultValue(), zbc.get(RPC_BIND_PORT)); // read a property from the sysconfig assertEquals(MINUTES.toMillis(5), zbc.getTimeInMillis(MANAGER_BULK_TIMEOUT)); diff --git a/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/ZooPropLoaderTest.java b/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/ZooPropLoaderTest.java index d6f8085ff70..d89aca919e6 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/ZooPropLoaderTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/ZooPropLoaderTest.java @@ -18,9 +18,7 @@ */ package org.apache.accumulo.server.conf.store.impl; -import static org.apache.accumulo.core.conf.Property.GC_PORT; -import static org.apache.accumulo.core.conf.Property.MANAGER_CLIENTPORT; -import static org.apache.accumulo.core.conf.Property.TSERV_CLIENTPORT; +import static org.apache.accumulo.core.conf.Property.RPC_BIND_PORT; import static org.apache.accumulo.core.conf.Property.TSERV_NATIVEMAP_ENABLED; import static org.apache.accumulo.core.conf.Property.TSERV_SCAN_MAX_OPENFILES; import static org.easymock.EasyMock.anyObject; @@ -391,11 +389,9 @@ public void getIfCachedNotPresentTest() { public void captureExampleTest() throws Exception { Map props = new HashMap<>(); - props.put(TSERV_CLIENTPORT.getKey(), "1234"); + props.put(RPC_BIND_PORT.getKey(), "20000-20005"); props.put(TSERV_NATIVEMAP_ENABLED.getKey(), "false"); props.put(TSERV_SCAN_MAX_OPENFILES.getKey(), "2345"); - props.put(MANAGER_CLIENTPORT.getKey(), "3456"); - props.put(GC_PORT.getKey(), "4567"); VersionedProperties vProps = new VersionedProperties(8, Instant.now(), props); Capture path = newCapture(); diff --git a/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java b/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java index c4eed579b30..1c73c765f97 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java @@ -25,16 +25,13 @@ import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.verify; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.IOException; import java.net.InetAddress; import java.net.ServerSocket; import java.net.UnknownHostException; -import java.util.Map; import org.apache.accumulo.core.clientImpl.thrift.ClientService.Iface; import org.apache.accumulo.core.clientImpl.thrift.ClientService.Processor; @@ -60,10 +57,11 @@ public class TServerUtilsTest { private ServerContext context; private ZooSession zk; private MetricsInfo metricsInfo; - private final ConfigurationCopy conf = new ConfigurationCopy(DefaultConfiguration.getInstance()); + private ConfigurationCopy conf; @BeforeEach public void createMockServerContext() { + conf = new ConfigurationCopy(DefaultConfiguration.getInstance()); context = createMock(ServerContext.class); zk = createMock(ZooSession.class); expect(context.getZooSession()).andReturn(zk).anyTimes(); @@ -94,8 +92,7 @@ public void verifyMockServerContext() { @Test public void testStartServerZeroPort() throws Exception { TServer server = null; - conf.set(Property.TSERV_CLIENTPORT, "0"); - conf.set(Property.TSERV_PORTSEARCH, "false"); + conf.set(Property.RPC_BIND_PORT, "0"); try { ServerAddress address = startServer(); assertNotNull(address); @@ -113,8 +110,7 @@ public void testStartServerZeroPort() throws Exception { public void testStartServerFreePort() throws Exception { TServer server = null; int port = getFreePort(1024); - conf.set(Property.TSERV_CLIENTPORT, Integer.toString(port)); - conf.set(Property.TSERV_PORTSEARCH, "false"); + conf.set(Property.RPC_BIND_PORT, Integer.toString(port)); try { ServerAddress address = startServer(); assertNotNull(address); @@ -130,26 +126,12 @@ public void testStartServerFreePort() throws Exception { @SuppressFBWarnings(value = "UNENCRYPTED_SERVER_SOCKET", justification = "socket for testing") @Test - public void testStartServerUsedPort() throws Exception { - int port = getFreePort(1024); - InetAddress addr = InetAddress.getByName("localhost"); - // Bind to the port - conf.set(Property.TSERV_CLIENTPORT, Integer.toString(port)); - conf.set(Property.TSERV_PORTSEARCH, "false"); - try (ServerSocket s = new ServerSocket(port, 50, addr)) { - assertNotNull(s); - assertThrows(UnknownHostException.class, this::startServer); - } - } - - @SuppressFBWarnings(value = "UNENCRYPTED_SERVER_SOCKET", justification = "socket for testing") - @Test - public void testStartServerUsedPortWithSearch() throws Exception { + public void testStartServerUsedPortFallsBack() throws Exception { TServer server = null; + InetAddress addr = InetAddress.getByName("localhost"); int[] port = findTwoFreeSequentialPorts(1024); // Bind to the port - InetAddress addr = InetAddress.getByName("localhost"); - conf.set(Property.TSERV_CLIENTPORT, Integer.toString(port[0])); + conf.set(Property.RPC_BIND_PORT, Integer.toString(port[0])); try (ServerSocket s = new ServerSocket(port[0], 50, addr)) { assertNotNull(s); ServerAddress address = startServer(); @@ -161,57 +143,24 @@ public void testStartServerUsedPortWithSearch() throws Exception { if (server != null) { server.stop(); } - } } @SuppressFBWarnings(value = "UNENCRYPTED_SERVER_SOCKET", justification = "socket for testing") @Test - public void testStartServerNonDefaultPorts() throws Exception { + public void testStartServerUsedPortWithSearch() throws Exception { TServer server = null; - - // This test finds 5 free ports in more-or-less a contiguous way and then - // uses those port numbers to Accumulo services in the below (ascending) sequence - // 0. TServer default client port (this test binds to this port to force a port search) - // 1. GC - // 2. Manager - // 3. Monitor - // 4. One free port - this is the one that we expect the TServer to finally use - int[] ports = findTwoFreeSequentialPorts(1024); - int tserverDefaultPort = ports[0]; - conf.set(Property.TSERV_CLIENTPORT, Integer.toString(tserverDefaultPort)); - int gcPort = ports[1]; - conf.set(Property.GC_PORT, Integer.toString(gcPort)); - - ports = findTwoFreeSequentialPorts(gcPort + 1); - int managerPort = ports[0]; - conf.set(Property.MANAGER_CLIENTPORT, Integer.toString(managerPort)); - int monitorPort = ports[1]; - conf.set(Property.MONITOR_PORT, Integer.toString(monitorPort)); - - ports = findTwoFreeSequentialPorts(monitorPort + 1); - int tserverFinalPort = ports[0]; - - // Ensure that the TServer client port we set above is NOT in the reserved ports - Map reservedPorts = - TServerUtils.getReservedPorts(conf, Property.TSERV_CLIENTPORT); - assertFalse(reservedPorts.containsKey(tserverDefaultPort)); - - // Ensure that all the ports we assigned (GC, Manager, Monitor) are included in the reserved - // ports as returned by TServerUtils - assertTrue(reservedPorts.containsKey(gcPort)); - assertTrue(reservedPorts.containsKey(managerPort)); - assertTrue(reservedPorts.containsKey(monitorPort)); - + int[] port = findTwoFreeSequentialPorts(1024); + // Bind to the port InetAddress addr = InetAddress.getByName("localhost"); - try (ServerSocket s = new ServerSocket(tserverDefaultPort, 50, addr)) { + conf.set(Property.RPC_BIND_PORT, port[0] + "-" + port[1]); + try (ServerSocket s = new ServerSocket(port[0], 50, addr)) { + assertNotNull(s); ServerAddress address = startServer(); assertNotNull(address); server = address.getServer(); assertNotNull(server); - - // Finally ensure that the TServer is using the last port (i.e. port search worked) - assertEquals(address.getAddress().getPort(), tserverFinalPort); + assertEquals(port[1], address.getAddress().getPort()); } finally { if (server != null) { server.stop(); @@ -225,8 +174,7 @@ public void testStartServerPortRange() throws Exception { TServer server = null; int[] port = findTwoFreeSequentialPorts(1024); String portRange = port[0] + "-" + port[1]; - conf.set(Property.TSERV_CLIENTPORT, portRange); - conf.set(Property.TSERV_PORTSEARCH, "false"); + conf.set(Property.RPC_BIND_PORT, portRange); try { ServerAddress address = startServer(); assertNotNull(address); @@ -249,8 +197,7 @@ public void testStartServerPortRangeFirstPortUsed() throws Exception { int[] port = findTwoFreeSequentialPorts(1024); String portRange = port[0] + "-" + port[1]; // Bind to the port - conf.set(Property.TSERV_CLIENTPORT, portRange); - conf.set(Property.TSERV_PORTSEARCH, "false"); + conf.set(Property.RPC_BIND_PORT, portRange); try (ServerSocket s = new ServerSocket(port[0], 50, addr)) { assertNotNull(s); ServerAddress address = startServer(); @@ -301,9 +248,9 @@ private ServerAddress startServer() throws Exception { // misconfiguration) String hostname = "localhost"; - ServerAddress sa = TServerUtils.createThriftServer(context, hostname, Property.TSERV_CLIENTPORT, - processor, "TServerUtilsTest", Property.TSERV_PORTSEARCH, Property.TSERV_MINTHREADS, - Property.TSERV_MINTHREADS_TIMEOUT, Property.TSERV_THREADCHECK); + ServerAddress sa = TServerUtils.createThriftServer(context, hostname, processor, + "TServerUtilsTest", Property.TSERV_MINTHREADS, Property.TSERV_MINTHREADS_TIMEOUT, + Property.TSERV_THREADCHECK); sa.startThriftServer("TServerUtilsTestThread"); return sa; } diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 9772d4bd9f6..a83dce42225 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -417,9 +417,8 @@ protected void startCompactorClientService() throws UnknownHostException { var processor = ThriftProcessorTypes.getCompactorTProcessor(this, clientHandler, getCompactorThriftHandlerInterface(), getContext()); updateThriftServer(() -> { - return TServerUtils.createThriftServer(getContext(), getBindAddress(), - Property.COMPACTOR_CLIENTPORT, processor, this.getClass().getSimpleName(), - Property.COMPACTOR_PORTSEARCH, Property.COMPACTOR_MINTHREADS, + return TServerUtils.createThriftServer(getContext(), getBindAddress(), processor, + this.getClass().getSimpleName(), Property.COMPACTOR_MINTHREADS, Property.COMPACTOR_MINTHREADS_TIMEOUT, Property.COMPACTOR_THREADCHECK); }, true); } diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index bc5edfa1681..ba6dba9456d 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@ -421,7 +421,7 @@ private void getZooLock(HostAndPort addr) throws KeeperException, InterruptedExc private void startStatsService() throws UnknownHostException { var processor = ThriftProcessorTypes.getGcTProcessor(this, this, getContext()); - IntStream port = getConfiguration().getPortStream(Property.GC_PORT); + IntStream port = getConfiguration().getPortStream(Property.RPC_BIND_PORT); HostAndPort[] addresses = TServerUtils.getHostAndPorts(getBindAddress(), port); long maxMessageSize = getConfiguration().getAsBytes(Property.RPC_MAX_MESSAGE_SIZE); updateThriftServer(() -> { @@ -429,7 +429,7 @@ private void startStatsService() throws UnknownHostException { processor, getContext().getInstanceID(), this.getClass().getSimpleName(), 2, ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000, maxMessageSize, getContext().getServerSslParams(), getContext().getSaslParams(), 0, - getConfiguration().getCount(Property.RPC_BACKLOG), getContext().getMetricsInfo(), false, + getConfiguration().getCount(Property.RPC_BACKLOG), getContext().getMetricsInfo(), addresses); }, true); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 3eb2d5437f9..b82b884eac6 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -923,9 +923,9 @@ public void run() { compactionCoordinator.getThriftService(), managerClientHandler, getContext()); try { updateThriftServer(() -> { - return TServerUtils.createThriftServer(context, getBindAddress(), - Property.MANAGER_CLIENTPORT, processor, "Manager", null, Property.MANAGER_MINTHREADS, - Property.MANAGER_MINTHREADS_TIMEOUT, Property.MANAGER_THREADCHECK); + return TServerUtils.createThriftServer(context, getBindAddress(), processor, "Manager", + Property.MANAGER_MINTHREADS, Property.MANAGER_MINTHREADS_TIMEOUT, + Property.MANAGER_THREADCHECK); }, false); } catch (UnknownHostException e) { throw new IllegalStateException("Unable to start server on host " + getBindAddress(), e); diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java index 070b93266d5..8ef08e1e542 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java @@ -356,7 +356,7 @@ private GCStatus fetchGcStatus() { @Override public void run() { ServerContext context = getContext(); - int[] ports = getConfiguration().getPort(Property.MONITOR_PORT); + int[] ports = getConfiguration().getPort(Property.RPC_BIND_PORT); String rootContext = getConfiguration().get(Property.MONITOR_ROOT_CONTEXT); // Needs leading slash in order to property create rest endpoint requests Preconditions.checkArgument(rootContext.startsWith("/"), diff --git a/server/monitor/src/test/java/org/apache/accumulo/monitor/EmbeddedWebServerTest.java b/server/monitor/src/test/java/org/apache/accumulo/monitor/EmbeddedWebServerTest.java index 188db48225b..ac0864d4592 100644 --- a/server/monitor/src/test/java/org/apache/accumulo/monitor/EmbeddedWebServerTest.java +++ b/server/monitor/src/test/java/org/apache/accumulo/monitor/EmbeddedWebServerTest.java @@ -78,21 +78,18 @@ public static void finishMocks() { @Test public void testContextPath() { // Test removal of trailing slash - EmbeddedWebServer ews = new EmbeddedWebServer(monitor.get(), - Integer.parseInt(Property.MONITOR_PORT.getDefaultValue())); + EmbeddedWebServer ews = new EmbeddedWebServer(monitor.get(), 0); assertEquals("/test", ews.getContextPath(), "Context path of " + ews.getContextPath() + " does not match"); // Test redirect URL configuration.get().set(Property.MONITOR_ROOT_CONTEXT, "/../test"); IllegalArgumentException exception = - assertThrows(IllegalArgumentException.class, () -> new EmbeddedWebServer(monitor.get(), - Integer.parseInt(Property.MONITOR_PORT.getDefaultValue()))); + assertThrows(IllegalArgumentException.class, () -> new EmbeddedWebServer(monitor.get(), 0)); assertEquals("Root context: \"/../test\" is not a valid URL", exception.getMessage()); // Test whitespace in URL configuration.get().set(Property.MONITOR_ROOT_CONTEXT, "/whitespace /test"); exception = - assertThrows(IllegalArgumentException.class, () -> new EmbeddedWebServer(monitor.get(), - Integer.parseInt(Property.MONITOR_PORT.getDefaultValue()))); + assertThrows(IllegalArgumentException.class, () -> new EmbeddedWebServer(monitor.get(), 0)); assertEquals("Root context: \"/whitespace /test\" is not a valid URL", exception.getMessage()); } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index 71303627227..1b32b1a396c 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@ -301,10 +301,9 @@ protected void startScanServerClientService() throws UnknownHostException { ThriftProcessorTypes.getScanServerTProcessor(this, clientHandler, this, getContext()); updateThriftServer(() -> { - return TServerUtils.createThriftServer(getContext(), getBindAddress(), - Property.SSERV_CLIENTPORT, processor, this.getClass().getSimpleName(), - Property.SSERV_PORTSEARCH, Property.SSERV_MINTHREADS, Property.SSERV_MINTHREADS_TIMEOUT, - Property.SSERV_THREADCHECK); + return TServerUtils.createThriftServer(getContext(), getBindAddress(), processor, + this.getClass().getSimpleName(), Property.SSERV_MINTHREADS, + Property.SSERV_MINTHREADS_TIMEOUT, Property.SSERV_THREADCHECK); }, true); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 43be28270d6..e9569d81b16 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -411,9 +411,9 @@ AutoCloseable acquireRecoveryMemory(TabletMetadata tabletMetadata) { private void startServer(String address, TProcessor processor) throws UnknownHostException { updateThriftServer(() -> { - return TServerUtils.createThriftServer(getContext(), address, Property.TSERV_CLIENTPORT, - processor, this.getClass().getSimpleName(), Property.TSERV_PORTSEARCH, - Property.TSERV_MINTHREADS, Property.TSERV_MINTHREADS_TIMEOUT, Property.TSERV_THREADCHECK); + return TServerUtils.createThriftServer(getContext(), address, processor, + this.getClass().getSimpleName(), Property.TSERV_MINTHREADS, + Property.TSERV_MINTHREADS_TIMEOUT, Property.TSERV_THREADCHECK); }, true); } diff --git a/test/src/main/java/org/apache/accumulo/test/ThriftServerBindsBeforeZooKeeperLockIT.java b/test/src/main/java/org/apache/accumulo/test/ThriftServerBindsBeforeZooKeeperLockIT.java index f9e007b41fb..6dffd6a2d0f 100644 --- a/test/src/main/java/org/apache/accumulo/test/ThriftServerBindsBeforeZooKeeperLockIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ThriftServerBindsBeforeZooKeeperLockIT.java @@ -243,24 +243,15 @@ public void testGarbageCollectorPorts() throws Exception { private Process startProcess(MiniAccumuloClusterImpl cluster, ServerType serverType, int port) throws IOException { - final Property property; final Class service = switch (serverType) { - case MONITOR -> { - property = Property.MONITOR_PORT; - yield Monitor.class; - } - case MANAGER -> { - property = Property.MANAGER_CLIENTPORT; - yield Manager.class; - } - case GARBAGE_COLLECTOR -> { - property = Property.GC_PORT; - yield SimpleGarbageCollector.class; - } + case MONITOR -> Monitor.class; + case MANAGER -> Manager.class; + case GARBAGE_COLLECTOR -> SimpleGarbageCollector.class; default -> throw new IllegalArgumentException("Irrelevant server type for test"); }; - return cluster._exec(service, serverType, Map.of(property.getKey(), Integer.toString(port))) + return cluster + ._exec(service, serverType, Map.of(Property.RPC_BIND_PORT.getKey(), Integer.toString(port))) .getProcess(); } } diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java index 18b6edae036..457abf588f6 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java @@ -239,7 +239,6 @@ public static void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuratio "[{'group':'" + GROUP8 + "'}]"); cfg.setProperty(Property.COMPACTION_COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL, "5s"); cfg.setProperty(Property.COMPACTOR_CANCEL_CHECK_INTERVAL, "5s"); - cfg.setProperty(Property.COMPACTOR_PORTSEARCH, "true"); cfg.setProperty(Property.COMPACTOR_MIN_JOB_WAIT_TIME, "100ms"); cfg.setProperty(Property.COMPACTOR_MAX_JOB_WAIT_TIME, "1s"); cfg.setProperty(Property.GENERAL_THREADPOOL_SIZE, "10"); diff --git a/test/src/main/java/org/apache/accumulo/test/conf/store/PropCacheCaffeineImplZkIT.java b/test/src/main/java/org/apache/accumulo/test/conf/store/PropCacheCaffeineImplZkIT.java index ddb53c52f70..73fea96ac4f 100644 --- a/test/src/main/java/org/apache/accumulo/test/conf/store/PropCacheCaffeineImplZkIT.java +++ b/test/src/main/java/org/apache/accumulo/test/conf/store/PropCacheCaffeineImplZkIT.java @@ -18,9 +18,7 @@ */ package org.apache.accumulo.test.conf.store; -import static org.apache.accumulo.core.conf.Property.GC_PORT; -import static org.apache.accumulo.core.conf.Property.MANAGER_CLIENTPORT; -import static org.apache.accumulo.core.conf.Property.TSERV_CLIENTPORT; +import static org.apache.accumulo.core.conf.Property.RPC_BIND_PORT; import static org.apache.accumulo.core.conf.Property.TSERV_NATIVEMAP_ENABLED; import static org.apache.accumulo.core.conf.Property.TSERV_SCAN_MAX_OPENFILES; import static org.apache.accumulo.harness.AccumuloITBase.ZOOKEEPER_TESTING_SERVER; @@ -119,11 +117,9 @@ public void cleanupZnodes() throws Exception { @Test public void init() throws Exception { Map props = new HashMap<>(); - props.put(TSERV_CLIENTPORT.getKey(), "1234"); + props.put(RPC_BIND_PORT.getKey(), "20000-20005"); props.put(TSERV_NATIVEMAP_ENABLED.getKey(), "false"); props.put(TSERV_SCAN_MAX_OPENFILES.getKey(), "2345"); - props.put(MANAGER_CLIENTPORT.getKey(), "3456"); - props.put(GC_PORT.getKey(), "4567"); VersionedProperties vProps = new VersionedProperties(props); // directly create prop node - simulate existing properties. diff --git a/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java b/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java index e4d93597b83..834921d1771 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java @@ -101,7 +101,7 @@ public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) cfg.setProperty(Property.INSTANCE_SECRET, OUR_SECRET); cfg.setProperty(Property.GC_CYCLE_START, "1"); cfg.setProperty(Property.GC_CYCLE_DELAY, "1"); - cfg.setProperty(Property.GC_PORT, "0"); + cfg.setProperty(Property.RPC_BIND_PORT, "0"); cfg.setProperty(Property.TSERV_MAXMEM, "5K"); // reduce the batch size significantly in order to cause the integration tests to have // to process many batches of deletion candidates. diff --git a/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorTrashDefaultIT.java b/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorTrashDefaultIT.java index 34d503677de..4d1ac05e02a 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorTrashDefaultIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorTrashDefaultIT.java @@ -57,7 +57,7 @@ public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) cfg.setProperty(Property.GC_CYCLE_START, "1"); cfg.setProperty(Property.GC_CYCLE_DELAY, "1"); - cfg.setProperty(Property.GC_PORT, "0"); + cfg.setProperty(Property.RPC_BIND_PORT, "0"); cfg.setProperty(Property.TSERV_MAXMEM, "5K"); } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorTrashEnabledIT.java b/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorTrashEnabledIT.java index 6476bef75bd..58557f0067f 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorTrashEnabledIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorTrashEnabledIT.java @@ -57,7 +57,7 @@ public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) cfg.setProperty(Property.GC_CYCLE_START, "1"); cfg.setProperty(Property.GC_CYCLE_DELAY, "1"); - cfg.setProperty(Property.GC_PORT, "0"); + cfg.setProperty(Property.RPC_BIND_PORT, "0"); cfg.setProperty(Property.TSERV_MAXMEM, "5K"); } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorTrashEnabledWithCustomPolicyIT.java b/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorTrashEnabledWithCustomPolicyIT.java index 500ad7da424..708693f7ec2 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorTrashEnabledWithCustomPolicyIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorTrashEnabledWithCustomPolicyIT.java @@ -75,7 +75,7 @@ public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) cfg.setProperty(Property.GC_CYCLE_START, "1"); cfg.setProperty(Property.GC_CYCLE_DELAY, "1"); - cfg.setProperty(Property.GC_PORT, "0"); + cfg.setProperty(Property.RPC_BIND_PORT, "0"); cfg.setProperty(Property.TSERV_MAXMEM, "5K"); } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/GracefulShutdownIT.java b/test/src/main/java/org/apache/accumulo/test/functional/GracefulShutdownIT.java index c1bad36ae49..4c02457200e 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/GracefulShutdownIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/GracefulShutdownIT.java @@ -105,7 +105,6 @@ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreS cfg.getClusterServerConfiguration().setNumDefaultTabletServers(2); cfg.setProperty(Property.COMPACTION_COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL, "5s"); cfg.setProperty(Property.COMPACTOR_CANCEL_CHECK_INTERVAL, "5s"); - cfg.setProperty(Property.COMPACTOR_PORTSEARCH, "true"); cfg.setProperty(Property.COMPACTION_SERVICE_PREFIX.getKey() + GROUP_NAME + ".planner", RatioBasedCompactionPlanner.class.getName()); cfg.setProperty( diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java index 3437310e348..683cfbecb1a 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java @@ -127,7 +127,7 @@ public static void main(String[] args) throws Exception { ServerAddress serverPort = TServerUtils.createThriftServer(context.getConfiguration(), ThriftServerType.CUSTOM_HS_HA, muxProcessor, context.getInstanceID(), "ZombieTServer", 2, ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000, 10 * 1024 * 1024, null, null, -1, - context.getConfiguration().getCount(Property.RPC_BACKLOG), context.getMetricsInfo(), false, + context.getConfiguration().getCount(Property.RPC_BACKLOG), context.getMetricsInfo(), HostAndPort.fromParts(ConfigOpts.BIND_ALL_ADDRESSES, port)); serverPort.startThriftServer("walking dead"); diff --git a/test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java b/test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java index 033fab023cb..3124469fc18 100644 --- a/test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java @@ -268,9 +268,8 @@ private void suspensionTestBody(TServerKiller serverStopper, AfterSuspendAction HostAndPort restartedServer = deadTabletsByServer.keySet().iterator().next(); log.info("Restarting " + restartedServer); ((MiniAccumuloClusterImpl) getCluster())._exec(TabletServer.class, ServerType.TABLET_SERVER, - Map.of(Property.TSERV_CLIENTPORT.getKey(), "" + restartedServer.getPort(), - Property.TSERV_PORTSEARCH.getKey(), "false"), - "-o", Property.TSERV_GROUP_NAME.getKey() + "=" + TEST_GROUP_NAME); + Map.of(Property.RPC_BIND_PORT.getKey(), "" + restartedServer.getPort()), "-o", + Property.TSERV_GROUP_NAME.getKey() + "=" + TEST_GROUP_NAME); // Eventually, the suspended tablets should be reassigned to the newly alive tserver. log.info("Awaiting tablet unsuspension for tablets belonging to " + restartedServer); diff --git a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java index 785eeacfcae..bb0669a74c3 100644 --- a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java +++ b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java @@ -282,7 +282,7 @@ static class Opts extends Help { @Parameter(names = "--table", description = "table to adopt", required = true) String tableName = null; @Parameter(names = "--port", description = "port number to use") - int port = DefaultConfiguration.getInstance().getPort(Property.TSERV_CLIENTPORT)[0]; + int port = DefaultConfiguration.getInstance().getPort(Property.RPC_BIND_PORT)[0]; } public static void main(String[] args) throws Exception { @@ -318,7 +318,7 @@ public static void main(String[] args) throws Exception { ServerAddress sa = TServerUtils.createThriftServer(context.getConfiguration(), ThriftServerType.CUSTOM_HS_HA, muxProcessor, context.getInstanceID(), "NullTServer", 2, ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000, 10 * 1024 * 1024, null, null, -1, - context.getConfiguration().getCount(Property.RPC_BACKLOG), context.getMetricsInfo(), false, + context.getConfiguration().getCount(Property.RPC_BACKLOG), context.getMetricsInfo(), HostAndPort.fromParts(ConfigOpts.BIND_ALL_ADDRESSES, opts.port)); sa.startThriftServer("null tserver"); From aa5b11af54e0191eb199d3ce2c6d0459fe7ca245 Mon Sep 17 00:00:00 2001 From: Dom Garguilo Date: Mon, 24 Nov 2025 14:54:43 -0500 Subject: [PATCH 2/7] Add ability to set accumulo props in accumulo-env.sh --- assemble/bin/accumulo | 12 +++++++++++- assemble/conf/accumulo-env.sh | 10 ++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/assemble/bin/accumulo b/assemble/bin/accumulo index aa7e4ca92ad..0eb0783a66f 100755 --- a/assemble/bin/accumulo +++ b/assemble/bin/accumulo @@ -86,7 +86,17 @@ function main() { JAVA=("${ACCUMULO_JAVA_PREFIX[@]}" "$JAVA") fi - exec "${JAVA[@]}" "${JAVA_OPTS[@]}" org.apache.accumulo.start.Main "$@" + # Allow users to supply extra Accumulo arguments via ACCUMULO_MAIN_ARGS + if ! declare -p ACCUMULO_MAIN_ARGS &>/dev/null; then + ACCUMULO_MAIN_ARGS=() + else + declare_output=$(declare -p ACCUMULO_MAIN_ARGS 2>/dev/null) + if [[ $declare_output != declare\ -a* ]]; then + read -r -a ACCUMULO_MAIN_ARGS <<<"${ACCUMULO_MAIN_ARGS}" + fi + fi + + exec "${JAVA[@]}" "${JAVA_OPTS[@]}" org.apache.accumulo.start.Main "$@" "${ACCUMULO_MAIN_ARGS[@]}" } main "$@" diff --git a/assemble/conf/accumulo-env.sh b/assemble/conf/accumulo-env.sh index 52f99ab7642..ba93e35ffc0 100644 --- a/assemble/conf/accumulo-env.sh +++ b/assemble/conf/accumulo-env.sh @@ -159,3 +159,13 @@ esac ## environment, that will override what is set here, rather than some mangled ## merged result. You can set the variable any way you like. #declare -p 'ACCUMULO_JAVA_PREFIX' &>/dev/null || ACCUMULO_JAVA_PREFIX='' + +## ACCUMULO_MAIN_ARGS can be used to pass extra arguments directly to +## org.apache.accumulo.start.Main (after the JVM options). Declare as an array +## to avoid issues with spaces in values. +#case "$cmd" in +# monitor) ACCUMULO_MAIN_ARGS=(-o "rpc.bind.port=9995") ;; +# tserver) ACCUMULO_MAIN_ARGS=(-o "rpc.bind.port=20000-20049") ;; +# compactor) ACCUMULO_MAIN_ARGS=(-o "rpc.bind.port=20050-20099") ;; +# sserver) ACCUMULO_MAIN_ARGS=(-o "rpc.bind.port=20100-20149") ;; +#esac From 120dce0ba0d09cbfb4c8b5db5b4f3e66c81130df Mon Sep 17 00:00:00 2001 From: Dom Garguilo Date: Mon, 24 Nov 2025 15:06:08 -0500 Subject: [PATCH 3/7] properly read array --- assemble/bin/accumulo | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/assemble/bin/accumulo b/assemble/bin/accumulo index 0eb0783a66f..48ba0736ea6 100755 --- a/assemble/bin/accumulo +++ b/assemble/bin/accumulo @@ -92,7 +92,8 @@ function main() { else declare_output=$(declare -p ACCUMULO_MAIN_ARGS 2>/dev/null) if [[ $declare_output != declare\ -a* ]]; then - read -r -a ACCUMULO_MAIN_ARGS <<<"${ACCUMULO_MAIN_ARGS}" + ACCUMULO_MAIN_ARGS_RAW=${ACCUMULO_MAIN_ARGS[*]} + read -r -a ACCUMULO_MAIN_ARGS <<<"${ACCUMULO_MAIN_ARGS_RAW}" fi fi From 16da39a74b2ceb59d7886a3f79fdf65994112f7f Mon Sep 17 00:00:00 2001 From: Dom Garguilo Date: Tue, 25 Nov 2025 11:24:49 -0500 Subject: [PATCH 4/7] rollback changes to scripts --- assemble/bin/accumulo | 13 +------------ assemble/conf/accumulo-env.sh | 10 ---------- 2 files changed, 1 insertion(+), 22 deletions(-) diff --git a/assemble/bin/accumulo b/assemble/bin/accumulo index 48ba0736ea6..aa7e4ca92ad 100755 --- a/assemble/bin/accumulo +++ b/assemble/bin/accumulo @@ -86,18 +86,7 @@ function main() { JAVA=("${ACCUMULO_JAVA_PREFIX[@]}" "$JAVA") fi - # Allow users to supply extra Accumulo arguments via ACCUMULO_MAIN_ARGS - if ! declare -p ACCUMULO_MAIN_ARGS &>/dev/null; then - ACCUMULO_MAIN_ARGS=() - else - declare_output=$(declare -p ACCUMULO_MAIN_ARGS 2>/dev/null) - if [[ $declare_output != declare\ -a* ]]; then - ACCUMULO_MAIN_ARGS_RAW=${ACCUMULO_MAIN_ARGS[*]} - read -r -a ACCUMULO_MAIN_ARGS <<<"${ACCUMULO_MAIN_ARGS_RAW}" - fi - fi - - exec "${JAVA[@]}" "${JAVA_OPTS[@]}" org.apache.accumulo.start.Main "$@" "${ACCUMULO_MAIN_ARGS[@]}" + exec "${JAVA[@]}" "${JAVA_OPTS[@]}" org.apache.accumulo.start.Main "$@" } main "$@" diff --git a/assemble/conf/accumulo-env.sh b/assemble/conf/accumulo-env.sh index ba93e35ffc0..52f99ab7642 100644 --- a/assemble/conf/accumulo-env.sh +++ b/assemble/conf/accumulo-env.sh @@ -159,13 +159,3 @@ esac ## environment, that will override what is set here, rather than some mangled ## merged result. You can set the variable any way you like. #declare -p 'ACCUMULO_JAVA_PREFIX' &>/dev/null || ACCUMULO_JAVA_PREFIX='' - -## ACCUMULO_MAIN_ARGS can be used to pass extra arguments directly to -## org.apache.accumulo.start.Main (after the JVM options). Declare as an array -## to avoid issues with spaces in values. -#case "$cmd" in -# monitor) ACCUMULO_MAIN_ARGS=(-o "rpc.bind.port=9995") ;; -# tserver) ACCUMULO_MAIN_ARGS=(-o "rpc.bind.port=20000-20049") ;; -# compactor) ACCUMULO_MAIN_ARGS=(-o "rpc.bind.port=20050-20099") ;; -# sserver) ACCUMULO_MAIN_ARGS=(-o "rpc.bind.port=20100-20149") ;; -#esac From 7375e2d112d53257df3d9bc9f8b201ee2ff79df7 Mon Sep 17 00:00:00 2001 From: Dom Garguilo Date: Tue, 25 Nov 2025 13:58:11 -0500 Subject: [PATCH 5/7] remove .trim() use isValidFormat() in getPortStream() --- .../accumulo/core/conf/AccumuloConfiguration.java | 12 +++++++----- .../org/apache/accumulo/core/conf/PropertyType.java | 13 +++++++------ 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java b/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java index ad4bb58d7e0..6ea69e2de38 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java @@ -346,10 +346,12 @@ public int[] getPort(Property property) { public IntStream getPortStream(Property property) { checkType(property, PropertyType.PORT); - String portString = get(property); - Preconditions.checkArgument(portString != null, "Port cannot be null."); - portString = portString.trim(); - Preconditions.checkArgument(!portString.isEmpty(), "Port cannot be empty."); + String portString = Objects.requireNonNull(get(property), "Port cannot be null."); + if (!PropertyType.PORT.isValidFormat(portString)) { + log.error("Invalid port syntax for {}: '{}'. Using default value instead: {}", + property.getKey(), portString, property.getDefaultValue()); + return PortRange.parse(property.getDefaultValue()); + } if (portString.contains("-")) { // value is a range, parse it as such @@ -364,7 +366,7 @@ public IntStream getPortStream(Property property) { } log.error("Invalid port number: {}. Using default instead: {}", port, property.getDefaultValue()); - return PortRange.parse(property.getDefaultValue().trim()); + return PortRange.parse(property.getDefaultValue()); } catch (NumberFormatException e) { throw new IllegalArgumentException( "Invalid port syntax. Must be a single positive integer or a range M-N.", e); diff --git a/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java b/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java index 82be56a9842..91ca14091bf 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java @@ -427,20 +427,22 @@ public boolean test(final String input) { if (input == null) { return true; } - final String trimmed = input.trim(); - if (trimmed.isEmpty()) { + if (input.isEmpty()) { return false; } - if ("0".equals(trimmed)) { + if (input.chars().anyMatch(Character::isWhitespace)) { + return false; + } + if ("0".equals(input)) { return true; } try { - int port = Integer.parseInt(trimmed); + int port = Integer.parseInt(input); return PortRange.VALID_RANGE.contains(port); } catch (NumberFormatException e) { try { - PortRange.parse(trimmed); + PortRange.parse(input); return true; } catch (IllegalArgumentException ex) { return false; @@ -457,7 +459,6 @@ private PortRange() {} public static IntStream parse(String value) { Objects.requireNonNull(value, "Port range cannot be null."); - value = value.trim(); Preconditions.checkArgument(!value.isEmpty(), "Port range cannot be empty."); Preconditions.checkArgument(value.contains("-"), "Invalid port range, expected format like M-N."); From d6d85530ffae62d969127195612e1a1d7b6890a7 Mon Sep 17 00:00:00 2001 From: Dom Garguilo Date: Tue, 25 Nov 2025 14:56:35 -0500 Subject: [PATCH 6/7] add to property docs, improve tests --- .../apache/accumulo/core/conf/Property.java | 6 +-- .../core/conf/AccumuloConfigurationTest.java | 23 ++++++----- .../accumulo/server/rpc/TServerUtils.java | 14 +------ .../accumulo/server/rpc/TServerUtilsTest.java | 38 ++++++++++++------- 4 files changed, 42 insertions(+), 39 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 6ffb18f92a9..ff09354ce09 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -98,9 +98,9 @@ public enum Property { RPC_BIND_PORT("rpc.bind.port", "19000-19999", PropertyType.PORT, """ The port or range of ports servers attempt to bind for RPC traffic. Provide a single \ - value to target an exact port (will attempt higher ports if given port is already in use, \ - up to 1000 additional checks), or a range using formats like '19000-19999' to allow searching for \ - the first available port within that range. + value to target an exact port, or a range using formats like '19000-19999' to allow searching for \ + the first available port within that range. A value of '0' is also allowed and will have the \ + OS assign an ephemeral port at bind time. """, "4.0.0"), RPC_MAX_MESSAGE_SIZE("rpc.message.size.max", Integer.toString(Integer.MAX_VALUE), diff --git a/core/src/test/java/org/apache/accumulo/core/conf/AccumuloConfigurationTest.java b/core/src/test/java/org/apache/accumulo/core/conf/AccumuloConfigurationTest.java index b28a7036a1a..e5a2f0c8ab1 100644 --- a/core/src/test/java/org/apache/accumulo/core/conf/AccumuloConfigurationTest.java +++ b/core/src/test/java/org/apache/accumulo/core/conf/AccumuloConfigurationTest.java @@ -95,24 +95,29 @@ public void testGetPortRange() { assertArrayEquals(new int[] {36000, 36001, 36002}, ports); } + /** + * Test that when an invalid range is supplied, the default value is used as fallback + */ @Test public void testGetPortRangeInvalid() { ConfigurationCopy cc = new ConfigurationCopy(DefaultConfiguration.getInstance()); - cc.set(Property.RPC_BIND_PORT, "1020-1026"); - var msg = - assertThrows(IllegalArgumentException.class, () -> cc.getPort(Property.RPC_BIND_PORT)); - assertTrue(msg.getMessage().startsWith("Port range bounds must be 1024 to 65535")); - - cc.set(Property.RPC_BIND_PORT, "65533-65538"); - msg = assertThrows(IllegalArgumentException.class, () -> cc.getPort(Property.RPC_BIND_PORT)); - assertTrue(msg.getMessage().startsWith("Port range bounds must be 1024 to 65535")); + cc.set(Property.RPC_BIND_PORT, "1020-1026"); // some ports below min + int[] defaultPorts = DefaultConfiguration.getInstance().getPort(Property.RPC_BIND_PORT); + assertArrayEquals(defaultPorts, cc.getPort(Property.RPC_BIND_PORT)); + + cc.set(Property.RPC_BIND_PORT, "65533-65538"); // some ports over max + assertArrayEquals(defaultPorts, cc.getPort(Property.RPC_BIND_PORT)); } + /** + * When an invalid port value is provided, we should fallback to default + */ @Test public void testGetPortInvalidSyntax() { ConfigurationCopy cc = new ConfigurationCopy(DefaultConfiguration.getInstance()); cc.set(Property.RPC_BIND_PORT, "bad-port"); - assertThrows(IllegalArgumentException.class, () -> cc.getPort(Property.RPC_BIND_PORT)); + int[] defaultPorts = DefaultConfiguration.getInstance().getPort(Property.RPC_BIND_PORT); + assertArrayEquals(defaultPorts, cc.getPort(Property.RPC_BIND_PORT)); } private static class TestConfiguration extends AccumuloConfiguration { diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java index f855cbd94d7..f5dc241a45c 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java @@ -39,7 +39,6 @@ import org.apache.accumulo.core.cli.ConfigOpts; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.conf.PropertyType.PortRange; import org.apache.accumulo.core.data.InstanceId; import org.apache.accumulo.core.metrics.MetricsInfo; import org.apache.accumulo.core.rpc.SslConnectionParams; @@ -74,7 +73,6 @@ */ public class TServerUtils { private static final Logger log = LoggerFactory.getLogger(TServerUtils.class); - private static final int SINGLE_PORT_FALLBACK_RANGE = 1000; /** * Static instance, passed to {@link ClientInfoProcessorFactory}, which will contain the client @@ -94,17 +92,7 @@ public static HostAndPort[] getHostAndPorts(String hostname, IntStream ports) { return new HostAndPort[0]; } - IntStream candidates; - if (configuredPorts.length == 1 && configuredPorts[0] > 0) { - int basePort = configuredPorts[0]; - int maxPort = PortRange.VALID_RANGE.getMaximum(); - int searchUpperBound = Math.min(maxPort, basePort + SINGLE_PORT_FALLBACK_RANGE); - IntStream fallback = basePort < searchUpperBound - ? IntStream.rangeClosed(basePort + 1, searchUpperBound) : IntStream.empty(); - candidates = IntStream.concat(IntStream.of(basePort), fallback); - } else { - candidates = Arrays.stream(configuredPorts); - } + IntStream candidates = Arrays.stream(configuredPorts); return candidates.mapToObj(port -> HostAndPort.fromParts(hostname, port)) .toArray(HostAndPort[]::new); diff --git a/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java b/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java index 1c73c765f97..49d3df4f90f 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java @@ -26,6 +26,7 @@ import static org.easymock.EasyMock.verify; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.IOException; @@ -126,23 +127,14 @@ public void testStartServerFreePort() throws Exception { @SuppressFBWarnings(value = "UNENCRYPTED_SERVER_SOCKET", justification = "socket for testing") @Test - public void testStartServerUsedPortFallsBack() throws Exception { - TServer server = null; + public void testStartServerUsedPortFails() throws Exception { InetAddress addr = InetAddress.getByName("localhost"); - int[] port = findTwoFreeSequentialPorts(1024); + int port = getFreePort(1024); // Bind to the port - conf.set(Property.RPC_BIND_PORT, Integer.toString(port[0])); - try (ServerSocket s = new ServerSocket(port[0], 50, addr)) { + conf.set(Property.RPC_BIND_PORT, Integer.toString(port)); + try (ServerSocket s = new ServerSocket(port, 50, addr)) { assertNotNull(s); - ServerAddress address = startServer(); - assertNotNull(address); - server = address.getServer(); - assertNotNull(server); - assertEquals(port[1], address.getAddress().getPort()); - } finally { - if (server != null) { - server.stop(); - } + assertThrows(UnknownHostException.class, this::startServer); } } @@ -169,6 +161,24 @@ public void testStartServerUsedPortWithSearch() throws Exception { } } + /** + * Make sure an exception is thrown if the given port range has no available ports in it + */ + @SuppressFBWarnings(value = "UNENCRYPTED_SERVER_SOCKET", justification = "socket for testing") + @Test + public void testStartServerPortRangeAllUsedFails() throws Exception { + InetAddress addr = InetAddress.getByName("localhost"); + int[] port = findTwoFreeSequentialPorts(1024); + String portRange = port[0] + "-" + port[1]; + conf.set(Property.RPC_BIND_PORT, portRange); + try (ServerSocket s1 = new ServerSocket(port[0], 50, addr); + ServerSocket s2 = new ServerSocket(port[1], 50, addr)) { + assertNotNull(s1); + assertNotNull(s2); + assertThrows(UnknownHostException.class, this::startServer); + } + } + @Test public void testStartServerPortRange() throws Exception { TServer server = null; From 81d398f9db6f508dce6011a943259de7cbf602b9 Mon Sep 17 00:00:00 2001 From: Dom Garguilo Date: Mon, 1 Dec 2025 13:39:15 -0500 Subject: [PATCH 7/7] revert to simpler code in TServerUtils --- .../org/apache/accumulo/server/rpc/TServerUtils.java | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java index f5dc241a45c..0759111d035 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java @@ -87,14 +87,7 @@ public class TServerUtils { * @return array of HostAndPort objects */ public static HostAndPort[] getHostAndPorts(String hostname, IntStream ports) { - int[] configuredPorts = ports.toArray(); - if (configuredPorts.length == 0) { - return new HostAndPort[0]; - } - - IntStream candidates = Arrays.stream(configuredPorts); - - return candidates.mapToObj(port -> HostAndPort.fromParts(hostname, port)) + return ports.mapToObj(port -> HostAndPort.fromParts(hostname, port)) .toArray(HostAndPort[]::new); }