Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,20 @@

package org.apache.iotdb.db.it;

import org.apache.iotdb.isession.ISession;
import org.apache.iotdb.isession.SessionDataSet;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.ClusterIT;
import org.apache.iotdb.itbase.category.LocalStandaloneIT;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;

import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
Expand All @@ -35,6 +43,8 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;
import java.util.List;
import java.util.Locale;

import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -184,4 +194,76 @@ public void testFlushNotExistGroupNoData() {
fail(e.getMessage());
}
}

@Test
public void testStreamingQueryMemTableWithOverlappedData()
throws IoTDBConnectionException, StatementExecutionException {
String device = "root.stream1.d1";
try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
session.open();
generateTimeRangeWithTimestamp(session, device, 1, 10);

generateTimeRangeWithTimestamp(session, device, 500000, 510000);
session.executeNonQueryStatement("flush");
generateTimeRangeWithTimestamp(session, device, 100000, 350000);

SessionDataSet sessionDataSet =
session.executeQueryStatement("select count(*) from root.stream1.d1");
SessionDataSet.DataIterator iterator = sessionDataSet.iterator();
long count = 0;
while (iterator.next()) {
count = iterator.getLong(1);
}
Assert.assertEquals(10 + 10001 + 250001, count);
}
}

@Test
public void testStreamingQueryMemTableWithOverlappedData2()
throws IoTDBConnectionException, StatementExecutionException {
String device = "root.stream2.d1";
try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
session.open();
generateTimeRangeWithTimestamp(session, device, 1, 10);

generateTimeRangeWithTimestamp(session, device, 500000, 510000);
session.executeNonQueryStatement("flush");
generateTimeRangeWithTimestamp(session, device, 1, 20);
generateTimeRangeWithTimestamp(session, device, 100000, 210000);
session.executeNonQueryStatement("flush");

generateTimeRangeWithTimestamp(session, device, 150000, 450000);

SessionDataSet sessionDataSet =
session.executeQueryStatement("select count(*) from root.stream2.d1");
SessionDataSet.DataIterator iterator = sessionDataSet.iterator();
long count = 0;
while (iterator.next()) {
count = iterator.getLong(1);
}
Assert.assertEquals(20 + 10001 + 350001, count);
}
}

