ripgrep: move minimum version to Rust stable

This also updates some code to make use of our more liberal versioning
requirement, including the use of crossbeam-channel instead of the MsQueue
from the older an unmaintained crossbeam 0.3. This does regrettably add
a sizable number of dependencies, however, compile times seem mostly
unaffected.

Closes #1019
This commit is contained in:
Andrew Gallant
2018-08-21 19:47:12 -04:00
parent edd6eb4e06
commit 0eef05142a
10 changed files with 204 additions and 111 deletions

View File

@@ -18,7 +18,7 @@ name = "ignore"
bench = false
[dependencies]
crossbeam = "0.3"
crossbeam-channel = "0.2"
globset = { version = "0.4.0", path = "../globset" }
lazy_static = "1"
log = "0.4"

View File

@@ -1,14 +1,12 @@
extern crate crossbeam;
extern crate crossbeam_channel as channel;
extern crate ignore;
extern crate walkdir;
use std::env;
use std::io::{self, Write};
use std::path::Path;
use std::sync::Arc;
use std::thread;
use crossbeam::sync::MsQueue;
use ignore::WalkBuilder;
use walkdir::WalkDir;
@@ -16,7 +14,7 @@ fn main() {
let mut path = env::args().nth(1).unwrap();
let mut parallel = false;
let mut simple = false;
let queue: Arc<MsQueue<Option<DirEntry>>> = Arc::new(MsQueue::new());
let (tx, rx) = channel::bounded::<DirEntry>(100);
if path == "parallel" {
path = env::args().nth(2).unwrap();
parallel = true;
@@ -25,10 +23,9 @@ fn main() {
simple = true;
}
let stdout_queue = queue.clone();
let stdout_thread = thread::spawn(move || {
let mut stdout = io::BufWriter::new(io::stdout());
while let Some(dent) = stdout_queue.pop() {
for dent in rx {
write_path(&mut stdout, dent.path());
}
});
@@ -36,26 +33,26 @@ fn main() {
if parallel {
let walker = WalkBuilder::new(path).threads(6).build_parallel();
walker.run(|| {
let queue = queue.clone();
let tx = tx.clone();
Box::new(move |result| {
use ignore::WalkState::*;
queue.push(Some(DirEntry::Y(result.unwrap())));
tx.send(DirEntry::Y(result.unwrap()));
Continue
})
});
} else if simple {
let walker = WalkDir::new(path);
for result in walker {
queue.push(Some(DirEntry::X(result.unwrap())));
tx.send(DirEntry::X(result.unwrap()));
}
} else {
let walker = WalkBuilder::new(path).build();
for result in walker {
queue.push(Some(DirEntry::Y(result.unwrap())));
tx.send(DirEntry::Y(result.unwrap()));
}
}
queue.push(None);
drop(tx);
stdout_thread.join().unwrap();
}

View File

@@ -46,7 +46,7 @@ See the documentation for `WalkBuilder` for many other options.
#![deny(missing_docs)]
extern crate crossbeam;
extern crate crossbeam_channel as channel;
extern crate globset;
#[macro_use]
extern crate lazy_static;

View File

@@ -10,7 +10,7 @@ use std::thread;
use std::time::Duration;
use std::vec;
use crossbeam::sync::MsQueue;
use channel;
use same_file::Handle;
use walkdir::{self, WalkDir};
@@ -956,7 +956,14 @@ impl WalkParallel {
) where F: FnMut() -> Box<FnMut(Result<DirEntry, Error>) -> WalkState + Send + 'static> {
let mut f = mkf();
let threads = self.threads();
let queue = Arc::new(MsQueue::new());
// TODO: Figure out how to use a bounded channel here. With an
// unbounded channel, the workers can run away and will up memory
// with all of the file paths. But a bounded channel doesn't work since
// our producers are also are consumers, so they end up getting stuck.
//
// We probably need to rethink parallel traversal completely to fix
// this.
let (tx, rx) = channel::unbounded();
let mut any_work = false;
// Send the initial set of root paths to the pool of workers.
// Note that we only send directories. For files, we send to them the
@@ -976,7 +983,7 @@ impl WalkParallel {
}
}
};
queue.push(Message::Work(Work {
tx.send(Message::Work(Work {
dent: dent,
ignore: self.ig_root.clone(),
}));
@@ -994,7 +1001,8 @@ impl WalkParallel {
for _ in 0..threads {
let worker = Worker {
f: mkf(),
queue: queue.clone(),
tx: tx.clone(),
rx: rx.clone(),
quit_now: quit_now.clone(),
is_waiting: false,
is_quitting: false,
@@ -1007,6 +1015,8 @@ impl WalkParallel {
};
handles.push(thread::spawn(|| worker.run()));
}
drop(tx);
drop(rx);
for handle in handles {
handle.join().unwrap();
}
@@ -1099,8 +1109,10 @@ impl Work {
struct Worker {
/// The caller's callback.
f: Box<FnMut(Result<DirEntry, Error>) -> WalkState + Send + 'static>,
/// A queue of work items. This is multi-producer and multi-consumer.
queue: Arc<MsQueue<Message>>,
/// The push side of our mpmc queue.
tx: channel::Sender<Message>,
/// The receive side of our mpmc queue.
rx: channel::Receiver<Message>,
/// Whether all workers should quit at the next opportunity. Note that
/// this is distinct from quitting because of exhausting the contents of
/// a directory. Instead, this is used when the caller's callback indicates
@@ -1235,7 +1247,7 @@ impl Worker {
};
if !should_skip_path && !should_skip_filesize {
self.queue.push(Message::Work(Work {
self.tx.send(Message::Work(Work {
dent: dent,
ignore: ig.clone(),
}));
@@ -1252,7 +1264,7 @@ impl Worker {
if self.is_quit_now() {
return None;
}
match self.queue.try_pop() {
match self.rx.try_recv() {
Some(Message::Work(work)) => {
self.waiting(false);
self.quitting(false);
@@ -1294,7 +1306,7 @@ impl Worker {
self.quitting(false);
if self.num_waiting() == self.threads {
for _ in 0..self.threads {
self.queue.push(Message::Quit);
self.tx.send(Message::Quit);
}
} else {
// You're right to consider this suspicious, but it's