代码仓库
代码已上传至 GitHub，需要的朋友可以顺手点个 Star！
https://github.com/turbo-duck/biquge_fiction_spider
背景介绍
上一节中，我们已经拿到了小说的详细信息和章节列表。
接下来，用脚本从 SQLite 数据库中取出章节列表，并把每个章节的 URL 推送到 RabbitMQ。
最后，由 Scrapy 消费 MQ 中的消息，抓取章节正文写入 SQLite，并手动发送 ACK 确认。
使用技术
- RabbitMQ
- Scrapy
- SQLite
生产者代码
连接 SQLite 和 RabbitMQ，按小说查出章节编码，拼接出章节 URL 后推送到队列。
import pika
import json
import sqlite3
import os
from dotenv import load_dotenv
load_dotenv()

# Which page of the fiction index to publish. Publishing is done one page at a
# time (all pages at once would be too many messages); override with PAGE_INFO.
page_info = os.getenv('PAGE_INFO', '1/1391')

rabbitmq_queue = os.getenv('RABBITMQ_QUEUE', 'default_queue')
rabbitmq_host = os.getenv('RABBITMQ_HOST', 'localhost')
# Environment values are strings; convert the port explicitly.
rabbitmq_port = int(os.getenv('RABBITMQ_PORT', '5672'))
virtual_host = os.getenv('RABBITMQ_VHOST', '/')
username = os.getenv('RABBITMQ_USERNAME', 'guest')
password = os.getenv('RABBITMQ_PASSWORD', 'guest')

FICTION_SQL = "SELECT each_code FROM biquge_list WHERE page_info = ?"
CHAPTER_SQL = "SELECT chapter_code FROM chapter_list WHERE fiction_code = ?"

sql_connection = sqlite3.connect('../db/biquge.db')
try:
    cursor = sql_connection.cursor()
    credentials = pika.PlainCredentials(username, password)
    connection_params_result = {
        'host': rabbitmq_host,
        'port': rabbitmq_port,
        # Honour RABBITMQ_VHOST (this used to be hard-coded to '/', so the
        # producer could publish to a different vhost than the consumer reads).
        'virtual_host': virtual_host,
        'credentials': credentials,
    }
    mq_connection = pika.BlockingConnection(pika.ConnectionParameters(**connection_params_result))
    try:
        channel = mq_connection.channel()
        # Durable queue + delivery_mode=2 (persistent messages).
        channel.queue_declare(queue=rabbitmq_queue, durable=True)

        # Every fiction listed on the selected index page.
        cursor.execute(FICTION_SQL, (page_info,))
        for (fiction_code,) in cursor.fetchall():
            print(f"fiction code: {fiction_code}")
            # All chapter codes belonging to this fiction; fetchall() materialises
            # the rows first, so reusing the same cursor in the loop is safe.
            cursor.execute(CHAPTER_SQL, (fiction_code,))
            for (chapter_code,) in cursor.fetchall():
                chapter_url = f"https://www.xbiqugew.com/book/{fiction_code}/{chapter_code}.html"
                message = json.dumps({'url': chapter_url})
                channel.basic_publish(
                    exchange='',
                    routing_key=rabbitmq_queue,
                    body=message.encode('utf-8'),
                    properties=pika.BasicProperties(delivery_mode=2),
                )
                print(f"Send MQ: {message}")
    finally:
        mq_connection.close()
finally:
    sql_connection.close()
消费者代码
Scrapy 从 RabbitMQ 中逐条取出章节 URL，抓取正文后写入 SQLite。
Spider.py
import scrapy
import time
import pika
import sqlite3
import re
import json
from urllib import parse
from biquge_chapter_detail_spider.items import BiqugeChapterDetailSpiderItem


