replace BTreeMap by BTreeSet and remove id form Entry (#123)
This commit is contained in:
@@ -113,7 +113,6 @@ impl Client {
|
|||||||
pub async fn ping(&mut self, msg: Option<Bytes>) -> crate::Result<Bytes> {
|
pub async fn ping(&mut self, msg: Option<Bytes>) -> crate::Result<Bytes> {
|
||||||
let frame = Ping::new(msg).into_frame();
|
let frame = Ping::new(msg).into_frame();
|
||||||
debug!(request = ?frame);
|
debug!(request = ?frame);
|
||||||
|
|
||||||
self.connection.write_frame(&frame).await?;
|
self.connection.write_frame(&frame).await?;
|
||||||
|
|
||||||
match self.read_response().await? {
|
match self.read_response().await? {
|
||||||
|
|||||||
38
src/db.rs
38
src/db.rs
@@ -2,7 +2,7 @@ use tokio::sync::{broadcast, Notify};
|
|||||||
use tokio::time::{self, Duration, Instant};
|
use tokio::time::{self, Duration, Instant};
|
||||||
|
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use std::collections::{BTreeMap, HashMap};
|
use std::collections::{BTreeSet, HashMap};
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
use tracing::debug;
|
use tracing::debug;
|
||||||
|
|
||||||
@@ -69,19 +69,15 @@ struct State {
|
|||||||
|
|
||||||
/// Tracks key TTLs.
|
/// Tracks key TTLs.
|
||||||
///
|
///
|
||||||
/// A `BTreeMap` is used to maintain expirations sorted by when they expire.
|
/// A `BTreeSet` is used to maintain expirations sorted by when they expire.
|
||||||
/// This allows the background task to iterate this map to find the value
|
/// This allows the background task to iterate this map to find the value
|
||||||
/// expiring next.
|
/// expiring next.
|
||||||
///
|
///
|
||||||
/// While highly unlikely, it is possible for more than one expiration to be
|
/// While highly unlikely, it is possible for more than one expiration to be
|
||||||
/// created for the same instant. Because of this, the `Instant` is
|
/// created for the same instant. Because of this, the `Instant` is
|
||||||
/// insufficient for the key. A unique expiration identifier (`u64`) is used
|
/// insufficient for the key. A unique key (`String`) is used to
|
||||||
/// to break these ties.
|
/// break these ties.
|
||||||
expirations: BTreeMap<(Instant, u64), String>,
|
expirations: BTreeSet<(Instant, String)>,
|
||||||
|
|
||||||
/// Identifier to use for the next expiration. Each expiration is associated
|
|
||||||
/// with a unique identifier. See above for why.
|
|
||||||
next_id: u64,
|
|
||||||
|
|
||||||
/// True when the Db instance is shutting down. This happens when all `Db`
|
/// True when the Db instance is shutting down. This happens when all `Db`
|
||||||
/// values drop. Setting this to `true` signals to the background task to
|
/// values drop. Setting this to `true` signals to the background task to
|
||||||
@@ -92,9 +88,6 @@ struct State {
|
|||||||
/// Entry in the key-value store
|
/// Entry in the key-value store
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
struct Entry {
|
struct Entry {
|
||||||
/// Uniquely identifies this entry.
|
|
||||||
id: u64,
|
|
||||||
|
|
||||||
/// Stored data
|
/// Stored data
|
||||||
data: Bytes,
|
data: Bytes,
|
||||||
|
|
||||||
@@ -132,8 +125,7 @@ impl Db {
|
|||||||
state: Mutex::new(State {
|
state: Mutex::new(State {
|
||||||
entries: HashMap::new(),
|
entries: HashMap::new(),
|
||||||
pub_sub: HashMap::new(),
|
pub_sub: HashMap::new(),
|
||||||
expirations: BTreeMap::new(),
|
expirations: BTreeSet::new(),
|
||||||
next_id: 0,
|
|
||||||
shutdown: false,
|
shutdown: false,
|
||||||
}),
|
}),
|
||||||
background_task: Notify::new(),
|
background_task: Notify::new(),
|
||||||
@@ -166,11 +158,6 @@ impl Db {
|
|||||||
pub(crate) fn set(&self, key: String, value: Bytes, expire: Option<Duration>) {
|
pub(crate) fn set(&self, key: String, value: Bytes, expire: Option<Duration>) {
|
||||||
let mut state = self.shared.state.lock().unwrap();
|
let mut state = self.shared.state.lock().unwrap();
|
||||||
|
|
||||||
// Get and increment the next insertion ID. Guarded by the lock, this
|
|
||||||
// ensures a unique identifier is associated with each `set` operation.
|
|
||||||
let id = state.next_id;
|
|
||||||
state.next_id += 1;
|
|
||||||
|
|
||||||
// If this `set` becomes the key that expires **next**, the background
|
// If this `set` becomes the key that expires **next**, the background
|
||||||
// task needs to be notified so it can update its state.
|
// task needs to be notified so it can update its state.
|
||||||
//
|
//
|
||||||
@@ -191,15 +178,14 @@ impl Db {
|
|||||||
.unwrap_or(true);
|
.unwrap_or(true);
|
||||||
|
|
||||||
// Track the expiration.
|
// Track the expiration.
|
||||||
state.expirations.insert((when, id), key.clone());
|
state.expirations.insert((when, key.clone()));
|
||||||
when
|
when
|
||||||
});
|
});
|
||||||
|
|
||||||
// Insert the entry into the `HashMap`.
|
// Insert the entry into the `HashMap`.
|
||||||
let prev = state.entries.insert(
|
let prev = state.entries.insert(
|
||||||
key,
|
key.clone(),
|
||||||
Entry {
|
Entry {
|
||||||
id,
|
|
||||||
data: value,
|
data: value,
|
||||||
expires_at,
|
expires_at,
|
||||||
},
|
},
|
||||||
@@ -211,7 +197,7 @@ impl Db {
|
|||||||
if let Some(prev) = prev {
|
if let Some(prev) = prev {
|
||||||
if let Some(when) = prev.expires_at {
|
if let Some(when) = prev.expires_at {
|
||||||
// clear expiration
|
// clear expiration
|
||||||
state.expirations.remove(&(when, prev.id));
|
state.expirations.remove(&(when, key));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -315,7 +301,7 @@ impl Shared {
|
|||||||
// Find all keys scheduled to expire **before** now.
|
// Find all keys scheduled to expire **before** now.
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
|
|
||||||
while let Some((&(when, id), key)) = state.expirations.iter().next() {
|
while let Some(&(when, ref key)) = state.expirations.iter().next() {
|
||||||
if when > now {
|
if when > now {
|
||||||
// Done purging, `when` is the instant at which the next key
|
// Done purging, `when` is the instant at which the next key
|
||||||
// expires. The worker task will wait until this instant.
|
// expires. The worker task will wait until this instant.
|
||||||
@@ -324,7 +310,7 @@ impl Shared {
|
|||||||
|
|
||||||
// The key expired, remove it
|
// The key expired, remove it
|
||||||
state.entries.remove(key);
|
state.entries.remove(key);
|
||||||
state.expirations.remove(&(when, id));
|
state.expirations.remove(&(when, key.clone()));
|
||||||
}
|
}
|
||||||
|
|
||||||
None
|
None
|
||||||
@@ -342,7 +328,7 @@ impl Shared {
|
|||||||
impl State {
|
impl State {
|
||||||
fn next_expiration(&self) -> Option<Instant> {
|
fn next_expiration(&self) -> Option<Instant> {
|
||||||
self.expirations
|
self.expirations
|
||||||
.keys()
|
.iter()
|
||||||
.next()
|
.next()
|
||||||
.map(|expiration| expiration.0)
|
.map(|expiration| expiration.0)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user