Source code for msgtransfer2.test_node
"""
Module **sesf.msgtransfer2.test_node** has class TestNode that implements
an RPC wrapper for a node of an implementation of msgtransfer2 service.
Run ``python test_node.py imp_0.imp`` in a shell to start a TestNode object
wrapping an implementation imp_0.imp.Imp.
"""
import Pyro4, argparse, importlib, time
from sesf.util.pyro import (start_pyrodaemon)
from threading import Thread
[docs]@Pyro4.expose
class TestNode():
def __init__(self, myid, use):
self.myid = myid
self.use = use
self.daemon = None
self.testerproxy = None
self.node = None
self.pyrodaemon_thread = Thread(
target=start_pyrodaemon,
args=(self, "msgtransfer2_testnode_"+str(myid)))
self.pyrodaemon_thread.start()
def start_testnode(self):
tp = Pyro4.Proxy("PYRONAME:msgtransfer2_tester")
print("got testerproxy", tp, flush=True)
imp_nodemodule = importlib.import_module(self.use[0])
print("creating_node_from_module", imp_nodemodule)
self.node = imp_nodemodule.Node(self.use, tp)
return
def send(self, msg):
print('testnode sending', msg, flush=True) # DEBUG
x = self.node.send(msg)
# print('testnode send return', x, flush=True) # DEBUG
return x
def recv(self):
return self.node.recv()
def end(self):
return self.node.end()
@Pyro4.oneway
def end_pyrodaemon(self):
self.pyrodaemon.shutdown()
##### end class TestNode #####################################
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument("n", type=int,
help="number of nodes, should be 2")
parser.add_argument("myid", type=int,
help="id of this user system; 0 or 1")
parser.add_argument("--use", nargs=argparse.REMAINDER,
help="one or more --use node_descriptors")
args = parser.parse_args()
testnode = TestNode(args.myid, args.use)
testnode.pyrodaemon_thread.join()
time.sleep(10)