目录
背景
使用iptables实现
利用iptables丢弃某ip数据包
使用 -L 列出所有规则
IP 连通性 通信 测试
插入一条规则,丢弃此ip 的所有协议请求
列出所有规则
测试 丢弃规则内的IP 连通性
清除 规则列表的 限制
模拟ip进行丢包50%的处理。
mysql proxy 代理
proxy代码
直接使用pymysql 测试
Python 版本低于3.7
其他扩展
总结
资料获取方法
前景
当前的应用都使用了前后端分离的架构,前后端系统需要协同以实现各种功能。后端系统通常负责处理业务逻辑、数据存储和与其他服务的交互,而前端则负责用户界面和用户交互。而在前后端数据交互的过程中,各种异常和错误都有可能发生,确认异常发生时前后端系统的处理是否合理是测试验证中非常重要的一环。
在上一篇博客中我介绍了如何使用测试桩来隔离对环境的依赖,这次我们一起看看如何使用异常注入来应对联调中的异常场景。
使用iptables实现
在系统异常中,数据库连接失败、第三方服务不可用等都是比较典型的场景。常见的验证手段往往是前端的同学告知后台同学开启网络隔离,然后再进行验证。
利用iptables
丢弃某ip数据包
使用 -L 列出所有规则
具体操作:
$ iptables -L
Chain INPUT (policy ACCEPT)
target prot opt source destinationChain FORWARD (policy ACCEPT)
target prot opt source destinationChain OUTPUT (policy ACCEPT)
target prot opt source destination
IP 连通性 通信 测试
# 检查发现能 是否能正常 ping通
$ ping {数据库/后端地址IP}
PING 1.1.1.1 (1.1.1.1) 56(84) bytes of data.
64 bytes from 1.1.1.*: icmp_seq=1 ttl=61 time=0.704 ms
64 bytes from 1.1.1.*: icmp_seq=2 ttl=61 time=0.802 ms
^C
--- 1.1.1.1 ping statistics ---
2 packets transmitted, 2 received, 0% packet loss, time 1001ms
rtt min/avg/max/mdev = 0.704/0.753/0.802/0.049 ms
插入一条规则,丢弃此ip 的所有协议请求
$ iptables -I INPUT -p all -s {数据库/后端地址IP} -j DROP
列出所有规则
$ iptables -LChain INPUT (policy ACCEPT)
target prot opt source destination
DROP all -- 1.1.1.* anywhereChain FORWARD (policy ACCEPT)
target prot opt source destinationChain OUTPUT (policy ACCEPT)
target prot opt source destination
测试 丢弃规则内的IP 连通性
$ ping 1.1.1.*
PING 1.1.1.1 (1.1.1.*) 56(84) bytes of data.
^C
--- 1.1.1.1 ping statistics ---
85 packets transmitted, 0 received, 100% packet loss, time 84312ms
清除 规则列表的 限制
$ iptables -F
模拟ip进行丢包50%的处理。
iptables -I INPUT -s {后端IP} -m statistic --mode random --probability 0.5 -j DROP
上面这种方式能实现相关的联调,但是有两点可以改进的地方:
- 这个方式最大的限制是会影响所有的调用这个系统的测试人员。
- 需要人为的介入,每次都需要人为操作
那么有没有侵入更小,更优雅的异常场景验证方式呢?答案是肯定的。
mysql proxy 代理
更优雅的方式:写一个mysql proxy
代理,让后端svr 直接连接这个proxy,proxy再连接真实的mysql。
- 普通请求经过proxy时,proxy直接转发给真实mysql,并把mysql 的回包正常返回给调用端。
- 当收到某个关键字(
timeout
)时,直接断开TCP连接,模拟连接DB超时场景
proxy代码
import asyncio# 真实mysqlIP端口
mysql_host = settings.MYSQL_HOST
mysql_port = settings.MYSQL_PORT# 处理客户端连接
async def handle_connection(client_reader, client_writer):# 连接到实际的 MySQL 服务器mysql_reader, mysql_writer = await asyncio.open_connection(mysql_host, mysql_port)# 转发握手包handshake_packet = await mysql_reader.read(4096)client_writer.write(handshake_packet)await client_writer.drain()# 处理客户端认证请求auth_packet = await client_reader.read(4096)mysql_writer.write(auth_packet)await mysql_writer.drain()# 转发认证响应auth_response = await mysql_reader.read(4096)client_writer.write(auth_response)await client_writer.drain()# 转发请求和响应while True:data = await client_reader.read(4096)if not data:breaksql_query = data[5:].decode('utf-8')if "timeout" in sql_query: # sql 包含timeout 关键字时,直接关闭连接await asyncio.sleep(1)breakelse:mysql_writer.write(data)await mysql_writer.drain()response = await mysql_reader.read(4096)client_writer.write(response)await client_writer.drain()# 关闭连接client_writer.close()mysql_writer.close()async def main(host, port):server = await asyncio.start_server(handle_connection, host, port)async with server:await server.serve_forever()# 使用示例
if __name__ == "__main__":# 本地监听 3307, svr 连接到3307host = "0.0.0.0"port = 3307asyncio.run(main(host, port))
直接使用pymysql 测试
下面的代码会在执行到第二条select 语句时超时:
import pymysqlconnection = pymysql.connect(host="192.168.31.76",port=8899,# 真实mysql 账户信息user="{user}",password="{password}",database="mydb",)curs = connection.cursor()
curs.execute("select * from user where name='bingo';")
print(curs.fetchall())curs.execute("insert into user set name='bingtime';")
connection.commit()curs.execute("select * from user where name='timeoutbingo';")
print(curs.fetchall())
curs.close()
connection.close()
Python 版本低于3.7
低版本的Python没有asyncio.run
和server.serve_forever()
需要修改main函数.
# 主函数,启动服务器并监听连接
async def main(host, port):server = await asyncio.start_server(handle_connection, host, port)try:# Python 3.6.5 中没有 server.serve_forever() 方法,所以需要使用一个无限循环来保持服务器运行。# 我们使用 asyncio.sleep() 来避免阻塞事件循环,使其可以处理其他任务,如新连接。while True:await asyncio.sleep(3600) # 1 hourexcept KeyboardInterrupt:passfinally:server.close()await server.wait_closed()# 使用示例
if __name__ == "__main__":host = "0.0.0.0"port = 3307loop = asyncio.get_event_loop()try:loop.run_until_complete(main(host, port))except KeyboardInterrupt:passfinally:loop.close()
其他扩展
写一个proxy,监听来往的SQL 语句:
import pymysql
import socket
import threading
# 配置
SERVER_ADDRESS = (settings.MYSQL_HOST, settings.MYSQL_PORT) # 真实MySQL 服务器地址
PROXY_ADDRESS = ('0.0.0.0', 8899) # 监听代理服务器地址def print_query(data):try:command = data[4]if command == pymysql.constants.COMMAND.COM_QUERY:query = data[5:].decode("utf-8")print(f"SQL: {query}")except Exception as e:print(f"Error parsing packet: {e}")def forward_data(src_socket, dst_socket, parser=None):while True:try:data = src_socket.recv(4096)if not data:breakif parser:parser(data)dst_socket.sendall(data)except OSError as e:if e.errno == 9:breakelse:raisedef main():# 创建代理服务器套接字proxy_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)proxy_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)proxy_socket.bind(PROXY_ADDRESS)proxy_socket.listen(5)print(f"Proxy server listening on {PROXY_ADDRESS}")while True:client_socket, client_address = proxy_socket.accept()print(f"Client {client_address} connected")# 连接到 MySQL 服务器server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)server_socket.connect(SERVER_ADDRESS)try:# 创建线程转发客户端数据到服务器client_to_server = threading.Thread(target=forward_data, args=(client_socket, server_socket, print_query))client_to_server.start()# 创建线程转发服务器数据到客户端server_to_client = threading.Thread(target=forward_data, args=(server_socket, client_socket))server_to_client.start()# 等待线程完成client_to_server.join()server_to_client.join()finally:client_socket.close()server_socket.close()print(f"Client {client_address} disconnected")if __name__ == "__main__":main()
如果使用上面的pymysql测试代码执行,会打印如下的信息:
Client ('127.0.0.1', 57184) connected
SQL: SET AUTOCOMMIT = 0
SQL: select * from user where name='bingo';
SQL: insert into user set name='bing21211';
SQL: COMMIT
SQL: select * from user where name='bingo';
SQL: select * from user where name='bingo';
SQL: select * from user where name='timeoutbingo';
Client ('127.0.0.1', 57184) disconnected
总结
通过实现合适的异常处理机制,可以确保用户在遇到问题时获得有用的反馈,验证这些处理机制能提高系统的稳定性和安全性。iptables
功能强大但是需要手动操作,mysql proxy
代理功能直接,但是应用场景较为有限,大家可以根据实际情况进行选择。
资料获取方法
【留言777】
各位想获取源码等教程资料的朋友请点赞 + 评论 + 收藏,三连!
三连之后我会在评论区挨个私信发给你们~