您的位置:首页 > 健康 > 美食 > 安徽省建设信息管理平台_中国空间站设计在轨飞行多少年_头条收录提交入口_网络推广平台有哪些?

安徽省建设信息管理平台_中国空间站设计在轨飞行多少年_头条收录提交入口_网络推广平台有哪些?

2025/1/10 2:16:58 来源:https://blog.csdn.net/m0_68716275/article/details/144992938  浏览:    关键词:安徽省建设信息管理平台_中国空间站设计在轨飞行多少年_头条收录提交入口_网络推广平台有哪些?
安徽省建设信息管理平台_中国空间站设计在轨飞行多少年_头条收录提交入口_网络推广平台有哪些?

 1.概念

通过编写程序,模拟浏览器上网,然后让其去互联网上抓取数据的过程

  • 通用爬虫:抓取的是一整张页面数据
  • 聚焦爬虫:抓取的是页面中的特定局部内容
  • 增量式爬虫:监测网站中数据更新的情况,只会抓取网站中最新更新出来的数据

robots.txt协议:

君子协议,网站后面添加robotx.txt可进行查看

https://www.baidu.com/robots.txt

1.1 http协议

服务器和客户端进行数据交互的一种形式

常用的请求头信息:

  • User-Agent: 请求载体的身份标识,如: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36
  • Connection: 请求完毕完毕之后,是断开连接还是保持连接(close和keep-alive)

常用响应头信息:

  • Content-Type: 服务器响应客户端的数据类型 如:(text/html; charset=utf-8)

1.2 https协议

安全的超文本传输协议,在客户端和服务端进行数据传输和数据交互的过程中,数据是进行加密的.

数据加密的方式

  • 对称秘钥加密: 客户端制定加密方式,加加密的密文和生成的秘钥都传输给服务端.服务端根据秘钥对于密文进行解密.(但是密文和秘钥可能会被同时拦截)
  • 非对称秘钥加密:服务端生成秘钥对,将生成的公钥传递给客户端,然后将生成的密文传递给服务端,服务端再使用私钥进行解密(客户端拿到的秘钥,不一定是从服务端传递过来的)
  • 证书秘钥加密:服务器端制定加密方式,服务端将公钥传递给证书认证机构,认证机构将公钥通过认证之后,进行数字签名.将携带数字签名的公钥封装到证书当中,将证书一并发送给客户端

2. requests模块

作用:模拟浏览器发送请求

requests模块的编码流程:

  • 指定url
  • 发起请求(get/post)
  • 获取相应数据
  • 持久化存储

2.1 简易网页采集器

2.1.1 UA 伪装

将python脚本发送的请求,伪装成为浏览器发送的请求

2.1.2 Get请求携带参数

将url的参数封装成为字典,传递给params

# _*_ coding utf-8 _*_
# george
# time: 2024/12/24下午7:56
# name: test.py
# comment:
import requests# 参考url
# https://www.baidu.com/s?wd=%E6%88%90%E5%8A%9F
url = "https://www.baidu.com/s?"# UA伪装:模拟浏览器
header = {"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36"
}word = input("enter your word:").strip()# get请求携带的参数
param = {"wd":word
}# 获取响应对象
response = requests.get(url, headers=header, params=param)# 获取响应内容,可以通过response.text获取字符串形式的响应数据
page_text = response.content# 持久化储存
file_name = word + ".html"
with open(file_name, "wb") as f:f.write(page_text)print(f"{file_name} has been save successfully")

2.2 破解百度翻译

2.2.1 POST请求携带参数

将传递的参数封装成为字典,并且传递给data

180e7e948fa9407cb8ffb152aa6caa93.png

2.2.2 Ajax请求

单词输入后,局部页面就会刷新

Ajax(Asynchronous JavaScript and XML)是一种在Web应用程序中进行异步通信的技术,它使用JavaScript和XML(现在通常使用JSON)来实现在不刷新整个页面的情况下与服务器进行数据交换

97db308ad6cc45dfa31e7f16dcee5cc8.png

2.2.3 JSON模块的使用

  • 反序列化 loads:将json字符串转化为python对象字典
  • 序列化 dump: 将python对象字典转化为json字符串,并写入文件

4c96e3dfa22742a1a2ce45a179a8f6cd.png

