From 5b73dcc8abc3133c98603c64339c35e0ed90c577 Mon Sep 17 00:00:00 2001 From: Andrew Gallant Date: Wed, 9 Nov 2016 17:19:40 -0500 Subject: [PATCH] Rework parallelism in directory iterator. Previously, ignore::WalkParallel would invoke the callback for all *explicitly* given file paths in a single thread, which effectively meant that `rg pattern foo bar baz ...` didn't actually search foo, bar and baz in parallel. The code was structured that way to avoid spinning up workers if no directory paths were given. The original intention was probably to have a separate pool of threads responsible for searching, but ripgrep ended up just reusing the ignore::WalkParallel workers themselves for searching, and was thereby subjected to its sub-par performance in this case. The code has been restructured so that file paths are sent to the workers, which brings back parallelism. Fixes #226 --- ignore/src/walk.rs | 61 ++++++++++++++++++++++++++-------------------- src/args.rs | 3 ++- 2 files changed, 36 insertions(+), 28 deletions(-) diff --git a/ignore/src/walk.rs b/ignore/src/walk.rs index f82b233..43b1049 100644 --- a/ignore/src/walk.rs +++ b/ignore/src/walk.rs @@ -747,40 +747,33 @@ impl WalkParallel { let mut f = mkf(); let threads = self.threads(); let queue = Arc::new(MsQueue::new()); - let mut any_dirs = false; + let mut any_work = false; // Send the initial set of root paths to the pool of workers. // Note that we only send directories. For files, we send to them the // callback directly. 
for path in self.paths { - if path == Path::new("-") { - if f(Ok(DirEntry::new_stdin())).is_quit() { - return; - } - continue; - } - let dent = match DirEntryRaw::from_path(0, path) { - Ok(dent) => DirEntry::new_raw(dent, None), - Err(err) => { - if f(Err(err)).is_quit() { - return; + let dent = + if path == Path::new("-") { + DirEntry::new_stdin() + } else { + match DirEntryRaw::from_path(0, path) { + Ok(dent) => DirEntry::new_raw(dent, None), + Err(err) => { + if f(Err(err)).is_quit() { + return; + } + continue; + } } - continue; - } - }; - if !dent.file_type().map_or(false, |t| t.is_dir()) { - if f(Ok(dent)).is_quit() { - return; - } - } else { - any_dirs = true; - queue.push(Message::Work(Work { - dent: dent, - ignore: self.ig_root.clone(), - })); - } + }; + queue.push(Message::Work(Work { + dent: dent, + ignore: self.ig_root.clone(), + })); + any_work = true; } // ... but there's no need to start workers if we don't need them. - if !any_dirs { + if !any_work { return; } // Create the workers and then wait for them to finish. @@ -839,6 +832,11 @@ struct Work { } impl Work { + /// Returns true if and only if this work item is a directory. + fn is_dir(&self) -> bool { + self.dent.file_type().map_or(false, |t| t.is_dir()) + } + /// Adds ignore rules for parent directories. /// /// Note that this only applies to entries at depth 0. On all other @@ -921,6 +919,15 @@ impl Worker { fn run(mut self) { while let Some(mut work) = self.get_work() { let depth = work.dent.depth(); + // If this is an explicitly given path and is not a directory, + // then execute the caller's callback and move on. 
+ if depth == 0 && !work.is_dir() { + if (self.f)(Ok(work.dent)).is_quit() { + self.quit_now(); + return; + } + continue; + } if self.parents { if let Some(err) = work.add_parents() { if (self.f)(Err(err)).is_quit() { diff --git a/src/args.rs b/src/args.rs index 4a3b111..521dda8 100644 --- a/src/args.rs +++ b/src/args.rs @@ -1,3 +1,4 @@ +use std::cmp; use std::env; use std::io; use std::path::{Path, PathBuf}; @@ -376,7 +377,7 @@ impl RawArgs { }; let threads = if self.flag_threads == 0 { - num_cpus::get() + cmp::min(12, num_cpus::get()) } else { self.flag_threads };