Additional reading:
FIGURE
copper wires (Ethernet, RS232-C, V.32, etc.)
fiber optics (ATM, FDDI)
air (IR, radio, microwave)
Speeds (link not aggregate)
Latency - time needed to transfer an empty message between two systems - normally measures software delay and transit time
Network Bandwidth - Volume of traffic per unit time transferred across the network
Quality of Service (QoS) - other guarantees such as delay and bandwidth guarantees, reliability and availability guarantees
Assuming no delays due to congestion
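The latency and bandwidth definitions above are often combined into a first-order estimate: transfer time ≈ latency + message size / bandwidth. This is a common textbook model, not something these notes state explicitly. The class name, method name, and link figures below are illustrative assumptions, and the model ignores congestion.

```java
/** First-order model: time = latency + size / bandwidth (no congestion). */
final class TransferTime {
    /**
     * @param latencySeconds         time to transfer an empty message
     * @param bandwidthBitsPerSecond link bandwidth in bits per second
     * @param messageBytes           payload size in bytes
     */
    static double estimateSeconds(double latencySeconds,
                                  double bandwidthBitsPerSecond,
                                  long messageBytes) {
        double bits = messageBytes * 8.0;
        return latencySeconds + bits / bandwidthBitsPerSecond;
    }

    public static void main(String[] args) {
        // Hypothetical link: 5 ms latency, 10 Mbit/s bandwidth.
        double small = estimateSeconds(0.005, 10_000_000, 100);
        double large = estimateSeconds(0.005, 10_000_000, 1_250_000);
        System.out.println("100 bytes:       " + small + " s");
        System.out.println("1,250,000 bytes: " + large + " s");
    }
}
```

For small messages the latency term dominates; for large ones the bandwidth term does.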
FIGURE
How are the communicating objects connected?
Fully connected - link between all sites
FIGURE
Reason for networks is to share information
must be able to communicate in a common language
- called protocols
- The nice thing about protocols is that there are so many of them!
Protocols
- must be unambiguous and followed exactly
- rule of thumb for good protocol implementations
- be rigorous in what you generate
- be liberal in what you accept
- there are many different aspects to protocols
- electrical through web services
service providers and service users
Goal: Raw bits over a communication channel
Goal: transmit error free frames over the physical link
Goal: controlling operations of the subnet
Goal: common services shared by several applications
Goal: common types of exchanges standardized
“Unreliable Message” - single msg sent from sender to recipient without acknowledgment (e.g., UDP)
Processes that use unreliable messages are responsible for enforcing correct/reliable message passing
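A minimal sketch of what "enforcing reliable message passing" can mean for the sender: retransmit until an acknowledgment arrives or a retry budget runs out. The `UnreliableChannel` interface is hypothetical and stands in for a UDP send followed by a timed wait for an ack; no real networking is done here.

```java
/** Retransmission on top of an unreliable channel (simulated, no sockets). */
final class ReliableSender {
    /** Hypothetical channel: returns true if an ack arrived before the timeout. */
    interface UnreliableChannel {
        boolean sendAndAwaitAck(String message);
    }

    /**
     * Sends the message, retransmitting after each missed ack.
     * @return the number of attempts used, or -1 if every attempt failed
     */
    static int sendReliably(UnreliableChannel channel, String message, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (channel.sendAndAwaitAck(message)) {
                return attempt;
            }
            // No ack: the message or its ack was lost, so try again.
        }
        return -1;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated channel that loses the first two transmissions.
        UnreliableChannel lossy = msg -> ++calls[0] >= 3;
        System.out.println("attempts used: " + sendReliably(lossy, "hello", 5));
    }
}
```

Note that retransmission alone can deliver the same message twice, so the receiver also needs a way to recognize duplicates.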
Programs have data structures
Messages are self-contained sequences of bytes
Problem: How does the receiver know how the sender has flattened the data?
What if sender and receiver have different representations?
=> Follow standard (possibly external) data format - or the one which has been agreed upon between sender and receiver in advance
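Byte order is a typical example of differing representations. The sketch below assumes the agreed wire format is big-endian (an assumption for illustration) and shows what a receiver gets if it decodes the same four bytes with the wrong order. The class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Encoding an int in an agreed byte order, and the effect of disagreeing. */
final class ByteOrderDemo {
    /** Sender: flatten the value using the agreed (big-endian) order. */
    static byte[] toWire(int value) {
        return ByteBuffer.allocate(4).order(ByteOrder.BIG_ENDIAN).putInt(value).array();
    }

    /** Receiver that follows the agreement. */
    static int fromWire(byte[] bytes) {
        return ByteBuffer.wrap(bytes).order(ByteOrder.BIG_ENDIAN).getInt();
    }

    /** Receiver that wrongly assumes little-endian. */
    static int misread(byte[] bytes) {
        return ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getInt();
    }

    public static void main(String[] args) {
        byte[] wire = toWire(1934);
        System.out.println("agreed order: " + fromWire(wire));
        System.out.println("wrong order:  " + misread(wire));
    }
}
```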
The following shows how to marshal some data using sprintf():
char *name = "smith", *place = "London"; int year = 1934;
char message[64]; /* large enough for this example */
sprintf(message, "%d %s %d %s %d", (int) strlen(name), name, (int) strlen(place), place, year);
Can you think of how to write the unmarshalling version using sscanf()?
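As a point of comparison (in Java rather than with sscanf(), so it does not answer the exercise directly), here is one possible way to unmarshal the same length-prefixed text format. The `Unmarshal` and `Record` names are hypothetical. The sketch assumes single spaces between fields, as the sprintf() format string produces.

```java
/** Parses "<len> <name> <len> <place> <year>", e.g. "5 smith 6 London 1934". */
final class Unmarshal {
    static final class Record {
        final String name;
        final String place;
        final int year;
        Record(String name, String place, int year) {
            this.name = name;
            this.place = place;
            this.year = year;
        }
    }

    private final String msg;
    private int pos = 0;

    private Unmarshal(String msg) { this.msg = msg; }

    /** Reads digits up to the next space (or end of input) and skips the space. */
    private int readInt() {
        int start = pos;
        while (pos < msg.length() && msg.charAt(pos) != ' ') pos++;
        int value = Integer.parseInt(msg.substring(start, pos));
        pos++; // skip the separating space
        return value;
    }

    /** Reads a length prefix, then exactly that many characters. */
    private String readString() {
        int length = readInt();
        String s = msg.substring(pos, pos + length);
        pos += length + 1; // skip the string and the separating space
        return s;
    }

    static Record parse(String message) {
        Unmarshal u = new Unmarshal(message);
        String name = u.readString();
        String place = u.readString();
        int year = u.readInt();
        return new Record(name, place, year);
    }
}
```

Because each string carries its length, the parser does not have to guess where a string ends, so a value containing a space (such as "New York") still unmarshals correctly.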
import java.io.*;
import java.util.Enumeration;
import java.util.Hashtable;

public class Message {
private static boolean debug = true;
private static int maxDebugLevel = 1;
private static final String P_STRING = "S$";
private static final String P_INTEGER = "I$";
private static final String P_LONG = "L$";
private static final String P_BOOLEAN = "B$";
private Hashtable<Object, String> parameters = new Hashtable<Object, String>();
private int type = 0;
private int tag = 0;
private int length = 0;
public Message() {
// nothing additional to do
}
public static void log(int level, String function, String message) {
if (debug && level <= maxDebugLevel)
System.out.println("Message::" + function + "> " + message);
}
public void encode(DataOutputStream out) throws IOException {
// output a header
out.writeUTF("SMA");
// output length, type, tag
out.writeInt(length);
out.writeInt(type);
out.writeInt(tag);
// output # of pairs
out.writeInt(parameters.size());
// output pairs
Enumeration<Object> e = parameters.keys();
while (e.hasMoreElements()) {
String key = (String) e.nextElement();
out.writeUTF(key);
String value = parameters.get(key);
out.writeUTF(value);
}
}
public void decode(DataInputStream in) throws IOException {
// read header
String header = in.readUTF();
if (!header.equals("SMA"))
throw new IOException("Bad message header: expected SMA but got " + header);
// read length, type, tag
length = in.readInt();
type = in.readInt();
tag = in.readInt();
int parameterCount = in.readInt();
for (int i = 0; i < parameterCount; i++) {
String key = in.readUTF();
String value = in.readUTF();
parameters.put(key, value);
}
}
public void setType(int type) {
this.type = type;
}
public int getType() {
return type;
}
public void setTag(int tag) {
this.tag = tag;
}
public int getTag() {
return tag;
}
public void setParam(String key, String value) {
parameters.put(P_STRING + key, value);
}
public String getParam(String key) {
return parameters.get(P_STRING + key);
}
public void setStringParam(String key, String value) {
parameters.put(P_STRING + key, value);
}
public String getStringParam(String key) {
return parameters.get(P_STRING + key);
}
public void setIntegerParam(String key, int value) {
parameters.put(P_INTEGER + key, value + "");
}
public int getIntegerParam(String key) {
try {
return Integer.parseInt(parameters.get(P_INTEGER + key));
} catch (Exception e) {
return 0; // Reached if the parameter is missing or is not a valid integer.
}
}
public void setLongParam(String key, long value) {
parameters.put(P_LONG + key, value + "");
}
public long getLongParam(String key) {
try {
return Long.parseLong(parameters.get(P_LONG + key));
} catch (Exception e) {
return 0; // Reached if the parameter is missing or is not a valid long.
}
}
public void setBooleanParam(String key, boolean value) {
parameters.put(P_BOOLEAN + key, value + "");
}
public boolean getBooleanParam(String key) {
String value = parameters.get(P_BOOLEAN + key);
return "true".equals(value); // false (not an exception) if the parameter is missing
}
public void merge(Message m) {
Enumeration<Object> e = m.parameters.keys();
while (e.hasMoreElements()) {
Object key = e.nextElement();
parameters.put(key, m.parameters.get(key));
}
}
public String toString() {
StringBuffer buffer = new StringBuffer();
buffer.append("Message {");
Enumeration<Object> e = parameters.keys();
while (e.hasMoreElements()) {
String key = (String)e.nextElement();
String value = parameters.get(key);
buffer.append(key);
buffer.append("=");
buffer.append(value);
if (e.hasMoreElements()) buffer.append(", ");
}
buffer.append("}");
return buffer.toString();
}
public static void main(String args[]) {
Message m1 = new Message();
Message m2 = new Message();
m1.setType(2);
m1.setTag(3);
m1.setStringParam("s1", "George");
m1.setBooleanParam("b2", true);
m1.setIntegerParam("i3", 100);
m1.setIntegerParam("i4", 100);
try {
FileOutputStream fos = new FileOutputStream("m1.dat");
DataOutputStream dos = new DataOutputStream(fos);
m1.encode(dos);
fos.close();
} catch (Exception e) {
System.out.println("exception/m1.dat" + e);
}
System.out.println("Message written to m1.dat");
try {
FileInputStream fis = new FileInputStream("m1.dat");
DataInputStream dis = new DataInputStream(fis);
m2.decode(dis);
fis.close();
} catch (Exception e) {
System.out.println("exception/m2 " + e);
}
System.out.println("Read m2");
System.out.println("Message m1 " + m1);
System.out.println("Message m2 " + m2);
}
}
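import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;

public class MessageClient extends Thread {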
Socket socket;
DataOutputStream out;
DataInputStream in;
public MessageClient(String host, int port) throws IOException {
socket = new Socket(host, port);
out = new DataOutputStream(socket.getOutputStream());
in = new DataInputStream(socket.getInputStream());
}
public Message call(Message message) {
try {
message.encode(out);
} catch (Exception e) {
System.err.println("MessageClient: Call (to) failure: " + e);
return null;
}
try {
Message m = new Message();
m.decode(in);
return m;
} catch (Exception e) {
System.err.println("MessageClient: Call (from) failure: " + e);
return new Message();
}
}
public void disconnect() {
Message m = new Message();
m.setType(0);
m.setParam("$disconnect", "$disconnect");
call(m);
try {
socket.close();
} catch (Exception e) {
System.err.println("ungraceful disconnect on client " + e);
}
}
}
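import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Hashtable;

public class MessageServer extends Thread {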
private ServerSocket callListener;
private Hashtable<String, MessageService> subscribers;
public static final boolean logging = true;
public void log(String s) {
if (!logging)
return;
System.err.println("MessageServer: " + s);
}
public MessageServer(int port) throws IOException {
log("Simple Messaging Architecture (SMA) version 1.0");
log("Copyright (c) 2000, George K. Thiruvathukal");
callListener = new ServerSocket(port);
subscribers = new Hashtable<String, MessageService>();
log("Created MessageServer instance fully!");
}
public void subscribe(int messageType, MessageService d) {
subscribers.put(messageType + "", d);
}
public MessageService getSubscriber(int messageType) {
return subscribers.get(messageType + "");
}
public void run() {
log("MessageServer thread started. run() method dispatched.");
while (true) {
try {
Socket s = callListener.accept();
MessageServerDispatcher csd = new MessageServerDispatcher(this, s);
csd.setDaemon(false);
csd.start();
} catch (Exception e) {
log("Exception " + e);
e.printStackTrace();
}
}
}
}
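The listings use a MessageService type that is not shown in this section. Judging from DateService, it presumably declares a single process method. The sketch below assumes that shape and uses a stand-in `Msg` class (hypothetical, in place of Message) to show how subscribe and lookup route a message by its type.

```java
import java.util.HashMap;
import java.util.Map;

/** Routing by message type, with stand-in types so the sketch is self-contained. */
final class DispatchSketch {
    /** Stand-in for Message: only the fields this sketch needs. */
    static final class Msg {
        final int type;
        String reply = "";
        Msg(int type) { this.type = type; }
    }

    /** Assumed shape of MessageService, inferred from DateService.process. */
    interface Service {
        Msg process(Msg m);
    }

    private final Map<Integer, Service> subscribers = new HashMap<>();

    void subscribe(int messageType, Service service) {
        subscribers.put(messageType, service);
    }

    /** Hands the message to its subscriber, or returns an empty reply if none exists. */
    Msg dispatch(Msg m) {
        Service service = subscribers.get(m.type);
        if (service == null) {
            return new Msg(0); // mirrors the "no subscribers" branch of the dispatcher
        }
        return service.process(m);
    }
}
```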
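import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.net.Socket;

public class MessageServerDispatcher extends Thread {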
MessageServer callServer;
Socket socket;
DataInputStream in;
DataOutputStream out;
public static final boolean logging = true;
public MessageServerDispatcher(MessageServer callServer, Socket socket)
throws IOException {
this.callServer = callServer;
this.socket = socket;
this.in = new DataInputStream(socket.getInputStream());
this.out = new DataOutputStream(socket.getOutputStream());
}
public void log(String s) {
if (!logging)
return;
System.err.println("MessageServerDispatcher: " + s);
}
public void run() {
log("Beginning of dispatch run() method.");
try {
while (true) {
Message m = new Message();
m.decode(in);
Message result = null;
log("Received Message " + m + ".");
if (m.getType() == 0 && m.getParam("$disconnect") != null) {
log("Message found with reserved $disconnect parameter.");
System.err.println("-> Disconnect received by server.");
Message ack = new Message();
ack.encode(out);
socket.close();
return;
}
MessageService d = callServer.getSubscriber(m.getType());
if (d != null)
result = d.process(m);
else {
System.err.println("-> No subscribers for this message.");
result = new Message();
}
result.encode(out);
}
} catch (EOFException e1) {
try {
log("End of file exception." + e1);
out.close();
socket.close();
} catch (Exception e2) {
log("Unable to free open resources " + e2);
e2.printStackTrace();
}
} catch (Exception e) {
log("Unknown exception of unknown origin. Possibly a bug: " + e);
e.printStackTrace();
}
}
}
import java.util.Date;

public class DateService implements MessageService {
public static final int DATE_SERVICE_MESSAGE = 100;
public static final int DATE_SERVICE_PORT = 1999;
public Message process(Message m) {
Date today = new Date();
m.setParam("date", today.toString());
return m;
}
public static void main(String args[]) {
DateService ds = new DateService();
MessageServer ms;
try {
ms = new MessageServer(DATE_SERVICE_PORT);
} catch (Exception e) {
System.err.println("Could not start service " + e);
return;
}
Thread msThread = new Thread(ms);
ms.subscribe(DATE_SERVICE_MESSAGE, ds);
msThread.start();
}
}
public class DateClient {
public static void main(String[] args) {
if (args.length < 2) {
System.out.println("Usage: DateClient host port");
return; // without this, args[0] below throws when no arguments are given
}
String host = args[0];
int port;
try {
port = Integer.parseInt(args[1]);
} catch (Exception e) {
port = DateService.DATE_SERVICE_PORT;
}
MessageClient conn;
try {
conn = new MessageClient(host, port);
} catch (Exception e) {
System.err.println("Could not contact DateService @ " + host + ":"
+ port);
return;
}
Message m = new Message();
m.setType(DateService.DATE_SERVICE_MESSAGE);
m.setParam("person", "george");
m = conn.call(m);
System.out.println("Message instance received from server " + m);
System.out.println("Today's Date is " + m.getParam("date"));
m.setType(75);
m.setBooleanParam("b1", true);
m.setIntegerParam("i1", 15);
m.setLongParam("l1", 15);
m.setStringParam("s1", "George");
m = conn.call(m);
System.out.println("Message instance received from server " + m);
conn.disconnect();
}
}
FIGURE
FIGURE
count = read(fd, buf, nbytes)
(fd: file descriptor (int), buf: array of chars, nbytes: integer)
FIGURE
parameters (in C): call-by-reference OR call-by-value
Value parameter (e.g., fd, nbytes) copied onto stack (original value not affected)
Value parameter is just an initialized variable on stack for callee
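The same distinction can be seen in Java, used here only as an illustration since the slide discusses C. An int argument is copied, so the callee's changes are invisible to the caller. An array argument copies only the reference, so the callee writes into the caller's buffer, much like passing buf to read. The names are hypothetical.

```java
/** Value parameters are copies; an array parameter shares the caller's storage. */
final class CallByValue {
    /** Changes only the callee's copy of nbytes. */
    static int shrink(int nbytes) {
        nbytes = 0;
        return nbytes;
    }

    /** Writes through the copied reference into the caller's array. */
    static void fill(char[] buf) {
        buf[0] = 'x';
    }

    public static void main(String[] args) {
        int nbytes = 10;
        char[] buf = new char[4];
        shrink(nbytes);
        fill(buf);
        System.out.println("nbytes after call: " + nbytes);
        System.out.println("buf[0] after call: " + buf[0]);
    }
}
```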
Many options are language dependent but we will ignore them…
How to deal with these situations?
In RPC, read is remote, so there is no way to pass the parameters on the stack (no shared space/memory!)
Solution: the library keeps a “client stub” that acts like “read”
So how does it work?
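One way to picture the answer is an in-process sketch: the client stub looks like an ordinary local procedure but only packs its parameters into a message, the transport carries the bytes, and a server stub unpacks them, calls the real procedure, and packs the result. The `add` procedure and all names here are hypothetical, and the transport is a plain function call rather than a network.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.UnaryOperator;

/** Client stub and server stub around a "remote" add(a, b). */
final class RpcStubSketch {
    /** The real procedure, which lives on the server side. */
    static int add(int a, int b) {
        return a + b;
    }

    /** Server stub: unmarshal parameters, call the procedure, marshal the result. */
    static byte[] serverStub(byte[] request) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(request));
            int a = in.readInt();
            int b = in.readInt();
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            new DataOutputStream(bytes).writeInt(add(a, b));
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Client stub: same signature idea as add(), but it only ships the parameters. */
    static int clientAdd(int a, int b, UnaryOperator<byte[]> transport) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(a);
            out.writeInt(b);
            byte[] reply = transport.apply(bytes.toByteArray());
            return new DataInputStream(new ByteArrayInputStream(reply)).readInt();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // The "network" is just a direct call to the server stub.
        System.out.println(clientAdd(2, 3, RpcStubSketch::serverStub));
    }
}
```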
Parameter passing
Binding
Performance and implementation issues
Exception handling
Interface definition
client and server know parameter type
server sends msg to binder to let it know the server is up (registration)
First RPC call to a function
Client stub sees that it is not yet bound to a server
Client stub sends msg to binder to “import” interface
If server exists, binder gives unique id and handle to client stub
Client stub uses these for communication
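The registration and import steps above can be sketched as a small in-memory registry. This is a simplification under stated assumptions: the binder is a local object rather than a separate process, the handle is a plain string, and every name is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** In-memory binder: servers register interfaces, client stubs import them. */
final class BinderSketch {
    /** What the binder hands back to a client stub. */
    static final class Binding {
        final long uniqueId;
        final String handle; // e.g. an address the stub can use to reach the server
        Binding(long uniqueId, String handle) {
            this.uniqueId = uniqueId;
            this.handle = handle;
        }
    }

    private final Map<String, Binding> registry = new HashMap<>();
    private long nextId = 1;

    /** Server side: announce that this interface is up at the given handle. */
    void register(String interfaceName, String handle) {
        registry.put(interfaceName, new Binding(nextId++, handle));
    }

    /** Client stub side: import an interface; empty if no server has registered it. */
    Optional<Binding> lookup(String interfaceName) {
        return Optional.ofNullable(registry.get(interfaceName));
    }
}
```

A client stub would call lookup on its first RPC, cache the returned binding, and use it for later calls.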
More difficult to handle
Rely on timer again?
Problem: Client’s kernel doesn’t know why no answer!
Solution - Client kernel uses a sequence number (needs to maintain state) for each request
Have a bit in message to distinguish initial vs. retransmissions
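A minimal sketch of how sequence numbers let the receiving side avoid executing a retransmitted request twice. It assumes each client has at most one request outstanding and that the server remembers the last sequence number and reply per client. These are assumptions for illustration, and the retransmission bit itself is not modelled.

```java
import java.util.HashMap;
import java.util.Map;

/** Executes each (client, sequence number) request at most once. */
final class DuplicateFilter {
    private final Map<String, Integer> lastSeq = new HashMap<>();
    private final Map<String, String> lastReply = new HashMap<>();
    private int executions = 0;

    /** Returns the reply; a repeated sequence number gets the cached reply. */
    String handle(String clientId, int seq, String request) {
        Integer seen = lastSeq.get(clientId);
        if (seen != null && seq <= seen) {
            // Retransmission of a request already executed: do not run it again.
            return lastReply.get(clientId);
        }
        executions++;
        String reply = "done:" + request;
        lastSeq.put(clientId, seq);
        lastReply.put(clientId, reply);
        return reply;
    }

    int executions() {
        return executions;
    }
}
```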
Solutions differ
FIGURE
But the client cannot tell the difference!
But none of the above is attractive
What if orphans themselves do RPC => grandorphans => difficult to kill them all
Network Interface Chips (NICs) can send messages fast
But receiving is more difficult due to finite buffers
No overrun possible in stop-and-wait (assuming single sender)
Performance
Critical Path