package lava.net.mmp;

import java.io.IOException;
import java.net.InetAddress;
import java.util.Enumeration;
import java.util.Hashtable;
import java.util.Vector;

import lava.net.common.UNA;
import lava.net.common.Value;
import lava.net.common.VariableModifier;
import lava.net.common.VariableNormalizer;

/**
 * Message center of the MMP protocol.
 * It owns one dispatcher per underlying network protocol, keeps the
 * per-connection state (sent greetings, packet counters, global and
 * temporary variables, queued outgoing variable modifiers), routes outgoing
 * packets through the configured proxies and hands every received packet
 * and every error over to a {@link MMPPacketManager}.
 **/
public class MMPMessageCenter extends MMPCenter {
  /**
   * The official name of the protocol implemented here.
   **/
  public final static String ProtocolName = "MMP";

  /**
   * The major version number of the protocol implemented here.
   **/
  public final static int ProtocolMajorVersion = 1;

  /**
   * The minor version number of the protocol implemented here.
   **/
  public final static int ProtocolMinorVersion = 0;

  /**
   * The default scheme used in UNAs (Uniform Network Locators) by this
   * protocol.
   **/
  public final static String defaultScheme = "mmp";

  /**
   * The well known port used by this protocol.
   **/
  public final static int defaultPort = 4004;

  /**
   * The default underlying network protocol used by this protocol.
   **/
  public final static String defaultProtocol = MMPTCPDispatcher.protocol;

  /**
   * All known underlying network protocols used by this protocol.
   **/
  public final static String[] protocols = {
    MMPTCPDispatcher.protocol,
    MMPUDPDispatcher.protocol
  };

  /**
   * The default resource used if an UNA does not name one.
   **/
  public final static String defaultResource = "/";

  /**
   * The maximum number of permanent variables per connection.
   * This is to prevent denial of service.
   * Numbers smaller than 1 are considered infinite.
   **/
  public static int maxVariables = 50;

  /**
   * The maximum number of open connections.
   * This is to prevent denial of service.
   * Numbers smaller than 1 are considered infinite.
   **/
  public static int maxConnections = 2048;

  /**
   * Do we close peers exceeding the maxConnections limit?
   * Note: Exceeding the limit CAN be, but need not be, a
   * denial of service attack!
   * Note: This should be more intelligent in further versions.
   **/
  public static boolean closeOnMaxConnections = true;

  /**
   * The MMP variable describing the length of a body of a MMP packet.
   **/
  public final static String lengthTag = "_length";

  /**
   * Short names registered as aliases of {@link #lengthTag}.
   **/
  private final static String[] lengthAliases = {"length", "l"};

  /**
   * The MMP variable describing the source of a MMP packet.
   **/
  public final static String sourceTag = "_source";

  /**
   * Short names registered as aliases of {@link #sourceTag}.
   **/
  private final static String[] sourceAliases = {"source", "s"};

  /**
   * The MMP variable describing the target of a MMP packet.
   **/
  public final static String targetTag = "_target";

  /**
   * Short names registered as aliases of {@link #targetTag}.
   **/
  private final static String[] targetAliases = {"target", "t"};

  /**
   * The MMP variable describing a variable state reset of a MMP connection.
   **/
  public final static String initializeTag = "_initialize";

  /**
   * Short names registered as aliases of {@link #initializeTag}.
   **/
  private final static String[] initializeAliases = {"initialize", "i"};

  /**
   * The MMP variable describing a packet counter of a MMP connection.
   **/
  public final static String counterTag = "_counter";

  /**
   * Short names registered as aliases of {@link #counterTag}.
   **/
  private final static String[] counterAliases = {"counter", "c"};

  /**
   * A default MMPPacketManager, used if there is no other given.
   * It does simply nothing.
   **/
  public class DummyManager implements MMPPacketManager {
    /**
     * Supplies no greeting body.
     *
     * @return always null
     **/
    public String getGreetingBody(UNA remote, UNA local) {
      return null;
    }

    /**
     * Ignores the received packet.
     **/
    public void manage(MMPPacket packet) {
    }

    /**
     * Ignores the reported error.
     **/
    public void error(MMPException e, UNA remote, UNA local,
                      MMPPacket packet) {
    }
  }

  /**
   * Wrapper class to a long counter variable, so a counter stored in a
   * Hashtable can be modified in place.
   **/
  private static class Counter {
    /**
     * The current counter value.
     **/
    long value = 0;
  }

  /**
   * The MMPPacketManager. To this instance every information about
   * received packets, errors and so on is sent.
   **/
  public MMPPacketManager manager = null;

  /**
   * Expands variable aliases to their normalized names; handed to the
   * dispatchers.
   **/
  private VariableNormalizer normalizer = new Normalizer();

  /**
   * Callback object through which the dispatchers report packets and errors.
   **/
  private MMPDispatcherPeer peer = new DispatcherPeer();

  /**
   * A set of underlying network protocol dispatchers supported by this
   * protocol.
   * mapping protocol -> MMPDispatcher
   **/
  private Hashtable dispatchers = new Hashtable();

  /**
   * mapping physical remote -> logical remote
   * for ASSIGNed sourceTags
   **/
  private Hashtable remote = new Hashtable();

  /**
   * mapping logical remote -> logical local
   * for ASSIGNed targetTags
   **/
  private Hashtable local = new Hashtable();

  /**
   * mapping connection -> logical remote
   * for checking if we sent greeting
   **/
  private Hashtable sentGreeting = new Hashtable();

  /**
   * mapping connection -> logical remote
   * for checking if we sent greeting in last Packet
   **/
  private Hashtable dontAnswerGreeting = new Hashtable();

  /**
   * mapping connection -> incoming counters
   **/
  private Hashtable incomingCounter = new Hashtable();

  /**
   * mapping connection -> outgoing counters
   **/
  private Hashtable outgoingCounter = new Hashtable();

  /**
   * mapping connection -> Hashtable of global variables
   **/
  private Hashtable globVars = new Hashtable();

  /**
   * mapping connection -> Hashtable of temporary variables
   **/
  private Hashtable tempVars = new Hashtable();

  /**
   * mapping connection -> Vector of outgoing variables
   **/
  private Hashtable outgoings = new Hashtable();

  /**
   * mapping variable alias -> normalized variable
   **/
  private Hashtable alias = new Hashtable();

  /**
   * place where we store the last error that occurred in delivering packets
   */
  private MMPDeliveryException gotException = null;

  /**
   * Do we send every packet to a specific scheme to a proxy?
   * Send scheme://host/resource to proxy/scheme://host/resource
   * Works parallel to and in front of default proxy
   * and protocol-specific proxies, that means: in delivering packets,
   * first the target is expanded with the scheme proxy and then the
   * result is expanded with the other proxies.
   * mapping scheme -> Proxy-UNA
   **/
  private Hashtable schemeProxies = null;

