Skip to content

Commit ed4aaf2

Browse files
committed
ENH: Add use_bqstorage_api option to read_gbq
The BigQuery Storage API provides a way to read query results quickly (and using multiple threads). It only works with large query results (~125 MB), but as of 1.11.1, the google-cloud-bigquery library can fall back to the BigQuery API to download results when a request to the BigQuery Storage API fails. As this API can increase costs (and may not be enabled on the user's project), this option is disabled by default.
1 parent 0e1ebf5 commit ed4aaf2

File tree

2 files changed

+100
-2
lines changed

2 files changed

+100
-2
lines changed

pandas_gbq/gbq.py

+73-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,13 @@
55

66
import numpy as np
77

8+
try:
9+
# The BigQuery Storage API client is an optional dependency. It is only
10+
# required when use_bqstorage_api=True.
11+
from google.cloud import bigquery_storage_v1beta1
12+
except ImportError: # pragma: NO COVER
13+
bigquery_storage_v1beta1 = None
14+
815
from pandas_gbq.exceptions import AccessDenied
916

1017
logger = logging.getLogger(__name__)
@@ -302,6 +309,7 @@ def __init__(
302309
dialect="standard",
303310
location=None,
304311
credentials=None,
312+
use_bqstorage_api=False,
305313
):
306314
global context
307315
from google.api_core.exceptions import GoogleAPIError
@@ -352,6 +360,9 @@ def __init__(
352360
context.project = self.project_id
353361

354362
self.client = self.get_client()
363+
self.bqstorage_client = _make_bqstorage_client(
364+
use_bqstorage_api, self.credentials
365+
)
355366

356367
# BQ Queries costs $5 per TB. First 1 TB per month is free
357368
# see here for more: https://cloud.google.com/bigquery/pricing
@@ -489,7 +500,9 @@ def run_query(self, query, **kwargs):
489500

490501
schema_fields = [field.to_api_repr() for field in rows_iter.schema]
491502
nullsafe_dtypes = _bqschema_to_nullsafe_dtypes(schema_fields)
492-
df = rows_iter.to_dataframe(dtypes=nullsafe_dtypes)
503+
df = rows_iter.to_dataframe(
504+
dtypes=nullsafe_dtypes, bqstorage_client=self.bqstorage_client
505+
)
493506

494507
if df.empty:
495508
df = _cast_empty_df_dtypes(schema_fields, df)
@@ -702,6 +715,21 @@ def _cast_empty_df_dtypes(schema_fields, df):
702715
return df
703716

704717

718+
def _make_bqstorage_client(use_bqstorage_api, credentials):
719+
if not use_bqstorage_api:
720+
return None
721+
722+
if bigquery_storage_v1beta1 is None:
723+
raise ImportError(
724+
"Install the google-cloud-bigquery-storage and fastavro packages "
725+
"to use the BigQuery Storage API."
726+
)
727+
728+
return bigquery_storage_v1beta1.BigQueryStorageClient(
729+
credentials=credentials
730+
)
731+
732+
705733
def read_gbq(
706734
query,
707735
project_id=None,
@@ -713,6 +741,7 @@ def read_gbq(
713741
location=None,
714742
configuration=None,
715743
credentials=None,
744+
use_bqstorage_api=False,
716745
verbose=None,
717746
private_key=None,
718747
):
@@ -790,6 +819,27 @@ def read_gbq(
790819
:class:`google.oauth2.service_account.Credentials` directly.
791820
792821
.. versionadded:: 0.8.0
822+
use_bqstorage_api : bool, default False
823+
Use the `BigQuery Storage API
824+
<https://cloud.google.com/bigquery/docs/reference/storage/>`__ to
825+
download query results quickly, but at an increased cost. To use this
826+
API, first `enable it in the Cloud Console
827+
<https://console.cloud.google.com/apis/library/bigquerystorage.googleapis.com>`__.
828+
You must also have the `bigquery.readsessions.create
829+
<https://cloud.google.com/bigquery/docs/access-control#roles>`__
830+
permission on the project you are billing queries to.
831+
832+
**Note:** Due to a `known issue in the ``google-cloud-bigquery``
833+
package
834+
<https://github.com/googleapis/google-cloud-python/pull/7633>`__
835+
(fixed in version 1.11.0), you must write your query results to a
836+
destination table. To do this with ``read_gbq``, supply a
837+
``configuration`` dictionary.
838+
839+
This feature requires the ``google-cloud-bigquery-storage`` and
840+
``fastavro`` packages.
841+
842+
.. versionadded:: 0.10.0
793843
verbose : None, deprecated
794844
Deprecated in Pandas-GBQ 0.4.0. Use the `logging module
795845
to adjust verbosity instead
@@ -810,6 +860,27 @@ def read_gbq(
810860
-------
811861
df: DataFrame
812862
DataFrame representing results of query.
863+
864+
Examples
865+
--------
866+
867+
Use the BigQuery Storage API to fetch results quickly, but at an additional
868+
cost. Due to a known issue in the BigQuery Storage API, you must write
869+
your query results to a destination table.
870+
871+
>>> pandas_gbq.read_gbq(
872+
... query_string,
873+
... configuration={
874+
... 'query': {
875+
... 'destinationTable': {
876+
... 'projectId': 'your-project',
877+
... 'datasetId': 'destination_dataset',
878+
... 'tableId': 'new_table_name',
879+
... }
880+
... }
881+
... },
882+
... use_bqstorage_api=True,
883+
... )
813884
"""
814885
global context
815886

@@ -846,6 +917,7 @@ def read_gbq(
846917
location=location,
847918
credentials=credentials,
848919
private_key=private_key,
920+
use_bqstorage_api=use_bqstorage_api,
849921
)
850922

851923
final_df = connector.run_query(query, configuration=configuration)

tests/system/test_gbq.py

+27-1
Original file line numberDiff line numberDiff line change
@@ -888,10 +888,36 @@ def test_tokyo(self, tokyo_dataset, tokyo_table, private_key_path):
888888
location="asia-northeast1",
889889
private_key=private_key_path,
890890
)
891-
print(df)
892891
assert df["max_year"][0] >= 2000
893892

894893

894+
@pytest.mark.skip(reason="large query for BQ Storage API tests")
895+
def test_read_gbq_w_bqstorage_api(credentials):
896+
df = gbq.read_gbq(
897+
"""
898+
SELECT
899+
dependency_name,
900+
dependency_platform,
901+
project_name,
902+
project_id,
903+
version_number,
904+
version_id,
905+
dependency_kind,
906+
optional_dependency,
907+
dependency_requirements,
908+
dependency_project_id
909+
FROM
910+
`bigquery-public-data.libraries_io.dependencies`
911+
WHERE
912+
LOWER(dependency_platform) = 'npm'
913+
LIMIT 2500000
914+
""",
915+
use_bqstorage_api=True,
916+
credentials=credentials,
917+
)
918+
assert len(df) == 2500000
919+
920+
895921
class TestToGBQIntegration(object):
896922
@pytest.fixture(autouse=True, scope="function")
897923
def setup(self, project, credentials, random_dataset_id):

0 commit comments

Comments
 (0)