chore: update to Tokio 1.0 (#70)

This commit is contained in:
Carl Lerche
2020-12-23 09:41:23 -08:00
committed by GitHub
parent da29371460
commit 0fb264826d
7 changed files with 120 additions and 123 deletions

View File

@@ -10,7 +10,7 @@ use bytes::Bytes;
use std::io::{Error, ErrorKind};
use std::time::Duration;
use tokio::net::{TcpStream, ToSocketAddrs};
use tokio::stream::Stream;
use tokio_stream::Stream;
use tracing::{debug, instrument};
/// Established connection with a Redis server.

View File

@@ -4,8 +4,8 @@ use crate::{Command, Connection, Db, Frame, Shutdown};
use bytes::Bytes;
use std::pin::Pin;
use tokio::select;
use tokio::stream::{Stream, StreamExt, StreamMap};
use tokio::sync::broadcast;
use tokio_stream::{Stream, StreamExt, StreamMap};
/// Subscribes the client to one or more channels.
///

View File

@@ -145,7 +145,7 @@ impl Frame {
return Err(Error::Incomplete);
}
let data = Bytes::copy_from_slice(&src.bytes()[..len]);
let data = Bytes::copy_from_slice(&src.chunk()[..len]);
// skip that number of bytes + 2 (\r\n).
skip(src, n)?;
@@ -215,7 +215,7 @@ fn peek_u8(src: &mut Cursor<&[u8]>) -> Result<u8, Error> {
return Err(Error::Incomplete);
}
Ok(src.bytes()[0])
Ok(src.chunk()[0])
}
fn get_u8(src: &mut Cursor<&[u8]>) -> Result<u8, Error> {

View File

@@ -238,7 +238,10 @@ impl Listener {
// "forget" the permit, which drops the permit value **without**
// incrementing the semaphore's permits. Then, in the handler task
// we manually add a new permit when processing completes.
self.limit_connections.acquire().await.forget();
//
// `acquire()` returns `Err` when the semaphore has been closed. We
// don't ever close the sempahore, so `unwrap()` is safe.
self.limit_connections.acquire().await.unwrap().forget();
// Accept a new socket. This will attempt to perform error handling.
// The `accept` method internally attempts to recover errors, so an