class SpiderSpider(scrapy.Spider):
    """Consume chapter URLs from RabbitMQ and scrape each chapter's text.

    Each MQ message is a JSON object ``{"url": ...}``. Messages are acknowledged
    manually:

    * immediately, when the message is unusable (undecodable, not a JSON object,
      no URL) or the chapter is already present in ``chapter_detail_list``;
    * in ``parse_list``, after the scraped item has been yielded.

    ``tcp_uuid`` counts successful (re)connections to RabbitMQ. A delivery tag
    is only valid on the channel that delivered it, so ``parse_list`` skips the
    ACK when its request was issued on an older connection.
    """

    name = "spider"
    # allowed_domains = ["spider.com"]
    start_urls = []

    # Extracts the numeric code from ".../<digits>.html" (dot escaped so only a
    # literal ".html" suffix matches).
    CHAPTER_CODE_PATTERN = re.compile(r'/(\d+)\.html')
    # Plain spaces and non-breaking spaces removed from every text node.
    WHITESPACE_PATTERN = re.compile(r' |\xa0')

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.queue_name = None
        self.mq_connection = None
        self.channel = None
        self.db_params = None
        self.conn = None
        self.cursor = None
        self.tcp_uuid = 0

    def establish_connection(self):
        """Connect to RabbitMQ, retrying every 5 seconds until it succeeds.

        Settings come from ``RABBITMQ_PARAMS`` (keys: queue, host, port,
        virtual_host, username, password). A missing/incomplete setting raises
        immediately instead of being retried forever. Any previous connection is
        closed first; ``tcp_uuid`` is incremented once the new channel is ready.
        """
        connection_params = self.settings.get('RABBITMQ_PARAMS', None)
        if not connection_params:
            raise ValueError("RABBITMQ_PARAMS setting is missing or empty")
        self.queue_name = connection_params['queue']
        credentials = pika.PlainCredentials(
            connection_params['username'],
            connection_params['password'],
        )
        connection_params_result = {
            'host': connection_params['host'],
            # Values from the environment are strings; pass pika an int port.
            'port': int(connection_params['port']),
            'virtual_host': connection_params['virtual_host'],
            'credentials': credentials,
            'heartbeat': 3600,
            'connection_attempts': 5,
        }
        self._close_mq()
        # Loop instead of recursing so a long broker outage cannot blow the stack.
        while True:
            connection = None
            try:
                connection = pika.BlockingConnection(
                    pika.ConnectionParameters(**connection_params_result)
                )
                channel = connection.channel()
                channel.basic_qos(prefetch_count=1)
            except Exception as e:
                if connection is not None and connection.is_open:
                    try:
                        connection.close()
                    except Exception as close_error:
                        print(f"Error closing MQ connection (ignored): {close_error}")
                print(f"连接MQ失败: {str(e)}")
                print("等待5秒后重试...")
                time.sleep(5)
                continue
            self.mq_connection = connection
            self.channel = channel
            self.tcp_uuid = int(self.tcp_uuid) + 1
            return

    def _close_mq(self):
        """Best-effort close of the current RabbitMQ connection, if still open."""
        connection = self.mq_connection
        if connection is not None and connection.is_open:
            try:
                connection.close()
            except Exception as e:
                # The connection is being discarded anyway; just report it.
                print(f"Error closing MQ connection (ignored): {e}")
        self.mq_connection = None
        self.channel = None

    def connect_db(self):
        """(Re)open the SQLite connection, retrying every 5 seconds on failure."""
        self._close_db()
        while True:
            try:
                self.conn = sqlite3.connect("../db/biquge.db")
                self.cursor = self.conn.cursor()
                return
            except Exception as e:
                print("Error connecting to DB: ", e)
                print("等待5秒后重试...")
                time.sleep(5)

    def _close_db(self):
        """Best-effort close of the current SQLite connection, if any."""
        if self.conn is not None:
            try:
                self.conn.close()
            except sqlite3.Error as e:
                print(f"Error closing DB (ignored): {e}")
        self.conn = None
        self.cursor = None

    def extract_last_number_html(self, text):
        """Return the last ``<digits>`` of a ``/<digits>.html`` segment in *text*, or ""."""
        numbers = self.CHAPTER_CODE_PATTERN.findall(text)
        return str(numbers[-1]) if numbers else ""

    @staticmethod
    def _parse_message_url(body):
        """Decode an MQ message body and return its non-empty "url" string, else None."""
        try:
            text = parse.unquote(body.decode())
            json_data = json.loads(text)
        except ValueError as e:
            # UnicodeDecodeError and json.JSONDecodeError are both ValueErrors.
            print(f"Invalid MQ message (dropped): {e}")
            return None
        print(text)
        url = json_data.get('url') if isinstance(json_data, dict) else None
        return url if isinstance(url, str) and url else None

    def start_requests(self):
        """Pull messages from RabbitMQ forever and yield one Request per new chapter.

        NOTE(review): Scrapy advances this generator synchronously, so while it
        waits here (e.g. on an empty queue) other requests/callbacks presumably
        do not progress. A ``spider_idle``-driven design would avoid that; this
        loop at least sleeps instead of busy-polling the broker when idle.
        """
        self.establish_connection()
        self.connect_db()
        while True:
            try:
                method, _header, body = self.channel.basic_get(self.queue_name)
            except Exception as e:
                print("--- ---")
                print(e)
                print("--- establish_connection ---")
                self.establish_connection()
                time.sleep(1)
                continue
            if not method:
                # Empty queue: back off instead of spinning on basic_get.
                time.sleep(1)
                continue

            delivery_tag = method.delivery_tag
            url = self._parse_message_url(body)
            if not url:
                # Unusable message: ack so it is dropped rather than redelivered forever.
                self.ack(delivery_tag)
                continue

            chapter_code = self.extract_last_number_html(url)
            # Skip chapters that are already stored.
            sql = "SELECT COUNT(id) AS count FROM chapter_detail_list WHERE chapter_code = ?"
            try:
                self.cursor.execute(sql, (chapter_code,))
                count = self.cursor.fetchone()[0]
            except Exception as e:
                print(e)
                print(sql)
                print("--- reconnect_db ---")
                # Put the message back so it is processed after the DB reconnects.
                self.no_ack(delivery_tag)
                self.connect_db()
                time.sleep(1)
                continue
            if count > 0:
                print(f"SQL SELECT chapter_code: {chapter_code}, COUNT: {count}, ACK: {delivery_tag} 已跳过")
                self.ack(delivery_tag)
                continue

            print(f"准备请求: {url}, ACK: {delivery_tag}")
            yield self.callback(
                url=url,
                delivery_tag=delivery_tag,
                chapter_code=chapter_code,
            )

    def callback(self, url, delivery_tag, chapter_code):
        """Build the Request for one chapter page.

        ``meta`` carries what ``parse_list`` needs to store the item and ACK the
        message. NOTE(review): there is no errback, so a failed download leaves
        its message unacknowledged on this connection.
        """
        meta = {
            "url": url,
            "chapter_code": chapter_code,
            "delivery_tag": delivery_tag,
            "tcp_uuid": int(self.tcp_uuid),
        }
        print(url)
        return scrapy.Request(
            url=url,
            meta=meta,
            callback=self.parse_list,
        )

    def ack(self, delivery_tag):
        """Acknowledge a message on the current channel."""
        self.channel.basic_ack(delivery_tag=delivery_tag)
        print(f"提交ACK确认: {delivery_tag}")

    def no_ack(self, delivery_tag):
        """Reject a message on the current channel and ask the broker to requeue it."""
        self.channel.basic_reject(delivery_tag=delivery_tag, requeue=True)

    def parse_list(self, response):
        """Extract the chapter text, yield it as an item, then ACK the message.

        The text nodes of ``div#content`` are stripped of spaces/NBSPs and stored
        as a JSON array string.
        """
        meta = response.meta
        chapter_code = meta['chapter_code']
        chapter_list = response.xpath(".//div[@id='content']/text()").extract()
        chapter_json_list = [
            self.WHITESPACE_PATTERN.sub("", str(each_chapter))
            for each_chapter in chapter_list
        ]
        chapter_content = json.dumps(chapter_json_list, ensure_ascii=False)
        item = BiqugeChapterDetailSpiderItem()
        item['chapter_code'] = str(chapter_code)
        item['chapter_content'] = str(chapter_content)
        print(f"抓取 chapter_code: {chapter_code}")
        yield item

        # ACK only after the item has been handed off, and only on the channel
        # that delivered the message (tags from an older channel are invalid).
        delivery_tag = meta['delivery_tag']
        tcp_uuid = meta['tcp_uuid']
        if int(tcp_uuid) == self.tcp_uuid:
            self.ack(delivery_tag)
        else:
            print(f"ACK 跳过: tcp_uuid: {tcp_uuid}, self.tcp_uuid: {self.tcp_uuid}, delivery_tag: {delivery_tag}")

    def closed(self, reason):
        """Release the RabbitMQ and SQLite connections when Scrapy closes the spider."""
        self._close_mq()
        self._close_db()
