feat: to_gbq uses Parquet by default, use api_method="load_csv" for old behavior #413

Merged: 21 commits, Nov 2, 2021
Changes from 2 commits
25 changes: 24 additions & 1 deletion pandas_gbq/gbq.py
@@ -520,7 +520,7 @@ def _download_results(
df = rows_iter.to_dataframe(
dtypes=conversion_dtypes,
progress_bar_type=progress_bar_type,
**to_dataframe_kwargs
**to_dataframe_kwargs,
)
except self.http_error as ex:
self.process_http_error(ex)
@@ -541,6 +541,7 @@ def load_data(
chunksize=None,
schema=None,
progress_bar=True,
api_method: str = "load_parquet",
):
from pandas_gbq import load

@@ -554,6 +555,7 @@
chunksize=chunksize,
schema=schema,
location=self.location,
api_method=api_method,
)
if progress_bar and tqdm:
chunks = tqdm.tqdm(chunks)
@@ -876,6 +878,7 @@ def to_gbq(
location=None,
progress_bar=True,
credentials=None,
api_method: str = "load_parquet",
verbose=None,
private_key=None,
):
@@ -964,6 +967,11 @@ def to_gbq(
:class:`google.oauth2.service_account.Credentials` directly.

.. versionadded:: 0.8.0
api_method : str, optional
API method used to upload DataFrame to BigQuery. One of "load_parquet",
"load_csv". Default "load_parquet".

.. versionadded:: 0.16.0
verbose : bool, deprecated
Deprecated in Pandas-GBQ 0.4.0. Use the `logging module
to adjust verbosity instead
@@ -988,6 +996,20 @@ def to_gbq(
stacklevel=1,
)

if chunksize is not None:
if api_method == "load_parquet":
warnings.warn(
"chunksize is ignored when using api_method='load_parquet'",
DeprecationWarning,
stacklevel=2,
)
elif api_method == "load_csv":
warnings.warn(
"chunksize will be ignored when using api_method='load_csv' in a future version of pandas-gbq",
PendingDeprecationWarning,
stacklevel=2,
)

if if_exists not in ("fail", "replace", "append"):
raise ValueError("'{0}' is not valid for if_exists".format(if_exists))

@@ -1071,6 +1093,7 @@ def to_gbq(
chunksize=chunksize,
schema=table_schema,
progress_bar=progress_bar,
api_method=api_method,
)


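For context, a minimal usage sketch of the new parameter (not part of the diff; the project and table names are placeholders):

import pandas
import pandas_gbq

df = pandas.DataFrame({"num": [1, 2, 3]})

# Default after this change: serialize the DataFrame to Parquet and load it
# with a single BigQuery load job.
pandas_gbq.to_gbq(df, "my_dataset.my_table", project_id="my-project")

# Opt back into the pre-0.16.0 behavior by serializing to CSV. Note the
# warning above: chunksize is slated for deprecation with this method too.
pandas_gbq.to_gbq(
    df,
    "my_dataset.my_table",
    project_id="my-project",
    if_exists="append",
    api_method="load_csv",
)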
117 changes: 92 additions & 25 deletions pandas_gbq/load.py
@@ -5,7 +5,9 @@
"""Helper methods for loading data into BigQuery"""

import io
from typing import Any, Dict, Optional

import pandas
from google.cloud import bigquery

from pandas_gbq.features import FEATURES
@@ -52,45 +54,110 @@ def split_dataframe(dataframe, chunksize=None):
yield remaining_rows, chunk


def load_chunks(
client,
dataframe,
destination_table_ref,
chunksize=None,
schema=None,
location=None,
def load_parquet(
client: bigquery.Client,
dataframe: pandas.DataFrame,
destination_table_ref: bigquery.TableReference,
location: Optional[str],
schema: Optional[Dict[str, Any]],
):
job_config = bigquery.LoadJobConfig()
job_config.write_disposition = "WRITE_APPEND"
job_config.source_format = "PARQUET"

if schema is not None:
schema = pandas_gbq.schema.remove_policy_tags(schema)
job_config.schema = pandas_gbq.schema.to_google_cloud_bigquery(schema)

client.load_table_from_dataframe(
dataframe, destination_table_ref, job_config=job_config, location=location,
).result()


def load_csv(
client: bigquery.Client,
dataframe: pandas.DataFrame,
destination_table_ref: bigquery.TableReference,
location: Optional[str],
chunksize: Optional[int],
schema: Optional[Dict[str, Any]],
):
job_config = bigquery.LoadJobConfig()
job_config.write_disposition = "WRITE_APPEND"
job_config.source_format = "CSV"
job_config.allow_quoted_newlines = True

# Explicit schema? Use that!
if schema is not None:
schema = pandas_gbq.schema.remove_policy_tags(schema)
job_config.schema = pandas_gbq.schema.to_google_cloud_bigquery(schema)
# If not, let BigQuery determine schema unless we are encoding the CSV files ourselves.
elif not FEATURES.bigquery_has_from_dataframe_with_csv:

chunks = split_dataframe(dataframe, chunksize=chunksize)
for remaining_rows, chunk in chunks:
yield remaining_rows

client.load_table_from_dataframe(
chunk, destination_table_ref, job_config=job_config, location=location,
).result()


def load_csv_from_file(
client: bigquery.Client,
dataframe: pandas.DataFrame,
destination_table_ref: bigquery.TableReference,
location: Optional[str],
chunksize: Optional[int],
schema: Optional[Dict[str, Any]],
):
job_config = bigquery.LoadJobConfig()
job_config.write_disposition = "WRITE_APPEND"
job_config.source_format = "CSV"
job_config.allow_quoted_newlines = True

if schema is None:
schema = pandas_gbq.schema.generate_bq_schema(dataframe)

A reviewer commented:

    This may introduce a failure if the schema is None and generate_bq_schema is left unused. The Parquet conversion may succeed, but the actual BigQuery table schema types may not match the resulting conversion.

The author (collaborator) replied:

    Interesting that our tests wouldn't have caught that. Do you have an example of a dataframe that demonstrates this?

The author added:

    FWIW, the reason we don't have this here is that the google-cloud-bigquery library does similar DataFrame-to-BigQuery schema conversion logic if the schema is not populated on the job config: https://github.com/googleapis/python-bigquery/blob/66b3dd9f9aec3fda9610a3ceec8d8a477f2ab3b9/google/cloud/bigquery/client.py#L2625
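To illustrate the fallback described in that last reply, a minimal sketch against the public google-cloud-bigquery API (not part of this PR; the table name is a placeholder and application default credentials are assumed):

import pandas
from google.cloud import bigquery

client = bigquery.Client()  # assumes application default credentials
df = pandas.DataFrame({"name": ["a", "b"], "value": [1.5, 2.5]})

# No schema is set on the job config, so the client derives one from the
# DataFrame's dtypes (name -> STRING, value -> FLOAT) before uploading.
job_config = bigquery.LoadJobConfig(write_disposition="WRITE_APPEND")
client.load_table_from_dataframe(
    df, "my_dataset.inferred_schema_table", job_config=job_config
).result()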
schema = pandas_gbq.schema.remove_policy_tags(schema)
job_config.schema = pandas_gbq.schema.to_google_cloud_bigquery(schema)

chunks = split_dataframe(dataframe, chunksize=chunksize)
for remaining_rows, chunk in chunks:
yield remaining_rows

if FEATURES.bigquery_has_from_dataframe_with_csv:
client.load_table_from_dataframe(
chunk, destination_table_ref, job_config=job_config, location=location,
try:
chunk_buffer = encode_chunk(chunk)
client.load_table_from_file(
chunk_buffer,
destination_table_ref,
job_config=job_config,
location=location,
).result()
finally:
chunk_buffer.close()


def load_chunks(
client,
dataframe,
destination_table_ref,
chunksize=None,
schema=None,
location=None,
api_method="load_parquet",
):
if api_method == "load_parquet":
load_parquet(client, dataframe, destination_table_ref, location, schema)
# TODO: yield progress depending on result() with timeout
return [0]
elif api_method == "load_csv":
if FEATURES.bigquery_has_from_dataframe_with_csv:
return load_csv(
client, dataframe, destination_table_ref, location, chunksize, schema
)
else:
try:
chunk_buffer = encode_chunk(chunk)
client.load_table_from_file(
chunk_buffer,
destination_table_ref,
job_config=job_config,
location=location,
).result()
finally:
chunk_buffer.close()
return load_csv_from_file(
client, dataframe, destination_table_ref, location, chunksize, schema
)
else:
raise ValueError(
f"got unexpected api_method: {api_method!r}, expected one of 'load_parquet', 'load_csv'"
)
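For reference, a sketch of how the generator-style return value can be consumed, mirroring load_data in gbq.py above (the upload helper and its arguments are illustrative, not part of the PR):

import pandas
from google.cloud import bigquery

from pandas_gbq import load

def upload(
    client: bigquery.Client,
    dataframe: pandas.DataFrame,
    destination_table_ref: bigquery.TableReference,
) -> None:
    # load_chunks yields "rows remaining" counts: a single [0] for the
    # Parquet path, or a per-chunk countdown for the CSV paths.
    for remaining_rows in load.load_chunks(
        client,
        dataframe,
        destination_table_ref,
        chunksize=100_000,
        api_method="load_csv",
    ):
        print(f"{remaining_rows} rows remaining to upload")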
71 changes: 0 additions & 71 deletions tests/system/test_gbq.py
@@ -1185,77 +1185,6 @@ def test_google_upload_errors_should_raise_exception(self, project_id):
credentials=self.credentials,
)

def test_upload_chinese_unicode_data(self, project_id):
test_id = "2"
test_size = 6
df = DataFrame(np.random.randn(6, 4), index=range(6), columns=list("ABCD"))
df["s"] = u"信用卡"

gbq.to_gbq(
df,
self.destination_table + test_id,
project_id,
credentials=self.credentials,
chunksize=10000,
)

result_df = gbq.read_gbq(
"SELECT * FROM {0}".format(self.destination_table + test_id),
project_id=project_id,
credentials=self.credentials,
dialect="legacy",
)

assert len(result_df) == test_size

if sys.version_info.major < 3:
pytest.skip(msg="Unicode comparison in Py2 not working")

result = result_df["s"].sort_values()
expected = df["s"].sort_values()

tm.assert_numpy_array_equal(expected.values, result.values)

def test_upload_other_unicode_data(self, project_id):
test_id = "3"
test_size = 3
df = DataFrame(
{
"s": ["Skywalker™", "lego", "hülle"],
"i": [200, 300, 400],
"d": [
"2017-12-13 17:40:39",
"2017-12-13 17:40:39",
"2017-12-13 17:40:39",
],
}
)

gbq.to_gbq(
df,
self.destination_table + test_id,
project_id=project_id,
credentials=self.credentials,
chunksize=10000,
)

result_df = gbq.read_gbq(
"SELECT * FROM {0}".format(self.destination_table + test_id),
project_id=project_id,
credentials=self.credentials,
dialect="legacy",
)

assert len(result_df) == test_size

if sys.version_info.major < 3:
pytest.skip(msg="Unicode comparison in Py2 not working")

result = result_df["s"].sort_values()
expected = df["s"].sort_values()

tm.assert_numpy_array_equal(expected.values, result.values)

def test_upload_mixed_float_and_int(self, project_id):
"""Test that we can upload a dataframe containing an int64 and float64 column.
See: https://github.com/pydata/pandas-gbq/issues/116
77 changes: 54 additions & 23 deletions tests/system/test_to_gbq.py
@@ -3,9 +3,10 @@
# license that can be found in the LICENSE file.

import functools
import random

import pandas
import pandas.testing

import pytest


@@ -21,31 +22,61 @@ def method_under_test(credentials, project_id):
)


def test_float_round_trip(method_under_test, random_dataset_id, bigquery_client):
"""Ensure that 64-bit floating point numbers are unchanged.

See: https://github.com/pydata/pandas-gbq/issues/326
"""

table_id = "{}.float_round_trip".format(random_dataset_id)
input_floats = pandas.Series(
[
0.14285714285714285,
0.4406779661016949,
1.05148,
1.05153,
1.8571428571428572,
2.718281828459045,
3.141592653589793,
2.0988936657440586e43,
],
name="float_col",
@pytest.mark.parametrize(
["input_series"],
[
# Ensure that 64-bit floating point numbers are unchanged.
# See: https://github.com/pydata/pandas-gbq/issues/326
(
pandas.Series(
[
0.14285714285714285,
0.4406779661016949,
1.05148,
1.05153,
1.8571428571428572,
2.718281828459045,
3.141592653589793,
2.0988936657440586e43,
],
name="test_col",
),
),
(
pandas.Series(
[
"abc",
"defg",
# Ensure that empty strings are written as empty string,
# not NULL. See:
# https://github.com/googleapis/python-bigquery-pandas/issues/366
"",
None,
# Ensure that unicode characters are encoded. See:
# https://github.com/googleapis/python-bigquery-pandas/issues/106
"信用卡",
"Skywalker™",
"hülle",
],
name="test_col",
),
),
],
)
def test_series_round_trip(
method_under_test, random_dataset_id, bigquery_client, input_series
):
table_id = f"{random_dataset_id}.round_trip_{random.randrange(1_000_000)}"
input_series = input_series.sort_values().reset_index(drop=True)
df = pandas.DataFrame(
# Some errors only occur in multi-column dataframes. See:
# https://github.com/googleapis/python-bigquery-pandas/issues/366
{"test_col": input_series, "test_col2": input_series}
)
df = pandas.DataFrame({"float_col": input_floats})
method_under_test(df, table_id)

round_trip = bigquery_client.list_rows(table_id).to_dataframe()
round_trip_floats = round_trip["float_col"].sort_values()
round_trip_series = round_trip["test_col"].sort_values().reset_index(drop=True)
pandas.testing.assert_series_equal(
round_trip_floats, input_floats, check_exact=True,
round_trip_series, input_series, check_exact=True,
)