"""
Module **sesf.msgtransfer.imp_0.node** has class ``Node`` that implements
a node of a msgtransfer service implementation for N addresses, N |geq| 2.
Run the N commands ``python user_node.py N j --use imp_0/node.py N j``
for ``j`` ranging over 0..N-1 to start, exercise and close a msgtransfer
service spanning N locations.
(Or run ``python ../start_nodes.py N user_node.py --use imp_0.node``.)
Process j starts a user node with address j that starts an imp_0 node
with address j. The N imp_0 nodes implement a msgtransfer service over
N addresses. This service is exercised by the user nodes. Each user node
has two concurrent threads, one sending messages and one receiving messages;
after a while, the main thread in the process with address 0 calls
its imp_0 node's ``end()``, causing the network to shutdown.
"""
import pickle as serializer
import socket, struct, time, argparse, sys
from queue import Queue
from threading import Thread, Lock
from random import randint
# def myprint(*args):
# print(*args, flush=True)
# from stackoverflow: because gethostname() may give an incorrect address
def getmyipaddr():
    """Return the local IP address used for outbound traffic.

    ``socket.gethostname()`` may resolve to an incorrect address, so this
    instead connects a UDP socket to a public address and reads back the
    local address the OS selected for it. Connecting a UDP socket does not
    send any packets.

    Raises:
        OSError: if the socket cannot be created or connected.
    """
    # The context manager closes the probe socket; without it every call
    # would leak one file descriptor.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.connect(('8.8.8.8', 0))  # UDP address, doesn't send packets
        return s.getsockname()[0]
