Skip to content

Commit 6d03544

Browse files
committed
RUST-1986 Use Collection type parameter for change streams (mongodb#1162)
1 parent dcd5baf commit 6d03544

File tree

2 files changed

+30
-15
lines changed

2 files changed

+30
-15
lines changed

src/action/watch.rs

+20-15
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1-
use std::time::Duration;
1+
use std::{marker::PhantomData, time::Duration};
22

33
use bson::{Bson, Document, Timestamp};
4+
use serde::de::DeserializeOwned;
45

56
use super::{action_impl, deeplink, option_setters, ExplicitSession, ImplicitSession};
67
use crate::{
@@ -96,11 +97,11 @@ where
9697
/// Change streams require either a "majority" read concern or no read concern. Anything else
9798
/// will cause a server error.
9899
///
99-
/// `await` will return d[`Result<ChangeStream<ChangeStreamEvent<Document>>>`] or
100-
/// d[`Result<SessionChangeStream<ChangeStreamEvent<Document>>>`] if a
100+
/// `await` will return d[`Result<ChangeStream<ChangeStreamEvent<T>>>`] or
101+
/// d[`Result<SessionChangeStream<ChangeStreamEvent<T>>>`] if a
101102
/// [`ClientSession`] has been provided.
102103
#[deeplink]
103-
pub fn watch(&self) -> Watch {
104+
pub fn watch(&self) -> Watch<T> {
104105
Watch::new(self.client(), self.namespace().into())
105106
}
106107
}
@@ -153,24 +154,25 @@ where
153154
///
154155
/// Change streams require either a "majority" read concern or no read concern. Anything else
155156
/// will cause a server error.
156-
pub fn watch(&self) -> Watch {
157+
pub fn watch(&self) -> Watch<T> {
157158
self.async_collection.watch()
158159
}
159160
}
160161

161162
/// Starts a new [`ChangeStream`] that receives events for all changes in a given scope. Create by
162163
/// calling [`Client::watch`], [`Database::watch`], or [`Collection::watch`].
163164
#[must_use]
164-
pub struct Watch<'a, S = ImplicitSession> {
165+
pub struct Watch<'a, T = Document, S = ImplicitSession> {
165166
client: &'a Client,
166167
target: AggregateTarget,
167168
pipeline: Vec<Document>,
168169
options: Option<ChangeStreamOptions>,
169170
session: S,
170171
cluster: bool,
172+
phantom: PhantomData<fn() -> T>,
171173
}
172174

173-
impl<'a> Watch<'a, ImplicitSession> {
175+
impl<'a, T> Watch<'a, T, ImplicitSession> {
174176
fn new(client: &'a Client, target: AggregateTarget) -> Self {
175177
Self {
176178
client,
@@ -179,6 +181,7 @@ impl<'a> Watch<'a, ImplicitSession> {
179181
options: None,
180182
session: ImplicitSession,
181183
cluster: false,
184+
phantom: PhantomData,
182185
}
183186
}
184187

@@ -190,6 +193,7 @@ impl<'a> Watch<'a, ImplicitSession> {
190193
options: None,
191194
session: ImplicitSession,
192195
cluster: true,
196+
phantom: PhantomData,
193197
}
194198
}
195199
}
@@ -235,28 +239,29 @@ impl<'a, S> Watch<'a, S> {
235239
);
236240
}
237241

238-
impl<'a> Watch<'a, ImplicitSession> {
242+
impl<'a, T> Watch<'a, T, ImplicitSession> {
239243
/// Use the provided ['ClientSession'].
240244
pub fn session<'s>(
241245
self,
242246
session: impl Into<&'s mut ClientSession>,
243-
) -> Watch<'a, ExplicitSession<'s>> {
247+
) -> Watch<'a, T, ExplicitSession<'s>> {
244248
Watch {
245249
client: self.client,
246250
target: self.target,
247251
pipeline: self.pipeline,
248252
options: self.options,
249253
session: ExplicitSession(session.into()),
250254
cluster: self.cluster,
255+
phantom: PhantomData,
251256
}
252257
}
253258
}
254259

255-
#[action_impl(sync = crate::sync::ChangeStream<ChangeStreamEvent<Document>>)]
256-
impl<'a> Action for Watch<'a, ImplicitSession> {
260+
#[action_impl(sync = crate::sync::ChangeStream<ChangeStreamEvent<T>>)]
261+
impl<'a, T: DeserializeOwned + Unpin + Send + Sync> Action for Watch<'a, T, ImplicitSession> {
257262
type Future = WatchFuture;
258263

259-
async fn execute(mut self) -> Result<ChangeStream<ChangeStreamEvent<Document>>> {
264+
async fn execute(mut self) -> Result<ChangeStream<ChangeStreamEvent<T>>> {
260265
resolve_options!(
261266
self.client,
262267
self.options,
@@ -273,11 +278,11 @@ impl<'a> Action for Watch<'a, ImplicitSession> {
273278
}
274279
}
275280

276-
#[action_impl(sync = crate::sync::SessionChangeStream<ChangeStreamEvent<Document>>)]
277-
impl<'a> Action for Watch<'a, ExplicitSession<'a>> {
281+
#[action_impl(sync = crate::sync::SessionChangeStream<ChangeStreamEvent<T>>)]
282+
impl<'a, T: DeserializeOwned + Unpin + Send + Sync> Action for Watch<'a, T, ExplicitSession<'a>> {
278283
type Future = WatchSessionFuture;
279284

280-
async fn execute(mut self) -> Result<SessionChangeStream<ChangeStreamEvent<Document>>> {
285+
async fn execute(mut self) -> Result<SessionChangeStream<ChangeStreamEvent<T>>> {
281286
resolve_read_concern_with_session!(self.client, self.options, Some(&mut *self.session.0))?;
282287
resolve_selection_criteria_with_session!(
283288
self.client,

src/test/change_stream.rs

+10
Original file line numberDiff line numberDiff line change
@@ -662,3 +662,13 @@ async fn split_large_event() -> Result<()> {
662662

663663
Ok(())
664664
}
665+
666+
// Regression test: `Collection::watch` uses the type parameter. This is not flagged as a test to
667+
// run because it's just asserting that this compiles.
668+
#[allow(unreachable_code, unused_variables, clippy::diverging_sub_expression)]
669+
async fn _collection_watch_typed() {
670+
let coll: Collection<bson::RawDocumentBuf> = unimplemented!();
671+
let mut stream = coll.watch().await.unwrap();
672+
let _: Option<crate::error::Result<ChangeStreamEvent<bson::RawDocumentBuf>>> =
673+
stream.next().await;
674+
}

0 commit comments

Comments
 (0)