Skip to content

Commit 9f3fea5

Browse files
committed
better docs, no warnings, and to_shared_tcp_socket
Signed-off-by: Andrea Lattuada <andrea.lattuada@inf.ethz.ch>
1 parent 65f3824 commit 9f3fea5

File tree

7 files changed

+40
-74
lines changed

7 files changed

+40
-74
lines changed

communication/src/logging.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,5 @@ use timely_logging::{CommsEvent, CommsSetup};
44
pub use timely_logging::CommunicationEvent;
55
pub use timely_logging::SerializationEvent;
66

7-
/// TODO(andreal)
7+
/// A log writer for a communication thread.
88
pub type CommsLogger = Rc<::timely_logging::BufferingLogger<CommsSetup, CommsEvent>>;

logging/src/lib.rs

-11
Original file line numberDiff line numberDiff line change
@@ -29,17 +29,6 @@ pub fn get_precise_time_ns() -> u64 {
2929
(time::precise_time_ns() as i64 - delta) as u64
3030
}
3131

32-
// /// TODO(andreal)
33-
// fn initialize_precise_time_ns() {
34-
// unsafe {
35-
// precise_time_ns_delta = Some({
36-
// let wall_time = time::get_time();
37-
// let wall_time_ns = wall_time.nsec as i64 + wall_time.sec * 1000000000;
38-
// time::precise_time_ns() as i64 - wall_time_ns
39-
// });
40-
// }
41-
// }
42-
4332
/// Logging methods
4433
pub trait Logger {
4534
/// The type of loggable record.

src/dataflow/scopes/child.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ pub struct Child<'a, G: ScopeParent, T: Timestamp> {
1818
pub subgraph: &'a RefCell<SubgraphBuilder<G::Timestamp, T>>,
1919
/// A copy of the child's parent scope.
2020
pub parent: G,
21-
/// TODO(andreal)
21+
/// The log writer for this scope.
2222
pub logging: Logger,
2323
}
2424

src/dataflow/scopes/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,6 @@ pub trait Scope: ScopeParent {
7676
/// ```
7777
fn scoped<T: Timestamp, R, F:FnOnce(&mut Child<Self, T>)->R>(&mut self, func: F) -> R;
7878

79-
/// TODO(andreal)
79+
/// Obtains the logger associated with this scope.
8080
fn logging(&self) -> Logger;
8181
}

src/dataflow/scopes/root.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ impl<A: Allocate> Root<A> {
5252
// discard completed dataflows.
5353
self.dataflows.borrow_mut().retain(|dataflow| dataflow.active());
5454

55-
// TODO(andreal) flush logs?
55+
// TODO(andreal) do we want to flush logs here?
5656

5757
active
5858
}

src/logging.rs

+33-54
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,10 @@ use timely_logging::LoggerBatch;
2525
type LogMessage = (u64, EventsSetup, LogEvent);
2626
type CommsMessage = (u64, CommsSetup, CommsEvent);
2727

28-
/// TODO(andreal)
28+
/// A log writer.
2929
pub type Logger = Rc<BufferingLogger<EventsSetup, LogEvent>>;
3030

31-
/// TODO(andreal)
31+
/// A log writer that does not log anything.
3232
pub fn new_inactive_logger() -> Logger {
3333
BufferingLogger::<(), ()>::new_inactive()
3434
}
@@ -83,37 +83,22 @@ impl LogManager {
8383
}
8484
}
8585

