"""
Module **sesf.rwlock.servicetester** has class ``ServiceTester`` that
implements the read-write-lock servicetester. This can test an
implementation against the rwlock service and, optionally, against
a checker with implementation-specific assertions.
For example, assuming a running Pyro4 nameserver and a running test_imp
using ``rwlock.imp_0.imp``,
``python servicetester.py <num_users> <num_ops> --checker imp_0.checker``
tests the rwlock imp_0 implementation with `num_users` user threads, each
doing `num_ops` read or write operations, and uses ``imp_0.checker`` to
evaluate implementation-specific assertions.
"""
from sesf.rwlock.serviceparts import ServiceParts
from threading import Thread, Lock, Condition, Semaphore, get_ident
import time, random, argparse, importlib, Pyro4
from sesf.util.pyro import start_pyrodaemon
def random_delay():
    """Block the calling thread for a random time in [0, 1) seconds.

    Used by the tester to vary the interleaving of user threads.
    """
    seconds = random.random() * 1
    time.sleep(seconds)
class ServiceTester():
    """
    Read-write-lock servicetester to test an implementation, driving
    it with *num_users* user threads each doing *num_ops* (randomly chosen)
    read or write operations, and optionally using *checkermodule.Checker*.

    It maintains the following variables:

    - ``num_users`` (int): number of user threads in testing
    - ``num_ops`` (int): number of read or write operations per user thread.
    - ``checker`` (obj): instance of ``checker.Checker``, or ``None`` if no
      checker module was given.
    - ``pyrodaemon``: Pyro4 daemon `sesf.rwlock.servicetester`; initialised
      to ``None`` here (presumably set by ``start_pyrodaemon``, which is
      defined in ``sesf.util.pyro`` -- confirm there).
    - ``serviceparts`` (obj): read-write-lock serviceparts object
    - ``impproxies``: list of Pyro proxies to the implementation, one per
      user thread.
    - ``lck``, ``cond``: lock and condition variable for atomicity of
      call and return parts.
    - ``awaitend`` (semaphore): main thread waits here for test to be over;
      released once by every user thread when it terminates.

    Functions: j in 0..num_users-1

    - ``do_acqr(j)``: do a valid impproxies[j].acqr call and validate return
    - ``do_relr(j)``: do a valid impproxies[j].relr call and validate return
    - ``do_acqw(j)``: do a valid impproxies[j].acqw call and validate return
    - ``do_relw(j)``: do a valid impproxies[j].relw call and validate return
    - ``do_ops(j)``: issue *num_ops* read or write (randomly chosen) operations
    - ``do_test()``: start ``do_ops(0)``, ..., ``do_ops(num_users-1)``
      in separate threads.
    - ``inform_tester(event)``: RPC-called by implementation to convey event
      to checker.
    """

    def __init__(self, num_users, num_ops, checkermodule=None):
        """
        Instantiate attributes, obtain *num_users* proxies for `rwlock_imp`,
        start a Pyro4 daemon and register it as `sesf.rwlock.servicetester`
        in a new thread, start ``do_test`` in a new thread.

        Args:
            num_users: number of user threads (and implementation proxies).
            num_ops: number of read or write operations per user thread.
            checkermodule: optional dotted name of a module providing a
                ``Checker`` class; it is imported and instantiated with
                this tester as its only argument.
        """
        self.pyrodaemon = None
        self.serviceparts = ServiceParts()
        self.lck = Lock()
        # cond shares lck, so inform_tester (which takes lck) and the
        # call/return parts (which take cond) exclude each other.
        self.cond = Condition(self.lck)
        self.awaitend = Semaphore(0)
        self.num_users = num_users
        self.num_ops = num_ops
        if checkermodule is None:
            self.checker = None
        else:
            checkermodule = importlib.import_module(checkermodule)
            self.checker = checkermodule.Checker(self)
            # NOTE(review): the source this was recovered from had lost its
            # indentation; this debug print is assumed to belong to the
            # else-branch -- confirm against the upstream file.
            print(self.checker)
        self.impproxies = [Pyro4.Proxy("PYRONAME:" + 'sesf.rwlock.test_imp')
                           for _ in range(self.num_users)]
        print('Starting pyronet', flush=True)
        Thread(name='pyrodaemon_thread', target=start_pyrodaemon,
               args=(self, 'sesf.rwlock.servicetester')).start()
        self.impproxies[0].start_testimp()
        print("start_testimp returned")
        Thread(target=self.do_test, daemon=True).start()

    @Pyro4.expose
    def inform_tester(self, event):
        """RPC-called by implementation to convey event to checker."""
        # print("INFORM CALL", event, flush=True)
        if self.checker:
            with self.lck:
                self.checker.handle_event(event)

    def _call_and_validate(self, op, j, *, delay_before_call=False,
                           delay_before_check=False):
        """Do one service-constrained call of operation *op* via impproxies[j].

        Waits until the service's call input condition ``<op>_CIC`` holds,
        applies the call update ``<op>_CU``, invokes the implementation's
        *op* outside the lock, then checks the return output condition
        ``<op>_ROC`` and applies the return update ``<op>_RU``.

        Args:
            op: operation name: 'acqr', 'relr', 'acqw' or 'relw'.
            j: index of the proxy (user) issuing the call.
            delay_before_call: if true, sleep a random time between the
                call update and the implementation call.
            delay_before_check: if true, sleep a random time between the
                implementation's return and the return check.

        Raises:
            Exception: ``'<op> ROC violated'`` if the return output
                condition does not hold when the call returns.
        """
        parts = self.serviceparts
        ## await op.CIC: op.CU
        with self.cond:
            while not getattr(parts, op + '_CIC')():
                self.cond.wait()
            getattr(parts, op + '_CU')()
            self.cond.notify_all()
        if delay_before_call:
            random_delay()
        ## call implementation's op(); done without holding the lock so
        ## other user threads can make progress while this call is pending
        getattr(self.impproxies[j], op)()
        if delay_before_check:
            random_delay()
        ## check op.ROC: op.RU
        with self.cond:
            if not getattr(parts, op + '_ROC')():
                raise Exception(op + ' ROC violated')
            getattr(parts, op + '_RU')()
            self.cond.notify_all()

    def do_acqr(self, j):
        """Issue a valid impproxies[j].acqr() call and validate return."""
        self._call_and_validate('acqr', j, delay_before_check=True)

    def do_relr(self, j):
        """Issue a valid impproxies[j].relr() call and validate return."""
        self._call_and_validate('relr', j, delay_before_check=True)

    def do_acqw(self, j):
        """Issue a valid impproxies[j].acqw() call and validate return."""
        self._call_and_validate('acqw', j, delay_before_check=True)

    def do_relw(self, j):
        """Issue a valid impproxies[j].relw() call and validate return."""
        # Unlike the other three operations, relw delays before the
        # implementation call rather than before the return check.
        self._call_and_validate('relw', j, delay_before_call=True)

    def do_ops(self, j):
        """Issue *num_ops* read or write (randomly chosen) operations
        via impproxies[j].

        ``awaitend`` is released exactly once when this thread terminates,
        even if an operation raises, so that the main thread waiting for
        all users is not blocked forever by a failed user thread.
        """
        print('starting user', get_ident())
        try:
            for _ in range(self.num_ops):
                random_delay()
                # randomly select read or write operation
                if random.randint(0, 1):
                    self.do_acqr(j)
                    print('read',
                          'wset =', self.serviceparts.wset,
                          ' rset =', self.serviceparts.rset,
                          flush=True)
                    random_delay()
                    self.do_relr(j)
                else:
                    self.do_acqw(j)
                    print('write:',
                          'wset =', self.serviceparts.wset,
                          ' rset =', self.serviceparts.rset,
                          flush=True)
                    random_delay()
                    self.do_relw(j)
        finally:
            self.awaitend.release()

    def do_test(self):
        """Start do_ops(0), ..., do_ops(num_users-1) in separate threads."""
        print('starting', self.num_users, 'users, each doing',
              self.num_ops, 'read or write operations')
        for j in range(self.num_users):
            Thread(target=self.do_ops, args=(j,)).start()
### End class ServiceTester ###########################
if __name__ == '__main__':
    # Command-line entry point: run the tester against a running test_imp
    # and shut the Pyro daemons down once every user thread has finished.
    parser = argparse.ArgumentParser()
    parser.add_argument("num_users", type=int,
                        help="number of user threads")
    parser.add_argument("num_ops", type=int,
                        help="number of read or write operations per user")
    parser.add_argument("--checker", type=str,
                        help="checker module")
    args = parser.parse_args()
    tester = ServiceTester(args.num_users, args.num_ops, args.checker)
    # do_ops releases awaitend once per user thread, so acquire it
    # num_users times to wait for the whole test to be over.
    for _ in range(tester.num_users):
        tester.awaitend.acquire()
    print("ENDING_PYRONET", flush=True)
    tester.impproxies[0].end_pyrodaemon()
    if tester.pyrodaemon is not None:
        tester.pyrodaemon.shutdown()