# _*_ coding utf-8 _*_
# george
# time: 2024/12/24下午7:56
# name: test.py
# comment:
import requests
import json# 参考url
url = "https://fanyi.baidu.com/sug"# UA伪装:模拟浏览器
header = {"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36","Referer":"https://fanyi.baidu.com/mtpe-individual/multimodal"
}word = input("enter your word:").strip()# get请求携带的参数
param = {"kw":word
}# 获取响应对象
response = requests.post(url, headers=header,data=param)# 获取响应内容
word_text = json.loads(response.text)# 持久化储存
file_name = word + ".json"
with open(file_name, "w",encoding="utf-8") as f:json.dump(word_text["data"][0], f,ensure_ascii=False)print(f"{file_name} has been save successfully")

f35d983fa23d484db43827f19e219270.png

2.3 电影 

2.4 公司url

  • 动态加载数据分析
  • 获取每家公司的url,但是发现每家公司的详情页面也是动态加载出来的

3.数据解析

  • 正则
  • bs4
  • xpath

数据解析原理:

解析局部的文本内容都会在标签之间或是标签的属性中进行存储

  • 进行指定标签的定位
  • 标签或是标签对应属性存储值的获取

3.1 正则解析

3.2 bs4解析

只可以被应用于python中

3.2.1 数据解析原理

  • 实例化BeautifulSoup对象,并且将页面源码数据加载到该对象里面
  • 通过调用BeautifulSoup对象中的相关的属性或是方法,进行标签定位和数据提取

3.2.2 环境安装

pip3 install beautifulsoup4  -i  https://mirrors.aliyun.com/pypi/simple
pip3 install lxml  -i  https://mirrors.aliyun.com/pypi/simple

3.2.3 bs4的具体使用

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/12/26 22:07
# @Author : George
from bs4 import BeautifulSoup
import refp = open("./result.html", "r", encoding="utf-8")
soup = BeautifulSoup(fp, "lxml")
# soup.tagName:返回的是html中第一次出现的tag标签
print(soup.div)
# -------------------------------------------
# 等同于soup.tagName
print(soup.find("a"))
# 属性定位
print(soup.find("a", href=re.compile(".*ip138.com/$")).text)
# # 加载源码中所有的tag标签组成的列表
print(soup.find_all("a"))
# -------------------------------------------
# 可以使用选择器,id/类/标签/选择器,返回的是一个列表
print(soup.select('.center'))
# 层级选择器,
# “ ”空格就是表示多个层级
# > 表示一个层级
for i in soup.select('.center > ul > li > a'):print(i.text)
print(soup.select('.center > ul a')[0])
# -------------------------------------------
# 获取标签之间的文本数据 soup.a.text/string/get_text()
# --text/get_text可以获取标签里面啊所有的文本内容
# --string只可以获取该标签下的直系文本内容
print(soup.select('.center > ul a')[0].string)
# -------------------------------------------
# 获取标签里面的属性值
print(soup.select('.center > ul a')[0]["href"])

3.2.4 bs4爬取三国演义

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/12/26 22:43
# @Author : George
# ==================================================
# <a href="/guwen/bookv_6dacadad4420.aspx">第一回</a>
# 第一回网址
# https://www.gushiwen.cn/guwen/bookv_6dacadad4420.aspx
# ==================================================from bs4 import BeautifulSoup
import requestsurl = "https://www.gushiwen.cn/guwen/book_46653FD803893E4F7F702BCF1F7CCE17.aspx"# UA伪装:模拟浏览器
header = {"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36"
}
# 获取响应对象
response = requests.get(url, headers=header)
# 实例化bs对象
soup = BeautifulSoup(response.text, "lxml")
a_liat = soup.select(".bookcont >ul > span >a")
fp = open("三国.txt","w",encoding="utf-8")
# 解析章节标题和详情页面的url
for tag in a_liat:title = tag.textdetail_url = "https://www.gushiwen.cn/"+tag["href"]# 对详情页面发起请求detail_page = requests.get(detail_url, headers=header)# 解析出详情页面中的内容detail_soup = BeautifulSoup(detail_page.text,"lxml")# 使用此种方法出现一个问题,就是文章都是在p标签里面,所以文章不会换行# content = detail_soup.find("div",class_="contson").textfp.write(title + ":")for line in detail_soup.select('.contson > p'):fp.write(line.text+"\n")fp.write("\n\n")print(f"{title}爬取成功")
fp.close()

效果:

2fed6d19354c427799d07f88da4f0cb3.png

