Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -346,22 +346,30 @@ public int[] getPort(Property property) {
public IntStream getPortStream(Property property) {
checkType(property, PropertyType.PORT);

String portString = get(property);
try {
String portString = Objects.requireNonNull(get(property), "Port cannot be null.");
if (!PropertyType.PORT.isValidFormat(portString)) {
log.error("Invalid port syntax for {}: '{}'. Using default value instead: {}",
property.getKey(), portString, property.getDefaultValue());
return PortRange.parse(property.getDefaultValue());
}

if (portString.contains("-")) {
// value is a range, parse it as such
return PortRange.parse(portString);
} catch (IllegalArgumentException e) {
try {
int port = Integer.parseInt(portString);
if (port == 0 || PortRange.VALID_RANGE.contains(port)) {
return IntStream.of(port);
} else {
log.error("Invalid port number {}; Using default {}", port, property.getDefaultValue());
return IntStream.of(Integer.parseInt(property.getDefaultValue()));
}
} catch (NumberFormatException e1) {
throw new IllegalArgumentException("Invalid port syntax. Must be a single positive "
+ "integers or a range (M-N) of positive integers");
}

// getting to this point means the value is a single port (not a range)
try {
int port = Integer.parseInt(portString);
if (port == 0 || PortRange.VALID_RANGE.contains(port)) {
return IntStream.of(port);
}
log.error("Invalid port number: {}. Using default instead: {}", port,
property.getDefaultValue());
return PortRange.parse(property.getDefaultValue());
} catch (NumberFormatException e) {
throw new IllegalArgumentException(
"Invalid port syntax. Must be a single positive integer or a range M-N.", e);
}
}

Expand Down
66 changes: 26 additions & 40 deletions core/src/main/java/org/apache/accumulo/core/conf/Property.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,14 @@ public enum Property {
The local IP address to which this server should bind for sending \
and receiving network traffic. If not set then the process binds to all addresses.
""", "2.1.4"),
RPC_BIND_PORT("rpc.bind.port", "19000-19999", PropertyType.PORT,
"""
The port or range of ports servers attempt to bind for RPC traffic. Provide a single \
value to target an exact port, or a range using formats like '19000-19999' to allow searching for \
the first available port within that range. A value of '0' is also allowed and will have the \
OS assign an ephemeral port at bind time.
""",
"4.0.0"),
RPC_MAX_MESSAGE_SIZE("rpc.message.size.max", Integer.toString(Integer.MAX_VALUE),
PropertyType.BYTES, "The maximum size of a message that can be received by a server.",
"2.1.3"),
Expand Down Expand Up @@ -399,8 +407,6 @@ was changed and it now can accept multiple class names. The metrics spi was intr
// properties that are specific to manager server behavior
MANAGER_PREFIX("manager.", null, PropertyType.PREFIX,
"Properties in this category affect the behavior of the manager server.", "2.1.0"),
MANAGER_CLIENTPORT("manager.port.client", "9999", PropertyType.PORT,
"The port used for handling client connections on the manager.", "1.3.5"),
Comment on lines -402 to -403
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's probably a good idea to keep these around and deprecated. If the new property is set, then we should use the new behavior and ignore anything in the old properties. Otherwise, we should read the old properties and use them.

Ideally, we'd introduce this in 3.1 and deprecate them there, so we don't need to have these in 4.0 at all.

Because this is a much simpler config, that hopefully users just don't have to worry about much because the defaults are reasonable, I don't think it would be completely terrible if we just changed this without a deprecation, but we would definitely need to call that out in the release notes.

MANAGER_TABLET_BALANCER("manager.tablet.balancer",
"org.apache.accumulo.core.spi.balancer.TableLoadBalancer", PropertyType.CLASSNAME,
"The balancer class that accumulo will use to make tablet assignment and "
Expand Down Expand Up @@ -589,11 +595,6 @@ Each key is the name of the pool (can be assigned any string). Each value is a J
expiration time, will trigger a background refresh for future hits. \
Value must be less than 100%. Set to 0 will disable refresh.
""", "2.1.3"),
SSERV_PORTSEARCH("sserver.port.search", "true", PropertyType.BOOLEAN,
"if the sserver.port.client ports are in use, search higher ports until one is available.",
"2.1.0"),
SSERV_CLIENTPORT("sserver.port.client", "9996", PropertyType.PORT,
"The port used for handling client connections on the tablet servers.", "2.1.0"),
SSERV_MINTHREADS("sserver.server.threads.minimum", "20", PropertyType.COUNT,
"The minimum number of threads to use to handle incoming requests.", "2.1.0"),
SSERV_MINTHREADS_TIMEOUT("sserver.server.threads.timeout", "0s", PropertyType.TIMEDURATION,
Expand Down Expand Up @@ -641,11 +642,6 @@ Each key is the name of the pool (can be assigned any string). Each value is a J
"Specifies the size of the cache for RFile index blocks.", "1.3.5"),
TSERV_SUMMARYCACHE_SIZE("tserver.cache.summary.size", "10%", PropertyType.MEMORY,
"Specifies the size of the cache for summary data on each tablet server.", "2.0.0"),
TSERV_PORTSEARCH("tserver.port.search", "true", PropertyType.BOOLEAN,
"if the tserver.port.client ports are in use, search higher ports until one is available.",
"1.3.5"),
TSERV_CLIENTPORT("tserver.port.client", "9997", PropertyType.PORT,
"The port used for handling client connections on the tablet servers.", "1.3.5"),
TSERV_TOTAL_MUTATION_QUEUE_MAX("tserver.total.mutation.queue.max", "5%", PropertyType.MEMORY,
"The amount of memory used to store write-ahead-log mutations before flushing them.",
"1.7.0"),
Expand Down Expand Up @@ -850,8 +846,6 @@ Each key is the name of the pool (can be assigned any string). Each value is a J
"Time between garbage collection cycles. In each cycle, old RFiles or write-ahead logs "
+ "no longer in use are removed from the filesystem.",
"1.3.5"),
GC_PORT("gc.port.client", "9998", PropertyType.PORT,
"The listening port for the garbage collector's monitor service.", "1.3.5"),
GC_DELETE_WAL_THREADS("gc.threads.delete.wal", "4", PropertyType.COUNT,
"The number of threads used to delete write-ahead logs and recovery files.", "2.1.4"),
GC_DELETE_THREADS("gc.threads.delete", "16", PropertyType.COUNT,
Expand All @@ -868,8 +862,6 @@ Each key is the name of the pool (can be assigned any string). Each value is a J
// properties that are specific to the monitor server behavior
MONITOR_PREFIX("monitor.", null, PropertyType.PREFIX,
"Properties in this category affect the behavior of the monitor web server.", "1.3.5"),
MONITOR_PORT("monitor.port.client", "9995", PropertyType.PORT,
"The listening port for the monitor's http service.", "1.3.5"),
MONITOR_SSL_KEYSTORE("monitor.ssl.keyStore", "", PropertyType.PATH,
"The keystore for enabling monitor SSL.", "1.5.0"),
@Sensitive
Expand Down Expand Up @@ -1302,11 +1294,6 @@ start with the category prefix, followed by a scope (minc, majc, scan, \
+ " should be cancelled. This checks for situations like was the tablet deleted (split "
+ " and merge do this), was the table deleted, was a user compaction canceled, etc.",
"2.1.4"),
COMPACTOR_PORTSEARCH("compactor.port.search", "true", PropertyType.BOOLEAN,
"If the compactor.port.client ports are in use, search higher ports until one is available.",
"2.1.0"),
COMPACTOR_CLIENTPORT("compactor.port.client", "9133", PropertyType.PORT,
"The port used for handling client connections on the compactor servers.", "2.1.0"),
COMPACTOR_MIN_JOB_WAIT_TIME("compactor.wait.time.job.min", "1s", PropertyType.TIMEDURATION,
"The minimum amount of time to wait between checks for the next compaction job, backing off"
+ "exponentially until COMPACTOR_MAX_JOB_WAIT_TIME is reached.",
Expand Down Expand Up @@ -1643,6 +1630,7 @@ public static boolean isValidTablePropertyKey(String key) {
// RPC options
RPC_BACKLOG, RPC_SSL_KEYSTORE_TYPE, RPC_SSL_TRUSTSTORE_TYPE, RPC_USE_JSSE,
RPC_SSL_ENABLED_PROTOCOLS, RPC_SSL_CLIENT_PROTOCOL, RPC_SASL_QOP, RPC_MAX_MESSAGE_SIZE,
RPC_BIND_PORT,

// INSTANCE options
INSTANCE_ZK_HOST, INSTANCE_ZK_TIMEOUT, INSTANCE_SECRET, INSTANCE_SECURITY_AUTHENTICATOR,
Expand All @@ -1661,40 +1649,38 @@ public static boolean isValidTablePropertyKey(String key) {
// MANAGER options
MANAGER_THREADCHECK, MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL, MANAGER_METADATA_SUSPENDABLE,
MANAGER_STARTUP_TSERVER_AVAIL_MIN_COUNT, MANAGER_STARTUP_TSERVER_AVAIL_MAX_WAIT,
MANAGER_CLIENTPORT, MANAGER_MINTHREADS, MANAGER_MINTHREADS_TIMEOUT,
MANAGER_RECOVERY_WAL_EXISTENCE_CACHE_TIME, MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE,
MANAGER_TABLET_REFRESH_MINTHREADS, MANAGER_TABLET_REFRESH_MAXTHREADS,
MANAGER_TABLET_MERGEABILITY_INTERVAL, MANAGER_FATE_CONDITIONAL_WRITER_THREADS_MAX,
MANAGER_MINTHREADS, MANAGER_MINTHREADS_TIMEOUT, MANAGER_RECOVERY_WAL_EXISTENCE_CACHE_TIME,
MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE, MANAGER_TABLET_REFRESH_MINTHREADS,
MANAGER_TABLET_REFRESH_MAXTHREADS, MANAGER_TABLET_MERGEABILITY_INTERVAL,
MANAGER_FATE_CONDITIONAL_WRITER_THREADS_MAX,

// SSERV options
SSERV_CACHED_TABLET_METADATA_REFRESH_PERCENT, SSERV_THREADCHECK, SSERV_CLIENTPORT,
SSERV_PORTSEARCH, SSERV_DATACACHE_SIZE, SSERV_INDEXCACHE_SIZE, SSERV_SUMMARYCACHE_SIZE,
SSERV_DEFAULT_BLOCKSIZE, SSERV_SCAN_REFERENCE_EXPIRATION_TIME,
SSERV_CACHED_TABLET_METADATA_EXPIRATION, SSERV_MINTHREADS, SSERV_MINTHREADS_TIMEOUT,
SSERV_WAL_SORT_MAX_CONCURRENT, SSERV_GROUP_NAME,
SSERV_CACHED_TABLET_METADATA_REFRESH_PERCENT, SSERV_THREADCHECK, SSERV_DATACACHE_SIZE,
SSERV_INDEXCACHE_SIZE, SSERV_SUMMARYCACHE_SIZE, SSERV_DEFAULT_BLOCKSIZE,
SSERV_SCAN_REFERENCE_EXPIRATION_TIME, SSERV_CACHED_TABLET_METADATA_EXPIRATION,
SSERV_MINTHREADS, SSERV_MINTHREADS_TIMEOUT, SSERV_WAL_SORT_MAX_CONCURRENT, SSERV_GROUP_NAME,

// TSERV options
TSERV_TOTAL_MUTATION_QUEUE_MAX, TSERV_WAL_MAX_SIZE, TSERV_WAL_MAX_AGE,
TSERV_WAL_TOLERATED_CREATION_FAILURES, TSERV_WAL_TOLERATED_WAIT_INCREMENT,
TSERV_WAL_TOLERATED_MAXIMUM_WAIT_DURATION, TSERV_MAX_IDLE, TSERV_SESSION_MAXIDLE,
TSERV_SCAN_RESULTS_MAX_TIMEOUT, TSERV_MINC_MAXCONCURRENT, TSERV_THREADCHECK,
TSERV_LOG_BUSY_TABLETS_COUNT, TSERV_LOG_BUSY_TABLETS_INTERVAL, TSERV_WAL_SORT_MAX_CONCURRENT,
TSERV_SLOW_FILEPERMIT_MILLIS, TSERV_WAL_BLOCKSIZE, TSERV_CLIENTPORT, TSERV_PORTSEARCH,
TSERV_DATACACHE_SIZE, TSERV_INDEXCACHE_SIZE, TSERV_SUMMARYCACHE_SIZE, TSERV_DEFAULT_BLOCKSIZE,
TSERV_MINTHREADS, TSERV_MINTHREADS_TIMEOUT, TSERV_NATIVEMAP_ENABLED, TSERV_MAXMEM,
TSERV_SCAN_MAX_OPENFILES, TSERV_ONDEMAND_UNLOADER_INTERVAL, TSERV_GROUP_NAME,
TSERV_SLOW_FILEPERMIT_MILLIS, TSERV_WAL_BLOCKSIZE, TSERV_DATACACHE_SIZE,
TSERV_INDEXCACHE_SIZE, TSERV_SUMMARYCACHE_SIZE, TSERV_DEFAULT_BLOCKSIZE, TSERV_MINTHREADS,
TSERV_MINTHREADS_TIMEOUT, TSERV_NATIVEMAP_ENABLED, TSERV_MAXMEM, TSERV_SCAN_MAX_OPENFILES,
TSERV_ONDEMAND_UNLOADER_INTERVAL, TSERV_GROUP_NAME,

// GC options
GC_CANDIDATE_BATCH_SIZE, GC_CYCLE_START, GC_PORT,
GC_CANDIDATE_BATCH_SIZE, GC_CYCLE_START,

// MONITOR options
MONITOR_PORT, MONITOR_SSL_KEYSTORETYPE, MONITOR_SSL_TRUSTSTORETYPE,
MONITOR_SSL_INCLUDE_PROTOCOLS, MONITOR_LOCK_CHECK_INTERVAL, MONITOR_ROOT_CONTEXT,
MONITOR_SSL_KEYSTORETYPE, MONITOR_SSL_TRUSTSTORETYPE, MONITOR_SSL_INCLUDE_PROTOCOLS,
MONITOR_LOCK_CHECK_INTERVAL, MONITOR_ROOT_CONTEXT,

// COMPACTOR options
COMPACTOR_CANCEL_CHECK_INTERVAL, COMPACTOR_CLIENTPORT, COMPACTOR_THREADCHECK,
COMPACTOR_PORTSEARCH, COMPACTOR_MINTHREADS, COMPACTOR_MINTHREADS_TIMEOUT,
COMPACTOR_GROUP_NAME,
COMPACTOR_CANCEL_CHECK_INTERVAL, COMPACTOR_THREADCHECK, COMPACTOR_MINTHREADS,
COMPACTOR_MINTHREADS_TIMEOUT, COMPACTOR_GROUP_NAME,

// COMPACTION_COORDINATOR options
COMPACTION_COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL,
Expand Down
90 changes: 58 additions & 32 deletions core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import org.apache.accumulo.core.fate.Fate;
import org.apache.accumulo.core.file.rfile.RFile;
Expand Down Expand Up @@ -96,13 +95,11 @@ public enum PropertyType {
+ " 'localhost:2000,www.example.com,10.10.1.1:500' and 'localhost'.\n"
+ "Examples of invalid host lists are '', ':1000', and 'localhost:80000'"),

PORT("port",
x -> Stream.of(new Bounds(1024, 65535), in(true, "0"), new PortRange("\\d{4,5}-\\d{4,5}"))
.anyMatch(y -> y.test(x)),
"An positive integer in the range 1024-65535 (not already in use or"
+ " specified elsewhere in the configuration),\n"
+ "zero to indicate any open ephemeral port, or a range of positive"
+ " integers specified as M-N"),
PORT("port", new PortPredicate(),
"A positive integer in the range 1024-65535 (not already in use or specified elsewhere in the"
+ " configuration), zero to indicate any open ephemeral port, or a range of positive"
+ " integers expressed as M-N (inclusive) or using interval notation such as [M,N) or"
+ " [M,N]."),

COUNT("count", new Bounds(0, Integer.MAX_VALUE),
"A non-negative integer in the range of 0-" + Integer.MAX_VALUE),
Expand Down Expand Up @@ -423,43 +420,72 @@ public boolean test(final String input) {

}

public static class PortRange extends Matches {

public static final Range<Integer> VALID_RANGE = Range.of(1024, 65535);

public PortRange(final String pattern) {
super(pattern);
}
private static class PortPredicate implements Predicate<String> {

@Override
public boolean test(final String input) {
if (super.test(input)) {
if (input == null) {
return true;
}
if (input.isEmpty()) {
return false;
}
if (input.chars().anyMatch(Character::isWhitespace)) {
return false;
}
if ("0".equals(input)) {
return true;
}

try {
int port = Integer.parseInt(input);
return PortRange.VALID_RANGE.contains(port);
} catch (NumberFormatException e) {
try {
PortRange.parse(input);
return true;
} catch (IllegalArgumentException e) {
} catch (IllegalArgumentException ex) {
return false;
}
} else {
return false;
}
}
}

public static IntStream parse(String portRange) {
int idx = portRange.indexOf('-');
if (idx != -1) {
int low = Integer.parseInt(portRange.substring(0, idx));
int high = Integer.parseInt(portRange.substring(idx + 1));
if (!VALID_RANGE.contains(low) || !VALID_RANGE.contains(high) || low > high) {
throw new IllegalArgumentException(
"Invalid port range specified, only 1024 to 65535 supported.");
}
return IntStream.rangeClosed(low, high);
public static class PortRange {

public static final Range<Integer> VALID_RANGE = Range.of(1024, 65535);

private PortRange() {}

public static IntStream parse(String value) {
Objects.requireNonNull(value, "Port range cannot be null.");
Preconditions.checkArgument(!value.isEmpty(), "Port range cannot be empty.");
Preconditions.checkArgument(value.contains("-"),
"Invalid port range, expected format like M-N.");
String[] parts = value.split("-", 2);
Preconditions.checkArgument(parts.length == 2, "Invalid port range, must use M-N notation.");

int low;
int high;
try {
low = Integer.parseInt(parts[0].trim());
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Invalid port value: " + parts[0], e);
}
try {
high = Integer.parseInt(parts[1].trim());
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Invalid port value: " + parts[1], e);
}
throw new IllegalArgumentException(
"Invalid port range specification, must use M-N notation.");
}

Preconditions.checkArgument(VALID_RANGE.contains(low),
"Port range bounds must be 1024 to 65535. Got " + low);
Preconditions.checkArgument(VALID_RANGE.contains(high),
"Port range bounds must be 1024 to 65535 Got " + high);
Preconditions.checkArgument(high > low, "Upper bound must be >= lower bound.");

return IntStream.rangeClosed(low, high);
}
}

private static class ValidFateConfig implements Predicate<String> {
Expand Down
Loading