  /**
   * Do we send every packet to a proxy?
   * Overridden by protocol-specific proxies.
   * Default: no.
   **/
  private UNA proxy = null;

  /**
   * Do we send every Packet to one protocol-specific proxy? Default: no.
   * Overrides default proxy.
   * mapping protocol -> Proxy-UNA
   **/
  private Hashtable proxies = null;

  /**
   * answer a counter tag with counted packets
   **/
  private boolean answerCounterWithCounter = false;

  /**
   * for greeting ... the _initialize tag, build it here to save cpu power
   **/
  private VariableModifier initialize = new VariableModifier(
    VariableModifier.GLYPH_SET,initializeTag,null);

  /**
   * for greeting ... variables added to every greeting packet
   * elements are of type VariableModifier
   **/
  private Vector generalVariables = new Vector();

  /**
   * for greeting ... variables added only to greeting packets
   * for a specific protocol
   * mapping protocol -> Vector with elements of type VariableModifier
   **/
  private Hashtable protocolVariables = new Hashtable();

  /**
   * for greeting ... variables added only to greeting packets
   * for a specific UNA
   * mapping UNA -> Vector with elements of type VariableModifier
   **/
  private Hashtable unlVariables = new Hashtable();

  /**
   * Creates a message center with a TCP and an UDP dispatcher built by
   * their two-argument constructors.
   *
   * @param manager receiver of packets and errors; null installs a
   *                {@link DummyManager}
   **/
  public MMPMessageCenter(MMPPacketManager manager) {
    setDefaults();
    initializeAliases();
    setManager(manager);
    try {
      addDispatcher(MMPTCPDispatcher.protocol,
                    new MMPTCPDispatcher(peer,normalizer));
    } catch(IOException ignored) {
      // best effort: without the dispatcher this protocol is just not served
    }
    try {
      addDispatcher(MMPUDPDispatcher.protocol,
                    new MMPUDPDispatcher(peer,normalizer));
    } catch(IOException ignored) {
      // best effort: without the dispatcher this protocol is just not served
    }
  }

  /**
   * Creates a message center with a TCP and an UDP dispatcher, passing the
   * listen flag on to both dispatcher constructors.
   *
   * @param manager receiver of packets and errors; null installs a
   *                {@link DummyManager}
   * @param listen  forwarded unchanged to the dispatcher constructors
   **/
  public MMPMessageCenter(MMPPacketManager manager, boolean listen) {
    setDefaults();
    initializeAliases();
    setManager(manager);
    try {
      addDispatcher(MMPTCPDispatcher.protocol,
                    new MMPTCPDispatcher(peer,normalizer,listen));
    } catch(IOException ignored) {
      // best effort: without the dispatcher this protocol is just not served
    }
    try {
      addDispatcher(MMPUDPDispatcher.protocol,
                    new MMPUDPDispatcher(peer,normalizer,listen));
    } catch(IOException ignored) {
      // best effort: without the dispatcher this protocol is just not served
    }
  }

  /**
   * Creates a message center (dispatchers built with listen == false) and
   * then starts listening on each of the given locations.
   *
   * @param manager         receiver of packets and errors; null installs a
   *                        {@link DummyManager}
   * @param listenLocations locations to listen on; null is treated as empty
   **/
  public MMPMessageCenter(MMPPacketManager manager, UNA[] listenLocations) {
    this(manager,false);
    if(listenLocations == null) return;
    for(int i = 0;i < listenLocations.length;++i) {
      try {
        listen(listenLocations[i]);
      } catch(IOException ignored) {
        // best effort: a location that cannot be opened is skipped
      }
    }
  }

  /**
   * Determines the name of the local host via UNA.lookup().
   * In applet mode the loopback address is used (and registered as applet
   * host), otherwise InetAddress.getLocalHost().
   *
   * @return the looked up host name or null if any step failed
   **/
  public static String getLocalHost() {
    try {
      if(UNA.isApplet()) {
        InetAddress loopback = InetAddress.getByName(null);
        UNA.addAppletHost(loopback);
        return UNA.lookup(loopback);
      }
      return UNA.lookup(InetAddress.getLocalHost());
    } catch(Exception ignored) {
      // any failure just means "local host unknown"
    }
    return null;
  }

  /**
   * Installs the MMP defaults (scheme, local host, port, protocol and
   * resource) in UNA.
   **/
  public static void setDefaults() {
    UNA.setDefaults(defaultScheme,getLocalHost(),defaultPort,
                    defaultProtocol,defaultResource);
  }

  /**
   * Registers a dispatcher for a protocol, replacing any previous one.
   * A null dispatcher only removes the previous one.
   *
   * @param protocol   name of the underlying protocol; null is ignored
   * @param dispatcher the dispatcher to register or null
   * @return the dispatcher registered before, or null
   **/
  public MMPDispatcher addDispatcher(String protocol,
                                     MMPDispatcher dispatcher) {
    if(protocol == null) return null;
    MMPDispatcher old = (MMPDispatcher)dispatchers.remove(protocol);
    if(dispatcher != null) dispatchers.put(protocol,dispatcher);
    return old;
  }

  /**
   * Looks up the dispatcher registered for the protocol of the given UNA.
   *
   * @param remote the UNA whose protocol selects the dispatcher
   * @return the dispatcher, or null if remote or its protocol is null or no
   *         dispatcher is registered
   **/
  public MMPDispatcher getDispatcher(UNA remote) {
    if(remote == null) return null;
    String prot = remote.getProtocol();
    if(prot == null) return null;
    return (MMPDispatcher)dispatchers.get(prot);
  }

  /**
   * Unregisters the dispatcher of a protocol.
   *
   * @param protocol name of the underlying protocol; null is ignored
   * @return the removed dispatcher, or null
   **/
  public MMPDispatcher removeDispatcher(String protocol) {
    if(protocol == null) return null;
    return (MMPDispatcher)dispatchers.remove(protocol);
  }

  /**
   * Builds a Value starting with "MMP/major.minor" and augmented by every
   * non-empty usingProtocols() string of the registered dispatchers.
   *
   * @return the assembled protocol description
   **/
  public Value usingProtocols() {
    String s;
    Value sub;
    Value prot = new Value(ProtocolName + "/" +
                           ProtocolMajorVersion + "." + ProtocolMinorVersion);
    for(Enumeration e = dispatchers.elements();e.hasMoreElements();) {
      sub = ((MMPDispatcher)e.nextElement()).usingProtocols();
      if(sub == null) continue;
      s = sub.toString();
      if(s == null) continue;
      if(s.length() > 0) prot.augment(s);
    }
    return prot;
  }