3.3 xpath解析

最常用且是最便捷高效的一种解析方式

3.3.1 xpath解析原理

  • 实例化一个etree的对象,且将被解析的页面远吗数据加载到该对象中
  • 调用etree对象中的xpath方法结合xpath表达式实现标签的定位和内容的捕获

3.3.2 环境的安装

pip3 install lxml  -i  https://mirrors.aliyun.com/pypi/simple

3.3.3 具体使用

xpath只能够根据层级关系定位标签页面.

1d80a29970ae42d885f98586b60dd8f9.png

ba9263465f7742479051cd9574f5c8fd.png

<!DOCTYPE html>
<html lang="zh-CN">
<head>
<!--    <meta charset="UTF-8">-->
<!--    <meta name="viewport" content="width=device-width, initial-scale=1.0">--><title>小型HTML页面示例</title>
</head>
<body><div class="container"><div class="div1"><h2>第一个Div</h2><p>这是第一个div的内容。它使用了类标签.div1进行样式定位。</p><p>这是第一个div的内容。它使用了类标签.div2进行样式定位。</p><p><title>这是第一个div的内容。它使用了类标签.div3进行样式定位。</title></p></div><div class="div2"><h2>第二个Div</h2><p>这是第二个div的内容。它使用了类标签.div2进行样式定位。</p></div><div class="div3"><h2 class="div3_h2">第三个Div</h2><p>这是第三个div的内容。它使用了类标签.div3进行样式定位。</p></div></div>
</body>
</html>
from lxml import etreetree_ = etree.parse("./baidu.html")
# r = tree_.xpath("/html/head/title")  # => [<Element title at 0x10a79ee60>],返回的是定位出来的对象
# r = tree_.xpath("/html//title")  # => [<Element title at 0x10a510dc0>],定位出来的效果是一样的
# r = tree_.xpath("//title") # => [<Element title at 0x106b75dc0>],找到源码里面所有的title标签# 属性定位
# r = tree_.xpath('//div[@class="div1"]') # => [<Element div at 0x101807e60>]# 索引定位,索引位置从1开始
# r = tree_.xpath('//div[@class="div1"]/p[3]')# 取文本 /text()获取的标签里面直系的文本内容, //text() 获取标签下面所有的文本内容
# text = tree_.xpath('//div[@class="div1"]/p[3]/text()')
# print(text)  # => ['这是第一个div的内容。它使用了类标签.div3进行样式定位。']
# text = tree_.xpath('//div[@class="div1"]//text()')
# print(text)# 获取属性
# attr = tree_.xpath('//div[@class="div3"]/h2/@class')  # ['div3_h2']
# print(attr)

3.3.4 爬取ppt模板

已经成功,结果如下,但是是大学时多亏了它的免费模板,就不贴代码给它带来麻烦了

99221c29b8094f67b7b8f70744dd24fa.png

3.3.5 爬取美女图片

https://pic.netbian.com/4kmeinv/

爬取美女图片失败,开始进入网站总是有人机验证,进去什么都爬取不了,后面再试一下

db1321caf5304010a2a3d806cc43f60c.png

4.模拟登录

4.1 验证码识别 

要收费,自己写个图片文字识别

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/12/28 13:44
# @Author : George
from PIL import Image
import pytesseract# 如果你没有将tesseract.exe添加到系统的PATH中,
# 你需要在这里指定tesseract可执行文件的完整路径
pytesseract.pytesseract.tesseract_cmd = r'D:\Tesseract-OCR\tesseract.exe'# 打开一个图像文件
image_path = 'abc_1.png'  # 替换为你的图像路径
image = Image.open(image_path)# 使用pytesseract进行OCR
text = pytesseract.image_to_string(image, lang='chi_sim')  # lang参数指定语言,例如'chi_sim'表示简体中文# 打印识别出的文字
print(text)

4.2 模拟登录逻辑

5.cookie

5.1含义及作用

