下载模型
from pathlib import Path
from typing import Any, Union
from pydantic import Field, BaseModel


class DownloadUrlModel(BaseModel):
    """Description of a single download task."""

    # Name of the output file (Field description: "file name").
    title: str = Field(..., description="文件名")
    # Location the file is saved to (Field description: "file path").
    save_path: Union[Path, str] = Field(..., description="文件路径")
    # Source address (Field description: "m3u8 file path").
    url: str = Field(..., description="m3u8文件路径")
    # Whether the url points at an m3u8 playlist; defaults to True.
    isM3u8: bool = Field(True, description="是否为m3u8文件")
M3U8解密
pip install pycryptodome
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad


class DecodeByte:
    """Decryption helper for encrypted media segments."""

    @staticmethod
    def do_decode(key, iv, data, method="AES-128") -> "bytes | None":
        """Decrypt ``data`` with AES-CBC and strip the PKCS#7 padding.

        :param key: AES key, as ``bytes`` or ``str`` (a ``str`` is UTF-8 encoded).
        :param iv: initialisation vector, as ``bytes`` or ``str`` (a ``str`` is
            UTF-8 encoded).
        :param data: ciphertext to decrypt.
        :param method: encryption method name; only ``"AES-128"`` is supported.
        :return: the decrypted bytes, or ``None`` when ``method`` is not
            ``"AES-128"``.
        """
        if isinstance(key, str):
            key = key.encode('utf-8')
        if isinstance(iv, str):
            iv = iv.encode('utf-8')

        if method != "AES-128":
            return None

        aes = AES.new(key, AES.MODE_CBC, iv)

        # Best effort for ciphertext that is not block aligned: extend it to a
        # whole number of blocks so that decrypt() does not raise.
        misaligned = bool(data) and (len(data) % 16) != 0
        if misaligned:
            data = pad(data, 16)

        plain = aes.decrypt(data)
        if misaligned:
            # The final block is not a genuine padded block; return it as is.
            return plain

        # AES-128 HLS segments are PKCS#7 padded, so remove the padding rather
        # than leaving it at the end of the segment. Fall back to the raw
        # plaintext if the trailing bytes are not valid padding.
        try:
            return unpad(plain, 16)
        except ValueError:
            return plain
下载线程
`FFMPEG_EXE_PATH` 为 `ffmpeg.exe` 的绝对路径;若本机没有 ffmpeg,请先到官网下载。
直接用 ffmpeg 下载 M3U8 没有进度显示,并且它是同步下载,会比较慢;这里使用 httpx 异步下载,速度比较快。
# coding = utf-8
import asyncio
import os
import re
import shutil
import subprocess
from pathlib import Path
from typing import Union

import httpx
from PySide6.QtCore import QThread, Signal

from ..models import DownloadUrlModel
from ..utils import HTTPRequest, DecodeByte
from ..config import FFMPEG_EXE_PATH


