20. 从零用Rust编写正反向代理,四层反向代理stream(tcp与udp)实现

wmproxy

wmproxy已用Rust实现http/https代理, socks5代理, 反向代理, 静态文件服务器,四层TCP/UDP转发,内网穿透,后续将实现websocket代理等,会将实现过程分享出来,感兴趣的可以一起造个轮子

项目地址

gite: https://gitee.com/tickbh/wmproxy

github: https://github.com/tickbh/wmproxy

四层代理

四层代理,也称为网络层代理,是基于IP地址和端口号的代理方式。它只关心数据包的源IP地址、目的IP地址、源端口号和目的端口号,不关心数据包的具体内容。四层代理主要通过报文中的目标地址和端口,再加上负载均衡设备设置的服务器选择方式,决定最终选择的内部服务器。

因为四层代理不用处理任何相关的包信息,只需将包数据传递给正确的服务器即可,所以实现相对比较简单。

以下是OSI七层模型的示意图,来源于网上

图片.png

实现方式

双端建立连接,也就是收到客户端的连接的时候,同时建立一条通往服务端的连接,然后做双向绑定即可完成服务。

四层代理还有udp的转发需求,需要同步将udp的数据进行转发,udp的处理方式处理会相对复杂一些,因为当前地址只有绑定一份,但是可能来自各种不同的地址,不同的客户端的(remote_ip, remote_port)我们需要当成一个全新的客户端。

而且有时候无法主动感知是否已经被断开了,所以也必须有超时机制,好在超时的时候能及时释放掉连接,好让系统及时的socket资源。

TCP实现

tcp找到相应的地址,连接,并双向绑定即可

pub async fn process<T>(data: Arc<Mutex<StreamConfig>>,local_addr: SocketAddr,mut inbound: T,_addr: SocketAddr,
) -> ProxyResult<()>
whereT: AsyncRead + AsyncWrite + Unpin + std::marker::Send + 'static,
{let value = data.lock().await;for (_, s) in value.server.iter().enumerate() {if s.bind_addr.port() == local_addr.port() {let addr = ReverseHelper::get_upstream_addr(&s.upstream, "")?;let mut connect = HealthCheck::connect(&addr).await?;copy_bidirectional(&mut inbound, &mut connect).await?;break;}}Ok(())
}
UDP实现

UDP相对比较复杂,下面我们先列举内部的流程图

根据地址连接发送数据到
将Receiver传到以接收数据
否,将数据Sender给
异步读取数据并发送
绑定反向udp端口
客户端
是否第一次
创建异步协程
异步协程中

在stream绑定的时候,要区分出TCP还是UDP的,做分别的绑定

/// stream的绑定,按bind_mode区分出udp或者是tcp,返回相应的列表
pub async fn bind(&mut self) -> ProxyResult<(Vec<TcpListener>, Vec<StreamUdp>)> {let mut listeners = vec![];let mut udp_listeners = vec![];let mut bind_port = HashSet::new();for value in &self.server.clone() {if bind_port.contains(&value.bind_addr.port()) {continue;}bind_port.insert(value.bind_addr.port());if value.bind_mode == "udp" {let listener = Helper::bind_upd(value.bind_addr).await?;udp_listeners.push(StreamUdp::new(listener, value.clone()));} else {let listener = Helper::bind(value.bind_addr).await?;listeners.push(listener);}}Ok((listeners, udp_listeners))
}

我们会对连接做分别的监听,下面是udp的获取是否有新数据:

async fn multi_udp_listen_work(listens: &mut Vec<StreamUdp>,
) -> (io::Result<(Vec<u8>, SocketAddr)>, usize) {if !listens.is_empty() {let (data, index, _) =select_all(listens.iter_mut().map(|listener| {listener.next().boxed()})).await;if data.is_none() {return (Err(io::Error::new(io::ErrorKind::InvalidInput, "read none data")), index)}(data.unwrap(), index)} else {let pend = std::future::pending();let () = pend.await;unreachable!()}
}

