diff --git a/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/ExnConnector.java b/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/ExnConnector.java index 3a9e9ab..a5b309e 100644 --- a/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/ExnConnector.java +++ b/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/ExnConnector.java @@ -66,10 +66,10 @@ public Context getContext() { synchronized(this) { while (context_ == null) { try { - wait(); - } catch (InterruptedException e) { + wait(); + } catch (InterruptedException e) { log.error("Caught InterruptException while waiting for ActiveMQ connection Context; looping", e); - } + } } } } @@ -105,15 +105,22 @@ public Context getContext() { /** The metrics to send to EMS and Solver */ public static final String metric_list_channel = "eu.nebulouscloud.optimiser.controller.metric_list"; - /** The status channel for the solvers. We send out an app's AMPL file on - * the channel named by {@link #ampl_message_channel} when getting the - * "started" message from a solver. */ + /** The status channel for an app's digital twin. We send out the + * initialization message on the channel named by {@link + * #twin_init_channel} when getting the "started" message from a digital + * twin. */ + public static final String twin_status_channel = "eu.nebulouscloud.optimiser.twin.state"; + /** The topic for sending the initialization message to the digital twin + * (app creation message and initial solution) */ + public static final String twin_init_channel = "eu.nebulouscloud.optimiser.controller.twin_init"; + /** The status channel for the per-app solvers. We send out an app's AMPL + * file on the channel named by {@link #ampl_message_channel} when getting + * the "started" message from a solver. 
*/ public static final String solver_status_channel = "eu.nebulouscloud.solver.state"; /** The per-app status channel, read by at least the UI and the solver. */ public static final String app_status_channel = "eu.nebulouscloud.optimiser.controller.app_state"; - private static final int findBrokerNodeCandidatesTimeout = 60*1000*2; /** * The Message producer for sending AMPL files, shared between all @@ -122,19 +129,23 @@ public Context getContext() { * @return the publisher configured to send AMPL files to the solver. */ @Getter - private final Publisher amplMessagePublisher; + private final Publisher amplMessagePublisher = new Publisher("controller_ampl", ampl_message_channel, true, true); /** The publisher for sending the metric list to EMS and Solver during app * creation. */ @Getter - private final Publisher metricListPublisher; + private final Publisher metricListPublisher = new Publisher("controller_metric_list", metric_list_channel, true, true); + + /** The publisher for sending the twin init message to the digital twin. */ + @Getter + private final Publisher twinInitMessagePublisher = new Publisher("twin_init", twin_init_channel, true, true); /** * The publisher for broadcasting the current status of each application * (new, ready, deploying, running, failed). */ @Getter - private final Publisher appStatusPublisher; + private final Publisher appStatusPublisher = new Publisher("app_status", app_status_channel, true, true); /** * The publisher for a first synthetic solver solution. This is needed @@ -143,7 +154,7 @@ public Context getContext() { * from the solver, of course. 
*/ @Getter - private final Publisher solverSolutionPublisher; + private final Publisher solverSolutionPublisher = new Publisher("controller_synthetic_solution", solver_solution_channel, true, true); /** * Create a connection to ActiveMQ via the exn middleware, and set up the @@ -155,11 +166,6 @@ public Context getContext() { * @param password the login password to use */ public ExnConnector(String host, int port, String name, String password) { - amplMessagePublisher = new Publisher("controller_ampl", ampl_message_channel, true, true); - metricListPublisher = new Publisher("controller_metric_list", metric_list_channel, true, true); - appStatusPublisher = new Publisher("app_status", app_status_channel, true, true); - solverSolutionPublisher = new Publisher("controller_synthetic_solution", solver_solution_channel, true, true); - conn = new Connector( "optimiser_controller", new ConnectorHandler() { @@ -177,11 +183,14 @@ public void onReady(Context context) { // here. amplMessagePublisher, metricListPublisher, + twinInitMessagePublisher, appStatusPublisher, solverSolutionPublisher), List.of( new Consumer("solver_status", solver_status_channel, new SolverStatusMessageHandler(), true, true), + new Consumer("twin_status", twin_status_channel, + new DigitalTwinStatusMessageHandler(), true, true), new Consumer("ui_app_messages", app_creation_channel, new AppCreationMessageHandler(), true, true), new Consumer("app_message_reset", app_reset_channel, @@ -452,6 +461,51 @@ public void onMessage(String key, String address, Map body, Message message, Con } } + /** + * A handler that detects when the digital twin for a given application + * has started, and sends it the app creation message. 
+ */ + public class DigitalTwinStatusMessageHandler extends Handler { + @Override + public void onMessage(String key, String address, Map body, Message message, Context context) { + try { + Object appIdObject = null; + String appId = null; + try { + appIdObject = message.property("application"); + } catch (ClientException e) { + log.error("Received digital twin ready message {} without application property, aborting", body); + return; + } + if (appIdObject == null) { + log.error("Received digital twin ready message {} without application property, aborting", body); + return; + } + appId = appIdObject.toString(); // should be a string already + MDC.put("appId", appId); + + JsonNode appMessage = mapper.valueToTree(body); + String status = appMessage.at("/state").textValue(); + if (status == null || !status.equals("started")) { + return; + } + + NebulousApp app = NebulousApps.get(appId); + if (app == null) { + log.info("Received twin startup message {} for unknown application, this should not happen", body); + } else try { + // This should be very quick, no need to start a thread + MDC.put("clusterName", app.getClusterName()); + app.sendTwinInitializationMessage(); + } catch (Exception e) { + log.error("Internal error while processing digital twin status message", e); + } + } finally { + MDC.clear(); + } + } + } + /** * A message handler for incoming messages from the solver, containing * mappings from variable names to new values. 
This is used to produce an @@ -577,8 +631,8 @@ public List findNodeCandidates(List requirements, St Map response = findBrokerNodeCandidates.sendSync(msg, appID, null, false); if(response==null) { - log.error("Failed to fetch node candidates"); - return Collections.emptyList(); + log.error("Failed to fetch node candidates"); + return Collections.emptyList(); } // Note: we do not call extractPayloadFromExnResponse here, since this // response does not come from the exn-middleware, so will not be @@ -659,8 +713,8 @@ public List findNodeCandidatesMultiple(List> re Map response = findBrokerNodeCandidatesMultiple.sendSync(msg, appID, null, false); if(response==null) { - log.error("Failed to fetch node candidates"); - return Collections.emptyList(); + log.error("Failed to fetch node candidates"); + return Collections.emptyList(); } // Note: we do not call extractPayloadFromExnResponse here, since this // response does not come from the exn-middleware, so will not be @@ -729,12 +783,12 @@ public List findNodeCandidatesFromSal(List requireme true, true,findBrokerNodeCandidatesTimeout); try { context.registerPublisher(findSalNodeCandidates); - Map response = findSalNodeCandidates.sendSync(msg, appID, null, false); - if(response==null) - { - log.error("Failed to fetch node candidates"); - return null; - } + Map response = findSalNodeCandidates.sendSync(msg, appID, null, false); + if(response==null) + { + log.error("Failed to fetch node candidates"); + return null; + } JsonNode payload = extractPayloadFromExnResponse(response, "findNodeCandidatesFromSal"); if (payload.isMissingNode()) return null; if (!payload.isArray()) return null; @@ -798,7 +852,7 @@ public List findNodeCandidatesFromSal(List requireme public boolean defineCluster(String appID, String clusterName, ObjectNode cluster) { // https://openproject.nebulouscloud.eu/projects/nebulous-collaboration-hub/wiki/deployment-manager-sal-1#specification-of-endpoints-being-developed Main.logFile("define-cluster-" + appID + 
".json", cluster.toPrettyString()); - + Context context = getContext(); if (context == null) { log.error("Trying to send request before Connector gave us a context (internal error)"); return false; } Map msg; try { @@ -813,12 +867,12 @@ public boolean defineCluster(String appID, String clusterName, ObjectNode cluste "defineCluster" + publisherNameCounter.incrementAndGet(), "eu.nebulouscloud.exn.sal.cluster.define", true, true,10*60*1000); try { - context.registerPublisher(defineCluster); + context.registerPublisher(defineCluster); Map response = defineCluster.sendSync(msg, appID, null, false); if(response==null) { - log.error("Failed to define cluster"); - return false; + log.error("Failed to define cluster"); + return false; } JsonNode payload = extractPayloadFromExnResponse(response, "defineCluster"); return payload.asBoolean(); @@ -858,7 +912,7 @@ public JsonNode getCluster(String appID, String clusterName, boolean removeEnvVa "eu.nebulouscloud.exn.sal.cluster.get", true, true, 30*1000); try { context.registerPublisher(getCluster); - Map response = getCluster.sendSync(msg, appID, null, false); + Map response = getCluster.sendSync(msg, appID, null, false); JsonNode payload = extractPayloadFromExnResponse(response, "getCluster"); if (removeEnvVars && payload.isObject()) { ((ObjectNode)payload).remove("env-var-script"); @@ -892,12 +946,12 @@ public boolean labelNodes(String appID, String clusterID, JsonNode labels) { "eu.nebulouscloud.exn.sal.cluster.label", true, true,30*1000); try { context.registerPublisher(labelNodes); - Map response = labelNodes.sendSync(msg, appID, null, false); - if(response==null) - { - log.error("Failed to label nodes"); - return false; - } + Map response = labelNodes.sendSync(msg, appID, null, false); + if(response==null) + { + log.error("Failed to label nodes"); + return false; + } JsonNode payload = extractPayloadFromExnResponse(response, "labelNodes"); return payload.isMissingNode() ? 
false : true; } finally { @@ -925,12 +979,12 @@ public boolean deployCluster(String appID, String clusterName) { "eu.nebulouscloud.exn.sal.cluster.deploy", true, true,10*60*1000); try { context.registerPublisher(deployCluster); - Map response = deployCluster.sendSync(msg, appID, null, false); - if(response==null) - { - log.error("Failed to deploy cluster"); - return false; - } + Map response = deployCluster.sendSync(msg, appID, null, false); + if(response==null) + { + log.error("Failed to deploy cluster"); + return false; + } JsonNode payload = extractPayloadFromExnResponse(response, "deployCluster"); return payload.asBoolean(); } finally { @@ -975,8 +1029,8 @@ public long deployApplication(String appID, String clusterName, String appName, Map response = deployApplication.sendSync(msg, appID, null, false); if(response==null) { - log.error("Failed to deploy application"); - return -1; + log.error("Failed to deploy application"); + return -1; } JsonNode payload = extractPayloadFromExnResponse(response, "deployApplication"); return payload.asLong(); @@ -1014,7 +1068,7 @@ public void scaleOut(String appID, String clusterName, ArrayNode nodesToAdd) { Map response = scaleOut.sendSync(msg, appID, null, false); if(response==null) { - log.error("Failed to scale"); + log.error("Failed to scale"); } // Called for side-effect only; we want to log errors. 
The return // value from scaleOut is the same as getCluster, but since we have to @@ -1055,8 +1109,8 @@ public boolean scaleIn(String appID, String clusterName, List superfluou Map response = scaleIn.sendSync(msg, appID, null, false); if(response==null) { - log.error("Failed to scalein"); - return false; + log.error("Failed to scalein"); + return false; } JsonNode payload = extractPayloadFromExnResponse(response, "scaleIn"); return payload.asBoolean(); @@ -1085,8 +1139,8 @@ public boolean deleteCluster(String appID, String clusterName) { Map response = deleteCluster.sendSync(msg, appID, null, false); if(response==null) { - log.error("Failed to delete cluster"); - return false; + log.error("Failed to delete cluster"); + return false; } JsonNode payload = extractPayloadFromExnResponse(response, "deleteCluster"); return payload.asBoolean(); @@ -1145,60 +1199,60 @@ public void sendSyntheticSolutionMessage(String appID, ObjectNode solutions) { Map msg = mapper.convertValue(solutions, Map.class); solverSolutionPublisher.send(msg, appID); } - - + + /** * Get the dead nodes of a deployed application. * Query SAL to get cluster nodes URLs and compare them with the alive nodes URLs from the Resource Manager. 
*/ public List getAppDeadNodes(String appID, String clusterName) { - JsonNode clusterState = getCluster(appID, clusterName); - if (clusterState == null) { - log.error("Cluster state is null for appID: {}, clusterName: {}", appID, clusterName); - return List.of(); - } - JsonNode clusterNodes = clusterState.at("/nodes"); - if (clusterNodes == null) { - log.error("Cluster nodes are null for appID: {}, clusterName: {}", appID, clusterName); - return List.of(); - } + JsonNode clusterState = getCluster(appID, clusterName); + if (clusterState == null) { + log.error("Cluster state is null for appID: {}, clusterName: {}", appID, clusterName); + return List.of(); + } + JsonNode clusterNodes = clusterState.at("/nodes"); + if (clusterNodes == null) { + log.error("Cluster nodes are null for appID: {}, clusterName: {}", appID, clusterName); + return List.of(); + } + + /* Create a map of node URL to node name */ + Map nodeUrlToNodeName = new HashMap(); + clusterNodes.forEach(e -> { + nodeUrlToNodeName.put(e.at("/nodeUrl").asText(), e.at("/nodeName").asText()); + }); - /* Create a map of node URL to node name */ - Map nodeUrlToNodeName = new HashMap(); - clusterNodes.forEach(e -> { - nodeUrlToNodeName.put(e.at("/nodeUrl").asText(), e.at("/nodeName").asText()); - }); + /* Query the Resource Manager to get the alive nodes URLs */ + SyncedPublisher getNodeStates = new SyncedPublisher("getNodeStates" + publisherNameCounter.incrementAndGet(), + "eu.nebulouscloud.exn.proactive.state", true, true, findBrokerNodeCandidatesTimeout); + Context context = getContext(); + if (context == null) { + log.error("Trying to send request before Connector gave us a context (internal error)"); + return List.of(); + } + try { + context.registerPublisher(getNodeStates); + Map response = getNodeStates.sendSync(Map.of(), appID, null, false); + if (response == null) { + log.error("Failed to call eu.nebulouscloud.exn.proactive.state"); + return Collections.emptyList(); + } + ObjectNode jsonBody = 
mapper.convertValue(response, ObjectNode.class); + List aliveNodesUrls = (ArrayList) mapper + .readValue(jsonBody.get("body").asText(), new HashMap().getClass()).get("aliveNodes"); + /* Return the dead nodes names */ + return nodeUrlToNodeName.keySet().stream().filter(u -> !aliveNodesUrls.contains(u)) + .map(u -> nodeUrlToNodeName.get(u)).toList(); + } catch (Exception e) { + log.error("Failed to get alive nodes URLs from Resource Manager for appID: {}, clusterName: {}", appID, + clusterName, e); + return List.of(); + } finally { + context.unregisterPublisher(getNodeStates.key()); + } - /* Query the Resource Manager to get the alive nodes URLs */ - SyncedPublisher getNodeStates = new SyncedPublisher("getNodeStates" + publisherNameCounter.incrementAndGet(), - "eu.nebulouscloud.exn.proactive.state", true, true, findBrokerNodeCandidatesTimeout); - Context context = getContext(); - if (context == null) { - log.error("Trying to send request before Connector gave us a context (internal error)"); - return List.of(); - } - try { - context.registerPublisher(getNodeStates); - Map response = getNodeStates.sendSync(Map.of(), appID, null, false); - if (response == null) { - log.error("Failed to call eu.nebulouscloud.exn.proactive.state"); - return Collections.emptyList(); - } - ObjectNode jsonBody = mapper.convertValue(response, ObjectNode.class); - List aliveNodesUrls = (ArrayList) mapper - .readValue(jsonBody.get("body").asText(), new HashMap().getClass()).get("aliveNodes"); - /* Return the dead nodes names */ - return nodeUrlToNodeName.keySet().stream().filter(u -> !aliveNodesUrls.contains(u)) - .map(u -> nodeUrlToNodeName.get(u)).toList(); - } catch (Exception e) { - log.error("Failed to get alive nodes URLs from Resource Manager for appID: {}, clusterName: {}", appID, - clusterName, e); - return List.of(); - } finally { - context.unregisterPublisher(getNodeStates.key()); - } - } diff --git 
a/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/NebulousApp.java b/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/NebulousApp.java index 79b33f1..08f3ede 100644 --- a/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/NebulousApp.java +++ b/optimiser-controller/src/main/java/eu/nebulouscloud/optimiser/controller/NebulousApp.java @@ -1,24 +1,5 @@ package eu.nebulouscloud.optimiser.controller; -import com.fasterxml.jackson.core.JsonPointer; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonMappingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.fasterxml.jackson.databind.node.BooleanNode; -import com.fasterxml.jackson.databind.node.LongNode; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.fasterxml.jackson.databind.node.TextNode; -import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; - -import eu.nebulouscloud.exn.core.Publisher; -import eu.nebulouscloud.optimiser.kubevela.KubevelaAnalyzer; -import eu.nebulouscloud.optimiser.sal.*; -import lombok.Getter; -import lombok.Synchronized; -import lombok.extern.slf4j.Slf4j; - import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; @@ -34,11 +15,30 @@ import java.util.Set; import java.util.Spliterator; import java.util.Spliterators; -import java.util.stream.Collectors; -import java.util.stream.StreamSupport; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import com.fasterxml.jackson.core.JsonPointer; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonMappingException; +import 
com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.LongNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.node.TextNode; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; + +import eu.nebulouscloud.exn.core.Publisher; +import eu.nebulouscloud.optimiser.kubevela.KubevelaAnalyzer; +import eu.nebulouscloud.optimiser.sal.NodeCandidate; +import eu.nebulouscloud.optimiser.sal.Requirement; +import lombok.Getter; +import lombok.Synchronized; +import lombok.extern.slf4j.Slf4j; /** @@ -120,8 +120,8 @@ public enum State { private static final JsonPointer name_path = JsonPointer.compile("/title"); private static final JsonPointer utility_function_path = JsonPointer.compile("/utilityFunctions"); private static final JsonPointer constraints_path = JsonPointer.compile("/sloViolations"); - - + + /** The YAML converter */ // Note that instantiating this is apparently expensive, so we do it only once @@ -198,7 +198,7 @@ public enum State { */ @Getter private final ObjectNode originalKubevela; - + /** * Current version of the kubevela. It is the originalKubevela with variable values dictated by solver. 
* It is different from deployedKubevela @@ -265,7 +265,7 @@ public enum State { /** Scheduled executor service for periodic health checks */ private ScheduledExecutorService healthCheckExecutor = null; - + /** Interval in seconds between health checks */ private static final int HEALTH_CHECK_INTERVAL_SECONDS = 30; @@ -286,14 +286,14 @@ public NebulousApp(JsonNode app_message, String kubevela_string, ExnConnector ex this.clusterName = NebulousApps.calculateUniqueClusterName(this.UUID); this.originalAppMessage = app_message; ObjectNode kubevela = null; - try { - kubevela = (ObjectNode)readKubevelaString(kubevela_string); - } catch (JsonProcessingException e) { + try { + kubevela = (ObjectNode)readKubevelaString(kubevela_string); + } catch (JsonProcessingException e) { this.state = State.FAILED; this.originalKubevela = null; log.error("Could not parse KubeVela YAML in app creation message: " + e.getMessage()); return; - } + } this.originalKubevela = kubevela; this.exnConnector = exnConnector; JsonNode parameters = app_message.at(variables_path); @@ -450,7 +450,7 @@ public boolean setStateDeploying() { return true; } } - + /** * Set the state from DEPLOYING to RUNNING * @@ -484,57 +484,57 @@ public boolean sendDeploymentStatus(JsonNode clusterState, Map ad return false; } } - + public static Map buildAppStatusReport( - String clusterName, - int deployGeneration, - Map> componentRequirements, - Map nodeCounts, - Map> componentNodeNames, - Map deployedNodeCandidates) + String clusterName, + int deployGeneration, + Map> componentRequirements, + Map nodeCounts, + Map> componentNodeNames, + Map deployedNodeCandidates) { - Map appStatusReport = new HashMap<>(); - try { - - appStatusReport.put("clusterName", clusterName); - appStatusReport.put("deployGeneration", deployGeneration); - Map components= new HashMap<>(); - appStatusReport.put("components",components); - for(String componentName : componentNodeNames.keySet()) { - Map componentStatusReport = new HashMap<>(); - 
componentStatusReport.put("componentName", componentName); - - if(componentRequirements.containsKey(componentName)) { - componentStatusReport.put("componentRequirements", componentRequirements.get(componentName)); - } else { - componentStatusReport.put("componentRequirements", new LinkedList<>()); - } - if(nodeCounts.containsKey(componentName)) { - componentStatusReport.put("nodeCount", nodeCounts.get(componentName)); - } else { - componentStatusReport.put("nodeCount", 0); - } - LinkedList deployedNodes = new LinkedList(); - componentStatusReport.put("deployedNodes",deployedNodes); - componentNodeNames.get(componentName).forEach(nodeCandidate -> { - if(deployedNodeCandidates.containsKey(nodeCandidate)) { - deployedNodes.add(deployedNodeCandidates.get(nodeCandidate)); - } - }); - - components.put(componentName, componentStatusReport); - } + Map appStatusReport = new HashMap<>(); + try { + + appStatusReport.put("clusterName", clusterName); + appStatusReport.put("deployGeneration", deployGeneration); + Map components= new HashMap<>(); + appStatusReport.put("components",components); + for(String componentName : componentNodeNames.keySet()) { + Map componentStatusReport = new HashMap<>(); + componentStatusReport.put("componentName", componentName); + + if(componentRequirements.containsKey(componentName)) { + componentStatusReport.put("componentRequirements", componentRequirements.get(componentName)); + } else { + componentStatusReport.put("componentRequirements", new LinkedList<>()); + } + if(nodeCounts.containsKey(componentName)) { + componentStatusReport.put("nodeCount", nodeCounts.get(componentName)); + } else { + componentStatusReport.put("nodeCount", 0); + } + LinkedList deployedNodes = new LinkedList(); + componentStatusReport.put("deployedNodes",deployedNodes); + componentNodeNames.get(componentName).forEach(nodeCandidate -> { + if(deployedNodeCandidates.containsKey(nodeCandidate)) { + deployedNodes.add(deployedNodeCandidates.get(nodeCandidate)); + } + }); + + 
components.put(componentName, componentStatusReport); + } NodeCandidate master = deployedNodeCandidates.entrySet().stream().filter(n->n.getKey().startsWith("m"+clusterName+"-master")).findFirst().get().getValue(); - - appStatusReport.put("master",master); - ObjectMapper om = new ObjectMapper(); - - return om.readValue(om.writeValueAsString(Map.of("details",appStatusReport)), new HashMap().getClass()); - } catch (Exception ex) { - log.error("Failed to build appStatusReport",ex); - } + + appStatusReport.put("master",master); + ObjectMapper om = new ObjectMapper(); + + return om.readValue(om.writeValueAsString(Map.of("details",appStatusReport)), new HashMap().getClass()); + } catch (Exception ex) { + log.error("Failed to build appStatusReport",ex); + } return new HashMap(); - + } /** Set state from DEPLOYING to RUNNING and update app cluster information. @@ -594,10 +594,10 @@ public boolean setStateRedeploying() { public void setStateFailed(Collection acquiredNodes) { EdgeNodes.release(this.UUID, acquiredNodes); if (state == State.DELETED) return; - + // Stop health monitoring when app fails stopHealthMonitoring(); - + state = State.FAILED; exnConnector.sendAppStatus(UUID, state); } @@ -614,10 +614,10 @@ public void setStateFailed(Collection acquiredNodes) { @Synchronized public void resetState() { if (state == State.DELETED) return; - + // Stop health monitoring when resetting state stopHealthMonitoring(); - + this.deployGeneration = 0; this.componentRequirements = Map.of(); this.componentReplicaCounts = Map.of(); @@ -638,7 +638,7 @@ public void resetState() { public void setStateDeletedAndUnregister() { // Stop health monitoring before deleting stopHealthMonitoring(); - + state = State.DELETED; NebulousApps.remove(UUID); exnConnector.sendAppStatus(UUID, state); @@ -850,7 +850,7 @@ public JsonNode calculateAMPLMessage() { if (KubevelaAnalyzer.isKubevelaNumeric(meaning)) { // kubevelaNumberToLong handles converting "8Gi" to 8192 for // meaning "memory", and rounds up 
kubevela cpu/gpu (floating - // point) to SAL number of cores (int) for meaning "cpu" + // point) to SAL number of cores (int) for meaning "cpu" constant.put("Value", KubevelaAnalyzer.kubevelaNumberToLong(value, meaning)); } else { constant.set("Value", value); } @@ -890,6 +890,23 @@ public void sendMetricList() { Main.logFile("metric-names-" + getUUID() + ".json", msg.toPrettyString()); } + /** + * Send the app creation message and initial solution to the digital twin. + * We send a message with two keys: {@code dsl} contains the app + * definition, {@code solution} contains an initial solution. With these + * two, the twin can initialize itself. + */ + public void sendTwinInitializationMessage() { + log.info("Sending app creation message to digital twin"); + ObjectNode msg = jsonMapper.createObjectNode(); + msg.set("dsl", this.originalAppMessage); + // We can use the app message's kubevela as-is: rewriting kubevela + // only adds node affinity traits, which don't influence the solution + msg.set("solution", createSolutionFromKubevela(originalKubevela)); + exnConnector.getTwinInitMessagePublisher() + .send(jsonMapper.convertValue(msg, Map.class), getUUID(), true); + } + /** * Return the utility function to use. A utility function is a function * of type "minimize" or "maximize" in the {@code /utilityFunctions} array * @@ -952,7 +969,7 @@ public void redeployWithSolution(ObjectNode solution) { log.info("variables: {}",variables); ObjectNode kubevela = rewriteKubevelaWithSolution(variables); log.info("kubevela with variables substitution: {}",kubevela); - + /** * Check if the kubevela has changed. If it has, redeploy the vela file */ @@ -961,10 +978,10 @@ public void redeployWithSolution(ObjectNode solution) { kubevelaChanged = !yamlMapper.writeValueAsString(kubevela).equals(kubevela); }catch(Exception ex) { - log.error("Failed to check if kubevela changed. Assuming it changed.",ex); + log.error("Failed to check if kubevela changed. 
Assuming it changed.",ex); kubevelaChanged = true; } - + if (deployGeneration > 0) { NebulousAppDeployer.redeployApplication(this, kubevela,kubevelaChanged); } else { @@ -983,17 +1000,17 @@ public void redeployWithSolution(ObjectNode solution) { */ @Synchronized public void deploy() { - currentKubevela = getOriginalKubevela().deepCopy(); + currentKubevela = getOriginalKubevela().deepCopy(); NebulousAppDeployer.deployApplication(this, getOriginalKubevela()); - + } - + /** * Check if the app nodes are alive. If not, redeploy the application. */ public void appHealthCheck() { - boolean isMasterNodeDead = false; + boolean isMasterNodeDead = false; try { if (state != State.RUNNING) { return; @@ -1006,55 +1023,55 @@ public void appHealthCheck() { } //Check if master node is among the dead nodes isMasterNodeDead = deadNodeNames.stream().anyMatch(n->n.startsWith("m"+clusterName+"-master")); - + log.info("Some nodes are missing: {}, redeploying application", deadNodeNames.stream().collect(Collectors.joining(", "))); - + if(isMasterNodeDead) { - log.error("Master node is dead, can't re-deploy application"); - NebulousAppDeployer.undeployApplication(this); - return; + log.error("Master node is dead, can't re-deploy application"); + NebulousAppDeployer.undeployApplication(this); + return; } - + NebulousAppDeployer.redeployApplication(this, currentKubevela.deepCopy(),false); }catch(Exception ex) { - log.error("Failed appHealthCheck",ex); + log.error("Failed appHealthCheck",ex); } finally { - //On any execution path, schedule a helthCheck (except if isMasterNodeDead) - if(!isMasterNodeDead) { + //On any execution path, schedule a helthCheck (except if isMasterNodeDead) + if(!isMasterNodeDead) { healthCheckExecutor.schedule( this::appHealthCheck, HEALTH_CHECK_INTERVAL_SECONDS, TimeUnit.SECONDS); log.debug("Scheduling health check for app {} in {} seconds", UUID, HEALTH_CHECK_INTERVAL_SECONDS); - } + } } } /** * Start the periodic health monitoring thread. 
*/ - public void startHealthMonitoring() { - if(healthCheckExecutor!=null) { - log.error("healthCheckExecutor for app {} already exists. Ignore", UUID); - return; - } + public void startHealthMonitoring() { + if(healthCheckExecutor!=null) { + log.error("healthCheckExecutor for app {} already exists. Ignore", UUID); + return; + } log.info("Starting health monitoring thread for app {}", UUID); healthCheckExecutor = Executors.newSingleThreadScheduledExecutor(r -> { Thread t = new Thread(r, "health-monitor-" + UUID); t.setDaemon(true); return t; }); - - healthCheckExecutor.schedule( - this::appHealthCheck, - HEALTH_CHECK_INTERVAL_SECONDS, - TimeUnit.SECONDS - ); - log.debug("Scheduling health check for app {} in {} seconds", UUID, HEALTH_CHECK_INTERVAL_SECONDS); + + healthCheckExecutor.schedule( + this::appHealthCheck, + HEALTH_CHECK_INTERVAL_SECONDS, + TimeUnit.SECONDS + ); + log.debug("Scheduling health check for app {} in {} seconds", UUID, HEALTH_CHECK_INTERVAL_SECONDS); } /**