Python8

news/2025/3/30 17:46:13/文章来源:https://www.cnblogs.com/elmhome/p/18795762

tcp服务器

v1

import socket# 创建socket对象
sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # SOCK_STREAM tcp协议 SOCK_DGRAM 数据报协议 # 每次都会重用绑定好的ip+端口,使用后不会出现端口占用问题,在不熟悉套接字之前不要使用
sk.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
# 绑定地址,127.0.0.1也可不填为空
# sk.bind(('', 5000)) # 与下相同
sk.bind(('127.0.0.1', 5000))# 监听连接请求
sk.listen(5)    # 半连接池大小
print('服务端启动成功,在5000端口等待客户端连接')# 取出连接请求,开始服务
conn,addr = sk.accept()
print('连接对象:',conn)
print('客户端ip+端口:',addr)# 数据传输
data = conn.recv(1024) # 接收
data = data.decode('utf-8')
print('客户端发送的数据:',data)conn.send(data.upper().encode('utf-8')) # 发送# 结束服务
conn.close()# 关闭服务端
# sk.close()

v2

import socket# 创建socket对象
sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # SOCK_STREAM tcp协议 SOCK_DGRAM 数据报协议 # 每次都会重用绑定好的ip+端口,使用后不会出现端口占用问题,在不熟悉套接字之前不要使用
sk.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
# 绑定地址
sk.bind(('127.0.0.1', 5000))# 监听连接请求
sk.listen(5)    # 半连接池大小
print('服务端启动成功,在5000端口等待客户端连接')# 取出连接请求,开始服务
conn,addr = sk.accept()
print('连接对象:',conn)
print('客户端ip+端口:',addr)# 数据传输
while True:data = conn.recv(1024) # 接收data = data.decode('utf-8')if data == 'q':breakprint('客户端发送的数据:',data)conn.send(data.upper().encode('utf-8')) # 发送# 结束服务
conn.close()# 关闭服务端
# sk.close()

v3

import socket# 创建socket对象
sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # SOCK_STREAM tcp协议 SOCK_DGRAM 数据报协议 # 每次都会重用绑定好的ip+端口,使用后不会出现端口占用问题,在不熟悉套接字之前不要使用
sk.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
# 绑定地址
sk.bind(('127.0.0.1', 5000))# 监听连接请求
sk.listen(5)    # 半连接池大小
print('服务端启动成功,在5000端口等待客户端连接')# 取出连接请求,开始服务
conn,addr = sk.accept()
print('连接对象:',conn)
print('客户端ip+端口:',addr)# 数据传输
while True:try:data = conn.recv(1024) # 接收except:break# Linux和mac系统上使用的退出if not data:breakdata = data.decode('utf-8')print('客户端发送的数据:',data)conn.send(data.upper().encode('utf-8')) # 发送# 结束服务
conn.close()# 关闭服务端
# sk.close()

v4

import socket# 创建socket对象
sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # SOCK_STREAM tcp协议 SOCK_DGRAM 数据报协议 # 每次都会重用绑定好的ip+端口,使用后不会出现端口占用问题,在不熟悉套接字之前不要使用
sk.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
# 绑定地址
sk.bind(('127.0.0.1', 5000))# 监听连接请求
sk.listen(5)    # 半连接池大小
print('服务端启动成功,在5000端口等待客户端连接')# 取出连接请求,开始服务
conn,addr = sk.accept()
print('连接对象:',conn)
print('客户端ip+端口:',addr)# 数据传输
while True:try:data = conn.recv(1024) # 接收except:break# Linux和mac系统上使用的退出if not data:breakdata = data.decode('utf-8')print('客户端发送的数据:',data)conn.send(data.upper().encode('utf-8')) # 发送# 结束服务
conn.close()# 关闭服务端
# sk.close()

v5