网页的Cookie是一种在Web开发中广泛使用的技术,用于在用户的计算机上存储小块的数据。这些数据通常由Web服务器发送给用户的浏览器(所以主要是服务器创建的),并在用户后续的访问中被浏览器返回给服务器。Cookie的主要功能和用途包括:

  1. 会话管理:Cookie可以用于保持用户的会话状态,例如在用户登录到网站后,服务器可以发送一个包含会话ID的Cookie到用户的浏览器。在用户后续的请求中,浏览器会自动包含这个会话ID,使得服务器能够识别并持续管理用户的会话。

  2. 个性化设置:Cookie可以用来存储用户的偏好设置,例如网站的语言、主题颜色、字体大小等。这样,当用户再次访问网站时,这些设置可以自动恢复,提高用户体验。

  3. 跟踪用户行为:通过Cookie,网站可以跟踪和分析用户的行为,例如用户访问了哪些页面、停留了多长时间、点击了哪些链接等。这些信息对于网站优化、广告定位等非常有用。

  4. 安全性:在某些情况下,Cookie还可以用于增强网站的安全性,例如通过存储加密的令牌来验证用户的身份。

Cookie具有以下几个特点:

  • 存储在客户端:Cookie数据存储在用户的浏览器上,而不是服务器上。这意味着即使用户关闭了浏览器或计算机,只要没有删除Cookie,数据仍然存在。
  • 自动发送:在用户访问与Cookie相关的网站时,浏览器会自动将相关的Cookie数据发送到服务器。
  • 有限的大小和数量:每个Cookie的大小和数量都有限制,这取决于浏览器和Web服务器的配置。
  • 过期时间:Cookie可以设置过期时间,在过期时间之前,Cookie一直有效。如果没有设置过期时间,Cookie就是一个会话Cookie,在用户关闭浏览器时自动删除。

5.2 session的含义及特点

  • 一、Session的基本概念

Session是服务器用于跟踪用户会话的一种机制。它允许服务器在多个请求之间识别同一个用户,并维护该用户的状态信息。这些状态信息可以包括用户的登录状态、购物车内容、偏好设置等。

  • 二、Session的工作原理
  1. 创建Session:当用户首次访问网站时,服务器会创建一个新的Session对象,并为其分配一个唯一的Session ID。
  2. 发送Session ID:服务器通常会将这个Session ID以Cookie的形式发送给客户端浏览器。客户端浏览器会在后续的请求中自动将这个Session ID附加在请求头中。
  3. 识别用户:服务器通过检查请求头中的Session ID来识别用户,并获取相应的Session数据。这样,服务器就可以在多个请求之间保持用户的状态信息。
  • 三、Session的作用
  1. 保持用户状态:Session允许服务器在多个请求之间跟踪用户的状态信息,如登录状态、购物车内容等。这使得用户可以在不同页面之间无缝切换,而无需重新认证或输入信息。
  2. 个性化服务:根据用户的喜好和历史行为,服务器可以为用户提供个性化的内容和服务。这有助于提高用户体验和满意度。
  3. 安全性:通过验证Session ID来确认用户的请求,服务器可以防止未授权访问和非法操作。这有助于保护用户的隐私和数据安全。
  • 四、Session与Cookie的关系

Session和Cookie是密切相关的两种技术。Cookie是服务器发送到客户端浏览器并保存在本地的一小块数据,而Session则是服务器用于跟踪用户会话的一种机制。Cookie通常用于存储Session ID,以便服务器在后续的请求中识别用户。因此,可以说Cookie是Session的一种实现方式。

  • 五、Session的管理

在网络请求中,管理Session是非常重要的。开发人员需要确保Session的安全性、有效性和可维护性。这包括:

  1. 设置合理的Session过期时间:为了避免用户长时间未操作而导致的会话过期问题,开发人员需要设置合理的Session过期时间。
  2. 保护Session ID:Session ID是用户身份的重要标识,开发人员需要采取措施来保护它免受攻击和泄露。例如,可以使用HTTPS协议来加密传输Session ID,或者使用更复杂的Session ID生成算法来提高安全性。
  3. 清理无效的Session:为了节省服务器资源和提高性能,开发人员需要定期清理无效的Session对象。这可以通过设置Session的失效时间、使用数据库存储Session数据并定期清理过期数据等方式来实现。

5.3 http和https协议的特点

无状态。即是说,即使你的第一次请求已经实现了自动登录。但是你第二次发送请求时,服务器端并不知道你的请求是基于第一次登录状态的。

cookie可以让服务器端记录客户端的相关状态

5.3 cookie值的处理

  • 手动抓包cookie值,将其封装到headers里面,但是这种方法面对cookie是动态变化的时候就很难处理了
  • session会话对象
    • 可以进行请求的发送
    • 如果请求过程中产生了cookie,则该cookie会被自动存储/携带在该session会话对象中
  • b3b252aa2fe644b3aef552a0ef159520.png