pipelines.py
写入 SQLite
# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html

# useful for handling different item types with a single interface
from itemadapter import ItemAdapter
import sqlite3


class SQLitePipeline:
    """Insert scraped chapter items into the ``chapter_detail_list`` SQLite table.

    The database path defaults to ``../db/biquge.db`` (resolved against the
    current working directory) and can be overridden with the
    ``SQLITE_DB_PATH`` Scrapy setting.
    """

    DEFAULT_DB_PATH = "../db/biquge.db"

    INSERT_SQL = """
        INSERT INTO chapter_detail_list (chapter_code, chapter_content)
        VALUES (?, ?)
    """

    def __init__(self):
        self.cursor = None
        self.connection = None

    def open_spider(self, spider):
        """Open the SQLite connection when the spider starts."""
        settings = getattr(spider, "settings", None)
        db_path = self.DEFAULT_DB_PATH
        if settings is not None:
            db_path = settings.get("SQLITE_DB_PATH", self.DEFAULT_DB_PATH)
        self.connection = sqlite3.connect(db_path)
        self.cursor = self.connection.cursor()

    def close_spider(self, spider):
        """Close the SQLite connection; safe to call even if it was never opened."""
        if self.connection is not None:
            self.connection.close()
        self.connection = None
        self.cursor = None

    def process_item(self, item, spider):
        """Insert one item and commit it; the item is returned unchanged.

        Raises whatever ``sqlite3`` raises if the INSERT fails; in that case the
        transaction is rolled back instead of being left open.
        """
        # ``with connection`` commits on success and rolls back on an exception.
        with self.connection:
            self.cursor.execute(self.INSERT_SQL, (item['chapter_code'], item['chapter_content']))
        return item