  /**
   * Asks the dispatcher responsible for the location to listen on it.
   * Does nothing if the location is null or no dispatcher is registered
   * for its protocol.
   *
   * @param listenLocation the location to listen on
   * @throws IOException passed on from the dispatcher
   **/
  public void listen(UNA listenLocation) throws IOException {
    if(listenLocation == null) return;
    MMPDispatcher dispatcher = getDispatcher(listenLocation);
    if(dispatcher == null) return;
    dispatcher.listen(listenLocation);
  }

  /**
   * Asks the dispatcher responsible for the location to remove it.
   * Does nothing if the location is null or no dispatcher is registered
   * for its protocol.
   *
   * @param listenLocation the location to remove
   **/
  public void removeListenLocation(UNA listenLocation) {
    if(listenLocation == null) return;
    MMPDispatcher dispatcher = getDispatcher(listenLocation);
    if(dispatcher == null) return;
    dispatcher.remove(listenLocation);
  }

  /**
   * Collects the listen locations of all registered dispatchers.
   *
   * @return a new array holding all locations (never null, possibly empty)
   **/
  public UNA[] getListenLocations() {
    Enumeration e;
    UNA[] locations, l;
    int size = 0;
    synchronized(dispatchers) {
      // first pass: count, second pass: copy
      for(e = dispatchers.elements();e.hasMoreElements();) {
        l = ((MMPDispatcher)e.nextElement()).getListenLocations();
        if(l != null) size += l.length;
      }
      locations = new UNA[size];
      size = 0;
      for(e = dispatchers.elements();e.hasMoreElements();) {
        l = ((MMPDispatcher)e.nextElement()).getListenLocations();
        if(l != null) {
          System.arraycopy(l,0,locations,size,l.length);
          size += l.length;
        }
      }
    }
    return locations;
  }

  /**
   * Sets the manager receiving packets and errors.
   *
   * @param manager the new manager; null installs a {@link DummyManager}
   **/
  public void setManager(MMPPacketManager manager) {
    if(manager == null) manager = new DummyManager();
    this.manager = manager;
  }

  /**
   * Returns the given proxy UNA if its string form ends with '/', otherwise
   * a new UNA built from the string form with '/' appended.
   **/
  private static UNA withTrailingSlash(UNA proxy) {
    String name = proxy.toString();
    // endsWith() also copes with an empty name, where charAt() would throw
    if(name.endsWith("/")) return proxy;
    return new UNA(name + "/");
  }

  /**
   * Returns the scheme proxy registered for the scheme of the given UNA,
   * or null if there is none.
   **/
  private UNA findSchemeProxy(UNA remote) {
    Hashtable table = schemeProxies;
    if(table == null) return null;
    String scheme = remote.getScheme();
    if(scheme == null) return null;
    return (UNA)table.get(scheme);
  }

  /**
   * Prefixes the given UNA with its scheme proxy, if one is registered;
   * otherwise returns the UNA unchanged.
   **/
  private UNA applySchemeProxy(UNA remote) {
    UNA schemeProxy = findSchemeProxy(remote);
    if(schemeProxy == null) return remote;
    return new UNA(schemeProxy.toString() + remote);
  }

  /**
   * Returns the proxy responsible for the given UNA: the protocol-specific
   * proxy if one is registered for its protocol, else the default proxy,
   * else null.
   **/
  private UNA findProxy(UNA remote) {
    UNA found = null;
    Hashtable table = proxies;
    if(table != null) {
      String protocol = remote.getProtocol();
      if(protocol != null) found = (UNA)table.get(protocol);
    }
    if(found == null) found = proxy;
    return found;
  }

  /**
   * If the given proxy is a master of source and the resource of source
   * (without its first character) forms an UNA with a host name, returns
   * that direct UNA; otherwise returns source unchanged.
   **/
  private static UNA unwrapProxied(UNA proxy, UNA source) {
    if(proxy == null || !proxy.master(source)) return source;
    String resource = source.getResource();
    if(resource == null || resource.length() == 0) return source;
    resource = resource.substring(1);
    if(resource.length() == 0) return source;
    // make sure, it's not a packet from the proxy itself
    UNA direct = new UNA(source,resource);
    if(direct.getHostName() == null) return source;
    return direct;
  }

  /**
   * Sets or - if proxy is null - removes the proxy used for every packet
   * addressed to the given scheme.
   *
   * @param scheme the scheme; null is ignored
   * @param proxy  the proxy UNA (a trailing '/' is appended if missing)
   *               or null
   **/
  public void setSchemeProxy(String scheme, UNA proxy) {
    if(scheme == null) return;
    if(proxy != null) {
      // make sure, that last char in proxy UNA is a '/'
      proxy = withTrailingSlash(proxy);
      if(schemeProxies == null) schemeProxies = new Hashtable();
      schemeProxies.put(scheme,proxy);
    } else if(schemeProxies != null) {
      schemeProxies.remove(scheme);
    }
  }

  /**
   * Sets or - if proxy is null - removes a proxy for a specific
   * protocol or - if protocol is null - for every packet at all.
   * Setting the default proxy (protocol == null) drops all
   * protocol-specific proxies.
   * Default: no proxies at all.
   *
   * @param protocol the protocol or null for the default proxy
   * @param proxy    the proxy UNA (a trailing '/' is appended if missing)
   *                 or null
   **/
  public void setProxy(String protocol, UNA proxy) {
    // make sure, that last char in proxy UNA is a '/'
    if(proxy != null) proxy = withTrailingSlash(proxy);
    if(protocol == null) {
      if(proxies != null) proxies.clear();
      proxies = null;
      this.proxy = proxy;
      return;
    }
    if(proxy == null) {
      if(proxies != null) {
        proxies.remove(protocol);
        if(proxies.size() == 0) proxies = null;
      }
      return;
    }
    if(proxies == null) proxies = new Hashtable();
    proxies.put(protocol,proxy);
  }

  /**
   * Switches on or off whether an outgoing counter is started (at 0)
   * whenever an incoming counter gets initialized.
   *
   * @param counter the new setting
   **/
  public void answerCounterWithCounter(boolean counter) {
    answerCounterWithCounter = counter;
  }

  /**
   * Registers an alias for a variable name.
   *
   * @param alias   the short name; null is ignored
   * @param longVar the normalized name; null is ignored
   **/
  public void addAlias(String alias, String longVar) {
    if(alias == null || longVar == null) return;
    this.alias.put(alias,longVar);
  }

  /**
   * Removes an alias.
   *
   * @param alias the short name
   * @return the normalized name the alias stood for, or null
   **/
  public String removeAlias(String alias) {
    if(alias == null) return null;
    return (String)this.alias.remove(alias);
  }

