[Go][Python]命令行工具-IP扫描

news/2025/3/25 14:11:17/文章来源:https://www.cnblogs.com/XY-Heruo/p/18788151

简介

前段时间帮朋友调试内网离线环境,其中有个小问题是要扫描局域网下有哪些存活主机,一开始用python脚本实现的,写起来比较简单,性能也还行。后来捣鼓了下又重写一个。

Python版本

并发线程

最开始就是用Python实现的,代码量也不大,临时用用也还不错。

import socket
from concurrent.futures import ThreadPoolExecutordef scan_ip_port(ip: str, port: int, timeout: int = 1) -> bool:"""尝试连接指定的 IP 和端口,返回是否成功连接。"""try:# 创建一个 socket 对象sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)sock.settimeout(timeout)  # 设置超时时间# 尝试连接result = sock.connect_ex((ip, port))sock.close()# 如果返回 0,表示连接成功return result == 0except Exception:# print(f"Error scanning {ip}:{port}")return Falsedef scan_network(base_ip: str, port: int = 80, max_workers=500):"""扫描局域网内存活的 IP 地址,通过检测指定端口是否开放。"""active_ips = []# 使用线程池并发扫描with ThreadPoolExecutor(max_workers=max_workers) as executor:futures = []for i in range(1, 255):ip = f"{base_ip}.{i}"# 提交任务到线程池future = executor.submit(scan_ip_port, ip, port)futures.append((ip, future))# 获取结果for ip, future in futures:if future.result():print(f"{ip} is active (port {port} is open)")active_ips.append(ip)# else:#     print(f"{ip} is inactive or port {port} is closed")return active_ipsif __name__ == "__main__":# 假设局域网的 IP 段是 192.168.0.xbase_ip = "192.168.0"port = 80active_ips = scan_network(base_ip, port)print(f"Active IPs: {active_ips}")

time工具运行获取执行时间和内存消耗

$ /usr/bin/time -f 'Elapsed Time: %e s Max RSS: %M kbytes' python3 ipscan.py 
192.168.0.101 is active (port 80 is open)
192.168.0.106 is active (port 80 is open)
192.168.0.107 is active (port 80 is open)
Active IPs: ['192.168.0.101', '192.168.0.106', '192.168.0.107']
Elapsed Time: 1.08 s Max RSS: 28220 kbytes

并发异步

让DeepSeek改写成异步方式

import asyncio
import socket
from asyncio import Semaphoreasync def async_scan_ip_port(ip: str, port: int, timeout: int = 1, sem: Semaphore = None) -> bool:"""异步版本端口扫描 (使用asyncio协程实现)"""async with sem:  # 通过信号量控制并发量try:# 创建异步连接reader, writer = await asyncio.wait_for(asyncio.open_connection(ip, port),timeout=timeout)writer.close()await writer.wait_closed()return Trueexcept (socket.error, asyncio.TimeoutError, OSError):return Falseexcept Exception as e:print(f"Unexpected error with {ip}:{port} - {str(e)}")return Falseasync def async_scan_network(base_ip: str, port: int = 80, max_concurrency=500) -> list:"""异步扫描核心逻辑"""sem = Semaphore(max_concurrency)  # 控制最大并发量tasks = []# 创建扫描任务for i in range(1, 255):ip = f"{base_ip}.{i}"task = asyncio.create_task(async_scan_ip_port(ip, port, sem=sem))tasks.append((ip, task))# 收集结果active_ips = []for ip, task in tasks:try:if await task:print(f"{ip} is active (port {port} is open)")active_ips.append(ip)except Exception as e:print(f"Task error for {ip}: {str(e)}")return active_ipsif __name__ == "__main__":# 运行事件循环base_ip = "192.168.0"port = 80active_ips = asyncio.run(async_scan_network(base_ip, port))print(f"Active IPs: {active_ips}")

同样记录下运行时间和内存消耗。运行时间差不多,但内存消耗更低。

