From b698a9aab21ecde3d01bed5b855fcdf042960abe Mon Sep 17 00:00:00 2001 From: Keshav Dandeva Date: Wed, 11 Feb 2026 02:15:30 +0000 Subject: [PATCH 1/9] feat(jdbc): add retry mechanism for Arrow and JSON result streams --- .../bigquery/jdbc/BigQueryStatement.java | 188 +++++++++++++----- 1 file changed, 134 insertions(+), 54 deletions(-) diff --git a/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java b/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java index 3fb2c2d58..b042cc7eb 100644 --- a/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java +++ b/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java @@ -75,7 +75,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadFactory; -import java.util.logging.Level; /** * An implementation of {@link java.sql.Statement} for executing BigQuery SQL statement and @@ -808,40 +807,88 @@ Thread populateArrowBufferedQueue( Runnable arrowStreamProcessor = () -> { + long rowsRead = 0; + int retryCount = 0; + final long retryTimeoutInSecs = this.connection.getRetryTimeoutInSeconds(); + long startTime = System.currentTimeMillis(); try { // Use the first stream to perform reading. + if (readSession.getStreamsCount() == 0) { + return; + } String streamName = readSession.getStreams(0).getName(); - ReadRowsRequest readRowsRequest = - ReadRowsRequest.newBuilder().setReadStream(streamName).build(); - - // Process each block of rows as they arrive and decode using our simple row reader. - com.google.api.gax.rpc.ServerStream stream = - bqReadClient.readRowsCallable().call(readRowsRequest); - for (ReadRowsResponse response : stream) { - if (Thread.currentThread().isInterrupted() - || queryTaskExecutor.isShutdown()) { // do not process and shutdown + + while (true) { + try { + ReadRowsRequest readRowsRequest = + ReadRowsRequest.newBuilder() + .setReadStream(streamName) + .setOffset(rowsRead) + .build(); + + com.google.api.gax.rpc.ServerStream stream = + bqReadClient.readRowsCallable().call(readRowsRequest); + for (ReadRowsResponse response : stream) { + if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) { + break; + } + + ArrowRecordBatch currentBatch = response.getArrowRecordBatch(); + arrowBatchWrapperBlockingQueue.put(BigQueryArrowBatchWrapper.of(currentBatch)); + rowsRead += response.getRowCount(); + startTime = System.currentTimeMillis(); + } break; + } catch (RuntimeException e) { + if (e instanceof com.google.api.gax.rpc.ApiException + && ((com.google.api.gax.rpc.ApiException) e).getStatusCode().getCode() + == com.google.api.gax.rpc.StatusCode.Code.NOT_FOUND) { + LOG.warning("Read session expired or not found" + ": %s", e.getMessage()); + break; + } + long elapsedSecs = (System.currentTimeMillis() - startTime) / 1000; + if (elapsedSecs >= retryTimeoutInSecs) { + LOG.warning( + "\n" + + Thread.currentThread().getName() + + " Interrupted @ arrowStreamProcessor, timeout exceeded: %s", + e.getMessage()); + break; + } + retryCount++; + LOG.info( + "Connection interrupted during arrow stream read, retrying. attempt: " + + retryCount); + try { + Thread.sleep(this.connection.getRetryInitialDelayInSeconds() * 1000); + } catch (InterruptedException ie) { + LOG.warning( + "\n" + + Thread.currentThread().getName() + + " Interrupted @ arrowStreamProcessor waiting for retry" + + ": %s", + ie.getMessage()); + break; + } } - - ArrowRecordBatch currentBatch = response.getArrowRecordBatch(); - arrowBatchWrapperBlockingQueue.put(BigQueryArrowBatchWrapper.of(currentBatch)); } - } catch (RuntimeException | InterruptedException e) { - LOG.log( - Level.WARNING, - "\n" + Thread.currentThread().getName() + " Interrupted @ arrowStreamProcessor", - e); + } catch (InterruptedException e) { + LOG.warning( + "\n" + + Thread.currentThread().getName() + + " Interrupted @ arrowStreamProcessor" + + ": %s", + e.getMessage()); } finally { // logic needed for graceful shutdown // marking end of stream try { arrowBatchWrapperBlockingQueue.put( BigQueryArrowBatchWrapper.of(null, true)); // mark the end of the stream } catch (InterruptedException e) { - LOG.log( - Level.WARNING, - "\n" + Thread.currentThread().getName() + " Interrupted @ markLast", - e); + LOG.warning( + "\n" + Thread.currentThread().getName() + " Interrupted @ markLast" + ": %s", + e.getMessage()); } } }; @@ -923,11 +970,11 @@ BigQueryJsonResultSet processJsonResultSet(TableResult results) { populateFirstPage(results, rpcResponseQueue); rpcResponseQueue.put(Tuple.of(null, false)); } catch (InterruptedException e) { - LOG.log( - Level.WARNING, + LOG.warning( "\n" + Thread.currentThread().getName() - + " Interrupted @ processJsonQueryResponseResults"); + + " Interrupted @ processJsonQueryResponseResults: %s", + e.getMessage()); } } @@ -964,9 +1011,7 @@ void populateFirstPage( // this is the first page which we have received. rpcResponseQueue.put(Tuple.of(result, true)); } catch (InterruptedException e) { - LOG.log( - Level.WARNING, - "\n" + Thread.currentThread().getName() + " Interrupted @ populateFirstPage"); + LOG.warning("\n" + Thread.currentThread().getName() + " Interrupted @ populateFirstPage"); } } @@ -992,44 +1037,75 @@ Thread runNextPageTaskAsync( // This thread makes the RPC calls and paginates Runnable nextPageTask = () -> { - // results.getPageToken(); - String pageToken = firstPageToken; + long startTimeLoop = System.currentTimeMillis(); + final long retryTimeoutInSecs = this.connection.getRetryTimeoutInSeconds(); + int retryCount = 0; + String currentPageToken = firstPageToken; + TableResult currentResults = result; TableId destinationTable = null; if (firstPageToken != null) { destinationTable = getDestinationTable(jobId); } + try { - // paginate for non null token - while (pageToken != null) { + while (currentPageToken != null) { // do not process further pages and shutdown if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) { - LOG.log( - Level.WARNING, + LOG.warning( "\n" + Thread.currentThread().getName() + " Interrupted @ runNextPageTaskAsync"); break; } - long startTime = System.nanoTime(); - TableResult results = - this.bigQuery.listTableData( - destinationTable, - TableDataListOption.pageSize(querySettings.getMaxResultPerPage()), - TableDataListOption.pageToken(pageToken)); - - pageToken = results.getNextPageToken(); - // this will be parsed asynchronously without blocking the current - // thread - rpcResponseQueue.put(Tuple.of(results, true)); - LOG.fine( - "Fetched %d results from the server in %d ms.", - querySettings.getMaxResultPerPage(), - (int) ((System.nanoTime() - startTime) / 1000000)); + + try { + long startTime = System.nanoTime(); + currentResults = + this.bigQuery.listTableData( + destinationTable, + TableDataListOption.pageSize(querySettings.getMaxResultPerPage()), + TableDataListOption.pageToken(currentPageToken)); + + currentPageToken = currentResults.getNextPageToken(); + // this will be parsed asynchronously without blocking the current + // thread + rpcResponseQueue.put(Tuple.of(currentResults, true)); + startTimeLoop = System.currentTimeMillis(); + LOG.fine( + "Fetched %d results from the server in %d ms.", + querySettings.getMaxResultPerPage(), + (int) ((System.nanoTime() - startTime) / 1000000)); + } catch (Exception ex) { + if (ex instanceof com.google.cloud.BaseServiceException + && ((com.google.cloud.BaseServiceException) ex).getCode() == 404) { + throw ex; + } + long elapsedSecs = (System.currentTimeMillis() - startTimeLoop) / 1000; + if (elapsedSecs >= retryTimeoutInSecs || ex instanceof InterruptedException) { + throw ex; + } + retryCount++; + LOG.info( + "Connection interrupted during json stream read, retrying. attempt: " + + retryCount); + try { + Thread.sleep(this.connection.getRetryInitialDelayInSeconds() * 1000); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new BigQueryJdbcRuntimeException(ie); + } + } } // this will stop the parseDataTask as well when the pagination // completes rpcResponseQueue.put(Tuple.of(null, false)); } catch (Exception ex) { + LOG.warning( + "\n" + + Thread.currentThread().getName() + + " Interrupted @ runNextPageTaskAsync" + + ": %s", + ex.getMessage()); throw new BigQueryJdbcRuntimeException(ex); } // We cannot do queryTaskExecutor.shutdownNow() here as populate buffer method may not @@ -1068,7 +1144,9 @@ Thread parseAndPopulateRpcDataAsync( hasRows = nextPageTuple.y(); } catch (InterruptedException e) { - LOG.log(Level.WARNING, "\n" + Thread.currentThread().getName() + " Interrupted", e); + LOG.warning( + "\n" + Thread.currentThread().getName() + " Interrupted" + ": %s", + e.getMessage()); // Thread might get interrupted while calling the Cancel method, which is // expected, so logging this instead of throwing the exception back break; @@ -1105,10 +1183,12 @@ Thread parseAndPopulateRpcDataAsync( bigQueryFieldValueListWrapperBlockingQueue.put( BigQueryFieldValueListWrapper.of(null, null, true)); } catch (InterruptedException e) { - LOG.log( - Level.WARNING, - "\n" + Thread.currentThread().getName() + " Interrupted @ populateBufferAsync", - e); + LOG.warning( + "\n" + + Thread.currentThread().getName() + + " Interrupted @ populateBufferAsync" + + ": %s", + e.getMessage()); } }; From 878b6e6c5b4636b82d52a8edac68f17a9bd02ee2 Mon Sep 17 00:00:00 2001 From: Keshav Dandeva Date: Wed, 11 Feb 2026 03:55:51 +0000 Subject: [PATCH 2/9] chore: fix logging and address gemini review comments --- .../bigquery/jdbc/BigQueryStatement.java | 74 +++++++++---------- 1 file changed, 36 insertions(+), 38 deletions(-) diff --git a/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java b/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java index b042cc7eb..cbabdfb5e 100644 --- a/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java +++ b/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java @@ -75,6 +75,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadFactory; +import java.util.logging.Level; /** * An implementation of {@link java.sql.Statement} for executing BigQuery SQL statement and @@ -826,6 +827,7 @@ Thread populateArrowBufferedQueue( .setOffset(rowsRead) .build(); + // Process each block of rows as they arrive and decode using our simple row reader. com.google.api.gax.rpc.ServerStream stream = bqReadClient.readRowsCallable().call(readRowsRequest); for (ReadRowsResponse response : stream) { @@ -848,47 +850,47 @@ Thread populateArrowBufferedQueue( } long elapsedSecs = (System.currentTimeMillis() - startTime) / 1000; if (elapsedSecs >= retryTimeoutInSecs) { - LOG.warning( + LOG.log( + Level.WARNING, "\n" + Thread.currentThread().getName() - + " Interrupted @ arrowStreamProcessor, timeout exceeded: %s", - e.getMessage()); + + " Interrupted @ arrowStreamProcessor, timeout exceeded", + e); break; } retryCount++; LOG.info( - "Connection interrupted during arrow stream read, retrying. attempt: " - + retryCount); + "Connection interrupted during arrow stream read, retrying. attempt: %d", + retryCount); try { Thread.sleep(this.connection.getRetryInitialDelayInSeconds() * 1000); } catch (InterruptedException ie) { - LOG.warning( + LOG.log( + Level.WARNING, "\n" + Thread.currentThread().getName() - + " Interrupted @ arrowStreamProcessor waiting for retry" - + ": %s", - ie.getMessage()); + + " Interrupted @ arrowStreamProcessor waiting for retry", + ie); break; } } } } catch (InterruptedException e) { - LOG.warning( - "\n" - + Thread.currentThread().getName() - + " Interrupted @ arrowStreamProcessor" - + ": %s", - e.getMessage()); + LOG.log( + Level.WARNING, + "\n" + Thread.currentThread().getName() + " Interrupted @ arrowStreamProcessor", + e); } finally { // logic needed for graceful shutdown // marking end of stream try { arrowBatchWrapperBlockingQueue.put( BigQueryArrowBatchWrapper.of(null, true)); // mark the end of the stream } catch (InterruptedException e) { - LOG.warning( - "\n" + Thread.currentThread().getName() + " Interrupted @ markLast" + ": %s", - e.getMessage()); + LOG.log( + Level.WARNING, + "\n" + Thread.currentThread().getName() + " Interrupted @ markLast", + e); } } }; @@ -971,10 +973,8 @@ BigQueryJsonResultSet processJsonResultSet(TableResult results) { rpcResponseQueue.put(Tuple.of(null, false)); } catch (InterruptedException e) { LOG.warning( - "\n" - + Thread.currentThread().getName() - + " Interrupted @ processJsonQueryResponseResults: %s", - e.getMessage()); + "\n%s Interrupted @ processJsonQueryResponseResults: %s", + Thread.currentThread().getName(), e.getMessage()); } } @@ -1011,7 +1011,9 @@ void populateFirstPage( // this is the first page which we have received. rpcResponseQueue.put(Tuple.of(result, true)); } catch (InterruptedException e) { - LOG.warning("\n" + Thread.currentThread().getName() + " Interrupted @ populateFirstPage"); + LOG.warning( + "\n%s Interrupted @ populateFirstPage: %s", + Thread.currentThread().getName(), e.getMessage()); } } @@ -1052,9 +1054,7 @@ Thread runNextPageTaskAsync( // do not process further pages and shutdown if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) { LOG.warning( - "\n" - + Thread.currentThread().getName() - + " Interrupted @ runNextPageTaskAsync"); + "\n%s Interrupted @ runNextPageTaskAsync", Thread.currentThread().getName()); break; } @@ -1081,13 +1081,15 @@ Thread runNextPageTaskAsync( throw ex; } long elapsedSecs = (System.currentTimeMillis() - startTimeLoop) / 1000; - if (elapsedSecs >= retryTimeoutInSecs || ex instanceof InterruptedException) { + if (elapsedSecs >= retryTimeoutInSecs + || ex instanceof InterruptedException + || ex.getCause() instanceof InterruptedException) { throw ex; } retryCount++; LOG.info( - "Connection interrupted during json stream read, retrying. attempt: " - + retryCount); + "Connection interrupted during json stream read, retrying. attempt: %d", + retryCount); try { Thread.sleep(this.connection.getRetryInitialDelayInSeconds() * 1000); } catch (InterruptedException ie) { @@ -1144,9 +1146,7 @@ Thread parseAndPopulateRpcDataAsync( hasRows = nextPageTuple.y(); } catch (InterruptedException e) { - LOG.warning( - "\n" + Thread.currentThread().getName() + " Interrupted" + ": %s", - e.getMessage()); + LOG.log(Level.WARNING, "\n" + Thread.currentThread().getName() + " Interrupted", e); // Thread might get interrupted while calling the Cancel method, which is // expected, so logging this instead of throwing the exception back break; @@ -1183,12 +1183,10 @@ Thread parseAndPopulateRpcDataAsync( bigQueryFieldValueListWrapperBlockingQueue.put( BigQueryFieldValueListWrapper.of(null, null, true)); } catch (InterruptedException e) { - LOG.warning( - "\n" - + Thread.currentThread().getName() - + " Interrupted @ populateBufferAsync" - + ": %s", - e.getMessage()); + LOG.log( + Level.WARNING, + "\n" + Thread.currentThread().getName() + " Interrupted @ populateBufferAsync", + e); } }; From 1d1a5c5d26fb3ff388cafcd05f7e587182df96f1 Mon Sep 17 00:00:00 2001 From: Keshav Dandeva Date: Wed, 11 Feb 2026 04:01:18 +0000 Subject: [PATCH 3/9] chore: remove extra log --- .../com/google/cloud/bigquery/jdbc/BigQueryStatement.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java b/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java index cbabdfb5e..28c3f1699 100644 --- a/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java +++ b/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java @@ -1102,12 +1102,6 @@ Thread runNextPageTaskAsync( // completes rpcResponseQueue.put(Tuple.of(null, false)); } catch (Exception ex) { - LOG.warning( - "\n" - + Thread.currentThread().getName() - + " Interrupted @ runNextPageTaskAsync" - + ": %s", - ex.getMessage()); throw new BigQueryJdbcRuntimeException(ex); } // We cannot do queryTaskExecutor.shutdownNow() here as populate buffer method may not From 3f357c2c0a1546214c8b0dc5df211d2fa737133a Mon Sep 17 00:00:00 2001 From: Keshav Dandeva Date: Wed, 11 Feb 2026 04:23:07 +0000 Subject: [PATCH 4/9] fix: retry for specific exceptions --- .../bigquery/jdbc/BigQueryStatement.java | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java b/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java index 28c3f1699..b38f8b5f0 100644 --- a/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java +++ b/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java @@ -841,10 +841,9 @@ Thread populateArrowBufferedQueue( startTime = System.currentTimeMillis(); } break; - } catch (RuntimeException e) { - if (e instanceof com.google.api.gax.rpc.ApiException - && ((com.google.api.gax.rpc.ApiException) e).getStatusCode().getCode() - == com.google.api.gax.rpc.StatusCode.Code.NOT_FOUND) { + } catch (com.google.api.gax.rpc.ApiException e) { + if (e.getStatusCode().getCode() + == com.google.api.gax.rpc.StatusCode.Code.NOT_FOUND) { LOG.warning("Read session expired or not found" + ": %s", e.getMessage()); break; } @@ -1075,15 +1074,12 @@ Thread runNextPageTaskAsync( "Fetched %d results from the server in %d ms.", querySettings.getMaxResultPerPage(), (int) ((System.nanoTime() - startTime) / 1000000)); - } catch (Exception ex) { - if (ex instanceof com.google.cloud.BaseServiceException - && ((com.google.cloud.BaseServiceException) ex).getCode() == 404) { + } catch (com.google.cloud.bigquery.BigQueryException ex) { + if (ex.getCode() == 404) { throw ex; } long elapsedSecs = (System.currentTimeMillis() - startTimeLoop) / 1000; - if (elapsedSecs >= retryTimeoutInSecs - || ex instanceof InterruptedException - || ex.getCause() instanceof InterruptedException) { + if (elapsedSecs >= retryTimeoutInSecs) { throw ex; } retryCount++; @@ -1096,6 +1092,9 @@ Thread runNextPageTaskAsync( Thread.currentThread().interrupt(); throw new BigQueryJdbcRuntimeException(ie); } + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new BigQueryJdbcRuntimeException(ie); } } // this will stop the parseDataTask as well when the pagination From 9e21b4e22d28cf5e3d5c2ef19ae16f84aa31e53a Mon Sep 17 00:00:00 2001 From: Keshav Dandeva Date: Wed, 11 Feb 2026 04:33:16 +0000 Subject: [PATCH 5/9] chore: address gemini feedback --- .../google/cloud/bigquery/jdbc/BigQueryStatement.java | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java b/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java index b38f8b5f0..a47b721d1 100644 --- a/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java +++ b/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java @@ -844,7 +844,7 @@ Thread populateArrowBufferedQueue( } catch (com.google.api.gax.rpc.ApiException e) { if (e.getStatusCode().getCode() == com.google.api.gax.rpc.StatusCode.Code.NOT_FOUND) { - LOG.warning("Read session expired or not found" + ": %s", e.getMessage()); + LOG.warning("Read session expired or not found: %s", e.getMessage()); break; } long elapsedSecs = (System.currentTimeMillis() - startTime) / 1000; @@ -864,13 +864,8 @@ Thread populateArrowBufferedQueue( try { Thread.sleep(this.connection.getRetryInitialDelayInSeconds() * 1000); } catch (InterruptedException ie) { - LOG.log( - Level.WARNING, - "\n" - + Thread.currentThread().getName() - + " Interrupted @ arrowStreamProcessor waiting for retry", - ie); - break; + Thread.currentThread().interrupt(); + throw new BigQueryJdbcRuntimeException(ie); } } } From 5fd9dba3e239cc5689e7f621c1a6c560e768e7ec Mon Sep 17 00:00:00 2001 From: Keshav Dandeva Date: Thu, 12 Feb 2026 14:59:06 +0000 Subject: [PATCH 6/9] chore: address pr review feedback --- .../bigquery/jdbc/BigQueryStatement.java | 42 +++++++++---------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java b/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java index a47b721d1..01777fba1 100644 --- a/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java +++ b/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java @@ -811,7 +811,7 @@ Thread populateArrowBufferedQueue( long rowsRead = 0; int retryCount = 0; final long retryTimeoutInSecs = this.connection.getRetryTimeoutInSeconds(); - long startTime = System.currentTimeMillis(); + long startTime = System.nanoTime(); try { // Use the first stream to perform reading. if (readSession.getStreamsCount() == 0) { @@ -838,7 +838,7 @@ Thread populateArrowBufferedQueue( ArrowRecordBatch currentBatch = response.getArrowRecordBatch(); arrowBatchWrapperBlockingQueue.put(BigQueryArrowBatchWrapper.of(currentBatch)); rowsRead += response.getRowCount(); - startTime = System.currentTimeMillis(); + startTime = System.nanoTime(); } break; } catch (com.google.api.gax.rpc.ApiException e) { @@ -847,7 +847,7 @@ Thread populateArrowBufferedQueue( LOG.warning("Read session expired or not found: %s", e.getMessage()); break; } - long elapsedSecs = (System.currentTimeMillis() - startTime) / 1000; + long elapsedSecs = (System.nanoTime() - startTime) / 1_000_000_000L; if (elapsedSecs >= retryTimeoutInSecs) { LOG.log( Level.WARNING, @@ -861,12 +861,7 @@ Thread populateArrowBufferedQueue( LOG.info( "Connection interrupted during arrow stream read, retrying. attempt: %d", retryCount); - try { - Thread.sleep(this.connection.getRetryInitialDelayInSeconds() * 1000); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - throw new BigQueryJdbcRuntimeException(ie); - } + Thread.sleep(this.connection.getRetryInitialDelayInSeconds() * 1000); } } @@ -875,6 +870,8 @@ Thread populateArrowBufferedQueue( Level.WARNING, "\n" + Thread.currentThread().getName() + " Interrupted @ arrowStreamProcessor", e); + Thread.currentThread().interrupt(); + throw new BigQueryJdbcRuntimeException(e); } finally { // logic needed for graceful shutdown // marking end of stream try { @@ -1033,7 +1030,7 @@ Thread runNextPageTaskAsync( // This thread makes the RPC calls and paginates Runnable nextPageTask = () -> { - long startTimeLoop = System.currentTimeMillis(); + long startTimeLoop = System.nanoTime(); final long retryTimeoutInSecs = this.connection.getRetryTimeoutInSeconds(); int retryCount = 0; String currentPageToken = firstPageToken; @@ -1064,7 +1061,7 @@ Thread runNextPageTaskAsync( // this will be parsed asynchronously without blocking the current // thread rpcResponseQueue.put(Tuple.of(currentResults, true)); - startTimeLoop = System.currentTimeMillis(); + startTimeLoop = System.nanoTime(); LOG.fine( "Fetched %d results from the server in %d ms.", querySettings.getMaxResultPerPage(), @@ -1073,7 +1070,7 @@ Thread runNextPageTaskAsync( if (ex.getCode() == 404) { throw ex; } - long elapsedSecs = (System.currentTimeMillis() - startTimeLoop) / 1000; + long elapsedSecs = (System.nanoTime() - startTimeLoop) / 1_000_000_000L; if (elapsedSecs >= retryTimeoutInSecs) { throw ex; } @@ -1081,22 +1078,25 @@ Thread runNextPageTaskAsync( LOG.info( "Connection interrupted during json stream read, retrying. attempt: %d", retryCount); - try { - Thread.sleep(this.connection.getRetryInitialDelayInSeconds() * 1000); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - throw new BigQueryJdbcRuntimeException(ie); - } + Thread.sleep(this.connection.getRetryInitialDelayInSeconds() * 1000); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new BigQueryJdbcRuntimeException(ie); } } - // this will stop the parseDataTask as well when the pagination - // completes - rpcResponseQueue.put(Tuple.of(null, false)); } catch (Exception ex) { throw new BigQueryJdbcRuntimeException(ex); + } finally { + try { + // this will stop the parseDataTask as well when the pagination + // completes + rpcResponseQueue.put(Tuple.of(null, false)); + } catch (InterruptedException ie) { + LOG.warning( + "\n%s Interrupted sending end-of-stream sentinel @ runNextPageTaskAsync", + Thread.currentThread().getName()); + Thread.currentThread().interrupt(); + } } // We cannot do queryTaskExecutor.shutdownNow() here as populate buffer method may not // have finished processing the records and even that will be interrupted From 388db844e26b90f804e206a27980fd98d0e29bdd Mon Sep 17 00:00:00 2001 From: Keshav Dandeva Date: Fri, 13 Feb 2026 19:44:19 +0000 Subject: [PATCH 7/9] refactor: retry connection for arrow and json --- .../bigquery/jdbc/BigQueryStatement.java | 164 +++++++++++------- 1 file changed, 99 insertions(+), 65 deletions(-) diff --git a/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java b/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java index 01777fba1..c302cb2b2 100644 --- a/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java +++ b/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java @@ -94,6 +94,8 @@ public class BigQueryStatement extends BigQueryNoOpsStatement { private static final String DEFAULT_DATASET_NAME = "_google_jdbc"; private static final String DEFAULT_TABLE_NAME = "temp_table_"; private static final String JDBC_JOB_PREFIX = "google-jdbc-"; + private static final int MAX_RETRY_COUNT = 5; + private static final long RETRY_DELAY_MS = 2000L; protected ResultSet currentResultSet; protected long currentUpdateCount = -1; protected List jobIds = new ArrayList<>(); @@ -810,8 +812,6 @@ Thread populateArrowBufferedQueue( () -> { long rowsRead = 0; int retryCount = 0; - final long retryTimeoutInSecs = this.connection.getRetryTimeoutInSeconds(); - long startTime = System.nanoTime(); try { // Use the first stream to perform reading. if (readSession.getStreamsCount() == 0) { @@ -838,7 +838,6 @@ Thread populateArrowBufferedQueue( ArrowRecordBatch currentBatch = response.getArrowRecordBatch(); arrowBatchWrapperBlockingQueue.put(BigQueryArrowBatchWrapper.of(currentBatch)); rowsRead += response.getRowCount(); - startTime = System.nanoTime(); } break; } catch (com.google.api.gax.rpc.ApiException e) { @@ -847,21 +846,26 @@ Thread populateArrowBufferedQueue( LOG.warning("Read session expired or not found: %s", e.getMessage()); break; } - long elapsedSecs = (System.nanoTime() - startTime) / 1_000_000_000L; - if (elapsedSecs >= retryTimeoutInSecs) { + if (retryCount >= MAX_RETRY_COUNT) { LOG.log( Level.WARNING, "\n" + Thread.currentThread().getName() - + " Interrupted @ arrowStreamProcessor, timeout exceeded", + + " Interrupted @ arrowStreamProcessor, max retries exceeded", e); + try { + arrowBatchWrapperBlockingQueue.put( + BigQueryArrowBatchWrapper.ofError(new BigQueryJdbcRuntimeException(e))); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } break; } retryCount++; LOG.info( "Connection interrupted during arrow stream read, retrying. attempt: %d", retryCount); - Thread.sleep(this.connection.getRetryInitialDelayInSeconds() * 1000); + Thread.sleep(RETRY_DELAY_MS); } } @@ -870,6 +874,12 @@ Thread populateArrowBufferedQueue( Level.WARNING, "\n" + Thread.currentThread().getName() + " Interrupted @ arrowStreamProcessor", e); + try { + arrowBatchWrapperBlockingQueue.put( + BigQueryArrowBatchWrapper.ofError(new BigQueryJdbcRuntimeException(e))); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } Thread.currentThread().interrupt(); throw new BigQueryJdbcRuntimeException(e); } finally { // logic needed for graceful shutdown @@ -956,7 +966,12 @@ BigQueryJsonResultSet processJsonResultSet(TableResult results) { if (jobId != null) { // Thread to make rpc calls to fetch data from the server Thread nextPageWorker = - runNextPageTaskAsync(results, results.getNextPageToken(), jobId, rpcResponseQueue); + runNextPageTaskAsync( + results, + results.getNextPageToken(), + jobId, + rpcResponseQueue, + this.bigQueryFieldValueListWrapperBlockingQueue); threadList.add(nextPageWorker); } else { try { @@ -1021,7 +1036,8 @@ Thread runNextPageTaskAsync( TableResult result, String firstPageToken, JobId jobId, - BlockingQueue> rpcResponseQueue) { + BlockingQueue> rpcResponseQueue, + BlockingQueue bigQueryFieldValueListWrapperBlockingQueue) { LOG.finest("++enter++"); // parse and put the first page in the pageCache before the other pages are parsed from the RPC // calls @@ -1030,8 +1046,6 @@ Thread runNextPageTaskAsync( // This thread makes the RPC calls and paginates Runnable nextPageTask = () -> { - long startTimeLoop = System.nanoTime(); - final long retryTimeoutInSecs = this.connection.getRetryTimeoutInSeconds(); int retryCount = 0; String currentPageToken = firstPageToken; TableResult currentResults = result; @@ -1061,7 +1075,6 @@ Thread runNextPageTaskAsync( // this will be parsed asynchronously without blocking the current // thread rpcResponseQueue.put(Tuple.of(currentResults, true)); - startTimeLoop = System.nanoTime(); LOG.fine( "Fetched %d results from the server in %d ms.", querySettings.getMaxResultPerPage(), @@ -1070,21 +1083,28 @@ Thread runNextPageTaskAsync( if (ex.getCode() == 404) { throw ex; } - long elapsedSecs = (System.nanoTime() - startTimeLoop) / 1_000_000_000L; - if (elapsedSecs >= retryTimeoutInSecs) { - throw ex; + if (retryCount >= MAX_RETRY_COUNT) { + throw new BigQueryJdbcRuntimeException(ex); // Re-throw max retries exceeded } retryCount++; LOG.info( "Connection interrupted during json stream read, retrying. attempt: %d", retryCount); - Thread.sleep(this.connection.getRetryInitialDelayInSeconds() * 1000); - } catch (InterruptedException ie) { + Thread.sleep(RETRY_DELAY_MS); + } catch (InterruptedException ex) { Thread.currentThread().interrupt(); - throw new BigQueryJdbcRuntimeException(ie); + throw new BigQueryJdbcRuntimeException(ex); + } catch (Exception ex) { + throw new BigQueryJdbcRuntimeException(ex); } } } catch (Exception ex) { + try { + bigQueryFieldValueListWrapperBlockingQueue.put( + BigQueryFieldValueListWrapper.ofError(new BigQueryJdbcRuntimeException(ex))); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } throw new BigQueryJdbcRuntimeException(ex); } finally { try { @@ -1120,61 +1140,75 @@ Thread parseAndPopulateRpcDataAsync( Runnable populateBufferRunnable = () -> { // producer thread populating the buffer - Iterable fieldValueLists; - // as we have to process the first page - boolean hasRows = true; - while (hasRows) { - try { - Tuple nextPageTuple = rpcResponseQueue.take(); - if (nextPageTuple.x() != null) { - fieldValueLists = nextPageTuple.x().getValues(); - } else { - fieldValueLists = null; - } - hasRows = nextPageTuple.y(); + try { + Iterable fieldValueLists; + // as we have to process the first page + boolean hasRows = true; + while (hasRows) { + try { + Tuple nextPageTuple = rpcResponseQueue.take(); + if (nextPageTuple.x() != null) { + fieldValueLists = nextPageTuple.x().getValues(); + } else { + fieldValueLists = null; + } + hasRows = nextPageTuple.y(); - } catch (InterruptedException e) { - LOG.log(Level.WARNING, "\n" + Thread.currentThread().getName() + " Interrupted", e); - // Thread might get interrupted while calling the Cancel method, which is - // expected, so logging this instead of throwing the exception back - break; - } + } catch (InterruptedException e) { + LOG.log(Level.WARNING, "\n" + Thread.currentThread().getName() + " Interrupted", e); + // Thread might get interrupted while calling the Cancel method, which is + // expected, so logging this instead of throwing the exception back + break; + } - if (Thread.currentThread().isInterrupted() - || queryTaskExecutor.isShutdown() - || fieldValueLists == null) { - // do not process further pages and shutdown (outerloop) - break; - } + if (Thread.currentThread().isInterrupted() + || queryTaskExecutor.isShutdown() + || fieldValueLists == null) { + // do not process further pages and shutdown (outerloop) + break; + } - long startTime = System.nanoTime(); - long results = 0; - for (FieldValueList fieldValueList : fieldValueLists) { - try { - if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) { - // do not process further pages and shutdown (inner loop) - break; + long startTime = System.nanoTime(); + long results = 0; + for (FieldValueList fieldValueList : fieldValueLists) { + try { + if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) { + // do not process further pages and shutdown (inner loop) + break; + } + bigQueryFieldValueListWrapperBlockingQueue.put( + BigQueryFieldValueListWrapper.of(schema.getFields(), fieldValueList)); + results += 1; + } catch (InterruptedException ex) { + throw new BigQueryJdbcRuntimeException(ex); } - bigQueryFieldValueListWrapperBlockingQueue.put( - BigQueryFieldValueListWrapper.of(schema.getFields(), fieldValueList)); - results += 1; - } catch (InterruptedException ex) { - throw new BigQueryJdbcRuntimeException(ex); } + LOG.fine( + "Processed %d results in %d ms.", + results, (int) ((System.nanoTime() - startTime) / 1000000)); } - LOG.fine( - "Processed %d results in %d ms.", - results, (int) ((System.nanoTime() - startTime) / 1000000)); - } - try { - // All the pages has been processed, put this marker - bigQueryFieldValueListWrapperBlockingQueue.put( - BigQueryFieldValueListWrapper.of(null, null, true)); - } catch (InterruptedException e) { + } catch (Exception ex) { LOG.log( Level.WARNING, - "\n" + Thread.currentThread().getName() + " Interrupted @ populateBufferAsync", - e); + "\n" + Thread.currentThread().getName() + " Error @ populateBufferAsync", + ex); + try { + bigQueryFieldValueListWrapperBlockingQueue.put( + BigQueryFieldValueListWrapper.ofError(new BigQueryJdbcRuntimeException(ex))); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } finally { + try { + // All the pages has been processed, put this marker + bigQueryFieldValueListWrapperBlockingQueue.put( + BigQueryFieldValueListWrapper.of(null, null, true)); + } catch (InterruptedException e) { + LOG.log( + Level.WARNING, + "\n" + Thread.currentThread().getName() + " Interrupted @ populateBufferAsync", + e); + } } }; From aea4f3bfba89c6667afd90bbf40f4d5bf2ed29c6 Mon Sep 17 00:00:00 2001 From: Keshav Dandeva Date: Fri, 13 Feb 2026 19:44:55 +0000 Subject: [PATCH 8/9] chore: add fix for silent truncation --- .../jdbc/BigQueryArrowBatchWrapper.java | 21 ++++++++++++++++--- .../bigquery/jdbc/BigQueryArrowResultSet.java | 4 ++++ .../jdbc/BigQueryFieldValueListWrapper.java | 18 +++++++++++++--- .../bigquery/jdbc/BigQueryJsonResultSet.java | 3 +++ 4 files changed, 40 insertions(+), 6 deletions(-) diff --git a/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowBatchWrapper.java b/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowBatchWrapper.java index 4d322ad97..99781ff16 100644 --- a/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowBatchWrapper.java +++ b/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowBatchWrapper.java @@ -31,24 +31,35 @@ class BigQueryArrowBatchWrapper { // Marks the end of the stream for the ResultSet private final boolean isLast; + private final Exception exception; + private BigQueryArrowBatchWrapper( - ArrowRecordBatch currentArrowBatch, JsonStringArrayList nestedRecords, boolean isLast) { + ArrowRecordBatch currentArrowBatch, + JsonStringArrayList nestedRecords, + boolean isLast, + Exception exception) { this.currentArrowBatch = currentArrowBatch; this.nestedRecords = nestedRecords; this.isLast = isLast; + this.exception = exception; } static BigQueryArrowBatchWrapper of(ArrowRecordBatch currentArrowBatch, boolean... isLast) { LOG.finest("++enter++"); boolean isLastFlag = isLast != null && isLast.length == 1 && isLast[0]; - return new BigQueryArrowBatchWrapper(currentArrowBatch, null, isLastFlag); + return new BigQueryArrowBatchWrapper(currentArrowBatch, null, isLastFlag, null); } static BigQueryArrowBatchWrapper getNestedFieldValueListWrapper( JsonStringArrayList nestedRecords, boolean... isLast) { LOG.finest("++enter++"); boolean isLastFlag = isLast != null && isLast.length == 1 && isLast[0]; - return new BigQueryArrowBatchWrapper(null, nestedRecords, isLastFlag); + return new BigQueryArrowBatchWrapper(null, nestedRecords, isLastFlag, null); + } + + static BigQueryArrowBatchWrapper ofError(Exception exception) { + LOG.finest("++enter++"); + return new BigQueryArrowBatchWrapper(null, null, true, exception); } ArrowRecordBatch getCurrentArrowBatch() { @@ -65,4 +76,8 @@ boolean isLast() { LOG.finest("++enter++"); return this.isLast; } + + Exception getException() { + return this.exception; + } } diff --git a/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSet.java b/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSet.java index aeea6783d..1d7d89e3f 100644 --- a/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSet.java +++ b/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryArrowResultSet.java @@ -24,6 +24,7 @@ import com.google.cloud.bigquery.Schema; import com.google.cloud.bigquery.StandardSQLTypeName; import com.google.cloud.bigquery.exception.BigQueryJdbcException; +import com.google.cloud.bigquery.exception.BigQueryJdbcRuntimeException; import com.google.cloud.bigquery.storage.v1.ArrowRecordBatch; import com.google.cloud.bigquery.storage.v1.ArrowSchema; import java.io.IOException; @@ -236,6 +237,9 @@ public boolean next() throws SQLException { /* Start of iteration or we have exhausted the current batch */ // Advance the cursor. Potentially blocking operation. BigQueryArrowBatchWrapper batchWrapper = this.buffer.take(); + if (batchWrapper.getException() != null) { + throw new BigQueryJdbcRuntimeException(batchWrapper.getException()); + } if (batchWrapper.isLast()) { /* Marks the end of the records */ if (this.vectorSchemaRoot != null) { diff --git a/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryFieldValueListWrapper.java b/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryFieldValueListWrapper.java index 9d8b1b2f7..39740e021 100644 --- a/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryFieldValueListWrapper.java +++ b/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryFieldValueListWrapper.java @@ -39,28 +39,36 @@ class BigQueryFieldValueListWrapper { // This flag marks the end of the stream for the ResultSet private boolean isLast = false; + private final Exception exception; static BigQueryFieldValueListWrapper of( FieldList fieldList, FieldValueList fieldValueList, boolean... isLast) { boolean isLastFlag = isLast != null && isLast.length == 1 && isLast[0]; - return new BigQueryFieldValueListWrapper(fieldList, fieldValueList, null, isLastFlag); + return new BigQueryFieldValueListWrapper(fieldList, fieldValueList, null, isLastFlag, null); } static BigQueryFieldValueListWrapper getNestedFieldValueListWrapper( FieldList fieldList, List arrayFieldValueList, boolean... isLast) { boolean isLastFlag = isLast != null && isLast.length == 1 && isLast[0]; - return new BigQueryFieldValueListWrapper(fieldList, null, arrayFieldValueList, isLastFlag); + return new BigQueryFieldValueListWrapper( + fieldList, null, arrayFieldValueList, isLastFlag, null); + } + + static BigQueryFieldValueListWrapper ofError(Exception exception) { + return new BigQueryFieldValueListWrapper(null, null, null, true, exception); } private BigQueryFieldValueListWrapper( FieldList fieldList, FieldValueList fieldValueList, List arrayFieldValueList, - boolean isLast) { + boolean isLast, + Exception exception) { this.fieldList = fieldList; this.fieldValueList = fieldValueList; this.arrayFieldValueList = arrayFieldValueList; this.isLast = isLast; + this.exception = exception; } public FieldList getFieldList() { @@ -78,4 +86,8 @@ public List getArrayFieldValueList() { public boolean isLast() { return this.isLast; } + + public Exception getException() { + return this.exception; + } } diff --git a/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJsonResultSet.java b/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJsonResultSet.java index 3773b99aa..da2ade028 100644 --- a/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJsonResultSet.java +++ b/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJsonResultSet.java @@ -159,6 +159,9 @@ public boolean next() throws SQLException { try { // Advance the cursor,Potentially blocking operation this.cursor = this.buffer.take(); + if (this.cursor.getException() != null) { + throw new BigQueryJdbcRuntimeException(this.cursor.getException()); + } this.rowCnt++; // Check for end of stream if (this.cursor.isLast()) { From 366c1cc86d1c5f9a79ba56109e8cfc9cc82b1e8c Mon Sep 17 00:00:00 2001 From: Keshav Dandeva Date: Fri, 13 Feb 2026 19:59:06 +0000 Subject: [PATCH 9/9] chore: remove redundant exception thrown --- .../java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java b/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java index c302cb2b2..7039628d5 100644 --- a/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java +++ b/google-cloud-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryStatement.java @@ -881,7 +881,6 @@ Thread populateArrowBufferedQueue( Thread.currentThread().interrupt(); } Thread.currentThread().interrupt(); - throw new BigQueryJdbcRuntimeException(e); } finally { // logic needed for graceful shutdown // marking end of stream try { @@ -1105,7 +1104,6 @@ Thread runNextPageTaskAsync( } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } - throw new BigQueryJdbcRuntimeException(ex); } finally { try { // this will stop the parseDataTask as well when the pagination