  /**
   * Registers the built-in aliases of the length, source, target,
   * initialize and counter tags and adds the "_list_using_modules"
   * ASSIGN modifier to the general greeting variables.
   **/
  private void initializeAliases() {
    int i;
    for(i = 0;i < lengthAliases.length;++i)
      addAlias(lengthAliases[i],lengthTag);
    for(i = 0;i < sourceAliases.length;++i)
      addAlias(sourceAliases[i],sourceTag);
    for(i = 0;i < targetAliases.length;++i)
      addAlias(targetAliases[i],targetTag);
    for(i = 0;i < initializeAliases.length;++i)
      addAlias(initializeAliases[i],initializeTag);
    for(i = 0;i < counterAliases.length;++i)
      addAlias(counterAliases[i],counterTag);
    addGreetingVariable(new VariableModifier(
      VariableModifier.GLYPH_ASSIGN,"_list_using_modules",
      new Value("_state")));
  }

  /**
   * Stores a fresh counter with the given value for the connection.
   **/
  private void setCounter(Hashtable container, MMPConnection connection,
                          long value) {
    if(connection == null) return;
    Counter counter = new Counter();
    counter.value = value;
    container.put(connection,counter);
  }

  /**
   * Drops the counter of the connection, if any.
   **/
  private void removeCounter(Hashtable container, MMPConnection connection) {
    if(connection == null) return;
    container.remove(connection);
  }

  /**
   * Tells whether a counter is stored for the connection.
   **/
  private boolean haveCounter(Hashtable container, MMPConnection connection) {
    if(connection == null) return false;
    return container.get(connection) != null;
  }

  /**
   * Increments the counter of the connection.
   *
   * @return the incremented value, or 0 if there is no such counter
   **/
  private long incrementCounter(Hashtable container,
                                MMPConnection connection) {
    if(connection == null) return 0;
    Counter counter = (Counter)container.get(connection);
    if(counter == null) return 0;
    return ++counter.value;
  }

  /**
   * Decrements the counter of the connection.
   *
   * @return the decremented value, or 0 if there is no such counter
   **/
  private long decrementCounter(Hashtable container,
                                MMPConnection connection) {
    if(connection == null) return 0;
    Counter counter = (Counter)container.get(connection);
    if(counter == null) return 0;
    return --counter.value;
  }

  /**
   * Same as {@link #getVars(UNA,UNA)} with a null local.
   **/
  public String[] getVars(UNA remote) {
    return getVars(remote,null);
  }

  /**
   * Lists the names of all variables known for a connection: every global
   * variable plus every temporary variable that has no global counterpart.
   *
   * @param remote the remote side of the connection
   * @param local  the local side of the connection, may be null
   * @return the names, or null if remote is null or no variable table
   *         exists for the connection
   **/
  public String[] getVars(UNA remote, UNA local) {
    if(remote == null) return null;
    MMPConnection connection = new MMPConnection(remote,local);
    Hashtable globVars = (Hashtable)this.globVars.get(connection);
    Hashtable tempVars = (Hashtable)this.tempVars.get(connection);
    if(globVars == null && tempVars == null) return null;
    // Only one of the tables may exist. Substitute a private empty table for
    // the missing one, so neither the locks nor the lookups below hit null.
    if(globVars == null) globVars = new Hashtable();
    if(tempVars == null) tempVars = new Hashtable();
    String[] names;
    synchronized(globVars) {
      synchronized(tempVars) {
        int i = 0;
        int size = globVars.size();
        Enumeration e;
        for(e = tempVars.keys();e.hasMoreElements();)
          if(globVars.get(e.nextElement()) == null) ++size;
        names = new String[size];
        for(e = globVars.keys();e.hasMoreElements();)
          names[i++] = (String)e.nextElement();
        String tmp;
        for(e = tempVars.keys();e.hasMoreElements();) {
          tmp = (String)e.nextElement();
          if(globVars.get(tmp) == null) names[i++] = tmp;
        }
      }
    }
    return names;
  }

  /**
   * Same as {@link #getValue(UNA,UNA,String)} with a null local.
   **/
  public Value getValue(UNA remote, String name) {
    return getValue(remote,null,name);
  }

  /**
   * Looks up a variable of a connection. Temporary variables take
   * precedence over global ones.
   *
   * @param remote the remote side of the connection
   * @param local  the local side of the connection, may be null
   * @param name   the variable name
   * @return the value, or null if remote or name is null or the variable
   *         is unknown
   **/
  public Value getValue(UNA remote, UNA local, String name) {
    if(remote == null || name == null) return null;
    MMPConnection connection = new MMPConnection(remote,local);
    Hashtable vars = (Hashtable)tempVars.get(connection);
    if(vars != null) {
      Value val = (Value)vars.get(name);
      if(val != null) return val;
    }
    vars = (Hashtable)globVars.get(connection);
    if(vars == null) return null;
    return (Value)vars.get(name);
  }

  /**
   * Same as {@link #removeValue(UNA,UNA,String)} with a null local.
   **/
  public Value removeValue(UNA remote, String name) {
    return removeValue(remote,null,name);
  }

  /**
   * Removes a variable of a connection from both the temporary and the
   * global variables.
   *
   * @param remote the remote side of the connection
   * @param local  the local side of the connection, may be null
   * @param name   the variable name
   * @return the value {@link #getValue(UNA,UNA,String)} reported before
   *         the removal, or null
   **/
  public Value removeValue(UNA remote, UNA local, String name) {
    if(remote == null || name == null) return null;
    MMPConnection connection = new MMPConnection(remote,local);
    Value old = getValue(remote,local,name);
    Hashtable vars = (Hashtable)tempVars.get(connection);
    if(vars != null) vars.remove(name);
    vars = (Hashtable)globVars.get(connection);
    if(vars != null) vars.remove(name);
    return old;
  }

  /**
   * Closes every registered dispatcher and unregisters all of them.
   **/
  public void close() {
    synchronized(dispatchers) {
      for(Enumeration e = dispatchers.elements();e.hasMoreElements();)
        ((MMPDispatcher)e.nextElement()).close();
      dispatchers.clear();
    }
  }

  /**
   * Closes the given remote for all locals; the manager is informed with
   * an MMPReceiverException.CLOSED error per forgotten connection.
   *
   * @param remote the remote to close; null is ignored
   **/
  public void close(UNA remote) {
    close(new MMPReceiverException(MMPReceiverException.CLOSED),
          remote,null,true);
  }

  /**
   * Closes the given remote for the given local only; the manager is
   * informed with an MMPReceiverException.CLOSED error per forgotten
   * connection.
   *
   * @param remote the remote to close; null is ignored
   * @param local  the local side to match, may be null
   **/
  protected void close(UNA remote, UNA local) {
    close(new MMPReceiverException(MMPReceiverException.CLOSED),
          remote,local,false);
  }

