diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java index e928cbd1544..7567d620202 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@ -74,9 +74,10 @@ public class Fate { private static final Logger log = LoggerFactory.getLogger(Fate.class); private final FateStore store; - private final ScheduledFuture fatePoolsWatcherFuture; + private ScheduledFuture fatePoolsWatcherFuture; private final AtomicInteger needMoreThreadsWarnCount = new AtomicInteger(0); - private final ExecutorService deadResCleanerExecutor; + private ExecutorService deadResCleanerExecutor; + private final FateConfiguration fateConfig; private static final EnumSet FINISHED_STATES = EnumSet.of(FAILED, SUCCESSFUL, UNKNOWN); public static final Duration INITIAL_DELAY = Duration.ofSeconds(3); @@ -247,6 +248,10 @@ public void run() { } } + private record FateConfiguration(T environment, boolean runDeadResCleaner, + AccumuloConfiguration conf, ScheduledThreadPoolExecutor genSchedExecutor) { + } + /** * Creates a Fault-tolerant executor for the given store type. * @@ -258,6 +263,18 @@ public Fate(T environment, FateStore store, boolean runDeadResCleaner, Function,String> toLogStrFunc, AccumuloConfiguration conf, ScheduledThreadPoolExecutor genSchedExecutor) { this.store = FateLogger.wrap(store, toLogStrFunc, false); + this.fateConfig = + new FateConfiguration<>(environment, runDeadResCleaner, conf, genSchedExecutor); + + } + + public void start() { + log.info("Start {} FATE", store.type()); + keepRunning.set(true); + final var environment = fateConfig.environment; + final var runDeadResCleaner = fateConfig.runDeadResCleaner; + final var conf = fateConfig.conf; + final var genSchedExecutor = fateConfig.genSchedExecutor; fatePoolsWatcherFuture = genSchedExecutor.scheduleWithFixedDelay(new FatePoolsWatcher(environment, conf), diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 3eb2d5437f9..e4a01fd59fc 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -1267,6 +1267,7 @@ protected Fate initializeFateInstance(ServerContext context, FateStore< final Fate fateInstance = new Fate<>(this, store, true, TraceRepo::toLogString, getConfiguration(), context.getScheduledExecutor()); + fateInstance.start(); var fateCleaner = new FateCleaner<>(store, Duration.ofHours(8), this::getSteadyTime); ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor() diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FastFate.java b/test/src/main/java/org/apache/accumulo/test/fate/FastFate.java index f23596569f3..e5a107cafb3 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FastFate.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FastFate.java @@ -40,6 +40,7 @@ public FastFate(T environment, FateStore store, boolean runDeadResCleaner, Function,String> toLogStrFunc, AccumuloConfiguration conf) { super(environment, store, runDeadResCleaner, toLogStrFunc, conf, new ScheduledThreadPoolExecutor(2)); + start(); } @Override diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateExecutionOrderITBase.java b/test/src/main/java/org/apache/accumulo/test/fate/FateExecutionOrderITBase.java index ee3427fc2c2..dd49ba210a8 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FateExecutionOrderITBase.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateExecutionOrderITBase.java @@ -194,9 +194,12 @@ private void waitFor(FateStore store, FateId txid) throws Exception } protected Fate initializeFate(AccumuloClient client, FateStore store) { - return new Fate<>(new FeoTestEnv(client), store, false, r -> r + "", + Fate fate = new Fate<>(new FeoTestEnv(client), store, false, r -> r + "", FateTestUtil.updateFateConfig(new ConfigurationCopy(), 1, "AllFateOps"), new ScheduledThreadPoolExecutor(2)); + + fate.start(); + return fate; } private static Entry toIdStep(Entry e) { diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateITBase.java b/test/src/main/java/org/apache/accumulo/test/fate/FateITBase.java index d965881c177..8ac090aaab0 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FateITBase.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateITBase.java @@ -554,9 +554,12 @@ private void submitDeferred(Fate fate, ServerContext sctx, Set } protected Fate initializeFate(FateStore store) { - return new Fate<>(new TestEnv(), store, false, r -> r + "", + Fate fate = new Fate<>(new TestEnv(), store, false, r -> r + "", FateTestUtil.updateFateConfig(new ConfigurationCopy(), 1, "AllFateOps"), new ScheduledThreadPoolExecutor(2)); + + fate.start(); + return fate; } protected abstract TStatus getTxStatus(ServerContext sctx, FateId fateId); diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsITBase.java b/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsITBase.java index 0f62184103c..d96ba9d2a0c 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsITBase.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsITBase.java @@ -917,8 +917,10 @@ protected FastFate initFateWithDeadResCleaner(FateStore initFateNoDeadResCleaner(FateStore store) { - return new Fate<>(new LatchTestEnv(), store, false, Object::toString, + Fate fate = new Fate<>(new LatchTestEnv(), store, false, Object::toString, DefaultConfiguration.getInstance(), new ScheduledThreadPoolExecutor(2)); + fate.start(); + return fate; } private boolean wordIsTStatus(String word) { diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java b/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java index 00825815ee6..68519f596a0 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java @@ -43,6 +43,7 @@ public FlakyFate(T environment, FateStore store, Function,String> toL fateExecutors.add(new FlakyFateExecutor<>(this, environment, poolConfig.getKey(), poolConfig.getValue().getValue(), poolConfig.getValue().getKey())); } + start(); } private static class FlakyFateExecutor extends FateExecutor { diff --git a/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresITBase.java b/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresITBase.java index d2c79855f4c..9f43ee98dd3 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresITBase.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresITBase.java @@ -254,8 +254,10 @@ private void testMultipleFateInstances(TestStoreFactory testSto Fate fate1 = new Fate<>(testEnv1, store1, true, Object::toString, DefaultConfiguration.getInstance(), new ScheduledThreadPoolExecutor(2)); + fate1.start(); Fate fate2 = new Fate<>(testEnv2, store2, false, Object::toString, DefaultConfiguration.getInstance(), new ScheduledThreadPoolExecutor(2)); + fate2.start(); try { for (int i = 0; i < numFateIds; i++) { @@ -361,6 +363,7 @@ private void testDeadReservationsCleanup(TestStoreFactory testStor // fate1. fate2 = new Fate<>(testEnv2, store2, false, Object::toString, config, new ScheduledThreadPoolExecutor(2)); + fate2.start(); // Wait for the "dead" reservations to be deleted and picked up again (reserved using // fate2/store2/lock2 now). diff --git a/test/src/main/java/org/apache/accumulo/test/fate/SlowFateSplit.java b/test/src/main/java/org/apache/accumulo/test/fate/SlowFateSplit.java index 65a3783778d..a5b7ddc4682 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/SlowFateSplit.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/SlowFateSplit.java @@ -58,6 +58,7 @@ public SlowFateSplit(T environment, FateStore store, Function,String> fateExecutors.add(new SlowFateSplitExecutor(this, environment, poolConfig.getKey(), poolConfig.getValue().getValue(), poolConfig.getValue().getKey())); } + start(); } private class SlowFateSplitExecutor extends FateExecutor {