import socket# 创建socket对象
sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # SOCK_STREAM tcp协议 SOCK_DGRAM 数据报协议 # 每次都会重用绑定好的ip+端口,使用后不会出现端口占用问题,在不熟悉套接字之前不要使用
sk.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
# 绑定地址
sk.bind(('127.0.0.1', 5000))# 监听连接请求
sk.listen(5)    # 半连接池大小
print('服务端启动成功,在5000端口等待客户端连接')# 取出连接请求,开始服务
# 服务多个对象
while True:conn,addr = sk.accept()print('连接对象:',conn)print('客户端ip+端口:',addr)# 数据传输while True:try:data = conn.recv(1024) # 接收except:break# Linux和mac系统上使用的退出if not data:breakdata = data.decode('utf-8')print('客户端发送的数据:',data)conn.send(data.upper().encode('utf-8')) # 发送# 结束服务conn.close()# 关闭服务端
# sk.close()

并发手动实现

# 进程实现
import socket
from multiprocessing import Process# socket.socket(socket.AF_INET,socket.SOCK_STREAM) #流式协议, TCP协议
s = socket.socket() # 同上# 每次都会重用绑定好的ip+端口,使用后不会出现端口占用问题,在不熟悉套接字之前不要使用
s.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
s.bind(('127.0.0.1',5000))s.listen(5)def task(conn):while True:try:data = conn.recv(1024)except:breakif not data:breakprint(data.decode('utf-8'))conn.send(data.upper())conn.close()if __name__ == "__main__":while True:conn, addr = s.accept()p = Process(target=task,args=(conn,))p.start()# 线程实现
import socket
from threading import Thread# socket.socket(socket.AF_INET,socket.SOCK_STREAM) #流式协议, TCP协议
s = socket.socket() # 同上# 每次都会重用绑定好的ip+端口,使用后不会出现端口占用问题,在不熟悉套接字之前不要使用
s.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
s.bind(('127.0.0.1',5000))s.listen(5)def task(conn):while True:try:data = conn.recv(1024)except:breakif not data:breakprint(data.decode('utf-8'))conn.send(data.upper())conn.close()if __name__ == "__main__":while True:conn, addr = s.accept()p = Thread(target=task,args=(conn,))p.start()

面向对象编程方法

# 多进程web服务器
import socket
import multiprocessing
import sysclass StaticWebServer(object):# 初始化StaticWebServer对象,创建socket实例对象,并放入StaticWebServer对象当中def __init__(self,port:int) -> None:self.server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)self.server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,True)self.server.bind(('',port))self.server.listen(128)# 启动函数def start(self):# 循环等待客户端连入while True:client,ip_port = self.server.accept()print(ip_port[0],"通过",ip_port[1],"连接成功")# 设置多进程,将服务分发给其他进程p = multiprocessing.Process(target=self.task,args=(client,))p.start()client.close()# 关闭字符套接字self.server.close# 处理函数def task(self,client):# 设置接收请求,且请求最大字节为1024request_data = client.recv(1024).decode()# 字符判断如果是0直接退出if len(request_data) == 0:client.close()# 将请求的位置进行截取出else:request_path = request_data.split(' ')[1]print('请求的位置是',request_path)# 字符判断如果是"/"根目录,则path为'/index.html'if request_path == '/':request_path = '/index.html'# 尝试打开文件返回,打开成功执行else,失败执行except,最后执行finallytry:with open('static' + request_path , 'rb') as file:file_content = file.read()except Exception as e:response_line = 'HTTP/1.1 404 NOT FOUND\r\n'response_head = 'Server: PSWS1.1\r\n'with open('static/erroe.html','r',encoding="utf-8") as f:error_data = f.read()response_data = (response_line + response_head + '\r\n' + error_data).encode('utf-8')client.send(response_data)else:response_line = 'HTTP/1.1 200 OK\r\n'response_head = 'Server: PSWS1.1\r\n'with open('static' + request_path , 'rb') as f:response_body = f.read()response_data = (response_line + response_head + '\r\n').encode('utf-8') + response_bodyclient.send(response_data)finally:client.close()if __name__ == "__main__":# 用sys库给port赋值,第0个是执行的文件,取第1个赋值给portif len(sys.argv) != 2:print('输入错误')exit(1)if sys.argv[1].isdigit():port = int(sys.argv[1])StaticWebServer(port).start()else:print('输入错误')# 多线程web服务器
import socket
import threadingclass ScoketServer(object):def __init__(self,port) -> None:# 创建socket实例,参数1代表IPv4,参数2代表tcpself.tcp_server_socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)# 设置端口号复用,程序退出端口号立即释放self.tcp_server_socket.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,True)# 对socket指定连接对象,一个人对象是一个元组包含IP地址和端口号,空IP为本地self.tcp_server_socket.bind(("",port))# 设置监听,最大等待128个,设置listen后为被动模式,不能connect主动连接self.tcp_server_socket.listen(128)# 客户端的启动函数def start(self):self.service_client_socket.accept()# 循环等到客户端的连接请求while True:# 等待客户端的连接service_client_socket,ip_port = self.tcp_server_socket.accept()print("客户端连接上来",ip_port)# 创建子线程,将连接上来的客户端放入,子线程操作接收客户端的数据# 设置守护主线程sub_thread = threading.Thread(target=self.handle_client_request,args=(service_client_socket,ip_port),daemon=True)# 开启子线程sub_thread.start()# 关闭tcp服务器的套接字tcp_server_socket.close()# 处理函数,当客户端连接上后进行交互def handle_client_request(self,service_client_socket,ip_port):# 循环接收客户端发送的请求while True:# 接收请求,且请求最大为1024recv_data = service_client_socket.recv(1024)# 判断客户端是否存活,使用if语句对数据进行判断,有数据存活,没有数据死亡if recv_data:print(recv_data.decode('gbk'),ip_port)service_client_socket.send("ok,问题正在处理...".encode("gbk"))else:print("客户端下线",ip_port)break# 终止客户端的连接service_client_socket.close()if __name__ == "__main__":# 携带数据,实例一个ScoketServerserver = ScoketServer(8080)# 运行ScoketServer下的start函数server.start()

