From cf1e4e465eceaaddd9497353e809fe6b814d7b19 Mon Sep 17 00:00:00 2001 From: Vilgot Fredenberg Date: Sun, 29 May 2022 12:27:39 +0200 Subject: [PATCH] refactor: use `OwnedSemaphore` for connection limiting (#100) --- src/server.rs | 53 ++++++++++++++------------------------------------- 1 file changed, 14 insertions(+), 39 deletions(-) diff --git a/src/server.rs b/src/server.rs index 05dbed4..de94456 100644 --- a/src/server.rs +++ b/src/server.rs @@ -84,13 +84,6 @@ struct Handler { /// the byte level protocol parsing details encapsulated in `Connection`. connection: Connection, - /// Max connection semaphore. - /// - /// When the handler is dropped, a permit is returned to this semaphore. If - /// the listener is waiting for connections to close, it will be notified of - /// the newly available permit and resume accepting connections. - limit_connections: Arc, - /// Listen for shutdown notifications. /// /// A wrapper around the `broadcast::Receiver` paired with the sender in @@ -229,18 +222,18 @@ impl Listener { loop { // Wait for a permit to become available // - // `acquire` returns a permit that is bound via a lifetime to the - // semaphore. When the permit value is dropped, it is automatically - // returned to the semaphore. This is convenient in many cases. - // However, in this case, the permit must be returned in a different - // task than it is acquired in (the handler task). To do this, we - // "forget" the permit, which drops the permit value **without** - // incrementing the semaphore's permits. Then, in the handler task - // we manually add a new permit when processing completes. + // `acquire_owned` returns a permit that is bound to the semaphore. + // When the permit value is dropped, it is automatically returned + // to the semaphore. // - // `acquire()` returns `Err` when the semaphore has been closed. We - // don't ever close the sempahore, so `unwrap()` is safe. - self.limit_connections.acquire().await.unwrap().forget(); + // `acquire_owned()` returns `Err` when the semaphore has been + // closed. We don't ever close the sempahore, so `unwrap()` is safe. + let permit = self + .limit_connections + .clone() + .acquire_owned() + .await + .unwrap(); // Accept a new socket. This will attempt to perform error handling. // The `accept` method internally attempts to recover errors, so an @@ -256,11 +249,6 @@ impl Listener { // buffers to perform redis protocol frame parsing. connection: Connection::new(socket), - // The connection state needs a handle to the max connections - // semaphore. When the handler is done processing the - // connection, a permit is added back to the semaphore. - limit_connections: self.limit_connections.clone(), - // Receive shutdown notifications. shutdown: Shutdown::new(self.notify_shutdown.subscribe()), @@ -276,6 +264,9 @@ impl Listener { if let Err(err) = handler.run().await { error!(cause = ?err, "connection error"); } + // Move the permit into the task and drop it after completion. + // This returns the permit back to the semaphore. + drop(permit); }); } } @@ -380,19 +371,3 @@ impl Handler { Ok(()) } } - -impl Drop for Handler { - fn drop(&mut self) { - // Add a permit back to the semaphore. - // - // Doing so unblocks the listener if the max number of - // connections has been reached. - // - // This is done in a `Drop` implementation in order to guarantee that - // the permit is added even if the task handling the connection panics. - // If `add_permit` was called at the end of the `run` function and some - // bug causes a panic. The permit would never be returned to the - // semaphore. - self.limit_connections.add_permits(1); - } -}