Source code for msgtransfer2.imp_0.node

"""
Module **sesf.msgtransfer2.imp_0.node** has class ``Node`` that
implements a node of a msgtransfer2 implementation.

- Run ``python user_node 2 0 --use imp_0/node.py 2 0`` to start a
  process executing a user node with address 0 that starts an imp_0 node
  with address 0.
- Run ``python user_node 2 1 --use imp_0/node.py 2 1`` to start a
  process executing a user node with address 1 that starts an imp_0 node
  with address 1.
- Or run ``python start_nodes.py 2 user_node.py --use imp_0.node``
  instead of the above two commands.

The two imp_0 node instances connect via TCP, and the resulting distributed
system implements msgtransfer2 service, which is used by the two user nodes.
Each user node has two concurrent threads, one sending messages and one
receving messages; after a while, the main thread in the process with
node 0 calls its imp_0 node's ``end()``, causing the two processes to end.
"""

import socket, struct, time, argparse, sys
from queue import Queue
from threading import Thread, Lock
import pickle as serializer
from random import randint


# from stackoverflow: because gethostname() may give an incorrect address
def getmyipaddr():
  s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  s.connect(('8.8.8.8', 0))  # UDP address, doesn't send packets
  return s.getsockname()[0]

[docs]class Node(): """ A node of a distributed system that implements the msgtransfer2 service. The distributed system consists of two Node instances with addresses 0 and 1. For brevity, refer to the node at adddress j as node_j, where j ranges over 0 and 1. After node_j becomes connected, it maintains the following variables: - ``sbuf`` (bytearray): buffer to hold an outgoing packet. - ``sbufview``: memoryview of ``sbuf`` (slicing ``sbufview`` is cheap, slicing ``sbuf`` is not). - ``rbuf`` (bytearray): buffer to hold an incoming packet. - ``rbufview``: memoryview of ``rbuf``. - ``rcvdmsgqueue``: queue of incoming ``(flag,msg)`` tuples for local user. - local thread named *recv*: receives packets from TCP socket and puts them in ``rcvdmsgqueue``. """ # def __init__(self, num_nodes, myid, testerproxy=None): def __init__(self, argv, testerproxy=None): """The two instances establish a TCP connection: instance 0 starts accepting at port accptngport0 (hardcoded to 6000); instance 1 attempts to connect to instance 0, retrying until success. After instance j, j in (0,1), becomes connected, it creates: - sbuf (bytearray): buffer to hold an outgoing packet. - sbufview: memoryview of sbuf (slicing view, unlike sbuf, is cheap). - rbuf (bytearray): buffer to hold an incoming packet. - rbufview: memoryview of rbuf. - rcvdmsgqueue: queue of incoming (flag,msg) tuples for local user. - local thread executing recv_pkt_loop(): receive bytes from TCP socket, extracts messages, adds (True,msg) or (False,) to rcvdmsgqueue. Packet structure: - packet header: 2B srcid (not used), 2B dstid (not used), 4B datalen. - packet data: serialized msg of length datalen (up to 2**32 bytes). Packet with datalen of 0 is an "END" packet. """ # argv: 'x_module/script n j xargs [--use xmodule n j xargs]^0 p = argparse.ArgumentParser() p.add_argument("num_nodes", type=int) p.add_argument("myid", type=int) args = p.parse_args(argv[1:]) # Pyro4 proxy when running against the service tester self.myid = args.myid self.testerproxy = testerproxy # for multi-machine network: use socket.gethostname() or getmyipaddr() self.myipaddr = 'localhost' self.ipaddr0 = 'localhost' print("my ip addr", self.myipaddr, flush=True) self.ending = False # maxpktlen in bytes: increase this if needed self.maxpktlen = 1024 # pkt header 8 bytes: src(h:2), dst(h:2), datalen(i:4) self.pkthdrlen = 8 self.maxpktdatalen = self.maxpktlen - self.pkthdrlen self.accptngport0 = 6000 # TCP socket; updated in join_network() self.tcpsocket = None # for atomic updates when user and recv threads try simultaneous sends self.tcpsend_lock = Lock() # join the network: connect to next channel and prev channel self.join_network() # buffer to hold outgoing packet from this user self.sbuf = bytearray(self.maxpktlen) self.sbufview = memoryview(self.sbuf) # buffer to hold incoming packet (from prev channel) self.rbuf = bytearray(self.maxpktlen) self.rbufview = memoryview(self.rbuf) # queue for messages for local user received from tcpsocket self.rcvdmsgqueue = Queue() # start thread receiving on tcpsocket Thread(name="recv", target=self.recv_pkt_loop, args=(self.rbufview,) ).start() print('channel', self.myid, 'started', flush=True) def inform_tester(self, event): """Inform tester of local event""" if self.testerproxy != None: self.testerproxy.inform_tester(event) def join_network(self): if self.myid == 0: # server socket self.tcpsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # may need to uncomment the next line on Mac OSX # self.tcpsocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.tcpsocket.bind((self.myipaddr, self.accptngport0)) self.tcpsocket.listen(1) # accept connect request from peer; reuse tcpsocket (self.tcpsocket, self.addrin) = self.tcpsocket.accept() print('Msgtransfer2', self.myid, 'accepted connect req from', self.addrin, flush=True) elif self.myid == 1: self.tcpsocket = socket.socket() while True: try: self.tcpsocket.connect((self.ipaddr0, self.accptngport0)) print('node', self.myid, 'connect req accepted', flush=True) break except: print('node', self.myid, 'connect req refused', flush=True) # sleep a bit and try again time.sleep(1) #### BEGIN leave network code ################################## def close_tcpsocket_rcvdmsgqueueput(self): self.tcpsocket.shutdown(socket.SHUT_RDWR) self.tcpsocket.close() self.rcvdmsgqueue.put((False,)) print('node', self.myid, 'closed tcpsocket and rcvdmsgqueue.put', flush=True) def end(self): # send END packet with self.tcpsend_lock: self.ending = True hdr = struct.pack('!hhi', 0, 0, 0) self.send_bytes(self.pkthdrlen, hdr) def handle_endmsg(self): with self.tcpsend_lock: if not self.ending: self.ending = True # send END msg hdr = struct.pack('!hhi', 0, 0, 0) self.send_bytes(self.pkthdrlen, hdr) self.close_tcpsocket_rcvdmsgqueueput() #### END leave network code ################################## def send(self, msg): """Send msg to remote peer. Called by user.""" data = serializer.dumps(msg) datalen = len(data) if datalen == 0: print('invalid send: serialized msg size is zero', flush=True) return False if datalen > self.maxpktdatalen: print('invalid send: serialized msg size is too large', datalen, flush=True) return False # construct packet header hdr = struct.pack('!hhi', 0, 0, datalen) pktlen = len(hdr) + datalen # grab lock because END msg be getting sent with self.tcpsend_lock: if self.ending: return False # place packet in sbuf self.sbufview[:pktlen] = hdr + data try: # self.inform_tester((self.myid, self.myid, 0, msg)) self.send_bytes(pktlen, self.sbufview) except: # send error: perhaps end() was called or END msg received return False return True def send_bytes(self, nbytes, bufview): """Send bufview[0:nbytes] bytes on tcpsocket""" totalsent = 0 while totalsent < nbytes: nsent = self.tcpsocket.send(bufview[totalsent:nbytes]) if nsent == 0: raise Exception('send_bytes: tcpsocket closed') totalsent += nsent def recv(self): try: rval = self.rcvdmsgqueue.get() except: print('recv: rcvdmsgqueue closed', flush=True) return (False,) return rval def recv_pkt_loop(self, bufview): """Repeatedly receive whole packet, extract msg, add (True,msg) to rcvdmsgqueue, until tcpsocket is closed or END packet is recvd.""" while True: (src, dst, datalen) = self.recv_pkt(bufview) """ try: (src, dst, datalen) = self.recv_pkt(bufview) except: raise Exception('recv_pkt: tcpsocket closed') """ if datalen == 0: self.handle_endmsg() break else: msg = serializer.loads( bufview[self.pkthdrlen:self.pkthdrlen+datalen]) self.rcvdmsgqueue.put((True,msg)) self.inform_tester(('checker', 'rcvd', self.myid, msg)) def recv_pkt(self, bufview): """Receive incoming packet into bufview. Return hdr tuple.""" # receive pkt header, and unpack src, dst, datalen self.recv_bytes(self.pkthdrlen, bufview) (src, dst, datalen) = struct.unpack('!hhi', bufview[:self.pkthdrlen]) if datalen > 0: # receive data of packet self.recv_bytes(datalen, bufview[self.pkthdrlen:]) return (src, dst, datalen) def recv_bytes(self, nbytes, bufview): """Receive nbytes from tcpsocket.recv into bufview.""" totalrcvd = 0 while totalrcvd < nbytes: try: nrcvd = self.tcpsocket.recv_into(bufview, nbytes) except: raise RuntimeError('tcpsocket socket closed') if nrcvd == 0: raise Exception('tcpsocket socket closed') bufview = bufview[nrcvd:] # slicing views is cheap nbytes -= nrcvd
########### END class Node ####################################