此处我们用next,也就是我们实现了 futures_core::Stream接口,用Poll的方式来注册实现有事件的时候来通知。

在tokio中,在read或者write的时候返回Poll::Pending,将会将socket的可读可写注册到底层,如果一旦系统可读可写就会通知该接口,将会重新执行一遍futures_core::Stream

我们将同时可以处理可读可写可发送事件,如果接口超时我们将关闭相应的接口。

impl Stream for StreamUdp {type Item = io::Result<(Vec<u8>, SocketAddr)>;fn poll_next(mut self: std::pin::Pin<&mut Self>,cx: &mut std::task::Context<'_>,) -> std::task::Poll<Option<Self::Item>> {let _ = self.poll_write(cx)?;let _ = self.poll_sender(cx)?;self.poll_read(cx)}
}

下面是主要的StreamUdp

/// Udp转发的处理结构,缓存一些数值以做中转
pub struct StreamUdp {/// 读的缓冲类,避免每次都释放pub buf: BinaryMut,/// 核心的udp绑定端口pub socket: UdpSocket,pub server: ServerConfig,/// 如果接收该数据大小为0,那么则代表通知数据关闭pub receiver: Receiver<(Vec<u8>, SocketAddr)>,/// 将发送器传达给每个子协程pub sender: Sender<(Vec<u8>, SocketAddr)>,/// 接收的缓存数据,无法保证全部直接进行发送完毕pub cache_data: LinkedList<(Vec<u8>, SocketAddr)>,/// 发送的缓存数据,无法保证全部直接进行发送完毕pub send_cache_data: LinkedList<(Vec<u8>, SocketAddr)>,/// 每个地址绑定的对象,包含Sender,最后操作时间,超时时间remote_sockets: HashMap<SocketAddr, InnerUdp>,
}

结果测试

我们自己开一个udp服务端,绑定了本地的8089,我们将接收到的数据前面加上from server:并进行返回,代理端我们绑定了84的端口,并将udp数据转发给8089端:

use tokio::net::UdpSocket;
use std::io;#[tokio::main]
async fn main() -> io::Result<()> {let sock = UdpSocket::bind("0.0.0.0:8089").await?;let mut buf = [0; 1024];loop {let (len, addr) = sock.recv_from(&mut buf).await?;let mut vec = "from server: ".as_bytes().to_vec();vec.extend(&buf[..len]);let _ = sock.send_to(&vec, addr).await?;}
}

客户端我们用nc运行:

图片.png

可以看出两个客户端互相独立,彼此返回的数据均符合预期,正常的接收及返回。

TCP我们绑定了83端口并转发到HTTP的本地端口8080,我们用curl进行测试,符合预期,如图:

图片.png

结语

至此四层的反向代理TCP/UDP均已完成,也符合预期。

点击 [关注][在看][点赞] 是对作者最大的支持

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/414714.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

GPT应用程序上线注意的问题

在将GPT应用程序上线之前&#xff0c;有一些重要的问题需要注意&#xff0c;以确保应用程序的成功运行、用户满意度和合规性。以下是一些建议&#xff0c;希望对大家有所帮助。北京木奇移动技术有限公司&#xff0c;专业的软件外包开发公司&#xff0c;欢迎交流合作。 合规性和…

如何查看iPad尺寸,这里提供两种办法

构成iPad尺寸的因素包括屏幕大小、宽度、深度和高度。由于iPad有不同的尺寸&#xff0c;你可以毫不费力地测量自己的尺寸。 苹果的iPad是当今最畅销的小工具之一。它是笔记本电脑的绝佳替代品&#xff0c;非常适合完成工作、看电影和上网。然而&#xff0c;出于各种目的&#…

STM32-调用 vTaskStartScheduler API 后出现 HardFault

STM32 移植 FreeRTOS 后调用 vTaskStartScheduler() 后出现 HardFault 异常。 原因分析&#xff1a; FreeRTOS 配置头文件 FreeRTOSConfig.h 中与中断有关的配置和通过系统接口 void NVIC_PriorityGroupConfig(uint32_t NVIC_PriorityGroup) 设置的中断分组冲突。 /* The lo…

Spring Security工作原理(一)

过滤器 Spring Security的Servlet支持是基于Servlet过滤器的&#xff0c;因此首先了解过滤器的一般作用是很有帮助的。下图显示了单个HTTP请求处理程序的典型分层结构。 处理客户端发送的请求时&#xff0c;容器创建一个FilterChain&#xff0c;其中包含Filter实例和Servlet&a…

【C++】:STL序列式容器list源码剖析

一、list概述 总的来说&#xff1a;环形双向链表 特点&#xff1a; 底层是使用链表实现的&#xff0c;支持双向顺序访问 在list中任何位置进行插入和删除的速度都很快 不支持随机访问&#xff0c;为了访问一个元素&#xff0c;必须遍历整个容器 与其他容器相比&#xff0c;额外…

基于Python的Climate Indices库计算SPEI(标准化降水蒸散发指数)05—栅格SPEI的计算

热闹的尽头是孤寂&#xff0c;在虚浮的欢闹中保持自己&#xff0c;纷繁世间&#xff0c;可报期望者不过二三。 文章目录 前言1. 概述2.1 目的2.2 说明 2. 版本2.1 天津&#xff0c;2024年1月18日&#xff0c;Version1 3. 微信公众号GISRSGeography 一、数据1. 输入数据2. 输出…

MySQL---经典SQL练习题

MySQL---经典50道练习题 素材:练习题目&#xff1a;解题&#xff1a; 素材: 1.学生表 Student(SId,Sname,Sage,Ssex) SId 学生编号,Sname 学生姓名,Sage 出生年月,Ssex 学生性别 2.课程表 Course(CId,Cname,TId) CId 课程编号,Cname 课程名称,TId 教师编号 3.教师表 Teacher(T…

Spring重要知识点

一、Spring中相关概念 1.IOC 控制反转 IoC&#xff08;Inverse of Control:控制反转&#xff09;是⼀种设计思想&#xff0c;就是将原本在程序中⼿动创建对象的控制权&#xff0c;交由Spring框架来管理。IoC 在其他语⾔中也有应⽤&#xff0c;并⾮ Spring 所独有。 IoC 容器…

ADSelfService Plus 推出离线多因素身份验证以提升远程工作安全性

采用先进验证方法&#xff0c;确保在任何时间、地点或连接问题下对业务数据的合法访问即使远程用户未连接到身份验证服务器或互联网&#xff0c;也可通过MFA安全认证。 MFA 得克萨斯州德尔瓦雷 — 2023年5月3日 — Zoho Corporation 旗下的企业IT管理部门ManageEngine今日宣布…

加速电压对扫描电子显微镜成像的影响

扫描电子显微镜&#xff08;SEM&#xff09;是一种利用聚焦电子束扫描样品表面&#xff0c;通过激发和收集二次电子、特征X射线等信号&#xff0c;获得样品表面形貌和成分信息的分析仪器。在SEM成像过程中&#xff0c;加速电压是一个关键参数&#xff0c;对成像效果具有重要影响…

WordPress回收站自动清空时间?如何关闭回收站或设置自动清理天数?

我们在WordPress后台的文章、页面、评论等页面都可以看到有回收站&#xff0c;意思就是我们不能直接删除某篇文章、页面、评论&#xff0c;而是需要现将它们移至回收站&#xff0c;然后再到回收站永久删除&#xff0c;或等回收站自动清理。 如上图所示&#xff0c;WordPress 6.…

2023 总结对AI的总结和展望

回顾ChatGPT最初 今天是AI最火的一年&#xff0c;从年初的时候OpenAI一下子火起来了&#xff0c;大家都在测试ChatGPT的智力如何&#xff0c;能力如何&#xff0c;各种视频铺天盖地的。各种测评视频大量散布在网络上面&#xff0c;一开始我只是认为他只是一个聊天小助手比较智能…