  /**
   * Forgets the state of a remote and reports the given error to the
   * manager once per forgotten connection.
   * If the remote names no resource or the default resource of its scheme,
   * the responsible dispatcher is asked to close the remote as well.
   *
   * @param e        the error handed to the manager
   * @param remote   the remote to close; null is ignored
   * @param local    the local side to match if allLocal is false
   * @param allLocal true to forget the connections to every local
   **/
  private void close(MMPException e, UNA remote, UNA local, boolean allLocal) {
    if(remote == null) return;
    if(remote.getResource() == null ||
       remote.getResource().equals(
         UNA.getDefaultResource(remote.getScheme()))) {
      MMPDispatcher dispatcher = getDispatcher(remote);
      if(dispatcher != null) dispatcher.close(remote);
    }

    // remove every greeting sent for all slaves of this source;
    // collect first, so sentGreeting is not modified while enumerated
    Hashtable rm = new Hashtable();
    rm.put(new MMPConnection(remote,null),remote);
    this.remote.remove(remote);
    Enumeration en;
    MMPConnection connection;
    for(en = sentGreeting.keys();en.hasMoreElements();) {
      connection = (MMPConnection)en.nextElement();
      if(!remote.master(connection.remote)) continue;
      if(allLocal ||
         local == null && connection.local == null ||
         local != null && local.master(connection.local))
        rm.put(connection,connection.remote);
    }
    for(en = rm.keys();en.hasMoreElements();) {
      connection = (MMPConnection)en.nextElement();
      this.local.remove(connection.remote);
      globVars.remove(connection);
      tempVars.remove(connection);
      removeCounter(incomingCounter,connection);
      removeCounter(outgoingCounter,connection);
      sentGreeting.remove(connection);
      dontAnswerGreeting.remove(connection);
      manager.error(e,connection.remote,connection.local,null);
    }
  }

  /**
   * Adds a Variable to every outgoing greeting Packet.
   *
   * @param modifier the variable modifier; null is ignored
   **/
  public void addGreetingVariable(VariableModifier modifier) {
    if(modifier == null) return;
    generalVariables.addElement(modifier);
  }

  /**
   * Adds a Variable only to every outgoing greeting Packet for
   * the specified protocol.
   * A protocol value of null means default protocol.
   *
   * @param protocol the protocol or null
   * @param modifier the variable modifier; null is ignored
   **/
  public void addGreetingVariable(String protocol, VariableModifier modifier) {
    if(modifier == null) return;
    if(protocol == null) protocol = UNA.getDefaultProtocol();
    Vector modifiers = (Vector)protocolVariables.get(protocol);
    if(modifiers == null) {
      modifiers = new Vector();
      protocolVariables.put(protocol,modifiers);
    }
    modifiers.addElement(modifier);
  }

  /**
   * Adds a Variable only to every outgoing greeting Packet for
   * the specified UNA.
   *
   * @param remote   the UNA; null is ignored
   * @param modifier the variable modifier; null is ignored
   **/
  public void addGreetingVariable(UNA remote, VariableModifier modifier) {
    if(remote == null || modifier == null) return;
    Vector modifiers = (Vector)unlVariables.get(remote);
    if(modifiers == null) {
      modifiers = new Vector();
      unlVariables.put(remote,modifiers);
    }
    modifiers.addElement(modifier);
  }

  /**
   * Marks the connection as greeted and sends a greeting packet: the
   * _initialize tag followed by the general, the protocol-specific and the
   * UNA-specific greeting variables, with the body supplied by the manager.
   * If the remote has no protocol, the connection is marked but nothing
   * is sent.
   *
   * @param remote the remote side; null is ignored
   * @param local  the local side, may be null
   **/
  public void sendGreeting(UNA remote, UNA local) {
    if(remote == null) return;
    MMPConnection connection = new MMPConnection(remote,local);
    // mark first: send() below must not trigger another greeting
    sentGreeting.put(connection,remote);
    dontAnswerGreeting.put(connection,remote);
    String protocol = remote.getProtocol();
    if(protocol == null) return;
    Vector v;
    Vector variables = new Vector();
    variables.addElement(initialize);
    for(Enumeration e = generalVariables.elements();e.hasMoreElements();)
      variables.addElement(e.nextElement());
    v = (Vector)protocolVariables.get(protocol);
    if(v != null)
      for(Enumeration e = v.elements();e.hasMoreElements();)
        variables.addElement(e.nextElement());
    v = (Vector)unlVariables.get(remote);
    if(v != null)
      for(Enumeration e = v.elements();e.hasMoreElements();)
        variables.addElement(e.nextElement());
    send(remote,local,new MMPPacket(variables,
                                    manager.getGreetingBody(remote,local)));
  }

  /**
   * Closes the given remote for all locals and, if the remote is one of
   * our proxies, every greeted remote that was reached through it
   * (protocol-specific proxy: all greeted remotes of that protocol;
   * default proxy: all greeted remotes).
   *
   * @param e      the error handed to the manager
   * @param remote the remote that failed
   **/
  private void clean(MMPException e, UNA remote) {
    close(e,remote,null,true);

    // check if the remote is a proxy
    if(proxies != null) {
      String protocol = remote.getProtocol();
      if(protocol != null) {
        UNA proxy = (UNA)proxies.get(protocol);
        if(proxy != null && proxy.equals(remote)) {
          // iterate over a copy, close() modifies sentGreeting
          Hashtable connections = (Hashtable)sentGreeting.clone();
          for(Enumeration en = connections.elements();
              en.hasMoreElements();) {
            UNA testRemote = (UNA)en.nextElement();
            if(protocol.equals(testRemote.getProtocol()))
              close(e,testRemote,null,true);
          }
        }
      }
    }
    if(proxy != null && proxy.equals(remote)) {
      // iterate over a copy, close() modifies sentGreeting
      Hashtable connections = (Hashtable)sentGreeting.clone();
      for(Enumeration en = connections.elements();
          en.hasMoreElements();)
        close(e,(UNA)en.nextElement(),null,true);
    }
  }

  /**
   * Expands registered aliases to their normalized variable names.
   **/
  private class Normalizer implements VariableNormalizer {
    /**
     * @param name a variable name or alias
     * @return the normalized name registered for the alias, or the name
     *         itself if it is no alias (null for null)
     **/
    public String normalizeVariableName(String name) {
      if(name == null) return null;
      String expanded = (String)alias.get(name);
      if(expanded == null) return name;
      return expanded;
    }
  }