class Node:
    """
    An instance of this is a node of a distributed system that implements
    the msgtransfer service. Assumes it is in a network of *num_nodes* nodes
    (in different processes), and has address *myid*, where *myid* is
    a value in 0, ..., *num_nodes*-1.

    For brevity, below we say "n" for *num_nodes*, "j" for *myid*,
    and "Node(j)" for the ``Node`` instance with address j.

    Once Node(0), ..., Node(n-1) have been started, they form a ring of
    TCP links, with Node(j) connected to Node((j+1)%n).
    During initialization, Node(j) forms TCP connections with Node((j-1)%n)
    and Node((j+1)%n) as follows:

    - If j is 0: Node(0) connects to Node(1) at port 6001, retrying until
      success. Then it waits to accept a TCP connect, which would be from
      Node(n-1), on port ``accptngport0`` (= 6000).
    - If j is in 1, ..., n-1: Node(j) waits to accept a TCP connect
      (which would be from Node(j-1)) on port ``accptngport0`` + j. After it
      accepts this connect, Node(j) connects to Node(j+1) at port
      ``accptngport0``+j+1 (retrying until success), except for Node(n-1),
      which connects to Node(0) at port ``accptngport0``.

    After Node(j) has opened its two TCP connections, it is ready to accept
    calls to its ``send``, ``recv`` and ``end`` functions. At this point,
    Node(j) maintains the following variables:

    - ``ending``: True iff ``end`` has been called or END msg has been
      received.
    - ``accptngport0`` = 6000 (hard-coded).
    - ``accptngsocket`` = TCP socket to accept connect requests
      (at port ``accptngport0 + j``).
    - ``outsocket`` = TCP socket connected to Node(j+1).
    - ``insocket`` = TCP socket connected to Node(j-1).
    - ``sbuf`` (bytearray): buffer to hold an outgoing packet.
    - ``sbufview``: memoryview of ``sbuf`` (slicing ``sbufview`` is cheap).
    - ``rbuf`` (bytearray): buffer to hold an incoming packet.
    - ``rbufview``: memoryview of ``rbuf``.
    - ``rcvdmsgqueue``: queue of incoming ``(flag,msg)`` tuples for local
      user.
    - ``testerproxy``: Used iff Node(j) is started within a test node,
      in which case Node(j) can call ``testerproxy.inform_tester(e)`` to
      inform the servicetester of local event ``e``.

    Each user message is placed in a packet with an 8-byte header
    (consisting of 2-byte src id, 2-byte dst id, 4-byte datalen) and data
    of length datalen (consisting of the serialized user message).

    Packets travel only in one direction of the ring. A message sent by
    Node(j) to Node(k) goes via nodes j+1, j+2, ..., k. Each node has a local
    thread that receives packets from ``insocket`` (from the previous node)
    and puts them in ``rcvdmsgqueue`` or forwards them to ``outsocket`` (to
    the next node).

    The end function should be called at one address only. The call
    initiates an END message that travels along the ring, closing each node
    it encounters. The END message has a datalen field of 0.
    """

    def __init__(self, argv, testerproxy=None):
        """Parse *argv*, join the ring and start the receiving thread.

        Args:
            argv: argument list; ``argv[1:]`` must hold ``num_nodes`` and
                ``myid`` (both integers). ``argv[0]`` is ignored.
            testerproxy: optional object with an ``inform_tester(event)``
                method; ``None`` when not running under a tester.

        Blocks until both the connection to the next node and the
        connection from the previous node are established.
        """
        print("msgtransfer.imp_0.node init", argv, testerproxy, flush=True)
        self.testerproxy = testerproxy

        p = argparse.ArgumentParser()
        p.add_argument("num_nodes", type=int)
        p.add_argument("myid", type=int)
        args = p.parse_args(argv[1:])
        self.num_nodes = args.num_nodes
        self.myid = args.myid

        # for multi-machine network: use socket.gethostname() or getmyipaddr()
        self.myipaddr = 'localhost'
        print("My ip addr", self.myipaddr, flush=True)

        self.ending = False

        # maxpktlen in bytes: increase this if needed (can go up to 2**32 - 8)
        self.maxpktlen = 1024
        # pkt header 8 bytes: src(h:2), dst(h:2), datalen(i:4)
        self.pkthdrlen = 8
        self.maxpktdatalen = self.maxpktlen - self.pkthdrlen

        # accptngport number for mt 0: HARD-CODED
        self.accptngport0 = 6000

        # server socket: to accept connect request (from prev mtnode)
        self.accptngsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.accptngsocket.bind((self.myipaddr, self.accptngport0 + self.myid))
        self.accptngsocket.listen(1)

        # client socket: connected to next msgtransfer, to send packets
        self.outsocket = socket.socket()
        # for atomic updates when user and recv threads try simultaneous sends
        self.outsocket_lock = Lock()

        # client socket: connected to prev mtnode (via accept), to recv packets
        self.insocket = None

        # join the network: connect to next mtnode and prev mtnode
        self.join_network()

        # buffer to hold outgoing packet from this user
        self.sbuf = bytearray(self.maxpktlen)
        self.sbufview = memoryview(self.sbuf)

        # buffer to hold incoming packet (from prev mtnode)
        self.rbuf = bytearray(self.maxpktlen)
        self.rbufview = memoryview(self.rbuf)

        # queue for messages for local user received from insocket
        self.rcvdmsgqueue = Queue()

        # start thread receiving on insocket
        Thread(name="recv", target=self.recv_pkt_loop,
               args=(self.rbufview,)).start()
        print('mt_node', self.myid, 'started', flush=True)

    def inform_tester(self, event):
        """Inform tester of local event; no-op when there is no testerproxy."""
        if self.testerproxy is not None:
            print("NODE: CALLING inform_tester:", event, flush=True)
            self.testerproxy.inform_tester(event)

    #### BEGIN join network code ###############################
    def join_network(self):
        """Open the links to the next and previous nodes.

        Node 0 connects first and accepts second; every other node accepts
        first and connects second.
        """
        if self.myid == 0:
            self.connect_next()
            self.accept_prev()
        else:
            self.accept_prev()
            self.connect_next()

    def connect_next(self):
        """Connect ``outsocket`` to the next node, retrying once per second.

        Only the ``connect`` call itself is retried. An exception raised by
        the tester callback after a successful connect propagates to the
        caller instead of triggering another connect attempt.
        """
        nextid = (self.myid + 1) % self.num_nodes
        nextaddr = (self.myipaddr, self.accptngport0 + nextid)
        while True:
            try:
                self.outsocket.connect(nextaddr)
            except OSError:
                print('mtnode', self.myid, 'connect req to', nextid, 'refused',
                      flush=True)
                # The state of a socket after a failed connect is
                # unspecified by POSIX, so retry with a fresh socket.
                self.outsocket.close()
                self.outsocket = socket.socket(socket.AF_INET,
                                               socket.SOCK_STREAM)
                # sleep a bit and try again
                time.sleep(1)
                continue
            break
        print('msgtransfer', self.myid, 'connect req to', nextid,
              'accepted', flush=True)
        self.inform_tester(('checker', 'linkup', self.myid, nextid))

    def accept_prev(self):
        """Block until a connect request arrives; store it as ``insocket``."""
        (self.insocket, self.addrin) = self.accptngsocket.accept()
        print('msgtransfer', self.myid, 'accepted connect req from',
              self.addrin, flush=True)
    #### END join network code ##################################

    #### BEGIN leave network code ##################################
    def close_outsocket_rcvdmsgqueue(self):
        """Send END to the next node, close ``outsocket``, wake the receiver.

        Sets ``ending``, sends an END packet (header with datalen 0), then
        shuts down and closes ``outsocket`` and puts the ``(False,)``
        sentinel on ``rcvdmsgqueue``. The close and the sentinel happen
        even if sending END or the shutdown fails; in that case the
        original exception still propagates.
        """
        with self.outsocket_lock:
            self.ending = True
            try:
                hdr = struct.pack('!hhi', 0, 0, 0)
                self.send_bytes(self.pkthdrlen, hdr)
                self.outsocket.shutdown(socket.SHUT_RDWR)
            finally:
                # Always release the descriptor and unblock a local recv().
                self.outsocket.close()
                print('msgtransfer', self.myid, 'outsocket closed', flush=True)
                self.rcvdmsgqueue.put((False,))
                print('rcvdmsgqueue put closed', flush=True)

    def end(self):
        """Start the shutdown of the ring. User calls this (at one node only)."""
        print('END CALLED', flush=True)
        self.close_outsocket_rcvdmsgqueue()
        print('END RETURNED', flush=True)

    def handle_endmsg(self):
        """Handle a received END packet.

        Passes END on (unless this node is already ending) and closes
        ``insocket``.
        """
        if not self.ending:
            self.close_outsocket_rcvdmsgqueue()
        self.insocket.close()
        previd = (self.myid - 1) % self.num_nodes
        self.inform_tester(('checker', 'linkdown', previd, self.myid))
        print('msgtransfer', self.myid, 'insocket closed', flush=True)
    #### END leave network code ##################################

    def send(self, dst, msg):
        """Send msg to dst. User calls this.

        Returns:
            ``True`` if the packet was written to ``outsocket``.
            ``False`` if *dst* is invalid (out of range or this node), the
            serialized message is empty, the node is ending, or the send
            (or the tester callback) raised.
            ``None`` if the serialized message exceeds ``maxpktdatalen``
            (kept as ``None`` for backward compatibility).
        """
        if dst < 0 or dst >= self.num_nodes or dst == self.myid:
            print('send: invalid destination', dst, flush=True)
            return False
        data = serializer.dumps(msg)
        datalen = len(data)
        if datalen == 0:
            print('send: null msg', flush=True)
            return False
        if datalen > self.maxpktdatalen:
            print('send: serialized msg size is too large', datalen, flush=True)
            return None
        # construct packet header
        hdr = struct.pack('!hhi', self.myid, dst, datalen)
        pktlen = len(hdr) + datalen
        with self.outsocket_lock:
            if self.ending:
                return False
            # place packet in sbuf
            self.sbufview[:pktlen] = hdr + data
            try:
                self.inform_tester(('checker', 'sent', self.myid, dst, msg))
                self.send_bytes(pktlen, self.sbufview)
            except Exception:
                # here if outsocket is closed (eg, another thread doing end())
                return False
        return True

    def send_bytes(self, nbytes, bufview):
        """Send bufview[0:nbytes] bytes on outsocket.

        Raises:
            Exception: if ``outsocket.send`` reports 0 bytes sent.
        """
        totalsent = 0
        while totalsent < nbytes:
            nsent = self.outsocket.send(bufview[totalsent:nbytes])
            if nsent == 0:
                raise Exception('send_bytes: outsocket closed')
            totalsent += nsent

    def recv(self):
        """Return the next item of ``rcvdmsgqueue``, blocking until available.

        User calls this. The item is ``(True, msg)`` for a received user
        message, or ``(False,)`` once the node is closing.
        """
        try:
            rval = self.rcvdmsgqueue.get()
        except Exception:
            print('recv: rcvdmsgqueue closed', flush=True)
            return (False,)
        return rval

    def recv_pkt_loop(self, bufview):
        """Repeatedly receive whole packet and either add to rcvdmsgqueue or
        send to next msgtransfer, until insocket is closed or END msg is recvd.

        Raises:
            Exception: if receiving a packet fails; the underlying error is
                attached as ``__cause__``.
        """
        hdrlen = self.pkthdrlen
        while True:
            try:
                (src, dst, datalen) = self.recv_pkt(bufview)
            except Exception as err:
                raise Exception('recv_pkt: insocket closed') from err
            if datalen == 0:
                self.handle_endmsg()
                break
            # NOTE(security): the serializer is pickle, and unpickling can
            # execute arbitrary code. Only safe if every peer on the ring
            # is trusted.
            msg = serializer.loads(bufview[hdrlen:hdrlen + datalen])
            if dst == self.myid:
                self.rcvdmsgqueue.put((True, msg))
                self.inform_tester(('checker', 'rcvd', self.myid, dst, msg))
            else:
                self.inform_tester(('checker', 'fwd', self.myid, dst, msg))
                with self.outsocket_lock:
                    if not self.ending:
                        self.send_bytes(hdrlen + datalen, bufview)

    def recv_pkt(self, bufview):
        """Receive incoming packet (bytes: hdr(8) + data) into bufview.
        Return hdr tuple ``(src, dst, datalen)``.

        Raises:
            ValueError: if the header's datalen is negative or does not fit
                in *bufview* after the header.
            RuntimeError: if the connection closes mid-packet.
        """
        # receive pkt header, and unpack src, dst, datalen
        self.recv_bytes(self.pkthdrlen, bufview)
        (src, dst, datalen) = struct.unpack('!hhi', bufview[:self.pkthdrlen])
        # datalen is a signed 32-bit field read off the wire: reject values
        # that cannot describe a packet body held by this buffer.
        if datalen < 0 or datalen > len(bufview) - self.pkthdrlen:
            raise ValueError('recv_pkt: invalid datalen %d' % datalen)
        if datalen > 0:
            self.recv_bytes(datalen, bufview[self.pkthdrlen:])
        return (src, dst, datalen)

    def recv_bytes(self, nbytes, bufview):
        """Receive nbytes from insocket.recv into bufview.

        Raises:
            RuntimeError: if ``insocket.recv_into`` returns 0 before
                *nbytes* bytes have been received.
        """
        while nbytes > 0:
            nrcvd = self.insocket.recv_into(bufview, nbytes)
            if nrcvd == 0:
                raise RuntimeError('insocket connection broken')
            bufview = bufview[nrcvd:]  # slicing views is cheap
            nbytes -= nrcvd
##### END class Node in module msgtransfer.imp0.node ##################