Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 25 additions & 7 deletions src/edu/umass/cs/reconfiguration/AbstractReplicaCoordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import edu.umass.cs.reconfiguration.interfaces.*;
import org.json.JSONException;
import org.json.JSONObject;

Expand All @@ -40,11 +41,6 @@
import edu.umass.cs.nio.interfaces.IntegerPacketType;
import edu.umass.cs.nio.interfaces.Messenger;
import edu.umass.cs.nio.nioutils.NIOHeader;
import edu.umass.cs.reconfiguration.interfaces.ReconfigurableRequest;
import edu.umass.cs.reconfiguration.interfaces.ReconfiguratorCallback;
import edu.umass.cs.reconfiguration.interfaces.ReplicaCoordinator;
import edu.umass.cs.reconfiguration.interfaces.ReplicableRequest;
import edu.umass.cs.reconfiguration.interfaces.Repliconfigurable;
import edu.umass.cs.reconfiguration.reconfigurationpackets.DefaultAppRequest;
import edu.umass.cs.reconfiguration.reconfigurationpackets.ReconfigurationPacket;
import edu.umass.cs.reconfiguration.reconfigurationpackets.ReplicableClientRequest;
Expand Down Expand Up @@ -128,7 +124,17 @@ protected AbstractReplicaCoordinator(Replicable app) {
this.messenger = null;
}


public AbstractReplicaCoordinator(Replicable app,
Messenger<NodeIDType, ?> messenger,CoordinatorCallback<NodeIDType> callback) {
this(app,messenger);
if(callback!=null){
this.callback = callback;
this.parser = callback;
callback.setCoordinator(this,messenger);
}
}


/**
* @param app
* @param messenger
Expand Down Expand Up @@ -341,6 +347,11 @@ public void setGetRequestImpl(AppRequestParserBytes parserBytes) {
public final Request getRequest(byte[] bytes, NIOHeader header)
throws RequestParseException {
try {
Request request;
if((this.parser != null)&&((request = this.parser.getRequest(bytes, header))!=null)){
return request;
}

return ByteBuffer.wrap(bytes).getInt() == ReconfigurationPacket.PacketType.REPLICABLE_CLIENT_REQUEST
.getInt() ? (this.app instanceof AppRequestParserBytes ? new ReplicableClientRequest(
bytes, header, (AppRequestParserBytes) this.app)
Expand Down Expand Up @@ -374,7 +385,12 @@ protected final Request getRequest(ReplicableClientRequest rcr,
* @return Set of request types that the app is designed to handle.
*/
public Set<IntegerPacketType> getAppRequestTypes() {
return this.app.getRequestTypes();
HashSet<IntegerPacketType> packetTypes = new HashSet<>();
packetTypes.addAll(this.app.getRequestTypes());
if(parser !=null){
packetTypes.addAll(parser.getRequestTypes());
}
return packetTypes;
}

