一、基础功能介绍
# coding=utf-8
import paramiko
from time import sleep# 建立通信
transport = paramiko.Transport(('192.168.0.7', 22))
print(transport) # <paramiko.Transport at 0x5745ed0 (unconnected)># 建立连接
transport.connect(username='root', password='123456')
print(transport) # <paramiko.Transport at 0x5745ed0 (cipher aes128-ctr, 128 bits) (active; 0 open channel(s))>
# 注意 active 此时为0# 开启一个信道
channel = transport.open_session()
print(channel) # <paramiko.Channel 0 (open) window=0 -> <paramiko.Transport at 0x5745ed0 (cipher aes128-ctr, 128 bits) (active; 1 open channel(s))>>
# 调用open_session后,active为1,表示已打开channel。此时只能下发命令,收不到回显# 设置信道获取信息的超时时间。因为在调用 channel.recv(65535) 方法时 会阻塞执行,不设置就会卡死。
channel.timeout = 10# 开启终端,进入交互模式
channel.get_pty()
channel.invoke_shell()sleep(2) # 是为了一次能显示完所有回显,如果不等待2秒,获取的回显可能不完整。# 检查通道是否有数据。若没有,则返回False,注意:不能用来判断已回显完。当下发命令后,执行出现卡顿,在卡顿期间信道是没有数据的。
channelStatus = channel.recv_ready()
print(channelStatus) # 此时返回True# 获取返回的数据。此时返回的是登陆信息。注意:当信道没有数据是,若直接获取,则会处于阻塞状态
backMsg = channel.recv(65535).decode('utf-8') # 使用recv读取in-buffer内容,65535表示预读取内容大小,若该值小于in-buffer值,则会读取不完全
print(backMsg)# 下发命令,命令后需追加\n表示发送命令
channel.send("10.27.0.7 \r")
sleep(2)
backMsg = channel.recv(65535).decode('utf-8')
print(backMsg)channel.send("mysql22001 \r")
sleep(2)
backMsg = channel.recv(65535).decode('utf-8')
print(backMsg)channel.send("quit; \r")
sleep(2)
backMsg = channel.recv(65535).decode('utf-8')
print(backMsg)# 关闭信道
channel.close()# 关闭连接
transport.close()
二、简单封装
import paramiko
import timeclass SSH(object):def __init__(self):self.__transport = Noneself.__channel = Noneself.__default_end_check_infos = ["[root@"]def connect(self, host, port, username, password, **kwargs):# 建立通信self.__transport = paramiko.Transport((host, port))# 建立连接self.__transport.connect(username=username, password=password)# 开启一个信道self.__channel = self.__transport.open_session()self.__channel.timeout = 10# 开启终端,进入交互模式self.__channel.get_pty()self.__channel.invoke_shell()return self.__echo(**kwargs)def disconnect(self):self.__channel.close()self.__transport.close()def exec_cmd(self, cmd, **kwargs):self.__channel.send("%s \r" % cmd)return self.__echo(**kwargs)def set_echo_end_check_info(self, end_check_info=None):""" 设置回显校验信息 """if end_check_info is not None:if isinstance(end_check_info, str):self.__default_end_check_infos.append(end_check_info)if isinstance(end_check_info, list):self.__default_end_check_infos = self.__default_end_check_infos + end_check_inforeturn self.__default_end_check_infosdef __echo(self, timeout=10, interval=0.5):""" 获取回显 """t0 = time.time()echo = ""while time.time()-t0 < timeout:if self.__channel.recv_ready() is True:_echo = self.__channel.recv(65535).decode('utf-8')echo += _echofor end_check_info in self.__default_end_check_infos:if end_check_info in echo:print(echo)time.sleep(0.5)return echotime.sleep(interval)self.disconnect()raise RuntimeError("获取预期回显超时!")if __name__ == '__main__':ssh_session = SSH()ssh_session.set_echo_end_check_info(["Opt>"])ssh_session.set_echo_end_check_info("MySQL")ssh_session.connect('192.168.0.7', 22, 'root', '123456')ssh_session.exec_cmd("10.27.0.7")ssh_session.exec_cmd("mysql22001")ssh_session.exec_cmd("quit;")ssh_session.disconnect()
执行结果:
源码等资料获取方法
各位想获取源码等教程资料的朋友请点赞 + 评论 + 收藏,三连!
三连之后我会在评论区挨个私信发给你们~