3. A message-transfer service spanning two locations¶
This section defines a message-transfer service between two locations, identified by addresses 0 and 1, and a distributed program that implements the service using TCP sockets. The distributed program starts two instances of a “node” program, one at each address. Each node provides three functions to the local user: one to send a message, one to receive a message, and one to end the service. The programs use TCP’s connection opening and closing, and add message framing to TCP’s byte-stream data transfer.
The intended service is described, first informally and then in pseudo-Python and Python. We obtain a service model program and a service tester program, which is used to test our implementation. This example shows how Sesf handles distributed services and implementations, and service functions that return (non-void) values.
Intended service informally stated¶
The service spans two locations, identified by addresses 0 and 1. At each address, three functions can be called by (a local user in) the environment.
- send(msg): send message msg to the other address.
  - Return False only if the service is closed, else return True.
  - Call only if no send() call is ongoing at this address and only if no previous send() call at this address has returned False.
- recv(): receive a message sent at the other end.
  - Return (False,) only if the service is closed, return (True, msg) only if an incoming message msg is available, else block.
  - Call only if no recv() call is ongoing at this address and only if no previous recv() call at this address has returned (False,).
  - Messages are received in the order they were sent at the other address.
- end(): close the service.
  - Return None.
  - Call only if end() has not been called at any address.
This, though informal, completely and unambiguously defines the intended service of the distributed program. The function calls define the inputs, the function returns define the outputs, and the constraints on function calls and return values define the set of acceptable sequences of inputs and outputs. Later we will cast it into a Python class.
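To make the informal statement concrete, here is a hedged sketch (not code from the package) of how a local user at one address might exercise these functions; the node object stands for whatever provides send, recv and end at this address, and its construction is omitted.

# Hedged sketch: a local user driving send()/recv() concurrently.
# "node" is assumed to provide the three functions described above.
import threading

def run_user(node, outgoing_msgs):
    def sender():
        for m in outgoing_msgs:
            if not node.send(m):   # returns False only once the service is closed
                return

    def receiver():
        while True:
            result = node.recv()   # blocks until a message arrives or the service closes
            if not result[0]:      # (False,): service closed
                return
            print("received:", result[1])

    threads = [threading.Thread(target=sender), threading.Thread(target=receiver)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    # At most one user (at either address) may additionally call node.end()
    # to close the service.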
An implementation using TCP¶
Module sesf.msgtransfer2.imp_0.node has class Node that
implements a node of a msgtransfer2 implementation.
- Run python user_node.py 2 0 --use imp_0.node 2 0 to start a process executing a user node with address 0 that starts an imp_0 node with address 0.
- Run python user_node.py 2 1 --use imp_0.node 2 1 to start a process executing a user node with address 1 that starts an imp_0 node with address 1.
- Or run python start_nodes.py 2 user_node.py --use imp_0.node instead of the above two commands.
The two imp_0 node instances connect via TCP, and the resulting distributed
system implements the msgtransfer2 service, which is used by the two user nodes.
Each user node has two concurrent threads, one sending messages and one
receiving messages; after a while, the main thread in the process with
node 0 calls its imp_0 node’s end(), causing the two processes to end.
class msgtransfer2.imp_0.node.Node(argv, testerproxy=None)[source]¶
A node of a distributed system that implements the msgtransfer2 service. The distributed system consists of two Node instances with addresses 0 and 1. For brevity, refer to the node at address j as node_j, where j ranges over 0 and 1.

After node_j becomes connected, it maintains the following variables:

- sbuf (bytearray): buffer to hold an outgoing packet.
- sbufview: memoryview of sbuf (slicing sbufview is cheap, slicing sbuf is not).
- rbuf (bytearray): buffer to hold an incoming packet.
- rbufview: memoryview of rbuf.
- rcvdmsgqueue: queue of incoming (flag, msg) tuples for the local user.
- a local thread named recv: receives packets from the TCP socket and puts them in rcvdmsgqueue.

Initialization

The first step is to establish a TCP connection with the peer. node_0 starts accepting on a TCP server socket on port accptngport0 (hardcoded to 6000). node_1 starts a TCP client socket connecting to node_0's IP address and port number, retrying until success. At some point, each side's TCP socket becomes open. Then node_j creates the buffers sbuf (to hold an outgoing packet) and rbuf (to hold an incoming packet), and the queue rcvdmsgqueue (to hold incoming (flag, msg) tuples). Then node_j starts a local thread, named recv, to receive bytes from TCP. Then it returns, giving the caller access to its three "external" functions: send(msg), recv(), and end().

Packet structure

A packet containing a user message msg has the following structure: an 8-byte header followed by variable-sized data consisting of the serialized (pickled) msg. The 8-byte header consists of a 2-byte srcid (not used), a 2-byte dstid (not used), and a 4-byte datalen, equal to the size in bytes of the data. Thus the serialized msg can be at most 2^32 bytes. (A sketch of this framing is given after this entry.)

There is also an "END" packet (used in closing). It has only a header, with datalen of 0.

User-callable functions

- A send(msg) call casts msg into a packet, sends the byte sequence over TCP, and returns (True,) if the TCP send succeeded and (False,) otherwise.
- A recv() call returns the next (flag, msg) tuple from rcvdmsgqueue, blocking until the latter is not empty.
- An end() call sends an END packet and returns, after which send() calls have no effect. (The peer responds to the END packet with its own END packet, and when that is received this node closes its TCP socket and ends.)

Receiving packets

The local thread recv repeatedly does the following:

- receive 8 bytes (the packet header) and get the value of datalen;
- if the value is not zero: receive that many bytes (the packet data), unpickle it to a message, say msg, and add (True, msg) to rcvdmsgqueue;
- if the value of datalen is zero (END packet): if end() was not called locally, send an END packet; add (False, None) to rcvdmsgqueue, and close the TCP socket. (node_j would typically end at some point after this.)
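The framing described above can be sketched with the standard struct and pickle modules; the helper names, field order, and network byte order below are assumptions for illustration, not code taken from imp_0.

# Hedged sketch of the packet framing: 8-byte header (srcid, dstid, datalen)
# followed by the pickled message.
import pickle
import struct

HEADER_FMT = "!HHI"                        # 2-byte srcid, 2-byte dstid, 4-byte datalen
HEADER_SIZE = struct.calcsize(HEADER_FMT)  # 8 bytes

def make_packet(srcid, dstid, msg):
    """Serialize msg and prepend the 8-byte header."""
    data = pickle.dumps(msg)
    return struct.pack(HEADER_FMT, srcid, dstid, len(data)) + data

def make_end_packet(srcid, dstid):
    """An END packet is just a header with datalen 0."""
    return struct.pack(HEADER_FMT, srcid, dstid, 0)

def parse_header(header):
    """Return (srcid, dstid, datalen) from an 8-byte header."""
    return struct.unpack(HEADER_FMT, header)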
 
Running the processes on different machines¶
The two processes must be at network locations reachable from each
other via TCP.
Class Node assumes that its peer node is on the same OS,
so the two nodes have the same IP address (“localhost”) and differ only in
their TCP port numbers.
This is convenient but not necessary.
To run the two processes on different machines,
each Node instance needs the IP address of the other
(easily done).
But you also have to deal with any intervening firewalls
(and network administrators).
Also, remote testing would not be as safe because these programs
are not secure (no authentication, pickle for serializing)
and they would be exposed to your network.
Start_nodes utility¶
The script
start_nodes.py
allows you to enter one command to start any number of nodes,
each running as a process on the same OS in its own shell.
Instead of entering
python imp_0/node.py 2 0 and python imp_0/node.py 2 1
in different shells,
you can run python start_nodes.py 2 imp_0/node.py in a shell.
The script uses function Popen from the subprocess module
to start a process in a new shell.
(You may need to edit the arguments of the Popen call
for your operating system.
More recent versions of Python may take care of this.)
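For reference, here is a minimal sketch of what such a script might look like; the exact Popen arguments in the real start_nodes.py may differ, and the terminal command used on Unix-like systems is an assumption.

# Hedged sketch of a start_nodes-style script: start n node processes,
# each in its own console/shell.
import subprocess
import sys

def start_nodes(n, script, extra_args):
    for j in range(n):
        cmd = [sys.executable, script, str(n), str(j)] + extra_args
        if sys.platform.startswith("win"):
            # On Windows, give each process its own console window.
            subprocess.Popen(cmd, creationflags=subprocess.CREATE_NEW_CONSOLE)
        else:
            # Elsewhere, run each process inside a terminal emulator (assumed xterm).
            subprocess.Popen(["xterm", "-e"] + cmd)

if __name__ == "__main__":
    # e.g. python start_nodes.py 2 user_node.py --use imp_0.node
    start_nodes(int(sys.argv[1]), sys.argv[2], sys.argv[3:])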
Starting the msgtransfer2 nodes from user nodes¶
Module sesf.msgtransfer2.user_node has class UserNode that
implements a user node that starts a given msgtransfer2 node.
If the peer user node is also started, the two user nodes exercise
the msgtransfer2 service provided by the two msgtransfer2 nodes.
For example:
- Run python user_node.py 2 0 --use imp_0.node 2 0 to start a process that starts UserNode 0, which starts msgtransfer2.imp_0.Node 0.
- Run python user_node.py 2 1 --use imp_0.node 2 1 to start a process that starts UserNode 1, which starts msgtransfer2.imp_0.Node 1.
- Or, instead of the above two commands, run
python ../start_nodes.py 2 user_node.py --use imp_0.node.
The two user nodes use the service provided by the two msgtransfer2 nodes to exchange a few messages, then end the msgtransfer2 service and the two processes.
The user nodes can also make use of a running msgtransfer2 service model,
instead of a msgtransfer2 implementation. Simply replace imp_0.node
by service_node in the above commands.
Service program (abstract version)¶
We now define a “pseudo-Python” class Service in
msgtransfer2.service.ppy
that expresses the intended service formally.
We say “pseudo-Python” because it has non-Python constructs
(namely, CIC, CU, RETURN, ROC, RU),
which are explained below.
Conceptually, the class is a program that can,
at any point in its execution,
receive any acceptable input (and only acceptable inputs)
and return any acceptable output.
The class has the following structure.
class Service():
  def __init__(self):
    <instance variables>
  def send(self, j, msg):
    <call part>
    <return part>
  def recv(self, j):
    <call part>
    <return part>
  def end(self, j):
    <call part>
    <return part>
The class has three non-init functions:
send(j,msg), recv(j) and end(j),
corresponding to the functions
send(msg), recv() and end() at address j
of an implementation of the service.
Note
Convention: Throughout the class,
parameter j is an address (0 or 1).
The __init__ function defines variables adequate to express
when a non-init function can be called
and the possible values it can return.
Every non-init function has a call part followed by a return part.
Each part is executed atomically.
The call part checks whether the function call is valid,
and if so updates the instance variables.
The return part checks whether the function can return,
and if so generates a return value, if any,
and updates the instance variables.
Here is the function __init__():
  def __init__(self):
    """Throughout j ranges over 0..1:
    - ending is true iff end(0) or end(1) has been called.
    - ongoing_send[j] is true iff a send(j,.) call is ongoing.
    - closed_send[j] is true iff a send(j,.) call has returned False.
    - ongoing_recv[j] is true iff a recv(j) call is ongoing.
    - closed_recv[j] is true iff a recv(j) call has returned (False,).
    - sent[j] is the sequence of messages sent in send(j,.) calls.
    - nrcvd[j] is the number of messages returned in recv(j) calls.
    """
    self.ending = False
    self.ongoing_send = [False, False]
    self.closed_send = [False, False]
    self.ongoing_recv = [False, False]
    self.closed_recv = [False, False]
    self.sent = [[], []]
    self.nrcvd = [0, 0]
Here is the function send(j, msg),
with the call and return parts indicated by the comments at right.
  def send(self, j, msg):
    CIC:
      j in (0, 1) and                                       # call part
      not self.ongoing_send[j] and not self.closed_send[j]  #  "  "  "
    CU:                                                     #  "  "  "
      self.sent[j].append(msg)                              #  "  "  "
      self.ongoing_send[j] = True                           #  "  "  "
    RETURN(rval):                                           # return part
      ROC:                                                   #  "  "  "
        rval == True or (rval == False and self.ending)     #  "  "  "
      RU:                                                   #  "  "  "
        self.ongoing_send[j] = False                        #  "  "  "
        if not rval:                                        #  "  "  "
          self.closed_send[j] = True                        #  "  "  "
        return rval                                         #  "  "  "
The call part has two components:
CIC, which stands for call input condition,
and CU, which stands for call update.
When a thread comes to the call part,
it does the following in one atomic step:
evaluate the call input condition;
if it is true (which means the call is valid),
execute the call update; otherwise abort.
The return part has three components:
RETURN,
which defines a return parameter, rval;
ROC, which stands for return output condition;
and RU, which stands for return update.
A thread at RETURN is blocked unless
the return output condition is true for some value, say x, of rval,
in which case it does the following atomically:
set rval to x and execute the return update (which here returns x).
Thus the return output condition implicitly assigns
a value to the return parameter.
Short-circuit evaluation does not apply here:
if ending is true, the condition is satisfiable with rval equal to
either True or False, so the function can return either value,
and the choice is non-deterministic.
Here is the function recv(j):
  def recv(self, j):
    CIC:
      (j in (0, 1) and
       not self.ongoing_recv[j] and not self.closed_recv[j])
    CU:
      self.ongoing_recv[j] = True
    RETURN((flag, msg)):
      # rval is (False,) | (True, msg)
      ROC:
       ((flag == False and self.ending) or
        (flag == True and self.nrcvd[j] < len(self.sent[1-j])
         and msg == self.sent[1-j][self.nrcvd[j]]))
      RU:
        self.ongoing_recv[j] = False
        if flag == False:
          self.closed_recv[j] = True
          return (flag,)
        else:
          self.nrcvd[j] += 1
        return (flag, msg)
The return parameter is a tuple (flag, msg).
The first disjunct in the return condition can be chosen
if ending is true;
it sets flag to false and msg to None.
The second disjunct can be chosen if
there is an incoming message to be received;
it sets flag to true and msg to the next “in-order” message
sent at the remote address.
The two disjuncts are not mutually exclusive:
if both are true, either can happen and the choice is non-deterministic.
Here is the function end(j):
  def end(self, j):
    CIC:
      j in (0,1) and not self.ending
    CU:
      self.ending = True
    RETURN():
      ROC:
        True
      RU:
        pass
Non-determinism
As noted above, when a return condition allows more than one value for a return parameter, one of these values is non-deterministically chosen. This non-determinism is essential for defining services of multi-threaded programs. Encoding it by a boolean condition makes it easy to understand the service program and to use it in proving that an implementation satisfies the service.
Service program (concrete version)¶
Recall that we want the service to yield executable servicetester and service programs.
To obtain a service program, we need to implement the non-Python constructs in Python. The call part can be easily done using locks and condition variables. The return part is not as easy because of the return condition: we have to get Python code equivalent to the non-deterministic assignment of return values.
We do this in two steps.
The first step is to obtain a version of class Service
in which each component (CIC, CU, ROC, RU) of a function
is represented by a separate Python function.
Consider a function f in the pseudo-Python class:
def f(self, j, args):
  CIC: ...
  CU: ...
  RETURN(rval):
    ROC: ...
    RU: ...
This function is broken up into four functions.
(Below f.CIC refers to the body of CIC in function f;
ditto for f.CU, f.ROC, f.RU).
def f_CIC(self, j, args):
  return f.CIC

def f_CU(self, j, args):
  f.CU

def f_ROC(self, j):
  if f.ROC does not hold for any value of rval:
    return (0,)
  else:
    randomly select a value x of rval such that f.ROC holds
    return (1, x)

def f_RU(self, j, args, rval):
  f.RU
Barring f_ROC, the above functions can be obtained mechanically.
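As an illustration of this breakup, here is a hedged sketch of the four component functions for send (the actual code in sesf.msgtransfer2.serviceparts may differ in naming and details):

import random

def send_CIC(self, j, msg):
    # call input condition
    return (j in (0, 1)
            and not self.ongoing_send[j] and not self.closed_send[j])

def send_CU(self, j, msg):
    # call update
    self.sent[j].append(msg)
    self.ongoing_send[j] = True

def send_ROC(self, j):
    # return output condition: rval == True always satisfies it,
    # rval == False satisfies it only if ending; pick one satisfying value.
    candidates = [True] + ([False] if self.ending else [])
    return (1, random.choice(candidates))   # always satisfiable, so never (0,)

def send_RU(self, j, rval):
    # return update
    self.ongoing_send[j] = False
    if not rval:
        self.closed_send[j] = True
    return rval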
Given these functions, it is straightforward to use standard Python
synchronization constructs (e.g., locks and condition variables)
to obtain the service-model.
Obtaining the servicetester program is similar,
but with the roles of the call condition and return condition switched.
That is, the tester can make a call to an implementation function f
iff f.COC is true for some values x of the function parameters,
and those values become the arguments of the call.
When the implementation call returns,
the tester has to check that the return satisfies f.RIC.
That is, for each call condition we need an output version (COC),
and for each return condition we need an input version (RIC).
Module sesf.msgtransfer2.serviceparts has class ServiceParts,
which has the component functions of sesf.msgtransfer2.service.ppy.
Each function f in the pseudo-Python class gives rise to six functions:
f_CIC, f_COC, f_CU, f_RIC, f_ROC, f_RU.
Consider a function f in the pseudo-Python class:
def f(self, j, args):
  CIC: ...
  CU: ...
  RETURN(rval):
    ROC: ...
    RU: ...
To make the “synchronization harness”,
the model class can use a lock, say lck,
and an associated condition variable, say cond.
Then the function f in the model class would be:
def f(self, j, args):
  # do_service_input
  with cond:
    icval = f_CIC(j, args)
    if not icval:
      raise Exception("input call condition violated")
    f_CU(j, args)
    cond.notify_all()

  # do_service_output
  with cond:
    while True:
      oval = f_ROC(j)
      if oval == (0,):
        # output return condition unsatisfiable
        cond.wait()
      else:
        # oval == (1, x): output return condition satisfied with output value x
        break
    oval = oval[1:]   # oval == (x,)
    f_RU(j, oval)
    cond.notify_all()
    return oval
It’s convenient to treat the # do_service_input part as a function
with f_CIC and f_CU as parameters.
Similarly, treat the # do_service_output part as a function
with f_ROC and f_RU as parameters.
These functions (with a slight tweak so that oval is always a tuple)
are here:
util.service.do_service_input(j, check_cond, do_update, args, cond)[source]¶
Service input-part factory, used in service model and service tester. The arguments are:
- j (int): node id for the input step.
- check_cond (fn): CIC or RIC function from the service.
- do_update (fn): CU or RU function from the service.
- args: tuple of arguments supplied by the implementation; (None,) if none.
- cond (Condition): condition variable used in the service model or service tester.
util.service.do_service_output(j, get_args, do_update, cond)[source]¶
Service output-part factory, used in service model and service tester. The arguments are:
- j (int): node id for the output step.
- get_args (fn): COC or ROC function from the service.
- do_update (fn): CU or RU function from the service.
- cond (Condition): condition variable used in the service tester or service model.
What remains is to redirect user j’s calls of msgtransfer2 functions to calls of the corresponding functions in the model. RPC (remote procedure call) is a natural choice here (because the users are distributed). We use Pyro4. The framework has three parts.
- A Pyro4 nameserver runs in the background.
- The model starts a Pyro4 daemon (to receive RPCs)
and registers itself with the nameserver
(with name sesf.msgtransfer2.service).
- Each user j gets the address of daemon sesf.msgtransfer2.service from the nameserver and creates three proxies for the model. User j can call a function of the model by prefixing the call with a proxy’s name. Why three proxies? Because user j can potentially have three concurrent calls to the model (one for each of send(), recv() and end()).
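A hedged sketch of this plumbing (not the package’s actual code); the Pyro4 calls below are standard, and the registered name follows the text.

import Pyro4

# --- service side: register the service object and serve RPCs ---
def serve(service_obj):
    daemon = Pyro4.Daemon()                     # daemon that receives RPCs
    uri = daemon.register(service_obj)          # service_obj's class is assumed @Pyro4.expose'd
    Pyro4.locateNS().register("sesf.msgtransfer2.service", uri)
    daemon.requestLoop()

# --- user side: one proxy per potentially concurrent call ---
def make_proxies():
    # Three proxies because send(), recv() and end() calls may be concurrent,
    # and a single Pyro4 proxy serializes calls made through it.
    return [Pyro4.Proxy("PYRONAME:sesf.msgtransfer2.service") for _ in range(3)]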
Service node¶
Instead of adding Pyro4 code to the node that uses the service
(e.g., a msgtransfer2.user_node node),
we put the Pyro4 code in a class msgtransfer2.service_node.Node,
and have the user node use the service node
(instead of an implementation node).
Module sesf.msgtransfer2.service_node has class Node that
has the signature of a msgtransfer2 implementation node
and redirects incoming calls to RPC calls to a running msgtransfer2.service
with pyrodaemon sesf.msgtransfer2.service.
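A hedged sketch of such a service node (the class layout and argv handling are illustrative assumptions, not the package’s code):

import Pyro4

class Node:
    """Has the signature of a msgtransfer2 implementation node, but forwards
    each call over RPC to the running msgtransfer2 service."""

    def __init__(self, argv, testerproxy=None):
        self.j = int(argv[2])   # this node's address (0 or 1); argv layout assumed
        # one proxy per user-callable function, since the calls may be concurrent
        self.send_proxy = Pyro4.Proxy("PYRONAME:sesf.msgtransfer2.service")
        self.recv_proxy = Pyro4.Proxy("PYRONAME:sesf.msgtransfer2.service")
        self.end_proxy = Pyro4.Proxy("PYRONAME:sesf.msgtransfer2.service")

    def send(self, msg):
        return self.send_proxy.send(self.j, msg)

    def recv(self):
        return self.recv_proxy.recv(self.j)

    def end(self):
        return self.end_proxy.end(self.j)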
Service program¶
Module sesf.msgtransfer2.service has class Service, which is
the concrete version of the msgtransfer2 service. It is RPC-accessible via pyrodaemon
sesf.msgtransfer2.service. Run python service.py to start
a Service instance, which can then be accessed by a distributed program
that uses the msgtransfer2 service.
Running user nodes over the service¶
The msgtransfer2 user node class described above can make use of the service (instead of an implementation).
- Start a Pyro4 nameserver if one is not already running
(run python -m Pyro4.naming &).
- Run python service.py, to start the service program.
- Run python user_node.py 2 0 --use service_node 2 0.
- Run python user_node.py 2 1 --use service_node 2 1.
- Or run python start_nodes.py 2 user_node.py --use service_node instead of the last two commands.
Servicetester (abstract version)¶
TODO
Servicetester (concrete version)¶
Recall that a servicetester program is a program that can be used in place of the users of the service, for testing a (distributed) implementation of the service.
The servicetester class, ServiceTester,
would have the signature of the pseudo-Python service
but with inputs and outputs exchanged.
For each function f in the pseudo-Python service,
the tester would have a function, say do_f (the name doesn’t matter),
whose body would be a “synchronization harness” in which functions
f_COC, f_CU, f_RIC, f_RU (from the Python service)
and an RPC call to node_j.f are inserted.
Consider a function f in the pseudo-Python class:
def f(self, j, args):
  CIC: ...
  CU: ...
  RETURN(rval):
    ROC: ...
    RU: ...
The function do_f would be as follows,
where the synchronization harness uses
lock lck and associated condition variable cond,
and rpc_mp[j] is an RPC proxy for implementation node j.
def do_f(self, j):
  # do_service_output
  with cond:
    while True:
      oval = f_COC(j)
      if oval == (0,):
        # output call condition unsatisfiable
        cond.wait()
      else:
        # oval == (1, x): output call condition satisfied with args value x
        break
    oval = oval[1:]   # oval == (x,)
    f_CU(j, oval)
    cond.notify_all()

  # call the implementation function f at address j with arguments oval;
  # rpc_mp[j] is an RPC proxy for node_j
  irval = rpc_mp[j].f(*oval)

  # do_service_input
  with cond:
    rcval = f_RIC(j, irval)
    if not rcval:
      raise Exception("return condition violated")
    f_RU(j, irval)
    cond.notify_all()

  # irval may be relevant to ending the tester
  return irval
What remains is the Pyro4 RPC glue. The framework has three parts.
- A Pyro4 nameserver runs in the background.
- In the implementation, node j starts a Pyro4 daemon (to receive RPCs) and registers itself with the nameserver.
- The service tester, in its __init__ function, gets the addresses of these daemons from the nameserver and creates three proxies for each node j. (These are the proxies used in the do_f functions to call the send, recv and end functions of node j.)
class TestNode¶
Instead of adding Pyro4 code to the implementation’s class Node,
we put it in a wrapper class TestNode,
to minimize modifications to the implementation (e.g., msgtransfer2.imp_0).
class ServiceTester¶
Module sesf.msgtransfer2.servicetester has class ServiceTester
that implements the msgtransfer2 servicetester.
This can test a distributed implementation of the msgtransfer2 service.
For example, to test a distributed system of two imp_0 nodes:
- Start a Pyro4 nameserver if one is not already running:
run python -m Pyro4.naming &.
- Run python test_node.py 2 0 --use imp_0.node 2 0 and python test_node.py 2 1 --use imp_0.node 2 1. (Or python start_nodes.py 2 test_node.py --use imp_0.node.)
- Run python servicetester.py 2 --checker imp_0.checker 2 to test against the service and the implementation-specific assertions in imp_0.checker.Checker. (Or python servicetester.py 2 to test only against the service.)
Checker for implementation imp_0¶
class msgtransfer2.imp_0.checker.Checker(servicetester, argv)[source]¶
Maintain the global state of a distributed system of msgtransfer2.imp_0 nodes and check some assertions. Created by msgtransfer2.servicetester when testing the distributed system. Maintains the following variables:
- tester: the tester object that started this checker object.
- sent: the sent state maintained by the tester (actually tester.service) object.
- rcvd_counts: list of two integers, initially both 0. rcvd_counts[j] is the number of packets received at node j, for j in 0..1.
Has a function handle_event(e) that takes an event e reported by a node, updates the global state, and checks whether an assertion still holds. handle_event is RPC-called by a node to report an event e at the node. e is a ('checker', 'rcvd', j, msg) tuple, indicating the arrival at node j of a packet whose data is msg serialized.
Assertion checked: for every event ('checker', 'rcvd', j, msg): sent[1-j][rcvd_counts[j]] == msg.
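A hedged sketch of the check performed in handle_event (attribute access follows the description above; the actual imp_0.checker code may differ):

def handle_event(self, e):
    # e is a ('checker', 'rcvd', j, msg) tuple reported over RPC by node j.
    _, kind, j, msg = e
    if kind == 'rcvd':
        # The packet received at node j must carry the next in-order message
        # sent at the other address (the sent state comes from tester.service).
        assert self.sent[1 - j][self.rcvd_counts[j]] == msg
        self.rcvd_counts[j] += 1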