Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

Expand All @@ -31,6 +33,7 @@
import com.adobe.cq.cloud.testing.it.smoke.replication.data.Queue;
import com.adobe.cq.cloud.testing.it.smoke.replication.data.ReplicationResponse;
import com.adobe.cq.cloud.testing.it.smoke.rules.ContentPublishRule;
import com.fasterxml.jackson.databind.JsonNode;
import com.adobe.cq.testing.client.CQClient;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.JsonArray;
Expand All @@ -39,6 +42,8 @@
import com.google.gson.JsonParser;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.sling.testing.clients.ClientException;
import org.apache.sling.testing.clients.SlingClientConfig;
Expand All @@ -61,6 +66,7 @@ public class ReplicationClient extends CQClient {

private static final String BLOCKED = "BLOCKED";
private static final String CONTENT_TYPE_JSON = "application/json";
private static final int JSON_MAX_CHOICE_FOLLOW = 10;

// uses "NOSONAR" because CQRules:CQBP-71 is triggering, but can be ignored for this test case
protected static final String DIST_AGENTS_PATH = "/libs/sling/distribution/services/agents"; //NOSONAR
Expand Down Expand Up @@ -210,6 +216,139 @@ public Agents getAgentQueueJson() throws SmokeTestException {
}
}

/**
* Checks whether a distribution agent exists without fetching the full agents listing.
*
* <p>This avoids large /infinity JSON responses which can yield HTTP 300 with split resources.</p>
*
* @param agentName agent name (e.g. publish-internal, publish)
* @return true if agent exists, false if not found
* @throws SmokeTestException if a connection or unexpected status occurs
*/
public boolean distributionAgentExists(String agentName) throws SmokeTestException {
String path = DIST_AGENTS_PATH + "/" + agentName + ".0.json";
try {
SlingHttpResponse response = doGetAllowingAnyStatus(path);
int status = response.getStatusLine().getStatusCode();
if (status == HttpStatus.SC_OK) {
return true;
}
if (status == HttpStatus.SC_NOT_FOUND) {
return false;
}
// In case the JSON servlet decides to split, treat as "exists".
if (status == HttpStatus.SC_MULTIPLE_CHOICES) {
log.warn("Got 300 while checking agent existence at {}", path);
return true;
}
throw new SmokeTestException(GENERIC,
String.format("Unexpected HTTP status %s while checking agent existence at %s", status, path), null);
} catch (ClientException e) {
throw new SmokeTestException(GENERIC, "Exception checking distribution agent existence", e);
}
}

/**
* Fetches a single distribution agent JSON and converts it to {@link Agent}.
*
* <p>This is intentionally per-agent to keep responses small and stable.</p>
*
* @param agentName agent name (e.g. publish-internal, publish)
* @return parsed agent
* @throws SmokeTestException if a connection or parsing error occurs
*/
public Agent getAgent(String agentName) throws SmokeTestException {
ObjectMapper mapper = new ObjectMapper();
try {
// Never use .3.json: build the agent view from small, stable endpoints.
JsonNode agentNode =
readJsonFollowingMultipleChoices(mapper, DIST_AGENTS_PATH + "/" + agentName + ".0.json");
JsonNode queuesNode =
readJsonFollowingMultipleChoices(mapper, DIST_AGENTS_PATH + "/" + agentName + "/queues.1.json");

Agent agent = new Agent();
agent.setName(agentNode.path("name").asText(agentName));
agent.setState(agentNode.path("status").path("state").asText(""));

Map<String, Queue> queues = new HashMap<>();
for (JsonNode queueNameNode : queuesNode.path("items")) {
String queueName = queueNameNode.asText();
if (StringUtils.isBlank(queueName)) {
continue;
}

JsonNode queueNode = queuesNode.path(queueName);
// If the queue wasn't expanded in the queues listing, fetch it directly.
if (queueNode.isMissingNode() || queueNode.isNull()) {
queueNode = readJsonFollowingMultipleChoices(mapper,
DIST_AGENTS_PATH + "/" + agentName + "/queues/" + queueName + ".1.json");
}

Queue queue = Queue.fromJson(queueNode);
queue.setName(queueName);
queues.put(queueName, queue);
}
agent.setQueues(queues);
return agent;
} catch (IOException | ClientException e) {
throw new SmokeTestException(GENERIC, "Exception fetching distribution agent details", e);
}
}

private SlingHttpResponse doGetAllowingAnyStatus(String path) throws ClientException {
HttpUriRequest request = new HttpGet(getUrl(path));
return this.doStreamRequest(request, null);
}

