fix: Split only on valid utf-8 boundary (#369)

* fix: Split only on valid utf-8 boundary

* refact

* make CI happy

Signed-off-by: rany <rany2@riseup.net>

---------

Signed-off-by: rany <rany2@riseup.net>
Co-authored-by: rany <rany2@riseup.net>
This commit is contained in:
WANG Lei
2025-04-15 20:13:28 +08:00
committed by GitHub
parent ecf50b916e
commit 7aabaaf480

View File

@@ -99,20 +99,114 @@ def connect_id() -> str:
return str(uuid.uuid4()).replace("-", "")
def _find_last_newline_or_space_within_limit(text: bytes, limit: int) -> int:
"""
Finds the index of the rightmost preferred split character (newline or space)
within the initial `limit` bytes of the text.
This helps find a natural word or sentence boundary for splitting, prioritizing
newlines over spaces.
Args:
text (bytes): The byte string to search within.
limit (int): The maximum index (exclusive) to search up to.
Returns:
int: The index of the last found newline or space within the limit,
or -1 if neither is found in that range.
"""
# Prioritize finding a newline character
split_at = text.rfind(b"\n", 0, limit)
# If no newline is found, search for a space
if split_at < 0:
split_at = text.rfind(b" ", 0, limit)
return split_at
def _find_safe_utf8_split_point(text_segment: bytes) -> int:
"""
Finds the rightmost possible byte index such that the
segment `text_segment[:index]` is a valid UTF-8 sequence.
This prevents splitting in the middle of a multi-byte UTF-8 character.
Args:
text_segment (bytes): The byte segment being considered for splitting.
Returns:
int: The index of the safe split point. Returns 0 if no valid split
point is found (e.g., if the first byte is part of a multi-byte
sequence longer than the limit allows).
"""
split_at = len(text_segment)
while split_at > 0:
try:
text_segment[:split_at].decode("utf-8")
# Found the largest valid UTF-8 sequence
return split_at
except UnicodeDecodeError:
# The byte at split_at-1 is part of an incomplete multi-byte char, try earlier
split_at -= 1
return split_at
def _adjust_split_point_for_xml_entity(text: bytes, split_at: int) -> int:
"""
Adjusts a proposed split point backward to prevent splitting inside an XML entity.
For example, if `text` is `b"this &amp; that"` and `split_at` falls between
`&` and `;`, this function moves `split_at` to the index before `&`.
Args:
text (bytes): The text segment being considered.
split_at (int): The proposed split point index, determined by whitespace
or UTF-8 safety.
Returns:
int: The adjusted split point index. It will be moved to the '&'
if an unterminated entity is detected right before the original `split_at`.
Otherwise, the original `split_at` is returned.
"""
while split_at > 0 and b"&" in text[:split_at]:
ampersand_index = text.rindex(b"&", 0, split_at)
# Check if a semicolon exists between the ampersand and the split point
if text.find(b";", ampersand_index, split_at) != -1:
# Found a terminated entity (like &amp;), safe to break at original split_at
break
# Ampersand is not terminated before split_at, move split_at to it
split_at = ampersand_index
return split_at
def split_text_by_byte_length(
text: Union[str, bytes], byte_length: int
) -> Generator[bytes, None, None]:
"""
Splits a string into a list of strings of a given byte length
while attempting to keep words together. This function assumes
text will be inside of an XML tag.
Splits text into chunks, each not exceeding a maximum byte length.
This function prioritizes splitting at natural boundaries (newlines, spaces)
while ensuring that:
1. No chunk exceeds `byte_length` bytes.
2. Chunks do not end with an incomplete UTF-8 multi-byte character.
3. Chunks do not split XML entities (like `&amp;`) in the middle.
Args:
text (str or bytes): The string to be split. If bytes, it must be UTF-8 encoded.
byte_length (int): The maximum byte length of each string in the list.
text (str or bytes): The input text. If str, it's encoded to UTF-8.
byte_length (int): The maximum allowed byte length for any yielded chunk.
Must be positive.
Yield:
bytes: The next string in the list.
Yields:
bytes: Text chunks (UTF-8 encoded, stripped of leading/trailing whitespace)
that conform to the byte length and integrity constraints.
Raises:
TypeError: If `text` is not str or bytes.
ValueError: If `byte_length` is not positive, or if a split point
cannot be determined (e.g., due to extremely small byte_length
relative to character/entity sizes).
"""
if isinstance(text, str):
text = text.encode("utf-8")
@@ -123,35 +217,37 @@ def split_text_by_byte_length(
raise ValueError("byte_length must be greater than 0")
while len(text) > byte_length:
# Find the last space in the string
split_at = text.rfind(b" ", 0, byte_length)
# Find the initial split point based on whitespace or UTF-8 boundary
split_at = _find_last_newline_or_space_within_limit(text, byte_length)
# If no space found, split_at is byte_length
split_at = split_at if split_at != -1 else byte_length
if split_at < 0:
## No newline or space found, so we need to find a safe UTF-8 split point
split_at = _find_safe_utf8_split_point(text)
# Verify all & are terminated with a ;
while b"&" in text[:split_at]:
ampersand_index = text.rindex(b"&", 0, split_at)
if text.find(b";", ampersand_index, split_at) != -1:
break
# Adjust the split point to avoid cutting in the middle of an xml entity, such as '&amp;'
split_at = _adjust_split_point_for_xml_entity(text, split_at)
split_at = ampersand_index - 1
if split_at < 0:
raise ValueError("Maximum byte length is too small or invalid text")
if split_at == 0:
break
if split_at < 0:
# This should not happen if byte_length is reasonable,
# but guards against edge cases.
raise ValueError(
"Maximum byte length is too small or "
"invalid text structure near '&' or invalid UTF-8"
)
# Append the string to the list
new_text = text[:split_at].strip()
if new_text:
yield new_text
if split_at == 0:
split_at = 1
text = text[split_at:]
# Yield the chunk
chunk = text[:split_at].strip()
if chunk:
yield chunk
new_text = text.strip()
if new_text:
yield new_text
# Prepare for the next iteration
# If split_at became 0 after adjustment, advance by 1 to avoid infinite loop
text = text[split_at if split_at > 0 else 1 :]
# Yield the remaining part
remaining_chunk = text.strip()
if remaining_chunk:
yield remaining_chunk
def mkssml(tc: TTSConfig, escaped_text: Union[str, bytes]) -> str: