"""
Module **sesf.msgtransfer2.servicetester** has class ``ServiceTester``
that implements the msgtransfer2 servicetester.
This can test a distributed implementation of the msgtransfer2 service.
For example, to test a distributed system of two imp_0 nodes:
- Start a Pyro4 nameserver if one is not already running:
run ``python -m Pyro4.naming &``.
- Run ``python test_node.py 2 0 --use imp_0.node 2 0``
and ``python test_node.py 2 1 --use imp_0.node 2 1``.
(Or ``python start_nodes.py 2 test_node.py --use imp_0.node``.)
- Run ``python servicetester.py 2 --checker imp_0.checker 2`` to
test against the service and the implementation-specific assertions
in ``imp_0.checker.Checker``.
(Or ``python servicetester.py 2`` to test only against the service.)
"""
from sesf.msgtransfer2.serviceparts import ServiceParts
import Pyro4, argparse, time, random, importlib
from sesf.util.service import (do_service_input, do_service_output)
from sesf.util.pyro import (start_pyrodaemon, end_testerpyronet)
from threading import Thread, Lock, Condition, Semaphore
def random_delay():
time.sleep(random.random()*1)
[docs]class ServiceTester():
def __init__(self, num_nodes, checker_argv, test_duration=3):
self.nodeproxies = {}
self.pyrodaemon = None
# num_nodes must be 2; their ids are 0 and 1
self.num_nodes = 2
self.enderid = random.randint(0, 1)
self.nthreads = [2 if j != self.enderid else 3
for j in range(self.num_nodes)]
self.test_duration = test_duration
self.serviceparts = ServiceParts()
self.lck = Lock()
self.cond = Condition(self.lck)
self.awaitend = Semaphore(0)
# initialize checker if needed
if checker_argv == None:
self.checker = None
else:
checkermodule = importlib.import_module(checker_argv[0])
self.checker = checkermodule.Checker(self, checker_argv)
print(self.checker, flush=True)
print('Starting pyronet', flush=True)
Thread(target=start_pyrodaemon,
args=(self, 'msgtransfer2_tester')).start()
nodes_start_order = [0, 1]
random.shuffle(nodes_start_order)
for j in nodes_start_order:
Thread(target=self.start_testnode, args=(j,), daemon=True).start()
@Pyro4.expose
def inform_tester(self, event):
# rpc-called by a node to convey event to imptester
if self.checker:
with self.lck:
self.checker.handle_event(event)
def start_testnode(self, j):
random_delay()
self.nodeproxies[j] = [
Pyro4.Proxy("PYRONAME:" + 'msgtransfer2_testnode_' + str(j))
for h in range(3)]
self.nodeproxies[j][0].start_testnode()
print("start_testnode", j, "returned", flush=True)
Thread(
name='send_'+str(j),
target=self.do_send_repeatedly, args=(j,), daemon=True).start()
Thread(
name='recv_'+str(j),
target=self.do_recv_repeatedly, args=(j,), daemon=True).start()
if self.enderid == j:
self.do_end(j)
self.check_node_termination(j)
print("ender finished notifying", j, flush=True)
def do_send(self, j):
x = self.serviceparts
call_args = do_service_output(j, x.send_COC, x.send_CU, self.cond)
print(j, "do_send call(", *call_args, ")", flush=True) # DEBUG
random_delay()
i_rval = self.nodeproxies[j][0].send(*call_args)
random_delay()
# print(j, "do_send return", i_rval, flush=True) # DEBUG
do_service_input(j, x.send_RIC, x.send_RU, (i_rval,), self.cond)
return i_rval
def do_send_repeatedly(self, j):
while True:
# for n in range(4):
i_rval = self.do_send(j)
if i_rval == False:
break
self.check_node_termination(j)
def do_recv(self, j):
x = self.serviceparts
random_delay()
call_args = do_service_output(j, x.recv_COC, x.recv_CU, self.cond)
# print(j, "do_recv call(", *call_args, ")", flush=True) # DEBUG
i_rval = self.nodeproxies[j][1].recv(*call_args)
# i_rval: (False,) | (True, msg)
print(j, "do_recv return", i_rval, flush=True) # DEBUG
random_delay()
do_service_input(j, x.recv_RIC, x.recv_RU, (i_rval,), self.cond)
return i_rval
def do_recv_repeatedly(self, j):
"""Repeatedly call node_j's recv and update global snapshot.
Stop when recv returns None (ie, node_j insocket has closed).
"""
while True:
rval = self.do_recv(j)
if rval == (False,):
break
self.check_node_termination(j)
def do_end(self, j):
time.sleep(self.test_duration)
x = self.serviceparts
call_args = do_service_output(j, x.end_COC, x.end_CU, self.cond)
print(j, "do_end call(", *call_args, ")", flush=True) # DEBUG
i_rval = self.nodeproxies[j][2].end()
# can omit below coz end(j)'s retcond and retupdate are vacuous
do_service_input(j, x.end_RIC, x.end_RU, (i_rval,), self.cond)
def check_node_termination(self, j):
with self.lck:
self.nthreads[j] -= 1
# print("check node termination: nthreads", self.nthreads,
# flush=True)
if sum(self.nthreads) == 0:
self.awaitend.release()
time.sleep(2)
def print_sent_nrcvd(self):
"""Print the sent and nrcvd of both nodes."""
print("__________________________________________________", flush=True)
print("Printing sent and nrcvd", flush=True)
print(" sent[0]:", self.serviceparts.sent[0], flush=True)
print(" nrcvd[1]", self.serviceparts.nrcvd[1], flush=True)
print(" -------------------", flush=True)
print(" sent[1]:", self.serviceparts.sent[1], flush=True)
print(" nrcvd[0]", self.serviceparts.nrcvd[0], flush=True)
print("__________________________________________________", flush=True)
########## end class ServiceTester #############
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument("num_nodes", type=int, help="number of nodes")
parser.add_argument("--checker", nargs=argparse.REMAINDER,
help="checker module and args")
args = parser.parse_args()
if args.num_nodes != 2:
print("changing number of nodes to 2", flush=True)
print("Starting msgtransfer2.servicetester with checker",
args.checker, flush=True)
tester = ServiceTester(args.num_nodes, args.checker, 4)
# import pdb; pdb.set_trace()
tester.awaitend.acquire()
time.sleep(1)
end_testerpyronet(tester, tester.num_nodes)
tester.print_sent_nrcvd()
time.sleep(1)