Refactor communicate for better readability

Also improve performance on larger documents.

Signed-off-by: rany2 <rany2@riseup.net>
rany2
2024-02-16 18:36:20 +02:00
parent df6bac8b54
commit c9bf4247a8
2 changed files with 224 additions and 179 deletions

@@ -229,6 +229,25 @@ class Communicate:
Class for communicating with the service.
"""
@staticmethod
def validate_string_param(param_name: str, param_value: str, pattern: str) -> str:
"""
Validates the given string parameter based on type and pattern.
Args:
param_name (str): The name of the parameter.
param_value (str): The value of the parameter.
pattern (str): The pattern to validate the parameter against.
Returns:
str: The validated parameter.
"""
if not isinstance(param_value, str):
raise TypeError(f"{param_name} must be str")
if re.match(pattern, param_value) is None:
raise ValueError(f"Invalid {param_name} '{param_value}'.")
return param_value
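For illustration, this is how the patterns handed to this helper further down behave; a standalone sketch with hypothetical values:

    import re

    # The rate/volume pattern used below: mandatory sign, digits, then "%".
    pattern = r"^[+-]\d+%$"
    assert re.match(pattern, "+10%") is not None  # accepted
    assert re.match(pattern, "-5%") is not None   # accepted
    assert re.match(pattern, "10%") is None       # rejected: missing sign
    assert re.match(pattern, "+10") is None       # rejected: missing "%"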
def __init__(
self,
text: str,
@@ -238,6 +257,7 @@ class Communicate:
volume: str = "+0%",
pitch: str = "+0Hz",
proxy: Optional[str] = None,
receive_timeout: int = 5,
):
"""
Initializes the Communicate class.
@@ -270,190 +290,191 @@ class Communicate:
+ f" ({lang}-{region}, {name})"
)
if (
re.match(
r"^Microsoft Server Speech Text to Speech Voice \(.+,.+\)$",
self.voice,
)
is None
):
raise ValueError(f"Invalid voice '{voice}'.")
if not isinstance(rate, str):
raise TypeError("rate must be str")
if re.match(r"^[+-]\d+%$", rate) is None:
raise ValueError(f"Invalid rate '{rate}'.")
self.rate: str = rate
if not isinstance(volume, str):
raise TypeError("volume must be str")
if re.match(r"^[+-]\d+%$", volume) is None:
raise ValueError(f"Invalid volume '{volume}'.")
self.volume: str = volume
if not isinstance(pitch, str):
raise TypeError("pitch must be str")
if re.match(r"^[+-]\d+Hz$", pitch) is None:
raise ValueError(f"Invalid pitch '{pitch}'.")
self.pitch: str = pitch
self.voice = self.validate_string_param(
"voice",
self.voice,
r"^Microsoft Server Speech Text to Speech Voice \(.+,.+\)$",
)
self.rate = self.validate_string_param("rate", rate, r"^[+-]\d+%$")
self.volume = self.validate_string_param("volume", volume, r"^[+-]\d+%$")
self.pitch = self.validate_string_param("pitch", pitch, r"^[+-]\d+Hz$")
if proxy is not None and not isinstance(proxy, str):
raise TypeError("proxy must be str")
self.proxy: Optional[str] = proxy
if not isinstance(receive_timeout, int):
raise TypeError("receive_timeout must be int")
self.receive_timeout: int = receive_timeout
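A minimal construction sketch under the new validation, assuming the package's public import path (edge_tts); the short voice name is expanded into the long Microsoft format before the pattern check above runs:

    import edge_tts

    # rate/volume need an explicit sign and "%", and pitch must end in "Hz",
    # as enforced by validate_string_param above.
    communicate = edge_tts.Communicate(
        "Hello, world!",
        voice="en-US-AriaNeural",
        rate="+10%",
        volume="+0%",
        pitch="+0Hz",
        receive_timeout=10,
    )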
async def stream(self) -> AsyncGenerator[Dict[str, Any], None]:
"""Streams audio and metadata from the service."""
async def send_command_request() -> None:
"""Sends the request to the service."""
# Prepare the request to be sent to the service.
#
# Note sentenceBoundaryEnabled and wordBoundaryEnabled are actually supposed
# to be booleans, but Edge Browser seems to send them as strings.
#
# This is a bug in Edge as Azure Cognitive Services actually sends them as
# bool and not string. For now I will send them as bool unless it causes
# any problems.
#
# Also pay close attention to double { } in request (escape for f-string).
await websocket.send_str(
f"X-Timestamp:{date_to_string()}\r\n"
"Content-Type:application/json; charset=utf-8\r\n"
"Path:speech.config\r\n\r\n"
'{"context":{"synthesis":{"audio":{"metadataoptions":{'
'"sentenceBoundaryEnabled":false,"wordBoundaryEnabled":true},'
'"outputFormat":"audio-24khz-48kbitrate-mono-mp3"'
"}}}}\r\n"
)
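The JSON body embedded in the f-string above (the doubled braces are f-string escapes) can be built programmatically; a hypothetical equivalent that is easier to read:

    import json

    config = {
        "context": {
            "synthesis": {
                "audio": {
                    "metadataoptions": {
                        "sentenceBoundaryEnabled": False,
                        "wordBoundaryEnabled": True,
                    },
                    "outputFormat": "audio-24khz-48kbitrate-mono-mp3",
                }
            }
        }
    }
    body = json.dumps(config, separators=(",", ":"))  # same string as the literal above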
async def send_ssml_request() -> bool:
"""Sends the SSML request to the service."""
# Get the next string from the generator.
text = next(texts, None)
# If there are no more strings, return False.
if text is None:
return False
# Send the request to the service and return True.
await websocket.send_str(
ssml_headers_plus_data(
connect_id(),
date_to_string(),
mkssml(text, self.voice, self.rate, self.volume, self.pitch),
)
)
return True
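mkssml is defined elsewhere in the module; the SSML it builds from these parameters looks roughly like the following (an approximation inferred from the arguments it receives, not copied from this diff):

    <speak version='1.0' xmlns='http://www.w3.org/2001/10/synthesis' xml:lang='en-US'>
        <voice name='Microsoft Server Speech Text to Speech Voice (en-US, AriaNeural)'>
            <prosody pitch='+0Hz' rate='+10%' volume='+0%'>Hello, world!</prosody>
        </voice>
    </speak>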
def parse_metadata():
for meta_obj in json.loads(data)["Metadata"]:
meta_type = meta_obj["Type"]
if meta_type == "WordBoundary":
current_offset = meta_obj["Data"]["Offset"] + offset_compensation
current_duration = meta_obj["Data"]["Duration"]
return {
"type": meta_type,
"offset": current_offset,
"duration": current_duration,
"text": meta_obj["Data"]["text"]["Text"],
}
elif meta_type in ("SessionEnd",):
continue
else:
raise UnknownResponse(f"Unknown metadata type: {meta_type}")
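The payload shape parse_metadata expects can be read off the field accesses above; an illustrative (not captured) audio.metadata body:

    {
        "Metadata": [
            {
                "Type": "WordBoundary",
                "Data": {
                    "Offset": 10000000,
                    "Duration": 5000000,
                    "text": {"Text": "Hello"}
                }
            }
        ]
    }

For this input the function would return {"type": "WordBoundary", "offset": 10000000 + offset_compensation, "duration": 5000000, "text": "Hello"}.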
# Split the text into multiple strings if it is too long for the service.
texts = split_text_by_byte_length(
escape(remove_incompatible_characters(self.text)),
calc_max_mesg_size(self.voice, self.rate, self.volume, self.pitch),
)
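split_text_by_byte_length and calc_max_mesg_size are defined elsewhere in the module and are not part of this hunk. For orientation, a simplified, hypothetical splitter with the same core idea (the real helper also tries to split on whitespace rather than mid-word) might look like:

    def split_by_byte_length(text: str, limit: int):
        # Yield substrings whose UTF-8 encoding fits within `limit` bytes,
        # backing off so multi-byte sequences are never cut in half.
        data = text.encode("utf-8")
        while data:
            chunk = data[:limit]
            while chunk:
                try:
                    yield chunk.decode("utf-8")
                    break
                except UnicodeDecodeError:
                    chunk = chunk[:-1]
            if not chunk:
                raise ValueError("limit too small for a single character")
            data = data[len(chunk):]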
final_utterance: Dict[int, int] = {}
prev_idx = -1
shift_time = -1
# Keep track of the last duration + offset to calculate the offset
# compensation when the text is split across multiple requests.
last_duration_offset = 0
# Current offset compensations.
offset_compensation = 0
# Create a new connection to the service.
ssl_ctx = ssl.create_default_context(cafile=certifi.where())
for idx, text in enumerate(texts):
async with aiohttp.ClientSession(
trust_env=True,
) as session, session.ws_connect(
f"{WSS_URL}&ConnectionId={connect_id()}",
compress=15,
autoclose=True,
autoping=True,
proxy=self.proxy,
headers={
"Pragma": "no-cache",
"Cache-Control": "no-cache",
"Origin": "chrome-extension://jdiccldimpdaibmpdkjnbmckianbfold",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "en-US,en;q=0.9",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
" (KHTML, like Gecko) Chrome/91.0.4472.77 Safari/537.36 Edg/91.0.864.41",
},
ssl=ssl_ctx,
) as websocket:
# download_audio indicates whether we should be expecting audio data;
# this is so that we avoid reading binary data from the websocket
# and falsely treating it as audio data.
download_audio = False
async with aiohttp.ClientSession(
trust_env=True,
) as session, session.ws_connect(
f"{WSS_URL}&ConnectionId={connect_id()}",
compress=15,
proxy=self.proxy,
receive_timeout=self.receive_timeout,
headers={
"Pragma": "no-cache",
"Cache-Control": "no-cache",
"Origin": "chrome-extension://jdiccldimpdaibmpdkjnbmckianbfold",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "en-US,en;q=0.9",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
" (KHTML, like Gecko) Chrome/91.0.4472.77 Safari/537.36 Edg/91.0.864.41",
},
ssl=ssl_ctx,
) as websocket:
# audio_was_received indicates whether we have received audio data
# from the websocket. This is so we can raise an exception if we
# don't receive any audio data.
audio_was_received = False
# Send the request to the service.
await send_command_request()
# Each message needs to have the proper date.
date = date_to_string()
# Send the SSML request to the service.
await send_ssml_request()
# Prepare the request to be sent to the service.
#
# Note sentenceBoundaryEnabled and wordBoundaryEnabled are actually supposed
# to be booleans, but Edge Browser seems to send them as strings.
#
# This is a bug in Edge as Azure Cognitive Services actually sends them as
# bool and not string. For now I will send them as bool unless it causes
# any problems.
#
# Also pay close attention to double { } in request (escape for f-string).
await websocket.send_str(
f"X-Timestamp:{date}\r\n"
"Content-Type:application/json; charset=utf-8\r\n"
"Path:speech.config\r\n\r\n"
'{"context":{"synthesis":{"audio":{"metadataoptions":{'
'"sentenceBoundaryEnabled":false,"wordBoundaryEnabled":true},'
'"outputFormat":"audio-24khz-48kbitrate-mono-mp3"'
"}}}}\r\n"
)
async for received in websocket:
if received.type == aiohttp.WSMsgType.TEXT:
parameters, data = get_headers_and_data(received.data)
path = parameters.get(b"Path")
if path == b"audio.metadata":
# Parse the metadata and yield it.
parsed_metadata = parse_metadata()
yield parsed_metadata
await websocket.send_str(
ssml_headers_plus_data(
connect_id(),
date,
mkssml(text, self.voice, self.rate, self.volume, self.pitch),
)
)
# Update the last duration offset for use by the next SSML request.
last_duration_offset = (
parsed_metadata["offset"] + parsed_metadata["duration"]
)
elif path == b"turn.end":
# Update the offset compensation for the next SSML request.
offset_compensation = last_duration_offset
async for received in websocket:
if received.type == aiohttp.WSMsgType.TEXT:
parameters, data = get_headers_and_data(received.data)
path = parameters.get(b"Path")
if path == b"turn.start":
download_audio = True
elif path == b"turn.end":
download_audio = False
break # End of audio data
elif path == b"audio.metadata":
for meta_obj in json.loads(data)["Metadata"]:
meta_type = meta_obj["Type"]
if idx != prev_idx:
shift_time = sum(
final_utterance[i] for i in range(idx)
)
prev_idx = idx
if meta_type == "WordBoundary":
final_utterance[idx] = (
meta_obj["Data"]["Offset"]
+ meta_obj["Data"]["Duration"]
# Average padding added by the service
# Alternatively we could use ffmpeg to get value properly
# but I don't want to add an additional dependency
# if this is found to work well enough.
+ 8_750_000
)
yield {
"type": meta_type,
"offset": meta_obj["Data"]["Offset"]
+ shift_time,
"duration": meta_obj["Data"]["Duration"],
"text": meta_obj["Data"]["text"]["Text"],
}
elif meta_type == "SessionEnd":
continue
else:
raise UnknownResponse(
f"Unknown metadata type: {meta_type}"
)
elif path == b"response":
pass
else:
raise UnknownResponse(
"The response from the service is not recognized.\n"
+ received.data
)
elif received.type == aiohttp.WSMsgType.BINARY:
if not download_audio:
raise UnexpectedResponse(
"We received a binary message, but we are not expecting one."
)
# Use average padding typically added by the service
# to the end of the audio data. This seems to work pretty
# well for now, but we might ultimately need to use a
# more sophisticated method like using ffmpeg to get
# the actual duration of the audio data.
offset_compensation += 8_750_000
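# For scale: assuming offsets are reported in 100-nanosecond ticks
# (as Azure Speech does), 8_750_000 ticks is roughly 0.875 seconds.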
if len(received.data) < 2:
raise UnexpectedResponse(
"We received a binary message, but it is missing the header length."
)
# See: https://github.com/microsoft/cognitive-services-speech-sdk-js/blob/d071d11/src/common.speech/WebsocketMessageFormatter.ts#L46
header_length = int.from_bytes(received.data[:2], "big")
if len(received.data) < header_length + 2:
raise UnexpectedResponse(
"We received a binary message, but it is missing the audio data."
)
yield {
"type": "audio",
"data": received.data[header_length + 2 :],
}
audio_was_received = True
elif received.type == aiohttp.WSMsgType.ERROR:
raise WebSocketError(
received.data if received.data else "Unknown error"
)
# Send the next SSML request to the service.
if not await send_ssml_request():
break
elif path in (b"response", b"turn.start"):
pass
else:
raise UnknownResponse(
"The response from the service is not recognized.\n"
+ received.data
)
elif received.type == aiohttp.WSMsgType.BINARY:
if len(received.data) < 2:
raise UnexpectedResponse(
"We received a binary message, but it is missing the header length."
)
if not audio_was_received:
raise NoAudioReceived(
"No audio was received. Please verify that your parameters are correct."
header_length = int.from_bytes(received.data[:2], "big")
if len(received.data) < header_length + 2:
raise UnexpectedResponse(
"We received a binary message, but it is missing the audio data."
)
audio_was_received = True
yield {
"type": "audio",
"data": received.data[header_length + 2 :],
}
elif received.type == aiohttp.WSMsgType.ERROR:
raise WebSocketError(
received.data if received.data else "Unknown error"
)
if not audio_was_received:
raise NoAudioReceived(
"No audio was received. Please verify that your parameters are correct."
)
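Tying it together, a minimal consumer of this generator (a sketch assuming the edge_tts package layout; the file name is arbitrary):

    import asyncio

    import edge_tts

    async def main() -> None:
        communicate = edge_tts.Communicate("Hello, world!", voice="en-US-AriaNeural")
        with open("hello.mp3", "wb") as f:
            async for message in communicate.stream():
                if message["type"] == "audio":
                    # stream() has already stripped the binary frame header
                    # (2-byte big-endian header length + headers) from the data.
                    f.write(message["data"])
                elif message["type"] == "WordBoundary":
                    print(message["offset"], message["duration"], message["text"])

    asyncio.run(main())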
async def save(
self,
audio_fname: Union[str, bytes],