一、介绍
这个Python脚本是一个高度可配置的代理服务器管理工具,主要用于网络爬虫应用。其核心功能是生成大量的IPv6地址和相应的代理服务,从而允许用户在数据抓取过程中轻松绕过IP限制。这一目标是通过几个主要步骤实现的:
- IPv6地址动态生成与管理:通过Linux的
ip
命令和虚拟网络接口技术,脚本能够在指定的基础网络接口上动态生成多个虚拟接口,每个接口配置有独立的IPv6地址,极大地扩展了可用的IP资源。 - 智能代理规则配置:结合
ip6tables
的SNAT功能,脚本为每个生成的IPv6地址配置相应的源地址转换规则,实现对外请求时自动切换IP地址,以此来隐藏爬虫的真实IP,并模拟多点分布式爬取。 - 灵活的代理策略调整:通过内置的
switch_proxy_server
函数,支持基于当前代理使用情况动态调整和切换代理规则,使得爬虫在执行爬取任务时能够根据实际需要灵活变换IP。
从脚本到实战
通过综合应用以上特性,这个脚本不仅提供了一种高效的方式来生成和管理大量的IPv6代理,还为动态和灵活的网络爬虫应用提供了强大的支持。
问题和考虑因素
IPv6封锁: 如果一个网络安全系统选择封锁整段IPv6地址,那么这个脚本生成的多个IPv6代理可能会一并被封。这种情况下,即使生成了大量的IPv6地址也无法达到绕过IP限制的目的。
现代WAF(Web应用防火墙): 通常,现代的WAF不仅仅依赖于IP地址进行流量限制。它们可能还使用更复杂的算法和多种信号(如请求头、行为分析等)来识别和阻止恶意流量。但是,依赖于庞大的IPv6地址空间确实能在某种程度上绕过基于单一IP的流量限制。
……
二、管理IPV6网口
①is_root
输入变量:无。
输出内容:布尔值。如果当前用户是root用户,返回True
;否则返回False
。
执行逻辑:
- 使用
os.geteuid()
获取当前执行脚本的用户ID。 - 如果用户ID为0(root用户的ID),函数返回
True
,表示当前用户具有root权限。 - 否则,返回
False
,表示当前用户没有root权限。
这个函数的主要用途是在脚本执行需要root权限的操作前,进行权限检查,以避免执行失败。
def is_root():
# 获取当前用户的有效用户ID
return os.geteuid() == 0 # 检查是否为root用户(ID为0)
②get_existing_interfaces
输入变量:
base_interface
:字符串,表示基础网络接口的名称,默认为'eth0'
。
输出内容:字典。键为网络接口名称,值为对应的IPv6地址(非链路本地地址)。
执行逻辑:
- 使用
subprocess.run
执行ip addr show
命令,获取系统中所有网络接口的信息。 - 通过正则表达式匹配,找出所有基于
base_interface
创建的虚拟网络接口。 - 对于每个匹配的接口,再次执行
ip addr show
命令,通过正则表达式匹配非链路本地的IPv6地址。 - 将找到的接口名称和其IPv6地址存入字典
iface_ipv6_dict
中。 - 返回
iface_ipv6_dict
字典。
这个函数的目的是列出系统中所有基于指定基础接口的虚拟网络接口及其IPv6地址,为后续的代理服务器配置提供必要的信息。
def get_existing_interfaces(base_interface='eth0'):
cmd_result = subprocess.run(["ip", "addr", "show"], stdout=subprocess.PIPE)
output = cmd_result.stdout.decode()
# 通过正则表达式匹配接口名称
iface_pattern = re.compile(re.escape(base_interface) + r'_([0-9]+)@')
iface_matches = iface_pattern.findall(output)
# 构建接口名称列表并获取其IPv6地址
interfaces = [f"{base_interface}_{match}" for match in iface_matches]
iface_ipv6_dict = {}
for iface in interfaces:
cmd_result = subprocess.run(["ip", "addr", "show", iface], stdout=subprocess.PIPE)
output = cmd_result.stdout.decode()
ipv6_pattern = re.compile(r"inet6\s+([0-9a-f:]+)\/\d+")
ipv6_matches = ipv6_pattern.findall(output)
ipv6_addresses = [addr for addr in ipv6_matches if not addr.startswith("fe80")]
if ipv6_addresses:
iface_ipv6_dict[iface] = ipv6_addresses[0]
return iface_ipv6_dict
③interface_usable
输入变量:
interface_name
:字符串,表示需要检查的网络接口名称。skip_check
:布尔值,默认为False
。如果为True
,则跳过检查直接返回True
。ipv6_address
:字符串,用于ping测试的IPv6地址,默认为'2400:3200::1'
。max_retries
:整数,表示最大重试次数,默认为3。
输出内容:布尔值。如果成功ping通指定的IPv6地址,返回True
;否则返回False
。
执行逻辑:
- 首先检查
skip_check
参数,如果为True
,则不执行ping检查,直接返回True
。 - 使用
subprocess.run
执行ping
命令,尝试ping通指定的IPv6地址,最多重试max_retries
次。 - 如果任一次ping成功(
cmd_result.returncode == 0
),则返回True
。 - 如果所有尝试均失败(包括超时或其他异常),则返回
False
。
这个函数用于检测指定网络接口的连通性,特别是在配置IPv6地址后,验证地址是否正确配置且可用。
def interface_usable(interface_name, skip_check=False, ipv6_address='2400:3200::1', max_retries=3):
if skip_check:
return True
current_try = 0
while current_try < max_retries:
try:
# 使用ping命令测试网络连通性
cmd_result = subprocess.run(["ping", "-c", "1", "-I", interface_name, ipv6_address], stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=5)
if cmd_result.returncode == 0:
return True # 若成功ping通,返回True
except subprocess.TimeoutExpired:
# 处理ping命令超时
print(f"Ping attempt {current_try + 1} of {max_retries} timed out. Retrying...")
except subprocess.SubprocessError as e:
# 处理其他subprocess相关的异常
print(f"An error occurred while trying to ping: {e}. Retrying...")
current_try += 1
return False # 经过所有重试后仍未成功,返回False
④create_ipv6_addresses
输入变量:
n
:整数,指定要创建的IPv6地址数量。base_interface
:字符串,指定基础网络接口名称,默认为'eth0'
。delete_interface
:布尔值,指定是否在创建新接口前删除旧接口,默认为True
。
输出内容:
- 接口列表:成功创建的虚拟网络接口名称列表。
执行逻辑:
- 检查是否以root权限运行,若不是则在命令前添加
sudo
。 - 如果选择删除旧接口,则遍历并删除所有基于
base_interface
的虚拟接口。 - 对于每一个新接口,使用
ip link add
命令创建,并使用ip link set
命令激活。 - 对每个新激活的接口使用
dhclient
获取IPv6地址。 - 返回创建的所有接口的名称列表。
此函数负责在系统中创建指定数量的虚拟接口,并配置IPv6地址,为代理服务器的创建和管理提供基础。
def create_ipv6_addresses(n, base_interface='eth0', delete_interface=True):
sudo_cmd = ["sudo"] if not is_root() else []
if delete_interface:
delete_ipv6_addresses(base_interface)
existing_interfaces = list(get_existing_interfaces(base_interface).keys())
interfaces = []
for i in range(1, n + 1):
interface_name = f"{base_interface}_{i}"
# Check if the interface exists, if yes, delete it first
if interface_name in existing_interfaces:
if interface_usable(interface_name):
print(f"Interface {interface_name} already exists. Skipping creation.")
interfaces.append(interface_name)
continue
else:
subprocess.run(sudo_cmd + ["ip", "link", "delete", interface_name])
# Now add the interface
subprocess.run(sudo_cmd + ["ip", "link", "add", "link", base_interface, interface_name, "type", "macvlan", "mode", "bridge"])
subprocess.run(sudo_cmd + ["ip", "link", "set", interface_name, "up"])
#subprocess.run(sudo_cmd + ["dhclient", "-6", "-nw", interface_name])
interfaces.append(interface_name)
return interfaces
⑤delete_ipv6_addresses
输入变量:
base_interface
:字符串,指定基础网络接口名称,默认为'eth0'
。
输出内容:
- 无直接输出(函数执行的副作用是删除指定接口)。
执行逻辑:
- 遍历基于
base_interface
创建的所有虚拟网络接口。 - 对于每个接口,使用
ip link delete
命令删除。 - 该操作清理由
create_ipv6_addresses
函数创建的所有虚拟接口。
该函数用于清除之前创建的所有虚拟接口,是管理IPv6代理服务器生命周期的重要部分。
def delete_ipv6_addresses(base_interface='eth0'):
sudo_cmd = ["sudo"] if not is_root() else []
existing_interfaces = list(get_existing_interfaces(base_interface).keys())
for interface_name in existing_interfaces:
subprocess.run(sudo_cmd + ["ip", "link", "delete", interface_name])
⑥execute_ip6tables_command
输入变量:
command
:字符串,指定要执行的ip6tables
命令。
输出内容:
- 无直接输出(函数执行结果通过
subprocess.run
的成功与否间接体现)。
执行逻辑:
- 首先判断当前脚本是否以 root 权限运行,如果不是,则命令前添加
sudo
。 - 使用
subprocess.run
执行完整的ip6tables
命令,命令通过字符串拆分成列表形式传递给subprocess.run
。 - 该函数没有返回值,但执行的命令结果(如添加、删除或修改
ip6tables
规则)会影响系统的网络配置。 - 用于处理 IPv6 流量转发的规则设置,如设置 NAT 规则以实现特定的代理转发逻辑。
这个函数是脚本中与网络安全策略相关的核心部分,用于动态管理 ip6tables
规则,以支持复杂的代理服务器配置。
def execute_ip6tables_command(command):
sudo_cmd = ["sudo"] if not is_root() else []
cmd = sudo_cmd + command.split()
subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
三、转发IPV6服务
①start_proxy_servers
输入变量:
n
、base_interface
、delete_interface
:同create_ipv6_addresses
。mode
:字符串,代理服务器的工作模式,目前只支持'normal'
。
输出内容:
- 控制台输出当前代理设置的反馈信息。
执行逻辑:
- 首先创建指定数量的IPv6地址。
- 然后根据模式更新
FAKE_IPV6_CHAIN
中的SNAT规则,以使用新创建的IPv6地址作为源地址。('normal'
模式每次创建一个规则,通过switch_proxy_server
切换代理;'random'
模式一次性创建全部规则,每个请求随机) - 输出当前使用的接口和IPv6地址信息,提供直接反馈。
该函数是启动和管理IPv6代理服务器的主入口,通过动态配置网络接口和 ip6tables
规则,实现高度灵活的代理服务管理。
def start_proxy_servers(n, mode='normal', base_interface='eth0', delete_interface=True):
global iface_ipv6_dict
interfaces = create_ipv6_addresses(n, base_interface, delete_interface)
#获取生成的接口及IP
iface_ipv6_dict = get_existing_interfaces(base_interface)
if iface_ipv6_dict:
# 删除流量重定向到自定义链
execute_ip6tables_command('ip6tables -t nat -D POSTROUTING -j FAKE_IPV6_CHAIN')
# 删除自定义链
execute_ip6tables_command('ip6tables -t nat -X FAKE_IPV6_CHAIN')
# 创建自定义链
execute_ip6tables_command('ip6tables -t nat -N FAKE_IPV6_CHAIN')
# 流量重定向到自定义链
execute_ip6tables_command(f'ip6tables -t nat -A POSTROUTING -o {base_interface} -j FAKE_IPV6_CHAIN')
if mode == 'normal':
selected_interface = list(iface_ipv6_dict.keys())[0]
ipv6_address = iface_ipv6_dict[selected_interface]
# 添加SNAT规则
execute_ip6tables_command(f'ip6tables -t nat -A FAKE_IPV6_CHAIN -j SNAT --to-source {ipv6_address}')
print(f"Using interface: {selected_interface}, Connecting to: {ipv6_address}")
elif mode == 'random':
for index, (interface, ipv6_address) in enumerate(iface_ipv6_dict.items()):
adjusted_probability = 1/(len(iface_ipv6_dict)-index)
execute_ip6tables_command(f'ip6tables -t nat -A FAKE_IPV6_CHAIN -m statistic --mode random --probability {adjusted_probability} -j SNAT --to-source {ipv6_address}')
②switch_proxy_server
输入变量:
mode
:字符串,指定代理切换的模式,当前只支持'normal'
('random'
模式不需要)。
输出内容:
- 输出到控制台的信息,显示已切换到的接口和IPv6地址。
执行逻辑:
- 基于全局
proxy_switch_count
和iface_ipv6_dict
的长度,计算下一个应使用的代理接口索引。 - 根据索引从
iface_ipv6_dict
中选择对应的接口和IPv6地址。 - 调用
execute_ip6tables_command
,更新FAKE_IPV6_CHAIN
链中的 SNAT 规则,将源地址改为选中的 IPv6 地址。 - 控制台输出当前所使用的接口名称和IPv6地址,为用户提供直接的反馈。
该函数使用户能够根据需要动态地切换代理服务器,增加爬虫任务的灵活性和隐蔽性。
def switch_proxy_server(mode='normal'):
global proxy_switch_count
global iface_ipv6_dict
if mode == 'normal':
if iface_ipv6_dict:
proxy_switch_count += 1
proxy_index = proxy_switch_count % len(iface_ipv6_dict)
selected_interface = list(iface_ipv6_dict.keys())[proxy_index]
ipv6_address = iface_ipv6_dict[selected_interface]
# 清空自定义链
execute_ip6tables_command('ip6tables -t nat -F FAKE_IPV6_CHAIN')
# 添加SNAT规则
execute_ip6tables_command(f'ip6tables -t nat -A FAKE_IPV6_CHAIN -j SNAT --to-source {ipv6_address}')
print(f"Using interface: {selected_interface}, Connecting to: {ipv6_address}")
③stop_proxy_servers
输入变量:
base_interface
: 字符串类型,指定了基础网络接口的名称,默认值为'eth0'
。这个接口是之前用来创建虚拟网络接口的基础接口。delete_interface
: 布尔类型,指示在停止代理服务器时是否删除之前创建的所有虚拟网络接口,默认值为True
。
输出内容:
- 该函数没有返回值,但它会在控制台上打印操作的状态信息,如正在关闭代理服务器、正在删除IPv6地址等。
def stop_proxy_servers(base_interface='eth0', delete_interface=True):
# 删除流量重定向到自定义链
execute_ip6tables_command('ip6tables -t nat -D POSTROUTING -j FAKE_IPV6_CHAIN')
# 删除自定义链
execute_ip6tables_command('ip6tables -t nat -X FAKE_IPV6_CHAIN')
if delete_interface:
print("正在关闭代理服务器...")
print("删除IPv6地址...")
delete_ipv6_addresses(base_interface)
print("代理服务器已关闭.")
else:
print("正在关闭代理服务器...")
print("代理服务器已关闭.")
五、使用示例
from selenium import webdriver
import gen_proxy_servers
# 生成代理IPV6数量
ipv6_count = 100
# 生成的IPV6接口前缀名称
base_interface = "eth0"
#切换IP模式
ip_mode ='normal'
# 运行完毕后删除生成的网口
delete_interface = False
#生成100个IPV6
gen_proxy_servers.start_proxy_servers(ipv6_count, ip_mode, base_interface, delete_interface)
# 设置Chrome选项
chrome_options = webdriver.ChromeOptions()
# 启动Chrome浏览器
driver = webdriver.Chrome(options=chrome_options)
# 打开网站
driver.get("http://ipw.cn")
# 切换代理(假设switch_proxy_server函数在这里被调用)
gen_proxy_servers.switch_proxy_server(ip_mode)
# 打开另一个网站
driver.get("http://ipw.cn")
#关闭
stop_proxy_servers(base_interface, delete_interface)
评论区