3. A message-transfer service spanning two locations¶
This section defines a message-transfer service between two locations, identified by addresses 0 and 1, and a distributed program that implements the service using TCP sockets. The distributed program starts two instances of a “node” program, one at each address. Each node provides three functions to the local user: one to send a message, one to receive a message, and one to end the service. The programs use TCP’s connection opening and closing, and add message framing to TCP’s byte-stream data transfer.
The intended service is described, first informally and then in pseudo-Python and Python. We obtain a service model program and a service tester program, which is used to test our implementation. This example shows how Sesf handles distributed services and implementations, and service functions that return (non-void) values.
Intended service informally stated¶
The service spans two locations, identified by addresses 0 and 1. At each address, three functions can be called by (a local user in) the environment.
send(msg)
: Send message msg to the other address.
- Return False only if the service is closed, else return True.
- Call only if no send() call is ongoing at this address and only if no previous send() call at this address has returned False.

recv()
: Receive a message sent at the other address.
- Return (False,) only if the service is closed, return (True, msg) only if incoming message msg is available, else block.
- Call only if no recv() call is ongoing at this address and only if no previous recv() call at this address has returned (False,).
- Messages are received in the order they were sent at the other address.

end()
: Close the service.
- Return None.
- Call only if end() has not been called at any address.
This, though informal, completely and unambiguously defines the intended service of the distributed program. The function calls define the inputs, the function returns define the outputs, and the constraints on function calls and return values define the set of acceptable sequences of inputs and outputs. Later we will cast it into a Python class.
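To make the constraints concrete, here is a toy in-process sketch of the service semantics. Queues stand in for the network, the class name is illustrative, and the non-determinism of the real service is not modeled; this is not the distributed implementation described next.

```python
import queue

class ToyService:
    """In-process sketch: send(j, msg) makes msg receivable at address 1-j."""

    def __init__(self):
        self.ended = False
        # inboxes[j]: messages waiting to be received at address j
        self.inboxes = [queue.Queue(), queue.Queue()]

    def send(self, j, msg):
        if self.ended:
            return False              # service closed
        self.inboxes[1 - j].put(msg)  # deliver to the other address
        return True

    def recv(self, j):
        if self.ended and self.inboxes[j].empty():
            return (False,)           # service closed, nothing pending
        # blocks if the inbox is empty and the service is still open
        return (True, self.inboxes[j].get())

    def end(self, j):
        self.ended = True
```

Note that the real service may also return False from send() or (False,) from recv() while messages are still pending once end() has been called; the toy version makes a deterministic choice instead.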
An implementation using TCP¶
Module sesf.msgtransfer2.imp_0.node has class Node
that
implements a node of a msgtransfer2 implementation.
- Run python user_node.py 2 0 --use imp_0.node 2 0 to start a process executing a user node with address 0 that starts an imp_0 node with address 0.
- Run python user_node.py 2 1 --use imp_0.node 2 1 to start a process executing a user node with address 1 that starts an imp_0 node with address 1.
- Or run python start_nodes.py 2 user_node.py --use imp_0.node instead of the above two commands.
The two imp_0 node instances connect via TCP, and the resulting distributed
system implements msgtransfer2 service, which is used by the two user nodes.
Each user node has two concurrent threads, one sending messages and one
receiving messages; after a while, the main thread in the process with
node 0 calls its imp_0 node's end(), causing the two processes to end.
class msgtransfer2.imp_0.node.Node(argv, testerproxy=None)[source]¶
A node of a distributed system that implements the msgtransfer2 service. The distributed system consists of two Node instances with addresses 0 and 1. For brevity, refer to the node at address j as node_j, where j ranges over 0 and 1.
After node_j becomes connected, it maintains the following variables:
- sbuf (bytearray): buffer to hold an outgoing packet.
- sbufview: memoryview of sbuf (slicing sbufview is cheap, slicing sbuf is not).
- rbuf (bytearray): buffer to hold an incoming packet.
- rbufview: memoryview of rbuf.
- rcvdmsgqueue: queue of incoming (flag, msg) tuples for the local user.
- local thread named recv: receives packets from the TCP socket and puts them in rcvdmsgqueue.
Initialization
The first step is to establish a TCP connection with the peer. node_0 starts accepting on a TCP server socket on port accptngport0 (hardcoded to 6000). node_1 starts a TCP client socket connecting to node_0's IP address and port number, retrying until success. At some point, each side's TCP socket becomes open.
Then node_j creates some buffers: sbuf, to hold an outgoing packet; rbuf, to hold an incoming packet; and rcvdmsgqueue, to hold incoming (flag, msg) tuples. Then node_j starts a local thread, named recv, to receive bytes from TCP. Then it returns, giving the caller access to its three "external" functions: send(msg), recv(), and end().
Packet structure
A packet containing a user message msg has the following structure: an 8-byte header and variable-sized data consisting of the serialized (pickled) msg. The 8-byte header consists of a 2-byte srcid (not used), a 2-byte dstid (not used), and a 4-byte datalen, equal to the size in bytes of the data. Thus the serialized msg can be at most 2^32 bytes.
There is also an "END" packet (used in closing). It has only a header, with datalen of 0.
User-callable functions
A send(msg) call casts msg into a packet, sends the byte sequence over TCP, and returns True if the TCP send succeeded and False otherwise.
A recv() call returns the next (flag, msg) tuple from rcvdmsgqueue, blocking until the latter is not empty.
An end() call sends an END packet and returns, after which send() calls have no effect. (The peer responds to the END packet with its own END packet, and when that is received this system closes its TCP socket and ends.)
Receiving packets
The local thread recv repeatedly does the following:
- receive 8 bytes (packet header) and get the value of datalen;
- if the value is not zero: receive that many bytes (packet data), unpickle it to a message, say msg, and add (True, msg) to rcvdmsgqueue;
- if the value of datalen is zero (END packet): if end() was not called locally, send an END packet; add (False, None) to rcvdmsgqueue, and close the TCP socket. (node_j would typically end at some point after this.)
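The framing described above can be sketched as follows. This is illustrative code, not the module's: the header field names come from the description, and the network byte order and the local socketpair (standing in for the TCP connection) are assumptions.

```python
import pickle
import socket
import struct

HDR_FMT = "!HHI"                     # assumed: 2-byte srcid, 2-byte dstid, 4-byte datalen
HDR_LEN = struct.calcsize(HDR_FMT)   # 8 bytes

def make_packet(msg, srcid=0, dstid=1):
    """Serialize msg and prepend the 8-byte header."""
    data = pickle.dumps(msg)
    return struct.pack(HDR_FMT, srcid, dstid, len(data)) + data

def recvall(sock, n):
    """Receive exactly n bytes (TCP may deliver fewer per recv call)."""
    buf = bytearray()
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise ConnectionError("peer closed")
        buf.extend(chunk)
    return bytes(buf)

def recv_packet(sock):
    """Return (True, msg) for a data packet, (False, None) for an END packet."""
    srcid, dstid, datalen = struct.unpack(HDR_FMT, recvall(sock, HDR_LEN))
    if datalen == 0:                 # END packet: header only
        return (False, None)
    return (True, pickle.loads(recvall(sock, datalen)))

# Demo over a local socket pair (stands in for the TCP connection)
a, b = socket.socketpair()
a.sendall(make_packet({"n": 1}))
a.sendall(struct.pack(HDR_FMT, 0, 1, 0))   # END packet
print(recv_packet(b))   # (True, {'n': 1})
print(recv_packet(b))   # (False, None)
```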
Running the processes on different machines¶
The two processes must be at network locations reachable from each
other via TCP.
Class Node
assumes that its peer node is on the same OS:
so they have the same IP address (“localhost”) and differ only in
their TCP port numbers.
This is convenient but not necessary.
To run the two processes on different machines,
each Node
instance needs the IP address of the other
(easily done).
But you also have to deal with any intervening firewalls
(and network administrators).
Also, remote testing would not be as safe because these programs
are not secure (no authentication, pickle for serializing)
and they would be exposed to your network.
Start_nodes utility¶
The script
start_nodes.py
allows you to enter one command to start any number of nodes,
each running as a process on the same OS in its own shell.
Instead of entering
python imp_0/node.py 2 0
and python imp_0/node.py 2 1
in different shells,
you can run python start_nodes.py 2 imp_0/node.py
in a shell.
The script uses function Popen
from the subprocess module
to start a process in a new shell.
(You may need to edit the arguments of the Popen
call
for your operating system.
More recent versions of Python may take care of this.)
Starting the msgtransfer2 nodes from user nodes¶
Module sesf.msgtransfer2.user_node has class UserNode
that
implements a user node that starts a given msgtransfer2 node.
If the peer user node is also started, the two user nodes exercise
the msgtransfer2 service provided by the two msgtransfer2 nodes.
For example:
- Run python user_node.py 2 0 --use imp_0.node 2 0 to start a process that starts UserNode 0, which starts msgtransfer2.imp_0.Node 0.
- Run python user_node.py 2 1 --use imp_0.node 2 1 to start a process that starts UserNode 1, which starts msgtransfer2.imp_0.Node 1.
- Or, instead of the above two commands, run python ../start_nodes.py 2 user_node.py --use imp_0.node.
The two user nodes use the service provided by the two msgtransfer2 nodes to exchange a few messages, then end the msgtransfer2 service and the two processes.
The user nodes can also make use of a running msgtransfer2 service model,
instead of a msgtransfer2 implementation. Simply replace imp_0.node
by service_node in the above commands.
Service program (abstract version)¶
We now define a "pseudo-Python" class Service
in msgtransfer2.service.ppy
that expresses the intended service formally.
We say "pseudo-Python" because it has non-Python constructs
(namely, CIC, CU, RETURN, ROC, RU),
which are explained below.
Conceptually, the class is a program that can,
at any point in its execution,
receive any (and only an) acceptable input
and can return any acceptable output.
The class has the following structure.
class Service():
def __init__(self):
<instance variables>
def send(self, j, msg):
<call part>
<return part>
def recv(self, j):
<call part>
<return part>
def end(self, j):
<call part>
<return part>
The class has three non-init functions:
send(j, msg), recv(j) and end(j),
corresponding to the functions
send(msg), recv() and end()
at address j of an implementation of the service.
Note
Convention: Throughout the class,
parameter j
is an address (0 or 1).
The __init__
function defines variables adequate to express
when a non-init function can be called
and the possible values it can return.
Every non-init function has a call part followed by a return part.
Each part is executed atomically.
The call part checks whether the function call is valid,
and if so updates the instance variables.
The return part checks whether the function can return,
and if so generates a return value, if any,
and updates the instance variables.
Here is the function __init__()
:
def __init__(self):
"""Throughout j ranges over 0..1:
- ending is true iff end(0) or end(1) has been called.
- ongoing_send[j] is true iff a send(j,.) call is ongoing.
- closed_send[j] is true iff a send(j,.) call has returned False.
- ongoing_recv[j] is true iff a recv(j) call is ongoing.
- closed_recv[j] is true iff a recv(j) call has returned (False,).
- sent[j] is the sequence of messages sent in send(j,.) calls.
- nrcvd[j] is the number of messages returned in recv(j) calls.
"""
self.ending = False
self.ongoing_send = [False, False]
self.closed_send = [False, False]
self.ongoing_recv = [False, False]
self.closed_recv = [False, False]
self.sent = [[], []]
self.nrcvd = [0, 0]
Here is the function send(j, msg)
,
with the call and return parts indicated by the comments at right.
def send(self, j, msg):
CIC:
j in (0, 1) and # call part
not self.ongoing_send[j] and not self.closed_send[j] # " " "
CU: # " " "
self.sent[j].append(msg) # " " "
self.ongoing_send[j] = True # " " "
RETURN(rval): # return part
ROC: # " " "
rval == True or (rval == False and self.ending) # " " "
RU: # " " "
self.ongoing_send[j] = False # " " "
if not rval: # " " "
self.closed_send[j] = True # " " "
return rval # " " "
The call part has two components:
CIC, which stands for call input condition,
and CU, which stands for call update.
When a thread comes to the call part,
it does the following in one atomic step:
evaluate the call input condition;
if true (which means the call is valid),
execute the call update; else abort.
The return part has three components:
RETURN, which defines a return parameter, rval;
ROC, which stands for return output condition;
and RU, which stands for return update.
A thread at RETURN is blocked unless
the return output condition is true for some value, say x, of rval,
in which case it does the following atomically:
set rval to x, then execute the return update (which returns the value).
Thus the return output condition implicitly assigns
a value to the return parameter.
Short-circuit evaluation does not apply here.
If ending
is true, the function can return True
or False
,
and the choice is non-deterministic.
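The return part of send can be read as an executable sketch along these lines (the function name and signature are illustrative, not from the module):

```python
import random

def send_ROC(ending):
    """Return (1, rval) for some rval satisfying send's return output
    condition: rval == True or (rval == False and ending)."""
    # When ending is true, both True and False satisfy the condition;
    # the choice between them is non-deterministic.
    allowed = [True, False] if ending else [True]
    return (1, random.choice(allowed))   # always satisfiable for send
```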
Here is the function recv(j)
:
def recv(self, j):
CIC:
(j in (0, 1) and
not self.ongoing_recv[j] and not self.closed_recv[j])
CU:
self.ongoing_recv[j] = True
RETURN((flag, msg)):
# rval is (False,) | (True, msg)
ROC:
((flag == False and self.ending) or
(flag == True and self.nrcvd[j] < len(self.sent[1-j])
and msg == self.sent[1-j][self.nrcvd[j]]))
RU:
self.ongoing_recv[j] = False
if flag == False:
self.closed_recv[j] = True
return (flag,)
else:
self.nrcvd[j] += 1
return (flag, msg)
The return parameter is a tuple (flag, msg).
The first disjunct in the return output condition can be chosen
if ending is true;
it sets flag to False, and the function returns (False,).
The second disjunct can be chosen if
there is an incoming message to be received;
it sets flag to True and msg to the next "in-order" message
sent at the remote address, and the function returns (True, msg).
The two disjuncts are not mutually exclusive:
if both are true, either can happen and the choice is non-deterministic.
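The two disjuncts can be read as an executable sketch along these lines (names and signature are illustrative; sent_other stands for sent[1-j] and nrcvd_j for nrcvd[j]):

```python
import random

def recv_ROC(ending, sent_other, nrcvd_j):
    """Collect every (flag, msg) value satisfying recv's return output
    condition and pick one non-deterministically; (0,) if none."""
    choices = []
    if ending:                            # first disjunct
        choices.append((False, None))
    if nrcvd_j < len(sent_other):         # second disjunct: message pending
        choices.append((True, sent_other[nrcvd_j]))
    if not choices:
        return (0,)                       # blocked: no acceptable return yet
    return (1, random.choice(choices))
```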
Here is the function end(j)
:
def end(self, j):
CIC:
j in (0,1) and not self.ending
CU:
self.ending = True
RETURN():
ROC:
True
RU:
pass
Non-determinism
As noted above, when a return condition allows more than one value for a return parameter, one of these values is non-deterministically chosen. This non-determinism is essential for defining services of multi-threaded programs. Encoding it by a boolean condition makes it easy to understand the service program and to use it in proving that an implementation satisfies the service.
Service program (concrete version)¶
Recall that we want the service to yield executable servicetester and service programs.
To obtain a service program, we need to implement the non-Python constructs in Python. The call part can be easily done using locks and condition variables. The return part is not as easy because of the return condition: we have to get Python code equivalent to the non-deterministic assignment of return values.
We do this in two steps.
The first step is to obtain a version of class Service
in which each component (CIC, CU, ROC, RU) of a function
is represented by a separate Python function.
Consider a function f in the pseudo-Python class:
def f(self, j, args):
    CIC:
        ...
    CU:
        ...
    RETURN(rval):
        ROC:
            ...
        RU:
            ...
This function is broken up into four functions.
(Below f.CIC refers to the body of CIC in function f;
ditto for f.CU, f.ROC, f.RU.)
def f_CIC(self, j, args):
    return f.CIC

def f_CU(self, j, args):
    f.CU

def f_ROC(self, j):
    if f.ROC does not hold for any value of rval:
        return (0,)
    else:
        randomly select a value x of rval such that f.ROC holds
        return (1, x)

def f_RU(self, j, args, rval):
    f.RU
Barring f_ROC
, the above functions can be obtained mechanically.
Given these functions, it is straightforward to use standard Python
synchronization constructs (e.g., locks and condition variables)
to obtain the service-model.
Obtaining the servicetester program is similar,
but with the roles of the call condition and return condition switched.
That is, the tester can make a call to an implementation function f
iff f.COC
is true for some value x of function parameters,
and those values become the arguments of the call.
When the implementation returns the call,
the tester has to check that the return satisfies f.RIC
.
That is, for each call condition we need an output version (COC),
and for each return condition we need an input version (RIC).
Module sesf.msgtransfer2.serviceparts has class ServiceParts,
which has the component functions of sesf.msgtransfer2.service.ppy.
Each function f in the pseudo-Python class gives rise to six functions:
f_CIC, f_COC, f_CU, f_RIC, f_ROC, f_RU.
Consider a function f
in the pseudo-Python class:
def f(self, j, args):
    CIC:
        ...
    CU:
        ...
    RETURN(rval):
        ROC:
            ...
        RU:
            ...
To make the “synchronization harness”,
the model class can use a lock, say lck
,
and an associated condition variable, say cond
.
Then the function f
in the model class would be:
def f(self, j, args):
    # do_service_input
    with cond:
        icval = f_CIC(j, args)
        if not icval:
            raise Exception("input call condition violated")
        f_CU(j, args)
        cond.notify_all()
    # do_service_output
    with cond:
        while True:
            oval = f_ROC(j)
            if oval == (0,):
                # output return condition unsatisfiable
                cond.wait()
            else:
                # oval == (1, x): output return condition
                # satisfied with output value x
                break
        oval = oval[1:]          # oval == (x,)
        f_RU(j, oval)
        cond.notify_all()
        return oval
It’s convenient to treat the # do_service_input
part as a function
with f_CIC
and f_CU
as parameters.
Similarly, treat the # do_service_output
part as a function
with f_ROC
and f_RU
as parameters.
These functions (with a slight tweak so that oval
is always a tuple)
are here:
util.service.do_service_input(j, check_cond, do_update, args, cond)[source]¶
Service input-part factory, used in service model and service tester. The arguments are:
- j (int): node id for the input step.
- check_cond (fn): CIC or RIC function from service.
- do_update (fn): CU or RU function from service.
- args: tuple of arguments supplied by implementation; (None,) if none.
- cond (Condition): condition variable used in service model or service tester.

util.service.do_service_output(j, get_args, do_update, cond)[source]¶
Service output-part factory, used in service model and service tester. The arguments are:
- j (int): node id for the output step.
- get_args (fn): COC or ROC function from service.
- do_update (fn): CU or RU function from service.
- cond (Condition): condition variable used in service model or service tester.
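A minimal sketch of what these two helpers might look like, assuming the behavior described above (the actual module may differ in details such as argument unpacking):

```python
import threading

def do_service_input(j, check_cond, do_update, args, cond):
    """Atomically check an input condition and apply its update."""
    with cond:
        if not check_cond(j, *args):
            raise Exception("input condition violated")
        do_update(j, *args)
        cond.notify_all()

def do_service_output(j, get_args, do_update, cond):
    """Block until the output condition is satisfiable, then pick a
    value, apply the update, and return the chosen value tuple."""
    with cond:
        while True:
            oval = get_args(j)
            if oval == (0,):     # no acceptable output value yet
                cond.wait()
            else:                # oval == (1, x)
                break
        oval = oval[1:]          # keep oval a tuple: (x,)
        do_update(j, *oval)
        cond.notify_all()
        return oval
```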
What remains is to redirect user j’s calls of msgtransfer2 functions to calls of the corresponding functions in the model. RPC (remote procedure call) is a natural choice here (because the users are distributed). We use Pyro4. The framework has three parts.
- A Pyro4 nameserver runs in the background.
- The model starts a Pyro4 daemon (to receive RPCs) and registers itself with the nameserver (with name sesf.msgtransfer2.service).
- Each user j gets the address of daemon sesf.msgtransfer2.service from the nameserver and creates three proxies for the model. User j can call a function of the model by prefixing the call with a proxy's name. Why three proxies? Because user j can potentially have three concurrent calls to the model (one for each of send(), recv() and end()).
Service node¶
Instead of adding Pyro4 code to the node that uses the service
(e.g., msgtransfer2.user_node),
we put the Pyro4 code in a class msgtransfer2.service_node.Node,
and have the user node use the service node
(instead of an implementation node).
Module sesf.msgtransfer2.service_node has class Node
that
has the signature of a msgtransfer2 implementation node
and redirects incoming calls to RPC calls to a running msgtransfer2.service
with pyrodaemon sesf.msgtransfer2.service.
Service program¶
Module sesf.msgtransfer2.service has class Service, which is
the concrete version of the msgtransfer2 service. It is RPC-accessible
via pyrodaemon sesf.msgtransfer2.service. Run python service.py to start
a Service instance, which can then be accessed by a distributed program
that uses the msgtransfer2 service.
Running user nodes over the service¶
The msgtransfer2 user node class described above can make use of the service (instead of an implementation).
- Start a Pyro4 nameserver if one is not already running (run python -m Pyro4.naming &).
- Run python service.py to start the service program.
- Run python user_node.py 2 0 --use service_node 2 0.
- Run python user_node.py 2 1 --use service_node 2 1.
- Or run python start_nodes.py 2 user_node.py --use service_node instead of the last two commands.
Servicetester (abstract version)¶
TODO
Servicetester (concrete version)¶
Recall that a servicetester program is a program that can be used in place of the users of the service, for testing a (distributed) implementation of the service.
The servicetester class, ServiceTester,
would have the signature of the pseudo-Python service
but with inputs and outputs exchanged.
For each function f in the pseudo-Python service,
the tester would have a function, say do_f (the name doesn't matter),
whose body would be a "synchronization harness" in which functions
f_COC, f_CU, f_RIC, f_RU (from the Python service)
and an RPC call to node_j.f are inserted.
Consider a function f
in the pseudo-Python class:
def f(self, j, args):
    CIC:
        ...
    CU:
        ...
    RETURN(rval):
        ROC:
            ...
        RU:
            ...
The function do_f would be as follows,
where the synchronization harness uses
lock lck and associated condition variable cond,
and rpc_mp[j] is an RPC proxy for implementation node j.
def do_f(self, j):
    # do_service_output
    with cond:
        while True:
            oval = f_COC(j)
            if oval == (0,):
                # call output condition unsatisfiable
                cond.wait()
            else:
                # oval == (1, x): call output condition
                # satisfied with args value x
                break
        oval = oval[1:]          # oval == (x,)
        f_CU(j, oval)
        cond.notify_all()
    # call the implementation function f at address j with args oval;
    # rpc_mp[j] is an RPC proxy for node_j
    irval = rpc_mp[j].f(*oval)
    # do_service_input
    with cond:
        rcval = f_RIC(j, irval)
        if not rcval:
            raise Exception("return condition violated")
        f_RU(j, irval)
        cond.notify_all()
    # irval may be relevant to ending the tester
    return irval
What remains is the Pyro4 RPC glue. The framework has three parts.
- A Pyro4 nameserver runs in the background.
- In the implementation, node j starts a Pyro4 daemon (to receive RPCs) and registers itself with the nameserver.
- The service tester, in its __init__ function, gets the addresses of these daemons from the nameserver and creates three proxies for each node j. (These are the proxies used in the do_f functions to call the send, recv and end functions of node j.)
class TestNode¶
Instead of adding Pyro4 code to class Node
,
we put it in a wrapper class TestNode
,
to minimize modifications to Msgtransfer2_imp0
.
class ServiceTester¶
Module sesf.msgtransfer2.servicetester has class ServiceTester
that implements the msgtransfer2 servicetester.
This can test a distributed implementation of the msgtransfer2 service.
For example, to test a distributed system of two imp_0 nodes:
- Start a Pyro4 nameserver if one is not already running: run python -m Pyro4.naming &.
- Run python test_node.py 2 0 --use imp_0.node 2 0 and python test_node.py 2 1 --use imp_0.node 2 1. (Or python start_nodes.py 2 test_node.py --use imp_0.node.)
- Run python servicetester.py 2 --checker imp_0.checker 2 to test against the service and the implementation-specific assertions in imp_0.checker.Checker. (Or python servicetester.py 2 to test only against the service.)
Checker for implementation imp_0¶
class msgtransfer2.imp_0.checker.Checker(servicetester, argv)[source]¶
Maintain the global state of a distributed system of msgtransfer2.imp_0 nodes and check some assertions. Created by msgtransfer2.servicetester when testing the distributed system. Maintains the following variables:
- tester: the tester object that started this checker object.
- sent: sent state maintained by the tester (actually tester.service) object.
- rcvd_counts: list of two integers, initially both 0; rcvd_counts[j] is the number of packets received at node j, for j in 0..1.
Has a function handle_event(e) that takes an event e reported by a node, updates the global state, and checks whether an assertion still holds.
- RPC-called by a node to report an event e at the node. e is a ('checker', 'rcvd', j, msg) tuple, indicating the arrival at node j of a packet whose data is msg serialized.
- Assertion checked: for every event ('checker', 'rcvd', j, msg): sent[1-j][rcvd_counts[j]] == msg.