
Modified webalive (checks whether websites are alive, making later deduplication easier)

Source: https://blog.csdn.net/qq_35013491/article/details/142184256

Usage:

python38 whichAlive.py -f url.txt -t 5  --proxy 127.0.0.1:8080

python38 whichAlive.py -f url.txt -t 5 -d --try-again
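The -f file is read line by line and each line is stripped, so the expected input is one URL per line, including the scheme (requests needs it). A hypothetical url.txt:

http://example.com
https://example.com:8443
http://192.0.2.10:8080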

Modifications:

1. Improved the accuracy of title extraction.

2. When a title cannot be extracted, an MD5 hash of the response content is computed and stored in the title field (see the short example below).
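For reference, the fallback title is built from the first 8 hex characters of an MD5 digest of the response body. A minimal standalone sketch of that idea (the sample body below is hypothetical, not taken from the script):

import hashlib

# Hypothetical response body that contains no <title> tag
body = b'<html><body>403 Forbidden</body></html>'
print('Hash-' + hashlib.md5(body).hexdigest()[:8])  # prints a title of the form Hash-<8 hex chars>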

whichAlive.py code:

import argparse
import csv
import datetime
import hashlib
import os
import re
import socket
import time
import urllib
import urllib.parse
from concurrent.futures import ALL_COMPLETED, ThreadPoolExecutor, wait

import requests
import urllib3
from bs4 import BeautifulSoup
import chardet

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

DEBUG = False
TRYAGAIN = False


class whichAlive(object):
    def __init__(self, file, THREAD_POOL_SIZE=10, allow_redirect=False, PROXY={}):
        self.file = file
        self.filename = ''.join(file.split('/')[-1].split('.')[:-1])
        self.timenow = str(time.time()).split(".")[0]
        self.outfilename = f'{self.filename}{self.timenow}.csv'
        self.errorfilename = f'error_{self.filename}{self.timenow}.txt'
        self.urllist = self.__urlfromfile()
        self.tableheader = ['no', 'url', 'ip', 'state', 'state_code', 'title', 'server', 'length', 'other']
        self.HEADER = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36',
        }
        self.THREAD_POOL_SIZE = THREAD_POOL_SIZE
        self.allurlnumber = len(self.urllist)
        self.completedurl = -1
        self.allow_redirect = allow_redirect
        self.PROXY = PROXY

    def run(self):
        self.completedurl += 1
        self.__writetofile(self.tableheader)
        tasklist = []
        start_time = datetime.datetime.now()
        t = ThreadPoolExecutor(max_workers=self.THREAD_POOL_SIZE)
        for k, url in enumerate(self.urllist):
            tasklist.append(t.submit(self.__scan, url, k + 1))
        print(f'total {self.allurlnumber}')
        if wait(tasklist, return_when=ALL_COMPLETED):
            end_time = datetime.datetime.now()
            print(f'--------------------------------\nDONE, use {(end_time - start_time).seconds} seconds')
            print(f'outfile: {os.path.join(os.path.abspath(os.path.dirname(__file__)), "result", self.outfilename)}')

    def __scan(self, url, no, tryagainflag=False):
        def callback(no, url, ip, state, state_code, title, server, length, other):
            # Write one result row and print progress.
            self.completedurl += 1
            thisline = [no, url, ip, state, state_code, title, server, length, other]
            nowpercent = '%.2f' % ((self.completedurl / self.allurlnumber) * 100)
            print(f'[{nowpercent}%] {url} {ip} {state} {title} {length}')
            self.__writetofile(thisline)

        ip = ''
        state = ''
        state_code = -1
        title = ''
        server = ''
        length = -1
        other = ''
        try:
            if DEBUG: print(f'[+] {no} {url}')
            u = urllib.parse.urlparse(url)
            ip = self.__getwebip(u.netloc.split(':')[0])
            if self.allow_redirect:
                r = requests.get(url=url, headers=self.HEADER, timeout=15, verify=False, proxies=self.PROXY, allow_redirects=True)
                titles = [self.__getwebtitle(r)]  # title of the final response
                lengths = [str(self.__getweblength(r))]
                servers = [self.__getwebserver(r)]
                for response in r.history:
                    titles.insert(0, self.__getwebtitle(response))
                    lengths.insert(0, str(self.__getweblength(response)))
                    servers.insert(0, self.__getwebserver(response))
                state = 'alive'
                state_code = '->'.join([str(i.status_code) for i in r.history] + [str(r.status_code)])
                title = '->'.join(titles)
                length = '->'.join(lengths)
                server = '->'.join(servers)
            else:
                r = requests.get(url=url, headers=self.HEADER, allow_redirects=False, timeout=15, verify=False, proxies=self.PROXY)
                state = 'alive'
                state_code = r.status_code
                title = self.__getwebtitle(r)
                length = self.__getweblength(r)
                server = self.__getwebserver(r)
            callback(no, url, ip, state, state_code, title, server, length, other)
        except requests.exceptions.ConnectTimeout as e:
            if DEBUG: print(f'[ConnectTimeout] {url} {e}')
            self.__errorreport(str(e))
            state = 'dead'
            callback(no, url, ip, state, state_code, title, server, length, 'ConnectTimeout')
        except requests.exceptions.ReadTimeout as e:
            if DEBUG: print(f'[ReadTimeout] {url} {e}')
            self.__errorreport(str(e))
            state = 'dead'
            callback(no, url, ip, state, state_code, title, server, length, 'ReadTimeout')
        except requests.exceptions.ConnectionError as e:
            if DEBUG: print(f'[ConnectionError] {url} {e}')
            self.__errorreport(str(e))
            state = 'dead'
            callback(no, url, ip, state, state_code, title, server, length, 'ConnectionError')
        except Exception as e:
            if DEBUG: print(f'[ERROR] {no} {url} {e}')
            self.__errorreport(str(e))
            if TRYAGAIN and not tryagainflag:
                self.__scan(url, no, True)
            callback(no, url, ip, state, state_code, title, server, length, str(e))

    def __getwebtitle(self, response):
        try:
            detected_encoding = chardet.detect(response.content)['encoding']
            if detected_encoding is None:
                detected_encoding = 'utf-8'  # fall back to a default encoding
            content = response.content.decode(detected_encoding, errors='replace')
            soup = BeautifulSoup(content, 'html.parser')
            title_tag = soup.find('title')
            if title_tag:
                return title_tag.get_text(strip=True)
            else:
                # No <title> found: use a hash of the content as the title
                content_hash = hashlib.md5(content.encode(detected_encoding, errors='replace')).hexdigest()
                return f'Hash-{content_hash[:8]}'  # first 8 hex chars of the hash
        except Exception as e:
            if DEBUG: print(f'[getwebtitle ERROR] {e}')
            # If the content cannot be decoded, hash the raw bytes instead
            content_hash = hashlib.md5(response.content).hexdigest()
            return f'Hash-{content_hash[:8]}'

    def __getwebip(self, domain):
        try:
            ip = socket.getaddrinfo(domain, 'http')
            return ip[0][4][0]
        except:
            return ''

    def __getweblength(self, response):
        try:
            return len(response.content)
        except Exception as e:
            if DEBUG: print(f'[getweblength ERROR] {e}')
            return -1

    def __getwebserver(self, response):
        try:
            return response.headers.get('server') if response.headers.get('server') else ''
        except:
            return ''

    def __urlfromfile(self):
        with open(self.file, 'r') as f:
            return [i.strip() for i in f.readlines()]

    def __writetofile(self, data: list):
        with open(f'result/{self.outfilename}', 'a', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            writer.writerow(data)

    def __errorreport(self, message):
        with open(f'error/{self.errorfilename}', 'a', encoding='utf-8') as f:
            f.write(message + '\n')


if __name__ == '__main__':
    parser = argparse.ArgumentParser(usage='whichAlive usage')
    parser.add_argument('-f', '--file', default='url.txt', help='URL lists file.')
    parser.add_argument('--proxy', default='', help='Set proxy, such as 127.0.0.1:8080')
    parser.add_argument('-t', '--thread', default=10, type=int, help='Set max threads, default 10')
    parser.add_argument('-d', '--debug', default=False, action='store_true', help='print some debug information')
    parser.add_argument('--try-again', default=False, action='store_true', help='If some error, try again scan that url once', dest='tryagain')
    args = parser.parse_args()
    DEBUG = args.debug
    TRYAGAIN = args.tryagain
    w = whichAlive(
        file=args.file,
        THREAD_POOL_SIZE=args.thread,
        allow_redirect=True,
        PROXY={'http': args.proxy, 'https': args.proxy},
    )
    w.run()
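Note that __writetofile and __errorreport append to the result/ and error/ subdirectories but never create them, so both must exist before a scan. A minimal pre-run sketch (assuming the script is started from its own directory):

import os

# Create the output directories whichAlive writes into, if they are missing
for d in ('result', 'error'):
    os.makedirs(d, exist_ok=True)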

MySQL deduplication statements, applied to the webalive results:

Navicat import: for the encoding, choose the first option (system default encoding); for the delimiter, choose none. That imports all of the webalive results.


1. Add a sequential id to each row:

ALTER TABLE results2 ADD id INT(4) NOT NULL PRIMARY KEY AUTO_INCREMENT FIRST;


2. Remove duplicates by the `ip`, `state_code`, `title`, `Length`, `server` columns, keeping the row with the largest id:

DELETE FROM results2 WHERE id NOT IN (SELECT id  FROM (SELECT MAX(id) id FROM results2 GROUP BY `ip`,`state_code`,`title`,`Length`,`server`) cc);
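If you would rather skip the MySQL import, the same deduplication can be done directly on the CSV with pandas. This is an alternative technique, not part of the original workflow, and it assumes the column names from the whichAlive CSV header (no, url, ip, state, state_code, title, server, length, other); the file path is a placeholder.

import pandas as pd

# Load a whichAlive result file (placeholder name)
df = pd.read_csv('result/url1234567890.csv')

# Keep the last occurrence per (ip, state_code, title, length, server) group,
# roughly matching the MAX(id) behaviour of the SQL above
deduped = df.drop_duplicates(subset=['ip', 'state_code', 'title', 'length', 'server'], keep='last')
deduped.to_csv('deduped.csv', index=False)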