settings.py
相关配置如下：
# Scrapy settings for biquge_chapter_detail_spider project
#
# For simplicity, this file contains only settings considered important or
# commonly used. You can find more settings consulting the documentation:
#
# https://docs.scrapy.org/en/latest/topics/settings.html
# https://docs.scrapy.org/en/latest/topics/downloader-middleware.html
# https://docs.scrapy.org/en/latest/topics/spider-middleware.html
import os
from dotenv import load_dotenv

# Load variables from a .env file (if any) before the RABBITMQ_* values are read below.
load_dotenv()

BOT_NAME = "biquge_chapter_detail_spider"

SPIDER_MODULES = ["biquge_chapter_detail_spider.spiders"]
NEWSPIDER_MODULE = "biquge_chapter_detail_spider.spiders"

LOG_LEVEL = "ERROR"

# Crawl responsibly by identifying yourself (and your website) on the user-agent
# USER_AGENT = "biquge_chapter_detail_spider (+http://www.yourdomain.com)"

# Obey robots.txt rules
ROBOTSTXT_OBEY = False

# Configure maximum concurrent requests performed by Scrapy (default: 16)
# CONCURRENT_REQUESTS = 1

# Configure a delay for requests for the same website (default: 0)
# See https://docs.scrapy.org/en/latest/topics/settings.html#download-delay
# See also autothrottle settings and docs
DOWNLOAD_DELAY = 0.2
# The download delay setting will honor only one of:
# CONCURRENT_REQUESTS_PER_DOMAIN = 16
# CONCURRENT_REQUESTS_PER_IP = 16

