"""Download all .ts segments of an HLS (m3u8) playlist and merge them with ffmpeg."""

import os
import subprocess
import threading
from queue import Empty, Queue
from urllib.parse import urljoin, urlsplit

import requests

# URL of the m3u8 playlist to download.
m3u8_url = 'https://pili-vod.songy.info/9098b1ba-bbd6-44eb-b1b8-641c07750321.m3u8'
# Directory where the downloaded .ts segments are stored.
save_dir = './m3u8/mp4'
# File name of the merged MP4 output.
output_mp4 = 'xy7nh.mp4'

# Number of worker threads used for downloading.
NUM_THREADS = 10
# Per-request timeout in seconds, so a stalled connection cannot hang a worker.
REQUEST_TIMEOUT = 30


def _ts_filename(ts_url):
    """Return the local file name for a segment URL (last path component, no query)."""
    return os.path.basename(urlsplit(ts_url).path)


def download_ts(ts_url, session):
    """Download one .ts segment into ``save_dir``.

    Failures are reported on stdout instead of being raised, so one bad
    segment does not stop the remaining downloads.

    Args:
        ts_url: Absolute URL of the segment.
        session: ``requests.Session`` used to issue the request.
    """
    ts_filename = _ts_filename(ts_url)
    target = os.path.join(save_dir, ts_filename)
    # Write to a temporary name first so an interrupted download never
    # leaves a truncated file under the final segment name.
    partial = target + '.part'
    try:
        with session.get(ts_url, stream=True, timeout=REQUEST_TIMEOUT) as response:
            if response.status_code != 200:
                print(f"Failed to download {ts_url}: HTTP {response.status_code}")
                return
            os.makedirs(save_dir, exist_ok=True)
            with open(partial, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
        os.replace(partial, target)
        print(f"Downloaded {ts_filename}")
    except (requests.RequestException, OSError) as e:
        print(f"Error downloading {ts_url}: {e}")


def parse_m3u8(m3u8_url):
    """Fetch the playlist and return the absolute URLs of its .ts segments.

    Args:
        m3u8_url: URL of the m3u8 playlist.

    Returns:
        Segment URLs in playlist order; an empty list when the playlist
        request does not return HTTP 200.
    """
    with requests.Session() as session:
        response = session.get(m3u8_url, timeout=REQUEST_TIMEOUT)
        if response.status_code != 200:
            return []
        ts_urls = []
        for raw_line in response.text.splitlines():
            line = raw_line.strip()
            # Skip blank lines and playlist tags/comments.
            if not line or line.startswith('#'):
                continue
            if urlsplit(line).path.endswith('.ts'):
                # Playlists commonly list segments relative to the playlist
                # URL; urljoin leaves already-absolute URLs unchanged.
                ts_urls.append(urljoin(m3u8_url, line))
        return ts_urls


def multithreaded_download(ts_urls):
    """Download every URL in ``ts_urls`` using a pool of worker threads.

    Args:
        ts_urls: Iterable of absolute segment URLs.
    """
    # Thread-safe queue holding the pending downloads.
    queue = Queue()
    for ts_url in ts_urls:
        queue.put(ts_url)

    os.makedirs(save_dir, exist_ok=True)

    # One session is shared by all workers so connections can be reused.
    with requests.Session() as session:
        threads = []
        for _ in range(NUM_THREADS):
            thread = threading.Thread(target=download_worker, args=(queue, session))
            thread.start()
            threads.append(thread)

        for thread in threads:
            thread.join()


def download_worker(queue, session):
    """Worker loop: download queued segment URLs until the queue is empty.

    Args:
        queue: ``queue.Queue`` of segment URLs.
        session: ``requests.Session`` shared between the workers.
    """
    while True:
        # get_nowait avoids the race of checking empty() and then blocking
        # forever in get() after another worker took the last item.
        try:
            ts_url = queue.get_nowait()
        except Empty:
            return
        try:
            download_ts(ts_url, session)
        finally:
            queue.task_done()


def _write_concat_list(ts_urls):
    """Write the ffmpeg concat list for the segments present in ``save_dir``.

    Returns:
        Path of the list file, or ``None`` when no segment was downloaded.
    """
    names = []
    for ts_url in ts_urls:
        name = _ts_filename(ts_url)
        if os.path.isfile(os.path.join(save_dir, name)):
            names.append(name)
        else:
            print(f"Missing segment {name}; it will be left out of the merge")
    if not names:
        return None

    list_path = os.path.join(save_dir, 'file_list.txt')
    with open(list_path, 'w', encoding='utf-8') as f:
        for name in names:
            # The list file sits next to the segments, so bare file names are
            # enough; single quotes are escaped for the concat list syntax.
            escaped = name.replace("'", "'\\''")
            f.write(f"file '{escaped}'\n")
    return list_path


def main():
    """Download all segments of ``m3u8_url`` and merge them into ``output_mp4``."""
    ts_urls = parse_m3u8(m3u8_url)
    if not ts_urls:
        print(f"No .ts segments found in {m3u8_url}")
        return

    multithreaded_download(ts_urls)

    # ffmpeg's concat demuxer reads the segment order from this list file.
    list_path = _write_concat_list(ts_urls)
    if list_path is None:
        print("No segments were downloaded; skipping merge")
        return

    # An argument list (no shell) keeps paths with spaces or quotes intact.
    command = [
        'ffmpeg', '-f', 'concat', '-safe', '0',
        '-i', list_path, '-c', 'copy', output_mp4,
    ]
    try:
        result = subprocess.run(command, check=False)
    except FileNotFoundError:
        print("ffmpeg executable not found on PATH; cannot merge segments")
        return
    if result.returncode != 0:
        print(f"ffmpeg exited with status {result.returncode}")


if __name__ == '__main__':
    main()