@Override
Expand Down Expand Up @@ -415,6 +431,8 @@ public boolean restore(String name, String state) {
return this.stopCallback != null
&& this.stopCallback.preRestore(name, state) ? true

:this.callback!=null &&this.callback.preRestore(name,state) ? true

/* Will be a no-op except during recovery when stopCallback will be null
* as it wouldn't yet have been set. */
: this.preRestore(name, state) ? true
Expand Down
25 changes: 2 additions & 23 deletions src/edu/umass/cs/reconfiguration/ActiveReplica.java
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ private ActiveReplica(AbstractReplicaCoordinator<NodeIDType> appC,
ReconfigurableNodeConfig<NodeIDType> nodeConfig,
SSLMessenger<NodeIDType, ?> messenger, boolean noReporting) {
this.appCoordinator = (AbstractReplicaCoordinator<NodeIDType>)
wrapCoordinator(appC.setStopCallback(
(appC.setStopCallback(
// setting default callback is optional
// (ReconfiguratorCallback) this).setCallback(
(ReconfiguratorCallback) this));
Expand Down Expand Up @@ -191,28 +191,7 @@ private ActiveReplica(AbstractReplicaCoordinator<NodeIDType> appC,
// initInstrumenter();
}

protected static AbstractReplicaCoordinator<?> wrapCoordinator(
AbstractReplicaCoordinator<?> coordinator) {
Class<?> clazz = null;
try {
clazz = Class.forName(Config
.getGlobalString(RC.COORDINATOR_WRAPPER));
} catch (ClassNotFoundException e) {
// eat up exception, normal case
}
if (clazz == null)
return coordinator;
// reflectively instantiate
try {
return (AbstractReplicaCoordinator<?>) clazz.getConstructor(
AbstractReplicaCoordinator.class).newInstance(coordinator);
} catch (InstantiationException | IllegalAccessException
| IllegalArgumentException | InvocationTargetException
| NoSuchMethodException | SecurityException e) {
e.printStackTrace();
}
return coordinator;
}


/**
* @param name
Expand Down
15 changes: 10 additions & 5 deletions src/edu/umass/cs/reconfiguration/PaxosReplicaCoordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import edu.umass.cs.nio.interfaces.IntegerPacketType;
import edu.umass.cs.nio.interfaces.Messenger;
import edu.umass.cs.nio.interfaces.Stringifiable;
import edu.umass.cs.reconfiguration.interfaces.CoordinatorCallback;
import edu.umass.cs.reconfiguration.interfaces.ReconfigurableRequest;
import edu.umass.cs.reconfiguration.interfaces.ReplicableRequest;
import edu.umass.cs.reconfiguration.reconfigurationpackets.ReconfigurationPacket;
Expand Down Expand Up @@ -61,8 +62,8 @@ public class PaxosReplicaCoordinator<NodeIDType> extends
private PaxosReplicaCoordinator(Replicable app, NodeIDType myID,
Stringifiable<NodeIDType> unstringer,
Messenger<NodeIDType, ?> niot, String paxosLogFolder,
boolean enableNullCheckpoints) {
super(app, niot);
boolean enableNullCheckpoints,CoordinatorCallback callback) {
super(app, niot,callback);
assert (niot instanceof JSONMessenger);
this.paxosManager = new PaxosManager<NodeIDType>(myID, unstringer,
(JSONMessenger<NodeIDType>) niot, this, paxosLogFolder,
Expand All @@ -72,7 +73,11 @@ private PaxosReplicaCoordinator(Replicable app, NodeIDType myID,
.getNodePort(myID)), niot);
}


public PaxosReplicaCoordinator(Replicable app, NodeIDType myID,
Stringifiable<NodeIDType> unstringer, Messenger<NodeIDType, ?> niot,
CoordinatorCallback<NodeIDType> coordinatorCallback) {
this(app, myID, unstringer, niot, null, true,coordinatorCallback);
}
/**
* @param app
* @param myID
Expand All @@ -81,7 +86,7 @@ private PaxosReplicaCoordinator(Replicable app, NodeIDType myID,
*/
public PaxosReplicaCoordinator(Replicable app, NodeIDType myID,
Stringifiable<NodeIDType> unstringer, Messenger<NodeIDType, ?> niot) {
this(app, myID, unstringer, niot, null, true);
this(app, myID, unstringer, niot, null, true,null);
}

/**
Expand All @@ -95,7 +100,7 @@ public PaxosReplicaCoordinator(Replicable app, NodeIDType myID,
public PaxosReplicaCoordinator(Replicable app, NodeIDType myID,
Stringifiable<NodeIDType> unstringer,
Messenger<NodeIDType, ?> niot, int outOfOrderLimit) {
this(app, myID, unstringer, (JSONMessenger<NodeIDType>) niot);
this(app, myID, unstringer, (JSONMessenger<NodeIDType>) niot,null);
assert (niot instanceof JSONMessenger);
this.paxosManager.setOutOfOrderLimit(outOfOrderLimit);
}
Expand Down
26 changes: 25 additions & 1 deletion src/edu/umass/cs/reconfiguration/ReconfigurableNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Level;

import edu.umass.cs.reconfiguration.interfaces.CoordinatorCallback;
import org.json.JSONObject;

import edu.umass.cs.gigapaxos.AbstractPaxosLogger;
Expand Down Expand Up @@ -183,6 +184,29 @@ private AbstractReplicaCoordinator<NodeIDType> createApp(String[] args,
}
}

@SuppressWarnings("unchecked")
private CoordinatorCallback<NodeIDType> getWrappingCoordinatorCallback(){
Class<?> clazz = null;
try {
clazz = Class.forName(Config
.getGlobalString(ReconfigurationConfig.RC.COORDINATOR_WRAPPER));
} catch (ClassNotFoundException e) {
// eat up exception, normal case
}
if (clazz == null) {
return null;
}
// reflectively instantiate
try {
return (CoordinatorCallback<NodeIDType>) clazz.newInstance();
} catch (InstantiationException | IllegalAccessException
| IllegalArgumentException | SecurityException e) {
e.printStackTrace();
}
return null;

}

/**
*
* TODO: Extend this method to enable support for other coordinators.
Expand All @@ -198,7 +222,7 @@ private AbstractReplicaCoordinator<NodeIDType> getAppCoordinator(
ReconfigurableNodeConfig<NodeIDType> nodeConfig,
JSONMessenger<NodeIDType> messenger) {
return new PaxosReplicaCoordinator<NodeIDType>(app, myID, nodeConfig,
messenger).setOutOfOrderLimit(Config
messenger,getWrappingCoordinatorCallback()).setOutOfOrderLimit(Config
.getGlobalInt(ReconfigurationConfig.RC.OUT_OF_ORDER_LIMIT));
}

Expand Down
6 changes: 3 additions & 3 deletions src/edu/umass/cs/reconfiguration/ReconfigurationConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ public static enum RC implements Config.ConfigurableEnum {
* that this parameter is irrelevant after name creation as the number
* of replicas thereafter is controlled by the reconfiguration policy.
*/
REPLICATE_ALL(true),
REPLICATE_ALL(false),
/**
*
*/
Expand Down Expand Up @@ -385,13 +385,13 @@ public static enum RC implements Config.ConfigurableEnum {
/**
* If true, transactions are enabled; else disabled.
*/
ENABLE_TRANSACTIONS (false),
ENABLE_TRANSACTIONS (true),

/**
* The name of the class used to wrap the application's default
* coordinator.
*/
COORDINATOR_WRAPPER("edu.umass.cs.txn.DistTransactor"),
COORDINATOR_WRAPPER("edu.umass.cs.transaction.DistTransactor"),

/**
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package edu.umass.cs.reconfiguration.interfaces;

import edu.umass.cs.gigapaxos.interfaces.AppRequestParser;
import edu.umass.cs.nio.interfaces.IntegerPacketType;
import edu.umass.cs.nio.interfaces.Messenger;
import edu.umass.cs.reconfiguration.AbstractReplicaCoordinator;

import java.util.Set;

public interface CoordinatorCallback<NodeIDType> extends ReconfiguratorCallback,AppRequestParser{

void setCoordinator(AbstractReplicaCoordinator<NodeIDType> coordinator,Messenger messenger);

Set<IntegerPacketType> getRequestTypes() ;

}