6.代理

需要绕过IP封锁、限制或进行大规模数据抓取时。代理服务器充当客户端和目标服务器之间的中介,可以隐藏你的真实IP地址,提供额外的安全性,有时还能加速请求

测试网址:

https://httpbin.org/get

现在基本上没有免费正常的代理可以被使用,我这个也是失败的。看到一个博主写建立代理ip池的python3之爬虫代理IP的使用+建立代理IP池_python爬虫代理池-CSDN博客,代码写的不错,爬取出来的ip也没什么能用的了。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/12/28 19:14
# @Author : George
import requestsurl = "https://www.baidu.com/s?"# 将爬虫程序伪装成为浏览器来发送请求
headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36"
}# 设置代理
proxies = {"http": "154.203.132.49:8080","https":"49.73.4.158:8089"
}
param = {"wd":"ip"
}response = requests.get(url,headers=headers,proxies=proxies,verify=False,params=param)
with open("proxy.html","wt",encoding="utf-8") as f:f.write(response.text)

公司的联网也是需要代理的

# _*_ coding utf-8 _*_
# george
# time: 2024/12/24下午7:56
# name: test.py
# comment:
import requestsuser = ""
passwd = "url = "https://www.baidu.com/"# 模拟浏览器
header = {"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36"
}
# 指定代理,不指定代理,无法上网
proxies = {"http": f"http://{user}:{passwd}@ip:port","https": f"https://{user}:{passwd}@ip:port"
}# 获取响应对象
response = requests.get(url, headers=header, proxies=proxies)# 获取响应内容,可以通过response.text获取字符串形式的响应数据
page_text = response.content# 持久化储存
with open("baidu.html", "wb") as f:f.write(page_text)

7.异步爬虫(进程池)

本来是针对视频进行爬取的,但是ajax请求时的请求地址,看不懂mrd这个怎么来的,暂时跳过,我灰太狼一定会回来的!!!!!!!!!!!!

295b11c4eb6346fba989e6cdf2874352.png

d684c278cec246cf8b9eb62f1497660d.png

7.1 视频爬取

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/12/29 13:16
# @Author : George
import requests
import chardet
import os
from lxml import etree
"""
"https://www.pearvideo.com/category_1"
['video_1797596', 'video_1797399', 'video_1797404', 'video_1797260']https://www.pearvideo.com/video_1797596ajax请求
https://www.pearvideo.com/videoStatus.jsp?contId=1797596&mrd=0.4308675768914054
https://www.pearvideo.com/videoStatus.jsp?contId=1797399&mrd=0.6241695396585363
"""class LiVideo(object):def __init__(self):#  定义输出ppt的文件夹self.extract_to_dir = f"./video"os.makedirs("./video", exist_ok=True)# 添加Referer防止反爬虫self.header = {"Referer": "https://www.pearvideo.com/","User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36"}# 编码self.encoding = Noneself.home_url = "https://www.pearvideo.com/category_1"def get_HTML(self, url_param):response = requests.get(url_param, headers=self.header)if response.status_code == 200:# 使用 chardet 自动检测编码self.encoding = chardet.detect(response.content)['encoding']response.encoding = self.encoding# 创建etree对象tree = etree.HTML(response.text)return treedef deal(self):home_tree = self.get_HTML(self.home_url)home_url_list = home_tree.xpath("//*[@id='listvideoListUl']/li/div/a[1]/@href")name_list = home_tree.xpath("//*[@id='listvideoListUl']/li/div/a[1]/div[2]/text()")for name,url in zip(name_list,home_url_list):url = "https://www.pearvideo.com/"+urldetail_tree = self.get_HTML(url)if __name__ == '__main__':vi = LiVideo()vi.deal()

 7.2 诗文异步爬取