并发socketserver

import socketserverclass RequestHandle(socketserver.BaseRequestHandler):def handle(self): # 这个函数的名字不能改变# 这个是一个连接对象,针对于tcp服务,udp没有连接print(self.request) # self.request => connprint(self.client_address)# 数据传输while True:try:data = self.request.recv(1024)except:breakif not data:breakdata = data.decode('utf-8')print('客户端发送的数据',data)self.request.send(data.upper().encode('utf-8'))# 结束服务self.request.close()'''
Threading线程的意思 
Forking进程的意思 windwos系统上不支持多进程,因为创建多进程需要调用系统接口进程调用的是 os.fork()这个接口,而os.fork()是针对unix系统发起的造进程的系统调用windows系统多进程调用的不是这几个接口。
'''
sk = socketserver.ThreadingTCPServer(('127.0.0.1',5000),RequestHandle) # 多线程
sk.serve_forever() # => while True: conn,addr = sk.accept() 只负责从半连接池中获取对象,每获取一个对象就会启动一个线程

并发非阻塞IO手动

import socketserver = socket.socket()
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)
server.setblocking(False) # 所有的网络阻塞都会变成非阻塞c_list = []
d_lis = []
while True:try:conn, addr = server.accept()c_list.append(conn)except BlockingIOError:for conn in c_list:try:data = conn.recv(1024)if not data:conn.close()d_lis.append(conn)conn.send(data.upper())except BlockingIOError:passexcept ConnectionResetError:conn.close()d_lis.append(conn)for conn in d_lis:c_list.remove(conn)d_lis.clear()

并发selectors

import socket
import selectorsdef accept(server):conn,addr = server.accept()sel.register(conn,selectors.EVENT_READ,read)def read(conn):try:data = conn.recv(1024)if not data:conn.close()sel.unregister(conn)returnconn.send(data.upper())except ConnectionResetError:conn.close()sel.unregister(conn)returnserver = socket.socket()
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)
server.setblocking(False)sel = selectors.DefaultSelector()
sel.register(server,selectors.EVENT_READ,accept)while True:events = sel.select() # linux/mac  sel.epoll()for key,mask in events:callback = key.datacallback(key.fileobj)

并发异步IO手动

import socket
import asyncioasync def waiter(conn,loop):while True:try:data = await loop.sock_recv(conn,1024)if not data:breakawait loop.sock_sendall(conn,data.upper())except ConnectionResetError:breakconn.close()async def main(ip,port):server = socket.socket()server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)server.bind((ip,port))server.listen(5)server.setblocking(False) # 非阻塞loop = asyncio.get_running_loop()while True:conn, addr = await loop.sock_accept(server)# 创建task任务loop.create_task(waiter(conn,loop))asyncio.run(main('127.0.0.1',8000))

并发异步IOuvloop

import socket
import asyncio
# import uvloop # 目前不支持windows  
# asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) # 但是将asyncio修改为uvloop只需要添加这一行代码即可
# pip install uvloopasync def waiter(conn,loop):while True:try:data = await loop.sock_recv(conn,1024)if not data:breakawait loop.sock_sendall(conn,data.upper())except ConnectionResetError:breakconn.close()async def main(ip,port):server = socket.socket()server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)server.bind((ip,port))server.listen(5)server.setblocking(False) # 非阻塞loop = asyncio.get_running_loop()while True:conn, addr = await loop.sock_accept(server)# 创建task任务loop.create_task(waiter(conn,loop))asyncio.run(main('127.0.0.1',8000))

tcp客户端

v1

import socket# 创建socket对象
sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # SOCK_STREAM tcp协议 SOCK_DGRAM 数据报协议 # 建立连接
sk.connect(('127.0.0.1', 5000))# 传输数据
msg = input('请输入>>').strip()
sk.send(msg.encode('utf-8'))data = sk.recv(1024) # 接收
print('客户端发送的数据:',data.decode('utf-8'))# 关闭连接
sk.close()

v2

import socket# 创建socket对象
sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # SOCK_STREAM tcp协议 SOCK_DGRAM 数据报协议 # 建立连接
sk.connect(('127.0.0.1', 5000))# 传输数据
while True:msg = input('请输入>>').strip()sk.send(msg.encode('utf-8'))if msg == 'q':breakdata = sk.recv(1024) # 接收print('客户端发送的数据:',data.decode('utf-8'))# 关闭连接
sk.close()

v3

import socket# 创建socket对象
sk = socket.socket() # SOCK_STREAM tcp协议 SOCK_DGRAM 数据报协议 # 建立连接
sk.connect(('127.0.0.1', 5000))# 传输数据
while True:msg = input('请输入>>').strip()sk.send(msg.encode('utf-8'))# 防止发空if not msg:continueif msg == 'q':breakdata = sk.recv(1024) # 接收print('客户端发送的数据:',data.decode('utf-8'))# 关闭连接
sk.close()

udp服务器

import socket# 创建socket对象
sk = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # SOCK_STREAM tcp协议 SOCK_DGRAM 数据报协议(udp协议) # 每次都会重用绑定好的ip+端口,使用后不会出现端口占用问题,在不熟悉套接字之前不要使用
sk.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
# 绑定地址
sk.bind(('127.0.0.1', 5000))# 取出连接请求,开始服务# 数据传输
while True:data, addr = sk.recvfrom(1024) # 接收print('客户端发送的数据:',data.decode())sk.sendto(data.upper(),addr) # 发送# 关闭服务端
# sk.close()

udp客户端

import socket# 创建socket对象
sk = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # SOCK_STREAM tcp协议 SOCK_DGRAM 数据报协议 # 传输数据
while True:msg = input('请输入>>').strip()sk.sendto(msg.encode('utf-8'),('127.0.0.1',5000))if msg == 'q':breakdata, addr = sk.recvfrom(1024) # 接收print('客户端发送的数据:',data.decode('utf-8'))# 关闭连接
sk.close()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/906643.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

导入SpaceClaim的iges模型尺寸被放大1000倍的问题

ANSYS APDL 和 Workbench 联合仿真中,导入 Workbench 的 iges 模型尺寸被放大1000倍问题的解决方案问题 ANSYS APDL 和 Workbench 联合仿真时,导入 SpaceClaim 的 .iges 模型尺寸被放大 1000 倍数。 如 APDL 生成的尺寸为 10 mm(注:此处的 mm 是在 APDL 编码中设置的一致单…

Linux的vim编辑器

Linux的vi/vim编辑器 命令模式 编辑模式 输入模式 vi/ vim编辑器的基本操作Linux的vi/vim编辑器基本概念:vim文本编辑器,是由vi编辑器发展演变过来的文本编辑器,因其具有使用简单、功能强大、是 Linux 众多发行版的默认文本编辑器。很多人习惯将 vim 称为 vi,其实 vim 是 v…

高度场流体模拟

【USparkle专栏】如果你深怀绝技,爱“搞点研究”,乐于分享也博采众长,我们期待你的加入,让智慧的火花碰撞交织,让知识的传递生生不息!一、原理参考这个论文:《Real-time Simulation of Large Bodies of Water with Small Scale Details》 核心是这两个公式: 我在这篇《…

Navicat将微软数据库MS-SQLServer表内容导入MySQL数据库

前言全局说明一、说明 1.1 环境: Windows 7 旗舰版 MSSQL 2012 Navicat for MySQL 10.1.7二、MySQL准备 用 Navicat 在 mysql 新建数据库,要和 MSSQL 数据库同名注意:编码也要一致2.1 mysql 新建数据 空白处新,建 test 数据库,2.2 数据库右键查看在mysql里新建数据库编码三…

深度解析:通过 AIBrix 多节点部署 DeepSeek-R1 671B 模型

本文详细介绍了如何通过 AIBrix 分布式推理平台实现 DeepSeek-R1 671B 的多节点部署。DeepSeek-R1 通过渐进式训练框架展现出优秀的逻辑推理能力 —— 在 6710 亿总参数量中,其动态激活的 370 亿参数与 128k 上下文窗口,使其在复杂任务处理中表现卓越。然而,如此庞大的模型规…

玄机靶场 第一章 应急响应-webshell查杀

玄机靶场 第一章 应急响应-webshell查杀 1.黑客webshell里面的flag flag2. 黑客使用的什么工具的shell github地址的md5 flag 哥斯拉webshell的特征3.黑客隐藏shell的完整路径的md5 flag{md5} 注 : /xxx/xxx/xxx/xxx/xxx.xxx 发现隐藏4.黑客免杀马完整路径 md5 flag 查看这是一…

玄机靶场 第一章 应急响应-Linux日志分析

玄机靶场 第一章 应急响应-Linux日志分析 1.有多少IP在爆破主机ssh的root帐号,如果有多个使用","分割 /var/log/auth.log里面存放了相关的登录信息 直接下载看根据user=root发现三个ip 网上发现神奇妙妙脚本 cat auth.log.1 | grep -a "Failed password for ro…

【每日一题】20250327

改变不了的事,不值得烦恼。【每日一题】 1.(15分) \(\hspace{0.7cm}\)已知数列 \(\{a_n\}\),\(\{b_n\}\) 和 \(\{c_n\}\) 满足: \[a_{n+1}=\frac14b_n \]\[b_{n+1}=a_n+c_n+\frac12b_n \]\[c_{n+1}=\frac14b_n \]且 \(a_n+b_n+c_{n}=1\),\(a_1=0\),\(b_1=1\),\(\displa…

必看!SpringAI轻松构建MCP Client-Server架构

MCP 这个概念相信大家已经听了无数次了,但不同人会有不同的解释,你可能也是听得云里雾里的。 不过没关系,今天这篇内容会通过 Spring AI 给你实现一个 MCP 的 Client 和 Server 架构,让你彻底搞懂 MCP 的概念,以及学会 MCP 的开发技能。 什么是MCP? MCP 是 Model Context…

国产服务器操作系统CTyunOS,技能值拉满!

新一轮科技革命和产业变革深入发展,数字经济迎来新的发展机遇。其中,以操作系统为代表的关键基础软件是新一代信息技术的灵魂,它不仅构筑起信息技术领域的根基磐石,更在守护关键信息基础设施的安全防线、实现核心技术自主可控中,发挥着不可替代的作用。 作为云服务国家队…

CSS 实现自定义滚动条样式

CSS 实现自定义滚动条样式CSS 实现自定义滚动条样式 要兼容主流浏览器(Chrome、Safari、Edge、Firefox 等),需要综合使用 Webkit 的 ::-webkit-scrollbar 伪元素和 Firefox 支持的 scrollbar-width 以及 scrollbar-color 属性。由于目前主流浏览器中只有 Webkit 内核和 Fire…

重置OpenEuler操作系统的root管理员密码

(1)启动OpenEuler系统或在终端执行重启系统命令reboot,在系统引导界面按“e”键进入内核编辑界面。成功进入内核编辑模式界面,要求输入username,这里输入root,然后要求输入密码,默认密码为openEuler#12。(2)用户名和密码输入正确后进入引导编辑界面:(3)按向下光标,…