您的位置:首页 > 文旅 > 美景 > 百度公司网站制作_软件开发流程是哪几个_seo独立站优化_一元友情链接平台

百度公司网站制作_软件开发流程是哪几个_seo独立站优化_一元友情链接平台

2025/2/24 10:20:13 来源:https://blog.csdn.net/weixin_54217201/article/details/144903629  浏览:    关键词:百度公司网站制作_软件开发流程是哪几个_seo独立站优化_一元友情链接平台
百度公司网站制作_软件开发流程是哪几个_seo独立站优化_一元友情链接平台

下载模型

from pathlib import Path
from typing import Any, Union

from pydantic import BaseModel, Field


class DownloadUrlModel(BaseModel):
    """Parameters describing one M3U8 download task."""

    # display name of the video; also used as the output file name
    title: str = Field(..., description="文件名")
    # directory the downloaded video is saved under
    save_path: Union[Path, str] = Field(..., description="文件路径")
    # URL of the m3u8 playlist (or direct file URL)
    url: str = Field(..., description="m3u8文件路径")
    # whether the url points at an m3u8 playlist
    isM3u8: bool = Field(True, description="是否为m3u8文件")

M3U8解密

pip install pycryptodome

from Crypto.Cipher import AES
from Crypto.Util.Padding import pad


class DecodeByte:
    """AES decryption helper for encrypted M3U8 (TS) segments."""

    @staticmethod
    def do_decode(key, iv, data, method="AES-128") -> bytes:
        """Decrypt *data* with AES-128 in CBC mode.

        :param key: 16-byte AES key (str or bytes; str is UTF-8 encoded).
        :param iv: initialization vector (str or bytes; str is UTF-8 encoded).
        :param data: ciphertext bytes to decrypt.
        :param method: cipher name from #EXT-X-KEY; only "AES-128" is supported.
        :return: decrypted bytes, or None when *method* is unsupported.
        """
        if isinstance(key, str):
            key = key.encode('utf-8')
        if isinstance(iv, str):
            iv = iv.encode('utf-8')
        if "AES-128" == method:
            aes = AES.new(key, AES.MODE_CBC, iv)
            # NOTE(review): padding the *ciphertext* only block-aligns a
            # truncated segment so decrypt() does not raise; the padded tail
            # decrypts to garbage — confirm this best-effort behavior is intended.
            if data and (len(data) % 16) != 0:
                data = pad(data, 16)
            return aes.decrypt(data)
        else:
            # unsupported cipher: caller treats a falsy result as失败 (failure)
            return None

下载线程

FFMPEG_EXE_PATH 是 ffmpeg.exe 的绝对路径；若本机没有 ffmpeg，请先在官网上下载。
ffmpeg下载M3U8没有进度显示,并且它是同步下载,会比较慢,使用httpx异步比较快

# coding = utf-8
import asyncio
import os
import re
import shutil
from pathlib import Path
from typing import Union

import httpx
from PySide6.QtCore import QThread, Signal

from ..models import DownloadUrlModel
from ..utils import HTTPRequest, DecodeByte
from ..config import FFMPEG_EXE_PATH


