From a794a3329484067f597cc629a1914799e2552a0b Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Tue, 26 Oct 2021 17:05:10 -0500 Subject: [PATCH 01/19] feat: `to_gbq` uses Parquet by default, use `api_method="load_csv"` for old behavior --- tests/system/test_gbq.py | 71 ---------------------------------- tests/system/test_to_gbq.py | 76 ++++++++++++++++++++++++++----------- 2 files changed, 53 insertions(+), 94 deletions(-) diff --git a/tests/system/test_gbq.py b/tests/system/test_gbq.py index 00bbd3d6..a168bd59 100644 --- a/tests/system/test_gbq.py +++ b/tests/system/test_gbq.py @@ -1185,77 +1185,6 @@ def test_google_upload_errors_should_raise_exception(self, project_id): credentials=self.credentials, ) - def test_upload_chinese_unicode_data(self, project_id): - test_id = "2" - test_size = 6 - df = DataFrame(np.random.randn(6, 4), index=range(6), columns=list("ABCD")) - df["s"] = u"信用卡" - - gbq.to_gbq( - df, - self.destination_table + test_id, - project_id, - credentials=self.credentials, - chunksize=10000, - ) - - result_df = gbq.read_gbq( - "SELECT * FROM {0}".format(self.destination_table + test_id), - project_id=project_id, - credentials=self.credentials, - dialect="legacy", - ) - - assert len(result_df) == test_size - - if sys.version_info.major < 3: - pytest.skip(msg="Unicode comparison in Py2 not working") - - result = result_df["s"].sort_values() - expected = df["s"].sort_values() - - tm.assert_numpy_array_equal(expected.values, result.values) - - def test_upload_other_unicode_data(self, project_id): - test_id = "3" - test_size = 3 - df = DataFrame( - { - "s": ["Skywalker™", "lego", "hülle"], - "i": [200, 300, 400], - "d": [ - "2017-12-13 17:40:39", - "2017-12-13 17:40:39", - "2017-12-13 17:40:39", - ], - } - ) - - gbq.to_gbq( - df, - self.destination_table + test_id, - project_id=project_id, - credentials=self.credentials, - chunksize=10000, - ) - - result_df = gbq.read_gbq( - "SELECT * FROM {0}".format(self.destination_table + test_id), - project_id=project_id, - credentials=self.credentials, - dialect="legacy", - ) - - assert len(result_df) == test_size - - if sys.version_info.major < 3: - pytest.skip(msg="Unicode comparison in Py2 not working") - - result = result_df["s"].sort_values() - expected = df["s"].sort_values() - - tm.assert_numpy_array_equal(expected.values, result.values) - def test_upload_mixed_float_and_int(self, project_id): """Test that we can upload a dataframe containing an int64 and float64 column. See: https://github.com/pydata/pandas-gbq/issues/116 diff --git a/tests/system/test_to_gbq.py b/tests/system/test_to_gbq.py index b0d2d031..547ceeac 100644 --- a/tests/system/test_to_gbq.py +++ b/tests/system/test_to_gbq.py @@ -3,9 +3,10 @@ # license that can be found in the LICENSE file. import functools +import random + import pandas import pandas.testing - import pytest @@ -21,31 +22,60 @@ def method_under_test(credentials, project_id): ) -def test_float_round_trip(method_under_test, random_dataset_id, bigquery_client): - """Ensure that 64-bit floating point numbers are unchanged. - - See: https://github.com/pydata/pandas-gbq/issues/326 - """ - - table_id = "{}.float_round_trip".format(random_dataset_id) - input_floats = pandas.Series( - [ - 0.14285714285714285, - 0.4406779661016949, - 1.05148, - 1.05153, - 1.8571428571428572, - 2.718281828459045, - 3.141592653589793, - 2.0988936657440586e43, - ], - name="float_col", +@pytest.mark.parametrize( + ["input_series"], + [ + # Ensure that 64-bit floating point numbers are unchanged. 
+ # See: https://github.com/pydata/pandas-gbq/issues/326 + ( + pandas.Series( + [ + 0.14285714285714285, + 0.4406779661016949, + 1.05148, + 1.05153, + 1.8571428571428572, + 2.718281828459045, + 3.141592653589793, + 2.0988936657440586e43, + ], + name="test_col", + ), + ), + ( + pandas.Series( + [ + "abc", + "defg", + # Ensure that empty strings are written as empty string, + # not NULL. See: + # https://github.com/googleapis/python-bigquery-pandas/issues/366 + "", + # Ensure that unicode characters are encoded. See: + # https://github.com/googleapis/python-bigquery-pandas/issues/106 + "信用卡", + "Skywalker™", + "hülle", + ], + name="test_col", + ), + ), + ], +) +def test_round_trip( + method_under_test, random_dataset_id, bigquery_client, input_series +): + table_id = f"{random_dataset_id}.round_trip_{random.randrange(1_000_000)}" + input_series = input_series.sort_values().reset_index(drop=True) + df = pandas.DataFrame( + # Some errors only occur in multi-column dataframes. See: + # https://github.com/googleapis/python-bigquery-pandas/issues/366 + {"test_col": input_series, "test_col2": input_series} ) - df = pandas.DataFrame({"float_col": input_floats}) method_under_test(df, table_id) round_trip = bigquery_client.list_rows(table_id).to_dataframe() - round_trip_floats = round_trip["float_col"].sort_values() + round_trip_series = round_trip["test_col"].sort_values().reset_index(drop=True) pandas.testing.assert_series_equal( - round_trip_floats, input_floats, check_exact=True, + round_trip_series, input_series, check_exact=True, ) From 64bae4901d230a39530869e952c28997ee4697cc Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 27 Oct 2021 16:20:18 -0500 Subject: [PATCH 02/19] warn when chunksize is used --- pandas_gbq/gbq.py | 25 +++++++- pandas_gbq/load.py | 117 ++++++++++++++++++++++++++++-------- tests/system/test_to_gbq.py | 3 +- tests/unit/test_gbq.py | 23 +++++++ 4 files changed, 141 insertions(+), 27 deletions(-) diff --git a/pandas_gbq/gbq.py b/pandas_gbq/gbq.py index 856c1285..f63cc60a 100644 --- a/pandas_gbq/gbq.py +++ b/pandas_gbq/gbq.py @@ -520,7 +520,7 @@ def _download_results( df = rows_iter.to_dataframe( dtypes=conversion_dtypes, progress_bar_type=progress_bar_type, - **to_dataframe_kwargs + **to_dataframe_kwargs, ) except self.http_error as ex: self.process_http_error(ex) @@ -541,6 +541,7 @@ def load_data( chunksize=None, schema=None, progress_bar=True, + api_method: str = "load_parquet", ): from pandas_gbq import load @@ -554,6 +555,7 @@ def load_data( chunksize=chunksize, schema=schema, location=self.location, + api_method=api_method, ) if progress_bar and tqdm: chunks = tqdm.tqdm(chunks) @@ -876,6 +878,7 @@ def to_gbq( location=None, progress_bar=True, credentials=None, + api_method: str = "load_parquet", verbose=None, private_key=None, ): @@ -964,6 +967,11 @@ def to_gbq( :class:`google.oauth2.service_account.Credentials` directly. .. versionadded:: 0.8.0 + api_method : str, optional + API method used to upload DataFrame to BigQuery. One of "load_parquet", + "load_csv". Default "load_parquet". + + .. versionadded:: 0.16.0 verbose : bool, deprecated Deprecated in Pandas-GBQ 0.4.0. 
Use the `logging module to adjust verbosity instead @@ -988,6 +996,20 @@ def to_gbq( stacklevel=1, ) + if chunksize is not None: + if api_method == "load_parquet": + warnings.warn( + "chunksize is ignored when using api_method='load_parquet'", + DeprecationWarning, + stacklevel=2, + ) + elif api_method == "load_csv": + warnings.warn( + "chunksize will be ignored when using api_method='load_csv' in a future version of pandas-gbq", + PendingDeprecationWarning, + stacklevel=2, + ) + if if_exists not in ("fail", "replace", "append"): raise ValueError("'{0}' is not valid for if_exists".format(if_exists)) @@ -1071,6 +1093,7 @@ def to_gbq( chunksize=chunksize, schema=table_schema, progress_bar=progress_bar, + api_method=api_method, ) diff --git a/pandas_gbq/load.py b/pandas_gbq/load.py index faa674c2..ddee6b8f 100644 --- a/pandas_gbq/load.py +++ b/pandas_gbq/load.py @@ -5,7 +5,9 @@ """Helper methods for loading data into BigQuery""" import io +from typing import Any, Dict, Optional +import pandas from google.cloud import bigquery from pandas_gbq.features import FEATURES @@ -52,45 +54,110 @@ def split_dataframe(dataframe, chunksize=None): yield remaining_rows, chunk -def load_chunks( - client, - dataframe, - destination_table_ref, - chunksize=None, - schema=None, - location=None, +def load_parquet( + client: bigquery.Client, + dataframe: pandas.DataFrame, + destination_table_ref: bigquery.TableReference, + location: Optional[str], + schema: Optional[Dict[str, Any]], +): + job_config = bigquery.LoadJobConfig() + job_config.write_disposition = "WRITE_APPEND" + job_config.source_format = "PARQUET" + + if schema is not None: + schema = pandas_gbq.schema.remove_policy_tags(schema) + job_config.schema = pandas_gbq.schema.to_google_cloud_bigquery(schema) + + client.load_table_from_dataframe( + dataframe, destination_table_ref, job_config=job_config, location=location, + ).result() + + +def load_csv( + client: bigquery.Client, + dataframe: pandas.DataFrame, + destination_table_ref: bigquery.TableReference, + location: Optional[str], + chunksize: Optional[int], + schema: Optional[Dict[str, Any]], ): job_config = bigquery.LoadJobConfig() job_config.write_disposition = "WRITE_APPEND" job_config.source_format = "CSV" job_config.allow_quoted_newlines = True - # Explicit schema? Use that! if schema is not None: schema = pandas_gbq.schema.remove_policy_tags(schema) job_config.schema = pandas_gbq.schema.to_google_cloud_bigquery(schema) - # If not, let BigQuery determine schema unless we are encoding the CSV files ourselves. 
- elif not FEATURES.bigquery_has_from_dataframe_with_csv: + + chunks = split_dataframe(dataframe, chunksize=chunksize) + for remaining_rows, chunk in chunks: + yield remaining_rows + + client.load_table_from_dataframe( + chunk, destination_table_ref, job_config=job_config, location=location, + ).result() + + +def load_csv_from_file( + client: bigquery.Client, + dataframe: pandas.DataFrame, + destination_table_ref: bigquery.TableReference, + location: Optional[str], + chunksize: Optional[int], + schema: Optional[Dict[str, Any]], +): + job_config = bigquery.LoadJobConfig() + job_config.write_disposition = "WRITE_APPEND" + job_config.source_format = "CSV" + job_config.allow_quoted_newlines = True + + if schema is None: schema = pandas_gbq.schema.generate_bq_schema(dataframe) - schema = pandas_gbq.schema.remove_policy_tags(schema) - job_config.schema = pandas_gbq.schema.to_google_cloud_bigquery(schema) + + schema = pandas_gbq.schema.remove_policy_tags(schema) + job_config.schema = pandas_gbq.schema.to_google_cloud_bigquery(schema) chunks = split_dataframe(dataframe, chunksize=chunksize) for remaining_rows, chunk in chunks: yield remaining_rows - if FEATURES.bigquery_has_from_dataframe_with_csv: - client.load_table_from_dataframe( - chunk, destination_table_ref, job_config=job_config, location=location, + try: + chunk_buffer = encode_chunk(chunk) + client.load_table_from_file( + chunk_buffer, + destination_table_ref, + job_config=job_config, + location=location, ).result() + finally: + chunk_buffer.close() + + +def load_chunks( + client, + dataframe, + destination_table_ref, + chunksize=None, + schema=None, + location=None, + api_method="load_parquet", +): + if api_method == "load_parquet": + load_parquet(client, dataframe, destination_table_ref, location, schema) + # TODO: yield progress depending on result() with timeout + return [0] + elif api_method == "load_csv": + if FEATURES.bigquery_has_from_dataframe_with_csv: + return load_csv( + client, dataframe, destination_table_ref, location, chunksize, schema + ) else: - try: - chunk_buffer = encode_chunk(chunk) - client.load_table_from_file( - chunk_buffer, - destination_table_ref, - job_config=job_config, - location=location, - ).result() - finally: - chunk_buffer.close() + return load_csv_from_file( + client, dataframe, destination_table_ref, location, chunksize, schema + ) + else: + raise ValueError( + f"got unexpected api_method: {api_method!r}, expected one of 'load_parquet', 'load_csv'" + ) diff --git a/tests/system/test_to_gbq.py b/tests/system/test_to_gbq.py index 547ceeac..d16997fd 100644 --- a/tests/system/test_to_gbq.py +++ b/tests/system/test_to_gbq.py @@ -51,6 +51,7 @@ def method_under_test(credentials, project_id): # not NULL. See: # https://github.com/googleapis/python-bigquery-pandas/issues/366 "", + None, # Ensure that unicode characters are encoded. 
See: # https://github.com/googleapis/python-bigquery-pandas/issues/106 "信用卡", @@ -62,7 +63,7 @@ def method_under_test(credentials, project_id): ), ], ) -def test_round_trip( +def test_series_round_trip( method_under_test, random_dataset_id, bigquery_client, input_series ): table_id = f"{random_dataset_id}.round_trip_{random.randrange(1_000_000)}" diff --git a/tests/unit/test_gbq.py b/tests/unit/test_gbq.py index 3b603412..0a5ecad2 100644 --- a/tests/unit/test_gbq.py +++ b/tests/unit/test_gbq.py @@ -124,6 +124,29 @@ def test_to_gbq_with_no_project_id_given_should_fail(monkeypatch): gbq.to_gbq(DataFrame([[1]]), "dataset.tablename") +@pytest.mark.parametrize( + ["api_method", "warning_message", "warning_type"], + [ + ("load_parquet", "chunksize is ignored", DeprecationWarning), + ("load_csv", "chunksize will be ignored", PendingDeprecationWarning), + ], +) +def test_to_gbq_with_chunksize_warns_deprecation( + api_method, warning_message, warning_type +): + with pytest.warns(warning_type, match=warning_message): + try: + gbq.to_gbq( + DataFrame([[1]]), + "dataset.tablename", + project_id="my-project", + api_method=api_method, + chunksize=100, + ) + except gbq.TableCreationError: + pass + + @pytest.mark.parametrize(["verbose"], [(True,), (False,)]) def test_to_gbq_with_verbose_new_pandas_warns_deprecation(monkeypatch, verbose): monkeypatch.setattr( From 80d2cbcc99c6ef9070e65ac2fbce345e4929dc4b Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 27 Oct 2021 16:40:52 -0500 Subject: [PATCH 03/19] adjust exceptions --- pandas_gbq/exceptions.py | 15 +++++++++++++++ pandas_gbq/gbq.py | 15 +++++---------- pandas_gbq/load.py | 13 ++++++++++--- setup.py | 1 + tests/system/test_gbq.py | 3 +++ 5 files changed, 34 insertions(+), 13 deletions(-) diff --git a/pandas_gbq/exceptions.py b/pandas_gbq/exceptions.py index aec0ea1a..454e4c07 100644 --- a/pandas_gbq/exceptions.py +++ b/pandas_gbq/exceptions.py @@ -3,6 +3,14 @@ # license that can be found in the LICENSE file. +class GenericGBQException(ValueError): + """ + Raised when an unrecognized Google API Error occurs. + """ + + pass + + class AccessDenied(ValueError): """ Raised when invalid credentials are provided, or tokens have expired. @@ -11,6 +19,13 @@ class AccessDenied(ValueError): pass +class ConversionError(GenericGBQException): + """ + Raised when there is a problem converting the DataFrame to a format + required to upload it to BigQuery. + """ + + class InvalidPrivateKeyFormat(ValueError): """ Raised when provided private key has invalid format. diff --git a/pandas_gbq/gbq.py b/pandas_gbq/gbq.py index f63cc60a..b245d212 100644 --- a/pandas_gbq/gbq.py +++ b/pandas_gbq/gbq.py @@ -18,8 +18,11 @@ bigquery = None google_exceptions = None -from pandas_gbq.exceptions import AccessDenied -from pandas_gbq.exceptions import PerformanceWarning +from pandas_gbq.exceptions import ( + AccessDenied, + GenericGBQException, + PerformanceWarning, +) from pandas_gbq import features from pandas_gbq.features import FEATURES import pandas_gbq.schema @@ -69,14 +72,6 @@ class DatasetCreationError(ValueError): pass -class GenericGBQException(ValueError): - """ - Raised when an unrecognized Google API Error occurs. 
- """ - - pass - - class InvalidColumnOrder(ValueError): """ Raised when the provided column order for output diff --git a/pandas_gbq/load.py b/pandas_gbq/load.py index ddee6b8f..86415f69 100644 --- a/pandas_gbq/load.py +++ b/pandas_gbq/load.py @@ -8,8 +8,10 @@ from typing import Any, Dict, Optional import pandas +import pyarrow.lib from google.cloud import bigquery +from pandas_gbq import exceptions from pandas_gbq.features import FEATURES import pandas_gbq.schema @@ -69,9 +71,14 @@ def load_parquet( schema = pandas_gbq.schema.remove_policy_tags(schema) job_config.schema = pandas_gbq.schema.to_google_cloud_bigquery(schema) - client.load_table_from_dataframe( - dataframe, destination_table_ref, job_config=job_config, location=location, - ).result() + try: + client.load_table_from_dataframe( + dataframe, destination_table_ref, job_config=job_config, location=location, + ).result() + except pyarrow.lib.ArrowInvalid as exc: + raise exceptions.ConversionError( + "Could not convert DataFrame to Parquet." + ) from exc def load_csv( diff --git a/setup.py b/setup.py index a7e23eec..5153e024 100644 --- a/setup.py +++ b/setup.py @@ -24,6 +24,7 @@ dependencies = [ "setuptools", "pandas>=0.23.2", + "pyarrow >=3.0.0, <7.0dev", "pydata-google-auth", "google-auth", "google-auth-oauthlib", diff --git a/tests/system/test_gbq.py b/tests/system/test_gbq.py index a168bd59..a8d6bd0d 100644 --- a/tests/system/test_gbq.py +++ b/tests/system/test_gbq.py @@ -1383,6 +1383,9 @@ def test_upload_data_with_different_df_and_user_schema(self, project_id): project_id, credentials=self.credentials, table_schema=test_schema, + # Loading string pandas series to FLOAT column not supported with + # Parquet. + api_method="load_csv", ) dataset, table = destination_table.split(".") assert verify_schema( From 1b29cedc00890ba4b9569edeb0b4b74b64d1bf99 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 27 Oct 2021 16:51:41 -0500 Subject: [PATCH 04/19] fix unit tests --- tests/unit/test_load.py | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/tests/unit/test_load.py b/tests/unit/test_load.py index 2ddb4f50..ba86ac09 100644 --- a/tests/unit/test_load.py +++ b/tests/unit/test_load.py @@ -16,10 +16,10 @@ from pandas_gbq import load -def load_method(bqclient): - if FEATURES.bigquery_has_from_dataframe_with_csv: - return bqclient.load_table_from_dataframe - return bqclient.load_table_from_file +def load_method(bqclient, api_method): + if not FEATURES.bigquery_has_from_dataframe_with_csv and api_method == "load_csv": + return bqclient.load_table_from_file + return bqclient.load_table_from_dataframe def test_encode_chunk_with_unicode(): @@ -91,9 +91,12 @@ def test_encode_chunks_with_chunksize_none(): assert len(chunk.index) == 6 -@pytest.mark.parametrize(["bigquery_has_from_dataframe_with_csv"], [(True,), (False,)]) +@pytest.mark.parametrize( + ["bigquery_has_from_dataframe_with_csv", "api_method"], + [(True, "load_parquet"), (True, "load_csv"), (False, "load_csv")], +) def test_load_chunks_omits_policy_tags( - monkeypatch, mock_bigquery_client, bigquery_has_from_dataframe_with_csv + monkeypatch, mock_bigquery_client, bigquery_has_from_dataframe_with_csv, api_method ): """Ensure that policyTags are omitted. 
@@ -117,11 +120,20 @@ def test_load_chunks_omits_policy_tags( ] } - _ = list(load.load_chunks(mock_bigquery_client, df, destination, schema=schema)) + _ = list( + load.load_chunks( + mock_bigquery_client, df, destination, schema=schema, api_method=api_method + ) + ) - mock_load = load_method(mock_bigquery_client) + mock_load = load_method(mock_bigquery_client, api_method=api_method) assert mock_load.called _, kwargs = mock_load.call_args assert "job_config" in kwargs sent_field = kwargs["job_config"].schema[0].to_api_repr() assert "policyTags" not in sent_field + + +def test_load_chunks_with_invalid_api_method(): + with pytest.raises(ValueError, match="got unexpected api_method:"): + load.load_chunks(None, None, None, api_method="not_a_thing") From 0fea9fa25af44b726640f596cf14e36fad43a97c Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 27 Oct 2021 16:54:38 -0500 Subject: [PATCH 05/19] mention pyarrow as a dependency --- docs/install.rst | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/install.rst b/docs/install.rst index 6810a44a..9887c799 100644 --- a/docs/install.rst +++ b/docs/install.rst @@ -29,7 +29,7 @@ Install from Source .. code-block:: shell - $ pip install git+https://github.com/pydata/pandas-gbq.git + $ pip install git+https://github.com/googleapis/python-bigquery-pandas.git Dependencies @@ -38,6 +38,7 @@ Dependencies This module requires following additional dependencies: - `pydata-google-auth `__: Helpers for authentication to Google's API +- `pyarrow `__: Format for getting data to/from Google BigQuery - `google-auth `__: authentication and authorization for Google's API - `google-auth-oauthlib `__: integration with `oauthlib `__ for end-user authentication - `google-cloud-bigquery `__: Google Cloud client library for BigQuery From a2ea621e147d0c17cc761b231dc747d6622f091e Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 27 Oct 2021 17:01:14 -0500 Subject: [PATCH 06/19] add pyarrow to conda deps --- ci/requirements-3.7-0.23.2.conda | 1 + ci/requirements-3.9-NIGHTLY.conda | 1 + 2 files changed, 2 insertions(+) diff --git a/ci/requirements-3.7-0.23.2.conda b/ci/requirements-3.7-0.23.2.conda index af4768ab..586f56ba 100644 --- a/ci/requirements-3.7-0.23.2.conda +++ b/ci/requirements-3.7-0.23.2.conda @@ -4,6 +4,7 @@ fastavro flake8 numpy==1.14.5 google-cloud-bigquery==1.11.1 +pyarrow==3.0.0 pydata-google-auth pytest pytest-cov diff --git a/ci/requirements-3.9-NIGHTLY.conda b/ci/requirements-3.9-NIGHTLY.conda index 9dfe3f6b..ccaa87e5 100644 --- a/ci/requirements-3.9-NIGHTLY.conda +++ b/ci/requirements-3.9-NIGHTLY.conda @@ -1,6 +1,7 @@ pydata-google-auth google-cloud-bigquery google-cloud-bigquery-storage +pyarrow pytest pytest-cov codecov From 4aece99f7d33592889743ab9e1b7ed52fb6835d4 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Thu, 28 Oct 2021 09:28:38 -0500 Subject: [PATCH 07/19] update minimum numpy --- ci/requirements-3.7-0.23.2.conda | 2 +- setup.py | 1 + testing/constraints-3.7.txt | 5 +++-- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/ci/requirements-3.7-0.23.2.conda b/ci/requirements-3.7-0.23.2.conda index 586f56ba..1da6d226 100644 --- a/ci/requirements-3.7-0.23.2.conda +++ b/ci/requirements-3.7-0.23.2.conda @@ -2,7 +2,7 @@ codecov coverage fastavro flake8 -numpy==1.14.5 +numpy==1.16.6 google-cloud-bigquery==1.11.1 pyarrow==3.0.0 pydata-google-auth diff --git a/setup.py b/setup.py index 5153e024..b1169cef 100644 --- a/setup.py +++ b/setup.py @@ -23,6 +23,7 @@ release_status = "Development Status :: 4 - Beta" dependencies = [ 
"setuptools", + "numpy>=1.16.6", "pandas>=0.23.2", "pyarrow >=3.0.0, <7.0dev", "pydata-google-auth", diff --git a/testing/constraints-3.7.txt b/testing/constraints-3.7.txt index 251c81b4..7c67d275 100644 --- a/testing/constraints-3.7.txt +++ b/testing/constraints-3.7.txt @@ -5,11 +5,12 @@ # # e.g., if setup.py has "foo >= 1.14.0, < 2.0.0dev", # Then this file should have foo==1.14.0 -numpy==1.14.5 -pandas==0.23.2 google-auth==1.4.1 google-auth-oauthlib==0.0.1 google-cloud-bigquery==1.11.1 google-cloud-bigquery-storage==1.1.0 +numpy==1.16.6 +pandas==0.23.2 +pyarrow==3.0.0 pydata-google-auth==0.1.2 tqdm==4.23.0 From ea3b8ccceda42023506e4bba1eb1d833b41ba7a1 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Thu, 28 Oct 2021 10:13:22 -0500 Subject: [PATCH 08/19] test with Python 3.7 --- noxfile.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/noxfile.py b/noxfile.py index 2542369d..0a221069 100644 --- a/noxfile.py +++ b/noxfile.py @@ -28,7 +28,10 @@ BLACK_PATHS = ["docs", "pandas_gbq", "tests", "noxfile.py", "setup.py"] DEFAULT_PYTHON_VERSION = "3.8" -SYSTEM_TEST_PYTHON_VERSIONS = ["3.8", "3.9"] + +# Test with all versions, since unit test coverage is not good enough yet to +# catch many issues. +SYSTEM_TEST_PYTHON_VERSIONS = ["3.7", "3.8", "3.9"] UNIT_TEST_PYTHON_VERSIONS = ["3.7", "3.8", "3.9"] CURRENT_DIRECTORY = pathlib.Path(__file__).parent.absolute() From 9a798d4d45a1320dd07804cc660cff128d39bc4e Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Thu, 28 Oct 2021 15:15:53 +0000 Subject: [PATCH 09/19] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- noxfile.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/noxfile.py b/noxfile.py index 0a221069..2542369d 100644 --- a/noxfile.py +++ b/noxfile.py @@ -28,10 +28,7 @@ BLACK_PATHS = ["docs", "pandas_gbq", "tests", "noxfile.py", "setup.py"] DEFAULT_PYTHON_VERSION = "3.8" - -# Test with all versions, since unit test coverage is not good enough yet to -# catch many issues. 
-SYSTEM_TEST_PYTHON_VERSIONS = ["3.7", "3.8", "3.9"] +SYSTEM_TEST_PYTHON_VERSIONS = ["3.8", "3.9"] UNIT_TEST_PYTHON_VERSIONS = ["3.7", "3.8", "3.9"] CURRENT_DIRECTORY = pathlib.Path(__file__).parent.absolute() From 96cc97fd64b9ae7e609c73c0dd7902cf58bc4a57 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Thu, 28 Oct 2021 10:23:39 -0500 Subject: [PATCH 10/19] bump minimum pyarrow --- ci/requirements-3.7-0.23.2.conda | 2 +- setup.py | 2 +- testing/constraints-3.7.txt | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/ci/requirements-3.7-0.23.2.conda b/ci/requirements-3.7-0.23.2.conda index 1da6d226..f3afa305 100644 --- a/ci/requirements-3.7-0.23.2.conda +++ b/ci/requirements-3.7-0.23.2.conda @@ -4,7 +4,7 @@ fastavro flake8 numpy==1.16.6 google-cloud-bigquery==1.11.1 -pyarrow==3.0.0 +pyarrow==4.0.1 pydata-google-auth pytest pytest-cov diff --git a/setup.py b/setup.py index b1169cef..f12a92de 100644 --- a/setup.py +++ b/setup.py @@ -25,7 +25,7 @@ "setuptools", "numpy>=1.16.6", "pandas>=0.23.2", - "pyarrow >=3.0.0, <7.0dev", + "pyarrow >=4.0.1, <7.0dev", "pydata-google-auth", "google-auth", "google-auth-oauthlib", diff --git a/testing/constraints-3.7.txt b/testing/constraints-3.7.txt index 7c67d275..6a39f632 100644 --- a/testing/constraints-3.7.txt +++ b/testing/constraints-3.7.txt @@ -11,6 +11,6 @@ google-cloud-bigquery==1.11.1 google-cloud-bigquery-storage==1.1.0 numpy==1.16.6 pandas==0.23.2 -pyarrow==3.0.0 +pyarrow==4.0.1 pydata-google-auth==0.1.2 tqdm==4.23.0 From f790cfe50739ad4864309339b5dfab7cc96a05ec Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Thu, 28 Oct 2021 10:42:38 -0500 Subject: [PATCH 11/19] try pyarrow 5.0.0 --- ci/requirements-3.7-0.23.2.conda | 2 +- setup.py | 2 +- testing/constraints-3.7.txt | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/ci/requirements-3.7-0.23.2.conda b/ci/requirements-3.7-0.23.2.conda index f3afa305..a4f01d9b 100644 --- a/ci/requirements-3.7-0.23.2.conda +++ b/ci/requirements-3.7-0.23.2.conda @@ -4,7 +4,7 @@ fastavro flake8 numpy==1.16.6 google-cloud-bigquery==1.11.1 -pyarrow==4.0.1 +pyarrow==5.0.0 pydata-google-auth pytest pytest-cov diff --git a/setup.py b/setup.py index f12a92de..8458ae83 100644 --- a/setup.py +++ b/setup.py @@ -25,7 +25,7 @@ "setuptools", "numpy>=1.16.6", "pandas>=0.23.2", - "pyarrow >=4.0.1, <7.0dev", + "pyarrow >=5.0.0, <7.0dev", "pydata-google-auth", "google-auth", "google-auth-oauthlib", diff --git a/testing/constraints-3.7.txt b/testing/constraints-3.7.txt index 6a39f632..61a8ff36 100644 --- a/testing/constraints-3.7.txt +++ b/testing/constraints-3.7.txt @@ -11,6 +11,6 @@ google-cloud-bigquery==1.11.1 google-cloud-bigquery-storage==1.1.0 numpy==1.16.6 pandas==0.23.2 -pyarrow==4.0.1 +pyarrow==5.0.0 pydata-google-auth==0.1.2 tqdm==4.23.0 From 845ff322a5d7900826c97a4da652aead5518ca73 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Thu, 28 Oct 2021 10:46:06 -0500 Subject: [PATCH 12/19] add python 3.7 to pip tests --- noxfile.py | 2 +- owlbot.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/noxfile.py b/noxfile.py index 2542369d..ed88b094 100644 --- a/noxfile.py +++ b/noxfile.py @@ -28,7 +28,7 @@ BLACK_PATHS = ["docs", "pandas_gbq", "tests", "noxfile.py", "setup.py"] DEFAULT_PYTHON_VERSION = "3.8" -SYSTEM_TEST_PYTHON_VERSIONS = ["3.8", "3.9"] +SYSTEM_TEST_PYTHON_VERSIONS = ["3.7", "3.8", "3.9"] UNIT_TEST_PYTHON_VERSIONS = ["3.7", "3.8", "3.9"] CURRENT_DIRECTORY = pathlib.Path(__file__).parent.absolute() diff --git a/owlbot.py b/owlbot.py index d382cf66..76a17e40 100644 --- 
a/owlbot.py +++ b/owlbot.py @@ -31,7 +31,7 @@ extras = ["tqdm"] templated_files = common.py_library( unit_test_python_versions=["3.7", "3.8", "3.9"], - system_test_python_versions=["3.8", "3.9"], + system_test_python_versions=["3.7", "3.8", "3.9"], cov_level=86, unit_test_extras=extras, system_test_extras=extras, From 9150471cc68836cf868f9a39d865359b7488871e Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Thu, 28 Oct 2021 15:47:51 +0000 Subject: [PATCH 13/19] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- CONTRIBUTING.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst index 3c7bb621..bc37b498 100644 --- a/CONTRIBUTING.rst +++ b/CONTRIBUTING.rst @@ -148,7 +148,7 @@ Running System Tests .. note:: - System tests are only configured to run under Python 3.8 and 3.9. + System tests are only configured to run under Python 3.7, 3.8 and 3.9. For expediency, we do not run them in older versions of Python 3. This alone will not run the tests. You'll need to change some local From 88d867666f810d22823e590db8f08d68e02c4df8 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Thu, 28 Oct 2021 14:03:08 -0500 Subject: [PATCH 14/19] revert change to pyarrow minimum --- ci/requirements-3.7-0.23.2.conda | 2 +- setup.py | 2 +- testing/constraints-3.7.txt | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/ci/requirements-3.7-0.23.2.conda b/ci/requirements-3.7-0.23.2.conda index a4f01d9b..1da6d226 100644 --- a/ci/requirements-3.7-0.23.2.conda +++ b/ci/requirements-3.7-0.23.2.conda @@ -4,7 +4,7 @@ fastavro flake8 numpy==1.16.6 google-cloud-bigquery==1.11.1 -pyarrow==5.0.0 +pyarrow==3.0.0 pydata-google-auth pytest pytest-cov diff --git a/setup.py b/setup.py index 8458ae83..b1169cef 100644 --- a/setup.py +++ b/setup.py @@ -25,7 +25,7 @@ "setuptools", "numpy>=1.16.6", "pandas>=0.23.2", - "pyarrow >=5.0.0, <7.0dev", + "pyarrow >=3.0.0, <7.0dev", "pydata-google-auth", "google-auth", "google-auth-oauthlib", diff --git a/testing/constraints-3.7.txt b/testing/constraints-3.7.txt index 61a8ff36..7c67d275 100644 --- a/testing/constraints-3.7.txt +++ b/testing/constraints-3.7.txt @@ -11,6 +11,6 @@ google-cloud-bigquery==1.11.1 google-cloud-bigquery-storage==1.1.0 numpy==1.16.6 pandas==0.23.2 -pyarrow==5.0.0 +pyarrow==3.0.0 pydata-google-auth==0.1.2 tqdm==4.23.0 From 0b158d39569639c20b74b5f1b8c5671104736a1c Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Thu, 28 Oct 2021 17:02:33 -0500 Subject: [PATCH 15/19] avoid parquet for older pandas --- pandas_gbq/features.py | 10 ++++++++++ pandas_gbq/gbq.py | 13 +++++++++++-- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/pandas_gbq/features.py b/pandas_gbq/features.py index ef1969fd..fc8ef568 100644 --- a/pandas_gbq/features.py +++ b/pandas_gbq/features.py @@ -10,6 +10,7 @@ BIGQUERY_BQSTORAGE_VERSION = "1.24.0" BIGQUERY_FROM_DATAFRAME_CSV_VERSION = "2.6.0" PANDAS_VERBOSITY_DEPRECATION_VERSION = "0.23.0" +PANDAS_PARQUET_LOSSLESS_TIMESTAMP_VERSION = "1.1.0" class Features: @@ -89,5 +90,14 @@ def pandas_has_deprecated_verbose(self): ) return self.pandas_installed_version >= pandas_verbosity_deprecation + @property + def pandas_has_parquet_with_lossless_timestamp(self): + import pkg_resources + + desired_version = pkg_resources.parse_version( + PANDAS_PARQUET_LOSSLESS_TIMESTAMP_VERSION + ) + return 
self.pandas_installed_version >= desired_version + FEATURES = Features() diff --git a/pandas_gbq/gbq.py b/pandas_gbq/gbq.py index b245d212..5c6ae457 100644 --- a/pandas_gbq/gbq.py +++ b/pandas_gbq/gbq.py @@ -873,7 +873,7 @@ def to_gbq( location=None, progress_bar=True, credentials=None, - api_method: str = "load_parquet", + api_method: str = "default", verbose=None, private_key=None, ): @@ -964,7 +964,8 @@ def to_gbq( .. versionadded:: 0.8.0 api_method : str, optional API method used to upload DataFrame to BigQuery. One of "load_parquet", - "load_csv". Default "load_parquet". + "load_csv". Default "load_parquet" if pandas is version 1.1.0+, + otherwise "load_csv". .. versionadded:: 0.16.0 verbose : bool, deprecated @@ -991,6 +992,14 @@ def to_gbq( stacklevel=1, ) + if api_method == "default": + # Avoid using parquet if pandas doesn't support lossless conversions to + # parquet timestamp. See: https://stackoverflow.com/a/69758676/101923 + if FEATURES.pandas_has_parquet_with_lossless_timestamp: + api_method = "load_parquet" + else: + api_method = "load_csv" + if chunksize is not None: if api_method == "load_parquet": warnings.warn( From 93f851cd626640625648771760e5bb0c2ac424ba Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Mon, 1 Nov 2021 13:45:45 -0500 Subject: [PATCH 16/19] fix exception messages --- pandas_gbq/exceptions.py | 6 ------ pandas_gbq/load.py | 2 +- 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/pandas_gbq/exceptions.py b/pandas_gbq/exceptions.py index 454e4c07..1b4f6925 100644 --- a/pandas_gbq/exceptions.py +++ b/pandas_gbq/exceptions.py @@ -8,16 +8,12 @@ class GenericGBQException(ValueError): Raised when an unrecognized Google API Error occurs. """ - pass - class AccessDenied(ValueError): """ Raised when invalid credentials are provided, or tokens have expired. """ - pass - class ConversionError(GenericGBQException): """ @@ -31,8 +27,6 @@ class InvalidPrivateKeyFormat(ValueError): Raised when provided private key has invalid format. """ - pass - class PerformanceWarning(RuntimeWarning): """ diff --git a/pandas_gbq/load.py b/pandas_gbq/load.py index 86415f69..c7731c6f 100644 --- a/pandas_gbq/load.py +++ b/pandas_gbq/load.py @@ -166,5 +166,5 @@ def load_chunks( ) else: raise ValueError( - f"got unexpected api_method: {api_method!r}, expected one of 'load_parquet', 'load_csv'" + f"Got unexpected api_method: {api_method!r}, expected one of 'load_parquet', 'load_csv'." 
) From 0d5d5c51cc326f758827165cdaa1a575536a05b8 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Mon, 1 Nov 2021 14:35:17 -0500 Subject: [PATCH 17/19] refactor load_csv --- pandas_gbq/load.py | 44 ++++++++++++++++++++++++++++------------- tests/unit/test_load.py | 2 +- 2 files changed, 31 insertions(+), 15 deletions(-) diff --git a/pandas_gbq/load.py b/pandas_gbq/load.py index c7731c6f..6bbf7bb7 100644 --- a/pandas_gbq/load.py +++ b/pandas_gbq/load.py @@ -5,7 +5,7 @@ """Helper methods for loading data into BigQuery""" import io -from typing import Any, Dict, Optional +from typing import Any, Callable, Dict, List, Optional import pandas import pyarrow.lib @@ -82,30 +82,47 @@ def load_parquet( def load_csv( - client: bigquery.Client, dataframe: pandas.DataFrame, - destination_table_ref: bigquery.TableReference, - location: Optional[str], chunksize: Optional[int], - schema: Optional[Dict[str, Any]], + bq_schema: Optional[List[bigquery.SchemaField]], + load_chunk: Callable, ): job_config = bigquery.LoadJobConfig() job_config.write_disposition = "WRITE_APPEND" job_config.source_format = "CSV" job_config.allow_quoted_newlines = True - if schema is not None: - schema = pandas_gbq.schema.remove_policy_tags(schema) - job_config.schema = pandas_gbq.schema.to_google_cloud_bigquery(schema) + if bq_schema is not None: + job_config.schema = bq_schema + # TODO: Remove chunking feature for load jobs. Deprecated in 0.16.0. chunks = split_dataframe(dataframe, chunksize=chunksize) for remaining_rows, chunk in chunks: yield remaining_rows + load_chunk(chunk, job_config) + +def load_csv_from_dataframe( + client: bigquery.Client, + dataframe: pandas.DataFrame, + destination_table_ref: bigquery.TableReference, + location: Optional[str], + chunksize: Optional[int], + schema: Optional[Dict[str, Any]], +): + bq_schema = None + + if schema is not None: + schema = pandas_gbq.schema.remove_policy_tags(schema) + bq_schema = pandas_gbq.schema.to_google_cloud_bigquery(schema) + + def load_chunk(chunk, job_config): client.load_table_from_dataframe( chunk, destination_table_ref, job_config=job_config, location=location, ).result() + return load_csv(dataframe, chunksize, bq_schema, load_chunk,) + def load_csv_from_file( client: bigquery.Client, @@ -124,12 +141,9 @@ def load_csv_from_file( schema = pandas_gbq.schema.generate_bq_schema(dataframe) schema = pandas_gbq.schema.remove_policy_tags(schema) - job_config.schema = pandas_gbq.schema.to_google_cloud_bigquery(schema) - - chunks = split_dataframe(dataframe, chunksize=chunksize) - for remaining_rows, chunk in chunks: - yield remaining_rows + bq_schema = pandas_gbq.schema.to_google_cloud_bigquery(schema) + def load_chunk(chunk, job_config): try: chunk_buffer = encode_chunk(chunk) client.load_table_from_file( @@ -141,6 +155,8 @@ def load_csv_from_file( finally: chunk_buffer.close() + return load_csv(dataframe, chunksize, bq_schema, load_chunk,) + def load_chunks( client, @@ -157,7 +173,7 @@ def load_chunks( return [0] elif api_method == "load_csv": if FEATURES.bigquery_has_from_dataframe_with_csv: - return load_csv( + return load_csv_from_dataframe( client, dataframe, destination_table_ref, location, chunksize, schema ) else: diff --git a/tests/unit/test_load.py b/tests/unit/test_load.py index ba86ac09..a32d2d9e 100644 --- a/tests/unit/test_load.py +++ b/tests/unit/test_load.py @@ -135,5 +135,5 @@ def test_load_chunks_omits_policy_tags( def test_load_chunks_with_invalid_api_method(): - with pytest.raises(ValueError, match="got unexpected api_method:"): + with 
pytest.raises(ValueError, match="Got unexpected api_method:"): load.load_chunks(None, None, None, api_method="not_a_thing") From a26e73b2098d30aa30d708a3cbd50fb7f5af2df8 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Mon, 1 Nov 2021 14:39:33 -0500 Subject: [PATCH 18/19] remove unused job_config --- pandas_gbq/load.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/pandas_gbq/load.py b/pandas_gbq/load.py index 6bbf7bb7..55af36da 100644 --- a/pandas_gbq/load.py +++ b/pandas_gbq/load.py @@ -132,11 +132,6 @@ def load_csv_from_file( chunksize: Optional[int], schema: Optional[Dict[str, Any]], ): - job_config = bigquery.LoadJobConfig() - job_config.write_disposition = "WRITE_APPEND" - job_config.source_format = "CSV" - job_config.allow_quoted_newlines = True - if schema is None: schema = pandas_gbq.schema.generate_bq_schema(dataframe) From 171230f92ca1aec161195ebac4cc4636c6ff2ef8 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Mon, 1 Nov 2021 14:44:12 -0500 Subject: [PATCH 19/19] trailing comma --- pandas_gbq/load.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pandas_gbq/load.py b/pandas_gbq/load.py index 55af36da..69210e41 100644 --- a/pandas_gbq/load.py +++ b/pandas_gbq/load.py @@ -121,7 +121,7 @@ def load_chunk(chunk, job_config): chunk, destination_table_ref, job_config=job_config, location=location, ).result() - return load_csv(dataframe, chunksize, bq_schema, load_chunk,) + return load_csv(dataframe, chunksize, bq_schema, load_chunk) def load_csv_from_file(
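
Usage note: a minimal sketch of the caller-facing behavior added by patches 01, 02, and 15, not itself part of the series. The project ID "my-project" and table "my_dataset.new_table" are placeholders, and the snippet assumes pandas 1.1.0+ with BigQuery credentials available in the environment.

import pandas
import pandas_gbq

df = pandas.DataFrame({"test_col": ["abc", "", "信用卡", "Skywalker™"]})

# New default: on pandas 1.1.0+ the whole frame is serialized to Parquet
# and uploaded as a single load job (api_method="load_parquet").
pandas_gbq.to_gbq(df, "my_dataset.new_table", project_id="my-project")

# Old behavior: request the CSV-based load path explicitly.
pandas_gbq.to_gbq(
    df,
    "my_dataset.new_table",
    project_id="my-project",
    if_exists="append",
    api_method="load_csv",
)

# Passing chunksize now warns: DeprecationWarning under load_parquet (the
# value is ignored, the frame is loaded in one job), and
# PendingDeprecationWarning under load_csv (still honored, for now).
pandas_gbq.to_gbq(
    df,
    "my_dataset.new_table",
    project_id="my-project",
    if_exists="append",
    chunksize=10000,
)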
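
Patch 03 wraps pyarrow.lib.ArrowInvalid in the new pandas_gbq.exceptions.ConversionError. The sketch below shows how a caller might handle it; it assumes a mixed-type object column that pyarrow may refuse to convert, and the CSV fallback is one possible response, not something the series itself does.

import pandas
import pandas_gbq
from pandas_gbq import exceptions

# An object column mixing strings and floats may fail Parquet conversion.
df = pandas.DataFrame({"mixed": ["text", 1.5]})

try:
    pandas_gbq.to_gbq(df, "my_dataset.mixed_table", project_id="my-project")
except exceptions.ConversionError:
    # The CSV path writes values as text, and pandas-gbq generates a
    # STRING schema for object columns instead of requiring one Arrow
    # type. if_exists="replace" because the failed attempt may already
    # have created the (empty) table.
    pandas_gbq.to_gbq(
        df,
        "my_dataset.mixed_table",
        project_id="my-project",
        if_exists="replace",
        api_method="load_csv",
    )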
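
Patch 15 gates the default on the installed pandas version, because Parquet round-trips of timestamps are only lossless from pandas 1.1.0 (per the Stack Overflow answer linked in the patch). A sketch of how the "default" value resolves, using only names the series adds:

from pandas_gbq.features import FEATURES

# Mirrors the api_method == "default" branch inside to_gbq().
api_method = (
    "load_parquet"
    if FEATURES.pandas_has_parquet_with_lossless_timestamp
    else "load_csv"
)
print(api_method)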