5. A message-transfer service spanning N locations¶
This section extends the treatment in the previous section to N locations, where N ≥ 2. We obtain service model and tester programs, and a distributed program that implements the service using TCP. For no good reason, there is also a program that plays the log of a test (example video).
Intended service informally stated¶
The service spans N locations, identified by addresses 0, 1, …, N-1, for any N ≥ 2. At each address j, three functions are provided to the user:
- send(k, msg): send message msg to address k.
  - Return False only if the service is closed, else return True.
  - Call only if no send() call is ongoing at address j and no previous send() call at address j has returned False.
- recv(): receive a message sent at some other address.
  - Return (False,) only if the service is closed, return (True, msg) only if incoming message msg is available, else block.
  - Call only if no recv() call is ongoing at this address and no previous recv() call at this address has returned (False,).
  - For every address k, messages received from k are in the order in which they were sent at address k. But messages sent to j from different addresses can be received interleaved in any order.
- end(): close the service.
  - Return None.
  - Call only if end() has not been called at any address.
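The informal contract above can be illustrated with a small in-memory stand-in. This is only a sketch: the class name ToyService and its single-process design are assumptions made for illustration and are not part of sesf. It also delivers messages for one destination in a single FIFO order, which is stricter than the service requires (only per-sender order is guaranteed).

```python
import queue
import threading


class ToyService:
    """In-memory stand-in for the N-address service (illustration only)."""

    def __init__(self, n):
        if n < 2:
            raise ValueError("the service needs at least 2 addresses")
        self.n = n
        self.closed = False
        self.lock = threading.Lock()
        # One FIFO queue of recv() return values per destination address.
        self.inbox = [queue.Queue() for _ in range(n)]

    def send(self, j, k, msg):
        """Send msg from address j to address k; return False only if closed."""
        with self.lock:
            if self.closed:
                return False
            self.inbox[k].put((True, msg))
            return True

    def recv(self, j):
        """Block until a message for address j is available or the service closes."""
        return self.inbox[j].get()

    def end(self, j):
        """Close the service and unblock a pending recv() at every address."""
        with self.lock:
            self.closed = True
            for q in self.inbox:
                q.put((False,))
        return None


svc = ToyService(3)
svc.send(0, 2, "a")
svc.send(1, 2, "b")
svc.send(0, 2, "c")
received = [svc.recv(2) for _ in range(3)]
svc.end(0)
closing = svc.recv(2)
```

After end() is called, every later send() returns False and the next recv() at each address returns (False,), matching the closing behaviour described above.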
An implementation using TCP¶
Module sesf.msgtransfer.imp_0.node has class Node that implements a node of a msgtransfer service implementation for N addresses, N ≥ 2.
Run the N commands python user_node.py N j --use imp_0.node N j for j ranging over 0..N-1 to start, exercise and close a msgtransfer service spanning N locations. (Or run python ../start_nodes.py N user_node.py --use imp_0.node.)
Process j starts a user node with address j that starts an imp_0 node with address j. The N imp_0 nodes implement a msgtransfer service over N addresses. This service is exercised by the user nodes. Each user node has two concurrent threads, one sending messages and one receiving messages; after a while, the main thread in the process with address 0 calls its imp_0 node’s end(), causing the network to shut down.
class msgtransfer.imp_0.node.Node(argv, testerproxy=None)¶
An instance of this is a node of a distributed system that implements the msgtransfer service. Assumes it is in a network of num_nodes nodes (in different processes), and has address myid, where myid is a value in 0, …, num_nodes-1.
For brevity, below we say “n” for num_nodes, “j” for myid, and “Node(j)” for the Node instance with address j. If Node(0), …, Node(n-1) have been started, they form a ring of TCP links, with Node(j) connected to Node((j+1)%n).
During initialization, Node(j) forms TCP connections with Node((j-1)%n) and Node((j+1)%n) as follows:
- If j is 0: Node(0) connects to Node(1) at port 6001, retrying until success. Then it waits to accept a TCP connect, which would be from Node(n-1), on port accptngport0 (= 6000).
- If j is in 1, …, n-1: Node(j) waits to accept a TCP connect (which would be from Node(j-1)) on port accptngport0 + j. After it accepts this connect, Node(j) connects to Node(j+1) at port accptngport0 + j + 1 (retrying until success), except for Node(n-1), which connects to Node(0) at port accptngport0.
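The port arithmetic in the two cases above can be summarized as follows. This is a sketch only; the helper names ring_ports and connects_first are hypothetical and do not appear in imp_0.

```python
ACCPTNGPORT0 = 6000  # base port, hard-coded in imp_0 according to the text


def ring_ports(j, n, base=ACCPTNGPORT0):
    """Return (accept_port, connect_port) for Node(j) in a ring of n nodes.

    Node(j) accepts on base + j and connects to its successor Node((j+1) % n),
    which accepts on base + (j+1) % n.
    """
    if n < 2 or not 0 <= j < n:
        raise ValueError("need n >= 2 and 0 <= j < n")
    return base + j, base + (j + 1) % n


def connects_first(j):
    """Only Node(0) connects before accepting; every other node accepts first."""
    return j == 0


ports = [ring_ports(j, 4) for j in range(4)]
```

Having exactly one node connect before it accepts is what lets the ring form without every node waiting on its predecessor.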
After Node(j) has opened its two TCP connections, it is ready to accept calls to its send, recv and end functions. At this point, Node(j) maintains the following variables:
- ending: True iff end has been called or END msg has been received.
- accptngport0 = 6000 (hard-coded).
- accptngsocket = TCP socket to accept connect requests (at port accptngport0 + j).
- outsocket = TCP socket connected to Node(j+1).
- insocket = TCP socket connected to Node(j-1).
- sbuf (bytearray): buffer to hold an outgoing packet.
- sbufview: memoryview of sbuf (slicing sbufview is cheap).
- rbuf (bytearray): buffer to hold an incoming packet.
- rbufview: memoryview of rbuf.
- rcvdmsgqueue: queue of incoming (flag,msg) tuples for local user.
- testerproxy: Used iff Node(j) is started within a test node, in which case Node(j) can call testerproxy.inform_tester(e) to inform the servicetester of local event e.
Each user message is placed in a packet with an 8-byte header (consisting of 2-byte src id, 2-byte dst id, 4-byte datalen) and data of length datalen (consisting of the serialized user message).
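The packet layout just described can be sketched with the standard struct module. The byte order (network order), the use of unsigned fields, and the use of pickle for serialization are assumptions made for this illustration; the text does not state them, and the function names are hypothetical.

```python
import pickle
import struct

# Assumed layout: network byte order, 2-byte src id, 2-byte dst id, 4-byte datalen.
HEADER = struct.Struct("!HHI")


def make_packet(src, dst, msg):
    """Serialize msg and prepend the 8-byte header."""
    data = pickle.dumps(msg)
    return HEADER.pack(src, dst, len(data)) + data


def parse_packet(packet):
    """Split a packet into (src, dst, msg), checking the datalen field."""
    src, dst, datalen = HEADER.unpack_from(packet, 0)
    data = packet[HEADER.size:HEADER.size + datalen]
    if len(data) != datalen:
        raise ValueError("truncated packet")
    return src, dst, pickle.loads(data)


packet = make_packet(1, 3, {"text": "hello"})
decoded = parse_packet(packet)
```

Because datalen is carried in the header, a receiver reading from a TCP byte stream knows how many more bytes to read before the next header starts.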
Packets travel only in one direction of the ring. A message sent by Node(j) to Node(k) goes via nodes j+1, j+2, …, k (modulo n). Each node has a local thread that receives packets from insocket (from the previous node) and puts them in rcvdmsgqueue or forwards them to outsocket (to the next node).
The end function should be called at one address only. The call initiates an END message that travels along the ring, closing each node it encounters. The END message has a datalen field of 0.
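The one-directional routing described above can be sketched as two small functions. Both names are hypothetical, and the returned action labels are illustrative rather than taken from imp_0; the only facts used are that packets move from each node to its successor and that an END message has datalen 0.

```python
def ring_path(j, k, n):
    """Nodes a packet from j visits, in order, ending at k (one-way ring of n nodes)."""
    hops = (k - j) % n
    return [(j + i) % n for i in range(1, hops + 1)]


def classify_packet(myid, dst, datalen):
    """Decide what the receiving thread at node myid does with an incoming packet."""
    if datalen == 0:
        return "close"      # END message: this node shuts down
    if dst == myid:
        return "deliver"    # put the message in the local receive queue
    return "forward"        # pass the packet on to the next node


path_forward = ring_path(0, 3, 4)
path_wrapping = ring_path(2, 0, 4)
```

Note that a message from Node(2) to Node(0) in a 4-node ring travels through Node(3) rather than back through Node(1), since packets never move against the ring direction.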
Starting the msgtransfer nodes from user nodes¶
Module sesf.msgtransfer.user_node has class UserNode that implements a user node that starts a given msgtransfer node. If the peer user nodes are also started, the collection of user nodes exercises the msgtransfer service provided by the msgtransfer nodes.
For example, to start an N-process system using imp_0 msgtransfer nodes, run the N shell commands python user_node.py N j --use imp_0.node N j where j ranges over 0, 1, …, N-1. (Or run python ../start_nodes.py N user_node.py --use imp_0.node.)
The user nodes use the service provided by the N imp_0 msgtransfer nodes to exchange a few messages, then end the msgtransfer service and the N processes.
The user nodes can also make use of a running msgtransfer service model, instead of a msgtransfer implementation. Simply replace imp_0.node by service_node in the above commands.
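For concreteness, the N command lines can be generated mechanically. The helper below is a hypothetical convenience, not part of sesf, and it only builds the strings given in the text; it does not launch anything.

```python
def user_node_commands(n, use="imp_0.node"):
    """Build the n shell commands that start user nodes 0..n-1."""
    return [f"python user_node.py {n} {j} --use {use} {n} {j}" for j in range(n)]


imp_commands = user_node_commands(3)
model_commands = user_node_commands(3, use="service_node")
```

Passing use="service_node" yields the variant that runs the user nodes over the service model instead of the implementation.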
Service program (abstract version)¶
Extending the (pseudo-Python) service program for two addresses to more than 2 addresses has interesting consequences. Suppose the program maintains the following variables for every address j and k: sent_{j,k}, the sequence of messages sent at j to k; and rcvd_k, the sequence of messages received at k (from all addresses). Consider the receive function at, say, address 0. Its return condition would say that message m can be returned iff there is a merge s of sent_{1,0}, …, sent_{N-1,0} such that rcvd_0 concatenated with m is a prefix of s.
This works but it’s not convenient. The concrete service and servicetester programs would be computationally expensive. Using the abstract service and servicetester program in proofs would be messy.
A better approach is to allow internal non-determinism in the service program. Instead of rcvd_k, suppose the program maintains nrcvd_{j,k}, the number of messages received at k from j. Also have an additional parameter j in the return part of recv(0). Then the return condition can say that message m can be returned iff there is a j such that m equals sent_{j,0}[nrcvd_{j,0}]. If more than one j satisfies this condition, a particular j is chosen non-deterministically. Furthermore, the non-determinism is internal because the environment does not see the chosen value of j (although it may be able to infer it at some later point after receiving more messages).
Using this approach, the service is formally defined by the pseudo-Python class Service in msgtransfer.service.ppy. As usual, each function has a parameter j identifying the address at which the function is located.
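The counter-based return condition can be sketched in plain Python. This is an illustration of the idea only, not the contents of msgtransfer.service.ppy; the function names and the dictionary representation of sent and nrcvd are assumptions.

```python
import random


def recv_candidates(sent, nrcvd, k, n):
    """Addresses j that have an undelivered message for k, i.e. the j for which
    sent[(j, k)][nrcvd[(j, k)]] exists."""
    return [j for j in range(n)
            if j != k and nrcvd[(j, k)] < len(sent[(j, k)])]


def recv_return(sent, nrcvd, k, n, rng=random):
    """Return (True, msg) for a non-deterministically chosen sender, or None
    if no message is available (the real recv would block)."""
    candidates = recv_candidates(sent, nrcvd, k, n)
    if not candidates:
        return None
    j = rng.choice(candidates)          # internal choice, not shown to the caller
    msg = sent[(j, k)][nrcvd[(j, k)]]
    nrcvd[(j, k)] += 1
    return (True, msg)


n = 3
pairs = [(j, k) for j in range(n) for k in range(n) if j != k]
sent = {pair: [] for pair in pairs}
nrcvd = {pair: 0 for pair in pairs}
sent[(1, 0)] = ["a", "b"]
sent[(2, 0)] = ["x"]

initial_candidates = recv_candidates(sent, nrcvd, 0, n)
delivered = [recv_return(sent, nrcvd, 0, n)[1] for _ in range(3)]
```

Whatever choices are made, "a" is always delivered before "b", while "x" may land anywhere in the sequence; no merge of the sent sequences ever has to be computed.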
Service program (concrete version)¶
Module sesf.msgtransfer.serviceparts has class ServiceParts, which has the component functions of sesf.msgtransfer.service. Recall the internal parameter k in the return part of the pseudo-Python service function recv(j). Note what happens to it in the Python version of the service. In function recv_RCI(j,rval), the input version of recv(j).RC, a value for k is provided by the implementation’s rval. In function recv_RCO(j), the output version of recv(j).RC, the function generates a value for k.
Module sesf.msgtransfer.service_node has class Node that has the signature of a msgtransfer implementation node and redirects incoming calls to RPC calls to a running service with pyrodaemon sesf.msgtransfer.service.
Module sesf.msgtransfer.service has class Service, which is the concrete version of msgtransfer service. It is RPC-accessible via pyrodaemon sesf.msgtransfer.service. Run python service.py <num_nodes> to start a Service instance for num_nodes addresses, which can then be accessed by a distributed program that uses the msgtransfer service. Note that function recv(j) does not return the value of the internal parameter k generated by recv_RCO(j).
Running user nodes over the service¶
The msgtransfer user node class can make use of the service model (instead of an implementation).
- Start a Pyro4 nameserver if one is not already running: enter python -m Pyro4.naming & in a shell.
- Run python service.py N, to start the service program for N addresses.
- Run python user_node.py N j --use service_node N j for j ranging over 0, 1, …, N-1. (Or run python ../start_nodes.py N user_node.py --use service_node.)
Servicetester (abstract version)¶
TODO
Servicetester (concrete version)¶
Recall that the service tester is a program that can be used in place of the users of the service, typically for testing a (distributed) implementation of the service. For testing, each node of the implementation is wrapped in a “test node”. The service tester makes RPC calls to the test nodes. A test node redirects incoming RPC calls to its implementation node.
The service tester can also use a checker, if one is provided with the implementation, to check implementation-specific assertions. In this case, each implementation node can inform the servicetester of local events by calling the node’s inform_tester() function. This function has no effect if the implementation node has not been started by a test node.
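The "no effect unless started by a test node" behaviour can be sketched as follows. The classes ToyImpNode and RecordingProxy are hypothetical stand-ins; the only detail taken from the text is that a node holds a testerproxy (None by default) and reports an event e through testerproxy.inform_tester(e).

```python
class ToyImpNode:
    """Minimal stand-in for an implementation node (illustration only)."""

    def __init__(self, myid, testerproxy=None):
        self.myid = myid
        self.testerproxy = testerproxy  # set only when started by a test node

    def inform_tester(self, event):
        """Report a local event; do nothing when no tester is attached."""
        if self.testerproxy is not None:
            self.testerproxy.inform_tester(event)


class RecordingProxy:
    """Stand-in for the RPC proxy to the servicetester; it just records events."""

    def __init__(self):
        self.events = []

    def inform_tester(self, event):
        self.events.append(event)


standalone = ToyImpNode(0)
standalone.inform_tester((0, 0, 1, "hello"))   # silently ignored

proxy = RecordingProxy()
tested = ToyImpNode(1, testerproxy=proxy)
tested.inform_tester((1, 0, 1, "hello"))
```

With this arrangement the same node code runs unchanged under user nodes and under test nodes.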
class TestNode¶
Module sesf.msgtransfer.test_node has class TestNode that implements an RPC wrapper for a node of an implementation of msgtransfer service. Run python test_node.py N j --use imp_0.node N j to start a process with a TestNode object wrapping an imp_0.node implementation node.
class ServiceTester¶
Module sesf.msgtransfer.servicetester has class ServiceTester that implements the msgtransfer servicetester. This can test a distributed implementation of the msgtransfer service. For example, to test a distributed system of N imp_0 implementation nodes:
- Start a Pyro4 nameserver if one is not already running: run python -m Pyro4.naming &.
- Run python test_node.py N j --use imp_0.node N j for j ranging over 0, 1, …, N-1. (Or run python start_nodes.py N test_node.py --use imp_0.node.)
- Run python servicetester.py N to test only against the service.
- Or run python servicetester.py N --checker imp_0.checker N to test against the service and the implementation-specific assertions in imp_0.checker.Checker.
- Or run python servicetester.py N --log <logfile> --checker imp_0.checker N to also generate an event log in file <logfile>, which can later be played by running python logplayer.py <logfile>.
Note that the return part of function do_recv(j) expects the implementation-supplied rval to have a value for the internal parameter k of recv_RCI(j). This is easily achieved by having the servicetester include the value in the message it sends (in the call part of send(j)); no modification to the implementation node’s code is needed.
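One way to picture this trick is to have the tester wrap each payload together with the sending address. This is a sketch under assumptions: the tuple format and the helper names tag_message and sender_of are hypothetical and are not taken from the servicetester's code.

```python
def tag_message(src, payload):
    """Wrap a payload so the receiving side can tell which address sent it."""
    return (src, payload)


def sender_of(rval):
    """Extract the sending address from a recv() return value.

    rval is (True, msg) with msg built by tag_message, or (False,) when the
    service is closed, in which case there is no sender.
    """
    if rval == (False,):
        return None
    _flag, (src, _payload) = rval
    return src


msg = tag_message(2, "hello")
rval = (True, msg)          # what recv() at the destination would return
k = sender_of(rval)
```

The implementation treats the tagged tuple as an opaque user message, which is why its code needs no change.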
Checker for implementation imp_0¶
class msgtransfer.imp_0.checker.Checker(servicetester, argv)¶
Maintain the global state of a distributed system of msgtransfer.imp_0 nodes and check some assertions. Created by msgtransfer.servicetester when testing the distributed system. Maintains the following variables:
- servicetester: the servicetester object that started this checker object.
- n: number of (msgtransfer.imp_0) nodes.
- sent: sent state maintained by the servicetester.serviceparts object.
- transit_counts: map indexed by node pairs of lists of ints. transit_counts[(j,k)] is a list of (k-j)%n+1 ints, where list entry i is the number of packets of flow (j,k) that have passed by node (j+i)%n.
Function handle_event(e):
- RPC-called by a node to report an event e at the node. Here e is a (p,j,k,msg) tuple indicating the generation or arrival at node p of a packet with srcid j, dstid k, and data equal to serialized msg.
- Update the global state according to e and check if the following assertion still holds: for every event (p, j, k, msg): sent[(j,k)][transit_counts[(j,k)][(p-j)%n]] == msg
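The bookkeeping behind this check can be sketched as below. This is not the code of imp_0.checker: the class name ToyChecker is hypothetical, the entry for node p is taken to be at position (p-j)%n of transit_counts[(j,k)] (following the definition of transit_counts), and the comparison is assumed to happen before the count is incremented.

```python
class ToyChecker:
    """Tracks, per flow (j, k), how many packets have passed each node on the path."""

    def __init__(self, n, sent):
        self.n = n
        self.sent = sent              # {(j, k): [messages in send order]}
        self.transit_counts = {}      # {(j, k): [count at j, count at j+1, ..., count at k]}

    def handle_event(self, e):
        """Process event (p, j, k, msg); return True iff msg is the next one expected at p."""
        p, j, k, msg = e
        path_len = (k - j) % self.n + 1
        counts = self.transit_counts.setdefault((j, k), [0] * path_len)
        i = (p - j) % self.n          # position of node p on the path from j to k
        if i >= path_len:
            raise ValueError(f"node {p} is not on the path from {j} to {k}")
        seq = counts[i]               # packets of this flow already seen at p
        flow = self.sent[(j, k)]
        ok = seq < len(flow) and flow[seq] == msg
        counts[i] += 1
        return ok


checker = ToyChecker(4, {(1, 3): ["a", "b"]})
in_order = [checker.handle_event(e) for e in
            [(1, 1, 3, "a"), (2, 1, 3, "a"), (1, 1, 3, "b"), (2, 1, 3, "b")]]
out_of_order = checker.handle_event((3, 1, 3, "b"))   # node 3 has not yet seen "a"
```

A False result signals that some node saw packets of a flow in an order different from the order in which they were sent.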