Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions client/src/main/java/io/oxia/client/grpc/OxiaStub.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@
import io.grpc.stub.StreamObserver;
import io.oxia.client.ClientConfig;
import io.oxia.client.api.Authentication;
import io.oxia.proto.CloseSessionRequest;
import io.oxia.proto.CloseSessionResponse;
import io.oxia.proto.OxiaClientGrpc;
import io.oxia.proto.*;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -108,6 +107,10 @@ public OxiaClientGrpc.OxiaClientStub async() {
return asyncStub;
}

public CompletableFuture<CreateSessionResponse> createSession(CreateSessionRequest request) {

}

public CompletableFuture<CloseSessionResponse> closeSession(CloseSessionRequest request) {
final CompletableFuture<CloseSessionResponse> f = new CompletableFuture<>();
final var defer =
Expand Down
207 changes: 96 additions & 111 deletions client/src/main/java/io/oxia/client/session/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,78 +30,68 @@
import io.oxia.proto.CloseSessionRequest;
import io.oxia.proto.KeepAliveResponse;
import io.oxia.proto.SessionHeartbeat;

import java.io.Closeable;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import lombok.Getter;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class Session implements StreamObserver<KeepAliveResponse> {

private final @NonNull OxiaStubProvider stubProvider;
private final @NonNull Duration sessionTimeout;
private final @NonNull Duration heartbeatInterval;

public class Session implements Closeable {
@Getter(PACKAGE)
@VisibleForTesting
private final long shardId;

@Getter(PUBLIC)
private final long sessionId;

private final OxiaStubProvider stubProvider;
private final Duration sessionTimeout;
private final String clientIdentifier;

private final @NonNull SessionHeartbeat heartbeat;

private final @NonNull SessionNotificationListener listener;

private volatile boolean closed;

private Counter sessionsOpened;
private Counter sessionsExpired;
private Counter sessionsClosed;

private final ScheduledFuture<?> heartbeatFuture;
private final SessionHeartbeat heartbeatRequest;
private final StreamObserver<KeepAliveResponse> heartbeatHandler;
// metrics
private final Counter sessionsExpired;
private final Counter sessionsClosed;
// stats
private volatile Instant lastSucceedTimestamp;
private final AtomicBoolean finished;

private volatile Instant lastSuccessfullResponse;

Session(
@NonNull ScheduledExecutorService executor,
Session(@NonNull ScheduledExecutorService executor,
@NonNull OxiaStubProvider stubProvider,
@NonNull ClientConfig config,
long shardId,
long sessionId,
InstrumentProvider instrumentProvider,
SessionNotificationListener listener) {
InstrumentProvider instrumentProvider) {
this.finished = new AtomicBoolean(false);
this.stubProvider = stubProvider;
this.sessionTimeout = config.sessionTimeout();
this.heartbeatInterval =
Duration.ofMillis(
Math.max(config.sessionTimeout().toMillis() / 10, Duration.ofSeconds(2).toMillis()));
final Duration heartbeatInterval = Duration.ofMillis(
Math.max(config.sessionTimeout().toMillis() / 10, Duration.ofSeconds(2).toMillis()));
this.shardId = shardId;
this.sessionId = sessionId;
this.clientIdentifier = config.clientIdentifier();
this.heartbeat =
this.heartbeatRequest =
SessionHeartbeat.newBuilder().setShard(shardId).setSessionId(sessionId).build();
this.listener = listener;

log.info(
"Session created shard={} sessionId={} clientIdentity={}",
shardId,
sessionId,
config.clientIdentifier());

this.sessionsOpened =
instrumentProvider.newCounter(
"oxia.client.sessions.opened",
Unit.Sessions,
"The total number of sessions opened by this client",
Attributes.builder().put("oxia.shard", shardId).build());
Counter sessionsOpened = instrumentProvider.newCounter(
"oxia.client.sessions.opened",
Unit.Sessions,
"The total number of sessions opened by this client",
Attributes.builder().put("oxia.shard", shardId).build());
this.sessionsExpired =
instrumentProvider.newCounter(
"oxia.client.sessions.expired",
Expand All @@ -115,96 +105,91 @@ public class Session implements StreamObserver<KeepAliveResponse> {
"The total number of sessions closed by this client",
Attributes.builder().put("oxia.shard", shardId).build());

sessionsOpened.increment();

this.lastSuccessfullResponse = Instant.now();
this.heartbeatHandler = new StreamObserver<>() {

@Override
public void onNext(KeepAliveResponse keepAliveResponse) {
lastSucceedTimestamp = Instant.now();
if (log.isDebugEnabled()) {
log.debug(
"Received keep-alive response shard={} sessionId={} clientIdentity={}",
shardId,
sessionId,
clientIdentifier);
}
}

@Override
public void onError(Throwable throwable) {
log.warn(
"Error during session keep-alive shard={} sessionId={} clientIdentity={}: {}",
shardId,
sessionId,
clientIdentifier,
throwable.getMessage());
}

@Override
public void onCompleted() {
// no action
}
};
sessionsOpened.increment();
this.lastSucceedTimestamp = Instant.now();
this.heartbeatFuture =
executor.scheduleAtFixedRate(
() -> {
// we should catch exception to avoid the schedule future complete
try {
sendKeepAlive();
} catch (Throwable ex) {
log.warn("receive error when send keep-alive request", Throwables.getRootCause(ex));
}
},
heartbeat(),
heartbeatInterval.toMillis(),
heartbeatInterval.toMillis(),
TimeUnit.MILLISECONDS);
}

private void sendKeepAlive() {
Duration diff = Duration.between(lastSuccessfullResponse, Instant.now());

if (diff.toMillis() > sessionTimeout.toMillis()) {
handleSessionExpired();
return;
}

stubProvider.getStubForShard(shardId).async().keepAlive(heartbeat, this);
public boolean isFinished() {
return finished.getAcquire();
}

@Override
public void onNext(KeepAliveResponse value) {
lastSuccessfullResponse = Instant.now();
if (log.isDebugEnabled()) {
log.debug(
"Received keep-alive response shard={} sessionId={} clientIdentity={}",
shardId,
sessionId,
clientIdentifier);
}
}

@Override
public void onError(Throwable t) {
log.warn(
"Error during session keep-alive shard={} sessionId={} clientIdentity={}: {}",
shardId,
sessionId,
clientIdentifier,
t.getMessage());
}

@Override
public void onCompleted() {
// Nothing to do
}

private void handleSessionExpired() {
sessionsExpired.increment();
log.warn(
"Session expired shard={} sessionId={} clientIdentity={}",
shardId,
sessionId,
clientIdentifier);
close();
private Runnable heartbeat() {
return () -> {
try {
final Duration diff = Duration.between(lastSucceedTimestamp, Instant.now());
if (diff.toMillis() > sessionTimeout.toMillis()) {
sessionsExpired.increment();
log.warn(
"Session expired shard={} sessionId={} clientIdentity={}",
shardId,
sessionId,
clientIdentifier);
close();
return;
}
stubProvider.getStubForShard(shardId).async().keepAlive(heartbeatRequest, heartbeatHandler);
} catch (Throwable ex) {
log.warn("receive error when send keep-alive request", Throwables.getRootCause(ex));
}
};
}

public CompletableFuture<Void> close() {
CompletableFuture<Void> future;
public void close() {
finished.setRelease(true);
sessionsClosed.increment();
heartbeatFuture.cancel(true);
try {
sessionsClosed.increment();
heartbeatFuture.cancel(true);
final var stub = stubProvider.getStubForShard(shardId);
future =
stub.closeSession(
CloseSessionRequest.newBuilder()
.setShard(shardId)
.setSessionId(sessionId)
.build())
.thenApply(__ -> null); // we are not using the response so far
// this is try-our-best operation, we don't care the result
stub.closeSession(
CloseSessionRequest.newBuilder()
.setShard(shardId)
.setSessionId(sessionId)
.build());
} catch (Throwable ex) {
future = CompletableFuture.failedFuture(Throwables.getRootCause(ex));
log.warn("Unexpected error when notify oxia node the session has closed. shard={} sessionId={}",
shardId, sessionId, Throwables.getRootCause(ex));
}
return future.whenComplete(
(__, ignore) -> {
listener.onSessionClosed(Session.this);
log.info(
"Session closed shard={} sessionId={} clientIdentity={}",
shardId,
sessionId,
clientIdentifier);
});
log.info(
"Session closed shard={} sessionId={} clientIdentity={}",
shardId,
sessionId,
clientIdentifier);
}
}
81 changes: 0 additions & 81 deletions client/src/main/java/io/oxia/client/session/SessionFactory.java

This file was deleted.

Loading
Loading