"""
Module **sesf.msgtransfer2.imp_0.node** has class ``Node`` that
implements a node of a msgtransfer2 implementation.
- Run ``python user_node 2 0 --use imp_0/node.py 2 0`` to start a
process executing a user node with address 0 that starts an imp_0 node
with address 0.
- Run ``python user_node 2 1 --use imp_0/node.py 2 1`` to start a
process executing a user node with address 1 that starts an imp_0 node
with address 1.
- Or run ``python start_nodes.py 2 user_node.py --use imp_0.node``
instead of the above two commands.
The two imp_0 node instances connect via TCP, and the resulting distributed
system implements msgtransfer2 service, which is used by the two user nodes.
Each user node has two concurrent threads, one sending messages and one
receving messages; after a while, the main thread in the process with
node 0 calls its imp_0 node's ``end()``, causing the two processes to end.
"""
import socket, struct, time, argparse, sys
from queue import Queue
from threading import Thread, Lock
import pickle as serializer
from random import randint
# from stackoverflow: because gethostname() may give an incorrect address
def getmyipaddr():
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(('8.8.8.8', 0)) # UDP address, doesn't send packets
return s.getsockname()[0]
[docs]class Node():
"""
A node of a distributed system that implements the msgtransfer2 service.
The distributed system consists of two Node instances with addresses
0 and 1. For brevity, refer to the node at adddress j as node_j, where
j ranges over 0 and 1.
After node_j becomes connected, it maintains the following variables:
- ``sbuf`` (bytearray): buffer to hold an outgoing packet.
- ``sbufview``: memoryview of ``sbuf`` (slicing ``sbufview`` is cheap,
slicing ``sbuf`` is not).
- ``rbuf`` (bytearray): buffer to hold an incoming packet.
- ``rbufview``: memoryview of ``rbuf``.
- ``rcvdmsgqueue``: queue of incoming ``(flag,msg)`` tuples for local user.
- local thread named *recv*: receives packets from TCP socket and puts
them in ``rcvdmsgqueue``.
"""
# def __init__(self, num_nodes, myid, testerproxy=None):
def __init__(self, argv, testerproxy=None):
"""The two instances establish a TCP connection: instance 0 starts
accepting at port accptngport0 (hardcoded to 6000); instance 1 attempts
to connect to instance 0, retrying until success.
After instance j, j in (0,1), becomes connected, it creates:
- sbuf (bytearray): buffer to hold an outgoing packet.
- sbufview: memoryview of sbuf (slicing view, unlike sbuf, is cheap).
- rbuf (bytearray): buffer to hold an incoming packet.
- rbufview: memoryview of rbuf.
- rcvdmsgqueue: queue of incoming (flag,msg) tuples for local user.
- local thread executing recv_pkt_loop(): receive bytes from TCP socket,
extracts messages, adds (True,msg) or (False,) to rcvdmsgqueue.
Packet structure:
- packet header: 2B srcid (not used), 2B dstid (not used), 4B datalen.
- packet data: serialized msg of length datalen (up to 2**32 bytes).
Packet with datalen of 0 is an "END" packet.
"""
# argv: 'x_module/script n j xargs [--use xmodule n j xargs]^0
p = argparse.ArgumentParser()
p.add_argument("num_nodes", type=int)
p.add_argument("myid", type=int)
args = p.parse_args(argv[1:])
# Pyro4 proxy when running against the service tester
self.myid = args.myid
self.testerproxy = testerproxy
# for multi-machine network: use socket.gethostname() or getmyipaddr()
self.myipaddr = 'localhost'
self.ipaddr0 = 'localhost'
print("my ip addr", self.myipaddr, flush=True)
self.ending = False
# maxpktlen in bytes: increase this if needed
self.maxpktlen = 1024
# pkt header 8 bytes: src(h:2), dst(h:2), datalen(i:4)
self.pkthdrlen = 8
self.maxpktdatalen = self.maxpktlen - self.pkthdrlen
self.accptngport0 = 6000
# TCP socket; updated in join_network()
self.tcpsocket = None
# for atomic updates when user and recv threads try simultaneous sends
self.tcpsend_lock = Lock()
# join the network: connect to next channel and prev channel
self.join_network()
# buffer to hold outgoing packet from this user
self.sbuf = bytearray(self.maxpktlen)
self.sbufview = memoryview(self.sbuf)
# buffer to hold incoming packet (from prev channel)
self.rbuf = bytearray(self.maxpktlen)
self.rbufview = memoryview(self.rbuf)
# queue for messages for local user received from tcpsocket
self.rcvdmsgqueue = Queue()
# start thread receiving on tcpsocket
Thread(name="recv", target=self.recv_pkt_loop, args=(self.rbufview,)
).start()
print('channel', self.myid, 'started', flush=True)
def inform_tester(self, event):
"""Inform tester of local event"""
if self.testerproxy != None:
self.testerproxy.inform_tester(event)
def join_network(self):
if self.myid == 0:
# server socket
self.tcpsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# may need to uncomment the next line on Mac OSX
# self.tcpsocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.tcpsocket.bind((self.myipaddr, self.accptngport0))
self.tcpsocket.listen(1)
# accept connect request from peer; reuse tcpsocket
(self.tcpsocket, self.addrin) = self.tcpsocket.accept()
print('Msgtransfer2', self.myid, 'accepted connect req from',
self.addrin, flush=True)
elif self.myid == 1:
self.tcpsocket = socket.socket()
while True:
try:
self.tcpsocket.connect((self.ipaddr0, self.accptngport0))
print('node', self.myid, 'connect req accepted', flush=True)
break
except:
print('node', self.myid, 'connect req refused', flush=True)
# sleep a bit and try again
time.sleep(1)
#### BEGIN leave network code ##################################
def close_tcpsocket_rcvdmsgqueueput(self):
self.tcpsocket.shutdown(socket.SHUT_RDWR)
self.tcpsocket.close()
self.rcvdmsgqueue.put((False,))
print('node', self.myid, 'closed tcpsocket and rcvdmsgqueue.put',
flush=True)
def end(self):
# send END packet
with self.tcpsend_lock:
self.ending = True
hdr = struct.pack('!hhi', 0, 0, 0)
self.send_bytes(self.pkthdrlen, hdr)
def handle_endmsg(self):
with self.tcpsend_lock:
if not self.ending:
self.ending = True
# send END msg
hdr = struct.pack('!hhi', 0, 0, 0)
self.send_bytes(self.pkthdrlen, hdr)
self.close_tcpsocket_rcvdmsgqueueput()
#### END leave network code ##################################
def send(self, msg):
"""Send msg to remote peer. Called by user."""
data = serializer.dumps(msg)
datalen = len(data)
if datalen == 0:
print('invalid send: serialized msg size is zero', flush=True)
return False
if datalen > self.maxpktdatalen:
print('invalid send: serialized msg size is too large', datalen, flush=True)
return False
# construct packet header
hdr = struct.pack('!hhi', 0, 0, datalen)
pktlen = len(hdr) + datalen
# grab lock because END msg be getting sent
with self.tcpsend_lock:
if self.ending:
return False
# place packet in sbuf
self.sbufview[:pktlen] = hdr + data
try:
# self.inform_tester((self.myid, self.myid, 0, msg))
self.send_bytes(pktlen, self.sbufview)
except:
# send error: perhaps end() was called or END msg received
return False
return True
def send_bytes(self, nbytes, bufview):
"""Send bufview[0:nbytes] bytes on tcpsocket"""
totalsent = 0
while totalsent < nbytes:
nsent = self.tcpsocket.send(bufview[totalsent:nbytes])
if nsent == 0:
raise Exception('send_bytes: tcpsocket closed')
totalsent += nsent
def recv(self):
try:
rval = self.rcvdmsgqueue.get()
except:
print('recv: rcvdmsgqueue closed', flush=True)
return (False,)
return rval
def recv_pkt_loop(self, bufview):
"""Repeatedly receive whole packet, extract msg, add (True,msg) to
rcvdmsgqueue, until tcpsocket is closed or END packet is recvd."""
while True:
(src, dst, datalen) = self.recv_pkt(bufview)
"""
try:
(src, dst, datalen) = self.recv_pkt(bufview)
except:
raise Exception('recv_pkt: tcpsocket closed')
"""
if datalen == 0:
self.handle_endmsg()
break
else:
msg = serializer.loads(
bufview[self.pkthdrlen:self.pkthdrlen+datalen])
self.rcvdmsgqueue.put((True,msg))
self.inform_tester(('checker', 'rcvd', self.myid, msg))
def recv_pkt(self, bufview):
"""Receive incoming packet into bufview. Return hdr tuple."""
# receive pkt header, and unpack src, dst, datalen
self.recv_bytes(self.pkthdrlen, bufview)
(src, dst, datalen) = struct.unpack('!hhi', bufview[:self.pkthdrlen])
if datalen > 0:
# receive data of packet
self.recv_bytes(datalen, bufview[self.pkthdrlen:])
return (src, dst, datalen)
def recv_bytes(self, nbytes, bufview):
"""Receive nbytes from tcpsocket.recv into bufview."""
totalrcvd = 0
while totalrcvd < nbytes:
try:
nrcvd = self.tcpsocket.recv_into(bufview, nbytes)
except:
raise RuntimeError('tcpsocket socket closed')
if nrcvd == 0:
raise Exception('tcpsocket socket closed')
bufview = bufview[nrcvd:] # slicing views is cheap
nbytes -= nrcvd
########### END class Node ####################################