Skip to content

RUST-1875 Define separate types for summary and verbose bulk write results #1103

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 59 additions & 21 deletions src/action/bulk_write.rs
Original file line number Diff line number Diff line change
@@ -1,44 +1,62 @@
#![allow(missing_docs)]

use std::collections::HashMap;
use std::{collections::HashMap, marker::PhantomData};

use crate::{
bson::{Bson, Document},
error::{BulkWriteError, Error, ErrorKind, Result},
operation::bulk_write::BulkWrite as BulkWriteOperation,
options::{BulkWriteOptions, WriteConcern, WriteModel},
results::BulkWriteResult,
results::{BulkWriteResult, SummaryBulkWriteResult, VerboseBulkWriteResult},
Client,
ClientSession,
};

use super::{action_impl, option_setters};

impl Client {
pub fn bulk_write(&self, models: impl IntoIterator<Item = WriteModel>) -> BulkWrite {
pub fn bulk_write(
&self,
models: impl IntoIterator<Item = WriteModel>,
) -> BulkWrite<SummaryBulkWriteResult> {
BulkWrite::new(self, models.into_iter().collect())
}
}

#[must_use]
pub struct BulkWrite<'a> {
pub struct BulkWrite<'a, R> {
client: &'a Client,
models: Vec<WriteModel>,
options: Option<BulkWriteOptions>,
session: Option<&'a mut ClientSession>,
_phantom: PhantomData<R>,
}

impl<'a> BulkWrite<'a> {
impl<'a> BulkWrite<'a, SummaryBulkWriteResult> {
pub fn verbose_results(self) -> BulkWrite<'a, VerboseBulkWriteResult> {
BulkWrite {
client: self.client,
models: self.models,
options: self.options,
session: self.session,
_phantom: PhantomData,
}
}
}

