From 00db6e1c24c00bfdc8ac429f643882c51da42a13 Mon Sep 17 00:00:00 2001 From: Kevin Rathbun Date: Tue, 16 Dec 2025 16:14:02 -0500 Subject: [PATCH 1/7] Enables SharedVariableAtomicityDetector Enables the SharedVariableAtomicityDetector SpotBugs detector. Resolved all race conditions and all false-positives (with suppressions or NotThreadSafe annotation) --- core/pom.xml | 4 ++ .../client/ClientSideIteratorScanner.java | 1 + .../core/client/rfile/RFileScanner.java | 3 + .../core/clientImpl/ScannerOptions.java | 3 + .../TabletServerBatchReaderIterator.java | 3 + .../core/clientImpl/ThriftTransportPool.java | 3 + .../crypto/streams/BlockedInputStream.java | 3 + .../zookeeper/DistributedReadWriteLock.java | 62 +++++++++++-------- .../accumulo/core/file/BloomFilterLayer.java | 3 + .../impl/SeekableByteArrayInputStream.java | 3 + .../core/file/map/MapFileOperations.java | 3 + .../accumulo/core/file/rfile/RFile.java | 4 ++ .../bcfile/SimpleBufferedOutputStream.java | 3 + .../streams/BoundedRangeFileInputStream.java | 3 + .../system/ColumnFamilySkippingIterator.java | 3 + .../iteratorsImpl/system/StatsIterator.java | 3 + .../core/metadata/schema/TabletsMetadata.java | 3 + .../core/singletons/SingletonManager.java | 4 ++ .../compaction/DefaultCompactionPlanner.java | 8 ++- .../core/spi/crypto/AESCryptoService.java | 2 + pom.xml | 2 +- server/base/pom.xml | 4 ++ .../server/compaction/CountingIterator.java | 3 + .../accumulo/server/fs/FileManager.java | 3 + .../problems/ProblemReportingIterator.java | 3 + .../AuthenticationTokenKeyManager.java | 8 +++ .../accumulo/server/tablets/TabletTime.java | 9 +-- server/gc/pom.xml | 4 ++ .../java/org/apache/accumulo/gc/GCRun.java | 3 + server/manager/pom.xml | 4 ++ .../manager/tableOps/bulkVer2/LoadFiles.java | 3 + .../org/apache/accumulo/monitor/Monitor.java | 20 +++--- server/tserver/pom.xml | 4 ++ .../apache/accumulo/tserver/InMemoryMap.java | 3 + .../apache/accumulo/tserver/NativeMap.java | 3 + .../tserver/TabletServerResourceManager.java | 2 +- .../tserver/tablet/ScanDataSource.java | 3 + .../accumulo/tserver/tablet/Scanner.java | 3 + .../accumulo/tserver/tablet/Tablet.java | 4 +- .../CompactableImplFileManagerTest.java | 3 + test/pom.xml | 4 ++ .../functional/ErrorThrowingIterator.java | 3 + .../performance/scan/CollectTabletStats.java | 3 + 43 files changed, 178 insertions(+), 47 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index 9710b0afc0d..77630d7ef07 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -47,6 +47,10 @@ auto-service true + + com.google.code.findbugs + jsr305 + com.google.code.gson gson diff --git a/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java b/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java index 0209e8deece..1370eaf6678 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java @@ -177,6 +177,7 @@ public ClientSideIteratorScanner(final Scanner scanner) { this.range = scanner.getRange(); this.size = scanner.getBatchSize(); this.retryTimeout = scanner.getTimeout(MILLISECONDS); + // TODO Is this intended to be getBatchTimeout()? this.batchTimeout = scanner.getTimeout(MILLISECONDS); this.readaheadThreshold = scanner.getReadaheadThreshold(); SamplerConfiguration samplerConfig = scanner.getSamplerConfiguration(); diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java index 9266a53e510..39632375688 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java @@ -28,6 +28,8 @@ import java.util.Set; import java.util.SortedSet; +import javax.annotation.concurrent.NotThreadSafe; + import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; @@ -76,6 +78,7 @@ import com.google.common.base.Preconditions; +@NotThreadSafe class RFileScanner extends ScannerOptions implements Scanner { private static class RFileScannerEnvironmentImpl extends ClientServiceEnvironmentImpl { diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerOptions.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerOptions.java index 2b044f6fd84..fb288a519f5 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerOptions.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerOptions.java @@ -35,6 +35,8 @@ import java.util.TreeSet; import java.util.concurrent.TimeUnit; +import javax.annotation.concurrent.NotThreadSafe; + import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.ScannerBase; import org.apache.accumulo.core.client.sample.SamplerConfiguration; @@ -46,6 +48,7 @@ import org.apache.accumulo.core.util.TextUtil; import org.apache.hadoop.io.Text; +@NotThreadSafe public class ScannerOptions implements ScannerBase { protected List serverSideIteratorList = Collections.emptyList(); diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java index bf47678b9a7..232ff8e8e1e 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java @@ -45,6 +45,8 @@ import java.util.function.Supplier; import java.util.stream.Collectors; +import javax.annotation.concurrent.NotThreadSafe; + import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.SampleNotPresentException; @@ -355,6 +357,7 @@ private String getTableInfo() { return context.getPrintableTableInfoFromId(tableId); } + @NotThreadSafe private class QueryTask implements Runnable { private final String tsLocation; diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java index 75db7bffeda..73a8aa0c0bc 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java @@ -41,6 +41,8 @@ import java.util.function.LongSupplier; import java.util.function.Supplier; +import javax.annotation.concurrent.NotThreadSafe; + import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; import org.apache.accumulo.core.util.HostAndPort; @@ -577,6 +579,7 @@ public TransportPoolShutdownException(String msg) { private static final long serialVersionUID = 1L; } + @NotThreadSafe private static class CachedTTransport extends TTransport { private final ThriftTransportKey cacheKey; diff --git a/core/src/main/java/org/apache/accumulo/core/crypto/streams/BlockedInputStream.java b/core/src/main/java/org/apache/accumulo/core/crypto/streams/BlockedInputStream.java index 3e37f6f4045..4777b341532 100644 --- a/core/src/main/java/org/apache/accumulo/core/crypto/streams/BlockedInputStream.java +++ b/core/src/main/java/org/apache/accumulo/core/crypto/streams/BlockedInputStream.java @@ -23,10 +23,13 @@ import java.io.IOException; import java.io.InputStream; +import javax.annotation.concurrent.NotThreadSafe; + /** * Reader corresponding to BlockedOutputStream. Expects all data to be in the form of size (int) * data (size bytes) junk (however many bytes it takes to complete a block) */ +@NotThreadSafe public class BlockedInputStream extends InputStream { byte[] array; // ReadPos is where to start reading diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java index da27d408a08..cd9cd4f676a 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java @@ -27,6 +27,7 @@ import java.util.Map.Entry; import java.util.SortedMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; @@ -111,7 +112,7 @@ static class ReadLock implements Lock { QueueLock qlock; byte[] userData; - long entry = -1; + AtomicLong entry; ReadLock(QueueLock qlock, byte[] userData) { this.qlock = qlock; @@ -122,7 +123,7 @@ static class ReadLock implements Lock { ReadLock(QueueLock qlock, byte[] userData, long entry) { this.qlock = qlock; this.userData = userData; - this.entry = entry; + this.entry = new AtomicLong(entry); } protected LockType lockType() { @@ -154,22 +155,27 @@ public void lockInterruptibly() throws InterruptedException { @Override public boolean tryLock() { - if (entry == -1) { - entry = qlock.addEntry(new ParsedLock(this.lockType(), this.userData).getLockData()); - log.info("Added lock entry {} userData {} lockType {}", entry, - new String(this.userData, UTF_8), lockType()); - } - SortedMap entries = qlock.getEarlierEntries(entry); + var entryVal = entry.updateAndGet(val -> { + if (val == -1) { + var newVal = qlock.addEntry(new ParsedLock(this.lockType(), this.userData).getLockData()); + log.info("Added lock entry {} userData {} lockType {}", newVal, + new String(this.userData, UTF_8), lockType()); + return newVal; + } else { + return val; + } + }); + SortedMap entries = qlock.getEarlierEntries(entryVal); for (Entry entry : entries.entrySet()) { ParsedLock parsed = new ParsedLock(entry.getValue()); - if (entry.getKey().equals(this.entry)) { + if (entry.getKey().equals(entryVal)) { return true; } if (parsed.type == LockType.WRITE) { return false; } } - throw new IllegalStateException("Did not find our own lock in the queue: " + this.entry + throw new IllegalStateException("Did not find our own lock in the queue: " + entryVal + " userData " + new String(this.userData, UTF_8) + " lockType " + lockType()); } @@ -190,13 +196,14 @@ public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { @Override public void unlock() { - if (entry == -1) { - return; - } - log.debug("Removing lock entry {} userData {} lockType {}", entry, - new String(this.userData, UTF_8), lockType()); - qlock.removeEntry(entry); - entry = -1; + entry.updateAndGet(val -> { + if (val != -1) { + log.debug("Removing lock entry {} userData {} lockType {}", val, + new String(this.userData, UTF_8), lockType()); + qlock.removeEntry(val); + } + return -1; + }); } @Override @@ -222,18 +229,23 @@ protected LockType lockType() { @Override public boolean tryLock() { - if (entry == -1) { - entry = qlock.addEntry(new ParsedLock(this.lockType(), this.userData).getLockData()); - log.info("Added lock entry {} userData {} lockType {}", entry, - new String(this.userData, UTF_8), lockType()); - } - SortedMap entries = qlock.getEarlierEntries(entry); + var entryVal = entry.updateAndGet(val -> { + if (val == -1) { + var newVal = qlock.addEntry(new ParsedLock(this.lockType(), this.userData).getLockData()); + log.info("Added lock entry {} userData {} lockType {}", newVal, + new String(this.userData, UTF_8), lockType()); + return newVal; + } else { + return val; + } + }); + SortedMap entries = qlock.getEarlierEntries(entryVal); Iterator> iterator = entries.entrySet().iterator(); if (!iterator.hasNext()) { - throw new IllegalStateException("Did not find our own lock in the queue: " + this.entry + throw new IllegalStateException("Did not find our own lock in the queue: " + entryVal + " userData " + new String(this.userData, UTF_8) + " lockType " + lockType()); } - return iterator.next().getKey().equals(entry); + return iterator.next().getKey().equals(entryVal); } } diff --git a/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java b/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java index 7650ef8572d..c9eb55420aa 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java +++ b/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java @@ -35,6 +35,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; +import javax.annotation.concurrent.NotThreadSafe; + import org.apache.accumulo.core.bloomfilter.DynamicBloomFilter; import org.apache.accumulo.core.classloader.ClassLoaderUtil; import org.apache.accumulo.core.conf.AccumuloConfiguration; @@ -346,6 +348,7 @@ public void close() { } } + @NotThreadSafe public static class Reader implements FileSKVIterator { private final BloomFilterLoader bfl; diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/SeekableByteArrayInputStream.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/SeekableByteArrayInputStream.java index c231e88b942..732123d1990 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/SeekableByteArrayInputStream.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/SeekableByteArrayInputStream.java @@ -23,12 +23,15 @@ import java.io.IOException; import java.io.InputStream; +import javax.annotation.concurrent.NotThreadSafe; + import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; /** * This class is like byte array input stream with two differences. It supports seeking and avoids * synchronization. */ +@NotThreadSafe public class SeekableByteArrayInputStream extends InputStream { // making this volatile for the following case diff --git a/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java index c2da8c00665..26d24e5846a 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java +++ b/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java @@ -24,6 +24,8 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import javax.annotation.concurrent.NotThreadSafe; + import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; @@ -43,6 +45,7 @@ public class MapFileOperations extends FileOperations { private static final String MSG = "Map files are not supported"; + @NotThreadSafe public static class RangeIterator implements FileSKVIterator { SortedKeyValueIterator reader; diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java index 68e2be016d1..d1919d05718 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java @@ -40,6 +40,8 @@ import java.util.TreeMap; import java.util.concurrent.atomic.AtomicBoolean; +import javax.annotation.concurrent.NotThreadSafe; + import org.apache.accumulo.core.client.SampleNotPresentException; import org.apache.accumulo.core.client.sample.Sampler; import org.apache.accumulo.core.client.sample.SamplerConfiguration; @@ -557,6 +559,7 @@ public void close() throws IOException { } } + @NotThreadSafe public static class Writer implements FileSKVWriter { public static final int MAX_CF_IN_DLG = 1000; @@ -754,6 +757,7 @@ public long getLength() { } } + @NotThreadSafe private static class LocalityGroupReader extends LocalityGroup implements FileSKVIterator { private final CachableBlockFile.Reader reader; diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/SimpleBufferedOutputStream.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/SimpleBufferedOutputStream.java index 87e96a602d8..e1bd64ed78f 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/SimpleBufferedOutputStream.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/SimpleBufferedOutputStream.java @@ -22,10 +22,13 @@ import java.io.IOException; import java.io.OutputStream; +import javax.annotation.concurrent.NotThreadSafe; + /** * A simplified BufferedOutputStream with borrowed buffer, and allow users to see how much data have * been buffered. */ +@NotThreadSafe class SimpleBufferedOutputStream extends FilterOutputStream { protected byte[] buf; // the borrowed buffer protected int count = 0; // bytes used in buffer. diff --git a/core/src/main/java/org/apache/accumulo/core/file/streams/BoundedRangeFileInputStream.java b/core/src/main/java/org/apache/accumulo/core/file/streams/BoundedRangeFileInputStream.java index d9f41862ae0..ff6a7be71ba 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/streams/BoundedRangeFileInputStream.java +++ b/core/src/main/java/org/apache/accumulo/core/file/streams/BoundedRangeFileInputStream.java @@ -21,6 +21,8 @@ import java.io.IOException; import java.io.InputStream; +import javax.annotation.concurrent.NotThreadSafe; + import org.apache.hadoop.fs.Seekable; /** @@ -28,6 +30,7 @@ * regular input stream. One can create multiple BoundedRangeFileInputStream on top of the same * FSDataInputStream and they would not interfere with each other. */ +@NotThreadSafe public class BoundedRangeFileInputStream extends InputStream { private volatile boolean closed = false; diff --git a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/ColumnFamilySkippingIterator.java b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/ColumnFamilySkippingIterator.java index 3745bf0fc33..5c48a581e0c 100644 --- a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/ColumnFamilySkippingIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/ColumnFamilySkippingIterator.java @@ -25,6 +25,8 @@ import java.util.TreeSet; import java.util.concurrent.atomic.AtomicBoolean; +import javax.annotation.concurrent.NotThreadSafe; + import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Column; import org.apache.accumulo.core.data.Key; @@ -36,6 +38,7 @@ import org.apache.accumulo.core.iterators.ServerSkippingIterator; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +@NotThreadSafe public class ColumnFamilySkippingIterator extends ServerSkippingIterator implements InterruptibleIterator { diff --git a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/StatsIterator.java b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/StatsIterator.java index 47a08819731..e9c0d7dccae 100644 --- a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/StatsIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/StatsIterator.java @@ -23,6 +23,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; +import javax.annotation.concurrent.NotThreadSafe; + import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; @@ -31,6 +33,7 @@ import org.apache.accumulo.core.iterators.ServerWrappingIterator; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +@NotThreadSafe public class StatsIterator extends ServerWrappingIterator { private int numRead = 0; diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java index 41189e8d663..6c2bcbfe4c8 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java @@ -44,6 +44,8 @@ import java.util.stream.Stream; import java.util.stream.StreamSupport; +import javax.annotation.concurrent.NotThreadSafe; + import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.BatchScanner; import org.apache.accumulo.core.client.IsolatedScanner; @@ -89,6 +91,7 @@ */ public class TabletsMetadata implements Iterable, AutoCloseable { + @NotThreadSafe public static class Builder implements TableRangeOptions, TableOptions, RangeOptions, Options { private final List families = new ArrayList<>(); diff --git a/core/src/main/java/org/apache/accumulo/core/singletons/SingletonManager.java b/core/src/main/java/org/apache/accumulo/core/singletons/SingletonManager.java index 5f3e151b58f..a8dee7fffb0 100644 --- a/core/src/main/java/org/apache/accumulo/core/singletons/SingletonManager.java +++ b/core/src/main/java/org/apache/accumulo/core/singletons/SingletonManager.java @@ -27,6 +27,8 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; + /** * This class automates management of static singletons that maintain state for Accumulo clients. * Historically, Accumulo client code that used Connector had no control over these singletons. The @@ -81,6 +83,8 @@ public enum Mode { private static List services; @VisibleForTesting + @SuppressFBWarnings(value = "AT_NONATOMIC_64BIT_PRIMITIVE", + justification = "only called in static init block and testing, no sync needed") static void reset() { reservations = 0; mode = Mode.CLIENT; diff --git a/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java b/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java index 99bfd8472c2..8750e59aa61 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java @@ -169,8 +169,10 @@ public String toString() { private int maxFilesToCompact; private double lowestRatio; - @SuppressFBWarnings(value = {"UWF_UNWRITTEN_FIELD", "NP_UNWRITTEN_FIELD"}, - justification = "Field is written by Gson") + @SuppressFBWarnings( + value = {"UWF_UNWRITTEN_FIELD", "NP_UNWRITTEN_FIELD", "AT_NONATOMIC_64BIT_PRIMITIVE"}, + justification = "UWF_UNWRITTEN_FIELD and NP_UNWRITTEN_FIELD: Field is written by Gson. " + + "AT_NONATOMIC_64BIT_PRIMITIVE: Fields modified here are initialized once, and read-only after.") @Override public void init(InitParameters params) { @@ -243,6 +245,8 @@ public void init(InitParameters params) { } @SuppressWarnings("removal") + @SuppressFBWarnings(value = "AT_STALE_THREAD_WRITE_OF_PRIMITIVE", + justification = "only called in init") private void determineMaxFilesToCompact(InitParameters params) { String fqo = params.getFullyQualifiedOption("maxOpen"); if (!params.getServiceEnvironment().getConfiguration().isSet(fqo) diff --git a/core/src/main/java/org/apache/accumulo/core/spi/crypto/AESCryptoService.java b/core/src/main/java/org/apache/accumulo/core/spi/crypto/AESCryptoService.java index 555103fd983..86aa95b26d2 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/crypto/AESCryptoService.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/crypto/AESCryptoService.java @@ -117,6 +117,8 @@ protected Cipher initialValue() { }; @Override + @SuppressFBWarnings(value = "AT_STALE_THREAD_WRITE_OF_PRIMITIVE", + justification = "Fields modified here are initialized once, and read-only after.") public void init(Map conf) throws CryptoException { ensureNotInit(); String keyLocation = Objects.requireNonNull(conf.get(KEY_URI_PROPERTY), diff --git a/pom.xml b/pom.xml index e7ed0e963f8..e5ea99b37b3 100644 --- a/pom.xml +++ b/pom.xml @@ -769,7 +769,7 @@ under the License. true 1024 16 - ConstructorThrow,SharedVariableAtomicityDetector + ConstructorThrow -Dcom.overstock.findbugs.ignore=com.google.common.util.concurrent.RateLimiter,com.google.common.hash.Hasher,com.google.common.hash.HashCode,com.google.common.hash.HashFunction,com.google.common.hash.Hashing,com.google.common.cache.Cache,com.google.common.io.CountingOutputStream,com.google.common.io.ByteStreams,com.google.common.cache.LoadingCache,com.google.common.base.Stopwatch,com.google.common.cache.RemovalNotification,com.google.common.util.concurrent.Uninterruptibles,com.google.common.reflect.ClassPath,com.google.common.reflect.ClassPath$ClassInfo,com.google.common.base.Throwables,com.google.common.collect.Iterators diff --git a/server/base/pom.xml b/server/base/pom.xml index bef6dd39600..ae1a1d5a14e 100644 --- a/server/base/pom.xml +++ b/server/base/pom.xml @@ -44,6 +44,10 @@ auto-service true + + com.google.code.findbugs + jsr305 + com.google.code.gson gson diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/CountingIterator.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/CountingIterator.java index 4c768ba6f00..f029f022848 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/CountingIterator.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/CountingIterator.java @@ -23,12 +23,15 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicLong; +import javax.annotation.concurrent.NotThreadSafe; + import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.iterators.WrappingIterator; +@NotThreadSafe public class CountingIterator extends WrappingIterator { private long count; diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java b/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java index b2f9daeccd3..7a69bb8537b 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java @@ -33,6 +33,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; +import javax.annotation.concurrent.NotThreadSafe; + import org.apache.accumulo.core.client.SampleNotPresentException; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; @@ -386,6 +388,7 @@ private void releaseReaders(KeyExtent tablet, List readers, } + @NotThreadSafe static class FileDataSource implements DataSource { private SortedKeyValueIterator iter; diff --git a/server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReportingIterator.java b/server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReportingIterator.java index 4344c6ba975..e72cd6dab45 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReportingIterator.java +++ b/server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReportingIterator.java @@ -23,6 +23,8 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import javax.annotation.concurrent.NotThreadSafe; + import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; @@ -33,6 +35,7 @@ import org.apache.accumulo.core.iteratorsImpl.system.InterruptibleIterator; import org.apache.accumulo.server.ServerContext; +@NotThreadSafe public class ProblemReportingIterator implements InterruptibleIterator { private final SortedKeyValueIterator source; private boolean sawError = false; diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/delegation/AuthenticationTokenKeyManager.java b/server/base/src/main/java/org/apache/accumulo/server/security/delegation/AuthenticationTokenKeyManager.java index 5348e3bb9bd..7596e4e172d 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/security/delegation/AuthenticationTokenKeyManager.java +++ b/server/base/src/main/java/org/apache/accumulo/server/security/delegation/AuthenticationTokenKeyManager.java @@ -26,6 +26,8 @@ import com.google.common.annotations.VisibleForTesting; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; + /** * Service that handles generation of the secret key used to create delegation tokens. */ @@ -93,6 +95,9 @@ public void run() { } @VisibleForTesting + @SuppressFBWarnings( + value = {"AT_STALE_THREAD_WRITE_OF_PRIMITIVE", "AT_NONATOMIC_64BIT_PRIMITIVE"}, + justification = "only called from run() and testing") void updateStateFromCurrentKeys() { try { List currentKeys = keyDistributor.getCurrentKeys(); @@ -138,6 +143,9 @@ int getIdSeq() { * * @param now The current time in millis since epoch. */ + @SuppressFBWarnings( + value = {"AT_STALE_THREAD_WRITE_OF_PRIMITIVE", "AT_NONATOMIC_64BIT_PRIMITIVE"}, + justification = "only called from run() and testing") void _run(long now) { // clear any expired keys int removedKeys = secretManager.removeExpiredKeys(keyDistributor); diff --git a/server/base/src/main/java/org/apache/accumulo/server/tablets/TabletTime.java b/server/base/src/main/java/org/apache/accumulo/server/tablets/TabletTime.java index 45d3870db15..516f10a0ba4 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/tablets/TabletTime.java +++ b/server/base/src/main/java/org/apache/accumulo/server/tablets/TabletTime.java @@ -21,6 +21,8 @@ import java.util.List; import java.util.concurrent.atomic.AtomicLong; +import javax.annotation.concurrent.NotThreadSafe; + import org.apache.accumulo.core.client.admin.TimeType; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.metadata.schema.MetadataTime; @@ -67,6 +69,7 @@ public static MetadataTime maxMetadataTime(MetadataTime mv1, MetadataTime mv2) { return mv1.compareTo(mv2) < 0 ? mv2 : mv1; } + @NotThreadSafe static class MillisTime extends TabletTime { private long lastTime; @@ -157,11 +160,9 @@ private LogicalTime(Long time) { @Override public void useMaxTimeFromWALog(long time) { - time++; + final long finalTime = time + 1; - if (this.nextTime.get() < time) { - this.nextTime.set(time); - } + this.nextTime.getAndUpdate(val -> Math.max(val, finalTime)); } @Override diff --git a/server/gc/pom.xml b/server/gc/pom.xml index 148e63b58e7..86fea18db88 100644 --- a/server/gc/pom.xml +++ b/server/gc/pom.xml @@ -36,6 +36,10 @@ auto-service true + + com.google.code.findbugs + jsr305 + com.google.guava guava diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java b/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java index 22275c9e119..98a7e53ead4 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java @@ -42,6 +42,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; +import javax.annotation.concurrent.NotThreadSafe; + import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.IsolatedScanner; @@ -91,6 +93,7 @@ /** * A single garbage collection performed on a table (Root, MD) or all User tables. */ +@NotThreadSafe public class GCRun implements GarbageCollectionEnvironment { // loggers are not static to support unique naming by level private final Logger log; diff --git a/server/manager/pom.xml b/server/manager/pom.xml index 5f1516cb2e4..89543ae7398 100644 --- a/server/manager/pom.xml +++ b/server/manager/pom.xml @@ -36,6 +36,10 @@ auto-service true + + com.google.code.findbugs + jsr305 + com.google.code.gson gson diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java index e26db3c4e63..ee523485694 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java @@ -38,6 +38,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.concurrent.NotThreadSafe; + import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.clientImpl.bulk.Bulk; @@ -159,6 +161,7 @@ void start(Path bulkDir, Manager manager, long tid, boolean setTime) throws Exce abstract long finish() throws Exception; } + @NotThreadSafe static class OnlineLoader extends Loader { private final int maxConnections; diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java index f9672ed1965..5f2b0af0d35 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java @@ -130,14 +130,14 @@ public static void main(String[] args) throws Exception { } private final AtomicLong lastRecalc = new AtomicLong(0L); - private double totalIngestRate = 0.0; - private double totalQueryRate = 0.0; - private double totalScanRate = 0.0; - private long totalEntries = 0L; - private int totalTabletCount = 0; - private long totalHoldTime = 0; - private long totalLookups = 0; - private int totalTables = 0; + private volatile double totalIngestRate = 0.0; + private volatile double totalQueryRate = 0.0; + private volatile double totalScanRate = 0.0; + private volatile long totalEntries = 0L; + private volatile int totalTabletCount = 0; + private volatile long totalHoldTime = 0; + private volatile long totalLookups = 0; + private volatile int totalTables = 0; private final AtomicBoolean monitorInitialized = new AtomicBoolean(false); private static List> newMaxList() { @@ -998,10 +998,6 @@ public Optional getCoordinatorHost() { return coordinatorHost; } - public int getLivePort() { - return livePort; - } - @Override public ServiceLock getLock() { return monitorLock; diff --git a/server/tserver/pom.xml b/server/tserver/pom.xml index 40622e37a18..ec3b5de64e4 100644 --- a/server/tserver/pom.xml +++ b/server/tserver/pom.xml @@ -44,6 +44,10 @@ auto-service true + + com.google.code.findbugs + jsr305 + com.google.guava guava diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java index 306a4e7d270..c8182e6e1f1 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java @@ -37,6 +37,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.concurrent.NotThreadSafe; + import org.apache.accumulo.core.client.SampleNotPresentException; import org.apache.accumulo.core.client.sample.Sampler; import org.apache.accumulo.core.conf.AccumuloConfiguration; @@ -531,6 +533,7 @@ public synchronized long getNumEntries() { private final Set activeIters = Collections.synchronizedSet(new HashSet<>()); + @NotThreadSafe class MemoryDataSource implements DataSource { private boolean switched = false; diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java index 0b8a6816c29..0173b2b95be 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/NativeMap.java @@ -34,6 +34,8 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import javax.annotation.concurrent.NotThreadSafe; + import org.apache.accumulo.core.client.SampleNotPresentException; import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.ColumnUpdate; @@ -534,6 +536,7 @@ public void delete() { } } + @NotThreadSafe private static class NMSKVIter implements InterruptibleIterator { private ConcurrentIterator iter; diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java index 748d74705bd..38f9c80d87b 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java @@ -412,7 +412,7 @@ public int getOpenFiles() { public static class AssignmentWatcher implements Runnable { private static final Logger log = LoggerFactory.getLogger(AssignmentWatcher.class); - private static long longAssignments = 0; + private static volatile long longAssignments = 0; private final Map activeAssignments; private final AccumuloConfiguration conf; diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java index 6bcfbc4b9e0..9b6169a039b 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java @@ -27,6 +27,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import javax.annotation.concurrent.NotThreadSafe; + import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.thrift.IterInfo; @@ -57,6 +59,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@NotThreadSafe class ScanDataSource implements DataSource { private static final Logger log = LoggerFactory.getLogger(ScanDataSource.class); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java index 2b89a2005b1..0f2b83d30e6 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java @@ -24,6 +24,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantLock; +import javax.annotation.concurrent.NotThreadSafe; + import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; @@ -37,6 +39,7 @@ import com.google.common.base.Preconditions; +@NotThreadSafe public class Scanner { private static final Logger log = LoggerFactory.getLogger(Scanner.class); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index 90a53c734a0..4c89e1b5d92 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -226,8 +226,8 @@ enum CompactionState { private final Rate ingestByteRate = new Rate(0.95); private final Rate scannedRate = new Rate(0.95); - private long lastMinorCompactionFinishTime = 0; - private long lastMapFileImportTime = 0; + private volatile long lastMinorCompactionFinishTime = 0; + private volatile long lastMapFileImportTime = 0; private volatile long numEntries = 0; private volatile long numEntriesInMemory = 0; diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/tablet/CompactableImplFileManagerTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/tablet/CompactableImplFileManagerTest.java index 743ac8feca3..aa9d8189076 100644 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/tablet/CompactableImplFileManagerTest.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/tablet/CompactableImplFileManagerTest.java @@ -37,6 +37,8 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import javax.annotation.concurrent.NotThreadSafe; + import org.apache.accumulo.core.client.admin.compaction.CompactableFile; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; @@ -478,6 +480,7 @@ private void assertNoCandidates(TestFileManager fileMgr, Set t } + @NotThreadSafe static class TestFileManager extends CompactableImpl.FileManager { public static final Duration SELECTION_EXPIRATION = Duration.ofMinutes(2); diff --git a/test/pom.xml b/test/pom.xml index c58e3778697..0587a4f3ebb 100644 --- a/test/pom.xml +++ b/test/pom.xml @@ -46,6 +46,10 @@ auto-service true + + com.google.code.findbugs + jsr305 + com.google.code.gson gson diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ErrorThrowingIterator.java b/test/src/main/java/org/apache/accumulo/test/functional/ErrorThrowingIterator.java index d9cb2c0e8dd..13b6194c0fc 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ErrorThrowingIterator.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ErrorThrowingIterator.java @@ -23,6 +23,8 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.concurrent.NotThreadSafe; + import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; @@ -37,6 +39,7 @@ * Iterator used in tests *and* the test class must spawn a new MAC instance for each test since the * timesThrown variable is static. */ +@NotThreadSafe public class ErrorThrowingIterator extends WrappingIterator { public static final String TIMES = "error.throwing.iterator.times"; diff --git a/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java b/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java index 62e034bbef5..9256d518d42 100644 --- a/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java +++ b/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java @@ -36,6 +36,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import javax.annotation.concurrent.NotThreadSafe; + import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.Scanner; @@ -248,6 +250,7 @@ public int runTest() throws Exception { threadPool.shutdown(); } + @NotThreadSafe private abstract static class Test implements Runnable { private int count; From 5784d49b5bc546ae33add6056373eb789c7efdec Mon Sep 17 00:00:00 2001 From: Kevin Rathbun Date: Tue, 16 Dec 2025 16:57:43 -0500 Subject: [PATCH 2/7] NotThreadSafe to CountingInputStream --- .../org/apache/accumulo/core/util/CountingInputStream.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/src/main/java/org/apache/accumulo/core/util/CountingInputStream.java b/core/src/main/java/org/apache/accumulo/core/util/CountingInputStream.java index 15d41b18d70..5f6a03871ed 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/CountingInputStream.java +++ b/core/src/main/java/org/apache/accumulo/core/util/CountingInputStream.java @@ -18,6 +18,8 @@ import java.io.InputStream; import java.util.Objects; +import javax.annotation.concurrent.NotThreadSafe; + /** * This class was copied from Guava and modified. If this class was not final in Guava it could have * been extended. Guava has issue 590 open about this. @@ -26,6 +28,7 @@ * * @author Chris Nokleberg */ +@NotThreadSafe public final class CountingInputStream extends FilterInputStream { private long count; From 72d9e823a25df37f236bbc7d085d2d2b760377e8 Mon Sep 17 00:00:00 2001 From: Kevin Rathbun Date: Wed, 17 Dec 2025 11:44:01 -0500 Subject: [PATCH 3/7] revert changes to DistributedReadWriteLock.ReadLock, mark not thread safe instead --- .../zookeeper/DistributedReadWriteLock.java | 67 ++++++++----------- 1 file changed, 29 insertions(+), 38 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java index cd9cd4f676a..6ad096c9a70 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java @@ -27,7 +27,6 @@ import java.util.Map.Entry; import java.util.SortedMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; @@ -35,6 +34,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.concurrent.NotThreadSafe; + /** * A ReadWriteLock that can be implemented in ZooKeeper. Features the ability to store data with the * lock, and recover the lock using that data to find the lock. @@ -108,11 +109,12 @@ public interface QueueLock { private static final Logger log = LoggerFactory.getLogger(DistributedReadWriteLock.class); + @NotThreadSafe static class ReadLock implements Lock { QueueLock qlock; byte[] userData; - AtomicLong entry; + long entry = -1; ReadLock(QueueLock qlock, byte[] userData) { this.qlock = qlock; @@ -123,7 +125,7 @@ static class ReadLock implements Lock { ReadLock(QueueLock qlock, byte[] userData, long entry) { this.qlock = qlock; this.userData = userData; - this.entry = new AtomicLong(entry); + this.entry = entry; } protected LockType lockType() { @@ -155,28 +157,23 @@ public void lockInterruptibly() throws InterruptedException { @Override public boolean tryLock() { - var entryVal = entry.updateAndGet(val -> { - if (val == -1) { - var newVal = qlock.addEntry(new ParsedLock(this.lockType(), this.userData).getLockData()); - log.info("Added lock entry {} userData {} lockType {}", newVal, - new String(this.userData, UTF_8), lockType()); - return newVal; - } else { - return val; - } - }); - SortedMap entries = qlock.getEarlierEntries(entryVal); + if (entry == -1) { + entry = qlock.addEntry(new ParsedLock(this.lockType(), this.userData).getLockData()); + log.info("Added lock entry {} userData {} lockType {}", entry, + new String(this.userData, UTF_8), lockType()); + } + SortedMap entries = qlock.getEarlierEntries(entry); for (Entry entry : entries.entrySet()) { ParsedLock parsed = new ParsedLock(entry.getValue()); - if (entry.getKey().equals(entryVal)) { + if (entry.getKey().equals(this.entry)) { return true; } if (parsed.type == LockType.WRITE) { return false; } } - throw new IllegalStateException("Did not find our own lock in the queue: " + entryVal - + " userData " + new String(this.userData, UTF_8) + " lockType " + lockType()); + throw new IllegalStateException("Did not find our own lock in the queue: " + this.entry + + " userData " + new String(this.userData, UTF_8) + " lockType " + lockType()); } @Override @@ -196,14 +193,13 @@ public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { @Override public void unlock() { - entry.updateAndGet(val -> { - if (val != -1) { - log.debug("Removing lock entry {} userData {} lockType {}", val, + if (entry == -1) { + return; + } + log.debug("Removing lock entry {} userData {} lockType {}", entry, new String(this.userData, UTF_8), lockType()); - qlock.removeEntry(val); - } - return -1; - }); + qlock.removeEntry(entry); + entry = -1; } @Override @@ -229,23 +225,18 @@ protected LockType lockType() { @Override public boolean tryLock() { - var entryVal = entry.updateAndGet(val -> { - if (val == -1) { - var newVal = qlock.addEntry(new ParsedLock(this.lockType(), this.userData).getLockData()); - log.info("Added lock entry {} userData {} lockType {}", newVal, - new String(this.userData, UTF_8), lockType()); - return newVal; - } else { - return val; - } - }); - SortedMap entries = qlock.getEarlierEntries(entryVal); + if (entry == -1) { + entry = qlock.addEntry(new ParsedLock(this.lockType(), this.userData).getLockData()); + log.info("Added lock entry {} userData {} lockType {}", entry, + new String(this.userData, UTF_8), lockType()); + } + SortedMap entries = qlock.getEarlierEntries(entry); Iterator> iterator = entries.entrySet().iterator(); if (!iterator.hasNext()) { - throw new IllegalStateException("Did not find our own lock in the queue: " + entryVal - + " userData " + new String(this.userData, UTF_8) + " lockType " + lockType()); + throw new IllegalStateException("Did not find our own lock in the queue: " + this.entry + + " userData " + new String(this.userData, UTF_8) + " lockType " + lockType()); } - return iterator.next().getKey().equals(entryVal); + return iterator.next().getKey().equals(entry); } } From 2d8ddd25713464abe64c59973ab733f76d832e02 Mon Sep 17 00:00:00 2001 From: Kevin Rathbun Date: Wed, 17 Dec 2025 11:46:06 -0500 Subject: [PATCH 4/7] formatting --- .../fate/zookeeper/DistributedReadWriteLock.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java index 6ad096c9a70..484426d8094 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java @@ -30,12 +30,12 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; +import javax.annotation.concurrent.NotThreadSafe; + import org.apache.accumulo.core.util.UtilWaitThread; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.concurrent.NotThreadSafe; - /** * A ReadWriteLock that can be implemented in ZooKeeper. Features the ability to store data with the * lock, and recover the lock using that data to find the lock. @@ -160,7 +160,7 @@ public boolean tryLock() { if (entry == -1) { entry = qlock.addEntry(new ParsedLock(this.lockType(), this.userData).getLockData()); log.info("Added lock entry {} userData {} lockType {}", entry, - new String(this.userData, UTF_8), lockType()); + new String(this.userData, UTF_8), lockType()); } SortedMap entries = qlock.getEarlierEntries(entry); for (Entry entry : entries.entrySet()) { @@ -173,7 +173,7 @@ public boolean tryLock() { } } throw new IllegalStateException("Did not find our own lock in the queue: " + this.entry - + " userData " + new String(this.userData, UTF_8) + " lockType " + lockType()); + + " userData " + new String(this.userData, UTF_8) + " lockType " + lockType()); } @Override @@ -197,7 +197,7 @@ public void unlock() { return; } log.debug("Removing lock entry {} userData {} lockType {}", entry, - new String(this.userData, UTF_8), lockType()); + new String(this.userData, UTF_8), lockType()); qlock.removeEntry(entry); entry = -1; } @@ -228,13 +228,13 @@ public boolean tryLock() { if (entry == -1) { entry = qlock.addEntry(new ParsedLock(this.lockType(), this.userData).getLockData()); log.info("Added lock entry {} userData {} lockType {}", entry, - new String(this.userData, UTF_8), lockType()); + new String(this.userData, UTF_8), lockType()); } SortedMap entries = qlock.getEarlierEntries(entry); Iterator> iterator = entries.entrySet().iterator(); if (!iterator.hasNext()) { throw new IllegalStateException("Did not find our own lock in the queue: " + this.entry - + " userData " + new String(this.userData, UTF_8) + " lockType " + lockType()); + + " userData " + new String(this.userData, UTF_8) + " lockType " + lockType()); } return iterator.next().getKey().equals(entry); } From f827d1a99a1902ccd8386f65ad88a70f97392b48 Mon Sep 17 00:00:00 2001 From: Kevin Rathbun Date: Mon, 22 Dec 2025 11:18:32 -0500 Subject: [PATCH 5/7] changes: * changed `batchTimeout = scanner.getTimeout` to `batchTimeout = scanner.getBatchTimeout` in ClientSideIteratorScanner, appeared to be calling incorrect method * address review --- .../core/client/ClientSideIteratorScanner.java | 3 +-- .../apache/accumulo/server/tablets/TabletTime.java | 11 +++++------ 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java b/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java index 1370eaf6678..48fb306c86c 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java @@ -177,8 +177,7 @@ public ClientSideIteratorScanner(final Scanner scanner) { this.range = scanner.getRange(); this.size = scanner.getBatchSize(); this.retryTimeout = scanner.getTimeout(MILLISECONDS); - // TODO Is this intended to be getBatchTimeout()? - this.batchTimeout = scanner.getTimeout(MILLISECONDS); + this.batchTimeout = scanner.getBatchTimeout(MILLISECONDS); this.readaheadThreshold = scanner.getReadaheadThreshold(); SamplerConfiguration samplerConfig = scanner.getSamplerConfiguration(); if (samplerConfig != null) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/tablets/TabletTime.java b/server/base/src/main/java/org/apache/accumulo/server/tablets/TabletTime.java index 516f10a0ba4..b83fe6bdbe5 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/tablets/TabletTime.java +++ b/server/base/src/main/java/org/apache/accumulo/server/tablets/TabletTime.java @@ -21,14 +21,14 @@ import java.util.List; import java.util.concurrent.atomic.AtomicLong; -import javax.annotation.concurrent.NotThreadSafe; - import org.apache.accumulo.core.client.admin.TimeType; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.metadata.schema.MetadataTime; import org.apache.accumulo.server.data.ServerMutation; import org.apache.accumulo.server.util.time.RelativeTime; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; + public abstract class TabletTime { public abstract void useMaxTimeFromWALog(long time); @@ -69,7 +69,6 @@ public static MetadataTime maxMetadataTime(MetadataTime mv1, MetadataTime mv2) { return mv1.compareTo(mv2) < 0 ? mv2 : mv1; } - @NotThreadSafe static class MillisTime extends TabletTime { private long lastTime; @@ -90,6 +89,8 @@ public MetadataTime getMetadataTime(long time) { } @Override + @SuppressFBWarnings(value = "AT_NONATOMIC_64BIT_PRIMITIVE", + justification = "this is only called in tablet constructor, so does not need to be done atomically") public void useMaxTimeFromWALog(long time) { if (time > lastTime) { lastTime = time; @@ -160,9 +161,7 @@ private LogicalTime(Long time) { @Override public void useMaxTimeFromWALog(long time) { - final long finalTime = time + 1; - - this.nextTime.getAndUpdate(val -> Math.max(val, finalTime)); + this.nextTime.getAndUpdate(val -> Math.max(val, time + 1)); } @Override From 2b3db9091c3ea6f56d59b98f42ec9715e0c75e7a Mon Sep 17 00:00:00 2001 From: Kevin Rathbun Date: Mon, 22 Dec 2025 11:52:04 -0500 Subject: [PATCH 6/7] remove NotThreadSafe from Scanner, add suppression --- .../java/org/apache/accumulo/tserver/tablet/Scanner.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java index cbb464201a2..adc93997101 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java @@ -24,8 +24,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantLock; -import javax.annotation.concurrent.NotThreadSafe; - import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; @@ -43,7 +41,8 @@ import com.google.common.base.Preconditions; -@NotThreadSafe +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; + public class Scanner { private static final Logger log = LoggerFactory.getLogger(Scanner.class); @@ -216,6 +215,8 @@ public Thread getLockOwner() { * read is in progress), it interrupts the reading thread. This ensures the reading thread can * finish its current operation and release the lock, allowing close to finish. */ + @SuppressFBWarnings(value = "AT_STALE_THREAD_WRITE_OF_PRIMITIVE", + justification = "accesses non-volatile/non-atomic vars only when lock is held") public boolean close() { interruptFlag.set(true); From ce5c4ac8fab129909e4d7fa457b850102b14864c Mon Sep 17 00:00:00 2001 From: Kevin Rathbun Date: Mon, 22 Dec 2025 15:56:29 -0500 Subject: [PATCH 7/7] revert a change to ClientSideIteratorScanner in favor of opening a new PR --- .../apache/accumulo/core/client/ClientSideIteratorScanner.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java b/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java index 48fb306c86c..0209e8deece 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java @@ -177,7 +177,7 @@ public ClientSideIteratorScanner(final Scanner scanner) { this.range = scanner.getRange(); this.size = scanner.getBatchSize(); this.retryTimeout = scanner.getTimeout(MILLISECONDS); - this.batchTimeout = scanner.getBatchTimeout(MILLISECONDS); + this.batchTimeout = scanner.getTimeout(MILLISECONDS); this.readaheadThreshold = scanner.getReadaheadThreshold(); SamplerConfiguration samplerConfig = scanner.getSamplerConfiguration(); if (samplerConfig != null) {