# Source code for msgtransfer.test_node
"""
Module **sesf.msgtransfer.test_node** has class TestNode that implements
an RPC wrapper for a node of an implementation of msgtransfer service.
Run ``python test_node.py N j --use imp_0.imp N j`` to start a
process with a TestNode object wrapping an ``imp_0.imp`` implementation node.
"""
import Pyro4, importlib, argparse, time, sys
from sesf.util.pyro import (start_pyrodaemon)
from threading import Thread
@Pyro4.expose
class TestNode():  # msgtransfer.test_node.TestNode
    """RPC wrapper around one node of a msgtransfer implementation.

    The constructor parses the command line and starts a thread that runs
    ``start_pyrodaemon`` for this object under the name
    ``sesf.msgtransfer.test_node_<myid>``.  ``start_testnode`` creates the
    wrapped implementation node (``self.mt_node``); ``send``, ``recv`` and
    ``end`` forward to that node.
    """

    def __init__(self, argv):
        """Parse the command line and start the Pyro daemon thread.

        Args:
            argv: Command line in ``sys.argv`` form (program name first):
                ``[prog, n, myid, "--use", module, ...]``.  When ``None``,
                argparse falls back to ``sys.argv``.
        """
        parser = argparse.ArgumentParser()
        parser.add_argument("n", type=int)
        parser.add_argument("myid", type=int)
        parser.add_argument("--use", nargs=argparse.REMAINDER,
                            help="one or more --use node_descriptors")
        # Parse the argv handed to us (dropping the program name) instead of
        # ignoring the parameter and implicitly re-reading sys.argv.
        cli_args = None if argv is None else list(argv[1:])
        args = parser.parse_args(cli_args)
        print("args", args, flush=True)

        # args.n not needed for test_node
        self.myid = args.myid
        # Everything after --use; its first element is the module name that
        # start_testnode imports.  None when --use was not given.
        self.use = args.use
        # NOTE(review): presumably assigned by start_pyrodaemon, since
        # end_pyrodaemon calls shutdown() on it -- confirm in sesf.util.pyro.
        self.pyrodaemon = None
        # Initialized here; not assigned anywhere else in this class.
        self.testerproxy = None
        # Implementation node; created by start_testnode().
        self.mt_node = None

        self.pyrodaemon_thread = Thread(
            target=start_pyrodaemon,
            args=(self, "sesf.msgtransfer.test_node_" + str(self.myid)))
        self.pyrodaemon_thread.start()

    def start_testnode(self):
        """Create the wrapped implementation node.

        Obtains a proxy for ``PYRONAME:sesf.msgtransfer.servicetester``,
        imports the module named by ``self.use[0]`` and instantiates its
        ``Node`` with ``(self.use, proxy)``.
        """
        stp = Pyro4.Proxy("PYRONAME:sesf.msgtransfer.servicetester")
        print("got servicetester proxy", stp, flush=True)
        imp_nodemodule = importlib.import_module(self.use[0])
        print("creating_node_from_module", imp_nodemodule, flush=True)
        self.mt_node = imp_nodemodule.Node(self.use, stp)

    def send(self, dst, msg):
        """Forward ``send(dst, msg)`` to the wrapped node and return its result."""
        print("send call", dst, msg, flush=True)
        x = self.mt_node.send(dst, msg)
        print("send ret", x, flush=True)
        return x

    def recv(self):
        """Forward ``recv()`` to the wrapped node and return its result."""
        return self.mt_node.recv()

    def end(self):
        """Forward ``end()`` to the wrapped node and return its result."""
        return self.mt_node.end()

    @Pyro4.oneway
    def end_pyrodaemon(self):
        """Shut down the Pyro daemon referenced by ``self.pyrodaemon``."""
        self.pyrodaemon.shutdown()
##### END class TestNode of module sesf.msgtransfer.test_node #####
if __name__ == "__main__":
    # Constructing the TestNode parses the command line and starts its
    # Pyro daemon thread.
    testnode = TestNode(sys.argv)
    # Block until the daemon thread has finished ...
    testnode.pyrodaemon_thread.join()
    # ... then wait a further 10 seconds before the process exits.
    time.sleep(10)