Update to edge-tts 4.0.0
This commit is contained in:
353
src/edge_tts/communicate.py
Normal file
353
src/edge_tts/communicate.py
Normal file
@@ -0,0 +1,353 @@
|
||||
"""
|
||||
Communicate package.
|
||||
"""
|
||||
|
||||
|
||||
import json
|
||||
import time
|
||||
import uuid
|
||||
from xml.sax.saxutils import escape
|
||||
|
||||
import aiohttp
|
||||
|
||||
from .constants import WSS_URL
|
||||
|
||||
|
||||
def get_headers_and_data(data):
|
||||
"""
|
||||
Returns the headers and data from the given data.
|
||||
|
||||
Args:
|
||||
data (str or bytes): The data to be parsed.
|
||||
|
||||
Returns:
|
||||
tuple: The headers and data to be used in the request.
|
||||
"""
|
||||
if isinstance(data, str):
|
||||
data = data.encode("utf-8")
|
||||
|
||||
headers = {}
|
||||
for line in data.split(b"\r\n\r\n")[0].split(b"\r\n"):
|
||||
line_split = line.split(b":")
|
||||
key, value = line_split[0], b":".join(line_split[1:])
|
||||
if value.startswith(b" "):
|
||||
value = value[1:]
|
||||
headers[key.decode("utf-8")] = value.decode("utf-8")
|
||||
|
||||
return headers, b"\r\n\r\n".join(data.split(b"\r\n\r\n")[1:])
|
||||
|
||||
|
||||
def remove_incompatible_characters(string):
|
||||
"""
|
||||
The service does not support a couple character ranges.
|
||||
Most important being the vertical tab character which is
|
||||
commonly present in OCR-ed PDFs. Not doing this will
|
||||
result in an error from the service.
|
||||
|
||||
Args:
|
||||
string (str or bytes): The string to be cleaned.
|
||||
|
||||
Returns:
|
||||
str: The cleaned string.
|
||||
"""
|
||||
if isinstance(string, bytes):
|
||||
string = string.decode("utf-8")
|
||||
|
||||
string = list(string)
|
||||
|
||||
for idx in range(len(string)): # pylint: disable=consider-using-enumerate
|
||||
code = ord(string[idx])
|
||||
if (0 <= code <= 8) or (11 <= code <= 12) or (14 <= code <= 31):
|
||||
string[idx] = " "
|
||||
|
||||
return "".join(string)
|
||||
|
||||
|
||||
def connect_id():
|
||||
"""
|
||||
Returns a UUID without dashes.
|
||||
|
||||
Args:
|
||||
None
|
||||
|
||||
Returns:
|
||||
str: A UUID without dashes.
|
||||
"""
|
||||
return str(uuid.uuid4()).replace("-", "")
|
||||
|
||||
|
||||
def iter_bytes(my_bytes):
|
||||
"""
|
||||
Iterates over bytes object
|
||||
|
||||
Args:
|
||||
my_bytes: Bytes object to iterate over
|
||||
|
||||
Yields:
|
||||
the individual bytes
|
||||
"""
|
||||
for i in range(len(my_bytes)):
|
||||
yield my_bytes[i : i + 1]
|
||||
|
||||
|
||||
def split_text_by_byte_length(text, byte_length):
|
||||
"""
|
||||
Splits a string into a list of strings of a given byte length
|
||||
while attempting to keep words together.
|
||||
|
||||
Args:
|
||||
text (byte): The string to be split.
|
||||
byte_length (int): The byte length of each string in the list.
|
||||
|
||||
Returns:
|
||||
list: A list of strings of the given byte length.
|
||||
"""
|
||||
if isinstance(text, str):
|
||||
text = text.encode("utf-8")
|
||||
|
||||
split_text = []
|
||||
current_string = b""
|
||||
for character in iter_bytes(text):
|
||||
if len(current_string) + len(character) <= byte_length:
|
||||
current_string += character
|
||||
else:
|
||||
split_text.append(current_string)
|
||||
current_string = character
|
||||
if split_text[-1].find(b" ") != -1:
|
||||
while split_text[-1][-1] != b" ":
|
||||
current_string = split_text[-1][-1] + current_string
|
||||
split_text[-1] = split_text[-1][:-1]
|
||||
if current_string != b"":
|
||||
split_text.append(current_string)
|
||||
return split_text
|
||||
|
||||
|
||||
def mkssml(text, voice, pitch, rate, volume):
|
||||
"""
|
||||
Creates a SSML string from the given parameters.
|
||||
|
||||
Args:
|
||||
text (str): The text to be spoken.
|
||||
voice (str): The voice to be used.
|
||||
pitch (str): The pitch to be used.
|
||||
rate (str): The rate to be used.
|
||||
volume (str): The volume to be used.
|
||||
|
||||
Returns:
|
||||
str: The SSML string.
|
||||
"""
|
||||
if isinstance(text, bytes):
|
||||
text = text.decode("utf-8")
|
||||
|
||||
ssml = (
|
||||
"<speak version='1.0' xmlns='http://www.w3.org/2001/10/synthesis' xml:lang='en-US'>"
|
||||
f"<voice name='{voice}'><prosody pitch='{pitch}' rate='{rate}' volume='{volume}'>"
|
||||
f"{text}</prosody></voice></speak>"
|
||||
)
|
||||
return ssml
|
||||
|
||||
|
||||
def date_to_string():
|
||||
"""
|
||||
Return Javascript-style date string.
|
||||
|
||||
Args:
|
||||
None
|
||||
|
||||
Returns:
|
||||
str: Javascript-style date string.
|
||||
"""
|
||||
# %Z is not what we want, but it's the only way to get the timezone
|
||||
# without having to use a library. We'll just use UTC and hope for the best.
|
||||
# For example, right now %Z would return EEST when we need it to return
|
||||
# Eastern European Summer Time.
|
||||
#
|
||||
# return time.strftime("%a %b %d %Y %H:%M:%S GMT%z (%Z)")
|
||||
return time.strftime(
|
||||
"%a %b %d %Y %H:%M:%S GMT+0000 (Coordinated Universal Time)", time.gmtime()
|
||||
)
|
||||
|
||||
|
||||
def ssml_headers_plus_data(request_id, timestamp, ssml):
|
||||
"""
|
||||
Returns the headers and data to be used in the request.
|
||||
|
||||
Args:
|
||||
request_id (str): The request ID.
|
||||
timestamp (str): The timestamp.
|
||||
ssml (str): The SSML string.
|
||||
|
||||
Returns:
|
||||
str: The headers and data to be used in the request.
|
||||
"""
|
||||
|
||||
return (
|
||||
f"X-RequestId:{request_id}\r\n"
|
||||
"Content-Type:application/ssml+xml\r\n"
|
||||
f"X-Timestamp:{timestamp}Z\r\n" # This is not a mistake, Microsoft Edge bug.
|
||||
"Path:ssml\r\n\r\n"
|
||||
f"{ssml}"
|
||||
)
|
||||
|
||||
|
||||
class Communicate: # pylint: disable=too-few-public-methods
|
||||
"""
|
||||
Class for communicating with the service.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
"""
|
||||
Initializes the Communicate class.
|
||||
"""
|
||||
self.date = date_to_string()
|
||||
|
||||
async def run(
|
||||
self,
|
||||
messages,
|
||||
sentence_boundary=False,
|
||||
word_boundary=False,
|
||||
codec="audio-24khz-48kbitrate-mono-mp3",
|
||||
voice="Microsoft Server Speech Text to Speech Voice (en-US, AriaNeural)",
|
||||
pitch="+0Hz",
|
||||
rate="+0%",
|
||||
volume="+0%",
|
||||
customspeak=False,
|
||||
): # pylint: disable=too-many-arguments, too-many-locals
|
||||
"""
|
||||
Runs the Communicate class.
|
||||
|
||||
Args:
|
||||
messages (str or list): A list of SSML strings or a single text.
|
||||
sentence_boundary (bool): Whether to use sentence boundary.
|
||||
word_boundary (bool): Whether to use word boundary.
|
||||
codec (str): The codec to use.
|
||||
voice (str): The voice to use (only applicable to non-customspeak).
|
||||
pitch (str): The pitch to use (only applicable to non-customspeak).
|
||||
rate (str): The rate to use (only applicable to non-customspeak).
|
||||
volume (str): The volume to use (only applicable to non-customspeak).
|
||||
customspeak (bool): Whether to create the SSML or treat the messages as SSML.
|
||||
|
||||
Yields:
|
||||
tuple: The subtitle offset, subtitle, and audio data.
|
||||
"""
|
||||
|
||||
sentence_boundary = str(sentence_boundary).lower()
|
||||
word_boundary = str(word_boundary).lower()
|
||||
|
||||
if not customspeak:
|
||||
websocket_max_size = 2 ** 16
|
||||
overhead_per_message = (
|
||||
len(
|
||||
ssml_headers_plus_data(
|
||||
connect_id(), self.date, mkssml("", voice, pitch, rate, volume)
|
||||
)
|
||||
)
|
||||
+ 50
|
||||
) # margin of error
|
||||
messages = split_text_by_byte_length(
|
||||
escape(remove_incompatible_characters(messages)),
|
||||
websocket_max_size - overhead_per_message,
|
||||
)
|
||||
else:
|
||||
if isinstance(messages, str):
|
||||
messages = [messages]
|
||||
|
||||
async with aiohttp.ClientSession(trust_env=True) as session:
|
||||
async with session.ws_connect(
|
||||
f"{WSS_URL}&ConnectionId={connect_id()}",
|
||||
compress=15,
|
||||
autoclose=True,
|
||||
autoping=True,
|
||||
headers={
|
||||
"Pragma": "no-cache",
|
||||
"Cache-Control": "no-cache",
|
||||
"Origin": "chrome-extension://jdiccldimpdaibmpdkjnbmckianbfold",
|
||||
"Accept-Encoding": "gzip, deflate, br",
|
||||
"Accept-Language": "en-US,en;q=0.9",
|
||||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
|
||||
" (KHTML, like Gecko) Chrome/91.0.4472.77 Safari/537.36 Edg/91.0.864.41",
|
||||
},
|
||||
) as websocket:
|
||||
for message in messages:
|
||||
# Each message needs to have the proper date
|
||||
self.date = date_to_string()
|
||||
|
||||
# Prepare the request to be sent to the service.
|
||||
#
|
||||
# Note sentenceBoundaryEnabled and wordBoundaryEnabled are actually supposed
|
||||
# to be booleans, but Edge Browser seems to send them as strings.
|
||||
#
|
||||
# This is a bug in Edge as Azure Cognitive Services actually sends them as
|
||||
# bool and not string. For now I will send them as bool unless it causes
|
||||
# any problems.
|
||||
#
|
||||
# Also pay close attention to double { } in request (escape for f-string).
|
||||
request = (
|
||||
f"X-Timestamp:{self.date}\r\n"
|
||||
"Content-Type:application/json; charset=utf-8\r\n"
|
||||
"Path:speech.config\r\n\r\n"
|
||||
'{"context":{"synthesis":{"audio":{"metadataoptions":{'
|
||||
f'"sentenceBoundaryEnabled":{sentence_boundary},'
|
||||
f'"wordBoundaryEnabled":{word_boundary}}},"outputFormat":"{codec}"'
|
||||
"}}}}\r\n"
|
||||
)
|
||||
# Send the request to the service.
|
||||
await websocket.send_str(request)
|
||||
# Send the message itself.
|
||||
if not customspeak:
|
||||
await websocket.send_str(
|
||||
ssml_headers_plus_data(
|
||||
connect_id(),
|
||||
self.date,
|
||||
mkssml(message, voice, pitch, rate, volume),
|
||||
)
|
||||
)
|
||||
else:
|
||||
await websocket.send_str(
|
||||
ssml_headers_plus_data(connect_id(), self.date, message)
|
||||
)
|
||||
|
||||
# Begin listening for the response.
|
||||
download = False
|
||||
async for received in websocket:
|
||||
if received.type in (
|
||||
aiohttp.WSMsgType.CLOSED,
|
||||
aiohttp.WSMsgType.ERROR,
|
||||
):
|
||||
break
|
||||
|
||||
if received.type == aiohttp.WSMsgType.TEXT:
|
||||
parameters, data = get_headers_and_data(received.data)
|
||||
if (
|
||||
"Path" in parameters
|
||||
and parameters["Path"] == "turn.start"
|
||||
):
|
||||
download = True
|
||||
elif (
|
||||
"Path" in parameters
|
||||
and parameters["Path"] == "turn.end"
|
||||
):
|
||||
download = False
|
||||
break
|
||||
elif (
|
||||
"Path" in parameters
|
||||
and parameters["Path"] == "audio.metadata"
|
||||
):
|
||||
metadata = json.loads(data)
|
||||
text = metadata["Metadata"][0]["Data"]["text"]["Text"]
|
||||
offset = metadata["Metadata"][0]["Data"]["Offset"]
|
||||
yield (
|
||||
offset,
|
||||
text,
|
||||
None,
|
||||
)
|
||||
elif received.type == aiohttp.WSMsgType.BINARY:
|
||||
if download:
|
||||
yield (
|
||||
None,
|
||||
None,
|
||||
b"Path:audio\r\n".join(
|
||||
received.data.split(b"Path:audio\r\n")[1:]
|
||||
),
|
||||
)
|
||||
await websocket.close()
|
||||
Reference in New Issue
Block a user