Compare commits
10 Commits
c4daf72e11
...
e186482ca0
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e186482ca0 | ||
|
|
b63a4894a1 | ||
|
|
c2b18b96d1 | ||
|
|
ab7353bc23 | ||
|
|
5673e3325a | ||
|
|
f58e13fb08 | ||
|
|
48aba27362 | ||
|
|
9b93f751b3 | ||
|
|
a1fd215d2a | ||
|
|
04a9794003 |
7
.github/workflows/ci.yml
vendored
7
.github/workflows/ci.yml
vendored
@@ -12,7 +12,7 @@ jobs:
|
|||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
|
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v3
|
- uses: actions/checkout@v4
|
||||||
- name: Build
|
- name: Build
|
||||||
run: cargo build --verbose
|
run: cargo build --verbose
|
||||||
- name: Build with OTel feature
|
- name: Build with OTel feature
|
||||||
@@ -22,7 +22,4 @@ jobs:
|
|||||||
- name: Run tests with OTel feature
|
- name: Run tests with OTel feature
|
||||||
run: cargo test --verbose --features otel
|
run: cargo test --verbose --features otel
|
||||||
- name: rustfmt
|
- name: rustfmt
|
||||||
uses: actions-rs/cargo@v1
|
run: cargo fmt --all --check
|
||||||
with:
|
|
||||||
command: fmt
|
|
||||||
args: --all -- --check
|
|
||||||
|
|||||||
777
Cargo.lock
generated
777
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -30,13 +30,13 @@ tokio-stream = "0.1"
|
|||||||
tracing = "0.1.34"
|
tracing = "0.1.34"
|
||||||
tracing-subscriber = { version = "0.3.11", features = ["env-filter"] }
|
tracing-subscriber = { version = "0.3.11", features = ["env-filter"] }
|
||||||
# Implements the types defined in the OTel spec
|
# Implements the types defined in the OTel spec
|
||||||
opentelemetry = { version = "0.19.0", optional = true }
|
opentelemetry = { version = "0.20.0", optional = true }
|
||||||
# Integration between the tracing crate and the opentelemetry crate
|
# Integration between the tracing crate and the opentelemetry crate
|
||||||
tracing-opentelemetry = { version = "0.19.0", optional = true }
|
tracing-opentelemetry = { version = "0.21.0", optional = true }
|
||||||
# Provides a "propagator" to pass along an XrayId across services
|
# Provides a "propagator" to pass along an XrayId across services
|
||||||
opentelemetry-aws = { version = "0.7.0", optional = true }
|
opentelemetry-aws = { version = "0.8.0", optional = true }
|
||||||
# Allows you to send data to the OTel collector
|
# Allows you to send data to the OTel collector
|
||||||
opentelemetry-otlp = { version = "0.12.0", optional = true }
|
opentelemetry-otlp = { version = "0.13.0", optional = true }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
# Enable test-utilities in dev mode only. This is mostly for tests.
|
# Enable test-utilities in dev mode only. This is mostly for tests.
|
||||||
|
|||||||
@@ -176,7 +176,7 @@ asynchronous Rust patterns with Tokio.
|
|||||||
Commands or other features should only be added if doing so is useful to
|
Commands or other features should only be added if doing so is useful to
|
||||||
demonstrate a new pattern.
|
demonstrate a new pattern.
|
||||||
|
|
||||||
Contributions should come with extensive comments targetted to new Tokio users.
|
Contributions should come with extensive comments targeted to new Tokio users.
|
||||||
|
|
||||||
Contributions that only focus on clarifying and improving comments are very
|
Contributions that only focus on clarifying and improving comments are very
|
||||||
welcome.
|
welcome.
|
||||||
|
|||||||
@@ -2,13 +2,12 @@ use mini_redis::{clients::Client, DEFAULT_PORT};
|
|||||||
|
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use clap::{Parser, Subcommand};
|
use clap::{Parser, Subcommand};
|
||||||
use std::convert::Infallible;
|
|
||||||
use std::num::ParseIntError;
|
use std::num::ParseIntError;
|
||||||
use std::str;
|
use std::str;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
#[derive(Parser, Debug)]
|
#[derive(Parser, Debug)]
|
||||||
#[clap(
|
#[command(
|
||||||
name = "mini-redis-cli",
|
name = "mini-redis-cli",
|
||||||
version,
|
version,
|
||||||
author,
|
author,
|
||||||
@@ -18,10 +17,10 @@ struct Cli {
|
|||||||
#[clap(subcommand)]
|
#[clap(subcommand)]
|
||||||
command: Command,
|
command: Command,
|
||||||
|
|
||||||
#[clap(name = "hostname", long, default_value = "127.0.0.1")]
|
#[arg(id = "hostname", long, default_value = "127.0.0.1")]
|
||||||
host: String,
|
host: String,
|
||||||
|
|
||||||
#[clap(long, default_value_t = DEFAULT_PORT)]
|
#[arg(long, default_value_t = DEFAULT_PORT)]
|
||||||
port: u16,
|
port: u16,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -29,7 +28,6 @@ struct Cli {
|
|||||||
enum Command {
|
enum Command {
|
||||||
Ping {
|
Ping {
|
||||||
/// Message to ping
|
/// Message to ping
|
||||||
#[clap(value_parser = bytes_from_str)]
|
|
||||||
msg: Option<Bytes>,
|
msg: Option<Bytes>,
|
||||||
},
|
},
|
||||||
/// Get the value of key.
|
/// Get the value of key.
|
||||||
@@ -43,11 +41,10 @@ enum Command {
|
|||||||
key: String,
|
key: String,
|
||||||
|
|
||||||
/// Value to set.
|
/// Value to set.
|
||||||
#[clap(value_parser = bytes_from_str)]
|
|
||||||
value: Bytes,
|
value: Bytes,
|
||||||
|
|
||||||
/// Expire the value after specified amount of time
|
/// Expire the value after specified amount of time
|
||||||
#[clap(value_parser = duration_from_ms_str)]
|
#[arg(value_parser = duration_from_ms_str)]
|
||||||
expires: Option<Duration>,
|
expires: Option<Duration>,
|
||||||
},
|
},
|
||||||
/// Publisher to send a message to a specific channel.
|
/// Publisher to send a message to a specific channel.
|
||||||
@@ -55,7 +52,6 @@ enum Command {
|
|||||||
/// Name of channel
|
/// Name of channel
|
||||||
channel: String,
|
channel: String,
|
||||||
|
|
||||||
#[clap(value_parser = bytes_from_str)]
|
|
||||||
/// Message to publish
|
/// Message to publish
|
||||||
message: Bytes,
|
message: Bytes,
|
||||||
},
|
},
|
||||||
@@ -153,7 +149,3 @@ fn duration_from_ms_str(src: &str) -> Result<Duration, ParseIntError> {
|
|||||||
let ms = src.parse::<u64>()?;
|
let ms = src.parse::<u64>()?;
|
||||||
Ok(Duration::from_millis(ms))
|
Ok(Duration::from_millis(ms))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn bytes_from_str(src: &str) -> Result<Bytes, Infallible> {
|
|
||||||
Ok(Bytes::from(src.to_string()))
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -44,9 +44,9 @@ pub async fn main() -> mini_redis::Result<()> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Parser, Debug)]
|
#[derive(Parser, Debug)]
|
||||||
#[clap(name = "mini-redis-server", version, author, about = "A Redis server")]
|
#[command(name = "mini-redis-server", version, author, about = "A Redis server")]
|
||||||
struct Cli {
|
struct Cli {
|
||||||
#[clap(long)]
|
#[arg(long)]
|
||||||
port: Option<u16>,
|
port: Option<u16>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -62,7 +62,7 @@ fn set_up_logging() -> Result<(), TryInitError> {
|
|||||||
// Note: If you need to pass the x-amzn-trace-id across services in the same trace,
|
// Note: If you need to pass the x-amzn-trace-id across services in the same trace,
|
||||||
// you will need this line. However, this requires additional code not pictured here.
|
// you will need this line. However, this requires additional code not pictured here.
|
||||||
// For a full example using hyper, see:
|
// For a full example using hyper, see:
|
||||||
// https://github.com/open-telemetry/opentelemetry-rust/blob/main/examples/aws-xray/src/server.rs#L14-L26
|
// https://github.com/open-telemetry/opentelemetry-rust/blob/v0.19.0/examples/aws-xray/src/server.rs#L14-L26
|
||||||
global::set_text_map_propagator(XrayPropagator::default());
|
global::set_text_map_propagator(XrayPropagator::default());
|
||||||
|
|
||||||
let tracer = opentelemetry_otlp::new_pipeline()
|
let tracer = opentelemetry_otlp::new_pipeline()
|
||||||
|
|||||||
@@ -147,7 +147,7 @@ impl Set {
|
|||||||
frame.push_bulk(Bytes::from(self.key.into_bytes()));
|
frame.push_bulk(Bytes::from(self.key.into_bytes()));
|
||||||
frame.push_bulk(self.value);
|
frame.push_bulk(self.value);
|
||||||
if let Some(ms) = self.expire {
|
if let Some(ms) = self.expire {
|
||||||
// Expirations in Redis procotol can be specified in two ways
|
// Expirations in Redis protocol can be specified in two ways
|
||||||
// 1. SET key value EX seconds
|
// 1. SET key value EX seconds
|
||||||
// 2. SET key value PX milliseconds
|
// 2. SET key value PX milliseconds
|
||||||
// We the second option because it allows greater precision and
|
// We the second option because it allows greater precision and
|
||||||
|
|||||||
@@ -245,7 +245,7 @@ async fn handle_command(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Creates the response to a subcribe request.
|
/// Creates the response to a subscribe request.
|
||||||
///
|
///
|
||||||
/// All of these functions take the `channel_name` as a `String` instead of
|
/// All of these functions take the `channel_name` as a `String` instead of
|
||||||
/// a `&str` since `Bytes::from` can reuse the allocation in the `String`, and
|
/// a `&str` since `Bytes::from` can reuse the allocation in the `String`, and
|
||||||
|
|||||||
15
src/db.rs
15
src/db.rs
@@ -11,7 +11,7 @@ use tracing::debug;
|
|||||||
/// this struct is dropped.
|
/// this struct is dropped.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub(crate) struct DbDropGuard {
|
pub(crate) struct DbDropGuard {
|
||||||
/// The `Db` instance that will be shut down when this `DbHolder` struct
|
/// The `Db` instance that will be shut down when this `DbDropGuard` struct
|
||||||
/// is dropped.
|
/// is dropped.
|
||||||
db: Db,
|
db: Db,
|
||||||
}
|
}
|
||||||
@@ -97,7 +97,7 @@ struct Entry {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl DbDropGuard {
|
impl DbDropGuard {
|
||||||
/// Create a new `DbHolder`, wrapping a `Db` instance. When this is dropped
|
/// Create a new `DbDropGuard`, wrapping a `Db` instance. When this is dropped
|
||||||
/// the `Db`'s purge task will be shut down.
|
/// the `Db`'s purge task will be shut down.
|
||||||
pub(crate) fn new() -> DbDropGuard {
|
pub(crate) fn new() -> DbDropGuard {
|
||||||
DbDropGuard { db: Db::new() }
|
DbDropGuard { db: Db::new() }
|
||||||
@@ -177,8 +177,6 @@ impl Db {
|
|||||||
.map(|expiration| expiration > when)
|
.map(|expiration| expiration > when)
|
||||||
.unwrap_or(true);
|
.unwrap_or(true);
|
||||||
|
|
||||||
// Track the expiration.
|
|
||||||
state.expirations.insert((when, key.clone()));
|
|
||||||
when
|
when
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -197,10 +195,17 @@ impl Db {
|
|||||||
if let Some(prev) = prev {
|
if let Some(prev) = prev {
|
||||||
if let Some(when) = prev.expires_at {
|
if let Some(when) = prev.expires_at {
|
||||||
// clear expiration
|
// clear expiration
|
||||||
state.expirations.remove(&(when, key));
|
state.expirations.remove(&(when, key.clone()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Track the expiration. If we insert before remove that will cause bug
|
||||||
|
// when current `(when, key)` equals prev `(when, key)`. Remove then insert
|
||||||
|
// can avoid this.
|
||||||
|
if let Some(when) = expires_at {
|
||||||
|
state.expirations.insert((when, key));
|
||||||
|
}
|
||||||
|
|
||||||
// Release the mutex before notifying the background task. This helps
|
// Release the mutex before notifying the background task. This helps
|
||||||
// reduce contention by avoiding the background task waking up only to
|
// reduce contention by avoiding the background task waking up only to
|
||||||
// be unable to acquire the mutex due to this function still holding it.
|
// be unable to acquire the mutex due to this function still holding it.
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ use tokio::net::TcpListener;
|
|||||||
use tokio::task::JoinHandle;
|
use tokio::task::JoinHandle;
|
||||||
|
|
||||||
/// A basic "hello world" style test. A server instance is started in a
|
/// A basic "hello world" style test. A server instance is started in a
|
||||||
/// background task. A client instance is then established and used to intialize
|
/// background task. A client instance is then established and used to initialize
|
||||||
/// the buffer. Set and get commands are sent to the server. The response is
|
/// the buffer. Set and get commands are sent to the server. The response is
|
||||||
/// then evaluated.
|
/// then evaluated.
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
|
|||||||
@@ -88,7 +88,7 @@ async fn receive_message_multiple_subscribed_channels() {
|
|||||||
assert_eq!(b"howdy?", &message2.content[..])
|
assert_eq!(b"howdy?", &message2.content[..])
|
||||||
}
|
}
|
||||||
|
|
||||||
/// test that a client accurately removes its own subscribed chanel list
|
/// test that a client accurately removes its own subscribed channel list
|
||||||
/// when unsubscribing to all subscribed channels by submitting an empty vec
|
/// when unsubscribing to all subscribed channels by submitting an empty vec
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn unsubscribes_from_channels() {
|
async fn unsubscribes_from_channels() {
|
||||||
|
|||||||
Reference in New Issue
Block a user