#!/usr/bin/env python3 # (C) 2013 by Katerina Barone-Adesi # (C) 2013 by Holger Hans Peter Freyther # (C) 2019 by sysmocom s.f.m.c. GmbH # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # You should have received a copy of the GNU General Public License # along with this program. If not, see . import os, sys import time import unittest import socket import subprocess import time import osmopy.obscvty as obscvty import osmopy.osmoutil as osmoutil from osmopy.osmo_ipa import IPA # to be able to find $top_srcdir/doc/... confpath = os.path.join(sys.path[0], '..') TIMEOUT = 10 class TestVTYBase(unittest.TestCase): def checkForEndAndExit(self): res = self.vty.command("list") #print ('looking for "exit"\n') self.assertTrue(res.find(' exit\r') > 0) #print 'found "exit"\nlooking for "end"\n' self.assertTrue(res.find(' end\r') > 0) #print 'found "end"\n' def vty_command(self): raise Exception("Needs to be implemented by a subclass") def vty_app(self): raise Exception("Needs to be implemented by a subclass") def setUp(self): osmo_vty_cmd = self.vty_command()[:] config_index = osmo_vty_cmd.index('-c') if config_index: cfi = config_index + 1 osmo_vty_cmd[cfi] = os.path.join(confpath, osmo_vty_cmd[cfi]) try: self.proc = osmoutil.popen_devnull(osmo_vty_cmd) except OSError: print("Current directory: %s" % os.getcwd(), file=sys.stderr) print("Consider setting -b", file=sys.stderr) appstring = self.vty_app()[2] appport = self.vty_app()[0] self.vty = obscvty.VTYInteract(appstring, "127.0.0.1", appport) def tearDown(self): if self.vty: self.vty._close_socket() self.vty = None osmoutil.end_proc(self.proc) class TestVTYSTP(TestVTYBase): def vty_command(self): return ["./stp/osmo-stp", "-c", "../doc/examples/osmo-stp-multihome.cfg"] def vty_app(self): return (4239, "./stp/osmo-stp", "OsmoSTP", "stp") def check_sctp_sock_local(self, laddr_list, lport): path = "/proc/net/sctp/eps" try: with open(path, "r") as fp: #drop first line, contains column names: fp.readline() while True: # Read next line line = fp.readline().strip() if not line: return False print("%s: parsing line: %s" %(path, line)) it = line.split() if lport == int(it[5]): print("%s: local port %d found" %(path, lport)) itaddr_list = it[8:] if len(itaddr_list) != len(laddr_list): print("%s: addr list mismatch: %r vs %r" % (path, repr(itaddr_list), repr(laddr_list))) continue for addr in laddr_list: if addr not in itaddr_list: print("%s: addr not found in list: %s vs %r" % (path, addr, repr(itaddr_list))) return False return True return False except IOError as e: print("I/O error({0}): {1}".format(e.errno, e.strerror)) return False def testMultiHome(self): # first check if STP is listening in required addresses: found = False for i in range(5): if self.check_sctp_sock_local(['127.0.0.1', '127.0.0.2', '0000:0000:0000:0000:0000:0000:0000:0001'], 2905): found = True break else: print("[%d] osmo-stp not yet available, retrying in a second" % i) time.sleep(1) self.assertTrue(found) try: proto = socket.IPPROTO_SCTP except AttributeError: # it seems to be not defined under python2? proto = 132 # IPv4: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, proto) s.bind(('127.0.0.3', 0)) s.settimeout(TIMEOUT) try: s.connect(('127.0.0.2',2905)) except socket.error as msg: s.close() self.fail("Failed to connect IPv4 socket: %s" % msg) print("Connected to STP through SCTP (IPv4)") s.close() # IPv6: s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM, proto) s.bind(('::1', 0)) s.settimeout(TIMEOUT) try: s.connect(('::1',2905)) except socket.error as msg: s.close() self.fail("Failed to connect IPv6 socket: %s" % msg) print("Connected to STP through SCTP (IPv6)") s.close() def testTonsOfASP(self): self.vty.enable() self.assertTrue(self.vty.verify("configure terminal",[''])) self.assertTrue(self.vty.verify("cs7 instance 0",[''])) num_of_asp = 1000 for i in range(num_of_asp): asp_name = "asp-TonsofASP" + str(i) asp_node = "asp " + asp_name + " " + str(10000+i) + " " + "2905 m3ua" self.assertTrue(self.vty.verify(asp_node,[''])) self.assertEqual(self.vty.node(), 'config-cs7-asp') self.assertTrue(self.vty.verify("local-ip 127.0.0.1",[''])) self.assertTrue(self.vty.verify("local-ip ::1",[''])) self.assertTrue(self.vty.verify("remote-ip 127.0.0.9",[''])) self.assertTrue(self.vty.verify("remote-ip ::2",[''])) self.assertTrue(self.vty.verify("role sg",[''])) self.assertTrue(self.vty.verify("sctp-role server",[''])) self.assertTrue(self.vty.verify("no shutdown",[''])) self.assertTrue(self.vty.verify("exit",["% NOTE: Skipping automatic restart of ASP since an explicit '[no] shutdown' command was entered"])) as_name = "as-TonsOfASP" as_node = "as " + as_name + " m3ua" self.assertTrue(self.vty.verify(as_node,[''])) self.assertEqual(self.vty.node(), 'config-cs7-as') for i in range(num_of_asp): asp_name = "asp-TonsofASP" + str(i) self.assertTrue(self.vty.verify("asp " + asp_name,[''])) self.assertTrue(self.vty.verify("exit", [''])) # Now remove all of them: self.assertTrue(self.vty.verify(as_node,[''])) self.assertEqual(self.vty.node(), 'config-cs7-as') for i in range(num_of_asp): asp_name="asp-TonsofASP" + str(i) self.assertTrue(self.vty.verify("no asp " + asp_name,[''])) self.assertTrue(self.vty.verify("exit", [''])) for i in range(num_of_asp): asp_name="asp-TonsofASP" + str(i) self.assertTrue(self.vty.verify("no asp " + asp_name,[''])) self.assertTrue(self.vty.verify("no as " + as_name,[''])) # Validate one ASP can be configured to serve tons of AS: def testASPservesTonsOfAS(self): self.vty.enable() self.assertTrue(self.vty.verify("configure terminal",[''])) self.assertTrue(self.vty.verify("cs7 instance 0",[''])) asp_name = "asp-ASPservesTonsOfAS" asp_node = "asp " + asp_name + " " + str(10000) + " " + "2905 m3ua" self.assertTrue(self.vty.verify(asp_node,[''])) self.assertEqual(self.vty.node(), 'config-cs7-asp') self.assertTrue(self.vty.verify("local-ip 127.0.0.1",[''])) self.assertTrue(self.vty.verify("local-ip ::1",[''])) self.assertTrue(self.vty.verify("remote-ip 127.0.0.9",[''])) self.assertTrue(self.vty.verify("remote-ip ::2",[''])) self.assertTrue(self.vty.verify("role sg",[''])) self.assertTrue(self.vty.verify("sctp-role server",[''])) self.assertTrue(self.vty.verify("no shutdown",[''])) self.assertTrue(self.vty.verify("exit",["% NOTE: Skipping automatic restart of ASP since an explicit '[no] shutdown' command was entered"])) num_of_as = 1000 for i in range(num_of_as): as_name = "as-ASPservesTonsOfAS" + str(i) as_node = "as " + as_name + " m3ua" self.assertTrue(self.vty.verify(as_node,[''])) self.assertEqual(self.vty.node(), 'config-cs7-as') self.assertTrue(self.vty.verify("asp " + asp_name,[''])) self.assertTrue(self.vty.verify("exit", [''])) # Now remove all of them: for i in range(num_of_as): as_name = "as-ASPservesTonsOfAS" + str(i) as_node = "as " + as_name + " m3ua" self.assertTrue(self.vty.verify(as_node,[''])) self.assertEqual(self.vty.node(), 'config-cs7-as') self.assertTrue(self.vty.verify("no asp " + asp_name,[''])) self.assertTrue(self.vty.verify("exit", [''])) self.assertTrue(self.vty.verify("no as " + as_name,[''])) self.assertTrue(self.vty.verify("no asp " + asp_name,[''])) if __name__ == '__main__': import argparse import sys workdir = '.' parser = argparse.ArgumentParser() parser.add_argument("-v", "--verbose", dest="verbose", action="store_true", help="verbose mode") parser.add_argument("-p", "--pythonconfpath", dest="p", help="searchpath for config") parser.add_argument("-w", "--workdir", dest="w", help="Working directory") parser.add_argument("test_name", nargs="*", help="(parts of) test names to run, case-insensitive") args = parser.parse_args() verbose_level = 1 if args.verbose: verbose_level = 2 if args.w: workdir = args.w if args.p: confpath = args.p print("confpath %s, workdir %s" % (confpath, workdir)) os.chdir(workdir) print("Running tests for specific VTY commands") suite = unittest.TestSuite() suite.addTest(unittest.TestLoader().loadTestsFromTestCase(TestVTYSTP)) if args.test_name: osmoutil.pick_tests(suite, *args.test_name) res = unittest.TextTestRunner(verbosity=verbose_level, stream=sys.stdout).run(suite) sys.exit(len(res.errors) + len(res.failures)) # vim: shiftwidth=4 expandtab nocin ai