  /**
   * Receives packets and errors from the dispatchers.
   **/
  private class DispatcherPeer implements MMPDispatcherPeer {
    /**
     * Handles an error reported by a dispatcher.
     * Delivery errors are remembered in gotException. A CANNOT_DELIVER
     * delivery error or a CLOSED error of any other kind for a non-null
     * remote makes us forget that remote.
     *
     * @param e      the error; null is ignored
     * @param remote the remote concerned, may be null
     * @param packet the packet concerned (not used here)
     **/
    public void error(MMPException e, UNA remote, MMPPacket packet) {
      if(e == null) return;
      int code = e.getErrorCode();
      boolean mustClean;
      if(e instanceof MMPDeliveryException) {
        // TODO: make that better
        gotException = (MMPDeliveryException)e;
        mustClean = code == MMPDeliveryException.CANNOT_DELIVER &&
                    remote != null;
      } else {
        mustClean = code == MMPReceiverException.CLOSED && remote != null;
      }
      if(mustClean) clean(e,remote);
    }

    /**
     * Handles a packet received by a dispatcher.
     * The protocol tags (_source, _target, _initialize, _counter) are
     * evaluated and stripped from the header, the remaining header
     * variables are applied to the variable tables of the connection, a
     * greeting is answered if required and finally the packet is forwarded
     * to the manager. Packets are dropped on an untrusted source tag, when
     * the connection limit is hit, or when the counter marks a duplicate.
     *
     * @param remote the physical remote the packet came from
     * @param packet the received packet
     **/
    public void manage(UNA remote, MMPPacket packet) {
      // nothing sensible can be done without a sender or a packet
      if(remote == null || packet == null) return;
      int i;
      Vector header = packet.getHeader();
      VariableModifier modifier;
      char glyph;
      String name;
      Value val;
      UNA correctRemote = (UNA)MMPMessageCenter.this.remote.get(remote);
      UNA local = null;
      MMPConnection connection = null;
      boolean removeLocal = false;
      boolean staticLocal = false;
      boolean gotInitialize = false;
      boolean gotCounter = false;
      UNA allocateRemote = null;
      UNA allocateLocal = null;
      long counter = 0;
      boolean gotHeader = header != null && header.size() > 0;

      if(correctRemote != null) packet.setRemote(correctRemote);
      else correctRemote = remote;
      UNA initialRemote = remote;

      if(gotHeader) {
        // correct remote and check if proxy only
        for(i = 0;i < header.size();++i) {
          modifier = (VariableModifier)header.elementAt(i);
          name = modifier.getName();

          if(sourceTag.equals(name)) {
            glyph = modifier.getGlyph();
            if(glyph == VariableModifier.GLYPH_SET ||
               glyph == VariableModifier.GLYPH_ASSIGN) {
              // We know only SET and ASSIGN, because DIMINISH is not
              // allowed (a remote cannot say that the packet comes
              // from nirvana or one of its masters) and AUGMENT is not
              // possible (it's no array, just a simple UNA) - so
              // it's only interesting whether it is a temporary or a
              // static source tag (see below).
              // We don't even know QUERY.
              val = modifier.getValue();
              if(val != null && val.toString().length() > 0) {
                UNA newRemote = new UNA(val.toString());

                // check if the source tag is valid and allowed
                if(!correctRemote.master(newRemote)) {
                  // check if the remote just forgot the port
                  newRemote = new UNA(newRemote.getScheme(),
                                      newRemote.getHostName(),
                                      correctRemote.getPort(),
                                      newRemote.getProtocol(),
                                      newRemote.getResource());
                  if(!correctRemote.master(newRemote)) {
                    manager.error(new MMPReceiverException(
                                    MMPReceiverException.UNTRUSTED_SOURCE),
                                  correctRemote,null,packet);
                    return;
                  }
                }

                // check if the remote is a proxy
                newRemote = unwrapProxied(findProxy(newRemote),newRemote);

                // check if the remote is a scheme proxy
                newRemote = unwrapProxied(findSchemeProxy(newRemote),
                                          newRemote);

                // if this tag is not temporary, remember it
                // TAKE CARE! from now on, NO master of this remote
                // can talk to us any longer, because we assume
                // every packet comes from this remote but this
                // remote is not allowed to send packets from its
                // master - not even an _initialize packet!
                if(!VariableModifier.isTemporary(modifier.getGlyph()))
                  allocateRemote = newRemote;
                packet.setRemote(newRemote);
                correctRemote = newRemote;
              }
            }
            header.removeElementAt(i--);
            continue;
          }

          if(targetTag.equals(name)) {
            glyph = modifier.getGlyph();
            val = modifier.getValue();
            if(val == null || val.toString().length() <= 0) {
              if(glyph == VariableModifier.GLYPH_DIMINISH) {
                // handle remove on complete variable explicitly
                removeLocal = true;
                local = null;
              }
              // else, if we have no body... ignore the tag
            } else if(glyph == VariableModifier.GLYPH_SET ||
                      glyph == VariableModifier.GLYPH_ASSIGN) {
              local = new UNA(val.toString());
              staticLocal = !VariableModifier.isTemporary(glyph);
            }
            header.removeElementAt(i--);
            continue;
          }

          if(initializeTag.equals(name)) {
            if(modifier.getGlyph() == VariableModifier.GLYPH_SET) {
              gotInitialize = true;
              packet.setInitialize();
            }
            header.removeElementAt(i--);
            continue;
          }

          if(counterTag.equals(name)) {
            if(modifier.getGlyph() == VariableModifier.GLYPH_SET) {
              gotCounter = true;
              val = modifier.getValue();
              // if we have no value, assume it as zero (already set)
              if(val != null) {
                try {
                  counter = val.toLong();
                } catch(NumberFormatException e) {
                  counter = 0;
                }
              }
              packet.setCounter(counter);
            }
            header.removeElementAt(i--);
            continue;
          }
        }
      }

      remote = packet.getRemote();
      if(removeLocal) MMPMessageCenter.this.local.remove(remote);
      if(local == null) local = (UNA)MMPMessageCenter.this.local.get(remote);
      else if(staticLocal) allocateLocal = local;
      if(local != null) packet.setLocal(local);
      connection = new MMPConnection(remote,local);

      Hashtable globVars =
        (Hashtable)MMPMessageCenter.this.globVars.get(connection);
      Hashtable tempVars =
        (Hashtable)MMPMessageCenter.this.tempVars.get(connection);

      if(globVars == null) {
        // if this is an unknown connection...
        // check if there are more connections than allowed...
        if(MMPMessageCenter.this.globVars.size() >= maxConnections &&
           maxConnections > 0) {
          // if there are more connections than allowed...
          // TODO: generate some error
          // ...free everything possibly allocated until now
          //    there are no resources allocated
          // ...close the remote
          // TODO: be more intelligent maybe
          if(closeOnMaxConnections) close(initialRemote);
          // ...drop the packet
          return;
        }
      }

      if(allocateRemote != null)
        MMPMessageCenter.this.remote.put(initialRemote,allocateRemote);
      if(allocateLocal != null)
        MMPMessageCenter.this.local.put(remote,allocateLocal);

      if(!gotInitialize && (globVars == null || tempVars == null)) {
        // whoops, we got no initialize tag, and we don't even have variable
        // buffers for this remote; it seems we lost at least one packet
        manager.error(new MMPReceiverException(
                        MMPReceiverException.PACKET_LOSS_NO_INITIALIZE),
                      remote,local,packet);
        // assume, that we got an initialize - send greeting packet later
        gotInitialize = true;
      }

      if(globVars == null)
        MMPMessageCenter.this.globVars.put(connection,
                                           globVars = new Hashtable());
      if(tempVars == null)
        MMPMessageCenter.this.tempVars.put(connection,
                                           tempVars = new Hashtable());

      synchronized(globVars) {
        // remove all global variables, if we got an initialize tag
        if(gotInitialize) globVars.clear();

        synchronized(tempVars) {
          // remove all temporary variables in any case
          tempVars.clear();

          if(gotHeader) {
            if(gotCounter) {
              // yeah, we have a counter
              if(gotInitialize) {
                // uh, we should initialize it
                setCounter(incomingCounter,connection,counter);
                if(answerCounterWithCounter)
                  setCounter(outgoingCounter,connection,0);
              } else if(haveCounter(incomingCounter,connection)) {
                // ah, we have to increment ours and check it back
                long number = incrementCounter(incomingCounter,connection);
                if(counter < number) {
                  // TODO: check for overflows
                  decrementCounter(incomingCounter,connection);
                  return; // a duplicate, simply ignore it
                }
                if(counter > number) {
                  // it seems, we lost at least one packet
                  manager.error(
                    new MMPReceiverException(
                      MMPReceiverException.PACKET_LOSS_COUNTER_DIFFERS),
                    remote,local,packet);
                }
              } else {
                // huh? we should have a counter, but we haven't?
                // well, then at least let's initialize our counter
                setCounter(incomingCounter,connection,counter);
                if(answerCounterWithCounter)
                  setCounter(outgoingCounter,connection,0);
                // it seems, we lost at least one packet
                manager.error(
                  new MMPReceiverException(
                    MMPReceiverException.PACKET_LOSS_COUNTER_NOT_INITIALIZED),
                  remote,local,packet);
              }
            }

            // apply variables
            for(Enumeration e = header.elements();e.hasMoreElements();) {
              modifier = (VariableModifier)e.nextElement();
              glyph = modifier.getGlyph();
              name = modifier.getName();
              val = modifier.getValue();

              // a Hashtable cannot hold a null key, skip such entries
              if(name == null) continue;

              if(val == null || val.toString().length() <= 0) {
                if(glyph == VariableModifier.GLYPH_DIMINISH) {
                  // handle remove on complete variable explicitly
                  tempVars.remove(name);
                  globVars.remove(name);
                  continue;
                }
                if(val == null) val = new Value();
              }

              if(glyph == VariableModifier.GLYPH_SET) {
                tempVars.put(name,val);
              } else if(glyph == VariableModifier.GLYPH_ASSIGN) {
                // NOTE(review): re-assigning an already existing variable
                // is rejected too once the limit is reached - confirm
                // whether that is intended.
                if(globVars.size() < maxVariables || maxVariables < 1) {
                  globVars.put(name,val);
                } else {
                  //TODO: generate some error
                }
              } else if(glyph == VariableModifier.GLYPH_AUGMENT ||
                        glyph == VariableModifier.GLYPH_DIMINISH) {
                Value oldVal = getValue(remote,local,name);
                // TODO: discuss, if we'll be strict
                //if(oldVal == null) continue;
                if(oldVal == null) {
                  if(globVars.size() < maxVariables || maxVariables < 1) {
                    globVars.put(name,oldVal = new Value());
                  } else {
                    //TODO: generate some error
                    // no room for a new variable: there is nothing to
                    // modify, so skip this modifier
                    continue;
                  }
                }
                oldVal.handle(glyph,val.toString());
              }
            }
          }

          // send greeting, if we got an initialize tag
          if(gotInitialize && dontAnswerGreeting.get(connection) == null)
            sendGreeting(remote,local);
          dontAnswerGreeting.remove(connection);

          // forward the packet to our manager
          manager.manage(packet);
        }
      }
    }
  }