86-
struct SharedVec {
87-
inner: Arc<Mutex<Vec<u8>>>,
86+
struct SharedEventWriter<T, D, W: Write> {
87+
inner: Arc<Mutex<EventWriter<T, D, W>>>,
8888
}
8989

90-
impl SharedVec {
91-
pub fn new(inner: Arc<Mutex<Vec<u8>>>) -> Self {
92-
SharedVec {
93-
inner: inner,
90+
impl<T, D, W: Write> Clone for SharedEventWriter<T, D, W> {
91+
fn clone(&self) -> Self {
92+
Self {
93+
inner: self.inner.clone(),
9494
}
9595
}
9696
}
9797

98-
impl Write for SharedVec {
99-
fn write(&mut self, data: &[u8]) -> Result<usize, ::std::io::Error> {
100-
self.inner.lock().unwrap().extend_from_slice(data);
101-
Ok(data.len())
102-
}
103-
104-
fn flush(&mut self) -> Result<(), ::std::io::Error> {
105-
Ok(())
106-
}
107-
}
108-
109-
struct SharedEventWriter<T, D, W: Write> {
110-
inner: Mutex<EventWriter<T, D, W>>,
111-
}
112-
11398
impl<T, D, W: Write> SharedEventWriter<T, D, W> {
11499
fn new(w: W) -> Self {
115100
SharedEventWriter {
116-
inner: Mutex::new(EventWriter::new(w)),
101+
inner: Arc::new(Mutex::new(EventWriter::new(w))),
117102
}
118103
}
119104
}
@@ -125,16 +110,18 @@ impl<T: Abomonation+Debug, D: Abomonation+Debug, W: Write> EventPusher<T, D> for
125110
}
126111
}
127112

128-
/// TODO(andreal)
113+
/// An handle to the LogManager that constructs a filter for the log records.
129114
pub struct FilteredLogManager<S, E> {
130115
log_manager: Arc<Mutex<LogManager>>,
131116
filter: Arc<Fn(&S)->bool+Send+Sync>,
132117
_e: ::std::marker::PhantomData<E>,
133118
}
134119

135120
impl FilteredLogManager<EventsSetup, LogEvent> {
136-
/// TODO(andreal)
137-
pub fn to_tcp_socket(&mut self) {
121+
/// Send the selected timely events to one tcp socket per log sender.
122+
///
123+
/// Destination can be set with the TIMELY_COMM_LOG_TARGET environment variable.
124+
pub fn to_tcp_sockets(&mut self) {
138125
let target: String = ::std::env::var("TIMELY_LOG_TARGET").expect("no $TIMELY_LOG_TARGET, e.g. 127.0.0.1:34254");
139126

140127
// let writer = SharedEventWriter::new(writer);
@@ -145,24 +132,20 @@ impl FilteredLogManager<EventsSetup, LogEvent> {
145132
self.log_manager.lock().unwrap().add_timely_subscription(self.filter.clone(), pusher);
146133
}
147134

148-
// /// TODO(andreal)
149-
// pub fn to_bufs(&mut self) -> Vec<Arc<Mutex<Vec<u8>>>> {
150-
// let mut vecs = Vec::new();
151-
152-
// for i in 0..4 {
153-
// let buf = Arc::new(Mutex::new(Vec::<u8>::with_capacity(4_000_000_000)));
154-
// let writer = SharedEventWriter::new(SharedVec::new(buf.clone()));
155-
// let pusher: Arc<EventPusher<Product<RootTimestamp, u64>, LogMessage>+Send+Sync> = Arc::new(writer);
156-
// self.log_manager.lock().unwrap().add_timely_subscription(Arc::new(move |s| s.index == i), pusher);
157-
// vecs.push(buf);
158-
// }
159-
160-
// vecs
161-
// }
135+
/// Send the selected timely events to a shared tcp socket.
136+
///
137+
/// Destination can be set with the TIMELY_COMM_LOG_TARGET environment variable.
138+
pub fn to_shared_tcp_socket(&mut self) {
139+
let target: String = ::std::env::var("TIMELY_LOG_TARGET").expect("no $TIMELY_LOG_TARGET, e.g. 127.0.0.1:34254");
140+
let pusher = Box::new(SharedEventWriter::new(TcpStream::connect(target.clone()).expect("failed to connect to logging destination")));
141+
self.log_manager.lock().unwrap().add_timely_subscription(self.filter.clone(), Arc::new(move || pusher.clone()));
142+
}
162143
}
163144

164145
impl FilteredLogManager<CommsSetup, CommsEvent> {
165-
/// TODO(andreal)
146+
/// Send the selected communication events to a tcp socket.
147+
///
148+
/// Destination can be set with the TIMELY_COMM_LOG_TARGET environment variable.
166149
pub fn to_tcp_socket(&mut self) {
167150
let comm_target = ::std::env::var("TIMELY_COMM_LOG_TARGET").expect("no $TIMELY_COMM_LOG_TARGET, e.g. 127.0.0.1:34255");
168151

@@ -175,7 +158,7 @@ impl FilteredLogManager<CommsSetup, CommsEvent> {
175158
}
176159

177160
impl LogManager {
178-
/// TODO(andreal)
161+
/// Constructs a new LogManager.
179162
pub fn new() -> Arc<Mutex<Self>> {
180163
Arc::new(Mutex::new(LogManager {
181164
timely_logs: HashMap::new(),
@@ -186,17 +169,16 @@ impl LogManager {
186169
}
187170
}
188171

189-
/// TODO(andreal)
172+
/// Functions to construct log filters.
190173
pub trait LogFilter {
191-
/// TODO(andreal)
174+
/// Get a `FilteredLogManager` for all workers.
192175
fn workers(&mut self) -> FilteredLogManager<EventsSetup, LogEvent>;
193176

194-
/// TODO(andreal)
177+
/// Get a `FilteredLogManager` for all communication threads.
195178
fn comms(&mut self) -> FilteredLogManager<CommsSetup, CommsEvent>;
196179
}
197180

198181
impl LogFilter for Arc<Mutex<LogManager>> {
199-
/// TODO(andreal)
200182
#[inline] fn workers(&mut self) -> FilteredLogManager<EventsSetup, LogEvent> {
201183
FilteredLogManager {
202184
log_manager: self.clone(),
@@ -205,7 +187,6 @@ impl LogFilter for Arc<Mutex<LogManager>> {
205187
}
206188
}
207189

208-
/// TODO(andreal)
209190
#[inline] fn comms(&mut self) -> FilteredLogManager<CommsSetup, CommsEvent> {
210191
FilteredLogManager {
211192
log_manager: self.clone(),
@@ -215,11 +196,11 @@ impl LogFilter for Arc<Mutex<LogManager>> {
215196
}
216197
}
217198

218-
/// TODO(andreal)
199+
/// Shared wrapper for log writer constructors.
219200
pub struct LoggerConfig {
220-
/// TODO(andreal)
201+
/// Log writer constructors.
221202
pub timely_logging: Arc<Fn(EventsSetup)->Rc<BufferingLogger<EventsSetup, LogEvent>>+Send+Sync>,
222-
/// TODO(andreal)
203+
/// Log writer constructors for communication.
223204
pub communication_logging: Arc<Fn(CommsSetup)->Rc<BufferingLogger<CommsSetup, CommsEvent>>+Send+Sync>,
224205
}
225206

@@ -249,23 +230,21 @@ impl LoggerConfig {
249230
event_manager
250231
}
251232

252-
/// TODO(andreal)
233+
/// Makes a new `LoggerConfig` wrapper from a `LogManager`.
253234
pub fn new(log_manager: Arc<Mutex<LogManager>>) -> Self {
254235
let timely_logging_manager = log_manager.clone();
255236
let communication_logging_manager = log_manager;
256237
LoggerConfig {
257238
timely_logging: Arc::new(move |events_setup: EventsSetup| {
258239
let subscription_manager = LoggerConfig::register_timely_logger(
259240
&mut timely_logging_manager.lock().unwrap(), events_setup);
260-
//eprintln!("registered timely logger: {:?}", events_setup);
261241
Rc::new(BufferingLogger::new(events_setup, Box::new(move |data| {
262242
subscription_manager.lock().expect("cannot lock mutex").publish_batch(data);
263243
})))
264244
}),
265245
communication_logging: Arc::new(move |comms_setup: CommsSetup| {
266246
let subscription_manager = LoggerConfig::register_comms_logger(
267247
&mut communication_logging_manager.lock().unwrap(), comms_setup);
268-
//eprintln!("registered comm logger: {:?}", comms_setup);
269248
Rc::new(BufferingLogger::new(comms_setup, Box::new(move |data| {
270249
subscription_manager.lock().expect("cannot lock mutex").publish_batch(data);
271250
})))

src/progress/nested/subgraph.rs

+3-5
Original file line numberDiff line numberDiff line change
@@ -946,11 +946,9 @@ impl<T: Timestamp> PerOperatorState<T> {
946946
id: self.id, start_stop: ::timely_logging::StartStop::Start
947947
}));
948948

949-
// TODO(andreal) formerly behind "logging" feature flag {
950-
// assert!(self.consumed_buffer.iter_mut().all(|cm| cm.is_empty()));
951-
// assert!(self.internal_buffer.iter_mut().all(|cm| cm.is_empty()));
952-
// assert!(self.produced_buffer.iter_mut().all(|cm| cm.is_empty()));
953-
// }
949+
debug_assert!(self.consumed_buffer.iter_mut().all(|cm| cm.is_empty()));
950+
debug_assert!(self.internal_buffer.iter_mut().all(|cm| cm.is_empty()));
951+
debug_assert!(self.produced_buffer.iter_mut().all(|cm| cm.is_empty()));
954952

955953
let result = if let Some(ref mut operator) = self.operator {
956954
operator.pull_internal_progress(

0 commit comments

Comments
 (0)