private JsonNode readJsonFollowingMultipleChoices(ObjectMapper mapper, String path)
throws ClientException, IOException, SmokeTestException {
SlingHttpResponse response = doGetAllowingAnyStatus(path);
int status = response.getStatusLine().getStatusCode();
if (status == HttpStatus.SC_OK) {
return mapper.readTree(response.getContent());
}
if (status == HttpStatus.SC_MULTIPLE_CHOICES) {
List<String> choices = parseMultipleChoicePaths(response.getContent());
int attempts = 0;
for (String choice : choices) {
if (attempts++ >= JSON_MAX_CHOICE_FOLLOW) {
break;
}
SlingHttpResponse choiceResponse = doGetAllowingAnyStatus(choice);
if (choiceResponse.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
return mapper.readTree(choiceResponse.getContent());
}
}
throw new SmokeTestException(GENERIC,
String.format("Could not resolve 300 Multiple Choices for %s", path), null);
}
throw new SmokeTestException(GENERIC,
String.format("Unexpected HTTP status %s for %s", status, path), null);
}

private List<String> parseMultipleChoicePaths(String responseBody) {
try {
JsonElement jsonElement = JsonParser.parseString(responseBody.trim());
if (!jsonElement.isJsonArray()) {
return Collections.emptyList();
}
JsonArray jsonArray = jsonElement.getAsJsonArray();
List<String> choices = new ArrayList<>();
for (JsonElement element : jsonArray) {
if (element != null && element.isJsonPrimitive()) {
String choice = element.getAsString();
if (StringUtils.isNotBlank(choice)) {
choices.add(choice);
}
}
}
return choices;
} catch (RuntimeException e) {
log.warn("Failed to parse 300 Multiple Choices response: {}", responseBody, e);
return Collections.emptyList();
}
}

public List<String> getBlockedQueueNames(Agent agent) throws SmokeTestException {
List<String> blockedQueues = new ArrayList<>();
try {
Expand All @@ -225,11 +364,10 @@ public List<String> getBlockedQueueNames(Agent agent) throws SmokeTestException
blockedQueues.add(queueName);
}
}
} catch(ClientException e) {
} catch (ClientException e) {
throw new SmokeTestException(GENERIC, "Exception getting blocked queues names", e);
} finally {
return blockedQueues;
}
return blockedQueues;
}