只要将任务提交给线程池,线程池就会自动安排一个线程来执行这个任务.同过线程池提交的任务是异步提交.异步提交的结果就是不等待任务的执行结果,继续往下执行 

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/12/29 14:26
# @Author : George
"""
https://www.gushiwen.cn/guwen/Default.aspx?p=1&type=%e5%b0%8f%e8%af%b4%e5%ae%b6%e7%b1%bb第二层
https://www.gushiwen.cn/guwen/book_4e6b88d8a0bc.aspx
https://www.gushiwen.cn/guwen/book_a09880163008.aspx第三層
https://www.gushiwen.cn/guwen/bookv_b630af160f65.aspx
"""
import os
import requests
import chardet
from lxml import etree
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor
import time
from typing import Dict, List, Tupleclass NovelDownloader:def __init__(self):self.output_dir = "./novels"os.makedirs(self.output_dir, exist_ok=True)self.headers = {"Referer": "https://www.gushiwen.cn/guwen/Default.aspx?p=1","User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36"}self.base_url = "https://www.gushiwen.cn"self.home_url = f"{self.base_url}/guwen/Default.aspx?p=1&type=%e5%b0%8f%e8%af%b4%e5%ae%b6%e7%b1%bb"def get_html_tree(self, url: str) -> etree._Element:"""获取页面HTML并返回etree对象"""response = requests.get(url, headers=self.headers)if response.status_code != 200:raise Exception(f"Failed to get {url}, status code: {response.status_code}")encoding = chardet.detect(response.content)['encoding']response.encoding = encodingreturn etree.HTML(response.text)def get_chapter_details(self) -> Dict[str, Dict[str, str]]:"""获取所有章节详情"""home_tree = self.get_html_tree(self.home_url)# 获取书籍链接和标题urls = home_tree.xpath("//*[@class='sonspic']/div[1]/p[1]/a[1]/@href")[1:3]titles = home_tree.xpath("//*[@class='sonspic']/div[1]/p[1]/a[1]/b/text()")[1:3]book_details = {}for title, url in zip(titles, urls):detail_tree = self.get_html_tree(f"{self.base_url}{url}")# 获取章节链接和标题chapter_urls = [f"{self.base_url}{url}" for url in detail_tree.xpath("//*[@class='bookcont']/ul/span/a/@href")]chapter_titles = detail_tree.xpath("//*[@class='bookcont']/ul/span/a/text()")book_details[title] = dict(zip(chapter_titles, chapter_urls))return book_detailsdef download_novel(self, title: str, chapters: Dict[str, str]):"""下载单本小说"""output_path = os.path.join(self.output_dir, f"{title}.txt")print(f"开始下载 {title}".center(100, "="))with open(output_path, "w", encoding="utf-8") as f:for chapter_title, chapter_url in chapters.items():response = requests.get(chapter_url, headers=self.headers)soup = BeautifulSoup(response.text, "lxml")f.write(f"{chapter_title}:\n")for paragraph in soup.select('.contson > p'):f.write(f"{paragraph.text}\n")f.write("\n\n")print(f"{chapter_title} 下载完成".center(20, "-"))print(f"{title} 下载完成".center(100, "="))def main():start_time = time.time()downloader = NovelDownloader()books = downloader.get_chapter_details()# 使用线程池并行下载with ThreadPoolExecutor(max_workers=10) as pool:futures = [pool.submit(downloader.download_novel, title, chapters) for title, chapters in books.items()]print(f"总耗时: {time.time() - start_time:.2f}秒")if __name__ == '__main__':main()

8.异步爬虫(协程)

  协程内容不赘述:CSDN

c62484262bf6460187337af4d2ab3996.png

基于单线程和协程,实现异步爬虫。 

8.1. 基于flask搭建服务器

简单的学习了一下,感觉不是很复杂,后面等着详细学习

261a05e6ee984879838d6b8015f8b299.png

pip3 install Flask -i  https://mirrors.aliyun.com/pypi/simple
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/12/31 21:47
# @Author : George
"""
# @app.route('/'): 这是一个装饰器,用于告诉 Flask 哪个 URL 应该触发下面的函数。
在这个例子中,它指定了根 URL(即网站的主页)。# return 'Hello, World!': 这行代码是 hello_world 函数的返回值。
当用户访问根 URL 时,这个字符串将被发送回用户的浏览器。"""
from flask import Flask, request, jsonify
import timedef get_client_ip(request):# 如果使用了反向代理,优先从 X-Forwarded-For 头部获取 IP 地址x_forwarded_for = request.headers.get('X-Forwarded-For', '').split(',')if x_forwarded_for:client_ip = x_forwarded_for[0]  # 通常第一个 IP 地址是客户端的真实 IP 地址else:client_ip = request.remote_addrreturn client_ipapp = Flask(__name__)@app.route('/')
def index():return 'Hello, World!'@app.route('/bobo')
def index_bobo():# 获取client的user_agent和refereruser_agent = request.headers.get('User-Agent')user_referer = request.headers.get('Referer')print(user_agent, user_referer)# client ip地址没有获取到client_ip = get_client_ip(request)print({'client_ip': client_ip})time.sleep(3)return 'bobo'@app.route('/jar')
def index_jar():time.sleep(3)return 'jar'@app.route('/test')
def index_test():time.sleep(3)return 'test'if __name__ == '__main__':app.run(threaded=True)

