小白求助,目前如果使用power shell或者centos的shell里,去执行命令一切是正常的
然后我希望把这个流程使用paramiko来代替我实现,这样我可以后续通过程序周期性的上来选择一些设备执行命令。我的理解是,先用paramiko连接上我的jumpserver的koko,然后按提示符的回显来敲入设备选择,以及填入命令。我的代码如下
import paramiko
import time
def handler(title, instructions, prompt_list):
return [jumpserver_password]
定义读取回显的函数
def read_channel(channel, prompt, timeout=10):
output = “”
end_time = time.time() + timeout
while time.time() < end_time:
if channel.recv_ready():
output += channel.recv(1024).decode(‘utf-8’)
if prompt in output:
break
time.sleep(0.1)
return output
定义Jumpserver信息
jumpserver_ip = ‘myserver-ip’
jumpserver_port = 2222
jumpserver_user = ‘jmpuser’
jumpserver_password = ‘jmppass’
初始化Transport
trans = paramiko.Transport((jumpserver_ip, jumpserver_port))
trans.start_client()
try:
# 尝试使用password认证
trans.auth_password(jumpserver_user, jumpserver_password)
except paramiko.AuthenticationException:
# 如果password认证失败,尝试使用keyboard-interactive认证
trans.auth_interactive(jumpserver_user, handler)
打开一个通道并启动shell
channel = trans.open_session()
channel.get_pty()
channel.invoke_shell()
time.sleep(2) # 确保shell已启动
读取初始回显(可能有欢迎信息)
output = read_channel(channel, “password:”)
print(f"输出1:{output}")
发送密码
channel.send(jumpserver_password + ‘\n’)
output = read_channel(channel, 'Opt> ')
print(f"输出2:{output}")
发送选择命令 ‘1’,并读取输出
channel.send(‘1\n’)
output = read_channel(channel, '[Host]> ') # 假设业务服务器提示符是 ‘[Host]>’
print(f"输出3:{output}")
发送命令 ‘ifconfig’,并读取输出
channel.send(‘clock\n’)
output = read_channel(channel, '# ') # 假设业务服务器提示符是 ‘#’
print(f"输出4:{output}")
关闭通道和连接
channel.close()
trans.close()
目前代码运行时,有些问题,我的回显如下:
输出1: jmpuser, JumpServer 开源堡垒机
1) 输入 部分IP,主机名,备注 进行搜索登录(如果唯一).
2) 输入 / + IP,主机名,备注 进行搜索,如:/192.168.
3) 输入 p 进行显示您有权限的资产.
4) 输入 g 进行显示您有权限的节点.
5) 输入 h 进行显示您有权限的主机.
6) 输入 d 进行显示您有权限的数据库.
7) 输入 k 进行显示您有权限的Kubernetes.
8) 输入 r 进行刷新最新的机器和节点信息.
9) 输入 s 进行中文-English-日本語语言切换.
10) 输入 ? 进行显示帮助.
11) 输入 q 进行退出.
Opt>
输出2:jmppass
输出3:1
输出4:clock
貌似程序无法在Opt>后输入1这些信息,也无法捕获到任何回显了,请问这是为什么,我应该如何调整。