diff --git a/src/edu/umass/cs/reconfiguration/AbstractReplicaCoordinator.java b/src/edu/umass/cs/reconfiguration/AbstractReplicaCoordinator.java index 7fc5b129..376430f1 100644 --- a/src/edu/umass/cs/reconfiguration/AbstractReplicaCoordinator.java +++ b/src/edu/umass/cs/reconfiguration/AbstractReplicaCoordinator.java @@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import edu.umass.cs.reconfiguration.interfaces.*; import org.json.JSONException; import org.json.JSONObject; @@ -40,11 +41,6 @@ import edu.umass.cs.nio.interfaces.IntegerPacketType; import edu.umass.cs.nio.interfaces.Messenger; import edu.umass.cs.nio.nioutils.NIOHeader; -import edu.umass.cs.reconfiguration.interfaces.ReconfigurableRequest; -import edu.umass.cs.reconfiguration.interfaces.ReconfiguratorCallback; -import edu.umass.cs.reconfiguration.interfaces.ReplicaCoordinator; -import edu.umass.cs.reconfiguration.interfaces.ReplicableRequest; -import edu.umass.cs.reconfiguration.interfaces.Repliconfigurable; import edu.umass.cs.reconfiguration.reconfigurationpackets.DefaultAppRequest; import edu.umass.cs.reconfiguration.reconfigurationpackets.ReconfigurationPacket; import edu.umass.cs.reconfiguration.reconfigurationpackets.ReplicableClientRequest; @@ -128,7 +124,17 @@ protected AbstractReplicaCoordinator(Replicable app) { this.messenger = null; } - + public AbstractReplicaCoordinator(Replicable app, + Messenger messenger,CoordinatorCallback callback) { + this(app,messenger); + if(callback!=null){ + this.callback = callback; + this.parser = callback; + callback.setCoordinator(this,messenger); + } + } + + /** * @param app * @param messenger @@ -341,6 +347,11 @@ public void setGetRequestImpl(AppRequestParserBytes parserBytes) { public final Request getRequest(byte[] bytes, NIOHeader header) throws RequestParseException { try { + Request request; + if((this.parser != null)&&((request = this.parser.getRequest(bytes, header))!=null)){ + return request; + } + return ByteBuffer.wrap(bytes).getInt() == ReconfigurationPacket.PacketType.REPLICABLE_CLIENT_REQUEST .getInt() ? (this.app instanceof AppRequestParserBytes ? new ReplicableClientRequest( bytes, header, (AppRequestParserBytes) this.app) @@ -374,7 +385,12 @@ protected final Request getRequest(ReplicableClientRequest rcr, * @return Set of request types that the app is designed to handle. */ public Set getAppRequestTypes() { - return this.app.getRequestTypes(); + HashSet packetTypes = new HashSet<>(); + packetTypes.addAll(this.app.getRequestTypes()); + if(parser !=null){ + packetTypes.addAll(parser.getRequestTypes()); + } + return packetTypes; } @Override @@ -415,6 +431,8 @@ public boolean restore(String name, String state) { return this.stopCallback != null && this.stopCallback.preRestore(name, state) ? true + :this.callback!=null &&this.callback.preRestore(name,state) ? true + /* Will be a no-op except during recovery when stopCallback will be null * as it wouldn't yet have been set. */ : this.preRestore(name, state) ? true diff --git a/src/edu/umass/cs/reconfiguration/ActiveReplica.java b/src/edu/umass/cs/reconfiguration/ActiveReplica.java index 92e2d6e1..8bf2fcba 100644 --- a/src/edu/umass/cs/reconfiguration/ActiveReplica.java +++ b/src/edu/umass/cs/reconfiguration/ActiveReplica.java @@ -158,7 +158,7 @@ private ActiveReplica(AbstractReplicaCoordinator appC, ReconfigurableNodeConfig nodeConfig, SSLMessenger messenger, boolean noReporting) { this.appCoordinator = (AbstractReplicaCoordinator) - wrapCoordinator(appC.setStopCallback( + (appC.setStopCallback( // setting default callback is optional // (ReconfiguratorCallback) this).setCallback( (ReconfiguratorCallback) this)); @@ -191,28 +191,7 @@ private ActiveReplica(AbstractReplicaCoordinator appC, // initInstrumenter(); } - protected static AbstractReplicaCoordinator wrapCoordinator( - AbstractReplicaCoordinator coordinator) { - Class clazz = null; - try { - clazz = Class.forName(Config - .getGlobalString(RC.COORDINATOR_WRAPPER)); - } catch (ClassNotFoundException e) { - // eat up exception, normal case - } - if (clazz == null) - return coordinator; - // reflectively instantiate - try { - return (AbstractReplicaCoordinator) clazz.getConstructor( - AbstractReplicaCoordinator.class).newInstance(coordinator); - } catch (InstantiationException | IllegalAccessException - | IllegalArgumentException | InvocationTargetException - | NoSuchMethodException | SecurityException e) { - e.printStackTrace(); - } - return coordinator; - } + /** * @param name diff --git a/src/edu/umass/cs/reconfiguration/PaxosReplicaCoordinator.java b/src/edu/umass/cs/reconfiguration/PaxosReplicaCoordinator.java index d3bdc9ff..9be003d9 100644 --- a/src/edu/umass/cs/reconfiguration/PaxosReplicaCoordinator.java +++ b/src/edu/umass/cs/reconfiguration/PaxosReplicaCoordinator.java @@ -34,6 +34,7 @@ import edu.umass.cs.nio.interfaces.IntegerPacketType; import edu.umass.cs.nio.interfaces.Messenger; import edu.umass.cs.nio.interfaces.Stringifiable; +import edu.umass.cs.reconfiguration.interfaces.CoordinatorCallback; import edu.umass.cs.reconfiguration.interfaces.ReconfigurableRequest; import edu.umass.cs.reconfiguration.interfaces.ReplicableRequest; import edu.umass.cs.reconfiguration.reconfigurationpackets.ReconfigurationPacket; @@ -61,8 +62,8 @@ public class PaxosReplicaCoordinator extends private PaxosReplicaCoordinator(Replicable app, NodeIDType myID, Stringifiable unstringer, Messenger niot, String paxosLogFolder, - boolean enableNullCheckpoints) { - super(app, niot); + boolean enableNullCheckpoints,CoordinatorCallback callback) { + super(app, niot,callback); assert (niot instanceof JSONMessenger); this.paxosManager = new PaxosManager(myID, unstringer, (JSONMessenger) niot, this, paxosLogFolder, @@ -72,7 +73,11 @@ private PaxosReplicaCoordinator(Replicable app, NodeIDType myID, .getNodePort(myID)), niot); } - + public PaxosReplicaCoordinator(Replicable app, NodeIDType myID, + Stringifiable unstringer, Messenger niot, + CoordinatorCallback coordinatorCallback) { + this(app, myID, unstringer, niot, null, true,coordinatorCallback); + } /** * @param app * @param myID @@ -81,7 +86,7 @@ private PaxosReplicaCoordinator(Replicable app, NodeIDType myID, */ public PaxosReplicaCoordinator(Replicable app, NodeIDType myID, Stringifiable unstringer, Messenger niot) { - this(app, myID, unstringer, niot, null, true); + this(app, myID, unstringer, niot, null, true,null); } /** @@ -95,7 +100,7 @@ public PaxosReplicaCoordinator(Replicable app, NodeIDType myID, public PaxosReplicaCoordinator(Replicable app, NodeIDType myID, Stringifiable unstringer, Messenger niot, int outOfOrderLimit) { - this(app, myID, unstringer, (JSONMessenger) niot); + this(app, myID, unstringer, (JSONMessenger) niot,null); assert (niot instanceof JSONMessenger); this.paxosManager.setOutOfOrderLimit(outOfOrderLimit); } diff --git a/src/edu/umass/cs/reconfiguration/ReconfigurableNode.java b/src/edu/umass/cs/reconfiguration/ReconfigurableNode.java index eb63239e..a2ad32c7 100644 --- a/src/edu/umass/cs/reconfiguration/ReconfigurableNode.java +++ b/src/edu/umass/cs/reconfiguration/ReconfigurableNode.java @@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.logging.Level; +import edu.umass.cs.reconfiguration.interfaces.CoordinatorCallback; import org.json.JSONObject; import edu.umass.cs.gigapaxos.AbstractPaxosLogger; @@ -183,6 +184,29 @@ private AbstractReplicaCoordinator createApp(String[] args, } } + @SuppressWarnings("unchecked") + private CoordinatorCallback getWrappingCoordinatorCallback(){ + Class clazz = null; + try { + clazz = Class.forName(Config + .getGlobalString(ReconfigurationConfig.RC.COORDINATOR_WRAPPER)); + } catch (ClassNotFoundException e) { + // eat up exception, normal case + } + if (clazz == null) { + return null; + } + // reflectively instantiate + try { + return (CoordinatorCallback) clazz.newInstance(); + } catch (InstantiationException | IllegalAccessException + | IllegalArgumentException | SecurityException e) { + e.printStackTrace(); + } + return null; + + } + /** * * TODO: Extend this method to enable support for other coordinators. @@ -198,7 +222,7 @@ private AbstractReplicaCoordinator getAppCoordinator( ReconfigurableNodeConfig nodeConfig, JSONMessenger messenger) { return new PaxosReplicaCoordinator(app, myID, nodeConfig, - messenger).setOutOfOrderLimit(Config + messenger,getWrappingCoordinatorCallback()).setOutOfOrderLimit(Config .getGlobalInt(ReconfigurationConfig.RC.OUT_OF_ORDER_LIMIT)); } diff --git a/src/edu/umass/cs/reconfiguration/ReconfigurationConfig.java b/src/edu/umass/cs/reconfiguration/ReconfigurationConfig.java index bc006d3b..0da7a978 100644 --- a/src/edu/umass/cs/reconfiguration/ReconfigurationConfig.java +++ b/src/edu/umass/cs/reconfiguration/ReconfigurationConfig.java @@ -288,7 +288,7 @@ public static enum RC implements Config.ConfigurableEnum { * that this parameter is irrelevant after name creation as the number * of replicas thereafter is controlled by the reconfiguration policy. */ - REPLICATE_ALL(true), + REPLICATE_ALL(false), /** * */ @@ -385,13 +385,13 @@ public static enum RC implements Config.ConfigurableEnum { /** * If true, transactions are enabled; else disabled. */ - ENABLE_TRANSACTIONS (false), + ENABLE_TRANSACTIONS (true), /** * The name of the class used to wrap the application's default * coordinator. */ - COORDINATOR_WRAPPER("edu.umass.cs.txn.DistTransactor"), + COORDINATOR_WRAPPER("edu.umass.cs.transaction.DistTransactor"), /** * diff --git a/src/edu/umass/cs/reconfiguration/interfaces/CoordinatorCallback.java b/src/edu/umass/cs/reconfiguration/interfaces/CoordinatorCallback.java new file mode 100644 index 00000000..074f92d0 --- /dev/null +++ b/src/edu/umass/cs/reconfiguration/interfaces/CoordinatorCallback.java @@ -0,0 +1,16 @@ +package edu.umass.cs.reconfiguration.interfaces; + +import edu.umass.cs.gigapaxos.interfaces.AppRequestParser; +import edu.umass.cs.nio.interfaces.IntegerPacketType; +import edu.umass.cs.nio.interfaces.Messenger; +import edu.umass.cs.reconfiguration.AbstractReplicaCoordinator; + +import java.util.Set; + +public interface CoordinatorCallback extends ReconfiguratorCallback,AppRequestParser{ + + void setCoordinator(AbstractReplicaCoordinator coordinator,Messenger messenger); + + Set getRequestTypes() ; + +}