impl<'a, R> BulkWrite<'a, R>
where
R: BulkWriteResult,
{
option_setters!(options: BulkWriteOptions;
ordered: bool,
bypass_document_validation: bool,
comment: Bson,
let_vars: Document,
verbose_results: bool,
write_concern: WriteConcern,
);

pub fn session(mut self, session: &'a mut ClientSession) -> BulkWrite<'a> {
pub fn session(mut self, session: &'a mut ClientSession) -> Self {
self.session = Some(session);
self
}
Expand All @@ -49,6 +67,7 @@ impl<'a> BulkWrite<'a> {
models,
options: None,
session: None,
_phantom: PhantomData,
}
}

Expand All @@ -58,13 +77,8 @@ impl<'a> BulkWrite<'a> {
.and_then(|options| options.ordered)
.unwrap_or(true)
}
}

#[action_impl]
impl<'a> Action for BulkWrite<'a> {
type Future = BulkWriteFuture;

async fn execute(mut self) -> Result<BulkWriteResult> {
async fn execute_inner(mut self) -> Result<R> {
#[cfg(feature = "in-use-encryption-unstable")]
if self.client.should_auto_encrypt().await {
use mongocrypt::error::{Error as EncryptionError, ErrorKind as EncryptionErrorKind};
Expand Down Expand Up @@ -100,7 +114,7 @@ impl<'a> Action for BulkWrite<'a> {
.await;
let result = self
.client
.execute_operation::<BulkWriteOperation>(
.execute_operation::<BulkWriteOperation<R>>(
&mut operation,
self.session.as_deref_mut(),
)
Expand Down Expand Up @@ -128,18 +142,42 @@ impl<'a> Action for BulkWrite<'a> {
}
}

#[action_impl]
impl<'a> Action for BulkWrite<'a, SummaryBulkWriteResult> {
type Future = SummaryBulkWriteFuture;

async fn execute(mut self) -> Result<SummaryBulkWriteResult> {
self.execute_inner().await
}
}

#[action_impl]
impl<'a> Action for BulkWrite<'a, VerboseBulkWriteResult> {
type Future = VerboseBulkWriteFuture;

async fn execute(mut self) -> Result<VerboseBulkWriteResult> {
self.execute_inner().await
}
}

/// Represents the execution status of a bulk write. The status starts at `None`, indicating that no
/// writes have been attempted yet, and transitions to either `Success` or `Error` as batches are
/// executed. The contents of `Error` can be inspected to determine whether a bulk write can
/// continue with further batches or should be terminated.
enum ExecutionStatus {
Success(BulkWriteResult),
enum ExecutionStatus<R>
where
R: BulkWriteResult,
{
Success(R),
Error(Error),
None,
}

impl ExecutionStatus {
fn with_success(mut self, result: BulkWriteResult) -> Self {
impl<R> ExecutionStatus<R>
where
R: BulkWriteResult,
{
fn with_success(mut self, result: R) -> Self {
match self {
// Merge two successful sets of results together.
Self::Success(ref mut current_result) => {
Expand All @@ -149,7 +187,7 @@ impl ExecutionStatus {
// Merge the results of the new batch into the existing bulk write error.
Self::Error(ref mut current_error) => {
let bulk_write_error = Self::get_current_bulk_write_error(current_error);
bulk_write_error.merge_partial_results(result);
bulk_write_error.merge_partial_results(result.into_partial_result());
self
}
Self::None => Self::Success(result),
Expand All @@ -163,14 +201,14 @@ impl ExecutionStatus {
// set its source as the error that just occurred.
Self::Success(current_result) => match *error.kind {
ErrorKind::BulkWrite(ref mut bulk_write_error) => {
bulk_write_error.merge_partial_results(current_result);
bulk_write_error.merge_partial_results(current_result.into_partial_result());
Self::Error(error)
}
_ => {
let bulk_write_error: Error = ErrorKind::BulkWrite(BulkWriteError {
write_errors: HashMap::new(),
write_concern_errors: Vec::new(),
partial_result: Some(current_result),
partial_result: Some(current_result.into_partial_result()),
})
.into();
Self::Error(bulk_write_error.with_source(error))
Expand Down
51 changes: 5 additions & 46 deletions src/client/options/bulk_write.rs
Original file line number Diff line number Diff line change
@@ -1,73 +1,32 @@
#![allow(missing_docs)]

use serde::{ser::SerializeMap, Deserialize, Serialize};
use serde::{Deserialize, Serialize};
use serde_with::skip_serializing_none;

use crate::{
bson::{rawdoc, Array, Bson, Document, RawDocumentBuf},
bson_util::{get_or_prepend_id_field, replacement_document_check, update_document_check},
error::Result,
options::{UpdateModifications, WriteConcern},
serde_util::serialize_bool_or_true,
Namespace,
};

#[skip_serializing_none]
#[derive(Clone, Debug, Default, Deserialize)]
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub struct BulkWriteOptions {
#[serialize_always]
#[serde(serialize_with = "serialize_bool_or_true")]
pub ordered: Option<bool>,
pub bypass_document_validation: Option<bool>,
pub comment: Option<Bson>,
#[serde(rename = "let")]
pub let_vars: Option<Document>,
pub verbose_results: Option<bool>,
pub write_concern: Option<WriteConcern>,
}

impl Serialize for BulkWriteOptions {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let BulkWriteOptions {
ordered,
bypass_document_validation,
comment,
let_vars,
verbose_results,
write_concern,
} = self;

let mut map_serializer = serializer.serialize_map(None)?;

let ordered = ordered.unwrap_or(true);
map_serializer.serialize_entry("ordered", &ordered)?;

if let Some(bypass_document_validation) = bypass_document_validation {
map_serializer
.serialize_entry("bypassDocumentValidation", bypass_document_validation)?;
}

if let Some(ref comment) = comment {
map_serializer.serialize_entry("comment", comment)?;
}

if let Some(ref let_vars) = let_vars {
map_serializer.serialize_entry("let", let_vars)?;
}

let errors_only = verbose_results.map(|b| !b).unwrap_or(true);
map_serializer.serialize_entry("errorsOnly", &errors_only)?;

if let Some(ref write_concern) = write_concern {
map_serializer.serialize_entry("writeConcern", write_concern)?;
}

map_serializer.end()
}
}

#[skip_serializing_none]
#[derive(Clone, Debug, Serialize)]
#[serde(untagged)]
Expand Down
4 changes: 2 additions & 2 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Contains the `Error` and `Result` types that `mongodb` uses.

mod bulk_write;
pub(crate) mod bulk_write;

use std::{
any::Any,
Expand All @@ -18,7 +18,7 @@ use crate::{
sdam::{ServerType, TopologyVersion},
};

pub use bulk_write::BulkWriteError;
pub use bulk_write::{BulkWriteError, PartialBulkWriteResult};

const RECOVERING_CODES: [i32; 5] = [11600, 11602, 13436, 189, 91];
const NOTWRITABLEPRIMARY_CODES: [i32; 3] = [10107, 13435, 10058];
Expand Down
27 changes: 23 additions & 4 deletions src/error/bulk_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,46 @@ use std::collections::HashMap;

use crate::{
error::{WriteConcernError, WriteError},
results::BulkWriteResult,
results::{BulkWriteResult, SummaryBulkWriteResult, VerboseBulkWriteResult},
};

#[derive(Clone, Debug, Default)]
#[non_exhaustive]
pub struct BulkWriteError {
pub write_concern_errors: Vec<WriteConcernError>,
pub write_errors: HashMap<usize, WriteError>,
pub partial_result: Option<BulkWriteResult>,
pub partial_result: Option<PartialBulkWriteResult>,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was the sticky part of this work and why I kept it out of the main PR. There were a few ideas I thought of here, the others being:

  • Make this BulkWriteError<R: BulkWriteResult>, store Option<R> in partial_result, and define ErrorKind::SummaryBulkWrite(BulkWriteError<SummaryBulkWriteResult>) and ErrorKind::VerboseBulkWrite(BulkWriteError<VerboseBulkWriteResult>). This has the benefit of removing indirection in the partial_result field, but I didn't like the duplication in ErrorKind both for internal purposes and API implications.
  • Define a separate non-sealed BulkWriteResult trait with accessor methods and make the partial_result field an Option<Box<dyn BulkWriteResult>>. I was having a hard time getting something like this to compile (lots of object-safety-related errors), and I think Box<dyn Trait> is generally kind of annoying to work with so it didn't seem worth it to go down this path.

Matching on PartialBulkWriteResult will be kind of annoying, and it's a bummer that we can't provide the same type guarantees that we can for the actual result types when configuring verbose_results. But I think this will be fine-enough to work with, and we can add accessor methods to the enum directly if this becomes a pain point for users. (FWIW, from the discussions that we had when writing the spec it seems like users are mostly just logging these kinds of error fields.)

}

#[derive(Clone, Debug)]
#[cfg_attr(test, derive(serde::Serialize))]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See RUST-1944, we haven't triaged that ticket yet but I think it'd be good to keep test-only serde impls out of the public API

#[cfg_attr(test, serde(untagged))]
pub enum PartialBulkWriteResult {
Summary(SummaryBulkWriteResult),
Verbose(VerboseBulkWriteResult),
}

impl PartialBulkWriteResult {
pub(crate) fn merge(&mut self, other: Self) {
match (self, other) {
(Self::Summary(this), Self::Summary(other)) => this.merge(other),
(Self::Verbose(this), Self::Verbose(other)) => this.merge(other),
// The operation execution path makes this an unreachable state
_ => unreachable!(),
}
}
}

impl BulkWriteError {
pub(crate) fn merge(&mut self, other: BulkWriteError) {
pub(crate) fn merge(&mut self, other: Self) {
self.write_concern_errors.extend(other.write_concern_errors);
self.write_errors.extend(other.write_errors);
if let Some(other_partial_result) = other.partial_result {
self.merge_partial_results(other_partial_result);
}
}

pub(crate) fn merge_partial_results(&mut self, other_partial_result: BulkWriteResult) {
pub(crate) fn merge_partial_results(&mut self, other_partial_result: PartialBulkWriteResult) {
if let Some(ref mut partial_result) = self.partial_result {
partial_result.merge(other_partial_result);
} else {
Expand Down
Loading