在如今高度互联的世界中,Web应用正承受着前所未有的安全压力。从SQL注入到DDoS攻击,威胁无处不在,防护措施不容懈怠。为了应对这些挑战,我开发了一款智能化Web应用防火墙(WAF),并配套设计了一个自动化攻击测试系统,旨在为Web应用提供全方位的安全保护。
背景与动机
随着网络攻击手段的不断进化,传统的安全防护机制已经难以应对日益复杂的威胁。企业和开发者迫切需要一种智能、自动化且可扩展的安全解决方案,以便在不影响性能的情况下,提供卓越的防护能力。因此,本项目的核心目标是打造一款集成多种防护功能的WAF,并通过自动化测试手段,确保其在面对不同攻击场景时的可靠性和有效性。
完整代码:
防火墙实验_模拟Web攻击对WAF进行测试
核心功能亮点
-
智能化DDoS攻击检测与防护
- 实时请求分析:通过监控IP请求频率,快速识别并阻止DDoS攻击,防止服务器资源被恶意耗尽。
- 自适应阈值:根据实际流量情况,动态调整检测阈值,避免误判和漏判,确保防护的精准性。
-
多层次的安全规则引擎
- 精准的正则表达式匹配:内置多套正则表达式规则库,专门针对SQL注入、XSS攻击、目录遍历等常见威胁,进行深度检测和拦截。
- IP与URI的双向过滤:结合IP黑白名单和URI路径过滤,精细控制访问权限,为应用层安全提供更全面的保护。
-
自动化攻击测试系统
- 多场景模拟攻击:涵盖SQL注入、XSS、CSRF等多种攻击类型,自动化测试系统可以在实际环境中模拟真实攻击,验证WAF的防护效果。
- 持续性压力测试:脚本在长时间运行中不断发送攻击请求,帮助开发者了解WAF在高并发和大流量场景下的表现。
-
可扩展的日志与监控系统
- 详细的攻击日志记录:每次检测到的攻击行为都会被详细记录,包括攻击时间、IP地址、攻击类型等关键信息,为后续分析和溯源提供依据。
- 实时监控与报警:整合实时监控机制,一旦检测到异常流量或攻击行为,系统可以立即发出警报,提醒管理员及时处理。
项目架构与代码实现
1. Web应用防火墙核心模块(waf_02.py
)
waf_02.py
是WAF的核心代码,包含了对DDoS攻击的检测、防御策略、请求过滤等功能。
代码片段:DDoS攻击检测
import socket
from datetime import datetime, timedelta
from threading import Thread, Lock# 存储IP请求计数和时间戳的全局字典
ip_requests = {}
lock = Lock()def check_ddos_attack(ip_address):with lock:now = datetime.now()if ip_address in ip_requests:requests, first_request_time = ip_requests[ip_address]if now - first_request_time <= timedelta(seconds=10):requests += 1ip_requests[ip_address] = (requests, first_request_time)if requests > 100:print(f"DDoS attack detected from {ip_address}.")return Trueelse:ip_requests[ip_address] = (1, now)else:ip_requests[ip_address] = (1, now)return False
功能说明:
- DDoS攻击检测:通过跟踪单一IP的请求频率,如果在短时间内请求次数超过设定阈值(如10秒内超过100次),则认为该IP存在DDoS攻击的嫌疑,系统会自动阻断该IP的连接。
代码片段:请求过滤与处理
def filter(r, addr):uri = r.uri.split('?')[0]if uri in BLACK_URI_LIST:return {"status": True, "type": 'in-black-uri'}det_data = Detect(r)result = det_data.run()if result["status"]:return {"status": True, "type": result["type"]}return result
功能说明:
- 请求过滤:通过匹配URI和利用内置的攻击检测逻辑(
Detect
类),WAF能够识别并阻止包括SQL注入、XSS攻击等在内的多种恶意请求。
2. 自动化攻击测试模块(AttackTest_03.py
)
为了验证WAF的有效性,AttackTest_03.py
脚本模拟了多种常见的Web攻击,并持续向WAF发送这些恶意请求,观察其响应情况。
代码片段:攻击模拟与测试
import requests
import time# 测试的恶意请求列表
test_requests = [{'path': '/?user=admin OR 1=1', 'description': 'SQL Injection(SQL注入攻击)'},{'path': '/?search=<script>alert(1)</script>', 'description': 'XSS Attack(跨站脚本攻击)'},{'path': '/../../../etc/passwd', 'description': 'Directory Traversal(目录遍历攻击)'},
]Attack_Num = 1
while True:print(f"正在进行第 {Attack_Num} 次攻击测试")for test in test_requests:print(f"正在测试:{test['description']}")response = requests.get(waf_address + test['path'], timeout=5)print(f"响应状态码: {response.status_code}")Attack_Num += 1time.sleep(1)
功能说明:
- 模拟攻击:该脚本生成了多种攻击请求,如SQL注入、XSS和目录遍历等,并不断向WAF发送这些请求,以测试WAF是否能有效阻止攻击。
- 持续测试:脚本以循环的方式持续运行,每轮测试后短暂休息,模拟真实的攻击环境。
3. SQL注入攻击日志展示(web_server_01.py
)
作为WAF的补充,web_server_01.py
用于模拟SQL注入攻击,并展示检测到的攻击日志。
代码片段:SQL注入攻击日志记录
from flask import Flask, request, render_template_stringapp = Flask(__name__)
sql_injection_logs = []@app.route('/simulate_sql_injection', methods=['GET', 'POST'])
def simulate_sql_injection():attacked_path = request.args.get('path', 'unknown')attacker_ip = request.remote_addrsql_injection_logs.append({'attacker_ip': attacker_ip, 'attacked_path': attacked_path})return 'Simulated SQL injection attack logged.', 200@app.route('/show_sql_injection_attacks')
def show_sql_injection_attacks():return render_template_string(f'''<h2>SQL Injection Attacks</h2>{''.join([f"<div>{log['attacked_path']} from {log['attacker_ip']}</div>" for log in sql_injection_logs])}''')
功能说明:
- SQL注入日志记录与展示:此模块负责记录模拟的SQL注入攻击,并通过简单的Web界面展示这些攻击信息,帮助开发者了解攻击路径和来源IP。
项目总结
通过本项目,我们成功构建了一个功能强大的Web应用防火墙,能够实时检测并阻止各种Web攻击,同时通过自动化攻击测试系统验证其防护效果。无论是防御DDoS攻击、阻止SQL注入还是过滤XSS脚本,WAF都展现出了卓越的性能和可靠性。
这种结合了智能检测与持续测试的安全解决方案不仅适用于当前的Web应用防护,更为未来的安全需求打下了坚实的基础。未来,我们将进一步扩展WAF的功能,引入AI与机器学习技术,打造更智能、更高效的安全防护体系。