  /**
   * Asks the dispatcher that would deliver a packet to the given remote
   * for the local address used to reach it. The route is chosen like in
   * {@link #send(UNA,UNA,MMPPacket)}: scheme proxy first, then the
   * protocol-specific proxy, then the default proxy, else direct.
   *
   * @param remote the remote to reach
   * @return the local UNA reported by the dispatcher, or null if remote is
   *         null, no dispatcher is responsible or the dispatcher failed
   **/
  public UNA getLocal(UNA remote) {
    if(remote == null) return null;

    // check SchemeProxies
    remote = applySchemeProxy(remote);

    // protocol-specific proxies override the default proxy
    UNA correctRemote = findProxy(remote);

    // if we have no proxies, send it directly
    if(correctRemote == null) correctRemote = remote;

    MMPDispatcher dispatcher = getDispatcher(correctRemote);
    if(dispatcher == null) return null;
    try {
      return dispatcher.getLocal(remote);
    } catch(IOException e) {
      return null;
    }
  }

  /**
   * Queues a variable modifier; it is added to the header of the next
   * packet sent over the connection.
   **/
  private void addVariableModifier(MMPConnection connection,
                                   VariableModifier modifier) {
    if(connection == null || modifier == null) return;
    Vector modifiers = (Vector)outgoings.get(connection);
    if(modifiers == null) {
      modifiers = new Vector();
      outgoings.put(connection,modifiers);
    }
    modifiers.addElement(modifier);
  }

  /**
   * Same as {@link #assignVariable(UNA,UNA,String,Value)} with a null local.
   **/
  public void assignVariable(UNA remote, String name, Value value) {
    assignVariable(remote,null,name,value);
  }

  /**
   * Queues an ASSIGN of the variable for the next packet sent over the
   * connection.
   **/
  public void assignVariable(UNA remote, UNA local,
                             String name, Value value) {
    addVariableModifier(
      new MMPConnection(remote,local),
      new VariableModifier(VariableModifier.GLYPH_ASSIGN,name,value));
  }

  /**
   * Same as {@link #augmentVariable(UNA,UNA,String,Value)} with a null local.
   **/
  public void augmentVariable(UNA remote, String name, Value value) {
    augmentVariable(remote,null,name,value);
  }

