diff --git a/crates/ignore/Cargo.toml b/crates/ignore/Cargo.toml
index f33cb3c..479d595 100644
--- a/crates/ignore/Cargo.toml
+++ b/crates/ignore/Cargo.toml
@@ -18,7 +18,6 @@ name = "ignore"
 bench = false
 
 [dependencies]
-crossbeam-channel = "0.4.0"
 crossbeam-utils = "0.7.0"
 globset = { version = "0.4.3", path = "../globset" }
 lazy_static = "1.1"
@@ -32,5 +31,8 @@ walkdir = "2.2.7"
 [target.'cfg(windows)'.dependencies.winapi-util]
 version = "0.1.2"
 
+[dev-dependencies]
+crossbeam-channel = "0.4.0"
+
 [features]
 simd-accel = ["globset/simd-accel"]
diff --git a/crates/ignore/src/lib.rs b/crates/ignore/src/lib.rs
index 71b112c..bcf0ef4 100644
--- a/crates/ignore/src/lib.rs
+++ b/crates/ignore/src/lib.rs
@@ -46,7 +46,6 @@ See the documentation for `WalkBuilder` for many other options.
 
 #![deny(missing_docs)]
 
-extern crate crossbeam_channel as channel;
 extern crate globset;
 #[macro_use]
 extern crate lazy_static;
diff --git a/crates/ignore/src/walk.rs b/crates/ignore/src/walk.rs
index 6734788..65606ee 100644
--- a/crates/ignore/src/walk.rs
+++ b/crates/ignore/src/walk.rs
@@ -5,10 +5,11 @@
 use std::fs::{self, FileType, Metadata};
 use std::io;
 use std::path::{Path, PathBuf};
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
+use std::thread;
+use std::time::Duration;
 use std::vec;
 
-use channel::{self, TryRecvError};
 use same_file::Handle;
 use walkdir::{self, WalkDir};
@@ -364,7 +365,8 @@
         })
     }
 
-    // Placeholder implementation to allow compiling on non-standard platforms (e.g. wasm32).
+    // Placeholder implementation to allow compiling on non-standard platforms
+    // (e.g. wasm32).
     #[cfg(not(any(windows, unix)))]
     fn from_entry_os(
         depth: usize,
@@ -413,7 +415,8 @@
         })
     }
 
-    // Placeholder implementation to allow compiling on non-standard platforms (e.g. wasm32).
+    // Placeholder implementation to allow compiling on non-standard platforms
+    // (e.g. wasm32).
     #[cfg(not(any(windows, unix)))]
     fn from_path(
         depth: usize,
@@ -1186,16 +1189,9 @@
     /// can be merged together into a single data structure.
     pub fn visit(mut self, builder: &mut dyn ParallelVisitorBuilder) {
         let threads = self.threads();
-        // TODO: Figure out how to use a bounded channel here. With an
-        // unbounded channel, the workers can run away and fill up memory
-        // with all of the file paths. But a bounded channel doesn't work since
-        // our producers are also are consumers, so they end up getting stuck.
-        //
-        // We probably need to rethink parallel traversal completely to fix
-        // this. The best case scenario would be finding a way to use rayon
-        // to do this.
-        let (tx, rx) = channel::unbounded();
+        let stack = Arc::new(Mutex::new(vec![]));
         {
+            let mut stack = stack.lock().unwrap();
             let mut visitor = builder.build();
             let mut paths = Vec::new().into_iter();
             std::mem::swap(&mut paths, &mut self.paths);
@@ -1232,28 +1228,27 @@
                         }
                     }
                 };
-                tx.send(Message::Work(Work {
+                stack.push(Message::Work(Work {
                     dent: dent,
                     ignore: self.ig_root.clone(),
                     root_device: root_device,
-                }))
-                .unwrap();
+                }));
             }
             // ... but there's no need to start workers if we don't need them.
-            if tx.is_empty() {
+            if stack.is_empty() {
                 return;
             }
         }
         // Create the workers and then wait for them to finish.
         let quit_now = Arc::new(AtomicBool::new(false));
-        let num_pending = Arc::new(AtomicUsize::new(tx.len()));
+        let num_pending =
+            Arc::new(AtomicUsize::new(stack.lock().unwrap().len()));
         crossbeam_utils::thread::scope(|s| {
             let mut handles = vec![];
             for _ in 0..threads {
                 let worker = Worker {
                     visitor: builder.build(),
-                    tx: tx.clone(),
-                    rx: rx.clone(),
+                    stack: stack.clone(),
                     quit_now: quit_now.clone(),
                     num_pending: num_pending.clone(),
                     max_depth: self.max_depth,
@@ -1263,8 +1258,6 @@
                 };
                 handles.push(s.spawn(|_| worker.run()));
             }
-            drop(tx);
-            drop(rx);
             for handle in handles {
                 handle.join().unwrap();
             }
@@ -1362,10 +1355,13 @@
 struct Worker<'s> {
     /// The caller's callback.
     visitor: Box<dyn ParallelVisitor + 's>,
-    /// The push side of our mpmc queue.
-    tx: channel::Sender<Message>,
-    /// The receive side of our mpmc queue.
-    rx: channel::Receiver<Message>,
+    /// A stack of work to do.
+    ///
+    /// We use a stack instead of a channel because a stack lets us visit
+    /// directories in depth first order. This can substantially reduce peak
+    /// memory usage by keeping both the number of file paths and gitignore
+    /// matchers in memory lower.
+    stack: Arc<Mutex<Vec<Message>>>,
     /// Whether all workers should terminate at the next opportunity. Note
     /// that we need this because we don't want other `Work` to be done after
     /// we quit. We wouldn't need this if have a priority channel.
@@ -1550,25 +1546,25 @@
     /// If all work has been exhausted, then this returns None. The worker
     /// should then subsequently quit.
     fn get_work(&mut self) -> Option<Work> {
-        let mut value = self.rx.try_recv();
+        let mut value = self.recv();
         loop {
             // Simulate a priority channel: If quit_now flag is set, we can
             // receive only quit messages.
             if self.is_quit_now() {
-                value = Ok(Message::Quit)
+                value = Some(Message::Quit)
             }
             match value {
-                Ok(Message::Work(work)) => {
+                Some(Message::Work(work)) => {
                     return Some(work);
                 }
-                Ok(Message::Quit) => {
+                Some(Message::Quit) => {
                     // Repeat quit message to wake up sleeping threads, if
                     // any. The domino effect will ensure that every thread
                     // will quit.
-                    self.tx.send(Message::Quit).unwrap();
+                    self.send_quit();
                     return None;
                 }
-                Err(TryRecvError::Empty) => {
+                None => {
                     // Once num_pending reaches 0, it is impossible for it to
                     // ever increase again. Namely, it only reaches 0 once
                     // all jobs have run such that no jobs have produced more
@@ -1580,17 +1576,21 @@
                     if self.num_pending() == 0 {
                         // Every other thread is blocked at the next recv().
                         // Send the initial quit message and quit.
-                        self.tx.send(Message::Quit).unwrap();
+                        self.send_quit();
                         return None;
                     }
                     // Wait for next `Work` or `Quit` message.
-                    value = Ok(self
-                        .rx
-                        .recv()
-                        .expect("channel disconnected while worker is alive"));
-                }
-                Err(TryRecvError::Disconnected) => {
-                    unreachable!("channel disconnected while worker is alive");
+                    loop {
+                        if let Some(v) = self.recv() {
+                            value = Some(v);
+                            break;
+                        }
+                        // Our stack isn't blocking. Instead of burning the
+                        // CPU waiting, we let the thread sleep for a bit. In
+                        // general, this tends to only occur once the search is
+                        // approaching termination.
+                        thread::sleep(Duration::from_millis(1));
+                    }
                 }
             }
         }
    }
@@ -1614,7 +1614,20 @@
     /// Send work.
     fn send(&self, work: Work) {
         self.num_pending.fetch_add(1, Ordering::SeqCst);
-        self.tx.send(Message::Work(work)).unwrap();
+        let mut stack = self.stack.lock().unwrap();
+        stack.push(Message::Work(work));
+    }
+
+    /// Send a quit message.
+    fn send_quit(&self) {
+        let mut stack = self.stack.lock().unwrap();
+        stack.push(Message::Quit);
+    }
+
+    /// Receive work.
+    fn recv(&self) -> Option<Message> {
+        let mut stack = self.stack.lock().unwrap();
+        stack.pop()
     }
 
     /// Signal that work has been received.
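To make the coordination scheme in this patch concrete, here is a minimal, self-contained sketch of the protocol it adopts: a `Mutex<Vec<Message>>` used as a shared LIFO stack, a `num_pending` counter that tracks queued plus in-flight work, a short sleep when the stack is momentarily empty, and a `Quit` message that each worker re-pushes so shutdown cascades to every thread. The `Message`/`Work` shapes and the fake "recursion" below are illustrative stand-ins, not the crate's actual types:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

// Stand-in for the crate's Message/Work types: a unit of fake work.
enum Message {
    Work(u64),
    Quit,
}

fn main() {
    let stack = Arc::new(Mutex::new(vec![Message::Work(1), Message::Work(3)]));
    // Counts queued plus in-flight work. Once it reaches 0 it can never
    // grow again, so a worker that observes 0 may start the quit cascade.
    let num_pending = Arc::new(AtomicUsize::new(stack.lock().unwrap().len()));

    let mut handles = vec![];
    for _ in 0..4 {
        let stack = Arc::clone(&stack);
        let num_pending = Arc::clone(&num_pending);
        handles.push(thread::spawn(move || loop {
            // Pop in its own statement so the lock is not held while working.
            let msg = stack.lock().unwrap().pop();
            match msg {
                Some(Message::Work(n)) => {
                    // "Recurse": a unit of work may push more work before
                    // signaling completion, mirroring directory traversal.
                    if n < 100 {
                        // Register the child before pushing it, as
                        // Worker::send does in the patch.
                        num_pending.fetch_add(1, Ordering::SeqCst);
                        stack.lock().unwrap().push(Message::Work(n * 2));
                    }
                    // This unit of work is done.
                    num_pending.fetch_sub(1, Ordering::SeqCst);
                }
                Some(Message::Quit) => {
                    // Re-push Quit so every other worker also wakes and exits.
                    stack.lock().unwrap().push(Message::Quit);
                    return;
                }
                None => {
                    if num_pending.load(Ordering::SeqCst) == 0 {
                        // Nothing queued and nothing in flight: begin the
                        // quit cascade and exit.
                        stack.lock().unwrap().push(Message::Quit);
                        return;
                    }
                    // The stack is not a blocking queue, so sleep briefly
                    // instead of spinning while other workers finish.
                    thread::sleep(Duration::from_millis(1));
                }
            }
        }));
    }
    for handle in handles {
        handle.join().unwrap();
    }
}
```

Because the stack is LIFO, workers tend to finish a directory's subtree before starting its siblings, which is what keeps the number of live file paths and gitignore matchers low compared to the unbounded FIFO channel this patch replaces.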