diff --git a/src/edge_tts/communicate.py b/src/edge_tts/communicate.py index 0ad9ebb..433e994 100644 --- a/src/edge_tts/communicate.py +++ b/src/edge_tts/communicate.py @@ -99,20 +99,114 @@ def connect_id() -> str: return str(uuid.uuid4()).replace("-", "") +def _find_last_newline_or_space_within_limit(text: bytes, limit: int) -> int: + """ + Finds the index of the rightmost preferred split character (newline or space) + within the initial `limit` bytes of the text. + + This helps find a natural word or sentence boundary for splitting, prioritizing + newlines over spaces. + + Args: + text (bytes): The byte string to search within. + limit (int): The maximum index (exclusive) to search up to. + + Returns: + int: The index of the last found newline or space within the limit, + or -1 if neither is found in that range. + """ + # Prioritize finding a newline character + split_at = text.rfind(b"\n", 0, limit) + # If no newline is found, search for a space + if split_at < 0: + split_at = text.rfind(b" ", 0, limit) + return split_at + + +def _find_safe_utf8_split_point(text_segment: bytes) -> int: + """ + Finds the rightmost possible byte index such that the + segment `text_segment[:index]` is a valid UTF-8 sequence. + + This prevents splitting in the middle of a multi-byte UTF-8 character. + + Args: + text_segment (bytes): The byte segment being considered for splitting. + + Returns: + int: The index of the safe split point. Returns 0 if no valid split + point is found (e.g., if the first byte is part of a multi-byte + sequence longer than the limit allows). + """ + split_at = len(text_segment) + while split_at > 0: + try: + text_segment[:split_at].decode("utf-8") + # Found the largest valid UTF-8 sequence + return split_at + except UnicodeDecodeError: + # The byte at split_at-1 is part of an incomplete multi-byte char, try earlier + split_at -= 1 + + return split_at + + +def _adjust_split_point_for_xml_entity(text: bytes, split_at: int) -> int: + """ + Adjusts a proposed split point backward to prevent splitting inside an XML entity. + + For example, if `text` is `b"this & that"` and `split_at` falls between + `&` and `;`, this function moves `split_at` to the index before `&`. + + Args: + text (bytes): The text segment being considered. + split_at (int): The proposed split point index, determined by whitespace + or UTF-8 safety. + + Returns: + int: The adjusted split point index. It will be moved to the '&' + if an unterminated entity is detected right before the original `split_at`. + Otherwise, the original `split_at` is returned. + """ + while split_at > 0 and b"&" in text[:split_at]: + ampersand_index = text.rindex(b"&", 0, split_at) + # Check if a semicolon exists between the ampersand and the split point + if text.find(b";", ampersand_index, split_at) != -1: + # Found a terminated entity (like &), safe to break at original split_at + break + + # Ampersand is not terminated before split_at, move split_at to it + split_at = ampersand_index + + return split_at + + def split_text_by_byte_length( text: Union[str, bytes], byte_length: int ) -> Generator[bytes, None, None]: """ - Splits a string into a list of strings of a given byte length - while attempting to keep words together. This function assumes - text will be inside of an XML tag. + Splits text into chunks, each not exceeding a maximum byte length. + + This function prioritizes splitting at natural boundaries (newlines, spaces) + while ensuring that: + 1. No chunk exceeds `byte_length` bytes. + 2. Chunks do not end with an incomplete UTF-8 multi-byte character. + 3. Chunks do not split XML entities (like `&`) in the middle. Args: - text (str or bytes): The string to be split. If bytes, it must be UTF-8 encoded. - byte_length (int): The maximum byte length of each string in the list. + text (str or bytes): The input text. If str, it's encoded to UTF-8. + byte_length (int): The maximum allowed byte length for any yielded chunk. + Must be positive. - Yield: - bytes: The next string in the list. + Yields: + bytes: Text chunks (UTF-8 encoded, stripped of leading/trailing whitespace) + that conform to the byte length and integrity constraints. + + Raises: + TypeError: If `text` is not str or bytes. + ValueError: If `byte_length` is not positive, or if a split point + cannot be determined (e.g., due to extremely small byte_length + relative to character/entity sizes). """ if isinstance(text, str): text = text.encode("utf-8") @@ -123,35 +217,37 @@ def split_text_by_byte_length( raise ValueError("byte_length must be greater than 0") while len(text) > byte_length: - # Find the last space in the string - split_at = text.rfind(b" ", 0, byte_length) + # Find the initial split point based on whitespace or UTF-8 boundary + split_at = _find_last_newline_or_space_within_limit(text, byte_length) - # If no space found, split_at is byte_length - split_at = split_at if split_at != -1 else byte_length + if split_at < 0: + ## No newline or space found, so we need to find a safe UTF-8 split point + split_at = _find_safe_utf8_split_point(text) - # Verify all & are terminated with a ; - while b"&" in text[:split_at]: - ampersand_index = text.rindex(b"&", 0, split_at) - if text.find(b";", ampersand_index, split_at) != -1: - break + # Adjust the split point to avoid cutting in the middle of an xml entity, such as '&' + split_at = _adjust_split_point_for_xml_entity(text, split_at) - split_at = ampersand_index - 1 - if split_at < 0: - raise ValueError("Maximum byte length is too small or invalid text") - if split_at == 0: - break + if split_at < 0: + # This should not happen if byte_length is reasonable, + # but guards against edge cases. + raise ValueError( + "Maximum byte length is too small or " + "invalid text structure near '&' or invalid UTF-8" + ) - # Append the string to the list - new_text = text[:split_at].strip() - if new_text: - yield new_text - if split_at == 0: - split_at = 1 - text = text[split_at:] + # Yield the chunk + chunk = text[:split_at].strip() + if chunk: + yield chunk - new_text = text.strip() - if new_text: - yield new_text + # Prepare for the next iteration + # If split_at became 0 after adjustment, advance by 1 to avoid infinite loop + text = text[split_at if split_at > 0 else 1 :] + + # Yield the remaining part + remaining_chunk = text.strip() + if remaining_chunk: + yield remaining_chunk def mkssml(tc: TTSConfig, escaped_text: Union[str, bytes]) -> str: