diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/SegmentOperationsThrottlerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/SegmentOperationsThrottlerTest.java index b560dd66028b..1ceff184af0c 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/SegmentOperationsThrottlerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/SegmentOperationsThrottlerTest.java @@ -308,11 +308,14 @@ public void testServingQueriesDisabledWithAcquireRelease() ? Integer.parseInt( CommonConstants.Helix.DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES) : Integer.parseInt(CommonConstants.Helix.DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES); + // Default is too high: Integer.MAX_VALUE, take a limited number of permits so that the test doesn't take too + // long to finish + int numPermitsToTake = 10000; // We set isServingQueries to false when the server is not yet ready to server queries. In this scenario ideally // preprocessing more segments is acceptable and cannot affect the query performance Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery); Assert.assertEquals(operationsThrottler.availablePermits(), defaultPermitsBeforeQuery); - for (int i = 0; i < defaultPermitsBeforeQuery; i++) { + for (int i = 0; i < numPermitsToTake; i++) { operationsThrottler.acquire(); Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery); Assert.assertEquals(operationsThrottler.availablePermits(), defaultPermitsBeforeQuery - i - 1); @@ -322,12 +325,11 @@ public void testServingQueriesDisabledWithAcquireRelease() operationsThrottler.startServingQueries(); Assert.assertEquals(operationsThrottler.totalPermits(), initialPermits); Assert.assertEquals(operationsThrottler.availablePermits(), - initialPermits - defaultPermitsBeforeQuery); + initialPermits - numPermitsToTake); - for (int i = 0; i < defaultPermitsBeforeQuery; i++) { + for (int i = 0; i < numPermitsToTake; i++) { operationsThrottler.release(); - Assert.assertEquals(operationsThrottler.availablePermits(), - (initialPermits - defaultPermitsBeforeQuery) + i + 1); + Assert.assertEquals(operationsThrottler.availablePermits(), (initialPermits - numPermitsToTake) + i + 1); } Assert.assertEquals(operationsThrottler.totalPermits(), initialPermits); Assert.assertEquals(operationsThrottler.availablePermits(), initialPermits); @@ -340,11 +342,11 @@ public void testServingQueriesDisabledWithAcquireReleaseWithConfigIncrease() int initialPermits = 4; List segmentOperationsThrottlerList = new ArrayList<>(); segmentOperationsThrottlerList.add(new SegmentAllIndexPreprocessThrottler(initialPermits, Integer.parseInt( - CommonConstants.Helix.DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES), false)); + CommonConstants.Helix.DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES) - 5, false)); segmentOperationsThrottlerList.add(new SegmentStarTreePreprocessThrottler(initialPermits, Integer.parseInt( - CommonConstants.Helix.DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES), false)); + CommonConstants.Helix.DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES) - 5, false)); segmentOperationsThrottlerList.add(new SegmentDownloadThrottler(initialPermits, Integer.parseInt( - CommonConstants.Helix.DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES), false)); + CommonConstants.Helix.DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES) - 5, false)); for (BaseSegmentOperationsThrottler operationsThrottler : segmentOperationsThrottlerList) { int defaultPermitsBeforeQuery = operationsThrottler instanceof SegmentAllIndexPreprocessThrottler @@ -353,14 +355,17 @@ public void testServingQueriesDisabledWithAcquireReleaseWithConfigIncrease() ? Integer.parseInt( CommonConstants.Helix.DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES) : Integer.parseInt(CommonConstants.Helix.DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES); + // Default is too high: Integer.MAX_VALUE, take a limited number of permits so that the test doesn't take too + // long to finish + int numPermitsToTake = 10000; // We set isServingQueries to false when the server is not yet ready to server queries. In this scenario ideally // preprocessing more segments is acceptable and cannot affect the query performance - Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery); - Assert.assertEquals(operationsThrottler.availablePermits(), defaultPermitsBeforeQuery); - for (int i = 0; i < defaultPermitsBeforeQuery; i++) { + Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery - 5); + Assert.assertEquals(operationsThrottler.availablePermits(), defaultPermitsBeforeQuery - 5); + for (int i = 0; i < numPermitsToTake; i++) { operationsThrottler.acquire(); - Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery); - Assert.assertEquals(operationsThrottler.availablePermits(), defaultPermitsBeforeQuery - i - 1); + Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery - 5); + Assert.assertEquals(operationsThrottler.availablePermits(), defaultPermitsBeforeQuery - i - 1 - 5); } // Double the permits for before serving queries config @@ -370,29 +375,29 @@ public void testServingQueriesDisabledWithAcquireReleaseWithConfigIncrease() : operationsThrottler instanceof SegmentStarTreePreprocessThrottler ? CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES : CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES, - String.valueOf(defaultPermitsBeforeQuery * 2)); + String.valueOf(defaultPermitsBeforeQuery)); operationsThrottler.onChange(updatedClusterConfigs.keySet(), updatedClusterConfigs); - Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery * 2); - // We doubled permits but took all of the previous ones - Assert.assertEquals(operationsThrottler.availablePermits(), defaultPermitsBeforeQuery); + Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery); + // We increased permits but took some before the increase + Assert.assertEquals(operationsThrottler.availablePermits(), defaultPermitsBeforeQuery - numPermitsToTake); - // Take remaining permits - for (int i = 0; i < defaultPermitsBeforeQuery; i++) { + // Take more permits + for (int i = 0; i < numPermitsToTake; i++) { operationsThrottler.acquire(); - Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery * 2); - Assert.assertEquals(operationsThrottler.availablePermits(), defaultPermitsBeforeQuery - i - 1); + Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery); + Assert.assertEquals(operationsThrottler.availablePermits(), + defaultPermitsBeforeQuery - numPermitsToTake - i - 1); } // Once the server is ready to server queries, we should reset the throttling configurations to be as configured operationsThrottler.startServingQueries(); Assert.assertEquals(operationsThrottler.totalPermits(), initialPermits); Assert.assertEquals(operationsThrottler.availablePermits(), - initialPermits - (defaultPermitsBeforeQuery * 2)); + initialPermits - (numPermitsToTake * 2)); - for (int i = 0; i < defaultPermitsBeforeQuery * 2; i++) { + for (int i = 0; i < numPermitsToTake * 2; i++) { operationsThrottler.release(); - Assert.assertEquals(operationsThrottler.availablePermits(), - (initialPermits - defaultPermitsBeforeQuery * 2) + i + 1); + Assert.assertEquals(operationsThrottler.availablePermits(), (initialPermits - numPermitsToTake * 2) + i + 1); } Assert.assertEquals(operationsThrottler.totalPermits(), initialPermits); Assert.assertEquals(operationsThrottler.availablePermits(), initialPermits); @@ -418,11 +423,14 @@ public void testServingQueriesDisabledWithAcquireReleaseWithConfigDecrease() ? Integer.parseInt( CommonConstants.Helix.DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES) : Integer.parseInt(CommonConstants.Helix.DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES); + // Default is too high: Integer.MAX_VALUE, take a limited number of permits so that the test doesn't take too + // long to finish + int numPermitsToTake = 10000; // We set isServingQueries to false when the server is not yet ready to server queries. In this scenario ideally // preprocessing more segments is acceptable and cannot affect the query performance Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery); Assert.assertEquals(operationsThrottler.availablePermits(), defaultPermitsBeforeQuery); - for (int i = 0; i < defaultPermitsBeforeQuery; i++) { + for (int i = 0; i < numPermitsToTake; i++) { operationsThrottler.acquire(); Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery); Assert.assertEquals(operationsThrottler.availablePermits(), defaultPermitsBeforeQuery - i - 1); @@ -430,27 +438,26 @@ public void testServingQueriesDisabledWithAcquireReleaseWithConfigDecrease() // Half the permits for before serving queries config Map updatedClusterConfigs = new HashMap<>(); + int newDefaultPermits = defaultPermitsBeforeQuery / 2; updatedClusterConfigs.put(operationsThrottler instanceof SegmentAllIndexPreprocessThrottler ? CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES : operationsThrottler instanceof SegmentStarTreePreprocessThrottler ? CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES : CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES, - String.valueOf(defaultPermitsBeforeQuery / 2)); + String.valueOf(newDefaultPermits)); operationsThrottler.onChange(updatedClusterConfigs.keySet(), updatedClusterConfigs); - Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery / 2); + Assert.assertEquals(operationsThrottler.totalPermits(), newDefaultPermits); // We doubled permits but took all of the previous ones - Assert.assertEquals(operationsThrottler.availablePermits(), -(defaultPermitsBeforeQuery / 2)); + Assert.assertEquals(operationsThrottler.availablePermits(), newDefaultPermits - numPermitsToTake); // Once the server is ready to server queries, we should reset the throttling configurations to be as configured operationsThrottler.startServingQueries(); Assert.assertEquals(operationsThrottler.totalPermits(), initialPermits); - Assert.assertEquals(operationsThrottler.availablePermits(), - initialPermits - defaultPermitsBeforeQuery); + Assert.assertEquals(operationsThrottler.availablePermits(), initialPermits - numPermitsToTake); - for (int i = 0; i < defaultPermitsBeforeQuery; i++) { + for (int i = 0; i < numPermitsToTake; i++) { operationsThrottler.release(); - Assert.assertEquals(operationsThrottler.availablePermits(), - (initialPermits - defaultPermitsBeforeQuery) + i + 1); + Assert.assertEquals(operationsThrottler.availablePermits(), (initialPermits - numPermitsToTake) + i + 1); } Assert.assertEquals(operationsThrottler.totalPermits(), initialPermits); Assert.assertEquals(operationsThrottler.availablePermits(), initialPermits); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 3933187f7f65..9ba8b4f85fdd 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -252,26 +252,33 @@ public static class Instance { // Preprocess throttle configs public static final String CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM = "pinot.server.max.segment.preprocess.parallelism"; - public static final String DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM = String.valueOf(100); + // Setting to Integer.MAX_VALUE to effectively disable throttling by default + public static final String DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM = String.valueOf(Integer.MAX_VALUE); // Before serving queries is enabled, we should use a higher preprocess parallelism to process segments faster public static final String CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES = "pinot.server.max.segment.preprocess.parallelism.before.serving.queries"; - // Use the below default before enabling queries on the server - public static final String DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES = String.valueOf(100); + // Setting the before serving queries to Integer.MAX_VALUE to effectively disable throttling by default + public static final String DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES = + String.valueOf(Integer.MAX_VALUE); // Preprocess throttle config specifically for StarTree index rebuild public static final String CONFIG_OF_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM = "pinot.server.max.segment.startree.preprocess.parallelism"; - public static final String DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM = String.valueOf(100); + // Setting to Integer.MAX_VALUE to effectively disable throttling by default + public static final String DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM = String.valueOf(Integer.MAX_VALUE); public static final String CONFIG_OF_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES = "pinot.server.max.segment.startree.preprocess.parallelism.before.serving.queries"; + // Setting the before serving queries to Integer.MAX_VALUE to effectively disable throttling by default public static final String DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES = - String.valueOf(100); + String.valueOf(Integer.MAX_VALUE); public static final String CONFIG_OF_MAX_SEGMENT_DOWNLOAD_PARALLELISM = "pinot.server.max.segment.download.parallelism"; - public static final String DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM = "100"; + // Setting to Integer.MAX_VALUE to effectively disable throttling by default + public static final String DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM = String.valueOf(Integer.MAX_VALUE); public static final String CONFIG_OF_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES = "pinot.server.max.segment.download.parallelism.before.serving.queries"; - public static final String DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES = "100"; + // Setting the before serving queries to Integer.MAX_VALUE to effectively disable throttling by default + public static final String DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES = + String.valueOf(Integer.MAX_VALUE); } public static class Broker {