Compare commits
10 Commits
c4daf72e11
...
e186482ca0
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e186482ca0 | ||
|
|
b63a4894a1 | ||
|
|
c2b18b96d1 | ||
|
|
ab7353bc23 | ||
|
|
5673e3325a | ||
|
|
f58e13fb08 | ||
|
|
48aba27362 | ||
|
|
9b93f751b3 | ||
|
|
a1fd215d2a | ||
|
|
04a9794003 |
7
.github/workflows/ci.yml
vendored
7
.github/workflows/ci.yml
vendored
@@ -12,7 +12,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- uses: actions/checkout@v4
|
||||
- name: Build
|
||||
run: cargo build --verbose
|
||||
- name: Build with OTel feature
|
||||
@@ -22,7 +22,4 @@ jobs:
|
||||
- name: Run tests with OTel feature
|
||||
run: cargo test --verbose --features otel
|
||||
- name: rustfmt
|
||||
uses: actions-rs/cargo@v1
|
||||
with:
|
||||
command: fmt
|
||||
args: --all -- --check
|
||||
run: cargo fmt --all --check
|
||||
|
||||
777
Cargo.lock
generated
777
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -30,13 +30,13 @@ tokio-stream = "0.1"
|
||||
tracing = "0.1.34"
|
||||
tracing-subscriber = { version = "0.3.11", features = ["env-filter"] }
|
||||
# Implements the types defined in the OTel spec
|
||||
opentelemetry = { version = "0.19.0", optional = true }
|
||||
opentelemetry = { version = "0.20.0", optional = true }
|
||||
# Integration between the tracing crate and the opentelemetry crate
|
||||
tracing-opentelemetry = { version = "0.19.0", optional = true }
|
||||
tracing-opentelemetry = { version = "0.21.0", optional = true }
|
||||
# Provides a "propagator" to pass along an XrayId across services
|
||||
opentelemetry-aws = { version = "0.7.0", optional = true }
|
||||
opentelemetry-aws = { version = "0.8.0", optional = true }
|
||||
# Allows you to send data to the OTel collector
|
||||
opentelemetry-otlp = { version = "0.12.0", optional = true }
|
||||
opentelemetry-otlp = { version = "0.13.0", optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
# Enable test-utilities in dev mode only. This is mostly for tests.
|
||||
|
||||
@@ -176,7 +176,7 @@ asynchronous Rust patterns with Tokio.
|
||||
Commands or other features should only be added if doing so is useful to
|
||||
demonstrate a new pattern.
|
||||
|
||||
Contributions should come with extensive comments targetted to new Tokio users.
|
||||
Contributions should come with extensive comments targeted to new Tokio users.
|
||||
|
||||
Contributions that only focus on clarifying and improving comments are very
|
||||
welcome.
|
||||
|
||||
@@ -2,13 +2,12 @@ use mini_redis::{clients::Client, DEFAULT_PORT};
|
||||
|
||||
use bytes::Bytes;
|
||||
use clap::{Parser, Subcommand};
|
||||
use std::convert::Infallible;
|
||||
use std::num::ParseIntError;
|
||||
use std::str;
|
||||
use std::time::Duration;
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
#[clap(
|
||||
#[command(
|
||||
name = "mini-redis-cli",
|
||||
version,
|
||||
author,
|
||||
@@ -18,10 +17,10 @@ struct Cli {
|
||||
#[clap(subcommand)]
|
||||
command: Command,
|
||||
|
||||
#[clap(name = "hostname", long, default_value = "127.0.0.1")]
|
||||
#[arg(id = "hostname", long, default_value = "127.0.0.1")]
|
||||
host: String,
|
||||
|
||||
#[clap(long, default_value_t = DEFAULT_PORT)]
|
||||
#[arg(long, default_value_t = DEFAULT_PORT)]
|
||||
port: u16,
|
||||
}
|
||||
|
||||
@@ -29,7 +28,6 @@ struct Cli {
|
||||
enum Command {
|
||||
Ping {
|
||||
/// Message to ping
|
||||
#[clap(value_parser = bytes_from_str)]
|
||||
msg: Option<Bytes>,
|
||||
},
|
||||
/// Get the value of key.
|
||||
@@ -43,11 +41,10 @@ enum Command {
|
||||
key: String,
|
||||
|
||||
/// Value to set.
|
||||
#[clap(value_parser = bytes_from_str)]
|
||||
value: Bytes,
|
||||
|
||||
/// Expire the value after specified amount of time
|
||||
#[clap(value_parser = duration_from_ms_str)]
|
||||
#[arg(value_parser = duration_from_ms_str)]
|
||||
expires: Option<Duration>,
|
||||
},
|
||||
/// Publisher to send a message to a specific channel.
|
||||
@@ -55,7 +52,6 @@ enum Command {
|
||||
/// Name of channel
|
||||
channel: String,
|
||||
|
||||
#[clap(value_parser = bytes_from_str)]
|
||||
/// Message to publish
|
||||
message: Bytes,
|
||||
},
|
||||
@@ -153,7 +149,3 @@ fn duration_from_ms_str(src: &str) -> Result<Duration, ParseIntError> {
|
||||
let ms = src.parse::<u64>()?;
|
||||
Ok(Duration::from_millis(ms))
|
||||
}
|
||||
|
||||
fn bytes_from_str(src: &str) -> Result<Bytes, Infallible> {
|
||||
Ok(Bytes::from(src.to_string()))
|
||||
}
|
||||
|
||||
@@ -44,9 +44,9 @@ pub async fn main() -> mini_redis::Result<()> {
|
||||
}
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
#[clap(name = "mini-redis-server", version, author, about = "A Redis server")]
|
||||
#[command(name = "mini-redis-server", version, author, about = "A Redis server")]
|
||||
struct Cli {
|
||||
#[clap(long)]
|
||||
#[arg(long)]
|
||||
port: Option<u16>,
|
||||
}
|
||||
|
||||
@@ -62,7 +62,7 @@ fn set_up_logging() -> Result<(), TryInitError> {
|
||||
// Note: If you need to pass the x-amzn-trace-id across services in the same trace,
|
||||
// you will need this line. However, this requires additional code not pictured here.
|
||||
// For a full example using hyper, see:
|
||||
// https://github.com/open-telemetry/opentelemetry-rust/blob/main/examples/aws-xray/src/server.rs#L14-L26
|
||||
// https://github.com/open-telemetry/opentelemetry-rust/blob/v0.19.0/examples/aws-xray/src/server.rs#L14-L26
|
||||
global::set_text_map_propagator(XrayPropagator::default());
|
||||
|
||||
let tracer = opentelemetry_otlp::new_pipeline()
|
||||
|
||||
@@ -147,7 +147,7 @@ impl Set {
|
||||
frame.push_bulk(Bytes::from(self.key.into_bytes()));
|
||||
frame.push_bulk(self.value);
|
||||
if let Some(ms) = self.expire {
|
||||
// Expirations in Redis procotol can be specified in two ways
|
||||
// Expirations in Redis protocol can be specified in two ways
|
||||
// 1. SET key value EX seconds
|
||||
// 2. SET key value PX milliseconds
|
||||
// We the second option because it allows greater precision and
|
||||
|
||||
@@ -245,7 +245,7 @@ async fn handle_command(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Creates the response to a subcribe request.
|
||||
/// Creates the response to a subscribe request.
|
||||
///
|
||||
/// All of these functions take the `channel_name` as a `String` instead of
|
||||
/// a `&str` since `Bytes::from` can reuse the allocation in the `String`, and
|
||||
|
||||
15
src/db.rs
15
src/db.rs
@@ -11,7 +11,7 @@ use tracing::debug;
|
||||
/// this struct is dropped.
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct DbDropGuard {
|
||||
/// The `Db` instance that will be shut down when this `DbHolder` struct
|
||||
/// The `Db` instance that will be shut down when this `DbDropGuard` struct
|
||||
/// is dropped.
|
||||
db: Db,
|
||||
}
|
||||
@@ -97,7 +97,7 @@ struct Entry {
|
||||
}
|
||||
|
||||
impl DbDropGuard {
|
||||
/// Create a new `DbHolder`, wrapping a `Db` instance. When this is dropped
|
||||
/// Create a new `DbDropGuard`, wrapping a `Db` instance. When this is dropped
|
||||
/// the `Db`'s purge task will be shut down.
|
||||
pub(crate) fn new() -> DbDropGuard {
|
||||
DbDropGuard { db: Db::new() }
|
||||
@@ -177,8 +177,6 @@ impl Db {
|
||||
.map(|expiration| expiration > when)
|
||||
.unwrap_or(true);
|
||||
|
||||
// Track the expiration.
|
||||
state.expirations.insert((when, key.clone()));
|
||||
when
|
||||
});
|
||||
|
||||
@@ -197,10 +195,17 @@ impl Db {
|
||||
if let Some(prev) = prev {
|
||||
if let Some(when) = prev.expires_at {
|
||||
// clear expiration
|
||||
state.expirations.remove(&(when, key));
|
||||
state.expirations.remove(&(when, key.clone()));
|
||||
}
|
||||
}
|
||||
|
||||
// Track the expiration. If we insert before remove that will cause bug
|
||||
// when current `(when, key)` equals prev `(when, key)`. Remove then insert
|
||||
// can avoid this.
|
||||
if let Some(when) = expires_at {
|
||||
state.expirations.insert((when, key));
|
||||
}
|
||||
|
||||
// Release the mutex before notifying the background task. This helps
|
||||
// reduce contention by avoiding the background task waking up only to
|
||||
// be unable to acquire the mutex due to this function still holding it.
|
||||
|
||||
@@ -7,7 +7,7 @@ use tokio::net::TcpListener;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
/// A basic "hello world" style test. A server instance is started in a
|
||||
/// background task. A client instance is then established and used to intialize
|
||||
/// background task. A client instance is then established and used to initialize
|
||||
/// the buffer. Set and get commands are sent to the server. The response is
|
||||
/// then evaluated.
|
||||
#[tokio::test]
|
||||
|
||||
@@ -88,7 +88,7 @@ async fn receive_message_multiple_subscribed_channels() {
|
||||
assert_eq!(b"howdy?", &message2.content[..])
|
||||
}
|
||||
|
||||
/// test that a client accurately removes its own subscribed chanel list
|
||||
/// test that a client accurately removes its own subscribed channel list
|
||||
/// when unsubscribing to all subscribed channels by submitting an empty vec
|
||||
#[tokio::test]
|
||||
async fn unsubscribes_from_channels() {
|
||||
|
||||
Reference in New Issue
Block a user