# Disable cookies (enabled by default)
# COOKIES_ENABLED = False

# Disable Telnet Console (enabled by default)
# TELNETCONSOLE_ENABLED = False

# Override the default request headers:
DEFAULT_REQUEST_HEADERS = {
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "en",
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
}

# Enable or disable spider middlewares
# See https://docs.scrapy.org/en/latest/topics/spider-middleware.html
# SPIDER_MIDDLEWARES = {
#     "biquge_chapter_detail_spider.middlewares.BiqugeChapterDetailSpiderSpiderMiddleware": 543,
# }

# Enable or disable downloader middlewares
# See https://docs.scrapy.org/en/latest/topics/downloader-middleware.html
# DOWNLOADER_MIDDLEWARES = {
#     "biquge_chapter_detail_spider.middlewares.BiqugeChapterDetailSpiderDownloaderMiddleware": 543,
# }

# Enable or disable extensions
# See https://docs.scrapy.org/en/latest/topics/extensions.html
# EXTENSIONS = {
#     "scrapy.extensions.telnet.TelnetConsole": None,
# }

# Configure item pipelines
# See https://docs.scrapy.org/en/latest/topics/item-pipeline.html
ITEM_PIPELINES = {
    "biquge_chapter_detail_spider.pipelines.SQLitePipeline": 300,
    # "biquge_chapter_detail_spider.pipelines.BiqugeChapterDetailSpiderPipeline": 300,
}

# Enable and configure the AutoThrottle extension (disabled by default)
# See https://docs.scrapy.org/en/latest/topics/autothrottle.html
# AUTOTHROTTLE_ENABLED = True
# The initial download delay
# AUTOTHROTTLE_START_DELAY = 5
# The maximum download delay to be set in case of high latencies
# AUTOTHROTTLE_MAX_DELAY = 60
# The average number of requests Scrapy should be sending in parallel to
# each remote server
# AUTOTHROTTLE_TARGET_CONCURRENCY = 1.0
# Enable showing throttling stats for every response received:
# AUTOTHROTTLE_DEBUG = False

# Enable and configure HTTP caching (disabled by default)
# See https://docs.scrapy.org/en/latest/topics/downloader-middleware.html#httpcache-middleware-settings
# HTTPCACHE_ENABLED = True
# HTTPCACHE_EXPIRATION_SECS = 0
# HTTPCACHE_DIR = "httpcache"
# HTTPCACHE_IGNORE_HTTP_CODES = []
# HTTPCACHE_STORAGE = "scrapy.extensions.httpcache.FilesystemCacheStorage"

# Set settings whose default value is deprecated to a future-proof value
REQUEST_FINGERPRINTER_IMPLEMENTATION = "2.7"
TWISTED_REACTOR = "twisted.internet.asyncioreactor.AsyncioSelectorReactor"
FEED_EXPORT_ENCODING = "utf-8"

# RabbitMQ connection settings, read by the spider's establish_connection().
RABBITMQ_PARAMS = {
    'queue': os.getenv('RABBITMQ_QUEUE', 'default_queue'),
    'host': os.getenv('RABBITMQ_HOST', 'localhost'),
    # Environment variables are strings; convert the port once, here.
    'port': int(os.getenv('RABBITMQ_PORT', '5672')),
    'virtual_host': os.getenv('RABBITMQ_VHOST', '/'),
    'username': os.getenv('RABBITMQ_USERNAME', 'guest'),
    'password': os.getenv('RABBITMQ_PASSWORD', 'guest'),
    # Parsed to a real bool: the raw env string "false" would otherwise be truthy.
    'auto_ack': os.getenv('RABBITMQ_AUTO_ACK', 'false').strip().lower() in ('1', 'true', 'yes', 'on'),
}
运行代码
scrapy crawl spider