diff --git a/.gitignore b/.gitignore
index 7974ba3..70ae6de 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,15 +1,8 @@
-**
-!examples/
-!examples/input_example.py
-!LICENSE
-!README.md
-!setup.cfg
-!setup.py
-!pyproject.toml
-!src/
-!src/edgeTTS/
-!src/edgeTTS/__init__.py
-!src/edgeTTS/__main__.py
-!src/edgePlayback/
-!src/edgePlayback/__init__.py
-!src/edgePlayback/__main__.py
+venv/
+venv/**
+
+src/*.egg-info
+src/*.egg-info/**
+
+build/
+build/**
diff --git a/build_and_publish.sh b/build_and_publish.sh
new file mode 100755
index 0000000..69c8646
--- /dev/null
+++ b/build_and_publish.sh
@@ -0,0 +1,6 @@
+#!/bin/sh
+set -e
+rm -rf build dist src/*.egg-info
+python3 setup.py sdist bdist_wheel
+twine upload dist/*
+rm -rf build dist src/*.egg-info
diff --git a/examples/input_example.py b/examples/input_example.py
index f94d867..dac53f6 100755
--- a/examples/input_example.py
+++ b/examples/input_example.py
@@ -2,9 +2,12 @@
# Example Python script that shows how to use edge-tts as a module
import asyncio
import tempfile
-import edgeTTS
+
from playsound import playsound
+import edgeTTS
+
+
async def main():
communicate = edgeTTS.Communicate()
ask = input("What do you want TTS to say? ")
@@ -14,5 +17,6 @@ async def main():
fp.write(i[2])
playsound(fp.name)
+
if __name__ == "__main__":
asyncio.run(main())
diff --git a/setup.cfg b/setup.cfg
index db7f26f..c7e74b8 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -1,6 +1,6 @@
[metadata]
name = edge-tts
-version = 2.2.1
+version = 3.0.1
author = rany
author_email = ranygh@riseup.net
description = Microsoft Edge's TTS
@@ -27,5 +27,5 @@ where=src
[options.entry_points]
console_scripts =
- edge-tts = edgeTTS.__init__:main
+ edge-tts = edgeTTS.__main__:main
edge-playback = edgePlayback.__init__:main
diff --git a/setup.py b/setup.py
index a4f49f9..b908cbe 100644
--- a/setup.py
+++ b/setup.py
@@ -1,2 +1,3 @@
import setuptools
+
setuptools.setup()
diff --git a/src/edgePlayback/__init__.py b/src/edgePlayback/__init__.py
index 347b48f..f31731f 100755
--- a/src/edgePlayback/__init__.py
+++ b/src/edgePlayback/__init__.py
@@ -1,22 +1,42 @@
#!/usr/bin/env python3
+
+import subprocess
import sys
import tempfile
-import subprocess
from shutil import which
+
def main():
- if which('mpv') and which('edge-tts'):
+ if which("mpv") and which("edge-tts"):
with tempfile.NamedTemporaryFile() as media:
with tempfile.NamedTemporaryFile() as subtitle:
- print ()
- print ("Media file %s" % media.name)
- print ("Subtitle file %s\n" % subtitle.name)
- p = subprocess.Popen(['edge-tts', '-w', '--write-media', media.name, '--write-subtitles', subtitle.name] + sys.argv[1:])
+ print()
+ print("Media file %s" % media.name)
+ print("Subtitle file %s\n" % subtitle.name)
+ p = subprocess.Popen(
+ [
+ "edge-tts",
+ "-w",
+ "--write-media",
+ media.name,
+ "--write-subtitles",
+ subtitle.name,
+ ]
+ + sys.argv[1:]
+ )
p.communicate()
- p = subprocess.Popen(['mpv', '--keep-open=yes', '--sub-file=' + subtitle.name, media.name])
+ p = subprocess.Popen(
+ [
+ "mpv",
+ "--keep-open=yes",
+ "--sub-file=" + subtitle.name,
+ media.name,
+ ]
+ )
p.communicate()
else:
- print ("This script requires mpv and edge-tts.")
+ print("This script requires mpv and edge-tts.")
+
if __name__ == "__main__":
main()
diff --git a/src/edgeTTS/__init__.py b/src/edgeTTS/__init__.py
old mode 100755
new mode 100644
index 193e3bd..927b13c
--- a/src/edgeTTS/__init__.py
+++ b/src/edgeTTS/__init__.py
@@ -1,304 +1,3 @@
-#!/usr/bin/env python3
-import sys
-import json
-import uuid
-import argparse
-import asyncio
-import ssl
-import logging
-import time
-import math
-import aiohttp
-from xml.sax.saxutils import escape
-
-# Default variables
-trustedClientToken = '6A5AA1D4EAFF4E9FB37E23D68491D6F4'
-wssUrl = 'wss://speech.platform.bing.com/consumer/speech/synthesize/readaloud/edge/v1?TrustedClientToken=' + trustedClientToken
-voiceList = 'https://speech.platform.bing.com/consumer/speech/synthesize/readaloud/voices/list?trustedclienttoken=' + trustedClientToken
-
-# Return date format in Microsoft Edge's broken way (Edge does it wrong because they
-# append Z to a date with locale time zone). They probably just use Date().toString()
-def formatdate():
- return time.strftime('%a %b %d %Y %H:%M:%S GMT+0000 (Coordinated Universal Time)', time.gmtime())
-
-# The connectID Edge sends to the service (just UUID without dashes)
-def connectId():
- return str(uuid.uuid4()).replace("-", "")
-
-# The service doesn't support a couple character ranges. Most bothering being
-# \v because it is present in OCR-ed PDFs. Not doing this causes the whole
-# connection with websockets server to crash.
-def removeIncompatibleControlChars(s):
- logger = logging.getLogger("edgeTTS.removeIncompatibleControlChars")
- output = ""
- for char in s:
- char_code = ord(char)
- if (char_code >= 0 and char_code <= 8) or (char_code >= 11 and char_code <= 12) \
- or (char_code >= 14 and char_code <= 31):
- logger.debug("Forbidden character %s" % char.encode('utf-8'))
- output += ' '
- else:
- logger.debug("Allowed character %s" % char.encode('utf-8'))
- output += char
- logger.debug("Generated %s" % output.encode('utf-8'))
- return output
-
-# Make WEBVTT formated timestamp based on TTS service's Offset value
-def mktimestamp(ns):
- hour = math.floor(ns / 10000 / 1000 / 3600)
- minute = math.floor((ns / 10000 / 1000 / 60) % 60)
- seconds = (ns / 10000 / 1000) % 60
- return "%.02d:%.02d:%06.3f" % (hour, minute, seconds)
-
-# Return loaded JSON data of list of Edge's voices
-# NOTE: It's not the total list of available voices.
-# This is only what is presented in the UI.
-async def list_voices():
- logger = logging.getLogger("edgeTTS.list_voices")
- async with aiohttp.ClientSession(trust_env=True) as session:
- async with session.get(voiceList, headers={
- 'Authority': 'speech.platform.bing.com',
- 'Sec-CH-UA': "\" Not;A Brand\";v=\"99\", \"Microsoft Edge\";v=\"91\", \"Chromium\";v=\"91\"",
- 'Sec-CH-UA-Mobile': '?0',
- 'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.77 Safari/537.36 Edg/91.0.864.41",
- 'Accept': '*/*',
- 'Sec-Fetch-Site': 'none',
- 'Sec-Fetch-Mode': 'cors',
- 'Sec-Fetch-Dest': 'empty',
- 'Accept-Encoding': 'gzip, deflate, br',
- 'Accept-Language': 'en-US,en;q=0.9'
- }) as url:
- logger.debug("Loading json from %s" % voiceList)
- data = json.loads(await url.text())
- logger.debug("JSON Loaded")
- return data
-
-class SubMaker:
- def __init__(self, overlapping=5):
- self.subsAndOffset = []
- self.brokenOffset = []
- self.overlapping = (overlapping * (10**7))
-
- def formatter(self, offset1, offset2, subdata):
- data = "%s --> %s\r\n" % (mktimestamp(offset1), mktimestamp(offset2))
- data += "%s\r\n\r\n" % escape(subdata)
- return data
-
- def createSub(self, timestamp, text):
- if len(self.subsAndOffset) >= 2:
- if self.subsAndOffset[-2] >= timestamp + sum(self.brokenOffset):
- self.brokenOffset.append(self.subsAndOffset[-2])
- timestamp = timestamp + sum(self.brokenOffset)
-
- self.subsAndOffset.append(timestamp)
- self.subsAndOffset.append(text)
-
- def generateSubs(self):
- if len(self.subsAndOffset) >= 2:
- data = "WEBVTT\r\n\r\n"
- oldTimeStamp = None
- oldSubData = None
- for offset, subs in zip(self.subsAndOffset[::2], self.subsAndOffset[1::2]):
- if oldTimeStamp is not None and oldSubData is not None:
- data += self.formatter(oldTimeStamp, offset + self.overlapping, oldSubData)
- oldTimeStamp = offset
- oldSubData = subs
- data += self.formatter(oldTimeStamp, oldTimeStamp + ((10**7) * 10), oldSubData)
- return data
- return ""
-
-class Communicate:
- def __init__(self):
- self.date = formatdate()
-
- def mkssmlmsg(
- self,
- text="",
- voice="",
- pitch="",
- rate="",
- volume="",
- customspeak=False
- ):
- message='X-RequestId:'+connectId()+'\r\nContent-Type:application/ssml+xml\r\n'
- message+='X-Timestamp:'+self.date+'Z\r\nPath:ssml\r\n\r\n'
- if customspeak:
- message+=text
- else:
- message+=""
- message+="" + "" + text + ''
- return message
-
- async def run(
- self,
- msgs,
- sentenceBoundary=False,
- wordBoundary=False,
- codec="audio-24khz-48kbitrate-mono-mp3",
- voice="Microsoft Server Speech Text to Speech Voice (en-US, AriaNeural)",
- pitch="+0Hz",
- rate="+0%",
- volume="+0%",
- customspeak=False
- ):
- sentenceBoundary = str(sentenceBoundary).lower()
- wordBoundary = str(wordBoundary).lower()
-
- if not customspeak:
- wsmax = 2 ** 16
- overhead = len(self.mkssmlmsg("", voice, pitch, rate, volume, customspeak=False).encode('utf-8'))
- msgs = _minimize(escape(removeIncompatibleControlChars(msgs)), b" ", wsmax - overhead)
- else:
- if type(msgs) is str:
- msgs = [msgs]
-
- async with aiohttp.ClientSession(trust_env=True) as session:
- async with session.ws_connect(
- wssUrl + "&ConnectionId=" + connectId(),
- compress = 15,
- autoclose = True,
- autoping = True,
- headers={
- "Pragma": "no-cache",
- "Origin": "chrome-extension://jdiccldimpdaibmpdkjnbmckianbfold",
- "Accept-Encoding": "gzip, deflate, br",
- "Accept-Language": "en-US,en;q=0.9",
- "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.77 Safari/537.36 Edg/91.0.864.41",
- "Cache-Control": "no-cache"
- }
- ) as ws:
- for msg in msgs:
- self.date = formatdate() # Each message needs to have its send date
-
- if not customspeak:
- msg = self.mkssmlmsg(msg.decode('utf-8'), voice, pitch, rate, volume, customspeak=False)
- else:
- msg = self.mkssmlmsg(msg, customspeak=True)
-
- message='X-Timestamp:'+self.date+'\r\nContent-Type:application/json; charset=utf-8\r\nPath:speech.config\r\n\r\n'
- message+='{"context":{"synthesis":{"audio":{"metadataoptions":{"sentenceBoundaryEnabled":"'+sentenceBoundary+'","wordBoundaryEnabled":"'+wordBoundary+'"},"outputFormat":"' + codec + '"}}}}\r\n'
- await ws.send_str(message)
- await ws.send_str(msg)
- download = False
- async for recv in ws:
- if recv.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR):
- break
-
- if recv.type == aiohttp.WSMsgType.TEXT:
- if 'turn.start' in recv.data:
- download = True
- elif 'turn.end' in recv.data:
- download = False
- break
- elif 'audio.metadata' in recv.data:
- #print("".join(recv.data.split('Path:audio.metadata\r\n\r\n')[1:]), file=sys.stderr)
- metadata = json.loads("".join(recv.data.split('Path:audio.metadata\r\n\r\n')[1:]))
- text = metadata['Metadata'][0]['Data']['text']['Text']
- offset = metadata['Metadata'][0]['Data']['Offset']
- yield [ offset, text, None ]
-
- elif recv.type == aiohttp.WSMsgType.BINARY:
- if download:
- yield [ None, None, b"".join(recv.data.split(b'Path:audio\r\n')[1:]) ]
-
- await ws.close()
-
-# Based on https://github.com/pndurette/gTTS/blob/6d9309f05b3ad26ca356654732f3b5b9c3bec538/gtts/utils.py#L13-L54
-# Modified to measure based on bytes rather than number of characters
-def _minimize(the_string, delim, max_size):
- # Make sure we are measuring based on bytes
- the_string = the_string.encode('utf-8') if type(the_string) is str else the_string
-
- if the_string.startswith(delim):
- the_string = the_string[len(delim):]
-
- if len(the_string) > max_size:
- try:
- # Find the highest index of `delim` in `the_string[0:max_size]`
- # i.e. `the_string` will be cut in half on `delim` index
- idx = the_string.rindex(delim, 0, max_size)
- except ValueError:
- # `delim` not found in `the_string`, index becomes `max_size`
- # i.e. `the_string` will be cut in half arbitrarily on `max_size`
- idx = max_size
- # Call itself again for `the_string[idx:]`
- return [the_string[:idx]] + \
- _minimize(the_string[idx:], delim, max_size)
- else:
- return [the_string]
-
-async def _main():
- parser = argparse.ArgumentParser(description="Microsoft Edge's Online TTS Reader")
- group = parser.add_mutually_exclusive_group(required=True)
- group.add_argument('-t', '--text', help='what TTS will say')
- group.add_argument('-f', '--file', help='same as --text but read from file')
- parser.add_argument(
- "-L",
- "--log-level",
- default=logging.CRITICAL,
- type=lambda x: getattr(logging, x),
- help="configure the logging level (currently only DEBUG supported)"
- )
- parser.add_argument('-z', '--custom-ssml', help='treat text as ssml to send. For more info check https://bit.ly/3fIq13S', action='store_true')
- parser.add_argument('-v', '--voice', help='voice for TTS. Default: Microsoft Server Speech Text to Speech Voice (en-US, AriaNeural)', default='Microsoft Server Speech Text to Speech Voice (en-US, AriaNeural)')
- parser.add_argument('-c', '--codec', help="codec format. Default: audio-24khz-48kbitrate-mono-mp3. Another choice is webm-24khz-16bit-mono-opus. For more info check https://bit.ly/2T33h6S", default='audio-24khz-48kbitrate-mono-mp3')
- group.add_argument('-l', '--list-voices', help="lists available voices. Edge's list is incomplete so check https://bit.ly/2SFq1d3", action='store_true')
- parser.add_argument('-p', '--pitch', help="set TTS pitch. Default +0Hz, For more info check https://bit.ly/3eAE5Nx", default="+0Hz")
- parser.add_argument('-r', '--rate', help="set TTS rate. Default +0%%. For more info check https://bit.ly/3eAE5Nx", default="+0%")
- parser.add_argument('-V', '--volume', help="set TTS volume. Default +0%%. For more info check https://bit.ly/3eAE5Nx", default="+0%")
- parser.add_argument('-s', '--enable-sentence-boundary', help="enable sentence boundary (not implemented but settable)", action='store_true')
- parser.add_argument('-w', '--enable-word-boundary', help="enable word boundary (not implemented but settable)", action='store_true')
- parser.add_argument('-O', '--overlapping', help="overlapping subtitles in seconds", default=5, type=float)
- parser.add_argument('--write-media', help="instead of stdout, send media output to provided file")
- parser.add_argument('--write-subtitles', help="instead of stderr, send subtitle output to provided file")
- args = parser.parse_args()
- logging.basicConfig(level=args.log_level)
- logger = logging.getLogger("edgeTTS._main")
- if args.text is not None or args.file is not None:
- if args.file is not None:
- # we need to use sys.stdin.read() because some devices
- # like Windows and Termux don't have a /dev/stdin.
- if args.file == "/dev/stdin":
- logger.debug("stdin detected, reading natively from stdin")
- args.text = sys.stdin.read()
- else:
- logger.debug("reading from %s" % args.file)
- with open(args.file, 'r') as file:
- args.text = file.read()
- tts = Communicate()
- subs = SubMaker(args.overlapping)
- if args.write_media: media_file = open(args.write_media, 'wb')
- async for i in tts.run(args.text, args.enable_sentence_boundary, args.enable_word_boundary, args.codec, args.voice, args.pitch, args.rate, args.volume, customspeak=args.custom_ssml):
- if i[2] is not None:
- if not args.write_media:
- sys.stdout.buffer.write(i[2])
- else:
- media_file.write(i[2])
- elif i[0] is not None and i[1] is not None:
- subs.createSub(i[0], i[1])
- if args.write_media:
- media_file.close()
- if not args.write_subtitles:
- sys.stderr.write(subs.generateSubs())
- else:
- subtitle_file = open(args.write_subtitles, 'w')
- subtitle_file.write(subs.generateSubs())
- subtitle_file.close()
- elif args.list_voices:
- seperator = False
- for voice in await list_voices():
- if seperator: print()
- for key in voice.keys():
- logger.debug("Processing key %s" % key)
- if key in ["SuggestedCodec", "FriendlyName", "Status"]:
- logger.debug("Key %s skipped" % key)
- continue
- #print ("%s: %s" % ("Name" if key == "ShortName" else key, voice[key]))
- print ("%s: %s" % (key, voice[key]))
- seperator = True
-
-def main():
- asyncio.run(_main())
-
-if __name__ == "__main__":
- main()
+from .communicate import Communicate
+from .list_voices import list_voices
+from .submaker import SubMaker
diff --git a/src/edgeTTS/__main__.py b/src/edgeTTS/__main__.py
old mode 100755
new mode 100644
index 71fe6d1..e5e2737
--- a/src/edgeTTS/__main__.py
+++ b/src/edgeTTS/__main__.py
@@ -1,6 +1,4 @@
-#!/usr/bin/env python3
-
-from edgeTTS.__init__ import *
+from .util import main
if __name__ == "__main__":
main()
diff --git a/src/edgeTTS/communicate.py b/src/edgeTTS/communicate.py
new file mode 100644
index 0000000..8cf58c2
--- /dev/null
+++ b/src/edgeTTS/communicate.py
@@ -0,0 +1,346 @@
+"""
+Communication with the text-to-speech service over a WebSocket.
+"""
+
+
+import json
+import time
+import uuid
+from xml.sax.saxutils import escape
+
+import aiohttp
+
+from .constants import WSS_URL
+
+
+def get_headers_and_data(data):
+    """
+    Split a message from the service into its headers and its body.
+
+    The header section is everything before the first blank line; each
+    header line is divided at its first colon, and one leading space, if
+    present, is dropped from the value.
+
+    Args:
+        data (str or bytes): The message; str is encoded as UTF-8 first.
+
+    Returns:
+        tuple: (dict mapping header names to values as str, body as bytes).
+    """
+    raw = data.encode("utf-8") if isinstance(data, str) else data
+    head, _, body = raw.partition(b"\r\n\r\n")
+    headers = {}
+    for line in head.split(b"\r\n"):
+        name, _, value = line.partition(b":")
+        value = value[1:] if value.startswith(b" ") else value
+        headers[name.decode("utf-8")] = value.decode("utf-8")
+    return headers, body
+
+
+def remove_incompatible_characters(string):
+    """
+    Replace the control characters the service rejects with spaces.
+
+    Code points 0-8, 11-12 and 14-31 are replaced; tab, line feed and
+    carriage return are kept. The vertical tab in particular is common in
+    OCR-ed PDFs and, left in place, results in an error from the service.
+
+    Both str and bytes are accepted, so the function can be applied either
+    before or after the text has been encoded as UTF-8.
+
+    Args:
+        string (str or bytes): The text to be cleaned.
+
+    Returns:
+        str or bytes: The cleaned text, of the same type as the input.
+    """
+    forbidden = [*range(0, 9), *range(11, 13), *range(14, 32)]
+    if isinstance(string, str):
+        return string.translate(dict.fromkeys(forbidden, " "))
+    table = bytearray(range(256))
+    for code in forbidden:
+        table[code] = ord(" ")
+    return bytes(string).translate(bytes(table))
+
+
+def connect_id():
+    """
+    Generate a fresh random identifier.
+
+    The value is a version 4 UUID rendered as 32 lowercase hexadecimal
+    digits, i.e. its usual textual form with the dashes left out.
+
+    Returns:
+        str: A UUID without dashes.
+    """
+    return uuid.uuid4().hex
+
+
+def iter_bytes(my_bytes):
+    """
+    Walk a bytes object one byte at a time.
+
+    Args:
+        my_bytes: Bytes object to iterate over
+
+    Yields:
+        each byte as a slice of length 1 (bytes, not the int that indexing gives)
+    """
+    for end in range(1, len(my_bytes) + 1):
+        yield my_bytes[end - 1 : end]
+
+
+def split_text_by_byte_length(text, byte_length):
+    """
+    Split text into chunks of at most byte_length bytes, breaking right after
+    a space where possible so that words are kept together. A chunk with no
+    space is cut at byte_length, backed up so that a multi-byte UTF-8
+    character is not divided between chunks.
+
+    Args:
+        text (str or bytes): The text to split; str is encoded as UTF-8 first.
+        byte_length (int): Maximum chunk size in bytes; ValueError if < 1.
+
+    Returns:
+        list: The chunks as bytes objects, in order (empty for empty text).
+    """
+    if byte_length < 1:
+        raise ValueError("byte_length must be at least 1")
+    remaining = text.encode("utf-8") if isinstance(text, str) else text
+    split_text = []
+    while len(remaining) > byte_length:
+        # rfind() is -1 when no space fits in the chunk, which makes cut 0.
+        cut = remaining.rfind(b" ", 0, byte_length) + 1
+        if cut == 0:
+            cut = byte_length
+            while cut > 1 and remaining[cut] & 0xC0 == 0x80:  # continuation byte
+                cut -= 1
+        split_text.append(remaining[:cut])
+        remaining = remaining[cut:]
+    if remaining:
+        split_text.append(remaining)
+    return split_text
+
+
+def mkssml(text, voice, pitch, rate, volume):
+    """
+    Creates a SSML string from the given parameters.
+
+    Args:
+        text (str or bytes): The text to be spoken; bytes are decoded as UTF-8.
+            It is inserted as-is, so it must already be XML-escaped.
+        voice (str): The voice to be used.
+        pitch (str): The pitch to be used.
+        rate (str): The rate to be used.
+        volume (str): The volume to be used.
+
+    Returns:
+        str: The SSML string.
+    """
+    if isinstance(text, bytes):
+        text = text.decode("utf-8")
+    return (
+        "<speak version='1.0' xmlns='http://www.w3.org/2001/10/synthesis' xml:lang='en-US'>"
+        f"<voice name='{voice}'><prosody pitch='{pitch}' rate='{rate}' volume='{volume}'>"
+        f"{text}</prosody></voice></speak>"
+    )
+
+
+def date_to_string():
+    """
+    Return the current time in UTC as a Javascript-style date string.
+
+    The time zone is always written as "GMT+0000 (Coordinated Universal Time)".
+    strftime's %Z is not usable for local time: it gives an abbreviation such
+    as EEST where the full name (Eastern European Summer Time) is needed, and
+    obtaining the full name would require a library.
+
+    Returns:
+        str: Javascript-style date string.
+    """
+    # The local-time format string would have been:
+    #     "%a %b %d %Y %H:%M:%S GMT%z (%Z)"
+
+    now_utc = time.gmtime()
+    date_and_time = time.strftime("%a %b %d %Y %H:%M:%S", now_utc)
+    time_zone = "GMT+0000 (Coordinated Universal Time)"
+    return f"{date_and_time} {time_zone}"
+
+
+def ssml_headers_plus_data(request_id, timestamp, ssml):
+    """
+    Build the text message that carries an SSML document to the service.
+
+    Args:
+        request_id (str): The request ID, sent as X-RequestId.
+        timestamp (str): The timestamp, sent as X-Timestamp with "Z" appended.
+        ssml (str): The SSML string, placed after the blank line ending the headers.
+
+    Returns:
+        str: The headers and data to be used in the request.
+    """
+    header_lines = [
+        f"X-RequestId:{request_id}",
+        "Content-Type:application/ssml+xml",
+        # The trailing Z is not a mistake: it reproduces a Microsoft Edge bug.
+        f"X-Timestamp:{timestamp}Z",
+        "Path:ssml",
+    ]
+    return "\r\n".join(header_lines) + f"\r\n\r\n{ssml}"
+
+
+class Communicate:
+ """
+ Streams synthesized audio and subtitle metadata from the service over a WebSocket.
+ """
+
+ def __init__(self):
+ """
+ Stores the current date string; run() replaces it before sending each message.
+ """
+ self.date = date_to_string()
+
+ async def run(
+ self,
+ messages,
+ sentence_boundary=False,
+ word_boundary=False,
+ codec="audio-24khz-48kbitrate-mono-mp3",
+ voice="Microsoft Server Speech Text to Speech Voice (en-US, AriaNeural)",
+ pitch="+0Hz",
+ rate="+0%",
+ volume="+0%",
+ customspeak=False,
+ ):
+ """
+ Sends the messages to the service and yields its responses as they arrive.
+
+ Args:
+ messages (str or list): A single text, or with customspeak one SSML string or a list of them.
+ sentence_boundary (bool): Sent to the service as sentenceBoundaryEnabled.
+ word_boundary (bool): Sent to the service as wordBoundaryEnabled.
+ codec (str): The codec to use, sent to the service as outputFormat.
+ voice (str): The voice to use (only applicable to non-customspeak).
+ pitch (str): The pitch to use (only applicable to non-customspeak).
+ rate (str): The rate to use (only applicable to non-customspeak).
+ volume (str): The volume to use (only applicable to non-customspeak).
+ customspeak (bool): If True, send the messages as ready-made SSML; if False, build the SSML from the text.
+
+ Yields:
+ tuple: (offset, text, None) per audio.metadata message, or (None, None, audio bytes) per audio message after turn.start.
+ """
+
+ sentence_boundary = str(sentence_boundary).lower()
+ word_boundary = str(word_boundary).lower()
+
+ if not customspeak:
+ websocket_max_size = 2 ** 16
+ overhead_per_message = (
+ len(
+ ssml_headers_plus_data(
+ connect_id(), self.date, mkssml("", voice, pitch, rate, volume)
+ )
+ )
+ + 50
+ ) # margin of error
+ messages = split_text_by_byte_length(
+ escape(messages), websocket_max_size - overhead_per_message
+ )
+ else:
+ if isinstance(messages, str):
+ messages = [messages]
+
+ async with aiohttp.ClientSession(trust_env=True) as session:
+ async with session.ws_connect(
+ f"{WSS_URL}&ConnectionId={connect_id()}",
+ compress=15,
+ autoclose=True,
+ autoping=True,
+ headers={
+ "Pragma": "no-cache",
+ "Cache-Control": "no-cache",
+ "Origin": "chrome-extension://jdiccldimpdaibmpdkjnbmckianbfold",
+ "Accept-Encoding": "gzip, deflate, br",
+ "Accept-Language": "en-US,en;q=0.9",
+ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.77 Safari/537.36 Edg/91.0.864.41",
+ },
+ ) as websocket:
+ for message in messages:
+ # Each message needs to have the proper date
+ self.date = date_to_string()
+
+ # Prepare the speech.config request that is sent ahead of every message.
+ #
+ # Note that sentenceBoundaryEnabled and wordBoundaryEnabled are actually supposed
+ # to be booleans, but Edge Browser seems to send them as strings and not booleans.
+ # This is a bug in Edge Browser as Azure Cognitive Services actually sends them as
+ # booleans and not strings. For now they are sent as booleans here, unless that
+ # turns out to cause problems.
+ #
+ # Also note the doubled braces in the request: that is how literal braces are written in an f-string.
+ request = (
+ f"X-Timestamp:{self.date}\r\n"
+ "Content-Type:application/json; charset=utf-8\r\n"
+ "Path:speech.config\r\n\r\n"
+ f'{{"context":{{"synthesis":{{"audio":{{"metadataoptions":{{"sentenceBoundaryEnabled":{sentence_boundary},"wordBoundaryEnabled":{word_boundary}}},"outputFormat":"{codec}"}}}}}}}}\r\n'
+ )
+ # Send the request to the service.
+ await websocket.send_str(request)
+ # Send the message itself, wrapped in SSML first unless it already is SSML.
+ if not customspeak:
+ await websocket.send_str(
+ ssml_headers_plus_data(
+ connect_id(),
+ self.date,
+ mkssml(message, voice, pitch, rate, volume),
+ )
+ )
+ else:
+ await websocket.send_str(
+ ssml_headers_plus_data(connect_id(), self.date, message)
+ )
+
+ # Begin listening for the response; turn.end ends it for this message.
+ download = False
+ async for received in websocket:
+ if received.type in (
+ aiohttp.WSMsgType.CLOSED,
+ aiohttp.WSMsgType.ERROR,
+ ):
+ break
+
+ if received.type == aiohttp.WSMsgType.TEXT:
+ parameters, data = get_headers_and_data(received.data)
+ if (
+ "Path" in parameters
+ and parameters["Path"] == "turn.start"
+ ):
+ download = True
+ elif (
+ "Path" in parameters
+ and parameters["Path"] == "turn.end"
+ ):
+ download = False
+ break
+ elif (
+ "Path" in parameters
+ and parameters["Path"] == "audio.metadata"
+ ):
+ metadata = json.loads(data)
+ text = metadata["Metadata"][0]["Data"]["text"]["Text"]
+ offset = metadata["Metadata"][0]["Data"]["Offset"]
+ yield (
+ offset,
+ text,
+ None,
+ )
+ elif received.type == aiohttp.WSMsgType.BINARY:
+ if download:
+ yield (
+ None,
+ None,
+ b"Path:audio\r\n".join(
+ received.data.split(b"Path:audio\r\n")[1:]
+ ),
+ )
+ await websocket.close()
diff --git a/src/edgeTTS/constants.py b/src/edgeTTS/constants.py
new file mode 100644
index 0000000..6a5669a
--- /dev/null
+++ b/src/edgeTTS/constants.py
@@ -0,0 +1,15 @@
+"""
+Constants for the edgeTTS package.
+"""
+
+TRUSTED_CLIENT_TOKEN = "6A5AA1D4EAFF4E9FB37E23D68491D6F4"
+WSS_URL = (
+ "wss://speech.platform.bing.com/consumer/speech/synthesize/"
+ + "readaloud/edge/v1?TrustedClientToken="
+ + TRUSTED_CLIENT_TOKEN
+)
+VOICE_LIST = (
+ "https://speech.platform.bing.com/consumer/speech/synthesize/"
+ + "readaloud/voices/list?trustedclienttoken="
+ + TRUSTED_CLIENT_TOKEN
+)
diff --git a/src/edgeTTS/list_voices.py b/src/edgeTTS/list_voices.py
new file mode 100644
index 0000000..0fe6091
--- /dev/null
+++ b/src/edgeTTS/list_voices.py
@@ -0,0 +1,42 @@
+"""
+Retrieval of the list of voices offered by the service.
+"""
+
+import json
+
+import aiohttp
+
+from .constants import VOICE_LIST
+
+
+async def list_voices():
+    """
+    List all available voices and their attributes.
+
+    This pulls data from the URL used by Microsoft Edge to return a list of
+    all available voices. However many more experimental voices are available
+    than are listed here.
+    (See
+    https://docs.microsoft.com/en-us/azure/cognitive-services/speech-service/language-support)
+
+    Returns:
+        list: One dict of attributes per voice, as decoded from the JSON response.
+    """
+    # Browser-like request headers (the User-Agent identifies as Microsoft Edge 91).
+    headers = {
+        "Authority": "speech.platform.bing.com",
+        "Sec-CH-UA": '" Not;A Brand";v="99", "Microsoft Edge";v="91", "Chromium";v="91"',
+        "Sec-CH-UA-Mobile": "?0",
+        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.77 Safari/537.36 Edg/91.0.864.41",
+        "Accept": "*/*",
+        "Sec-Fetch-Site": "none",
+        "Sec-Fetch-Mode": "cors",
+        "Sec-Fetch-Dest": "empty",
+        "Accept-Encoding": "gzip, deflate, br",
+        "Accept-Language": "en-US,en;q=0.9",
+    }
+
+    # NOTE(review): trust_env=True presumably makes aiohttp honour proxy settings from the environment - confirm.
+    async with aiohttp.ClientSession(trust_env=True) as session:
+        async with session.get(VOICE_LIST, headers=headers) as response:
+            return json.loads(await response.text())
diff --git a/src/edgeTTS/submaker.py b/src/edgeTTS/submaker.py
new file mode 100644
index 0000000..44d7e92
--- /dev/null
+++ b/src/edgeTTS/submaker.py
@@ -0,0 +1,53 @@
+import math
+from xml.sax.saxutils import escape
+
+
+def formatter(offset1, offset2, subdata):
+    """Render one cue: a "start --> end" line, then the XML-escaped text."""
+    start = mktimestamp(offset1)
+    end = mktimestamp(offset2)
+    text = escape(subdata)
+    return f"{start} --> {end}\r\n{text}\r\n\r\n"
+
+
+def mktimestamp(time_unit):
+    """Format 100 ns units as HH:MM:SS.mmm, rounding to ms first so SS never prints as 60."""
+    minutes, milliseconds = divmod(round(time_unit / 10000), 60000)
+    hour, minute = divmod(minutes, 60)
+    return f"{hour:02d}:{minute:02d}:{milliseconds / 1000:06.3f}"
+
+
+class SubMaker:
+    """Collects (offset, text) pairs and renders them as WEBVTT subtitles."""
+
+    def __init__(self, overlapping=5):
+        """Start with no cues; overlapping is multiplied by 10 ** 7 for storage."""
+        self.overlapping = overlapping * (10 ** 7)
+        self.broken_offset = []
+        self.subs_and_offset = []  # flat: offset, text, offset, text, ...
+
+    def create_sub(self, timestamp, text):
+        """Record a cue, shifting its offset by the sum of broken_offset."""
+        if len(self.subs_and_offset) >= 2:
+            previous = self.subs_and_offset[-2]
+            if previous >= timestamp + sum(self.broken_offset):
+                self.broken_offset.append(previous)
+            timestamp = timestamp + sum(self.broken_offset)
+
+        self.subs_and_offset.extend((timestamp, text))
+
+    def generate_subs(self):
+        """Return the cues as a WEBVTT document, or "" when there are none."""
+        if len(self.subs_and_offset) < 2:
+            return ""
+        pairs = list(zip(self.subs_and_offset[::2], self.subs_and_offset[1::2]))
+        parts = ["WEBVTT\r\n\r\n"]
+        for (start, text), (next_start, _) in zip(pairs, pairs[1:]):
+            if start is not None and text is not None:
+                # A cue ends `overlapping` after the next cue starts.
+                parts.append(formatter(start, next_start + self.overlapping, text))
+        # The last cue has no successor; it ends (10 ** 7) * 10 after it starts.
+        last_start, last_text = pairs[-1]
+        last_end = last_start + ((10 ** 7) * 10)
+        parts.append(formatter(last_start, last_end, last_text))
+        return "".join(parts)
diff --git a/src/edgeTTS/util.py b/src/edgeTTS/util.py
new file mode 100644
index 0000000..68c7ff3
--- /dev/null
+++ b/src/edgeTTS/util.py
@@ -0,0 +1,148 @@
+"""
+Command-line interface of edge-tts.
+"""
+
+
+import argparse
+import asyncio
+import sys
+
+from edgeTTS import Communicate, SubMaker, list_voices
+
+
+async def _main():
+    """Parse the command line, then synthesize speech or print the voice list."""
+    parser = argparse.ArgumentParser(description="Microsoft Edge TTS")
+    group = parser.add_mutually_exclusive_group(required=True)
+    group.add_argument("-t", "--text", help="what TTS will say")
+    group.add_argument("-f", "--file", help="same as --text but read from file")
+    parser.add_argument(
+        "-z",
+        "--custom-ssml",
+        help="treat text as ssml to send. For more info check https://bit.ly/3fIq13S",
+        action="store_true",
+    )
+    parser.add_argument(
+        "-v",
+        "--voice",
+        help="voice for TTS. Default: Microsoft Server Speech Text to Speech Voice (en-US, AriaNeural)",
+        default="Microsoft Server Speech Text to Speech Voice (en-US, AriaNeural)",
+    )
+    parser.add_argument(
+        "-c",
+        "--codec",
+        help="codec format. Default: audio-24khz-48kbitrate-mono-mp3. Another choice is webm-24khz-16bit-mono-opus. For more info check https://bit.ly/2T33h6S",
+        default="audio-24khz-48kbitrate-mono-mp3",
+    )
+    group.add_argument(
+        "-l",
+        "--list-voices",
+        help="lists available voices. Edge's list is incomplete so check https://bit.ly/2SFq1d3",
+        action="store_true",
+    )
+    parser.add_argument(
+        "-p",
+        "--pitch",
+        help="set TTS pitch. Default +0Hz, For more info check https://bit.ly/3eAE5Nx",
+        default="+0Hz",
+    )
+    parser.add_argument(
+        "-r",
+        "--rate",
+        help="set TTS rate. Default +0%%. For more info check https://bit.ly/3eAE5Nx",
+        default="+0%",
+    )
+    parser.add_argument(
+        "-V",
+        "--volume",
+        help="set TTS volume. Default +0%%. For more info check https://bit.ly/3eAE5Nx",
+        default="+0%",
+    )
+    parser.add_argument(
+        "-s",
+        "--enable-sentence-boundary",
+        help="enable sentence boundary",
+        action="store_true",
+    )
+    parser.add_argument(
+        "-w",
+        "--enable-word-boundary",
+        help="enable word boundary",
+        action="store_true",
+    )
+    parser.add_argument(
+        "-O",
+        "--overlapping",
+        help="overlapping subtitles in seconds",
+        default=5,
+        type=float,
+    )
+    parser.add_argument(
+        "--write-media", help="instead of stdout, send media output to provided file"
+    )
+    parser.add_argument(
+        "--write-subtitles",
+        help="instead of stderr, send subtitle output to provided file",
+    )
+    args = parser.parse_args()
+
+    if args.text is not None or args.file is not None:
+        if args.file is not None:
+            # we need to use sys.stdin.read() because some devices
+            # like Windows and Termux don't have a /dev/stdin.
+            if args.file == "/dev/stdin":
+                args.text = sys.stdin.read()
+            else:
+                with open(args.file, "r", encoding="utf-8") as file:
+                    args.text = file.read()
+        tts = Communicate()
+        subs = SubMaker(args.overlapping)
+        # Audio goes to the given file, or to stdout when none was given.
+        media_file = open(args.write_media, "wb") if args.write_media else None
+        try:
+            async for i in tts.run(
+                args.text,
+                args.enable_sentence_boundary,
+                args.enable_word_boundary,
+                args.codec,
+                args.voice,
+                args.pitch,
+                args.rate,
+                args.volume,
+                customspeak=args.custom_ssml,
+            ):
+                if i[2] is not None:
+                    if media_file is None:
+                        sys.stdout.buffer.write(i[2])
+                    else:
+                        media_file.write(i[2])
+                elif i[0] is not None and i[1] is not None:
+                    subs.create_sub(i[0], i[1])
+        finally:
+            # Close the file even if the request fails part-way through.
+            if media_file is not None:
+                media_file.close()
+        if not args.write_subtitles:
+            sys.stderr.write(subs.generate_subs())
+        else:
+            with open(args.write_subtitles, "w", encoding="utf-8") as file:
+                file.write(subs.generate_subs())
+    elif args.list_voices:
+        for idx, voice in enumerate(await list_voices()):
+            if idx != 0:
+                print()
+            for key, value in voice.items():
+                if key in ["SuggestedCodec", "FriendlyName", "Status"]:
+                    continue
+                print(f"{key}: {value}")
+
+
+def main():
+ """
+ Console entry point: runs the asynchronous _main() to completion with asyncio.run().
+ """
+ asyncio.run(_main())
+
+
+if __name__ == "__main__":
+ main()