diff --git a/bin/edge-playback b/bin/edge-playback
index 9ce36be..ad67ab9 100755
--- a/bin/edge-playback
+++ b/bin/edge-playback
@@ -1,10 +1,15 @@
#!/usr/bin/env bash
-unset stdin
-if [ "$1" == "stdin" ]
+media=$(mktemp)
+subs=$(mktemp)
+echo ""
+echo "Media file: $media"
+echo "Subtitle file: $subs"
+echo ""
+if [ "$1" == "NO_DELETE" ]
then
- stdin=$(cat)
shift 1
- edge-tts -f <(printf '%s' "$stdin") "$@" | mpv --keep-open=yes -
else
- edge-tts "$@" | mpv --keep-open=yes -
+ trap 'rm -f "${media:?}" "${subs:?}"' EXIT
fi
+edge-tts -w "$@" >"$media" 2>"$subs"
+mpv --keep-open=yes --sub-file="$subs" "$media"
diff --git a/examples/input_example.py b/examples/input_example.py
old mode 100644
new mode 100755
index 84681c9..6ec43b3
--- a/examples/input_example.py
+++ b/examples/input_example.py
@@ -2,17 +2,16 @@
# Example Python script that shows how to use edge-tts as a module
import asyncio
import tempfile
-import edgeTTS as e
+import edgeTTS
from playsound import playsound
async def main():
+ communicate = edgeTTS.Communicate()
ask = input("What do you want TTS to say? ")
- overhead = len(e.mkssmlmsg('').encode('utf-8'))
- ask = e._minimize(e.escape(e.removeIncompatibleControlChars(ask)), b" ", 2**16 - overhead)
with tempfile.NamedTemporaryFile() as fp:
- for part in ask:
- async for i in e.run_tts(e.mkssmlmsg(part.decode('utf-8'))):
- fp.write(i)
+ async for i in communicate.run(ask):
+ if i[2] is not None:
+ fp.write(i[2])
playsound(fp.name)
if __name__ == "__main__":
diff --git a/setup.cfg b/setup.cfg
index b9d11b6..7f2a8ed 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -1,6 +1,6 @@
[metadata]
name = edge-tts
-version = 1.1.5
+version = 2.0.0
author = rany
author_email = ranygh@riseup.net
description = Microsoft Edge's TTS
diff --git a/src/edgeTTS/__init__.py b/src/edgeTTS/__init__.py
index 821b624..6a579cc 100755
--- a/src/edgeTTS/__init__.py
+++ b/src/edgeTTS/__init__.py
@@ -8,7 +8,8 @@ import ssl
import websockets
import logging
import httpx
-from email.utils import formatdate
+import time
+import math
from xml.sax.saxutils import escape
# Default variables
@@ -17,6 +18,14 @@ trustedClientToken = '6A5AA1D4EAFF4E9FB37E23D68491D6F4'
wssUrl = 'wss://speech.platform.bing.com/consumer/speech/synthesize/readaloud/edge/v1?TrustedClientToken=' + trustedClientToken
voiceList = 'https://speech.platform.bing.com/consumer/speech/synthesize/readaloud/voices/list?trustedclienttoken=' + trustedClientToken
+# Return date format in Microsoft Edge's broken way (Edge does it wrong because they
+# append Z to a date with locale time zone). They probably just use Date().toString()
+def formatdate():
+ return time.strftime('%a %b %d %Y %H:%M:%S GMT+0000 (Coordinated Universal Time)', time.gmtime())
+
+def bool_to_lower_str(x):
+ return 'true' if x else 'false'
+
def connectId():
return str(uuid.uuid4()).replace("-", "")
@@ -35,6 +44,13 @@ def removeIncompatibleControlChars(s):
logger.debug("Generated %s" % output.encode('utf-8'))
return output
+def mktimestamp(ns):
+ hour = math.floor(ns / 10000 / 1000 / 3600)
+ minute = math.floor((ns / 10000 / 1000 / 60) % 60)
+ seconds = math.floor((ns / 10000 / 1000) % 60)
+ mili = float(str(math.modf((ns / 10000) - (1000 * seconds))[1])[:3])
+ return "%.02d:%.02d:%.02d.%.03d" % (hour, minute, seconds, mili)
+
def list_voices():
logger = logging.getLogger("edgeTTS.list_voices")
with httpx.Client(http2=True, headers={
@@ -54,53 +70,102 @@ def list_voices():
logger.debug("JSON Loaded")
return data
-def mkssmlmsg(text="", voice="Microsoft Server Speech Text to Speech Voice (en-US, AriaNeural)", pitchString="+0Hz", rateString="+0%", volumeString="+0%", customspeak=False):
- message='X-RequestId:'+connectId()+'\r\nContent-Type:application/ssml+xml\r\n'
- message+='X-Timestamp:'+formatdate()+'Z\r\nPath:ssml\r\n\r\n'
- if customspeak:
- message+=text
- else:
- message+=""
- message+="" + "" + text + ''
- return message
+class SubMaker:
+ def __init__(self):
+ self.subsAndOffset = {}
-def bool_to_lower_str(x): return 'true' if x else 'false'
-async def run_tts(msg, sentenceBoundary=False, wordBoundary=False, codec="audio-24khz-48kbitrate-mono-mp3"):
- sentenceBoundary = bool_to_lower_str(sentenceBoundary)
- wordBoundary = bool_to_lower_str(wordBoundary)
- # yes, the connectid() in websockets.connect is different
- async with websockets.connect(
- wssUrl + "&ConnectionId=" + connectId(),
- ssl=ssl_context,
- compression="deflate",
- extra_headers={
- "Pragma": "no-cache",
- "Origin": "chrome-extension://jdiccldimpdaibmpdkjnbmckianbfold",
- "Accept-Encoding": "gzip, deflate, br",
- "Accept-Language": "en-US,en;q=0.9",
- "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.77 Safari/537.36 Edg/91.0.864.41",
- "Cache-Control": "no-cache"
- }
- ) as ws:
- message='X-Timestamp:'+formatdate()+'\r\nContent-Type:application/json; charset=utf-8\r\nPath:speech.config\r\n\r\n'
- message+='{"context":{"synthesis":{"audio":{"metadataoptions":{"sentenceBoundaryEnabled":"'+sentenceBoundary+'","wordBoundaryEnabled":"'+wordBoundary+'"},"outputFormat":"' + codec + '"}}}}\r\n'
- await ws.send(message)
- await ws.send(msg)
- download = False
- async for recv in ws:
- if type(recv) is str:
- if 'turn.start' in recv:
- download = True
- elif 'turn.end' in recv:
- download = False
- await ws.close()
- # TODO: add some sort of captioning based on audio:metadata. It's just JSON with offset.
- # WordBoundary is the only thing supported. SentenceBoundary does nothing.
- #elif 'audio.metadata' in recv:
- # print("".join(recv.split('Path:audio.metadata\r\n\r\n')[1:]), file=sys.stderr)
- elif type(recv) is bytes:
- if download:
- yield b"".join(recv.split(b'Path:audio\r\n')[1:])
+ def formatter(self, offset1, offset2, subdata):
+ data = "%s --> %s\r\n" % (mktimestamp(offset1), mktimestamp(offset2))
+ data += "%s\r\n\r\n" % escape(subdata)
+ return data
+
+ def createSub(self, timestamp, text):
+ self.subsAndOffset.update({ timestamp: text })
+
+ def generateSubs(self):
+ oldTimeStamp = None
+ oldSubData = None
+ data = "WEBVTT\r\n"
+ first = sorted(self.subsAndOffset.keys(), key=int)[0]
+ data += self.formatter(0, first, self.subsAndOffset[first])
+ try:
+ for sub in sorted(self.subsAndOffset.keys(), key=int)[1:]:
+ if (oldTimeStamp and oldSubData) is not None:
+ data += self.formatter(oldTimeStamp, sub, oldSubData)
+ oldTimeStamp = sub
+ oldSubData = self.subsAndOffset[sub]
+ data += self.formatter(oldTimeStamp, oldTimeStamp + ((10**7) * 10), oldSubData)
+ except:
+ pass
+ return data
+
+class Communicate:
+ def __init__(self):
+ self.date = formatdate()
+
+ def mkssmlmsg(self, text="", voice="", pitch="", rate="", volume="", customspeak=False):
+ message='X-RequestId:'+connectId()+'\r\nContent-Type:application/ssml+xml\r\n'
+ message+='X-Timestamp:'+self.date+'Z\r\nPath:ssml\r\n\r\n'
+ if customspeak:
+ message+=text
+ else:
+ message+=""
+ message+="" + "" + text + ''
+ return message
+
+ async def run(self, msg, sentenceBoundary=False, wordBoundary=False, codec="audio-24khz-48kbitrate-mono-mp3", voice="Microsoft Server Speech Text to Speech Voice (en-US, AriaNeural)", pitch="+0Hz", rate="+0%", volume="+0%", customspeak=False):
+ sentenceBoundary = bool_to_lower_str(sentenceBoundary)
+ wordBoundary = bool_to_lower_str(wordBoundary)
+
+ if not customspeak:
+ wsmax = 2 ** 16
+ overhead = len(self.mkssmlmsg("", voice, pitch, rate, volume, customspeak=False))
+ msgs = _minimize(escape(removeIncompatibleControlChars(msg)), b" ", wsmax - overhead)
+
+ async with websockets.connect(
+ wssUrl + "&ConnectionId=" + connectId(),
+ ssl=ssl_context,
+ compression="deflate",
+ extra_headers={
+ "Pragma": "no-cache",
+ "Origin": "chrome-extension://jdiccldimpdaibmpdkjnbmckianbfold",
+ "Accept-Encoding": "gzip, deflate, br",
+ "Accept-Language": "en-US,en;q=0.9",
+ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.77 Safari/537.36 Edg/91.0.864.41",
+ "Cache-Control": "no-cache"
+ }
+ ) as ws:
+ for msg in msgs:
+ self.date = formatdate() # Each message needs to have its send date
+
+ if not customspeak:
+ msg = self.mkssmlmsg(msg.decode('utf-8'), voice, pitch, rate, volume, customspeak=False)
+ else:
+ msg = self.mkssmlmsg(msg, customspeak=True)
+
+ message='X-Timestamp:'+self.date+'\r\nContent-Type:application/json; charset=utf-8\r\nPath:speech.config\r\n\r\n'
+ message+='{"context":{"synthesis":{"audio":{"metadataoptions":{"sentenceBoundaryEnabled":"'+sentenceBoundary+'","wordBoundaryEnabled":"'+wordBoundary+'"},"outputFormat":"' + codec + '"}}}}\r\n'
+ await ws.send(message)
+ await ws.send(msg)
+ download = False
+ async for recv in ws:
+ if type(recv) is str:
+ if 'turn.start' in recv:
+ download = True
+ elif 'turn.end' in recv:
+ download = False
+ break
+ elif 'audio.metadata' in recv:
+ #print("".join(recv.split('Path:audio.metadata\r\n\r\n')[1:]), file=sys.stderr)
+ metadata = json.loads("".join(recv.split('Path:audio.metadata\r\n\r\n')[1:]))
+ text = metadata['Metadata'][0]['Data']['text']['Text']
+ offset = metadata['Metadata'][0]['Data']['Offset']
+ yield [ offset, text, None ]
+ elif type(recv) is bytes:
+ if download:
+ yield [ None, None, b"".join(recv.split(b'Path:audio\r\n')[1:]) ]
+
+ await ws.close()
# Based on https://github.com/pndurette/gTTS/blob/6d9309f05b3ad26ca356654732f3b5b9c3bec538/gtts/utils.py#L13-L54
# Modified to measure based on bytes rather than number of characters
@@ -161,15 +226,15 @@ async def _main():
logger.debug("reading from %s" % args.file)
with open(args.file, 'r') as file:
args.text = file.read()
- if args.custom_ssml:
- async for i in run_tts(mkssmlmsg(text=args.text, customspeak=True), args.enable_sentence_boundary, args.enable_word_boundary, args.codec):
- sys.stdout.buffer.write(i)
- else:
- overhead = len(mkssmlmsg('', args.voice, args.pitch, args.rate, args.volume).encode('utf-8'))
- wsmax = 65536 - overhead
- for text in _minimize(escape(removeIncompatibleControlChars(args.text)), b" ", wsmax):
- async for i in run_tts(mkssmlmsg(text.decode('utf-8'), args.voice, args.pitch, args.rate, args.volume), args.enable_sentence_boundary, args.enable_word_boundary, args.codec):
- sys.stdout.buffer.write(i)
+ tts = Communicate()
+ subs = SubMaker()
+ async for i in tts.run(args.text, args.enable_sentence_boundary, args.enable_word_boundary, args.codec, args.voice, args.pitch, args.rate, args.volume, customspeak=args.custom_ssml):
+ if i[2] is not None:
+ sys.stdout.buffer.write(i[2])
+ elif (i[0] and i[1]) is not None:
+ subs.createSub(i[0], i[1])
+ if not subs.subsAndOffset == {}:
+ sys.stderr.write(subs.generateSubs())
elif args.list_voices:
seperator = False
for voice in list_voices():