简介
前段时间帮朋友调试内网离线环境,其中有个小问题是要扫描局域网下有哪些存活主机,一开始用python脚本实现的,写起来比较简单,性能也还行。后来捣鼓了下又重写一个。
Python版本
并发线程
最开始就是用Python实现的,代码量也不大,临时用用也还不错。
import socket
from concurrent.futures import ThreadPoolExecutordef scan_ip_port(ip: str, port: int, timeout: int = 1) -> bool:"""尝试连接指定的 IP 和端口,返回是否成功连接。"""try:# 创建一个 socket 对象sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)sock.settimeout(timeout) # 设置超时时间# 尝试连接result = sock.connect_ex((ip, port))sock.close()# 如果返回 0,表示连接成功return result == 0except Exception:# print(f"Error scanning {ip}:{port}")return Falsedef scan_network(base_ip: str, port: int = 80, max_workers=500):"""扫描局域网内存活的 IP 地址,通过检测指定端口是否开放。"""active_ips = []# 使用线程池并发扫描with ThreadPoolExecutor(max_workers=max_workers) as executor:futures = []for i in range(1, 255):ip = f"{base_ip}.{i}"# 提交任务到线程池future = executor.submit(scan_ip_port, ip, port)futures.append((ip, future))# 获取结果for ip, future in futures:if future.result():print(f"{ip} is active (port {port} is open)")active_ips.append(ip)# else:# print(f"{ip} is inactive or port {port} is closed")return active_ipsif __name__ == "__main__":# 假设局域网的 IP 段是 192.168.0.xbase_ip = "192.168.0"port = 80active_ips = scan_network(base_ip, port)print(f"Active IPs: {active_ips}")
用time
工具运行获取执行时间和内存消耗
$ /usr/bin/time -f 'Elapsed Time: %e s Max RSS: %M kbytes' python3 ipscan.py
192.168.0.101 is active (port 80 is open)
192.168.0.106 is active (port 80 is open)
192.168.0.107 is active (port 80 is open)
Active IPs: ['192.168.0.101', '192.168.0.106', '192.168.0.107']
Elapsed Time: 1.08 s Max RSS: 28220 kbytes
并发异步
让DeepSeek改写成异步方式
import asyncio
import socket
from asyncio import Semaphoreasync def async_scan_ip_port(ip: str, port: int, timeout: int = 1, sem: Semaphore = None) -> bool:"""异步版本端口扫描 (使用asyncio协程实现)"""async with sem: # 通过信号量控制并发量try:# 创建异步连接reader, writer = await asyncio.wait_for(asyncio.open_connection(ip, port),timeout=timeout)writer.close()await writer.wait_closed()return Trueexcept (socket.error, asyncio.TimeoutError, OSError):return Falseexcept Exception as e:print(f"Unexpected error with {ip}:{port} - {str(e)}")return Falseasync def async_scan_network(base_ip: str, port: int = 80, max_concurrency=500) -> list:"""异步扫描核心逻辑"""sem = Semaphore(max_concurrency) # 控制最大并发量tasks = []# 创建扫描任务for i in range(1, 255):ip = f"{base_ip}.{i}"task = asyncio.create_task(async_scan_ip_port(ip, port, sem=sem))tasks.append((ip, task))# 收集结果active_ips = []for ip, task in tasks:try:if await task:print(f"{ip} is active (port {port} is open)")active_ips.append(ip)except Exception as e:print(f"Task error for {ip}: {str(e)}")return active_ipsif __name__ == "__main__":# 运行事件循环base_ip = "192.168.0"port = 80active_ips = asyncio.run(async_scan_network(base_ip, port))print(f"Active IPs: {active_ips}")
同样记录下运行时间和内存消耗。运行时间差不多,但内存消耗更低。
$ /usr/bin/time -f 'Elapsed Time: %e s Max RSS: %M kbytes' python3 ipscan_async.py
192.168.0.101 is active (port 80 is open)
192.168.0.106 is active (port 80 is open)
192.168.0.107 is active (port 80 is open)
Active IPs: ['192.168.0.101', '192.168.0.106', '192.168.0.107']
Elapsed Time: 1.05 s Max RSS: 21508 kbytes
Go
本来打算支持ICMP协议扫描的,但是尝试后有点Bug,所以只实现了TCP扫描。
├── cmd
│ ├── icmp.go
│ ├── root.go
│ └── tcp.go
├── go.mod
├── go.sum
├── main.go
└── pkg├── icmp.go├── tcp.go└── utils.go
pkg/utils.go
用于解析CIDR地址,获取对应的IP列表
package pkgimport "net"func inc(ip net.IP) {for i := len(ip) - 1; i >= 0; i-- {ip[i]++if ip[i] > 0 {break}}
}func GetIpList(cidr string) ([]string, error) {_, ipNet, err := net.ParseCIDR(cidr)if err != nil {return nil, err}var ips []stringfor ip := ipNet.IP.Mask(ipNet.Mask); ipNet.Contains(ip); inc(ip) {ips = append(ips, ip.String())}// 移除网络地址和广播地址, eg: 192.168.0.0 and 192.168.0.255if len(ips) > 2 {ips = ips[1 : len(ips)-1]}return ips, nil
}
pkg/tcp.go
实现了Go连接测试
package pkgimport ("fmt""net""time"
)func SendTCP(ip string, port int, timeout int) error {// 直接使用ip:port方式可能对ipv6不生效, 所以使用JoinHostPortconn, err := net.DialTimeout("tcp", net.JoinHostPort(ip, fmt.Sprintf("%d", port)), time.Duration(timeout)*time.Second)if err != nil {// 忽略错误// return fmt.Errorf("failed to dial tcp: %v", err)return nil}defer conn.Close()fmt.Printf("Connect to %s:%d successfully\n", ip, port)return nil
}
cmd/tcp.go
用于解析命令行参数
package cmdimport ("fmt""ipscan/pkg""sync""github.com/spf13/cobra"
)var (port int
)// tcpCmd represents the tcp command
var tcpCmd = &cobra.Command{Use: "tcp",Short: "使用TCP协议进行网络探测",Long: `使用TCP协议进行网络探测`,Example: `./ipscan tcp 192.168.0.0/24./ipscan tcp 192.168.0.0/24 -p 443 # 指定端口号为 443`,Run: func(cmd *cobra.Command, args []string) {for _, arg := range args {ips, err := pkg.GetIpList(arg)if err != nil {fmt.Println(err)continue}timeout, err := cmd.Flags().GetInt("timeout")if err != nil {fmt.Println(err)continue}wg := &sync.WaitGroup{}for _, ip := range ips {wg.Add(1)go func(ip string) {defer wg.Done()err := pkg.SendTCP(ip, port, timeout)if err != nil {fmt.Println(err)}}(ip)}wg.Wait()}},
}func init() {rootCmd.AddCommand(tcpCmd)tcpCmd.Flags().IntVarP(&port, "port", "p", 80, "Port to scan")
}
cmd/root.go
package cmdimport ("os""github.com/spf13/cobra"
)// rootCmd represents the base command when called without any subcommands
var rootCmd = &cobra.Command{Use: "ipscan",Short: "局域网存活IP扫描器",Long: `扫描网段下所有存活的IP地址`,
}func Execute() {err := rootCmd.Execute()if err != nil {os.Exit(1)}
}func init() {rootCmd.PersistentFlags().IntP("timeout", "t", 1, "timeout")
}
main.go
package mainimport "ipscan/cmd"func main() {cmd.Execute()
}