$ /usr/bin/time -f 'Elapsed Time: %e s Max RSS: %M kbytes' python3 ipscan_async.py 
192.168.0.101 is active (port 80 is open)
192.168.0.106 is active (port 80 is open)
192.168.0.107 is active (port 80 is open)
Active IPs: ['192.168.0.101', '192.168.0.106', '192.168.0.107']
Elapsed Time: 1.05 s Max RSS: 21508 kbytes

Go

本来打算支持ICMP协议扫描的,但是尝试后有点Bug,所以只实现了TCP扫描。

├── cmd
│   ├── icmp.go
│   ├── root.go
│   └── tcp.go
├── go.mod
├── go.sum
├── main.go
└── pkg├── icmp.go├── tcp.go└── utils.go
  • pkg/utils.go用于解析CIDR地址,获取对应的IP列表
package pkgimport "net"func inc(ip net.IP) {for i := len(ip) - 1; i >= 0; i-- {ip[i]++if ip[i] > 0 {break}}
}func GetIpList(cidr string) ([]string, error) {_, ipNet, err := net.ParseCIDR(cidr)if err != nil {return nil, err}var ips []stringfor ip := ipNet.IP.Mask(ipNet.Mask); ipNet.Contains(ip); inc(ip) {ips = append(ips, ip.String())}// 移除网络地址和广播地址, eg: 192.168.0.0 and 192.168.0.255if len(ips) > 2 {ips = ips[1 : len(ips)-1]}return ips, nil
}
  • pkg/tcp.go实现了Go连接测试
package pkgimport ("fmt""net""time"
)func SendTCP(ip string, port int, timeout int) error {// 直接使用ip:port方式可能对ipv6不生效, 所以使用JoinHostPortconn, err := net.DialTimeout("tcp", net.JoinHostPort(ip, fmt.Sprintf("%d", port)), time.Duration(timeout)*time.Second)if err != nil {// 忽略错误// return fmt.Errorf("failed to dial tcp: %v", err)return nil}defer conn.Close()fmt.Printf("Connect to %s:%d successfully\n", ip, port)return nil
}
  • cmd/tcp.go用于解析命令行参数
package cmdimport ("fmt""ipscan/pkg""sync""github.com/spf13/cobra"
)var (port int
)// tcpCmd represents the tcp command
var tcpCmd = &cobra.Command{Use:   "tcp",Short: "使用TCP协议进行网络探测",Long:  `使用TCP协议进行网络探测`,Example: `./ipscan tcp 192.168.0.0/24./ipscan tcp 192.168.0.0/24 -p 443  # 指定端口号为 443`,Run: func(cmd *cobra.Command, args []string) {for _, arg := range args {ips, err := pkg.GetIpList(arg)if err != nil {fmt.Println(err)continue}timeout, err := cmd.Flags().GetInt("timeout")if err != nil {fmt.Println(err)continue}wg := &sync.WaitGroup{}for _, ip := range ips {wg.Add(1)go func(ip string) {defer wg.Done()err := pkg.SendTCP(ip, port, timeout)if err != nil {fmt.Println(err)}}(ip)}wg.Wait()}},
}func init() {rootCmd.AddCommand(tcpCmd)tcpCmd.Flags().IntVarP(&port, "port", "p", 80, "Port to scan")
}
  • cmd/root.go
package cmdimport ("os""github.com/spf13/cobra"
)// rootCmd represents the base command when called without any subcommands
var rootCmd = &cobra.Command{Use:   "ipscan",Short: "局域网存活IP扫描器",Long: `扫描网段下所有存活的IP地址`,
}func Execute() {err := rootCmd.Execute()if err != nil {os.Exit(1)}
}func init() {rootCmd.PersistentFlags().IntP("timeout", "t", 1, "timeout")
}
  • main.go
package mainimport "ipscan/cmd"func main() {cmd.Execute()
}

TODO

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/904313.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

原来不是喜欢原子化css,只不过是喜欢tailwind