8.2 基于aiohttp实现异步爬虫

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2024/12/31 22:08
# @Author : George
import asyncio
import requests
import time
import aiohttp
from threading import currentThread"""
async def request(url): # 耗时: 9.054526805877686,requests是同步阻塞,必须替换为异步库提供的函数aiohttp
"""urls = ["http://127.0.0.1:5000/jar","http://127.0.0.1:5000/bobo","http://127.0.0.1:5000/test"
]start = time.time()# 耗时: 9.054526805877686,requests是同步操作
# async def request(url):
#     print("开始下载", url)
#     response = requests.get(url=url)
#     print(response.text)
#     print("下载结束", url)
#     print("------------")async def request_2(url):print("开始下载", url, currentThread())headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36","Referer": "https://movie.douban.com/explore"}# 设置代理proxy = "http://8.220.204.215:8008"async with aiohttp.ClientSession() as session:async with session.get(url, headers=headers) as response:text = await response.text()print(text)print("下载结束", url)async def main():tasks = []for url in urls:tasks.append(asyncio.create_task(request_2(url)))await asyncio.wait(tasks)asyncio.run(main())end = time.time()
print("耗时:", end - start)

9.selenuim使用

  • 便捷的获取网页中间动态加载的数据
  • 边界实现模拟登录

selenuim:基于浏览器自动化的一个模块

入门指南 | Selenium

b2505f3011124e5bb0606b9d5922d52f.png

