diff --git a/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowBatchWrapper.java b/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowBatchWrapper.java index 4d322ad97..99781ff16 100644 --- a/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowBatchWrapper.java +++ b/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowBatchWrapper.java @@ -31,24 +31,35 @@ class BigQueryArrowBatchWrapper { // Marks the end of the stream for the ResultSet private final boolean isLast; + private final Exception exception; + private BigQueryArrowBatchWrapper( - ArrowRecordBatch currentArrowBatch, JsonStringArrayList nestedRecords, boolean isLast) { + ArrowRecordBatch currentArrowBatch, + JsonStringArrayList nestedRecords, + boolean isLast, + Exception exception) { this.currentArrowBatch = currentArrowBatch; this.nestedRecords = nestedRecords; this.isLast = isLast; + this.exception = exception; } static BigQueryArrowBatchWrapper of(ArrowRecordBatch currentArrowBatch, boolean... isLast) { LOG.finest("++enter++"); boolean isLastFlag = isLast != null && isLast.length == 1 && isLast[0]; - return new BigQueryArrowBatchWrapper(currentArrowBatch, null, isLastFlag); + return new BigQueryArrowBatchWrapper(currentArrowBatch, null, isLastFlag, null); } static BigQueryArrowBatchWrapper getNestedFieldValueListWrapper( JsonStringArrayList nestedRecords, boolean... isLast) { LOG.finest("++enter++"); boolean isLastFlag = isLast != null && isLast.length == 1 && isLast[0]; - return new BigQueryArrowBatchWrapper(null, nestedRecords, isLastFlag); + return new BigQueryArrowBatchWrapper(null, nestedRecords, isLastFlag, null); + } + + static BigQueryArrowBatchWrapper ofError(Exception exception) { + LOG.finest("++enter++"); + return new BigQueryArrowBatchWrapper(null, null, true, exception); } ArrowRecordBatch getCurrentArrowBatch() { @@ -65,4 +76,8 @@ boolean isLast() { LOG.finest("++enter++"); return this.isLast; } + + Exception getException() { + return this.exception; + } } diff --git a/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSet.java b/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSet.java index aeea6783d..1d7d89e3f 100644 --- a/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSet.java +++ b/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSet.java @@ -24,6 +24,7 @@ import com.google.cloud.bigquery.Schema; import com.google.cloud.bigquery.StandardSQLTypeName; import com.google.cloud.bigquery.exception.BigQueryJdbcException; +import com.google.cloud.bigquery.exception.BigQueryJdbcRuntimeException; import com.google.cloud.bigquery.storage.v1.ArrowRecordBatch; import com.google.cloud.bigquery.storage.v1.ArrowSchema; import java.io.IOException; @@ -236,6 +237,9 @@ public boolean next() throws SQLException { /* Start of iteration or we have exhausted the current batch */ // Advance the cursor. Potentially blocking operation. BigQueryArrowBatchWrapper batchWrapper = this.buffer.take(); + if (batchWrapper.getException() != null) { + throw new BigQueryJdbcRuntimeException(batchWrapper.getException()); + } if (batchWrapper.isLast()) { /* Marks the end of the records */ if (this.vectorSchemaRoot != null) { diff --git a/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryFieldValueListWrapper.java b/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryFieldValueListWrapper.java index 9d8b1b2f7..39740e021 100644 --- a/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryFieldValueListWrapper.java +++ b/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryFieldValueListWrapper.java @@ -39,28 +39,36 @@ class BigQueryFieldValueListWrapper { // This flag marks the end of the stream for the ResultSet private boolean isLast = false; + private final Exception exception; static BigQueryFieldValueListWrapper of( FieldList fieldList, FieldValueList fieldValueList, boolean... isLast) { boolean isLastFlag = isLast != null && isLast.length == 1 && isLast[0]; - return new BigQueryFieldValueListWrapper(fieldList, fieldValueList, null, isLastFlag); + return new BigQueryFieldValueListWrapper(fieldList, fieldValueList, null, isLastFlag, null); } static BigQueryFieldValueListWrapper getNestedFieldValueListWrapper( FieldList fieldList, List arrayFieldValueList, boolean... isLast) { boolean isLastFlag = isLast != null && isLast.length == 1 && isLast[0]; - return new BigQueryFieldValueListWrapper(fieldList, null, arrayFieldValueList, isLastFlag); + return new BigQueryFieldValueListWrapper( + fieldList, null, arrayFieldValueList, isLastFlag, null); + } + + static BigQueryFieldValueListWrapper ofError(Exception exception) { + return new BigQueryFieldValueListWrapper(null, null, null, true, exception); } private BigQueryFieldValueListWrapper( FieldList fieldList, FieldValueList fieldValueList, List arrayFieldValueList, - boolean isLast) { + boolean isLast, + Exception exception) { this.fieldList = fieldList; this.fieldValueList = fieldValueList; this.arrayFieldValueList = arrayFieldValueList; this.isLast = isLast; + this.exception = exception; } public FieldList getFieldList() { @@ -78,4 +86,8 @@ public List getArrayFieldValueList() { public boolean isLast() { return this.isLast; } + + public Exception getException() { + return this.exception; + } } diff --git a/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJsonResultSet.java b/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJsonResultSet.java index 3773b99aa..da2ade028 100644 --- a/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJsonResultSet.java +++ b/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJsonResultSet.java @@ -159,6 +159,9 @@ public boolean next() throws SQLException { try { // Advance the cursor,Potentially blocking operation this.cursor = this.buffer.take(); + if (this.cursor.getException() != null) { + throw new BigQueryJdbcRuntimeException(this.cursor.getException()); + } this.rowCnt++; // Check for end of stream if (this.cursor.isLast()) { diff --git a/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java b/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java index 3fb2c2d58..7039628d5 100644 --- a/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java +++ b/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java @@ -94,6 +94,8 @@ public class BigQueryStatement extends BigQueryNoOpsStatement { private static final String DEFAULT_DATASET_NAME = "_google_jdbc"; private static final String DEFAULT_TABLE_NAME = "temp_table_"; private static final String JDBC_JOB_PREFIX = "google-jdbc-"; + private static final int MAX_RETRY_COUNT = 5; + private static final long RETRY_DELAY_MS = 2000L; protected ResultSet currentResultSet; protected long currentUpdateCount = -1; protected List jobIds = new ArrayList<>(); @@ -808,30 +810,77 @@ Thread populateArrowBufferedQueue( Runnable arrowStreamProcessor = () -> { + long rowsRead = 0; + int retryCount = 0; try { // Use the first stream to perform reading. + if (readSession.getStreamsCount() == 0) { + return; + } String streamName = readSession.getStreams(0).getName(); - ReadRowsRequest readRowsRequest = - ReadRowsRequest.newBuilder().setReadStream(streamName).build(); - // Process each block of rows as they arrive and decode using our simple row reader. - com.google.api.gax.rpc.ServerStream stream = - bqReadClient.readRowsCallable().call(readRowsRequest); - for (ReadRowsResponse response : stream) { - if (Thread.currentThread().isInterrupted() - || queryTaskExecutor.isShutdown()) { // do not process and shutdown + while (true) { + try { + ReadRowsRequest readRowsRequest = + ReadRowsRequest.newBuilder() + .setReadStream(streamName) + .setOffset(rowsRead) + .build(); + + // Process each block of rows as they arrive and decode using our simple row reader. + com.google.api.gax.rpc.ServerStream stream = + bqReadClient.readRowsCallable().call(readRowsRequest); + for (ReadRowsResponse response : stream) { + if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) { + break; + } + + ArrowRecordBatch currentBatch = response.getArrowRecordBatch(); + arrowBatchWrapperBlockingQueue.put(BigQueryArrowBatchWrapper.of(currentBatch)); + rowsRead += response.getRowCount(); + } break; + } catch (com.google.api.gax.rpc.ApiException e) { + if (e.getStatusCode().getCode() + == com.google.api.gax.rpc.StatusCode.Code.NOT_FOUND) { + LOG.warning("Read session expired or not found: %s", e.getMessage()); + break; + } + if (retryCount >= MAX_RETRY_COUNT) { + LOG.log( + Level.WARNING, + "\n" + + Thread.currentThread().getName() + + " Interrupted @ arrowStreamProcessor, max retries exceeded", + e); + try { + arrowBatchWrapperBlockingQueue.put( + BigQueryArrowBatchWrapper.ofError(new BigQueryJdbcRuntimeException(e))); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + break; + } + retryCount++; + LOG.info( + "Connection interrupted during arrow stream read, retrying. attempt: %d", + retryCount); + Thread.sleep(RETRY_DELAY_MS); } - - ArrowRecordBatch currentBatch = response.getArrowRecordBatch(); - arrowBatchWrapperBlockingQueue.put(BigQueryArrowBatchWrapper.of(currentBatch)); } - } catch (RuntimeException | InterruptedException e) { + } catch (InterruptedException e) { LOG.log( Level.WARNING, "\n" + Thread.currentThread().getName() + " Interrupted @ arrowStreamProcessor", e); + try { + arrowBatchWrapperBlockingQueue.put( + BigQueryArrowBatchWrapper.ofError(new BigQueryJdbcRuntimeException(e))); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + Thread.currentThread().interrupt(); } finally { // logic needed for graceful shutdown // marking end of stream try { @@ -916,18 +965,21 @@ BigQueryJsonResultSet processJsonResultSet(TableResult results) { if (jobId != null) { // Thread to make rpc calls to fetch data from the server Thread nextPageWorker = - runNextPageTaskAsync(results, results.getNextPageToken(), jobId, rpcResponseQueue); + runNextPageTaskAsync( + results, + results.getNextPageToken(), + jobId, + rpcResponseQueue, + this.bigQueryFieldValueListWrapperBlockingQueue); threadList.add(nextPageWorker); } else { try { populateFirstPage(results, rpcResponseQueue); rpcResponseQueue.put(Tuple.of(null, false)); } catch (InterruptedException e) { - LOG.log( - Level.WARNING, - "\n" - + Thread.currentThread().getName() - + " Interrupted @ processJsonQueryResponseResults"); + LOG.warning( + "\n%s Interrupted @ processJsonQueryResponseResults: %s", + Thread.currentThread().getName(), e.getMessage()); } } @@ -964,9 +1016,9 @@ void populateFirstPage( // this is the first page which we have received. rpcResponseQueue.put(Tuple.of(result, true)); } catch (InterruptedException e) { - LOG.log( - Level.WARNING, - "\n" + Thread.currentThread().getName() + " Interrupted @ populateFirstPage"); + LOG.warning( + "\n%s Interrupted @ populateFirstPage: %s", + Thread.currentThread().getName(), e.getMessage()); } } @@ -983,7 +1035,8 @@ Thread runNextPageTaskAsync( TableResult result, String firstPageToken, JobId jobId, - BlockingQueue> rpcResponseQueue) { + BlockingQueue> rpcResponseQueue, + BlockingQueue bigQueryFieldValueListWrapperBlockingQueue) { LOG.finest("++enter++"); // parse and put the first page in the pageCache before the other pages are parsed from the RPC // calls @@ -992,45 +1045,76 @@ Thread runNextPageTaskAsync( // This thread makes the RPC calls and paginates Runnable nextPageTask = () -> { - // results.getPageToken(); - String pageToken = firstPageToken; + int retryCount = 0; + String currentPageToken = firstPageToken; + TableResult currentResults = result; TableId destinationTable = null; if (firstPageToken != null) { destinationTable = getDestinationTable(jobId); } + try { - // paginate for non null token - while (pageToken != null) { + while (currentPageToken != null) { // do not process further pages and shutdown if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) { - LOG.log( - Level.WARNING, - "\n" - + Thread.currentThread().getName() - + " Interrupted @ runNextPageTaskAsync"); + LOG.warning( + "\n%s Interrupted @ runNextPageTaskAsync", Thread.currentThread().getName()); break; } - long startTime = System.nanoTime(); - TableResult results = - this.bigQuery.listTableData( - destinationTable, - TableDataListOption.pageSize(querySettings.getMaxResultPerPage()), - TableDataListOption.pageToken(pageToken)); - - pageToken = results.getNextPageToken(); - // this will be parsed asynchronously without blocking the current - // thread - rpcResponseQueue.put(Tuple.of(results, true)); - LOG.fine( - "Fetched %d results from the server in %d ms.", - querySettings.getMaxResultPerPage(), - (int) ((System.nanoTime() - startTime) / 1000000)); + + try { + long startTime = System.nanoTime(); + currentResults = + this.bigQuery.listTableData( + destinationTable, + TableDataListOption.pageSize(querySettings.getMaxResultPerPage()), + TableDataListOption.pageToken(currentPageToken)); + + currentPageToken = currentResults.getNextPageToken(); + // this will be parsed asynchronously without blocking the current + // thread + rpcResponseQueue.put(Tuple.of(currentResults, true)); + LOG.fine( + "Fetched %d results from the server in %d ms.", + querySettings.getMaxResultPerPage(), + (int) ((System.nanoTime() - startTime) / 1000000)); + } catch (com.google.cloud.bigquery.BigQueryException ex) { + if (ex.getCode() == 404) { + throw ex; + } + if (retryCount >= MAX_RETRY_COUNT) { + throw new BigQueryJdbcRuntimeException(ex); // Re-throw max retries exceeded + } + retryCount++; + LOG.info( + "Connection interrupted during json stream read, retrying. attempt: %d", + retryCount); + Thread.sleep(RETRY_DELAY_MS); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new BigQueryJdbcRuntimeException(ex); + } catch (Exception ex) { + throw new BigQueryJdbcRuntimeException(ex); + } } - // this will stop the parseDataTask as well when the pagination - // completes - rpcResponseQueue.put(Tuple.of(null, false)); } catch (Exception ex) { - throw new BigQueryJdbcRuntimeException(ex); + try { + bigQueryFieldValueListWrapperBlockingQueue.put( + BigQueryFieldValueListWrapper.ofError(new BigQueryJdbcRuntimeException(ex))); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } finally { + try { + // this will stop the parseDataTask as well when the pagination + // completes + rpcResponseQueue.put(Tuple.of(null, false)); + } catch (InterruptedException ie) { + LOG.warning( + "\n%s Interrupted sending end-of-stream sentinel @ runNextPageTaskAsync", + Thread.currentThread().getName()); + Thread.currentThread().interrupt(); + } } // We cannot do queryTaskExecutor.shutdownNow() here as populate buffer method may not // have finished processing the records and even that will be interrupted @@ -1054,61 +1138,75 @@ Thread parseAndPopulateRpcDataAsync( Runnable populateBufferRunnable = () -> { // producer thread populating the buffer - Iterable fieldValueLists; - // as we have to process the first page - boolean hasRows = true; - while (hasRows) { - try { - Tuple nextPageTuple = rpcResponseQueue.take(); - if (nextPageTuple.x() != null) { - fieldValueLists = nextPageTuple.x().getValues(); - } else { - fieldValueLists = null; - } - hasRows = nextPageTuple.y(); + try { + Iterable fieldValueLists; + // as we have to process the first page + boolean hasRows = true; + while (hasRows) { + try { + Tuple nextPageTuple = rpcResponseQueue.take(); + if (nextPageTuple.x() != null) { + fieldValueLists = nextPageTuple.x().getValues(); + } else { + fieldValueLists = null; + } + hasRows = nextPageTuple.y(); - } catch (InterruptedException e) { - LOG.log(Level.WARNING, "\n" + Thread.currentThread().getName() + " Interrupted", e); - // Thread might get interrupted while calling the Cancel method, which is - // expected, so logging this instead of throwing the exception back - break; - } + } catch (InterruptedException e) { + LOG.log(Level.WARNING, "\n" + Thread.currentThread().getName() + " Interrupted", e); + // Thread might get interrupted while calling the Cancel method, which is + // expected, so logging this instead of throwing the exception back + break; + } - if (Thread.currentThread().isInterrupted() - || queryTaskExecutor.isShutdown() - || fieldValueLists == null) { - // do not process further pages and shutdown (outerloop) - break; - } + if (Thread.currentThread().isInterrupted() + || queryTaskExecutor.isShutdown() + || fieldValueLists == null) { + // do not process further pages and shutdown (outerloop) + break; + } - long startTime = System.nanoTime(); - long results = 0; - for (FieldValueList fieldValueList : fieldValueLists) { - try { - if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) { - // do not process further pages and shutdown (inner loop) - break; + long startTime = System.nanoTime(); + long results = 0; + for (FieldValueList fieldValueList : fieldValueLists) { + try { + if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) { + // do not process further pages and shutdown (inner loop) + break; + } + bigQueryFieldValueListWrapperBlockingQueue.put( + BigQueryFieldValueListWrapper.of(schema.getFields(), fieldValueList)); + results += 1; + } catch (InterruptedException ex) { + throw new BigQueryJdbcRuntimeException(ex); } - bigQueryFieldValueListWrapperBlockingQueue.put( - BigQueryFieldValueListWrapper.of(schema.getFields(), fieldValueList)); - results += 1; - } catch (InterruptedException ex) { - throw new BigQueryJdbcRuntimeException(ex); } + LOG.fine( + "Processed %d results in %d ms.", + results, (int) ((System.nanoTime() - startTime) / 1000000)); } - LOG.fine( - "Processed %d results in %d ms.", - results, (int) ((System.nanoTime() - startTime) / 1000000)); - } - try { - // All the pages has been processed, put this marker - bigQueryFieldValueListWrapperBlockingQueue.put( - BigQueryFieldValueListWrapper.of(null, null, true)); - } catch (InterruptedException e) { + } catch (Exception ex) { LOG.log( Level.WARNING, - "\n" + Thread.currentThread().getName() + " Interrupted @ populateBufferAsync", - e); + "\n" + Thread.currentThread().getName() + " Error @ populateBufferAsync", + ex); + try { + bigQueryFieldValueListWrapperBlockingQueue.put( + BigQueryFieldValueListWrapper.ofError(new BigQueryJdbcRuntimeException(ex))); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } finally { + try { + // All the pages has been processed, put this marker + bigQueryFieldValueListWrapperBlockingQueue.put( + BigQueryFieldValueListWrapper.of(null, null, true)); + } catch (InterruptedException e) { + LOG.log( + Level.WARNING, + "\n" + Thread.currentThread().getName() + " Interrupted @ populateBufferAsync", + e); + } } };