
Commit 8b5e80b
ENH: Add use_bqstorage_api option to read_gbq
The BigQuery Storage API provides a way to read query results quickly (and using multiple threads). It only works with large query results (roughly 125 MB or more), but as of google-cloud-bigquery 1.11.1, the library can fall back to the BigQuery API to download results when a request to the BigQuery Storage API fails. Because this API can increase costs (and may not be enabled on the user's project), the option is disabled by default.
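As a usage sketch (the query, project, dataset, and table names are hypothetical placeholders; it assumes the optional dependencies are installed and the BigQuery Storage API is enabled on the billing project), opting in looks like:

import pandas_gbq

# Optional dependencies for this feature:
#   pip install google-cloud-bigquery-storage fastavro
df = pandas_gbq.read_gbq(
    "SELECT * FROM `your-project.your_dataset.big_table`",  # placeholder
    project_id="your-project",  # placeholder
    # With google-cloud-bigquery < 1.11.0, query results must first be
    # written to a destination table (see the docstring added below).
    configuration={
        "query": {
            "destinationTable": {
                "projectId": "your-project",
                "datasetId": "destination_dataset",
                "tableId": "new_table_name",
            }
        }
    },
    use_bqstorage_api=True,
)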

2 files changed: +100 −2 lines

pandas_gbq/gbq.py (+73 −1)
@@ -5,6 +5,13 @@
 
 import numpy as np
 
+try:
+    # The BigQuery Storage API client is an optional dependency. It is only
+    # required when use_bqstorage_api=True.
+    from google.cloud import bigquery_storage_v1beta1
+except ImportError:  # pragma: NO COVER
+    bigquery_storage_v1beta1 = None
+
 from pandas_gbq.exceptions import AccessDenied
 
 logger = logging.getLogger(__name__)
@@ -302,6 +309,7 @@ def __init__(
         dialect="standard",
         location=None,
         credentials=None,
+        use_bqstorage_api=False,
     ):
         global context
         from google.api_core.exceptions import GoogleAPIError
@@ -352,6 +360,9 @@ def __init__(
             context.project = self.project_id
 
         self.client = self.get_client()
+        self.bqstorage_client = _make_bqstorage_client(
+            use_bqstorage_api, self.credentials
+        )
 
         # BQ Queries costs $5 per TB. First 1 TB per month is free
         # see here for more: https://cloud.google.com/bigquery/pricing
@@ -489,7 +500,9 @@ def run_query(self, query, **kwargs):
 
         schema_fields = [field.to_api_repr() for field in rows_iter.schema]
         nullsafe_dtypes = _bqschema_to_nullsafe_dtypes(schema_fields)
-        df = rows_iter.to_dataframe(dtypes=nullsafe_dtypes)
+        df = rows_iter.to_dataframe(
+            dtypes=nullsafe_dtypes, bqstorage_client=self.bqstorage_client
+        )
 
         if df.empty:
             df = _cast_empty_df_dtypes(schema_fields, df)
@@ -727,6 +740,21 @@ def _localize_df(schema_fields, df):
     return df
 
 
+def _make_bqstorage_client(use_bqstorage_api, credentials):
+    if not use_bqstorage_api:
+        return None
+
+    if bigquery_storage_v1beta1 is None:
+        raise ImportError(
+            "Install the google-cloud-bigquery-storage and fastavro packages "
+            "to use the BigQuery Storage API."
+        )
+
+    return bigquery_storage_v1beta1.BigQueryStorageClient(
+        credentials=credentials
+    )
+
+
 def read_gbq(
     query,
     project_id=None,
@@ -738,6 +766,7 @@ def read_gbq(
     location=None,
    configuration=None,
     credentials=None,
+    use_bqstorage_api=False,
     verbose=None,
     private_key=None,
 ):
@@ -815,6 +844,27 @@
         :class:`google.oauth2.service_account.Credentials` directly.
 
         .. versionadded:: 0.8.0
+    use_bqstorage_api : bool, default False
+        Use the `BigQuery Storage API
+        <https://cloud.google.com/bigquery/docs/reference/storage/>`__ to
+        download query results quickly, but at an increased cost. To use this
+        API, first `enable it in the Cloud Console
+        <https://console.cloud.google.com/apis/library/bigquerystorage.googleapis.com>`__.
+        You must also have the `bigquery.readsessions.create
+        <https://cloud.google.com/bigquery/docs/access-control#roles>`__
+        permission on the project you are billing queries to.
+
+        **Note:** Due to a `known issue in the ``google-cloud-bigquery``
+        package
+        <https://github.com/googleapis/google-cloud-python/pull/7633>`__
+        (fixed in version 1.11.0), you must write your query results to a
+        destination table. To do this with ``read_gbq``, supply a
+        ``configuration`` dictionary.
+
+        This feature requires the ``google-cloud-bigquery-storage`` and
+        ``fastavro`` packages.
+
+        .. versionadded:: 0.10.0
     verbose : None, deprecated
         Deprecated in Pandas-GBQ 0.4.0. Use the `logging module
         to adjust verbosity instead
@@ -835,6 +885,27 @@
     -------
     df: DataFrame
         DataFrame representing results of query.
+
+    Examples
+    --------
+
+    Use the BigQuery Storage API to fetch results quickly, but at an
+    additional cost. Due to a known issue in the BigQuery Storage API, you
+    must write your query results to a destination table.
+
+    >>> pandas_gbq.read_gbq(
+    ...     query_string,
+    ...     configuration={
+    ...         'query': {
+    ...             'destinationTable': {
+    ...                 'projectId': 'your-project',
+    ...                 'datasetId': 'destination_dataset',
+    ...                 'tableId': 'new_table_name',
+    ...             }
+    ...         }
+    ...     },
+    ...     use_bqstorage_api=True,
+    ... )
     """
     global context
 
@@ -871,6 +942,7 @@ def read_gbq(
         location=location,
         credentials=credentials,
         private_key=private_key,
+        use_bqstorage_api=use_bqstorage_api,
     )
 
     final_df = connector.run_query(query, configuration=configuration)

tests/system/test_gbq.py (+27 −1)
@@ -895,10 +895,36 @@ def test_tokyo(self, tokyo_dataset, tokyo_table, private_key_path):
             location="asia-northeast1",
             private_key=private_key_path,
         )
-        print(df)
         assert df["max_year"][0] >= 2000
 
 
+@pytest.mark.skip(reason="large query for BQ Storage API tests")
+def test_read_gbq_w_bqstorage_api(credentials):
+    df = gbq.read_gbq(
+        """
+        SELECT
+          dependency_name,
+          dependency_platform,
+          project_name,
+          project_id,
+          version_number,
+          version_id,
+          dependency_kind,
+          optional_dependency,
+          dependency_requirements,
+          dependency_project_id
+        FROM
+          `bigquery-public-data.libraries_io.dependencies`
+        WHERE
+          LOWER(dependency_platform) = 'npm'
+        LIMIT 2500000
+        """,
+        use_bqstorage_api=True,
+        credentials=credentials,
+    )
+    assert len(df) == 2500000
+
+
 class TestToGBQIntegration(object):
     @pytest.fixture(autouse=True, scope="function")
     def setup(self, project, credentials, random_dataset_id):
