Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,7 @@ private void setUpPinotController() {
_tableSizeReader =
new TableSizeReader(_executorService, _connectionManager, _controllerMetrics, _helixResourceManager,
_leadControllerManager);
_helixResourceManager.registerTableSizeReader(_tableSizeReader);
_storageQuotaChecker = new StorageQuotaChecker(_tableSizeReader, _controllerMetrics, _leadControllerManager,
_helixResourceManager, _config);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,9 @@ public RebalanceResult rebalance(
@ApiParam(value = "OFFLINE|REALTIME", required = true) @QueryParam("type") String tableTypeStr,
@ApiParam(value = "Whether to rebalance table in dry-run mode") @DefaultValue("false") @QueryParam("dryRun")
boolean dryRun,
@ApiParam(value = "Whether to return dry-run summary instead of full, dry-run must be enabled to use this")
@DefaultValue("false") @QueryParam("summary")
boolean summary,
@ApiParam(value = "Whether to enable pre-checks for table, must be in dry-run mode to enable")
@DefaultValue("false") @QueryParam("preChecks") boolean preChecks,
@ApiParam(value = "Whether to reassign instances before reassigning segments") @DefaultValue("false")
Expand Down Expand Up @@ -646,6 +649,7 @@ public RebalanceResult rebalance(
String tableNameWithType = constructTableNameWithType(tableName, tableTypeStr);
RebalanceConfig rebalanceConfig = new RebalanceConfig();
rebalanceConfig.setDryRun(dryRun);
rebalanceConfig.setSummary(summary);
rebalanceConfig.setPreChecks(preChecks);
rebalanceConfig.setReassignInstances(reassignInstances);
rebalanceConfig.setIncludeConsuming(includeConsuming);
Expand All @@ -666,7 +670,7 @@ public RebalanceResult rebalance(
String rebalanceJobId = TableRebalancer.createUniqueRebalanceJobIdentifier();

try {
if (dryRun || preChecks || downtime) {
if (dryRun || summary || preChecks || downtime) {
// For dry-run, preChecks or rebalance with downtime, directly return the rebalance result as it should return
// immediately
return _pinotHelixResourceManager.rebalanceTable(tableNameWithType, rebalanceConfig, rebalanceJobId, false);
Expand All @@ -687,7 +691,7 @@ public RebalanceResult rebalance(
String errorMsg = String.format("Caught exception/error while rebalancing table: %s", tableNameWithType);
LOGGER.error(errorMsg, t);
return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, errorMsg, null, null, null,
null);
null, null);
}
});
boolean isJobIdPersisted = waitForRebalanceToPersist(
Expand All @@ -708,7 +712,7 @@ public RebalanceResult rebalance(
return new RebalanceResult(dryRunResult.getJobId(), RebalanceResult.Status.IN_PROGRESS,
"In progress, check controller logs for updates", dryRunResult.getInstanceAssignment(),
dryRunResult.getTierInstanceAssignment(), dryRunResult.getSegmentAssignment(),
dryRunResult.getPreChecksResult());
dryRunResult.getPreChecksResult(), dryRunResult.getRebalanceSummaryResult());
} else {
// If dry-run failed or is no-op, return the dry-run result
return dryRunResult;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@
import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
import org.apache.pinot.controller.helix.core.rebalance.ZkBasedTableRebalanceObserver;
import org.apache.pinot.controller.helix.starter.HelixConfig;
import org.apache.pinot.controller.util.TableSizeReader;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.spi.config.DatabaseConfig;
import org.apache.pinot.spi.config.instance.Instance;
Expand Down Expand Up @@ -241,6 +242,7 @@ private enum LineageUpdateType {
private TableCache _tableCache;
private final LineageManager _lineageManager;
private final RebalancePreChecker _rebalancePreChecker;
private TableSizeReader _tableSizeReader;

public PinotHelixResourceManager(String zkURL, String helixClusterName, @Nullable String dataDir,
boolean isSingleTenantCluster, boolean enableBatchMessageMode, int deletedSegmentsRetentionInDays,
Expand Down Expand Up @@ -449,6 +451,15 @@ public RebalancePreChecker getRebalancePreChecker() {
}

/**
* Get the table size reader.
*
* @return table size reader
*/
public TableSizeReader getTableSizeReader() {
return _tableSizeReader;
}

/**
* Instance related APIs
*/

Expand Down Expand Up @@ -1943,6 +1954,10 @@ public void registerPinotLLCRealtimeSegmentManager(PinotLLCRealtimeSegmentManage
_pinotLLCRealtimeSegmentManager = pinotLLCRealtimeSegmentManager;
}

public void registerTableSizeReader(TableSizeReader tableSizeReader) {
_tableSizeReader = tableSizeReader;
}

private void assignInstances(TableConfig tableConfig, boolean override) {
String tableNameWithType = tableConfig.getTableName();
String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
Expand Down Expand Up @@ -3612,7 +3627,8 @@ public RebalanceResult rebalanceTable(String tableNameWithType, TableConfig tabl
tierToSegmentsMap = updateTargetTier(rebalanceJobId, tableNameWithType, tableConfig);
}
TableRebalancer tableRebalancer =
new TableRebalancer(_helixZkManager, zkBasedTableRebalanceObserver, _controllerMetrics, _rebalancePreChecker);
new TableRebalancer(_helixZkManager, zkBasedTableRebalanceObserver, _controllerMetrics, _rebalancePreChecker,
_tableSizeReader);
return tableRebalancer.rebalance(tableConfig, rebalanceConfig, rebalanceJobId, tierToSegmentsMap);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ public void init(PinotHelixResourceManager pinotHelixResourceManager, @Nullable
}

@Override
public Map<String, String> check(String rebalanceJobId, String tableNameWithType,
TableConfig tableConfig) {
public Map<String, String> check(String rebalanceJobId, String tableNameWithType, TableConfig tableConfig) {
LOGGER.info("Start pre-checks for table: {} with rebalanceJobId: {}", tableNameWithType, rebalanceJobId);

Map<String, String> preCheckResult = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ public class RebalanceConfig {
@ApiModelProperty(example = "false")
private boolean _dryRun = false;

// Whether to return only dry-run summary instead of full dry-run output, can only be used in dry-run mode
@JsonProperty("summary")
@ApiModelProperty(example = "false")
private boolean _summary = false;

// Whether to perform pre-checks for rebalance. This only returns the status of each pre-check and does not fail
// rebalance
@JsonProperty("preChecks")
Expand Down Expand Up @@ -124,6 +129,14 @@ public void setDryRun(boolean dryRun) {
_dryRun = dryRun;
}

public boolean isSummary() {
return _summary;
}

public void setSummary(boolean summary) {
_summary = summary;
}

public boolean isPreChecks() {
return _preChecks;
}
Expand Down Expand Up @@ -246,10 +259,10 @@ public void setRetryInitialDelayInMs(long retryInitialDelayInMs) {

@Override
public String toString() {
return "RebalanceConfig{" + "_dryRun=" + _dryRun + ", preChecks=" + _preChecks + ", _reassignInstances="
+ _reassignInstances + ", _includeConsuming=" + _includeConsuming + ", _bootstrap=" + _bootstrap
+ ", _downtime=" + _downtime + ", _minAvailableReplicas=" + _minAvailableReplicas + ", _bestEfforts="
+ _bestEfforts + ", _externalViewCheckIntervalInMs=" + _externalViewCheckIntervalInMs
return "RebalanceConfig{" + "_dryRun=" + _dryRun + ", _summary=" + _summary + ", preChecks=" + _preChecks
+ ", _reassignInstances=" + _reassignInstances + ", _includeConsuming=" + _includeConsuming + ", _bootstrap="
+ _bootstrap + ", _downtime=" + _downtime + ", _minAvailableReplicas=" + _minAvailableReplicas
+ ", _bestEfforts=" + _bestEfforts + ", _externalViewCheckIntervalInMs=" + _externalViewCheckIntervalInMs
+ ", _externalViewStabilizationTimeoutInMs=" + _externalViewStabilizationTimeoutInMs + ", _updateTargetTier="
+ _updateTargetTier + ", _heartbeatIntervalInMs=" + _heartbeatIntervalInMs + ", _heartbeatTimeoutInMs="
+ _heartbeatTimeoutInMs + ", _maxAttempts=" + _maxAttempts + ", _retryInitialDelayInMs="
Expand All @@ -259,6 +272,7 @@ public String toString() {
public static RebalanceConfig copy(RebalanceConfig cfg) {
RebalanceConfig rc = new RebalanceConfig();
rc._dryRun = cfg._dryRun;
rc._summary = cfg._summary;
rc._preChecks = cfg._preChecks;
rc._reassignInstances = cfg._reassignInstances;
rc._includeConsuming = cfg._includeConsuming;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public class RebalanceResult {
private final String _description;
@JsonInclude(JsonInclude.Include.NON_NULL)
private final Map<String, String> _preChecksResult;
@JsonInclude(JsonInclude.Include.NON_NULL)
private final RebalanceSummaryResult _rebalanceSummaryResult;

@JsonCreator
public RebalanceResult(@JsonProperty(value = "jobId", required = true) String jobId,
Expand All @@ -50,14 +52,16 @@ public RebalanceResult(@JsonProperty(value = "jobId", required = true) String jo
@JsonProperty("instanceAssignment") @Nullable Map<InstancePartitionsType, InstancePartitions> instanceAssignment,
@JsonProperty("tierInstanceAssignment") @Nullable Map<String, InstancePartitions> tierInstanceAssignment,
@JsonProperty("segmentAssignment") @Nullable Map<String, Map<String, String>> segmentAssignment,
@JsonProperty("preChecksResult") @Nullable Map<String, String> preChecksResult) {
@JsonProperty("preChecksResult") @Nullable Map<String, String> preChecksResult,
@JsonProperty("rebalanceSummaryResult") @Nullable RebalanceSummaryResult rebalanceSummaryResult) {
_jobId = jobId;
_status = status;
_description = description;
_instanceAssignment = instanceAssignment;
_tierInstanceAssignment = tierInstanceAssignment;
_segmentAssignment = segmentAssignment;
_preChecksResult = preChecksResult;
_rebalanceSummaryResult = rebalanceSummaryResult;
}

@JsonProperty
Expand Down Expand Up @@ -95,6 +99,11 @@ public Map<String, String> getPreChecksResult() {
return _preChecksResult;
}

@JsonProperty
public RebalanceSummaryResult getRebalanceSummaryResult() {
return _rebalanceSummaryResult;
}

public enum Status {
// FAILED if the job has ended with known exceptions;
// ABORTED if the job is stopped by others but retry is still allowed;
Expand Down
Loading