class DownloadM3U8Thread(QThread):
    """Worker thread that downloads an m3u8 playlist and merges it with ffmpeg.

    Segments are fetched concurrently with ``httpx.AsyncClient``, decrypted when
    the playlist carries an ``#EXT-X-KEY`` tag, written to a ``temp`` directory
    and finally concatenated by ffmpeg into ``<save_path>/<title>.mp4``.

    Signals:
        loggerSignal(str): human readable status messages.
        progressSignal(int): download progress in percent.
        stopSignal(bool): ``True`` when a stop was requested, ``False`` on start.
    """

    loggerSignal = Signal(str)
    progressSignal = Signal(int)
    stopSignal = Signal(bool)

    def __init__(self, parent=None):
        super().__init__(parent)
        self.__is_stop = False
        self.__model: DownloadUrlModel = None
        self.finished_file: Path = None
        self.failed_file: Path = None
        self.temp_path: Path = None
        self.num = 0
        self.list_length = 0
        self.retry_max = 7
        # Decryption info, filled in by setCryInfo().
        self.cry = {
            "key": "",
            "iv": "",
            "method": "",
        }
        self.started.connect(self.startedSlot)

    def sendRequest(self, url: str, *, method='GET', **kwargs):
        """Send a blocking HTTP request and return the response.

        :raises httpx.HTTPStatusError: when the server answers with 4xx/5xx.
        """
        response = httpx.request(
            method,
            url,
            verify=False,
            headers=HTTPRequest.headers,
            timeout=HTTPRequest.timeout,
            **kwargs,
        )
        response.raise_for_status()
        return response

    async def aiohttp_send(self, client: httpx.AsyncClient, url: str, index: int, *, retry_count=0):
        """Download one segment, retrying on HTTP/transport errors.

        :param client: shared async client.
        :param url: absolute segment url.
        :param index: 1-based position of the segment in the playlist; it
            determines the cache file name (``000001.ts`` ...).
        :param retry_count: number of retries already consumed.
        """
        ts_name = f'{str(index).zfill(6)}.ts'
        while True:
            if self.__is_stop:
                # A stop was requested: do not start further requests.
                return
            try:
                response = await client.get(url)
                # Without this an error page would be stored as a segment.
                response.raise_for_status()
                break
            except httpx.HTTPError:
                # Covers connect errors, timeouts, pool timeouts and bad
                # status codes, so one flaky segment cannot abort every task.
                await asyncio.sleep(4)
                if retry_count >= self.retry_max:
                    self.save_failed_file(f'{url}_{ts_name}')
                    return
                retry_count += 1

        self.save_data_file(response.content, ts_name)
        progress = round(self.num / self.list_length * 100, 3)
        self.loggerSignal.emit(f'下载进度: {progress} %')
        self.progressSignal.emit(int(progress))

    async def aiohttp_download(self, urls):
        """Download every segment in ``urls`` that is not cached yet."""
        async with httpx.AsyncClient(headers=HTTPRequest.headers, timeout=HTTPRequest.timeout) as client:
            tasks = []
            self.list_length = len(urls)
            # Read the bookkeeping file once instead of once per url.
            finished = set(self.open_finished_file())
            # Restart the counter so that progress is correct when the thread
            # is reused; segments from an earlier run count as done.
            self.num = 0
            for index, url in enumerate(urls, 1):
                if self.__is_stop:
                    break
                ts_name = f'{str(index).zfill(6)}.ts'
                if ts_name in finished:
                    self.num += 1
                    continue
                tasks.append(asyncio.ensure_future(self.aiohttp_send(client, url, index)))
            await asyncio.gather(*tasks)

    def get_full_ts_url(self, url: str, ts_name: str) -> str:
        """Build the absolute url of a segment or key file.

        :param url: url of the playlist.
        :param ts_name: segment reference taken from the playlist.
        :return: ``ts_name`` unchanged when it already starts with ``http``;
            otherwise the playlist directory joined with ``ts_name``, where
            path parts of ``ts_name`` that already occur in the playlist url
            are dropped.
        """
        if ts_name.startswith('http'):
            return ts_name
        remaining = ts_name.split('/')
        parts = []
        # Walk the playlist url (without its file name) and drop the parts
        # that ts_name repeats.
        for part in url.split('/')[:-1]:
            if part in remaining:
                remaining.remove(part)
            parts.append(part)
        # Append what is left of ts_name.
        parts.extend(remaining)
        return '/'.join(parts)

    def setCryInfo(self, text, url):
        """Extract the decryption parameters from the playlist text.

        Only the first ``#EXT-X-KEY`` tag is considered. When the playlist is
        not encrypted (no tag, ``METHOD=NONE`` or no ``URI``) the stored key
        stays empty so that segments are written without decryption.

        :param text: content of the m3u8 playlist.
        :param url: url of the playlist, used to resolve a relative key URI.
        """
        # Clear values left over from a previous task.
        self.cry.update({"key": "", "iv": "", "method": "", "sequence": 0})

        # The media sequence number is needed for the implicit IV.
        sequence = re.search(r'#EXT-X-MEDIA-SEQUENCE:\s*(\d+)', text)
        if sequence:
            self.cry["sequence"] = int(sequence.group(1))

        key_tag = re.search(r'#EXT-X-KEY:(.*)', text)
        if key_tag is None:
            return

        # Attribute list: NAME=value or NAME="quoted value"; a quoted value
        # may itself contain commas.
        cry_obj = {}
        for name, value in re.findall(r'([A-Za-z0-9-]+)=("[^"]*"|[^,]*)', key_tag.group(1)):
            cry_obj[name] = value.strip().strip('"')

        method = cry_obj.get('METHOD')
        uri = cry_obj.get('URI', '')
        if not uri or method == 'NONE':
            return
        if not uri.startswith('http'):
            uri = self.get_full_ts_url(url, uri)

        # Fetch the key.
        self.cry['key'] = self.sendRequest(uri).content
        # Encryption method.
        self.cry['method'] = method
        # The IV attribute is a hexadecimal number ("0x..."); decode it to the
        # 16 raw bytes the cipher expects.
        iv = cry_obj.get('IV')
        if iv:
            digits = iv[2:] if iv[:2].lower() == '0x' else iv
            self.cry['iv'] = bytes.fromhex(digits.zfill(32))

    def save_data_file(self, data: bytes, ts_name: str):
        """Decrypt (when required) and store one segment in the cache.

        :raises Exception: when decryption yields no data.
        """
        # Encrypted playlists: decrypt before writing.
        if self.cry.get('key'):
            iv = self.cry.get("iv")
            if not iv:
                # Without an IV attribute the IV is the segment's media
                # sequence number as a 16-byte big-endian integer. File names
                # are 1-based, so the first segment maps to the playlist's
                # media sequence value.
                number = self.cry.get("sequence", 0) + int(ts_name.split('.')[0]) - 1
                iv = number.to_bytes(16, 'big')
            data = DecodeByte.do_decode(self.cry["key"], iv, data, self.cry["method"])
            if not data:
                raise Exception('解密失败')
        # Store the segment and record it as finished.
        with open(self.temp_path / ts_name, 'wb') as f:
            f.write(data)
        self.save_finished_file(ts_name)
        self.num += 1

    def open_failed_file(self) -> list:
        """Return the lines of the failed-segments file."""
        return self.failed_file.read_text(encoding='utf-8').split('\n')

    def save_failed_file(self, ts_name: str):
        """Append one entry to the failed-segments file."""
        with self.failed_file.open('a', encoding='utf-8') as f:
            f.write(ts_name + '\n')

    def open_finished_file(self) -> list:
        """Return the lines of the finished-segments file."""
        return self.finished_file.read_text(encoding='utf-8').split('\n')

    def save_finished_file(self, ts_name: str):
        """Append one entry to the finished-segments file."""
        with self.finished_file.open('a', encoding='utf-8') as f:
            f.write(ts_name + '\n')

    def save_ffmpeg_file(self, file_list: list):
        """Write the ffmpeg concat list and return its path."""
        ffmpeg_file = self.temp_path / 'ffmpeg.txt'
        # Overwrite instead of append: a list left behind by an earlier run
        # would otherwise duplicate every segment in the merged video.
        with ffmpeg_file.open('w', encoding='utf-8') as f:
            f.writelines(f"file '{file}'\n" for file in file_list)
        return ffmpeg_file

    def combine_ts(self, source_path: Path, dest_file: Union[Path, str]):
        """Merge the cached segments into ``dest_file`` with ffmpeg.

        ffmpeg is used because plainly appending the segment files to one
        another produced files with format errors.

        :param source_path: directory holding the downloaded ``.ts`` files.
        :param dest_file: path of the merged output file.
        :raises RuntimeError: when ffmpeg exits with a non-zero status; the
            cache is kept in that case.
        """
        # Collect all cached segments.
        file_list = [str(file.name) for file in source_path.glob('**/*.ts')]
        if not file_list:
            return
        # Order by name.
        file_list.sort(key=lambda s: s.split('.')[0])
        ffmpeg_txt = self.save_ffmpeg_file(file_list)

        # An argument list (no shell) keeps paths containing spaces intact.
        cmd = [
            str(FFMPEG_EXE_PATH),
            '-f', 'concat',
            '-safe', '0',
            '-i', str(ffmpeg_txt),
            '-c', 'copy',
            str(dest_file),
        ]
        # stdin is closed so ffmpeg cannot block on an interactive prompt.
        result = subprocess.run(cmd, stdin=subprocess.DEVNULL)
        if result.returncode != 0:
            raise RuntimeError(f'ffmpeg exited with code {result.returncode}')

        # Remove the cache directory.
        self.sleep(2)
        try:
            shutil.rmtree(source_path.parent)
        except OSError as e:
            self.loggerSignal.emit(f'删除文件夹错误:{e}')

    def run(self):
        """Thread entry point: fetch the playlist, download and merge."""
        try:
            url = self.__model.url
            save_path = Path(self.__model.save_path)
            video_path = save_path / self.__model.title
            self.temp_path = video_path / 'temp'
            self.finished_file = self.temp_path / 'finished.txt'
            self.failed_file = self.temp_path / 'failed.txt'
            self.temp_path.mkdir(parents=True, exist_ok=True)
            self.finished_file.touch(exist_ok=True)
            self.failed_file.touch(exist_ok=True)

            self.loggerSignal.emit("开始加载m3u8文件")
            m3u8_text = self.sendRequest(url).text
            self.setCryInfo(m3u8_text, url)

            self.loggerSignal.emit("解析m3u8文件")
            # Every non-empty line that is not a tag or comment is a segment.
            # Blank lines and stray "\r" must not become urls.
            lines = (line.strip() for line in m3u8_text.splitlines())
            new_urls = [
                self.get_full_ts_url(url, line)
                for line in lines
                if line and not line.startswith('#')
            ]

            self.loggerSignal.emit('开始下载视频...')
            asyncio.run(self.aiohttp_download(new_urls))
            if self.__is_stop:
                # Keep the cache so the download can be resumed later instead
                # of merging a partial video and deleting the segments.
                return

            self.loggerSignal.emit('开始合并文件...')
            self.combine_ts(self.temp_path, str(save_path / f'{self.__model.title}.mp4'))
            self.loggerSignal.emit("视频下载完成")
        except Exception as e:
            # Top-level boundary of the worker thread: report, do not crash.
            self.loggerSignal.emit(f"视频下载失败: {e}")
            return

    def stop(self):
        """Request the download to stop."""
        self.__is_stop = True
        self.stopSignal.emit(True)
        self.loggerSignal.emit('下载已停止')

    def startedSlot(self):
        """Reset the stop flag when the thread starts."""
        self.__is_stop = False
        self.stopSignal.emit(False)

    def setDownloadTask(self, model: DownloadUrlModel):
        """Set the task that the next ``run()`` processes."""
        self.__model = model
combine_ts
使用 `open` 方法逐个追加写入的合并函数会出现格式错误的问题,而使用 ffmpeg 合并没有此问题。
def combine_ts(self, source_path: Path, dest_file: Union[Path, str]):
    """Placeholder for the merge routine; the body is intentionally empty.

    The original comment here reads "collect all cached files".
    """
    pass