#!/usr/bin/env python3 # (C) 2013 by Katerina Barone-Adesi <kat.obsc@gmail.com> # (C) 2013 by Holger Hans Peter Freyther # (C) 2019 by sysmocom s.f.m.c. GmbH # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import os, sys import time import unittest import socket import subprocess import time import osmopy.obscvty as obscvty import osmopy.osmoutil as osmoutil from osmopy.osmo_ipa import IPA # to be able to find $top_srcdir/doc/... confpath = os.path.join(sys.path[0], '..') TIMEOUT = 10 class TestVTYBase(unittest.TestCase): def checkForEndAndExit(self): res = self.vty.command("list") #print ('looking for "exit"\n') self.assertTrue(res.find(' exit\r') > 0) #print 'found "exit"\nlooking for "end"\n' self.assertTrue(res.find(' end\r') > 0) #print 'found "end"\n' def vty_command(self): raise Exception("Needs to be implemented by a subclass") def vty_app(self): raise Exception("Needs to be implemented by a subclass") def setUp(self): osmo_vty_cmd = self.vty_command()[:] config_index = osmo_vty_cmd.index('-c') if config_index: cfi = config_index + 1 osmo_vty_cmd[cfi] = os.path.join(confpath, osmo_vty_cmd[cfi]) try: self.proc = osmoutil.popen_devnull(osmo_vty_cmd) except OSError: print("Current directory: %s" % os.getcwd(), file=sys.stderr) print("Consider setting -b", file=sys.stderr) appstring = self.vty_app()[2] appport = self.vty_app()[0] self.vty = obscvty.VTYInteract(appstring, "127.0.0.1", appport) def tearDown(self): if self.vty: self.vty._close_socket() self.vty = None osmoutil.end_proc(self.proc) class TestVTYSTP(TestVTYBase): def vty_command(self): return ["./stp/osmo-stp", "-c", "../doc/examples/osmo-stp-multihome.cfg"] def vty_app(self): return (4239, "./stp/osmo-stp", "OsmoSTP", "stp") def check_sctp_sock_local(self, laddr_list, lport): path = "/proc/net/sctp/eps" try: with open(path, "r") as fp: #drop first line, contains column names: fp.readline() while True: # Read next line line = fp.readline().strip() if not line: return False print("%s: parsing line: %s" %(path, line)) it = line.split() if lport == int(it[5]): print("%s: local port %d found" %(path, lport)) itaddr_list = it[8:] if len(itaddr_list) != len(laddr_list): print("%s: addr list mismatch: %r vs %r" % (path, repr(itaddr_list), repr(laddr_list))) continue for addr in laddr_list: if addr not in itaddr_list: print("%s: addr not found in list: %s vs %r" % (path, addr, repr(itaddr_list))) return False return True return False except IOError as e: print("I/O error({0}): {1}".format(e.errno, e.strerror)) return False def testMultiHome(self): # first check if STP is listening in required addresses: found = False for i in range(5): if self.check_sctp_sock_local(['127.0.0.1', '127.0.0.2', '0000:0000:0000:0000:0000:0000:0000:0001'], 2905): found = True break else: print("[%d] osmo-stp not yet available, retrying in a second" % i) time.sleep(1) self.assertTrue(found) try: proto = socket.IPPROTO_SCTP except AttributeError: # it seems to be not defined under python2? proto = 132 # IPv4: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, proto) s.bind(('127.0.0.3', 0)) s.settimeout(TIMEOUT) try: s.connect(('127.0.0.2',2905)) except socket.error as msg: s.close() self.fail("Failed to connect IPv4 socket: %s" % msg) print("Connected to STP through SCTP (IPv4)") s.close() # IPv6: s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM, proto) s.bind(('::1', 0)) s.settimeout(TIMEOUT) try: s.connect(('::1',2905)) except socket.error as msg: s.close() self.fail("Failed to connect IPv6 socket: %s" % msg) print("Connected to STP through SCTP (IPv6)") s.close() if __name__ == '__main__': import argparse import sys workdir = '.' parser = argparse.ArgumentParser() parser.add_argument("-v", "--verbose", dest="verbose", action="store_true", help="verbose mode") parser.add_argument("-p", "--pythonconfpath", dest="p", help="searchpath for config") parser.add_argument("-w", "--workdir", dest="w", help="Working directory") parser.add_argument("test_name", nargs="*", help="(parts of) test names to run, case-insensitive") args = parser.parse_args() verbose_level = 1 if args.verbose: verbose_level = 2 if args.w: workdir = args.w if args.p: confpath = args.p print("confpath %s, workdir %s" % (confpath, workdir)) os.chdir(workdir) print("Running tests for specific VTY commands") suite = unittest.TestSuite() suite.addTest(unittest.TestLoader().loadTestsFromTestCase(TestVTYSTP)) if args.test_name: osmoutil.pick_tests(suite, *args.test_name) res = unittest.TextTestRunner(verbosity=verbose_level, stream=sys.stdout).run(suite) sys.exit(len(res.errors) + len(res.failures)) # vim: shiftwidth=4 expandtab nocin ai