基于selenium实现浏览器自动化,自动输入并播放动漫核心代码如下:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2025/1/1 16:04
# @Author : George
"""
在 Selenium 4 中,不再直接在 webself.driver.Chrome 中设置驱动程序路径,而是通过引入 Service 对象来设置。
这样可以避免弃用警告,并确保驱动程序的正确加载
"""
import os.pathfrom selenium import webdriver
from selenium.webdriver.edge.service import Service as EdgeService
from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.keys import Keys
import time
from lxml import etree
import requests
import chardet
import json
from log_test import loggerclass VideoAutoPlay(object):def __init__(self):self.driver = {}self.count_num = 1self.movies_dict =Noneself.home_url = "https://www.agedm.org/"self.headers = {"Referer": "https://www.agedm.org/","User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0"}def get_html_tree(self, url: str) -> etree._Element:"""获取页面HTML并返回etree对象"""response = requests.get(url, headers=self.headers)if response.status_code != 200:raise Exception(f"Failed to get {url}, status code: {response.status_code}")encoding = chardet.detect(response.content)['encoding']response.encoding = encodingreturn etree.HTML(response.text)def movies_input(self, movie_name, n):try:if not self.driver:self.driver = self.setup_driver()self.driver.get(self.home_url)# 查找搜索框元素search_box = self.driver.find_element(By.ID, "query")# 输入搜索内容search_box.send_keys(movie_name)# 提交搜索表单search_box.send_keys(Keys.RETURN)# 等待搜索结果加载# WebDriverWait(self.driver, 10).until(#     EC.presence_of_element_located((By.CLASS_NAME, "content_left"))# )# 二级请求self.driver.get(f"https://www.agedm.org/search?query={movie_name}")# 查找在线播放btn# wait = WebDriverWait(self.driver, 10)  # 10秒超时# # titles = home_tree.xpath("//*[@class='sonspic']/div[1]/p[1]/a[1]/b/text()")button = self.driver.find_element(By.XPATH, "//*[@class='video_btns']/a[1]")# print(button)button.click()WebDriverWait(self.driver, 30).until(EC.presence_of_element_located((By.CLASS_NAME, "tab-content")))# # 等待搜索结果加载detail_url = self.driver.current_urltree = self.get_html_tree(detail_url)url_dict = {}titles = tree.xpath("//*[@class='video_detail_episode']/li/a/text()")[n - 1:]urls = tree.xpath("//*[@class='video_detail_episode']/li/a/@href")[n - 1:]for title, url in zip(titles, urls):url_dict[title] = url# print(url_dict)return url_dictexcept Exception as e:logger.error(f"搜索过程发生错误: {str(e)}")if self.driver:self.driver.quit()self.driver = Nonereturn {}def setup_driver(self):# 优化视频播放性能的设置options = webdriver.EdgeOptions()options.add_argument('--disable-gpu')  # 禁用GPU加速options.add_argument('--no-sandbox')  # 禁用沙箱模式options.add_argument('--disable-dev-shm-usage')  # 禁用/dev/shm使用options.add_argument('--disable-software-rasterizer')  # 禁用软件光栅化options.add_argument('--disable-extensions')  # 禁用扩展options.add_argument('--disable-infobars')  # 禁用信息栏options.add_argument('--autoplay-policy=no-user-gesture-required')  # 允许自动播放options.add_experimental_option('excludeSwitches', ['enable-automation'])  # 禁用自动化提示options.add_experimental_option("useAutomationExtension", False)  # 禁用自动化扩展# 设置正确的驱动路径service = EdgeService(executable_path='./msedgedriver.exe')return webdriver.Edge(options=options, service=service)def video_player(self, movie_name, n):try:url_dict = self.movies_input(movie_name, n)if not url_dict:logger.error("未找到可播放的视频链接")returnif not self.driver:self.driver = self.setup_driver()for title, url in url_dict.items():try:# 打开网站self.driver.get(url)# 切换到视频iframeself.driver.switch_to.frame("iframeForVideo")logger.info(f"开始时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}")# 等待视频元素加载wait = WebDriverWait(self.driver, 20)video_element = wait.until(EC.presence_of_element_located((By.CLASS_NAME, "art-video")))logger.info(f"找到视频元素: {video_element}")logger.info(f"{movie_name}:{title}集开始播放".center(10, "="))# 等待视频加载完成后执行全屏page_stage = Truewhile page_stage:paused_state = self.driver.execute_script("return arguments[0].paused;", video_element)print("paused_state", paused_state)if not paused_state:break# 双击使得视频全屏显示ActionChains(self.driver).double_click(video_element).perform()time.sleep(3)# 视频单击播放paused_state = self.driver.execute_script("return arguments[0].paused;", video_element)if paused_state:logger.info(f"{movie_name}:{title}触发双击全屏")ActionChains(self.driver).click(video_element).perform()# # 点击视频开始播放# action = ActionChains(self.driver)# action.move_to_element_with_offset(video_element, 100, 100).click().perform()print(f"点击播放时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}")# 等待视频播放完成time.sleep(60*21)n = n + 1self.update_movie_count(movie_name,n)logger.info(f"第{title}集播放完毕".center(10, "="))except Exception as e:logger.error(f"播放第{title}集时发生错误: {str(e)}")continueexcept Exception as e:logger.error(f"视频播放总体发生错误: {str(e)}")finally:if self.driver:self.driver.quit()self.driver = Nonedef __del__(self):"""确保在对象销毁时关闭driver"""if self.driver:try:self.driver.quit()except:passdef count_read(self, movies, n=1):# 将此文件作为播放内容的缓存if not os.path.exists("./count.json"):with open("./count.json", "w", encoding="utf-8") as f:self.movies_dict = {movies:n}json.dump({movies: n}, f, ensure_ascii=False, indent=4)return nelse:with open("./count.json", "r", encoding="utf-8") as f:self.movies_dict = json.load(f)if not self.movies_dict.get(str(movies)):  # 读取不到moviesself.movies_dict.update({str(movies):n})with open("./count.json", "w", encoding="utf-8") as f:json.dump(self.movies_dict, f, ensure_ascii=False, indent=4)return nelse:  # 读取到了movieself.count_num = self.movies_dict[str(movies)]return self.count_num# update movies countdef update_movie_count(self, movies, n):with open("./count.json", "w", encoding="utf-8") as f:self.movies_dict[str(movies)] = njson.dump(self.movies_dict, f, ensure_ascii=False, indent=4)if __name__ == "__main__":video = VideoAutoPlay()movie = "火影忍者 疾风传"# movie = "神之塔 第二季"count_num = video.count_read(movie, 1)# video.update_movie_count(movie, 4)video.video_player(movie, count_num)

10.scrapy框架

破电脑太老,安装不了!!

d684c278cec246cf8b9eb62f1497660d.png

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com