Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,32 +31,38 @@
import com.google.common.base.Preconditions;

public class VaultConfiguration {
public static final String DEFAULT_SINGLE_BUCKET_NAME = "james-deleted-message-vault";
public static final VaultConfiguration DEFAULT =
new VaultConfiguration(false, ChronoUnit.YEARS.getDuration(), DefaultMailboxes.RESTORED_MESSAGES);
new VaultConfiguration(false, ChronoUnit.YEARS.getDuration(), DefaultMailboxes.RESTORED_MESSAGES, DEFAULT_SINGLE_BUCKET_NAME);
public static final VaultConfiguration ENABLED_DEFAULT =
new VaultConfiguration(true, ChronoUnit.YEARS.getDuration(), DefaultMailboxes.RESTORED_MESSAGES);
new VaultConfiguration(true, ChronoUnit.YEARS.getDuration(), DefaultMailboxes.RESTORED_MESSAGES, DEFAULT_SINGLE_BUCKET_NAME);

public static VaultConfiguration from(Configuration propertiesConfiguration) {
Duration retentionPeriod = Optional.ofNullable(propertiesConfiguration.getString("retentionPeriod"))
.map(string -> DurationParser.parse(string, ChronoUnit.DAYS))
.orElse(DEFAULT.getRetentionPeriod());
String restoreLocation = Optional.ofNullable(propertiesConfiguration.getString("restoreLocation"))
.orElse(DEFAULT.getRestoreLocation());
String singleBucketName = Optional.ofNullable(propertiesConfiguration.getString("singleBucketName"))
.orElse(DEFAULT.getSingleBucketName());
boolean enabled = propertiesConfiguration.getBoolean("enabled", false);
return new VaultConfiguration(enabled, retentionPeriod, restoreLocation);
return new VaultConfiguration(enabled, retentionPeriod, restoreLocation, singleBucketName);
}

private final boolean enabled;
private final Duration retentionPeriod;
private final String restoreLocation;
private final String singleBucketName;

VaultConfiguration(boolean enabled, Duration retentionPeriod, String restoreLocation) {
VaultConfiguration(boolean enabled, Duration retentionPeriod, String restoreLocation, String singleBucketName) {
this.enabled = enabled;
Preconditions.checkNotNull(retentionPeriod);
Preconditions.checkNotNull(restoreLocation);
Preconditions.checkNotNull(singleBucketName);

this.retentionPeriod = retentionPeriod;
this.restoreLocation = restoreLocation;
this.singleBucketName = singleBucketName;
}

public boolean isEnabled() {
Expand All @@ -71,20 +77,25 @@ public String getRestoreLocation() {
return restoreLocation;
}

public String getSingleBucketName() {
return singleBucketName;
}

@Override
public final boolean equals(Object o) {
if (o instanceof VaultConfiguration) {
VaultConfiguration that = (VaultConfiguration) o;

return Objects.equals(this.retentionPeriod, that.retentionPeriod)
&& Objects.equals(this.restoreLocation, that.restoreLocation)
&& Objects.equals(this.enabled, that.enabled);
&& Objects.equals(this.enabled, that.enabled)
&& Objects.equals(this.singleBucketName, that.singleBucketName);
}
return false;
}

@Override
public final int hashCode() {
return Objects.hash(retentionPeriod, restoreLocation, enabled);
return Objects.hash(retentionPeriod, restoreLocation, enabled, singleBucketName);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/

package org.apache.james.vault.blob;

import java.time.Clock;
import java.time.ZonedDateTime;
import java.util.UUID;

import jakarta.inject.Inject;

import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.PlainBlobId;

public class BlobIdTimeGenerator {
private static final String BLOB_ID_GENERATING_FORMAT = "%d/%02d/%s";

private final BlobId.Factory blobIdFactory;
private final Clock clock;

@Inject
public BlobIdTimeGenerator(BlobId.Factory blobIdFactory, Clock clock) {
this.blobIdFactory = blobIdFactory;
this.clock = clock;
}

BlobId currentBlobId() {
ZonedDateTime now = ZonedDateTime.now(clock);
int month = now.getMonthValue();
int year = now.getYear();

return new PlainBlobId(String.format(BLOB_ID_GENERATING_FORMAT, year, month, blobIdFactory.of(UUID.randomUUID().toString()).asString()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/

package org.apache.james.vault.blob;

import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST;

import java.io.InputStream;

import jakarta.inject.Inject;

import org.apache.commons.lang3.NotImplementedException;
import org.apache.james.blob.api.BlobStore;
import org.apache.james.blob.api.BlobStore.BlobIdProvider;
import org.apache.james.blob.api.BlobStoreDAO;
import org.apache.james.blob.api.BucketName;
import org.apache.james.blob.api.ObjectNotFoundException;
import org.apache.james.core.Username;
import org.apache.james.mailbox.model.MessageId;
import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.task.Task;
import org.apache.james.vault.DeletedMessage;
import org.apache.james.vault.DeletedMessageContentNotFoundException;
import org.apache.james.vault.DeletedMessageVault;
import org.apache.james.vault.VaultConfiguration;
import org.apache.james.vault.metadata.DeletedMessageMetadataVault;
import org.apache.james.vault.metadata.DeletedMessageWithStorageInformation;
import org.apache.james.vault.metadata.StorageInformation;
import org.apache.james.vault.search.Query;
import org.reactivestreams.Publisher;

import com.google.common.base.Preconditions;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuples;

public class BlobStoreDeletedMessageVaultV2 implements DeletedMessageVault {
private static final String BLOBSTORE_DELETED_MESSAGE_VAULT_METRIC = "deletedMessageVault:blobStore:";
static final String APPEND_METRIC_NAME = BLOBSTORE_DELETED_MESSAGE_VAULT_METRIC + "append";
static final String LOAD_MIME_MESSAGE_METRIC_NAME = BLOBSTORE_DELETED_MESSAGE_VAULT_METRIC + "loadMimeMessage";
static final String SEARCH_METRIC_NAME = BLOBSTORE_DELETED_MESSAGE_VAULT_METRIC + "search";
static final String DELETE_METRIC_NAME = BLOBSTORE_DELETED_MESSAGE_VAULT_METRIC + "delete";

private final MetricFactory metricFactory;
private final DeletedMessageMetadataVault messageMetadataVault;
private final BlobStore blobStore;
private final BlobStoreDAO blobStoreDAO;
private final BlobIdTimeGenerator blobIdTimeGenerator;
private final VaultConfiguration vaultConfiguration;

@Inject
public BlobStoreDeletedMessageVaultV2(MetricFactory metricFactory, DeletedMessageMetadataVault messageMetadataVault,
BlobStore blobStore, BlobStoreDAO blobStoreDAO, BlobIdTimeGenerator blobIdTimeGenerator,
VaultConfiguration vaultConfiguration) {
this.metricFactory = metricFactory;
this.messageMetadataVault = messageMetadataVault;
this.blobStore = blobStore;
this.blobStoreDAO = blobStoreDAO;
this.blobIdTimeGenerator = blobIdTimeGenerator;
this.vaultConfiguration = vaultConfiguration;
}

@Override
public Publisher<Void> append(DeletedMessage deletedMessage, InputStream mimeMessage) {
Preconditions.checkNotNull(deletedMessage);
Preconditions.checkNotNull(mimeMessage);
BucketName bucketName = BucketName.of(vaultConfiguration.getSingleBucketName());

return metricFactory.decoratePublisherWithTimerMetric(
APPEND_METRIC_NAME,
appendMessage(deletedMessage, mimeMessage, bucketName));
}

private Mono<Void> appendMessage(DeletedMessage deletedMessage, InputStream mimeMessage, BucketName bucketName) {
return Mono.from(blobStore.save(bucketName, mimeMessage, withTimePrefixBlobId(), LOW_COST))
.map(blobId -> StorageInformation.builder()
.bucketName(bucketName)
.blobId(blobId))
.map(storageInformation -> new DeletedMessageWithStorageInformation(deletedMessage, storageInformation))
.flatMap(message -> Mono.from(messageMetadataVault.store(message)))
.then();
}

private BlobIdProvider<InputStream> withTimePrefixBlobId() {
return data -> Mono.just(Tuples.of(
blobIdTimeGenerator.currentBlobId(),
data));
}

@Override
public Publisher<InputStream> loadMimeMessage(Username username, MessageId messageId) {
Preconditions.checkNotNull(username);
Preconditions.checkNotNull(messageId);

return metricFactory.decoratePublisherWithTimerMetric(
LOAD_MIME_MESSAGE_METRIC_NAME,
Mono.from(messageMetadataVault.retrieveStorageInformation(username, messageId))
.flatMap(storageInformation -> loadMimeMessage(storageInformation, username, messageId)));
}

private Mono<InputStream> loadMimeMessage(StorageInformation storageInformation, Username username, MessageId messageId) {
return Mono.from(blobStore.readReactive(storageInformation.getBucketName(), storageInformation.getBlobId(), LOW_COST))
.onErrorResume(
ObjectNotFoundException.class,
ex -> Mono.error(new DeletedMessageContentNotFoundException(username, messageId)));
}

@Override
public Publisher<DeletedMessage> search(Username username, Query query) {
Preconditions.checkNotNull(username);
Preconditions.checkNotNull(query);

return metricFactory.decoratePublisherWithTimerMetric(
SEARCH_METRIC_NAME,
searchOn(username, query));
}

private Flux<DeletedMessage> searchOn(Username username, Query query) {
Flux<DeletedMessage> filterPublisher = Flux.from(messageMetadataVault.listRelatedBuckets())
.concatMap(bucketName -> messageMetadataVault.listMessages(bucketName, username))
.map(DeletedMessageWithStorageInformation::getDeletedMessage)
.filter(query.toPredicate());
return query.getLimit()
.map(filterPublisher::take)
.orElse(filterPublisher);
}

@Override
public Publisher<Void> delete(Username username, MessageId messageId) {
Preconditions.checkNotNull(username);
Preconditions.checkNotNull(messageId);

return metricFactory.decoratePublisherWithTimerMetric(
DELETE_METRIC_NAME,
deleteMessage(username, messageId));
}

private Mono<Void> deleteMessage(Username username, MessageId messageId) {
return Mono.from(messageMetadataVault.retrieveStorageInformation(username, messageId))
.flatMap(storageInformation -> Mono.from(messageMetadataVault.remove(storageInformation.getBucketName(), username, messageId))
.thenReturn(storageInformation))
.flatMap(storageInformation -> Mono.from(blobStoreDAO.delete(storageInformation.getBucketName(), storageInformation.getBlobId())));
}

@Override
public Task deleteExpiredMessagesTask() {
throw new NotImplementedException("Not implemented yet");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.james.vault;

import static org.apache.james.vault.VaultConfiguration.DEFAULT_SINGLE_BUCKET_NAME;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

Expand All @@ -40,13 +41,19 @@ void shouldMatchBeanContract() {

@Test
void constructorShouldThrowWhenRetentionPeriodIsNull() {
assertThatThrownBy(() -> new VaultConfiguration(true, null, DefaultMailboxes.RESTORED_MESSAGES))
assertThatThrownBy(() -> new VaultConfiguration(true, null, DefaultMailboxes.RESTORED_MESSAGES, DEFAULT_SINGLE_BUCKET_NAME))
.isInstanceOf(NullPointerException.class);
}

@Test
void constructorShouldThrowWhenRestoreLocationIsNull() {
assertThatThrownBy(() -> new VaultConfiguration(true, ChronoUnit.YEARS.getDuration(), null))
assertThatThrownBy(() -> new VaultConfiguration(true, ChronoUnit.YEARS.getDuration(), null, DEFAULT_SINGLE_BUCKET_NAME))
.isInstanceOf(NullPointerException.class);
}

@Test
void constructorShouldThrowWhenSingleBucketNameIsNull() {
assertThatThrownBy(() -> new VaultConfiguration(true, ChronoUnit.YEARS.getDuration(), DefaultMailboxes.RESTORED_MESSAGES, null))
.isInstanceOf(NullPointerException.class);
}

Expand All @@ -62,7 +69,7 @@ void fromShouldReturnConfiguredRestoreLocation() {
configuration.addProperty("restoreLocation", "INBOX");

assertThat(VaultConfiguration.from(configuration)).isEqualTo(
new VaultConfiguration(false, ChronoUnit.YEARS.getDuration(), DefaultMailboxes.INBOX));
new VaultConfiguration(false, ChronoUnit.YEARS.getDuration(), DefaultMailboxes.INBOX, DEFAULT_SINGLE_BUCKET_NAME));
}

@Test
Expand All @@ -71,7 +78,7 @@ void fromShouldReturnConfiguredRetentionTime() {
configuration.addProperty("retentionPeriod", "15d");

assertThat(VaultConfiguration.from(configuration)).isEqualTo(
new VaultConfiguration(false, Duration.ofDays(15), DefaultMailboxes.RESTORED_MESSAGES));
new VaultConfiguration(false, Duration.ofDays(15), DefaultMailboxes.RESTORED_MESSAGES, DEFAULT_SINGLE_BUCKET_NAME));
}

@Test
Expand All @@ -80,7 +87,7 @@ void fromShouldHandleHours() {
configuration.addProperty("retentionPeriod", "15h");

assertThat(VaultConfiguration.from(configuration)).isEqualTo(
new VaultConfiguration(false, Duration.ofHours(15), DefaultMailboxes.RESTORED_MESSAGES));
new VaultConfiguration(false, Duration.ofHours(15), DefaultMailboxes.RESTORED_MESSAGES, DEFAULT_SINGLE_BUCKET_NAME));
}

@Test
Expand All @@ -89,7 +96,16 @@ void fromShouldUseDaysAsADefaultUnit() {
configuration.addProperty("retentionPeriod", "15");

assertThat(VaultConfiguration.from(configuration)).isEqualTo(
new VaultConfiguration(false, Duration.ofDays(15), DefaultMailboxes.RESTORED_MESSAGES));
new VaultConfiguration(false, Duration.ofDays(15), DefaultMailboxes.RESTORED_MESSAGES, DEFAULT_SINGLE_BUCKET_NAME));
}

@Test
void fromShouldReturnConfiguredSingleBucketName() {
PropertiesConfiguration configuration = new PropertiesConfiguration();
configuration.addProperty("singleBucketName", "bucketBlabla");

assertThat(VaultConfiguration.from(configuration)).isEqualTo(
new VaultConfiguration(false, ChronoUnit.YEARS.getDuration(), DefaultMailboxes.RESTORED_MESSAGES, "bucketBlabla"));
}

@Test
Expand Down
Loading