
Do not manually loop over all rows when encoding a dataframe as JSON #96


Closed

tswast opened this issue Dec 8, 2017 · 7 comments

Labels
type: feature request (‘Nice-to-have’ improvement, new feature or different behavior or design)

Comments

tswast (Collaborator) commented Dec 8, 2017

See: #25 (comment)

Currently the code loops over each row to encode it as JSON. This could be sped up by calling to_json() on the whole dataframe instead.
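
For illustration, a minimal sketch of the difference, assuming pandas 0.19+ (where to_json() supports lines=True); the column names here are placeholders:

import pandas as pd

df = pd.DataFrame({'name': ['alice', 'bob'], 'value': [1, 2]})

# Per-row encoding: a simplified stand-in for the current loop.
ndjson_rows = '\n'.join(row.to_json() for _, row in df.iterrows())

# Whole-dataframe encoding: orient='records' with lines=True emits
# newline-delimited JSON in a single call.
ndjson_bulk = df.to_json(orient='records', lines=True)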

max-sixty (Contributor) commented:

FYI this is our internal function. It has some idiosyncrasies (e.g. how it handles indexes, and the fact that it uses the filesystem), but it works fairly well.

import logging
import os
import tempfile

from google.cloud import bigquery

# Note: get_client, wait_for_job, _bq_schema, and if_exists_map are other
# internal helpers that are not shown in this snippet.
logger = logging.getLogger(__name__)


def write_gbq(df, dataset_id, table_name, project=None,
              credentials=None, block=False, if_exists='fail', **kwargs):
    """Write a DataFrame to a Google BigQuery table.

    Parameters
    ----------
    df : DataFrame
        DataFrame to be written
    dataset_id : str
        Dataset ID to contain the table
    table_name : str
        Name of table to be written
    project : str (defaults to env var GOOGLE_CLOUD_PROJECT)
        Google BigQuery account project ID.
    credentials : GoogleCredentials
        Credentials used to authenticate the BigQuery client.
    block : boolean (optional)
        If True, wait for the load job to complete before returning.
    if_exists : {'fail', 'replace', 'append'}, default 'fail'
        'fail': If table exists, raise.
        'replace': If table exists, drop it, recreate it, and insert data.
        'append': If table exists, insert data. Create if does not exist.
    """
    client = get_client(credentials=credentials, project=project)
    dataset = client.dataset(dataset_id)
    table = dataset.table(table_name)

    # reset index if an index exists
    df = df.reset_index()
    if 'index' in df.columns:
        df = df.drop('index', axis=1)

    file_path = _write_temp_file(df)
    file_size_mb = os.stat(file_path).st_size / 1024 ** 2
    logger.info("Writing file to BQ: {} mb".format(file_size_mb))

    config = bigquery.job.LoadJobConfig()
    config.write_disposition = if_exists_map[if_exists]
    config.source_format = 'CSV'
    config.schema = _bq_schema(df)

    with open(file_path, 'rb') as source_file:
        job = client.load_table_from_file(
            file_obj=source_file,
            destination=table,
            job_config=config)

    if block:
        wait_for_job(job)

    return job


def _write_temp_file(df, filename='df.csv'):
    path = tempfile.mkdtemp()
    file_path = os.path.join(path, filename)
    df.to_csv(file_path, index=False, header=False,
              encoding='utf-8', date_format='%Y-%m-%d %H:%M')
    return file_path
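
A hypothetical usage example of the helper above (the project, dataset, and table names are placeholders):

import pandas as pd

df = pd.DataFrame({'user': ['alice', 'bob'], 'score': [1.5, 2.0]})

# Blocks until the load job finishes, replacing the table if it already exists.
job = write_gbq(df, dataset_id='analytics', table_name='scores',
                project='my-project', block=True, if_exists='replace')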

tswast (Collaborator, Author) commented Feb 10, 2018

Calling to_json() on the whole dataframe gives a single JSON object, not newline-delimited JSON. This will become much easier if/when BigQuery supports a column-oriented data format such as Parquet.

tswast (Collaborator, Author) commented Feb 10, 2018

Cool, thanks @maxim-lian

tswast (Collaborator, Author) commented Feb 10, 2018

Ah, yeah CSV might be the way to go for now.

max-sixty (Contributor) commented:

CSV is also smaller! And no nesting is possible, so there's no loss there.

But if JSON is preferred, I think you can use orient='records'.

tswast (Collaborator, Author) commented Feb 10, 2018

Looks like something like this should work to avoid having to write to a temporary file.

import six


def encode_chunk(df):
    """Return a file-like object of CSV-encoded rows.

    Args:
      df (pandas.DataFrame): A chunk of a dataframe to encode
    """
    csv_buffer = six.StringIO()
    df.to_csv(
        csv_buffer, index=False, header=False, encoding='utf-8',
        date_format='%Y-%m-%d %H:%M')

    # Convert to a BytesIO buffer so that unicode text is properly handled.
    # See: https://github.com/pydata/pandas-gbq/issues/106
    body = csv_buffer.getvalue()
    if isinstance(body, bytes):
        body = body.decode('utf-8')
    body = body.encode('utf-8')
    return six.BytesIO(body)
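
A hedged sketch of how such a buffer could then be handed to the BigQuery client, mirroring the load_table_from_file call in the snippet above (client, table, and config are assumed to be set up the same way):

# Stream the in-memory CSV chunk straight into a load job; no temp file needed.
chunk_buffer = encode_chunk(df)
job = client.load_table_from_file(
    file_obj=chunk_buffer,
    destination=table,
    job_config=config)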

tswast added a commit to tswast/python-bigquery-pandas that referenced this issue Feb 10, 2018
tswast added a commit that referenced this issue Feb 12, 2018
…#117)

* BUG: Fix uploading of dataframes containing int64 and float64 columns

Fixes #116 and #96 by loading data in CSV chunks.

* ENH: allow chunksize=None to disable chunking in to_gbq()

Also, fixes lint errors.

* TST: update min g-c-bq lib to 0.29.0 in CI

* BUG: pass schema to load job for to_gbq

* Generate schema if needed for table creation.

* Restore _generate_bq_schema, as it is used in tests.

* Add fixes to changelog.
tswast (Collaborator, Author) commented Feb 12, 2018

Closed by #117
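
For reference, a hedged sketch of how the chunking behavior from that change is exposed through the public to_gbq() API (the table and project names are placeholders):

import pandas as pd
import pandas_gbq

df = pd.DataFrame({'num': [1, 2, 3], 'val': [0.1, 0.2, 0.3]})

# An integer chunksize uploads the dataframe in CSV-encoded chunks of that many
# rows; chunksize=None disables chunking and sends everything in one load job.
pandas_gbq.to_gbq(df, 'some_dataset.some_table', project_id='my-project',
                  chunksize=None, if_exists='replace')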

tswast closed this as completed Feb 12, 2018