private static void generateTimeRangeWithTimestamp(
ISession session, String device, long start, long end)
throws IoTDBConnectionException, StatementExecutionException {
List<MeasurementSchema> measurementSchemas =
Collections.singletonList(new MeasurementSchema("s1", TSDataType.INT64));
Tablet tablet = new Tablet(device, measurementSchemas);
for (long currentTime = start; currentTime <= end; currentTime++) {
int rowIndex = tablet.rowSize;
if (rowIndex == tablet.getMaxRowNumber()) {
session.insertTablet(tablet);
tablet.reset();
rowIndex = 0;
}
tablet.addTimestamp(rowIndex, currentTime);
tablet.addValue("s1", 0, currentTime);
tablet.rowSize++;
}
if (tablet.rowSize > 0) {
session.insertTablet(tablet);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -869,15 +869,28 @@ private boolean hasNextOverlappedPage() throws IOException {
return true;
}

tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader();
// init the merge reader for current call
// The original process is changed to lazy loading because different mem page readers
// belonging to the same mem chunk need to be read in a streaming manner. Therefore, it is
// necessary to ensure that these mem page readers cannot coexist in the mergeReader at the
// same time.
// The initial endPointTime is calculated as follows:
// 1. If mergeReader is empty, use the endpoint of firstPageReader to find all overlapped
// unseq pages and take the end point.
// 2. If mergeReader is not empty, use the readStopTime of mergeReader to find all overlapping
// unseq pages and take the end point.
long initialEndPointTime = tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader();

while (true) {

// may has overlapped data
if (mergeReader.hasNextTimeValuePair()) {

TsBlockBuilder builder = new TsBlockBuilder(getTsDataTypeList());
long currentPageEndPointTime = mergeReader.getCurrentReadStopTime();
long currentPageEndPointTime =
orderUtils.getAscending()
? Math.max(mergeReader.getCurrentReadStopTime(), initialEndPointTime)
: Math.min(mergeReader.getCurrentReadStopTime(), initialEndPointTime);
while (mergeReader.hasNextTimeValuePair()) {

/*
Expand Down Expand Up @@ -907,7 +920,7 @@ private boolean hasNextOverlappedPage() throws IOException {
unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(
timeValuePair.getTimestamp(), false);
unpackAllOverlappedChunkMetadataToPageReaders(timeValuePair.getTimestamp(), false);
unpackAllOverlappedUnseqPageReadersToMergeReader(timeValuePair.getTimestamp());
unpackAllOverlappedUnseqPageReadersToMergeReader();

// update if there are unpacked unSeqPageReaders
timeValuePair = mergeReader.currentTimeValuePair();
Expand Down Expand Up @@ -996,33 +1009,71 @@ private long updateEndPointTime(long currentPageEndPointTime, IVersionPageReader
}
}

private void tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader() throws IOException {
private long tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader() throws IOException {
do {
/*
* no cached page readers
*/
if (firstPageReader == null && unSeqPageReaders.isEmpty() && seqPageReaders.isEmpty()) {
return mergeReader.getCurrentReadStopTime();
}

/*
* no cached page readers
*/
if (firstPageReader == null && unSeqPageReaders.isEmpty() && seqPageReaders.isEmpty()) {
return;
}
/*
* init firstPageReader
*/
if (firstPageReader == null) {
initFirstPageReader();
}
if (!mergeReader.hasNextTimeValuePair()) {
putPageReaderToMergeReader(firstPageReader);
firstPageReader = null;
}
} while (!mergeReader.hasNextTimeValuePair());

/*
* init firstPageReader
* put all currently directly overlapped unseq page reader to merge reader
*/
if (firstPageReader == null) {
initFirstPageReader();
}
long mergeReaderStopTime = mergeReader.getCurrentReadStopTime();
unpackAllOverlappedUnseqPageReadersToMergeReader();

long currentPageEndpointTime;
if (mergeReader.hasNextTimeValuePair()) {
currentPageEndpointTime = mergeReader.getCurrentReadStopTime();
} else {
currentPageEndpointTime = orderUtils.getOverlapCheckTime(firstPageReader.getStatistics());
}
return calculateInitialEndPointTime(mergeReaderStopTime);
}

/*
* put all currently directly overlapped unseq page reader to merge reader
*/
unpackAllOverlappedUnseqPageReadersToMergeReader(currentPageEndpointTime);
private long calculateInitialEndPointTime(final long currentReadStopTime) {
long initialReadStopTime = currentReadStopTime;
if (firstPageReader != null
&& !firstPageReader.isSeq()
&& orderUtils.isOverlapped(currentReadStopTime, firstPageReader.getStatistics())) {
if (orderUtils.getAscending()) {
initialReadStopTime =
Math.max(
initialReadStopTime,
orderUtils.getOverlapCheckTime(firstPageReader.getStatistics()));
} else {
initialReadStopTime =
Math.min(
initialReadStopTime,
orderUtils.getOverlapCheckTime(firstPageReader.getStatistics()));
}
}
for (IVersionPageReader unSeqPageReader : unSeqPageReaders) {
if (orderUtils.isOverlapped(currentReadStopTime, unSeqPageReader.getStatistics())) {
if (orderUtils.getAscending()) {
initialReadStopTime =
Math.max(
initialReadStopTime,
orderUtils.getOverlapCheckTime(unSeqPageReader.getStatistics()));
} else {
initialReadStopTime =
Math.min(
initialReadStopTime,
orderUtils.getOverlapCheckTime(unSeqPageReader.getStatistics()));
}
} else {
break;
}
}
return initialReadStopTime;
}

private void addTimeValuePairToResult(TimeValuePair timeValuePair, TsBlockBuilder builder) {
Expand Down Expand Up @@ -1113,17 +1164,26 @@ private IVersionPageReader getFirstPageReaderFromCachedReaders() {
return firstPageReader;
}

private void unpackAllOverlappedUnseqPageReadersToMergeReader(long endpointTime)
throws IOException {
while (!unSeqPageReaders.isEmpty()
&& orderUtils.isOverlapped(endpointTime, unSeqPageReaders.peek().getStatistics())) {
putPageReaderToMergeReader(unSeqPageReaders.poll());
}
// This process loads overlapped unseq pages based on the current time value pair of the
// mergeReader. The current time value pair of the mergeReader is recalculated each time an unseq
// page is added.
// The current time obtained from mergeReader each time is not necessarily the minimum among all
// the actual unseq data, so it is necessary to repeatedly calculate and include potentially
// overlapping unseq pages.
private void unpackAllOverlappedUnseqPageReadersToMergeReader() throws IOException {
long actualFirstTimeOfMergeReader = mergeReader.currentTimeValuePair().getTimestamp();
if (firstPageReader != null
&& !firstPageReader.isSeq()
&& orderUtils.isOverlapped(endpointTime, firstPageReader.getStatistics())) {
&& orderUtils.isOverlapped(actualFirstTimeOfMergeReader, firstPageReader.getStatistics())) {
putPageReaderToMergeReader(firstPageReader);
firstPageReader = null;
actualFirstTimeOfMergeReader = mergeReader.currentTimeValuePair().getTimestamp();
}
while (!unSeqPageReaders.isEmpty()
&& orderUtils.isOverlapped(
actualFirstTimeOfMergeReader, unSeqPageReaders.peek().getStatistics())) {
putPageReaderToMergeReader(unSeqPageReaders.poll());
actualFirstTimeOfMergeReader = mergeReader.currentTimeValuePair().getTimestamp();
}
}

Expand Down
Loading