"""
Module **sesf.msgtransfer.servicetester** has class ``ServiceTester``
that implements the msgtransfer servicetester.
This can test a distributed implementation of the msgtransfer service.
For example, to test a distributed system of N imp_0 implementation nodes:
- Start a Pyro4 nameserver if one is not already running:
run ``python -m Pyro4.naming &``.
- Run ``python test_node.py N j --use imp_0.node N j``
for ``j`` ranging over 0, 1, ..., N-1.
(Or run ``python start_nodes.py N test_node.py --use imp_0.node``.)
- Run ``python servicetester.py N`` to test only against the service.
- Or run ``python servicetester.py N --checker imp_0.checker N`` to
test against the service and the implementation-specific assertions
in ``imp_0.checker.Checker``.
- Or run
``python servicetester.py N --log <logfile> --checker imp_0.checker N``
to also generate an event log in file ``<logfile>``, which
can later be played by running ``python logplayer.py <logfile>``.
"""
### There is code for a command-line "--imp" option to specify the
### implementation to test. But I'm not sure of this, so I've commented
### it out using "##IMP"
from sesf.msgtransfer.serviceparts import ServiceParts
##IMP from sesf.start_nodes import start_nodes
# general imports
import importlib, argparse, time, Pyro4, sys
from sesf.util.service import (do_service_output, do_service_input)
from sesf.util.pyro import (end_testerpyronet, start_pyrodaemon)
from threading import Thread, Lock, Condition, Semaphore
from random import randint, shuffle
import pickle
class ServiceTester():
    """Tester for a distributed implementation of the msgtransfer service.

    For every node ``j`` the tester creates Pyro4 proxies for the remote
    object registered as ``sesf.msgtransfer.test_node_<j>``, starts that
    node, and then runs one thread that calls ``send`` repeatedly and one
    thread that calls ``recv`` repeatedly.  One randomly chosen node
    (``enderid``) additionally gets an ``end()`` call after
    ``test_duration`` seconds.  Each call and each return is passed through
    ``do_service_output`` / ``do_service_input`` together with the matching
    ``ServiceParts`` attributes.  The semaphore ``awaitend`` is released
    once all of these tester threads have finished.
    """

    def __init__(self, argv, test_duration=0.5):
        """Parse ``argv``, initialise tester state and start the threads.

        Args:
            argv: full command line, ``argv[0]`` is skipped:
                ``servicetester.py num_nodes [--log logfile]
                [--checker checkermodule and any args]``
            test_duration: seconds ``do_end`` sleeps before calling
                ``end()`` at the ender node.
        """
        # argv: servicetester.py num_nodes
        #         [--log logfile]
        #         [--checker checkermodule and any args]
        ##IMP     [--imp test_node.py --use ...]
        parser = argparse.ArgumentParser()
        parser.add_argument("num_nodes", type=int, help="number of nodes")
        parser.add_argument('--log', type=str, help='logfile')
        # nargs='+' so that a bare "--checker" is reported as a usage error
        # instead of failing below with IndexError on args.checker[0].
        parser.add_argument("--checker", nargs='+',
                            help="checker module and any args")
        ##IMP parser.add_argument("--imp", nargs=argparse.REMAINDER,
        ##IMP                     help="node_argv")
        args = parser.parse_args(argv[1:])
        print("Starting msgtransfer servicetester:",
              "num_nodes:", args.num_nodes,
              " logging:", args.log,
              " checker:", args.checker,
              flush=True)
        ##IMP print("Testing implementation:", args.imp)
        ##IMP if args.imp:
        ##IMP     start_nodes(['placeholder', str(args.num_nodes)] + args.imp)
        ##IMP     time.sleep(3)

        # nodeproxies[j] is a list of three proxies for node j
        # (filled in by start_testnode).
        self.nodeproxies = {}
        self.daemon = None
        self.log = args.log
        self.num_nodes = args.num_nodes
        # The node that will receive the end() call.
        self.enderid = randint(0, self.num_nodes-1)
        # Tester threads still to finish per node: the send loop and the recv
        # loop, plus the do_end caller for the ender node.
        self.nthreads = [2 if j != self.enderid else 3
                         for j in range(self.num_nodes)]
        self.test_duration = test_duration
        self.serviceparts = ServiceParts(self.num_nodes)
        self.lck = Lock()
        self.cond = Condition(self.lck)
        # Released by check_node_termination when every thread has finished.
        self.awaitend = Semaphore(0)
        # Event log is kept only when --log was given; otherwise None.
        self.eventlog = (
            [('service', 'init', [str(self.num_nodes)])] if args.log else None)

        self.checker = args.checker
        if self.checker is not None:
            # First --checker argument names the module; the whole argument
            # list is handed on to its Checker class.
            checkermodule = importlib.import_module(args.checker[0])
            self.checker = checkermodule.Checker(self, args.checker)
            # NOTE(review): indentation was lost in this copy; this print is
            # assumed to belong to the "checker given" branch -- confirm.
            print(self.checker, flush=True)

        print('Starting pyronet', flush=True)
        Thread(name='pyrodaemon_thread', target=start_pyrodaemon,
               args=(self, 'sesf.msgtransfer.servicetester')).start()

        # Start the nodes in random order, one start-up thread per node.
        nodes_start_order = list(range(self.num_nodes))
        shuffle(nodes_start_order)
        for j in nodes_start_order:
            Thread(target=self.start_testnode, args=(j,), daemon=True).start()

    @Pyro4.expose
    def inform_tester(self, event):
        """Log ``event`` and hand it to the checker; rpc-called by a node.

        Nothing is logged or checked when no checker was configured.
        """
        # rpc-called by a node to convey event to imptester
        print("INFORM CALL", event, flush=True)
        if self.checker:
            with self.lck:
                self.log_event(event)
                self.checker.handle_event(event)

    def log_event(self, event):
        """Append ``event`` to the event log, if logging is on.

        Does not take ``self.lck``; the callers in this class hold it.
        """
        if self.eventlog:
            self.eventlog.append(event)

    def log_event_lock(self, event):
        """Append ``event`` to the event log under ``self.lck``, if logging
        is on."""
        if self.eventlog:
            with self.lck:
                self.eventlog.append(event)

    def start_testnode(self, j):
        """Start remote node ``j`` and launch its send and recv threads.

        If ``j`` is the ender node, this thread then also performs the
        ``end()`` call and counts itself as finished.
        """
        # Three proxies per node: [0] is used for start_testnode and send,
        # [1] for recv, [2] for end.
        self.nodeproxies[j] = [
            Pyro4.Proxy("PYRONAME:" + 'sesf.msgtransfer.test_node_' + str(j))
            for _ in range(3)]
        self.log_event_lock(('service', 'node', 'starting', j))
        self.nodeproxies[j][0].start_testnode()
        print("START_TESTNODE", j, "returned", flush=True)
        self.log_event_lock(('service', 'node', 'started', j))
        Thread(
            name='send_'+str(j),
            target=self.do_send_repeatedly, args=(j,), daemon=True).start()
        Thread(
            name='recv_'+str(j),
            target=self.do_recv_repeatedly, args=(j,), daemon=True).start()
        if self.enderid == j:
            # This is the third thread counted in nthreads[j] for the ender.
            self.do_end(j)
            self.check_node_termination(j)
            print("ENDER finished notifying", j, flush=True)

    def do_send(self, j):
        """Issue one ``send`` call at node ``j`` and return its result.

        The call arguments come from ``do_service_output`` and the returned
        value is passed to ``do_service_input``.
        """
        x = self.serviceparts
        call_args = do_service_output(j, x.send_COC, x.send_CU, self.cond)
        # print("do_send", j, call_args, flush=True)
        self.log_event_lock(('service', 'msg', 'send', j, *call_args))
        i_rval = self.nodeproxies[j][0].send(*call_args)
        do_service_input(j, x.send_RIC, x.send_RU, (i_rval,), self.cond)
        return i_rval

    def do_send_repeatedly(self, j):
        """Call ``do_send(j)`` in a loop until it returns ``False``.

        Sleeps 0 or 1 second (random) before each call, and reports this
        thread as finished when the loop ends.
        """
        while True:
            # for n in range(4):
            time.sleep(randint(0,1))
            i_rval = self.do_send(j)
            if i_rval == False:
                break
        self.check_node_termination(j)

    def do_recv(self, j):
        """Issue one ``recv`` call at node ``j`` and return its result.

        Returns ``(False,)`` unchanged, or ``(True, msg, sender_id)`` where
        ``sender_id`` is ``msg[0]``.
        """
        x = self.serviceparts
        call_args = do_service_output(j, x.recv_COC, x.recv_CU, self.cond)
        i_rval = self.nodeproxies[j][1].recv(*call_args)
        # i_rval: (False,) | (True, msg), where msg is [sender_id, rest_of_msg]
        # if (True, msg), change to (True, msg, sender_id), coz sender_id
        # equals the internal-nondeterminism argument
        if i_rval[0]:
            i_rval = i_rval + (i_rval[1][0],)
        do_service_input(j, x.recv_RIC, x.recv_RU, (i_rval,), self.cond)
        # print("do_recv ret", j, i_rval, flush=True)
        if i_rval[0]:
            self.log_event_lock(
                ('service', 'msg', 'recv', i_rval[1][0], j, i_rval[1]))
        return i_rval

    def do_recv_repeatedly(self, j):
        """Call ``do_recv(j)`` in a loop until it returns ``(False,)``.

        Reports this thread as finished when the loop ends.
        """
        while True:
            # for k in range(5):
            rval = self.do_recv(j)
            if rval == (False,):
                break
        self.check_node_termination(j)

    def do_end(self, j):
        """Sleep ``test_duration`` seconds, then call ``end()`` at node ``j``."""
        time.sleep(self.test_duration)
        x = self.serviceparts
        call_args = do_service_output(j, x.end_COC, x.end_CU, self.cond)
        # print("calling msgtransfer", j, "end()", flush=True)
        i_rval = self.nodeproxies[j][2].end()
        # can omit below coz end(j)'s retcond and retupdate are vacuous
        do_service_input(j, x.end_RIC, x.end_RU, (i_rval,), self.cond)
        print("DO_END RETURNED", j, flush=True)

    def check_node_termination(self, j):
        """Record that one tester thread of node ``j`` has finished.

        Logs a node-ended event when node ``j`` has no threads left, and
        releases ``awaitend`` when no node has any threads left.
        """
        with self.lck:
            self.nthreads[j] -= 1
            # print("CHECK NODE TERMINATION: nthreads", self.nthreads, flush=True)
            if self.nthreads[j] == 0:
                self.log_event(('service', 'node', 'ended', j))
            # NOTE(review): indentation was lost in this copy; this test may
            # originally have been nested in the one above (same outcome
            # while the counters stay non-negative).
            if sum(self.nthreads) == 0:
                self.awaitend.release()
                # NOTE(review): the lock is held during this sleep, and 'END'
                # is appended 2 s after awaitend is released, whereas the
                # __main__ section pickles the log about 1 s after acquiring
                # it -- so 'END' looks unlikely to reach the log file.
                # Confirm whether that is intended.
                time.sleep(2)
                self.log_event('END')

    def print_sent_rcvd_histories(self):
        """Print the sent and rcvd histories for every pair of msgtransfers."""
        print("_________________________________________________________",
              flush=True)
        print("Printing send and received histories", flush=True)
        print(' ', flush=True)
        for i in range(self.num_nodes):
            for j in range(self.num_nodes):
                if i != j:
                    strij = str(i) + ',' + str(j)
                    print("sent[" + strij + "]:",
                          self.serviceparts.sent[(i,j)],
                          flush=True)
                    print("nrcvd[" + strij + "]:",
                          self.serviceparts.nrcvd[(i,j)],
                          flush=True)
                    # NOTE(review): indentation was lost in this copy; the
                    # separator is assumed to follow each (i, j) pair.
                    print("-------------------", flush=True)
        print("_________________________________________________________",
              flush=True)
########## end class ServiceTester ##################################
if __name__ == '__main__':
    # Script entry point.  Constructing the tester parses the command line
    # and starts the Pyro daemon thread plus one start-up thread per node.
    # tester = ServiceTester(args.n, args.checker, args.log)
    tester = ServiceTester(sys.argv)
    # import pdb; pdb.set_trace()

    # Wait until check_node_termination reports that every tester thread
    # has finished, then pause for a second before writing the log.
    tester.awaitend.acquire()
    time.sleep(1)

    # The event log is written only when --log named a file.
    logpath = tester.log
    if logpath:
        # with open(r'arthistory.pickle', 'wb') as outfile:
        with open(logpath, 'wb') as outfile:
            pickle.dump(tester.eventlog, outfile)

    print("ENDING_PYRONET", flush=True)
    node_count = tester.num_nodes
    end_testerpyronet(tester, node_count)
    tester.print_sent_rcvd_histories()