refactor: use OwnedSemaphore for connection limiting (#100)
This commit is contained in:
committed by
GitHub
parent
25062cd238
commit
cf1e4e465e
@@ -84,13 +84,6 @@ struct Handler {
|
|||||||
/// the byte level protocol parsing details encapsulated in `Connection`.
|
/// the byte level protocol parsing details encapsulated in `Connection`.
|
||||||
connection: Connection,
|
connection: Connection,
|
||||||
|
|
||||||
/// Max connection semaphore.
|
|
||||||
///
|
|
||||||
/// When the handler is dropped, a permit is returned to this semaphore. If
|
|
||||||
/// the listener is waiting for connections to close, it will be notified of
|
|
||||||
/// the newly available permit and resume accepting connections.
|
|
||||||
limit_connections: Arc<Semaphore>,
|
|
||||||
|
|
||||||
/// Listen for shutdown notifications.
|
/// Listen for shutdown notifications.
|
||||||
///
|
///
|
||||||
/// A wrapper around the `broadcast::Receiver` paired with the sender in
|
/// A wrapper around the `broadcast::Receiver` paired with the sender in
|
||||||
@@ -229,18 +222,18 @@ impl Listener {
|
|||||||
loop {
|
loop {
|
||||||
// Wait for a permit to become available
|
// Wait for a permit to become available
|
||||||
//
|
//
|
||||||
// `acquire` returns a permit that is bound via a lifetime to the
|
// `acquire_owned` returns a permit that is bound to the semaphore.
|
||||||
// semaphore. When the permit value is dropped, it is automatically
|
// When the permit value is dropped, it is automatically returned
|
||||||
// returned to the semaphore. This is convenient in many cases.
|
// to the semaphore.
|
||||||
// However, in this case, the permit must be returned in a different
|
|
||||||
// task than it is acquired in (the handler task). To do this, we
|
|
||||||
// "forget" the permit, which drops the permit value **without**
|
|
||||||
// incrementing the semaphore's permits. Then, in the handler task
|
|
||||||
// we manually add a new permit when processing completes.
|
|
||||||
//
|
//
|
||||||
// `acquire()` returns `Err` when the semaphore has been closed. We
|
// `acquire_owned()` returns `Err` when the semaphore has been
|
||||||
// don't ever close the sempahore, so `unwrap()` is safe.
|
// closed. We don't ever close the sempahore, so `unwrap()` is safe.
|
||||||
self.limit_connections.acquire().await.unwrap().forget();
|
let permit = self
|
||||||
|
.limit_connections
|
||||||
|
.clone()
|
||||||
|
.acquire_owned()
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
// Accept a new socket. This will attempt to perform error handling.
|
// Accept a new socket. This will attempt to perform error handling.
|
||||||
// The `accept` method internally attempts to recover errors, so an
|
// The `accept` method internally attempts to recover errors, so an
|
||||||
@@ -256,11 +249,6 @@ impl Listener {
|
|||||||
// buffers to perform redis protocol frame parsing.
|
// buffers to perform redis protocol frame parsing.
|
||||||
connection: Connection::new(socket),
|
connection: Connection::new(socket),
|
||||||
|
|
||||||
// The connection state needs a handle to the max connections
|
|
||||||
// semaphore. When the handler is done processing the
|
|
||||||
// connection, a permit is added back to the semaphore.
|
|
||||||
limit_connections: self.limit_connections.clone(),
|
|
||||||
|
|
||||||
// Receive shutdown notifications.
|
// Receive shutdown notifications.
|
||||||
shutdown: Shutdown::new(self.notify_shutdown.subscribe()),
|
shutdown: Shutdown::new(self.notify_shutdown.subscribe()),
|
||||||
|
|
||||||
@@ -276,6 +264,9 @@ impl Listener {
|
|||||||
if let Err(err) = handler.run().await {
|
if let Err(err) = handler.run().await {
|
||||||
error!(cause = ?err, "connection error");
|
error!(cause = ?err, "connection error");
|
||||||
}
|
}
|
||||||
|
// Move the permit into the task and drop it after completion.
|
||||||
|
// This returns the permit back to the semaphore.
|
||||||
|
drop(permit);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -380,19 +371,3 @@ impl Handler {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Drop for Handler {
|
|
||||||
fn drop(&mut self) {
|
|
||||||
// Add a permit back to the semaphore.
|
|
||||||
//
|
|
||||||
// Doing so unblocks the listener if the max number of
|
|
||||||
// connections has been reached.
|
|
||||||
//
|
|
||||||
// This is done in a `Drop` implementation in order to guarantee that
|
|
||||||
// the permit is added even if the task handling the connection panics.
|
|
||||||
// If `add_permit` was called at the end of the `run` function and some
|
|
||||||
// bug causes a panic. The permit would never be returned to the
|
|
||||||
// semaphore.
|
|
||||||
self.limit_connections.add_permits(1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
Reference in New Issue
Block a user