前言 写css的时候,经常有某个控件只需要些许css样式,但是写行内样式又有优先级问题,从而需要为其单独定义一个class,然而某个控件只是用来布局,没有特定含义,连名称都不好命名。 因此,原子化css应运而生,早期的bootstrap,以及一些组件库中都有使用。 那时叫做工具类,…

c语言分支与循环基础

实验任务一 问题1:生成一个1到100的随机整数 问题2:使输出的整数宽度为4位,不足时在前面补0 问题3:循环生成五个1到100的随机整数,与固定前缀组合后输出类似学员编号的内容实验任务2 问题一:在一次购买流程结束后,清除本次购买的总价,下次运行能重新计算。去掉的话,总…

梯度方差的概念

梯度方差的概念 内容 在深度学习中,梯度方差(Gradient Variance) 是一个关键概念,它直接影响模型的训练稳定性和收敛速度。以下用通俗的语言和实际例子解释它的含义、作用及影响。1. 什么是梯度方差?定义: 梯度方差表示 不同批次数据计算出的梯度之间的波动程度。 如果每…

解决方案 | 如何安全可靠地更改win10的C盘用户名

有的朋友可能最开始由于不知道使用中文名在编程中的各种bug,从而将自己的系统用户名设置成了中文名或者各种奇怪符号的名字,导致在英文编程的时候或者使用英文软件的时候可能出错。为了解决这个问题,网上的文章写得又臭又长,生怕别人看懂学到了技术。本文目的:实现原用户名…

20244106 实验一《Python程序设计》实验报告

20244106 2024-2025-2 《Python程序设计》实验一报告 课程:《Python程序设计》 班级:2441 姓名:孙诗棋 学号:20244106 实验教师:王志强 实验日期:2025年3月20日 必修/选修: 公选课 1.实验内容熟悉Python开发环境; 练习Python运行、调试技能; 编写程序,练习变量和类型…

Nature Communications | 单细胞表观图谱破解颅神经发育疾病非编码变异之谜

摘要总结 这篇文章是2024年9月发表在《Nature Communications》杂志上的一篇研究,标题为“A cell type-aware framework for nominating non-coding variants in Mendelian regulatory disorders”。这篇文章通过整合小鼠胚胎颅运动神经元的单细胞染色质可及性、组蛋白修饰和基…

IOC容器启动及Bean生成流程

目录 一、容器启动IOC启动流程重点二、扫描并注册BeanDefination加载并过滤资源注册BeanDefination三、BeanFactory后置处理 四、注册Bean后置处理器 五、遍历BeanDefination,实例化单例BeanpreInstantiateSingletonsdoGetBean(我们只关注单例)createBean实例化前执行doCrea…

关于QQ提示非官方正版应用

笔者是magisk+lsp+zygisk+shamiko环境,依然是被制裁了,momo检测只有Bootloader未锁定。 也没想在手机上登录QQ(已经摆烂,反正现在工作了,基本都用微信了),只是想在手机打两把王者,登录王者时选择用ipad扫码登录(只是授权登录) 这时候提示我 sign of app is error(100…

苍穹外卖-day04

day-04 25-3-20 新增套餐 需求分析&设计业务规则套餐名称唯一 套餐必须属于某个分类 套餐必须包含菜品 名称、分类、价格、图片为必填项 添加菜品窗口需要根据分类类型来展示菜品 新增的套餐默认为停售状态接口设计(共涉及到4个接口):根据类型查询分类(已完成) 根据分…

提示词工程师自白:我如何用一个技巧解放自己的生产力

“在AI时代的交响乐中,提示词工程师是默默无闻却至关重要的指挥家,用精心编织的语言指引大模型这个智能巨兽创造出人类思维的奇迹。” AI粉嫩特攻队,2025年3月22日。 自从新的生产范式诞生以来,我的工作中多了一项新身份——提示工程师。 在不同的大语言模型之间穿梭,寻求…