package lava.net.mmp; import java.io.IOException; import java.io.PipedInputStream; import java.io.PipedOutputStream; import java.io.OutputStream; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetAddress; import java.net.SocketException; import java.net.UnknownHostException; import java.util.Enumeration; import java.util.Hashtable; import lava.net.common.UNA; import lava.net.common.Value; import lava.net.common.VariableNormalizer; /** * **/ class MMPUDPDispatcher extends MMPDispatcher { class UDPOutputStream extends OutputStream { /** * **/ DatagramPacket p = null; /** * **/ MMPDatagramSocket socket = null; /** * **/ public UDPOutputStream(UNA target, MMPDatagramSocket socket) { this.socket = socket; p = new DatagramPacket(new byte[0],0,// target.getHost(),target.getPort()); } /** * **/ public void write(int b) throws IOException { byte[] buf = {(byte)b}; p.setData(buf); p.setLength(1); socket.send(p); } /** * **/ public void write(byte b[]) throws IOException { p.setData(b); p.setLength(b.length); socket.send(p); } /** * **/ public void write(byte b[], int off, int len) throws IOException { if(off > 0) { byte[] buf = new byte[len]; System.arraycopy(b,off,buf,0,len); b = buf; } p.setData(b); p.setLength(len); socket.send(p); } /** * **/ public void flush() throws IOException { } /** * **/ public void close() throws IOException { } } class MMPDatagramSocket extends Thread { /** * **/ private DatagramSocket socket = null; /** * **/ private DatagramPacket p = null; /** * **/ private UNA bind = null; /** * **/ public MMPDatagramSocket() throws SocketException { try { socket = new DatagramSocket(UNA.getDefaultPort()); } catch(SocketException e) { socket = new DatagramSocket(); } getUNA(); initBuffer(defaultLength); start(); } /** * **/ public MMPDatagramSocket(UNA bind) throws SocketException { socket = new DatagramSocket(bind.getPort(),bind.getHost()); bind = getUNA(); initBuffer(defaultLength); start(); } /** * **/ public UNA getUNA() { if(bind != null) return bind; InetAddress addr = socket.getLocalAddress(); do { if(addr.hashCode() == 0) try { bind = new UNA(null,InetAddress.getLocalHost(),// socket.getLocalPort(),protocol,null); continue; } catch(UnknownHostException e) { } bind = new UNA(null,addr,socket.getLocalPort(),protocol,null); } while(false); return bind; } /** * **/ public void close() { if(!isInterrupted()) { interrupt(); socket.close(); // stop() is deprecated in java 1.2 } } /** * **/ public void send(DatagramPacket p) throws IOException { socket.send(p); } /** * **/ public void initBuffer(int length) { byte[] buf = new byte[length]; p = new DatagramPacket(buf,length); } /** * **/ public void run() { while(true) { try { socket.receive(p); receive(bind,p); if(isInterrupted()) return; } catch(IOException e) { exception(bind,e); return; } } } } /** * **/ public final static String ProtocolName = "UDP"; /** * **/ protected static String protocol = "udp"; /** * default MMPDatagramSocket used for sending packets **/ private MMPDatagramSocket sendSocket = null; /** * mapping UNA -> MMPDatagramSocket **/ private Hashtable listener = new Hashtable(); /** * mapping UNA -> MMPOutput **/ private Hashtable outConnections = new Hashtable(); /** * mapping UNA -> UDPInputStream **/ private Hashtable inConnections = new Hashtable(); /** * mapping UNA -> UNA without resource **/ private Hashtable simplified = new Hashtable(); /** * **/ private MMPDispatcherPeer peer = null; /** * **/ private VariableNormalizer normalizer = null; /** * **/ public int defaultLength = 1024; /** * **/ public MMPUDPDispatcher(MMPDispatcherPeer peer, // VariableNormalizer normalizer) throws IOException { super(peer,normalizer); this.peer = peer; this.normalizer = normalizer; listen(null); } /** * **/ public MMPUDPDispatcher(MMPDispatcherPeer peer, // VariableNormalizer normalizer, boolean listen) // throws IOException { super(peer,normalizer,listen); this.peer = peer; this.normalizer = normalizer; if(listen) listen(null); } /** * **/ public MMPUDPDispatcher(MMPDispatcherPeer peer, // VariableNormalizer normalizer, UNA bind) // throws IOException { super(peer,normalizer); this.peer = peer; this.normalizer = normalizer; listen(bind); } /** * **/ public Value usingProtocols() { return new Value(ProtocolName); } /** * **/ public UNA listen(UNA bind) throws IOException { MMPDatagramSocket l; if(bind == null) l = new MMPDatagramSocket(); else l = new MMPDatagramSocket(bind); listener.put(bind = l.getUNA(),l); if(sendSocket == null) sendSocket = l; return bind; } /** * **/ public synchronized void remove(UNA bind) { if(bind == null) { for(Enumeration e = listener.elements();e.hasMoreElements();) ((MMPDatagramSocket)e.nextElement()).close(); listener.clear(); sendSocket = null; } else { MMPDatagramSocket l = (MMPDatagramSocket)listener.remove(bind); if(l != null) { l.close(); if(sendSocket == l) { sendSocket = null; Enumeration e = listener.elements(); if(e.hasMoreElements()) sendSocket = (MMPDatagramSocket)e.nextElement(); } } } } /** * **/ public synchronized UNA[] getListenLocations() { UNA[] r; int i; Enumeration e; synchronized(listener) { r = new UNA[listener.size()]; for(i = 0, e = listener.keys();e.hasMoreElements();++i) r[i] = (UNA)e.nextElement(); } return r; } /** * **/ public synchronized void close() { remove(null); for(Enumeration e = outConnections.elements();e.hasMoreElements();) try { ((MMPOutput)e.nextElement()).close(); } catch(IOException ex) { } outConnections.clear(); } /** * **/ public synchronized void forget(UNA d) { if(d == null) return; UNA si; String t = d.getScheme(); if(!((t = d.getScheme()) == null ? false : // t.equals(UNA.getDefaultScheme())) || // !((t = d.getResource()) == null ? false : // t.equals(UNA.getDefaultResource()))) if((si = (UNA)simplified.get(d)) == null) return; else d = si; outConnections.remove(d); } /** * **/ public synchronized void close(UNA d) { if(d == null) return; UNA si; String t = d.getScheme(); if(!((t = d.getScheme()) == null ? false : // t.equals(UNA.getDefaultScheme())) || // !((t = d.getResource()) == null ? false : // t.equals(UNA.getDefaultResource()))) if((si = (UNA)simplified.get(d)) == null) return; else d = si; MMPOutput s = (MMPOutput)outConnections.remove(d); if(s == null) return; try { s.close(); } catch(IOException e) { } } /** * **/ public UNA getLocal(UNA d) throws IOException { if(d == null) return null; if(d.getHost() == null) throw new IOException("no host name given"); if(sendSocket == null) listen(null); return sendSocket.getUNA(); } /** * **/ public void send(UNA d, MMPPacket p) throws IOException { if(d == null || p == null) return; if(d.getHost() == null) throw new IOException("no host name given"); UNA dest; String t = d.getScheme(); MMPOutput s; if(!((t = d.getScheme()) == null ? false : // t.equals(UNA.getDefaultScheme())) || // !((t = d.getResource()) == null ? false : // t.equals(UNA.getDefaultResource()))) { if((dest = (UNA)simplified.get(d)) == null) { dest = new UNA(null,d.getHost(),d.getPort(),// d.getProtocol(),null); simplified.put(d,dest); } } else dest = d; s = (MMPOutput)outConnections.get(dest); if(s == null) { if(sendSocket == null) listen(null); s = new MMPOutput(new UDPOutputStream(dest,sendSocket)); outConnections.put(dest,s); } s.writePacket(p); } /** * **/ public void receive(UNA bind, DatagramPacket p) { bind = new UNA(null,p.getAddress(),p.getPort(),protocol,null); PipedOutputStream s = (PipedOutputStream)inConnections.get(bind); if(s == null) { s = new PipedOutputStream(); try { new MMPInput(peer,normalizer,bind,new PipedInputStream(s)); inConnections.put(bind,s); } catch(IOException e) { // this should never happen } } try { s.write(p.getData(),0,p.getLength()); } catch(IOException e) { // this should never happen try { s.close(); } catch(IOException ex) { } inConnections.remove(bind); } } /** * **/ public void exception(UNA bind, IOException e) { remove(bind); } }