public void clearQueue(Agent agent) throws SmokeTestException {
Expand All @@ -239,7 +377,7 @@ public void clearQueue(Agent agent) throws SmokeTestException {
try {
FormEntityBuilder formEntityBuilder = FormEntityBuilder.create().addParameter("operation", "delete").addParameter("limit", "-1");
this.doPost(DIST_AGENTS_PATH + "/" + agent.getName() + "/queues/" + queueName, formEntityBuilder.build(), Collections.emptyList());
} catch(ClientException e) {
} catch (ClientException e) {
throw new SmokeTestException(GENERIC, "Exception clearing the blocked queues", e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ private void addQueue(String id, Queue queue) {
public Map<String, Queue> getQueues() {
return queues;
}

public void setQueues(Map<String, Queue> queues) {
this.queues = queues != null ? new HashMap<>(queues) : new HashMap<>();
}

public void setState(String state) {
this.state = state;
Expand All @@ -67,7 +71,7 @@ public void setName(String name) {

@JsonIgnore
public boolean isBlocked() {
return getState().equalsIgnoreCase(BLOCKED);
return state != null && state.equalsIgnoreCase(BLOCKED);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,11 @@
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

import com.adobe.cq.cloud.testing.it.smoke.exception.PublishException;
import com.adobe.cq.cloud.testing.it.smoke.exception.SmokeTestException;
import com.adobe.cq.cloud.testing.it.smoke.replication.ReplicationClient;
import com.adobe.cq.cloud.testing.it.smoke.replication.data.Agent;
import com.adobe.cq.cloud.testing.it.smoke.replication.data.Agents;
import com.adobe.cq.cloud.testing.it.smoke.replication.data.ReplicationResponse;
import com.adobe.cq.testing.client.CQClient;
import com.adobe.cq.testing.junit.rules.Page;
Expand Down Expand Up @@ -262,22 +260,24 @@ public void deactivateAssertPreview() throws SmokeTestException {
*/
private void doReplicationChecks() throws SmokeTestException {
Polling polling = null;
AtomicReference<Agents> agentsRef = new AtomicReference<>();

log.info("Checking Replication agents available and not blocked");

// Check if the publish agent is present and retry till timeout
// Resolve the publish agent name and retry till timeout
try {
polling = new Polling(() -> {
agentsRef.set(replicationClient.getAgentQueueJson());
log.info("Replication agents list: {}", agentsRef.get());
boolean internalPublishAgentExists = ReplicationClient.checkDistributionAgentExists(agentsRef.get(), INTERNAL_PUBLISH_DIST_AGENT);
if (!internalPublishAgentExists) {
log.info("Internal publish agent does not exist");
boolean internalPublishAgentExists =
replicationClient.distributionAgentExists(INTERNAL_PUBLISH_DIST_AGENT);
if (internalPublishAgentExists) {
this.publishDistAgent = INTERNAL_PUBLISH_DIST_AGENT;
return true;
}
log.info("Internal publish agent does not exist");
boolean publishAgentExists = replicationClient.distributionAgentExists(PUBLISH_DIST_AGENT);
if (publishAgentExists) {
this.publishDistAgent = PUBLISH_DIST_AGENT;
return ReplicationClient.checkDistributionAgentExists(agentsRef.get(), PUBLISH_DIST_AGENT);
}
return internalPublishAgentExists;
return publishAgentExists;
});
polling.poll(TIMEOUT, 500);
} catch (TimeoutException e) {
Expand All @@ -286,46 +286,43 @@ private void doReplicationChecks() throws SmokeTestException {
} catch (InterruptedException | RuntimeException e) {
throw replicationClient.getGenericException("Replication agent unavailable", e);
}

Agents agents = agentsRef.get();

boolean agentQueueBlocked = ReplicationClient.isAgentQueueBlocked(agents, this.publishDistAgent);
if (agentQueueBlocked) {
Agent publishAgent = replicationClient.getAgent(this.publishDistAgent);
if (publishAgent.isBlocked()) {
if (!this.publishDistAgent.equals(INTERNAL_PUBLISH_DIST_AGENT)) {
// throw if publish agent is blocked
throw replicationClient.getReplicationException(QUEUE_BLOCKED,
"Replication agent queue blocked - " + agents.getAgent(this.publishDistAgent), null);
"Replication agent queue blocked - " + publishAgent, null);
}
Agent publishAgent = agents.getAgent(this.publishDistAgent);
log.warn("Replication internal publish agent queue blocked - " + agents.getAgent(this.publishDistAgent));
log.warn("Replication internal publish agent queue blocked - {}", publishAgent);
replicationClient.clearQueue(publishAgent);
}

// Check if preview agent is available and not blocked
this.previewAvailable = doPreviewChecks(agents);
this.previewAvailable = doPreviewChecks();
}

private boolean doPreviewChecks(Agents agents) throws SmokeTestException {
boolean internalPreviewAgentExists = ReplicationClient.checkDistributionAgentExists(agents, INTERNAL_PREVIEW_DIST_AGENT);
boolean previewAgentExists = ReplicationClient.checkDistributionAgentExists(agents, PREVIEW_DIST_AGENT);
if (!internalPreviewAgentExists) {
private boolean doPreviewChecks() throws SmokeTestException {
boolean internalPreviewAgentExists = replicationClient.distributionAgentExists(INTERNAL_PREVIEW_DIST_AGENT);
boolean previewAgentExists = replicationClient.distributionAgentExists(PREVIEW_DIST_AGENT);
if (internalPreviewAgentExists) {
this.previewDistAgent = INTERNAL_PREVIEW_DIST_AGENT;
} else {
log.info("Internal preview agent does not exist");
this.previewDistAgent = PREVIEW_DIST_AGENT;
}
if (previewAgentExists || internalPreviewAgentExists) {
boolean previewBlocked = ReplicationClient.isAgentQueueBlocked(agents, this.previewDistAgent);
if (previewBlocked) {
Agent previewAgent = replicationClient.getAgent(this.previewDistAgent);
if (previewAgent.isBlocked()) {
if (!this.previewDistAgent.equals(INTERNAL_PREVIEW_DIST_AGENT)) {
//throw if preview agent is blocked
throw replicationClient.getReplicationException(QUEUE_BLOCKED,
"Replication agent queue blocked - " + agents.getAgent(this.previewDistAgent), null);
"Replication agent queue blocked - " + previewAgent, null);
}
Agent previewAgent = agents.getAgent(this.previewDistAgent);
log.warn("Replication internal preview agent queue blocked - " + agents.getAgent(this.previewDistAgent));
log.warn("Replication internal preview agent queue blocked - {}", previewAgent);
replicationClient.clearQueue(previewAgent);
}
}
return previewAgentExists;
return previewAgentExists || internalPreviewAgentExists;
}

/**
Expand All @@ -340,20 +337,19 @@ private boolean doPreviewChecks(Agents agents) throws SmokeTestException {
public void waitQueueEmptyOfPath(final String agent, final String path, final String id, final String action)
throws SmokeTestException {
Polling polling = null;
AtomicReference<Agents> agentsRef = new AtomicReference<>();

log.info("Checking the replication queue [{}] for action [{}] contains item [pkgId: {}] with paths [{}]",
agent, action, id, path);

// Check if the agent has the package
try {
polling = new Polling(() -> {
agentsRef.set(replicationClient.getAgentQueueJson());
return !checkPackageInQueue(agentsRef.get().getAgent(agent), path, id);
Agent agentJson = replicationClient.getAgent(agent);
return !checkPackageInQueue(agentJson, path, id);
});
polling.poll(TIMEOUT, 2000);
} catch (TimeoutException e) {
log.warn("Agent not empty of item {}", ((agentsRef.get() != null) ? agentsRef.get().getAgent(agent) : ""));
log.warn("Agent not empty of item {}", agent);
throw replicationClient.getReplicationException(ACTION_NOT_REPLICATED,
String.format("Item not activated within %s ms", TIMEOUT),
polling.getLastException());
Expand Down