"""
Module **sesf.distlock2.servicetester** has class ``ServiceTester``
that implements the distlock2 servicetester.
This can test a distributed implementation of the distlock2 service.
For example, to test a distributed system of two imp_0 nodes:
- Start a Pyro4 nameserver if one is not already running:
run ``python -m Pyro4.naming &``.
- Run ``python test_node.py 2 0 --use imp_0.node 2 0``
and ``python test_node.py 2 1 --use imp_0.node 2 1``.
(Or ``python start_nodes.py 2 test_node.py --use imp_0.node``.)
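- Run ``python servicetester.py 2 <num_acqrels>`` to test against the
service, where ``<num_acqrels>`` is the number of acq-rel operations
issued at each node.
Or, run ``python servicetester.py 2 <num_acqrels> --checker imp_0.checker 2`` to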
test against the service and the implementation-specific assertions
in ``imp_0.checker.Checker`` (if present).
"""
from sesf.distlock2.serviceparts import ServiceParts
import importlib, argparse, time, Pyro4, sys, random
from sesf.util.service import (do_service_input, do_service_output)
from sesf.util.pyro import (start_pyrodaemon, end_testerpyronet)
from threading import Thread, Lock, Condition
def random_delay(max_delay=1.0):
    """Block the calling thread for a random amount of time.

    The duration is ``random.random() * max_delay`` seconds, i.e. drawn
    uniformly from ``[0, max_delay)``.

    Args:
        max_delay: upper bound of the delay in seconds. Defaults to 1.0,
            which is the bound the tester has always used. Must be
            non-negative, since the value is handed to ``time.sleep``.
    """
    time.sleep(random.random() * max_delay)
class ServiceTester:
    """Tester that drives two distlock2 test nodes through acq/rel calls.

    For each node id ``j`` (0 and 1) the tester runs a thread that obtains
    a Pyro4 proxy for ``sesf.distlock2.testnode_<j>``, starts the node, and
    then repeatedly calls ``acq()`` and ``rel()`` on it. Every call and
    every return is passed through ``do_service_output`` /
    ``do_service_input`` together with the matching ``ServiceParts``
    functions and the shared condition ``self.cond``.
    NOTE(review): presumably those helpers check the call/return against
    the service model and update its state -- confirm in
    ``sesf.util.service``.

    Attributes set by ``__init__``:
        nodeproxies: dict mapping node id -> one-element list holding the
            Pyro4 proxy of that node.
        pyrodaemon: initialised to None here.
            NOTE(review): presumably filled in by ``start_pyrodaemon`` --
            confirm.
        num_nodes: always 2.
        num_acqrels: number of acq/rel repetitions per node.
        serviceparts: the ``ServiceParts`` instance holding the service
            functions (``acq_COC``, ``acq_CU``, ...).
        lck, cond: lock and the condition built on it; ``lck`` also
            serialises checker event handling.
        threads: dict mapping node id -> thread running ``start_testnode``.
        checker: optional implementation-specific checker, or None.
    """

    def __init__(self, num_nodes, num_acqrels, checker_argv):
        """Set up the tester and start the pyro daemon and node threads.

        Args:
            num_nodes: accepted for interface compatibility but not used;
                the tester always works with exactly two nodes.
            num_acqrels: number of acq/rel repetitions done for each node.
            checker_argv: None for no checker; otherwise a sequence whose
                first element is the name of a module providing a
                ``Checker`` class. The whole sequence is passed on as
                ``Checker(self, checker_argv)``.
        """
        self.nodeproxies = {}
        self.pyrodaemon = None
        # number of nodes must be 2; their ids are 0 and 1
        self.num_nodes = 2
        self.num_acqrels = num_acqrels
        self.serviceparts = ServiceParts()
        self.lck = Lock()
        self.cond = Condition(self.lck)
        self.threads = {}

        # initialize checker if needed
        if checker_argv is None:
            self.checker = None
        else:
            checkermodule = importlib.import_module(checker_argv[0])
            self.checker = checkermodule.Checker(self, checker_argv)
        # NOTE(review): the original indentation of this line was lost; it
        # is treated as unconditional (prints "None" when there is no
        # checker) -- confirm against the upstream source.
        print(self.checker, flush=True)

        print('Starting pyronet', flush=True)
        Thread(target=start_pyrodaemon,
               args=(self, 'sesf.distlock2.servicetester')).start()

        # Start the per-node driver threads in a random order.
        nodes_start_order = list(range(self.num_nodes))
        random.shuffle(nodes_start_order)
        for j in nodes_start_order:
            self.threads[j] = Thread(target=self.start_testnode, args=(j,))
            self.threads[j].start()

    @Pyro4.expose
    def inform_tester(self, event):
        """Pass ``event`` to the checker, if one is configured.

        RPC-called by a node to convey an event to the implementation
        checker. The checker's ``handle_event`` runs while holding
        ``self.lck``. Without a checker the event is ignored.
        """
        if self.checker:
            with self.lck:
                self.checker.handle_event(event)

    def start_testnode(self, j):
        """Thread body for node ``j``: connect, start the node, run the test.

        After a random delay, creates the Pyro4 proxy for
        ``sesf.distlock2.testnode_<j>``, calls its ``start_testnode()``,
        and then runs ``do_acq_rel_repeatedly(j)``.
        """
        random_delay()
        # The proxy is stored inside a one-element list.
        self.nodeproxies[j] = [Pyro4.Proxy(
            "PYRONAME:" + 'sesf.distlock2.testnode_' + str(j))]
        self.nodeproxies[j][0].start_testnode()
        print("start_testnode", j, "returned", flush=True)
        self.do_acq_rel_repeatedly(j)

    def do_acq(self, j):
        """Issue one ``acq()`` call at node ``j``.

        ``do_service_output`` is invoked before the remote call and
        ``do_service_input`` after it returns, each with the corresponding
        ``ServiceParts`` acq functions and ``self.cond``. Random delays
        are inserted just before and just after the remote call.
        """
        x = self.serviceparts
        do_service_output(j, x.acq_COC, x.acq_CU, self.cond)
        random_delay()
        self.nodeproxies[j][0].acq()
        random_delay()
        do_service_input(j, x.acq_RIC, x.acq_RU, (), self.cond)
        return

    def do_rel(self, j):
        """Issue one ``rel()`` call at node ``j``.

        Mirrors ``do_acq``: ``do_service_output`` before the remote call,
        ``do_service_input`` after it returns, using the ``ServiceParts``
        rel functions, with random delays around the remote call.
        """
        x = self.serviceparts
        do_service_output(j, x.rel_COC, x.rel_CU, self.cond)
        random_delay()
        self.nodeproxies[j][0].rel()
        random_delay()
        do_service_input(j, x.rel_RIC, x.rel_RU, (), self.cond)
        return

    def do_acq_rel_repeatedly(self, j):
        """Call do_acq(j) and do_rel(j) for num_acqrels repetitions."""
        for i in range(self.num_acqrels):
            random_delay()
            print(j, "do_acq call()", i, flush=True)  # DEBUG
            self.do_acq(j)
            print(j, "do_rel call()", i, flush=True)  # DEBUG
            self.do_rel(j)
        print(" finished do_acq_rel_repeatedly", j, flush=True)

    def end_test(self):
        """Wait for threads executing do_acq_rel_repeatedly(j), j = 0,1, to end.
        Call end() of a randomly chosen node.
        """
        for j in range(self.num_nodes):
            self.threads[j].join()
        # Pick one of the node ids uniformly at random.
        endj = random.randrange(self.num_nodes)
        print(endj, "end call()", flush=True)  # DEBUG
        self.nodeproxies[endj][0].end()
########## end class ServiceTester #############
if __name__ == '__main__':
    # Command line: num_nodes num_acqrels [--checker MODULE [ARGS ...]]
    parser = argparse.ArgumentParser()
    parser.add_argument("num_nodes", type=int,
                        help="number of nodes; must be 2")
    parser.add_argument("num_acqrels", type=int,
                        help="number of acq-rel ops")
    # Everything after --checker is collected verbatim; the first item is
    # the checker module name. args.checker is None when the option is absent.
    parser.add_argument("--checker", nargs=argparse.REMAINDER,
                        help="checker module and args")
    args = parser.parse_args()

    if args.num_nodes != 2:
        # ServiceTester always works with two nodes; make the value passed
        # on agree with the message printed here.
        print("changing number of nodes to 2", flush=True)
        args.num_nodes = 2

    servicetester = ServiceTester(
        args.num_nodes, args.num_acqrels, args.checker)
    # Blocks until both node threads have finished, then ends one node.
    servicetester.end_test()
    end_testerpyronet(servicetester, servicetester.num_nodes)
    # NOTE(review): presumably a grace period so the pyro daemon and remote
    # nodes can wind down before the process exits -- confirm.
    time.sleep(10)