diff --git a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/VaultConfiguration.java b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/VaultConfiguration.java index c6e46fb0d82..8781128df63 100644 --- a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/VaultConfiguration.java +++ b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/VaultConfiguration.java @@ -31,10 +31,11 @@ import com.google.common.base.Preconditions; public class VaultConfiguration { + public static final String DEFAULT_SINGLE_BUCKET_NAME = "james-deleted-message-vault"; public static final VaultConfiguration DEFAULT = - new VaultConfiguration(false, ChronoUnit.YEARS.getDuration(), DefaultMailboxes.RESTORED_MESSAGES); + new VaultConfiguration(false, ChronoUnit.YEARS.getDuration(), DefaultMailboxes.RESTORED_MESSAGES, DEFAULT_SINGLE_BUCKET_NAME); public static final VaultConfiguration ENABLED_DEFAULT = - new VaultConfiguration(true, ChronoUnit.YEARS.getDuration(), DefaultMailboxes.RESTORED_MESSAGES); + new VaultConfiguration(true, ChronoUnit.YEARS.getDuration(), DefaultMailboxes.RESTORED_MESSAGES, DEFAULT_SINGLE_BUCKET_NAME); public static VaultConfiguration from(Configuration propertiesConfiguration) { Duration retentionPeriod = Optional.ofNullable(propertiesConfiguration.getString("retentionPeriod")) @@ -42,21 +43,26 @@ public static VaultConfiguration from(Configuration propertiesConfiguration) { .orElse(DEFAULT.getRetentionPeriod()); String restoreLocation = Optional.ofNullable(propertiesConfiguration.getString("restoreLocation")) .orElse(DEFAULT.getRestoreLocation()); + String singleBucketName = Optional.ofNullable(propertiesConfiguration.getString("singleBucketName")) + .orElse(DEFAULT.getSingleBucketName()); boolean enabled = propertiesConfiguration.getBoolean("enabled", false); - return new VaultConfiguration(enabled, retentionPeriod, restoreLocation); + return new VaultConfiguration(enabled, retentionPeriod, restoreLocation, singleBucketName); } private final boolean enabled; private final Duration retentionPeriod; private final String restoreLocation; + private final String singleBucketName; - VaultConfiguration(boolean enabled, Duration retentionPeriod, String restoreLocation) { + VaultConfiguration(boolean enabled, Duration retentionPeriod, String restoreLocation, String singleBucketName) { this.enabled = enabled; Preconditions.checkNotNull(retentionPeriod); Preconditions.checkNotNull(restoreLocation); + Preconditions.checkNotNull(singleBucketName); this.retentionPeriod = retentionPeriod; this.restoreLocation = restoreLocation; + this.singleBucketName = singleBucketName; } public boolean isEnabled() { @@ -71,6 +77,10 @@ public String getRestoreLocation() { return restoreLocation; } + public String getSingleBucketName() { + return singleBucketName; + } + @Override public final boolean equals(Object o) { if (o instanceof VaultConfiguration) { @@ -78,13 +88,14 @@ public final boolean equals(Object o) { return Objects.equals(this.retentionPeriod, that.retentionPeriod) && Objects.equals(this.restoreLocation, that.restoreLocation) - && Objects.equals(this.enabled, that.enabled); + && Objects.equals(this.enabled, that.enabled) + && Objects.equals(this.singleBucketName, that.singleBucketName); } return false; } @Override public final int hashCode() { - return Objects.hash(retentionPeriod, restoreLocation, enabled); + return Objects.hash(retentionPeriod, restoreLocation, enabled, singleBucketName); } } diff --git a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobIdTimeGenerator.java b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobIdTimeGenerator.java new file mode 100644 index 00000000000..c84c18c8cf8 --- /dev/null +++ b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobIdTimeGenerator.java @@ -0,0 +1,50 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.vault.blob; + +import java.time.Clock; +import java.time.ZonedDateTime; +import java.util.UUID; + +import jakarta.inject.Inject; + +import org.apache.james.blob.api.BlobId; +import org.apache.james.blob.api.PlainBlobId; + +public class BlobIdTimeGenerator { + private static final String BLOB_ID_GENERATING_FORMAT = "%d/%02d/%s"; + + private final BlobId.Factory blobIdFactory; + private final Clock clock; + + @Inject + public BlobIdTimeGenerator(BlobId.Factory blobIdFactory, Clock clock) { + this.blobIdFactory = blobIdFactory; + this.clock = clock; + } + + BlobId currentBlobId() { + ZonedDateTime now = ZonedDateTime.now(clock); + int month = now.getMonthValue(); + int year = now.getYear(); + + return new PlainBlobId(String.format(BLOB_ID_GENERATING_FORMAT, year, month, blobIdFactory.of(UUID.randomUUID().toString()).asString())); + } +} diff --git a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVaultV2.java b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVaultV2.java new file mode 100644 index 00000000000..fea28ff35f4 --- /dev/null +++ b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVaultV2.java @@ -0,0 +1,166 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.vault.blob; + +import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST; + +import java.io.InputStream; + +import jakarta.inject.Inject; + +import org.apache.commons.lang3.NotImplementedException; +import org.apache.james.blob.api.BlobStore; +import org.apache.james.blob.api.BlobStore.BlobIdProvider; +import org.apache.james.blob.api.BlobStoreDAO; +import org.apache.james.blob.api.BucketName; +import org.apache.james.blob.api.ObjectNotFoundException; +import org.apache.james.core.Username; +import org.apache.james.mailbox.model.MessageId; +import org.apache.james.metrics.api.MetricFactory; +import org.apache.james.task.Task; +import org.apache.james.vault.DeletedMessage; +import org.apache.james.vault.DeletedMessageContentNotFoundException; +import org.apache.james.vault.DeletedMessageVault; +import org.apache.james.vault.VaultConfiguration; +import org.apache.james.vault.metadata.DeletedMessageMetadataVault; +import org.apache.james.vault.metadata.DeletedMessageWithStorageInformation; +import org.apache.james.vault.metadata.StorageInformation; +import org.apache.james.vault.search.Query; +import org.reactivestreams.Publisher; + +import com.google.common.base.Preconditions; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.util.function.Tuples; + +public class BlobStoreDeletedMessageVaultV2 implements DeletedMessageVault { + private static final String BLOBSTORE_DELETED_MESSAGE_VAULT_METRIC = "deletedMessageVault:blobStore:"; + static final String APPEND_METRIC_NAME = BLOBSTORE_DELETED_MESSAGE_VAULT_METRIC + "append"; + static final String LOAD_MIME_MESSAGE_METRIC_NAME = BLOBSTORE_DELETED_MESSAGE_VAULT_METRIC + "loadMimeMessage"; + static final String SEARCH_METRIC_NAME = BLOBSTORE_DELETED_MESSAGE_VAULT_METRIC + "search"; + static final String DELETE_METRIC_NAME = BLOBSTORE_DELETED_MESSAGE_VAULT_METRIC + "delete"; + + private final MetricFactory metricFactory; + private final DeletedMessageMetadataVault messageMetadataVault; + private final BlobStore blobStore; + private final BlobStoreDAO blobStoreDAO; + private final BlobIdTimeGenerator blobIdTimeGenerator; + private final VaultConfiguration vaultConfiguration; + + @Inject + public BlobStoreDeletedMessageVaultV2(MetricFactory metricFactory, DeletedMessageMetadataVault messageMetadataVault, + BlobStore blobStore, BlobStoreDAO blobStoreDAO, BlobIdTimeGenerator blobIdTimeGenerator, + VaultConfiguration vaultConfiguration) { + this.metricFactory = metricFactory; + this.messageMetadataVault = messageMetadataVault; + this.blobStore = blobStore; + this.blobStoreDAO = blobStoreDAO; + this.blobIdTimeGenerator = blobIdTimeGenerator; + this.vaultConfiguration = vaultConfiguration; + } + + @Override + public Publisher append(DeletedMessage deletedMessage, InputStream mimeMessage) { + Preconditions.checkNotNull(deletedMessage); + Preconditions.checkNotNull(mimeMessage); + BucketName bucketName = BucketName.of(vaultConfiguration.getSingleBucketName()); + + return metricFactory.decoratePublisherWithTimerMetric( + APPEND_METRIC_NAME, + appendMessage(deletedMessage, mimeMessage, bucketName)); + } + + private Mono appendMessage(DeletedMessage deletedMessage, InputStream mimeMessage, BucketName bucketName) { + return Mono.from(blobStore.save(bucketName, mimeMessage, withTimePrefixBlobId(), LOW_COST)) + .map(blobId -> StorageInformation.builder() + .bucketName(bucketName) + .blobId(blobId)) + .map(storageInformation -> new DeletedMessageWithStorageInformation(deletedMessage, storageInformation)) + .flatMap(message -> Mono.from(messageMetadataVault.store(message))) + .then(); + } + + private BlobIdProvider withTimePrefixBlobId() { + return data -> Mono.just(Tuples.of( + blobIdTimeGenerator.currentBlobId(), + data)); + } + + @Override + public Publisher loadMimeMessage(Username username, MessageId messageId) { + Preconditions.checkNotNull(username); + Preconditions.checkNotNull(messageId); + + return metricFactory.decoratePublisherWithTimerMetric( + LOAD_MIME_MESSAGE_METRIC_NAME, + Mono.from(messageMetadataVault.retrieveStorageInformation(username, messageId)) + .flatMap(storageInformation -> loadMimeMessage(storageInformation, username, messageId))); + } + + private Mono loadMimeMessage(StorageInformation storageInformation, Username username, MessageId messageId) { + return Mono.from(blobStore.readReactive(storageInformation.getBucketName(), storageInformation.getBlobId(), LOW_COST)) + .onErrorResume( + ObjectNotFoundException.class, + ex -> Mono.error(new DeletedMessageContentNotFoundException(username, messageId))); + } + + @Override + public Publisher search(Username username, Query query) { + Preconditions.checkNotNull(username); + Preconditions.checkNotNull(query); + + return metricFactory.decoratePublisherWithTimerMetric( + SEARCH_METRIC_NAME, + searchOn(username, query)); + } + + private Flux searchOn(Username username, Query query) { + Flux filterPublisher = Flux.from(messageMetadataVault.listRelatedBuckets()) + .concatMap(bucketName -> messageMetadataVault.listMessages(bucketName, username)) + .map(DeletedMessageWithStorageInformation::getDeletedMessage) + .filter(query.toPredicate()); + return query.getLimit() + .map(filterPublisher::take) + .orElse(filterPublisher); + } + + @Override + public Publisher delete(Username username, MessageId messageId) { + Preconditions.checkNotNull(username); + Preconditions.checkNotNull(messageId); + + return metricFactory.decoratePublisherWithTimerMetric( + DELETE_METRIC_NAME, + deleteMessage(username, messageId)); + } + + private Mono deleteMessage(Username username, MessageId messageId) { + return Mono.from(messageMetadataVault.retrieveStorageInformation(username, messageId)) + .flatMap(storageInformation -> Mono.from(messageMetadataVault.remove(storageInformation.getBucketName(), username, messageId)) + .thenReturn(storageInformation)) + .flatMap(storageInformation -> Mono.from(blobStoreDAO.delete(storageInformation.getBucketName(), storageInformation.getBlobId()))); + } + + @Override + public Task deleteExpiredMessagesTask() { + throw new NotImplementedException("Not implemented yet"); + } +} diff --git a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/VaultConfigurationTest.java b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/VaultConfigurationTest.java index 656af119d6a..25f13ddc51e 100644 --- a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/VaultConfigurationTest.java +++ b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/VaultConfigurationTest.java @@ -19,6 +19,7 @@ package org.apache.james.vault; +import static org.apache.james.vault.VaultConfiguration.DEFAULT_SINGLE_BUCKET_NAME; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -40,13 +41,19 @@ void shouldMatchBeanContract() { @Test void constructorShouldThrowWhenRetentionPeriodIsNull() { - assertThatThrownBy(() -> new VaultConfiguration(true, null, DefaultMailboxes.RESTORED_MESSAGES)) + assertThatThrownBy(() -> new VaultConfiguration(true, null, DefaultMailboxes.RESTORED_MESSAGES, DEFAULT_SINGLE_BUCKET_NAME)) .isInstanceOf(NullPointerException.class); } @Test void constructorShouldThrowWhenRestoreLocationIsNull() { - assertThatThrownBy(() -> new VaultConfiguration(true, ChronoUnit.YEARS.getDuration(), null)) + assertThatThrownBy(() -> new VaultConfiguration(true, ChronoUnit.YEARS.getDuration(), null, DEFAULT_SINGLE_BUCKET_NAME)) + .isInstanceOf(NullPointerException.class); + } + + @Test + void constructorShouldThrowWhenSingleBucketNameIsNull() { + assertThatThrownBy(() -> new VaultConfiguration(true, ChronoUnit.YEARS.getDuration(), DefaultMailboxes.RESTORED_MESSAGES, null)) .isInstanceOf(NullPointerException.class); } @@ -62,7 +69,7 @@ void fromShouldReturnConfiguredRestoreLocation() { configuration.addProperty("restoreLocation", "INBOX"); assertThat(VaultConfiguration.from(configuration)).isEqualTo( - new VaultConfiguration(false, ChronoUnit.YEARS.getDuration(), DefaultMailboxes.INBOX)); + new VaultConfiguration(false, ChronoUnit.YEARS.getDuration(), DefaultMailboxes.INBOX, DEFAULT_SINGLE_BUCKET_NAME)); } @Test @@ -71,7 +78,7 @@ void fromShouldReturnConfiguredRetentionTime() { configuration.addProperty("retentionPeriod", "15d"); assertThat(VaultConfiguration.from(configuration)).isEqualTo( - new VaultConfiguration(false, Duration.ofDays(15), DefaultMailboxes.RESTORED_MESSAGES)); + new VaultConfiguration(false, Duration.ofDays(15), DefaultMailboxes.RESTORED_MESSAGES, DEFAULT_SINGLE_BUCKET_NAME)); } @Test @@ -80,7 +87,7 @@ void fromShouldHandleHours() { configuration.addProperty("retentionPeriod", "15h"); assertThat(VaultConfiguration.from(configuration)).isEqualTo( - new VaultConfiguration(false, Duration.ofHours(15), DefaultMailboxes.RESTORED_MESSAGES)); + new VaultConfiguration(false, Duration.ofHours(15), DefaultMailboxes.RESTORED_MESSAGES, DEFAULT_SINGLE_BUCKET_NAME)); } @Test @@ -89,7 +96,16 @@ void fromShouldUseDaysAsADefaultUnit() { configuration.addProperty("retentionPeriod", "15"); assertThat(VaultConfiguration.from(configuration)).isEqualTo( - new VaultConfiguration(false, Duration.ofDays(15), DefaultMailboxes.RESTORED_MESSAGES)); + new VaultConfiguration(false, Duration.ofDays(15), DefaultMailboxes.RESTORED_MESSAGES, DEFAULT_SINGLE_BUCKET_NAME)); + } + + @Test + void fromShouldReturnConfiguredSingleBucketName() { + PropertiesConfiguration configuration = new PropertiesConfiguration(); + configuration.addProperty("singleBucketName", "bucketBlabla"); + + assertThat(VaultConfiguration.from(configuration)).isEqualTo( + new VaultConfiguration(false, ChronoUnit.YEARS.getDuration(), DefaultMailboxes.RESTORED_MESSAGES, "bucketBlabla")); } @Test diff --git a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/blob/BlobIdTimeGeneratorTest.java b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/blob/BlobIdTimeGeneratorTest.java new file mode 100644 index 00000000000..9391da5718b --- /dev/null +++ b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/blob/BlobIdTimeGeneratorTest.java @@ -0,0 +1,78 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.vault.blob; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.time.Clock; +import java.time.Instant; + +import org.apache.james.blob.api.BlobId; +import org.apache.james.blob.api.PlainBlobId; +import org.apache.james.server.blob.deduplication.GenerationAwareBlobId; +import org.apache.james.server.blob.deduplication.MinIOGenerationAwareBlobId; +import org.apache.james.utils.UpdatableTickingClock; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +public class BlobIdTimeGeneratorTest { + private static final Instant NOW = Instant.parse("2007-07-03T10:15:30.00Z"); + private static final Clock CLOCK = new UpdatableTickingClock(NOW); + + interface BlobIdTimeGeneratorContract { + BlobId.Factory blobIdFactory(); + + @Test + default void currentBlobIdShouldReturnBlobIdFormattedWithYearAndMonthPrefix() { + BlobIdTimeGenerator blobIdTimeGenerator = new BlobIdTimeGenerator(blobIdFactory(), CLOCK); + String currentBlobId = blobIdTimeGenerator.currentBlobId().asString(); + + int firstSlash = currentBlobId.indexOf('/'); + int secondSlash = currentBlobId.indexOf('/', firstSlash + 1); + String prefix = currentBlobId.substring(0, secondSlash); + + assertThat(prefix).isEqualTo("2007/07"); + } + } + + @Nested + class PlainBlobIdTimeGeneratorTest implements BlobIdTimeGeneratorContract { + @Override + public BlobId.Factory blobIdFactory() { + return new PlainBlobId.Factory(); + } + } + + @Nested + class GenerationAwareBlobIdTimeGeneratorTest implements BlobIdTimeGeneratorContract { + @Override + public BlobId.Factory blobIdFactory() { + return new GenerationAwareBlobId.Factory(CLOCK, new PlainBlobId.Factory(), GenerationAwareBlobId.Configuration.DEFAULT); + } + } + + @Nested + class MinIOGenerationAwareBlobIdTimeGeneratorTest implements BlobIdTimeGeneratorContract { + @Override + public BlobId.Factory blobIdFactory() { + return new MinIOGenerationAwareBlobId.Factory(CLOCK, GenerationAwareBlobId.Configuration.DEFAULT, new PlainBlobId.Factory()); + } + } +} diff --git a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVaultV2Test.java b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVaultV2Test.java new file mode 100644 index 00000000000..2a69d720ebb --- /dev/null +++ b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVaultV2Test.java @@ -0,0 +1,206 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.vault.blob; + +import static org.apache.james.vault.DeletedMessageFixture.CONTENT; +import static org.apache.james.vault.DeletedMessageFixture.DELETED_MESSAGE; +import static org.apache.james.vault.DeletedMessageFixture.DELETED_MESSAGE_2; +import static org.apache.james.vault.DeletedMessageFixture.DELETED_MESSAGE_GENERATOR; +import static org.apache.james.vault.DeletedMessageFixture.DELETED_MESSAGE_WITH_SUBJECT; +import static org.apache.james.vault.DeletedMessageFixture.MESSAGE_ID; +import static org.apache.james.vault.DeletedMessageFixture.NOW; +import static org.apache.james.vault.DeletedMessageFixture.SUBJECT; +import static org.apache.james.vault.DeletedMessageFixture.USERNAME; +import static org.apache.james.vault.search.Query.ALL; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.ByteArrayInputStream; +import java.util.List; + +import org.apache.james.blob.api.BlobId; +import org.apache.james.blob.api.BlobStore; +import org.apache.james.blob.api.PlainBlobId; +import org.apache.james.blob.memory.MemoryBlobStoreDAO; +import org.apache.james.mailbox.inmemory.InMemoryMessageId; +import org.apache.james.metrics.tests.RecordingMetricFactory; +import org.apache.james.server.blob.deduplication.BlobStoreFactory; +import org.apache.james.utils.UpdatableTickingClock; +import org.apache.james.vault.DeletedMessage; +import org.apache.james.vault.DeletedMessageVault; +import org.apache.james.vault.DeletedMessageVaultContract; +import org.apache.james.vault.DeletedMessageVaultSearchContract; +import org.apache.james.vault.VaultConfiguration; +import org.apache.james.vault.memory.metadata.MemoryDeletedMessageMetadataVault; +import org.apache.james.vault.search.CriterionFactory; +import org.apache.james.vault.search.Query; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public class BlobStoreDeletedMessageVaultV2Test implements DeletedMessageVaultContract, DeletedMessageVaultSearchContract.AllContracts { + private BlobStoreDeletedMessageVault messageVaultV1; + private BlobStoreDeletedMessageVaultV2 messageVaultV2; + private UpdatableTickingClock clock; + private RecordingMetricFactory metricFactory; + + @BeforeEach + void setUp() { + clock = new UpdatableTickingClock(NOW.toInstant()); + metricFactory = new RecordingMetricFactory(); + MemoryDeletedMessageMetadataVault deletedMessageMetadataVault = new MemoryDeletedMessageMetadataVault(); + MemoryBlobStoreDAO blobStoreDAO = new MemoryBlobStoreDAO(); + BlobStore blobStore = BlobStoreFactory.builder() + .blobStoreDAO(blobStoreDAO) + .blobIdFactory(new PlainBlobId.Factory()) + .defaultBucketName() + .passthrough(); + BlobId.Factory blobIdFactory = new PlainBlobId.Factory(); + + messageVaultV1 = new BlobStoreDeletedMessageVault(metricFactory, deletedMessageMetadataVault, + blobStore, blobStoreDAO, new BucketNameGenerator(clock), clock, VaultConfiguration.ENABLED_DEFAULT); + + messageVaultV2 = new BlobStoreDeletedMessageVaultV2(metricFactory, deletedMessageMetadataVault, + blobStore, blobStoreDAO, new BlobIdTimeGenerator(blobIdFactory, clock), VaultConfiguration.ENABLED_DEFAULT); + } + + @Override + public DeletedMessageVault getVault() { + return messageVaultV2; + } + + @Override + public UpdatableTickingClock getClock() { + return clock; + } + + @Test + public void loadMimeMessageShouldReturnOldMessage() { + Mono.from(messageVaultV1.append(DELETED_MESSAGE, new ByteArrayInputStream(CONTENT))).block(); + + assertThat(Mono.from(messageVaultV2.loadMimeMessage(USERNAME, MESSAGE_ID)).blockOptional()) + .isNotEmpty() + .satisfies(maybeContent -> assertThat(maybeContent.get()).hasSameContentAs(new ByteArrayInputStream(CONTENT))); + } + + @Test + public void loadMimeMessageShouldReturnEmptyWhenOldMessageDeleted() { + Mono.from(messageVaultV1.append(DELETED_MESSAGE, new ByteArrayInputStream(CONTENT))).block(); + + Mono.from(messageVaultV2.delete(USERNAME, MESSAGE_ID)).block(); + + assertThat(Mono.from(messageVaultV2.loadMimeMessage(USERNAME, MESSAGE_ID)).blockOptional()) + .isEmpty(); + } + + @Test + public void searchAllShouldReturnOldMessage() { + Mono.from(messageVaultV1.append(DELETED_MESSAGE, new ByteArrayInputStream(CONTENT))).block(); + + assertThat(Flux.from(messageVaultV2.search(USERNAME, ALL)).collectList().block()) + .containsOnly(DELETED_MESSAGE); + } + + @Test + public void searchAllShouldReturnOldAndNewMessages() { + Mono.from(messageVaultV1.append(DELETED_MESSAGE, new ByteArrayInputStream(CONTENT))).block(); + Mono.from(messageVaultV2.append(DELETED_MESSAGE_2, new ByteArrayInputStream(CONTENT))).block(); + + assertThat(Flux.from(getVault().search(USERNAME, ALL)).collectList().block()) + .containsOnly(DELETED_MESSAGE, DELETED_MESSAGE_2); + } + + @Test + public void searchAllShouldSupportLimitQueryWithOldAndNewMessages() { + Mono.from(messageVaultV1.append(DELETED_MESSAGE, new ByteArrayInputStream(CONTENT))).block(); + Mono.from(messageVaultV1.append(DELETED_MESSAGE_2, new ByteArrayInputStream(CONTENT))).block(); + DeletedMessage deletedMessage3 = DELETED_MESSAGE_GENERATOR.apply(InMemoryMessageId.of(33).getRawId()); + Mono.from(messageVaultV2.append(deletedMessage3, new ByteArrayInputStream(CONTENT))).block(); + + assertThat(Flux.from(messageVaultV2.search(USERNAME, Query.of(1, List.of()))).collectList().block()) + .hasSize(1); + assertThat(Flux.from(messageVaultV2.search(USERNAME, Query.of(3, List.of()))).collectList().block()) + .containsExactlyInAnyOrder(DELETED_MESSAGE, DELETED_MESSAGE_2, deletedMessage3); + assertThat(Flux.from(messageVaultV2.search(USERNAME, Query.of(4, List.of()))).collectList().block()) + .containsExactlyInAnyOrder(DELETED_MESSAGE, DELETED_MESSAGE_2, deletedMessage3); + } + + @Test + public void searchShouldReturnMatchingOldMessages() { + Mono.from(messageVaultV1.append(DELETED_MESSAGE_2, new ByteArrayInputStream(CONTENT))).block(); + Mono.from(messageVaultV1.append(DELETED_MESSAGE_WITH_SUBJECT, new ByteArrayInputStream(CONTENT))).block(); + + assertThat( + Flux.from(messageVaultV2.search(USERNAME, + Query.of(CriterionFactory.subject().containsIgnoreCase(SUBJECT)))) + .collectList().block()) + .containsOnly(DELETED_MESSAGE_WITH_SUBJECT); + } + + @Override + @Disabled("JAMES-4156: gc task for V2 not implemented yet") + public void deleteExpiredMessagesTaskShouldCompleteWhenNoMail() { + + } + + @Override + @Disabled("JAMES-4156: gc task for V2 not implemented yet") + public void deleteExpiredMessagesTaskShouldDeleteOldMails() { + + } + + @Override + @Disabled("JAMES-4156: gc task for V2 not implemented yet") + public void deleteExpiredMessagesTaskShouldCompleteWhenAllMailsDeleted() { + + } + + @Override + @Disabled("JAMES-4156: gc task for V2 not implemented yet") + public void deleteExpiredMessagesTaskShouldCompleteWhenOnlyRecentMails() { + + } + + @Override + @Disabled("JAMES-4156: gc task for V2 not implemented yet") + public void deleteExpiredMessagesTaskShouldDeleteOldMailsWhenRunSeveralTime() { + + } + + @Override + @Disabled("JAMES-4156: gc task for V2 not implemented yet") + public void deleteExpiredMessagesTaskShouldDoNothingWhenEmpty() { + + } + + @Override + @Disabled("JAMES-4156: gc task for V2 not implemented yet") + public void deleteExpiredMessagesTaskShouldNotDeleteRecentMails() { + + } + + @Override + @Disabled("JAMES-4156: gc task for V2 not implemented yet") + public void deleteExpiredMessagesTaskShouldCompleteWhenOnlyOldMails() { + + } +}