  /**
   * Queues an AUGMENT of the variable for the next packet sent over the
   * connection.
   **/
  public void augmentVariable(UNA remote, UNA local,
                              String name, Value value) {
    addVariableModifier(
      new MMPConnection(remote,local),
      new VariableModifier(VariableModifier.GLYPH_AUGMENT,name,value));
  }

  /**
   * Same as {@link #diminishVariable(UNA,UNA,String,Value)} with a null
   * local.
   **/
  public void diminishVariable(UNA remote, String name, Value value) {
    diminishVariable(remote,null,name,value);
  }

  /**
   * Queues a DIMINISH of the variable for the next packet sent over the
   * connection.
   **/
  public void diminishVariable(UNA remote, UNA local,
                               String name, Value value) {
    addVariableModifier(
      new MMPConnection(remote,local),
      new VariableModifier(VariableModifier.GLYPH_DIMINISH,name,value));
  }

  /**
   * Same as {@link #send(UNA,UNA,MMPPacket)} with a null local.
   **/
  public void send(UNA remote, MMPPacket packet) {
    send(remote,null,packet);
  }

  /**
   * Sends a packet to a remote.
   * Queued variable modifiers of the connection are appended to the packet
   * header, a greeting is sent first if the connection was not greeted
   * yet, and source, counter and target tags are inserted as needed. The
   * packet is routed via scheme proxy, protocol-specific proxy or default
   * proxy if configured. If delivery fails, the dispatcher is told to
   * forget the connection and delivery is tried once more; a final failure
   * is reported to the manager as MMPDeliveryException.
   * Note that the header of the given packet is modified.
   *
   * @param remote the remote to send to; null is ignored
   * @param local  the local side, may be null; a local naming no resource
   *               or the default resource is treated as null
   * @param packet the packet to send; null is ignored
   **/
  public void send(UNA remote, UNA local, MMPPacket packet) {
    if(remote == null || packet == null) return;
    gotException = null;
    if(local != null &&
       (local.getResource() == null ||
        local.getResource().equals(
          UNA.getDefaultResource(local.getScheme()))))
      local = null;

    // NOTE(review): assumes packet.getHeader() is non-null here - confirm
    Vector header = packet.getHeader();
    MMPConnection connection = new MMPConnection(remote,local);
    Vector outgoings = (Vector)this.outgoings.get(connection);
    if(outgoings != null) {
      for(Enumeration e = outgoings.elements();e.hasMoreElements();)
        header.addElement(e.nextElement());
      outgoings.removeAllElements();
    }

    if(sentGreeting.get(connection) == null) sendGreeting(remote,local);
    if(gotException != null) {
      // the greeting could not be delivered, so forget that we sent it
      sentGreeting.remove(connection);
      dontAnswerGreeting.remove(connection);
      return;
    }

    if(local != null)
      header.insertElementAt(
        new VariableModifier(VariableModifier.GLYPH_SET,sourceTag,
                             new Value(local.toString())),0);
    if(haveCounter(outgoingCounter,connection))
      header.insertElementAt(
        new VariableModifier(
          VariableModifier.GLYPH_SET,counterTag,
          new Value(String.valueOf(
            incrementCounter(outgoingCounter,connection)))),0);

    // check SchemeProxies
    remote = applySchemeProxy(remote);

    // check proxies
    UNA correctRemote = findProxy(remote);

    // if we got some kind of proxy, check if the packet is already
    // addressed directly to it
    if(correctRemote != null && correctRemote.master(remote))
      correctRemote = null;

    // if we got some kind of proxy or the remote is specified
    // by any kind of resource: set target variable
    if(correctRemote != null ||
       remote.getResource() != null &&
       !remote.getResource().equals(
         UNA.getDefaultResource(remote.getScheme())))
      header.insertElementAt(
        new VariableModifier(
          VariableModifier.GLYPH_SET,targetTag,
          new Value((correctRemote != null ? correctRemote.toString() : "") +
                    remote.toString())),0);

    // if we have no proxies, send it directly
    if(correctRemote == null) correctRemote = remote;

    try {
      MMPDispatcher dispatcher = getDispatcher(correctRemote);
      if(dispatcher == null) {
        gotException = new MMPDeliveryException(
          MMPDeliveryException.UNKNOWN_PROTOCOL);
        return;
      }
      synchronized(this) {
        gotException = null;
        try {
          dispatcher.send(correctRemote,packet);
        } catch(Exception e) {
          // TODO: do not close it here because of problem_with_lost_data
          // BUT: force the dispatcher to forget the connection
          // AND: try it again with a new one instead of simply throwing
          //      a cannot deliver.
          dispatcher.forget(correctRemote);
          try {
            dispatcher.send(correctRemote,packet);
          } catch(Exception ex) {
            gotException = new MMPDeliveryException(
              MMPDeliveryException.CANNOT_DELIVER);
          }
        }
      }
    } finally {
      if(gotException != null)
        manager.error(gotException,remote,local,packet);
    }
  }

  /**
   * Enumerates the logical remotes of all greeted connections.
   *
   * @return an Enumeration of UNA
   **/
  public Enumeration getAllRemotes() {
    return sentGreeting.elements();
  }

  /**
   * Enumerates the logical remotes of all greeted connections whose local
   * side equals the given local.
   *
   * @param local the local side to match; null means all connections
   * @return an Enumeration of UNA
   **/
  public Enumeration getAllRemotes(UNA local) {
    if(local == null) return getAllRemotes();
    Vector remote = new Vector();
    MMPConnection connection;
    for(Enumeration e = sentGreeting.keys();e.hasMoreElements();) {
      connection = (MMPConnection)e.nextElement();
      if(local.equals(connection.local))
        remote.addElement(sentGreeting.get(connection));
    }
    return remote.elements();
  }

  /**
   * Sends the packet to every greeted remote (with a null local).
   * NOTE(review): send() modifies the header of the packet it is given, so
   * the tags inserted for one remote are still present when the same
   * packet is sent to the next one - confirm whether callers rely on this.
   *
   * @param packet the packet to send
   **/
  public void broadcast(MMPPacket packet) {
    for(Enumeration e = getAllRemotes();e.hasMoreElements();)
      send((UNA)e.nextElement(),null,packet);
  }

  /**
   * Sends the packet to every greeted remote connected to the given local.
   * NOTE(review): the packets are sent with a null local, not with the
   * given one - looks unintended, confirm before changing.
   *
   * @param local  the local side selecting the remotes; null means all
   * @param packet the packet to send
   **/
  public void broadcast(UNA local, MMPPacket packet) {
    for(Enumeration e = getAllRemotes(local);e.hasMoreElements();)
      send((UNA)e.nextElement(),null,packet);
  }
}