Python 登录交换机switch 巡检 拉取交换机配置信息

场景需求

前面老季写了一个 PHP如何登录连接TELNET并执行交换机命令类 图文教程 ,最近项目中遇到了要换python3来登录交换机并拉取配置。这里我们直接贴出相关代码使用telnetlib类来登录交换机并拉取配置。

我们在代码中可以处理交换机拉取配置时,内容过长,出现 --- More --- 需要按空格的场景。并且将所有交换机命令的返回值都进行了返回给调用方法。

代码实例

class TelnetClient():
    def __init__(self,):
        self.tn = telnetlib.Telnet()
        time.sleep(0.2)

    # 此函数实现telnet登录主机
    def login(self,host_ip,username,password,port=23):
        try:
            # self.tn = telnetlib.Telnet(host_ip,port=23)
            self.tn.open(host_ip,port)
        except:
            logging.critical('网络连接失败'+repr(host_ip))
            return False
        # 等待login出现后输入用户名,最多等待10秒
        self.tn.read_until(b'Username: ',timeout=1)
        self.tn.write(username.encode('ascii') + b'\n')
        self.tn.read_until(b'Password: ',timeout=1)
        self.tn.write(password.encode('ascii') + b'\n')
        # 延时两秒再收取返回结果,给服务端足够响应时间
        time.sleep(0.2)
        # 获取登录结果
        # read_very_eager()获取到的是的是上次获取之后本次获取之前的所有输出
        command_result = self.tn.read_very_eager().decode('ascii')
        if 'Login incorrect' not in command_result:
            return True
        else:
            return False

    # 此函数实现执行传过来的命令,并输出其执行结果
    def execute_some_command(self,command):
        logging.info('开始执行交换机命令:'+command)
        # 执行命令
        self.tn.write(command.encode('ascii')+b'\n')
        time.sleep(0.2)
        # 获取命令结果
        a = []
        a.append(('--- More ---').encode('ascii'))
        command_result = self.tn.read_very_eager().decode('ascii')
        while True:
            b,c,d = self.tn.expect(a,timeout=1)
            command_result += d.decode('ascii')
            if 0 == b:
                time.sleep(0.2)
                self.tn.write(b' \n') #不用\r\n来继续
            else:
                break
        logging.info('命令执行结果:'+repr(command_result))
        if 'Error:' in command_result :
            logging.critical('执行结果中可能包含错误`Error:`,脚本强制退出')
            sendWxMsg('流量调度系统交换机命令执行出现错误,请及时处理!')
            sys.exit()
        return command_result

    # 退出telnet
    def logout(self):
        self.tn.write(b"exit\n")
        self.tn.close()

多个交换机命令时封装调用(可自行封装)

# 执行多条交换机命令
def execSwitchCmd( host , username , password , commands , extra = '1' ) :
    '''
    :param host:主机地址
    :param username:用户名
    :param password:密码
    :param commands:列表命令集
    :param extra: 是否需要执行 system-view , commit 命令
    '''
    logging.info('开始执行交换机命令【'+repr(commands)+'】')
    # return True
    telnet_client = TelnetClient()
    msg = ''
    # 如果登录结果返加True,则执行命令,然后退出
    if telnet_client.login(host,username,password) :
        if extra == 1:
            msg += telnet_client.execute_some_command( 'system-view' )
        for command in commands :
            msg += telnet_client.execute_some_command( command )
        if extra == 1:            
            msg += telnet_client.execute_some_command( 'commit' )
        telnet_client.logout()    
    return msg

调用方法

cmd = []
cmd.append('dis traffic-policy applied-record TO_TrafficCenter')
res = func.execSwitchCmd(room['host'],room['username'],room['password'],cmd , '0')
if 'xxxxx' in res :
    # 某些配置在交换机中,做什么处理
    ......

交换机命令过多时,建议使用\r\n拼接命令并分组

def centerSchedulingOut95( destination_id , pieces ) :
    # .....
    #.......
    cmd = []
    i = 1
    divided = 12
    tmpCmd = ''
    for policy in policys :
        from_room = getRoomById( policy['destination_id'] )
        if i % divided == 0 :
            cmd.append( tmpCmd )
            tmpCmd = ''
        else :
            tmpCmd += '\r\n' + policy['name'] + '\r\n' + 'UNDO traffic-policy TO_'+from_room['abbr']+' inbound' + '\r\n' + 'traffic-policy TO_'+to_room['abbr']+' inbound' + '\r\n'
        i += 1
        config.cursor.execute("UPDATE center_policy SET status = 2 ,destination_id = %s , update_time=%s WHERE id = %s" , [ destination_id , getNow() , policy['id'] ] )
        logging.info('|----已更改 %s 状态为:使用中,去向机房ID:%s',policy['name'] , to_room['id'])
    if tmpCmd != '' :
        cmd.append( tmpCmd )
    logging.info('|----开始登录调度中心交换机,批量执行调出POLICY命令')
    config.cursor.execute("SELECT * FROM center")
    centerObj = config.cursor.fetchone()
    if len(cmd) > 0 :
        logging.info('|----本次实际调度POLICY规则条数:%s',repr( len(policys) ) )
        execSwitchCmd( host = centerObj['host'] , username = centerObj['username'] , password = centerObj['password'] , commands = cmd )

我们这里将命令拼接成 \r\n的格式拼接,每12个命令一组,传入到交换机里执行。

链接到文章: https://ixvps.com/46467.html

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注