class DownloadM3U8Thread(QThread):
    """Qt worker thread that downloads the TS segments of an M3U8 playlist
    concurrently with httpx, decrypts them when the playlist declares
    AES-128 encryption, and merges them into one MP4 via ffmpeg.
    """

    loggerSignal = Signal(str)    # human-readable log messages for the UI
    progressSignal = Signal(int)  # download progress, 0-100
    stopSignal = Signal(bool)     # True when the user requested a stop

    def __init__(self, parent=None):
        super().__init__(parent)
        self.__is_stop = False
        self.__model: DownloadUrlModel = None
        self.finished_file: Path = None  # records segment names already saved (resume support)
        self.failed_file: Path = None    # records segments that exhausted retries
        self.temp_path: Path = None      # per-video segment cache directory
        self.num = 0                     # number of segments saved so far
        self.list_length = 0             # total number of segments
        self.retry_max = 7
        # AES decryption parameters parsed from #EXT-X-KEY
        self.cry = {
            "key": "",
            "iv": "",
            "method": "",
        }
        self.started.connect(self.startedSlot)

    def sendRequest(self, url: str, *, method='GET', **kwargs):
        """Synchronous HTTP request; raises for non-2xx responses.

        SECURITY NOTE(review): verify=False disables TLS certificate
        verification for every request.
        """
        response = httpx.request(method, url, verify=False,
                                 headers=HTTPRequest.headers,
                                 timeout=HTTPRequest.timeout, **kwargs)
        response.raise_for_status()
        return response

    async def aiohttp_send(self, client: httpx.AsyncClient, url: str, index: int, *, retry_count=0):
        """Fetch one TS segment and persist it; retry on connection errors.

        After retry_max failed attempts the segment is recorded in the
        failed file instead of raising.
        """
        ts_name = f'{str(index).zfill(6)}.ts'
        try:
            response = await client.get(url)
            self.save_data_file(response.content, ts_name)
            progress = round(self.num / self.list_length * 100, 3)
            self.loggerSignal.emit(f'下载进度: {progress} %')
            self.progressSignal.emit(int(progress))
        except httpx.ConnectError:
            await asyncio.sleep(4)
            if retry_count < self.retry_max:
                retry_count += 1
                await self.aiohttp_send(client, url, index, retry_count=retry_count)
            else:
                self.save_failed_file(f'{url}_{ts_name}')

    async def aiohttp_download(self, urls):
        """Download all segment urls concurrently, skipping finished ones."""
        async with httpx.AsyncClient(headers=HTTPRequest.headers,
                                     timeout=HTTPRequest.timeout) as client:
            tasks = []
            self.list_length = len(urls)
            for index, url in enumerate(urls, 1):
                if self.__is_stop:
                    # NOTE(review): wait()/quit() are invoked on this QThread
                    # from inside its own run loop — confirm this cannot deadlock.
                    self.wait()
                    self.quit()
                    break
                ts_name = f'{str(index).zfill(6)}.ts'
                # resume support: skip segments already recorded as finished
                if ts_name in self.open_finished_file():
                    continue
                task = asyncio.ensure_future(self.aiohttp_send(client, url, index))
                tasks.append(task)
            await asyncio.gather(*tasks)

    def get_full_ts_url(self, url: str, ts_name: str) -> str:
        """Build the absolute url of a ts segment.

        :param url: the original m3u8 url
        :param ts_name: segment path from the playlist (may be relative)
        :return: absolute segment url
        """
        if ts_name.startswith('http'):
            return ts_name
        tl = ts_name.split('/')
        new_url = []
        # walk the m3u8 url, dropping path parts duplicated in the ts name
        for s in url.split('/')[:-1]:
            if s in tl:
                tl.remove(s)
            new_url.append(s)
        # append the remaining ts-name parts
        new_url.extend(tl)
        result = '/'.join(new_url)
        return result

    def setCryInfo(self, text, url):
        """Parse the #EXT-X-KEY line of *text* and fetch key/iv/method into self.cry."""
        x_key = re.findall('#EXT-X-KEY:(.*?)\n', text)
        cry_obj = dict()
        if len(x_key) > 0:
            # split 'METHOD=AES-128,URI="...",IV=0x...' into a dict
            for item in x_key[0].split(','):
                key = item.split('=')[0]
                value = item.replace(key, '')[1:].replace('"', '')
                cry_obj[key] = value
            # make the key URI absolute
            if cry_obj.get('URI') and not cry_obj['URI'].startswith('http'):
                cry_obj['URI'] = self.get_full_ts_url(url, cry_obj['URI'])
            elif not cry_obj.get('URI'):
                # NOTE(review): an empty URI is still passed to sendRequest
                # below and will fail — confirm playlists always carry URI.
                cry_obj['URI'] = ''
            # download the AES key
            res = self.sendRequest(cry_obj['URI'])
            self.cry['key'] = res.content
            # cipher name, e.g. AES-128
            self.cry['method'] = cry_obj.get('METHOD')
            # IV: skip the leading "0x" and keep 16 characters.
            # NOTE(review): the hex text is used as raw ASCII bytes, not
            # hex-decoded — verify against the encrypting server.
            if cry_obj.get('IV'):
                self.cry['iv'] = cry_obj['IV'][2:18]
            else:
                pass

    def save_data_file(self, data: bytes, ts_name: str):
        """Decrypt *data* if the playlist is encrypted, write the segment,
        then record it as finished."""
        if self.cry.get('key'):
            # use the playlist IV when present, else derive one from the file name
            iv = self.cry["iv"] if self.cry.get("iv") else ts_name.split('.')[0].zfill(16)
            data = DecodeByte.do_decode(self.cry["key"], iv, data, self.cry["method"])
            if not data:
                raise Exception('解密失败')
        with open(self.temp_path / ts_name, 'wb') as f:
            f.write(data)
        self.save_finished_file(ts_name)
        self.num += 1

    def open_failed_file(self) -> list:
        """Return the list of segment names recorded as failed."""
        return self.failed_file.read_text(encoding='utf-8').split('\n')

    def save_failed_file(self, ts_name: str):
        """Append one failed segment name to the failed file."""
        with self.failed_file.open('a', encoding='utf-8') as f:
            f.write(ts_name + '\n')

    def open_finished_file(self) -> list:
        """Return the list of segment names recorded as finished."""
        return self.finished_file.read_text(encoding='utf-8').split('\n')

    def save_finished_file(self, ts_name: str):
        """Append one finished segment name to the finished file."""
        with self.finished_file.open('a', encoding='utf-8') as f:
            f.write(ts_name + '\n')

    def save_ffmpeg_file(self, file_list: list):
        """Write the list file consumed by ffmpeg's concat demuxer and return its path."""
        ffmpeg_file = self.temp_path / 'ffmpeg.txt'
        ffmpeg_file.touch(exist_ok=True)
        with ffmpeg_file.open('a+', encoding='utf-8') as f:
            for file in file_list:
                f.write(f"file '{file}'\n")
        return ffmpeg_file

    def combine_ts(self, source_path: Path, dest_file: Union[Path, str]):
        """Merge all cached ts segments under *source_path* into *dest_file*
        with ffmpeg, then delete the cache directory.

        ffmpeg's concat demuxer is used instead of naive byte-appending,
        which produced corrupted output files.
        """
        # collect all cached segment files
        file_list = [str(file.name) for file in source_path.glob('**/*.ts')]
        if not file_list:
            return
        # sort by the numeric file name so segments concatenate in order
        file_list.sort(key=lambda s: s.split('.')[0])
        ffmpeg_txt = self.save_ffmpeg_file(file_list)
        # SECURITY NOTE(review): os.system with interpolated paths is
        # shell-injectable if title/save_path contain shell metacharacters.
        cmd = f'{FFMPEG_EXE_PATH} -f concat -safe 0 -i  {ffmpeg_txt}  -c  copy {dest_file}'
        os.system(cmd)
        # brief pause before removing the cache directory
        self.sleep(2)
        try:
            shutil.rmtree(source_path.parent)
        except Exception as e:
            self.loggerSignal.emit(f'删除文件夹错误:{e}')

    def run(self):
        """Thread entry point: prepare cache files, parse the playlist,
        download all segments, then merge them into an MP4."""
        url = self.__model.url
        save_path = Path(self.__model.save_path)
        video_path = save_path / self.__model.title
        self.temp_path = video_path / 'temp'
        self.finished_file = self.temp_path / 'finished.txt'
        self.failed_file = self.temp_path / 'failed.txt'
        self.temp_path.mkdir(parents=True, exist_ok=True)
        self.finished_file.touch(exist_ok=True)
        self.failed_file.touch(exist_ok=True)
        try:
            self.loggerSignal.emit(f"开始加载m3u8文件")
            m3u8_text = self.sendRequest(url).text
            self.setCryInfo(m3u8_text, url)
            new_urls = []
            self.loggerSignal.emit("解析m3u8文件")
            for line in m3u8_text.split('\n'):
                # FIX: skip blank lines too — they previously produced bogus
                # segment urls via get_full_ts_url(url, '')
                if line and not line.startswith('#'):
                    new_urls.append(self.get_full_ts_url(url, line))
            self.loggerSignal.emit('开始下载视频...')
            asyncio.run(self.aiohttp_download(new_urls))
            self.loggerSignal.emit('开始合并文件...')
            self.combine_ts(self.temp_path, str(save_path / f'{self.__model.title}.mp4'))
            self.loggerSignal.emit(f"视频下载完成")
        except Exception as e:
            # NOTE(review): exception detail is swallowed; only a generic
            # failure message reaches the UI — consider logging `e`.
            self.loggerSignal.emit(f"视频下载失败")
            return

    def stop(self):
        """Ask the download loop to stop (flag is checked in aiohttp_download)."""
        self.__is_stop = True
        self.stopSignal.emit(True)
        self.loggerSignal.emit('下载已停止')

    def startedSlot(self):
        # reset the stop flag whenever the thread (re)starts
        self.__is_stop = False
        self.stopSignal.emit(False)

    def setDownloadTask(self, model: DownloadUrlModel):
        """Set the task model; must be called before start()."""
        self.__model = model

combine_ts

使用open方法写的合并函数,会出现格式错误的问题,ffmpeg没有此问题

    def combine_ts(self, source_path: Path, dest_file: Union[Path, str]):
        # gather every cached segment file (body omitted in the article)
        pass

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com