fix sentence boundary

This commit is contained in:
rany
2022-03-01 21:42:01 +02:00
parent 43ee535752
commit 16d5dc273c
5 changed files with 112 additions and 52 deletions

2
lint.sh Executable file
View File

@@ -0,0 +1,2 @@
find src -name '*.py' | xargs black
find src -name '*.py' | xargs isort

View File

@@ -18,16 +18,14 @@ def main():
with tempfile.NamedTemporaryFile() as media:
with tempfile.NamedTemporaryFile() as subtitle:
print()
print(f"Media file {media.name}")
print(f"Subtitle file {subtitle.name}\n")
print(f"Media file: {media.name}")
print(f"Subtitle file: {subtitle.name}\n")
with subprocess.Popen(
[
"edge-tts",
"-w",
"--write-media",
media.name,
"--write-subtitles",
subtitle.name,
"--boundary-type=2",
f"--write-media={media.name}",
f"--write-subtitles={subtitle.name}",
]
+ sys.argv[1:]
) as process:

View File

@@ -207,8 +207,7 @@ class Communicate: # pylint: disable=too-few-public-methods
async def run(
self,
messages,
sentence_boundary=False,
word_boundary=False,
boundary_type=0,
codec="audio-24khz-48kbitrate-mono-mp3",
voice="Microsoft Server Speech Text to Speech Voice (en-US, AriaNeural)",
pitch="+0Hz",
@@ -221,8 +220,7 @@ class Communicate: # pylint: disable=too-few-public-methods
Args:
messages (str or list): A list of SSML strings or a single text.
sentence_boundary (bool): Whether to use sentence boundary.
word_boundary (bool): Whether to use word boundary.
boundary_type (int): The type of boundary to use. 0 for none, 1 for word_boundary, 2 for sentence_boundary.
codec (str): The codec to use.
voice (str): The voice to use (only applicable to non-customspeak).
pitch (str): The pitch to use (only applicable to non-customspeak).
@@ -234,11 +232,19 @@ class Communicate: # pylint: disable=too-few-public-methods
tuple: The subtitle offset, subtitle, and audio data.
"""
word_boundary = False
sentence_boundary = False
if boundary_type > 0:
word_boundary = True
if boundary_type > 1:
sentence_boundary = True
sentence_boundary = str(sentence_boundary).lower()
word_boundary = str(word_boundary).lower()
if not customspeak:
websocket_max_size = 2 ** 16
websocket_max_size = 2**16
overhead_per_message = (
len(
ssml_headers_plus_data(
@@ -312,6 +318,9 @@ class Communicate: # pylint: disable=too-few-public-methods
# Begin listening for the response.
download = False
current_subtitle = ""
first_offset = None
last_offset = None
async for received in websocket:
if received.type in (
aiohttp.WSMsgType.CLOSED,
@@ -337,13 +346,53 @@ class Communicate: # pylint: disable=too-few-public-methods
and parameters["Path"] == "audio.metadata"
):
metadata = json.loads(data)
text = metadata["Metadata"][0]["Data"]["text"]["Text"]
offset = metadata["Metadata"][0]["Data"]["Offset"]
yield (
offset,
text,
None,
)
metadata_type = metadata["Metadata"][0]["Type"]
metadata_offset = metadata["Metadata"][0]["Data"][
"Offset"
]
try:
metadata_duration = metadata["Metadata"][0]["Data"][
"Duration"
]
except KeyError:
metadata_duration = 0
metadata_text = metadata["Metadata"][0]["Data"]["text"][
"Text"
]
if boundary_type == 1:
yield (
[
metadata_offset,
metadata_duration,
],
metadata_text,
None,
)
else:
if metadata_type == "WordBoundary":
if current_subtitle:
current_subtitle += " "
current_subtitle += metadata_text
if first_offset is None:
first_offset = metadata_offset
last_offset = [
metadata_offset,
metadata_duration,
]
elif metadata_type == "SentenceBoundary":
if current_subtitle:
yield (
[
first_offset,
sum(last_offset) - first_offset,
],
current_subtitle,
None,
)
current_subtitle = ""
first_offset = None
last_offset = None
elif received.type == aiohttp.WSMsgType.BINARY:
if download:
yield (
@@ -353,4 +402,10 @@ class Communicate: # pylint: disable=too-few-public-methods
received.data.split(b"Path:audio\r\n")[1:]
),
)
if current_subtitle:
yield (
[first_offset, sum(last_offset) - first_offset],
current_subtitle,
None,
)
await websocket.close()

View File

@@ -28,9 +28,9 @@ def mktimestamp(time_unit):
Returns:
str: The timecode of the subtitle.
"""
hour = math.floor(time_unit / 10 ** 7 / 3600)
minute = math.floor((time_unit / 10 ** 7 / 60) % 60)
seconds = (time_unit / 10 ** 7) % 60
hour = math.floor(time_unit / 10**7 / 3600)
minute = math.floor((time_unit / 10**7 / 60) % 60)
seconds = (time_unit / 10**7) % 60
return f"{hour:02d}:{minute:02d}:{seconds:06.3f}"
@@ -49,7 +49,7 @@ class SubMaker:
"""
self.subs_and_offset = []
self.broken_offset = []
self.overlapping = overlapping * (10 ** 7)
self.overlapping = overlapping * (10**7)
def create_sub(self, timestamp, text):
"""
@@ -57,16 +57,19 @@ class SubMaker:
and adds it to the list of subtitles
Args:
timestamp (int): The timestamp of the subtitle.
timestamp (list): The [offset, duration] pair of the subtitle; it must be mutable because it is updated in place.
text (str): The text of the subtitle.
Returns:
None
"""
timestamp[1] += timestamp[0]
if len(self.subs_and_offset) >= 2:
if self.subs_and_offset[-2] >= timestamp + sum(self.broken_offset):
self.broken_offset.append(self.subs_and_offset[-2])
timestamp = timestamp + sum(self.broken_offset)
if self.subs_and_offset[-2][-1] >= timestamp[1] + sum(self.broken_offset):
self.broken_offset.append(self.subs_and_offset[-2][1])
timestamp[0] += sum(self.broken_offset)
timestamp[1] += sum(self.broken_offset)
self.subs_and_offset.append(timestamp)
self.subs_and_offset.append(text)
@@ -80,19 +83,27 @@ class SubMaker:
"""
if len(self.subs_and_offset) >= 2:
data = "WEBVTT\r\n\r\n"
old_time_stamp = None
old_sub_data = None
for offset, subs in zip(
self.subs_and_offset[::2], self.subs_and_offset[1::2]
):
if old_time_stamp is not None and old_sub_data is not None:
data += formatter(
old_time_stamp, offset + self.overlapping, old_sub_data
)
old_time_stamp = offset
old_sub_data = subs
data += formatter(
old_time_stamp, old_time_stamp + ((10 ** 7) * 10), old_sub_data
)
subs = [subs[i : i + 79] for i in range(0, len(subs), 79)]
for i in range(len(subs) - 1):
sub = subs[i]
split_at_word = True
if sub[-1] == " ":
subs[i] = sub[:-1]
split_at_word = False
if sub[0] == " ":
subs[i] = sub[1:]
split_at_word = False
if split_at_word:
subs[i] += "-"
subs = "\r\n".join(subs)
data += formatter(offset[0], offset[1] + self.overlapping, subs)
return data
return ""

View File

@@ -32,8 +32,7 @@ async def _tts(args):
media_file = open(args.write_media, "wb") # pylint: disable=consider-using-with
async for i in tts.run(
args.text,
args.enable_sentence_boundary,
args.enable_word_boundary,
args.boundary_type,
args.codec,
args.voice,
args.pitch,
@@ -108,18 +107,6 @@ async def _main():
help="set TTS volume. Default +0%%. For more info check https://bit.ly/3eAE5Nx",
default="+0%",
)
parser.add_argument(
"-s",
"--enable-sentence-boundary",
help="enable sentence boundary",
action="store_true",
)
parser.add_argument(
"-w",
"--enable-word-boundary",
help="enable word boundary",
action="store_true",
)
parser.add_argument(
"-O",
"--overlapping",
@@ -127,6 +114,13 @@ async def _main():
default=5,
type=float,
)
parser.add_argument(
"-b",
"--boundary-type",
help="set boundary type for subtitles. Default 0 for none. Set 1 for word_boundary, 2 for sentence_boundary",
default=0,
type=int,
)
parser.add_argument(
"--write-media", help="instead of stdout, send media output to provided file"
)