diff --git a/runtime/src/main/java/com/spbsu/flamestream/runtime/edge/Rear.java b/runtime/src/main/java/com/spbsu/flamestream/runtime/edge/Rear.java index 1913cdc7f..7604457d0 100644 --- a/runtime/src/main/java/com/spbsu/flamestream/runtime/edge/Rear.java +++ b/runtime/src/main/java/com/spbsu/flamestream/runtime/edge/Rear.java @@ -3,6 +3,8 @@ import com.spbsu.flamestream.core.Batch; import com.spbsu.flamestream.core.data.meta.GlobalTime; +import java.util.concurrent.CompletionStage; + /** * User: Artem * Date: 27.11.2017 @@ -15,9 +17,9 @@ class MinTime { } /** - * Sync call, return mean accept + * Sync call, completion means accept */ - void accept(Batch batch); + CompletionStage accept(Batch batch); Batch last(); } diff --git a/runtime/src/main/java/com/spbsu/flamestream/runtime/edge/RearActor.java b/runtime/src/main/java/com/spbsu/flamestream/runtime/edge/RearActor.java index b33ce328e..210f9e558 100644 --- a/runtime/src/main/java/com/spbsu/flamestream/runtime/edge/RearActor.java +++ b/runtime/src/main/java/com/spbsu/flamestream/runtime/edge/RearActor.java @@ -42,8 +42,8 @@ public static Props props(EdgeContext edgeContext, FlameRuntime.RearInstance public Receive createReceive() { return ReceiveBuilder.create() .match(Batch.class, b -> { - rear.accept(b); - sender().tell(new BatchAccepted(), self()); + final ActorRef sender = sender(), self = self(); + rear.accept(b).thenRun(() -> sender.tell(new BatchAccepted(), self)); }) .match(GimmeLastBatch.class, l -> sender().tell(rear.last(), ActorRef.noSender())) .build(); diff --git a/runtime/src/main/java/com/spbsu/flamestream/runtime/edge/akka/AkkaRear.java b/runtime/src/main/java/com/spbsu/flamestream/runtime/edge/akka/AkkaRear.java index c573ec5ba..72309fba2 100644 --- a/runtime/src/main/java/com/spbsu/flamestream/runtime/edge/akka/AkkaRear.java +++ b/runtime/src/main/java/com/spbsu/flamestream/runtime/edge/akka/AkkaRear.java @@ -16,6 +16,7 @@ import com.spbsu.flamestream.runtime.utils.akka.AwaitResolver; import com.spbsu.flamestream.runtime.utils.akka.LoggingActor; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; import java.util.function.Consumer; @@ -30,12 +31,8 @@ public AkkaRear(EdgeContext edgeContext, ActorRefFactory refFactory, String loca } @Override - public void accept(Batch batch) { - try { - PatternsCS.ask(innerActor, batch, FlameConfig.config.smallTimeout()).toCompletableFuture().get(); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } + public CompletionStage accept(Batch batch) { + return PatternsCS.ask(innerActor, batch, FlameConfig.config.smallTimeout()); } @Override diff --git a/runtime/src/main/java/com/spbsu/flamestream/runtime/edge/socket/SocketRear.java b/runtime/src/main/java/com/spbsu/flamestream/runtime/edge/socket/SocketRear.java index 570e47a7e..15f48b67b 100644 --- a/runtime/src/main/java/com/spbsu/flamestream/runtime/edge/socket/SocketRear.java +++ b/runtime/src/main/java/com/spbsu/flamestream/runtime/edge/socket/SocketRear.java @@ -5,18 +5,22 @@ import com.esotericsoftware.kryonet.Connection; import com.esotericsoftware.kryonet.Listener; import com.spbsu.flamestream.core.Batch; -import com.spbsu.flamestream.runtime.edge.Rear; +import com.spbsu.flamestream.core.DataItem; import com.spbsu.flamestream.core.data.PayloadDataItem; import com.spbsu.flamestream.core.data.meta.EdgeId; import com.spbsu.flamestream.core.data.meta.GlobalTime; import com.spbsu.flamestream.core.data.meta.Meta; import com.spbsu.flamestream.runtime.edge.EdgeContext; +import com.spbsu.flamestream.runtime.edge.Rear; import org.objenesis.strategy.StdInstantiatorStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Arrays; +import java.util.Iterator; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; /** * User: Artem @@ -24,6 +28,7 @@ */ public class SocketRear implements Rear { private static final Logger LOG = LoggerFactory.getLogger(SocketRear.class); + public static final int BUFFER_SIZE = 1_000_000; private final EdgeId edgeId; private final Client client; @@ -32,7 +37,7 @@ public class SocketRear implements Rear { public SocketRear(EdgeContext edgeContext, String host, int port, Class[] classes) { edgeId = edgeContext.edgeId(); - client = new Client(1_000_000, 1234); + client = new Client(BUFFER_SIZE, 1234); Arrays.stream(classes).forEach(clazz -> client.getKryo().register(clazz)); { //register inners of data item client.getKryo().register(PayloadDataItem.class); @@ -64,19 +69,43 @@ public void disconnected(Connection connection) { } @Override - public void accept(Batch batch) { + public CompletionStage accept(Batch batch) { + final CompletableFuture future = new CompletableFuture<>(); if (client.isConnected()) { - batch.payload().forEach(client::sendTCP); - if (batch.payload().noneMatch(ignored -> true)) { - client.sendTCP(new MinTime(batch.time())); + final Iterator iterator = batch.payload().iterator(); + if (writeAllUntilFull(iterator)) { + future.complete(null); + } else { + client.addListener(new Listener() { + @Override + public void idle(Connection connection) { + if (writeAllUntilFull(iterator)) { + future.complete(null); + client.removeListener(this); + } + } + }); } } else { LOG.warn("{}: writing to closed log", edgeId); + future.complete(null); } + return future; } @Override public Batch last() { return Batch.Default.EMPTY; } + + private boolean writeAllUntilFull(Iterator iterator) { + while (iterator.hasNext()) { + if (client.getTcpWriteBufferSize() < BUFFER_SIZE * 0.9) { + client.sendTCP(iterator.next()); + } else { + return false; + } + } + return true; + } } diff --git a/runtime/src/main/java/com/spbsu/flamestream/runtime/utils/AwaitCountConsumer.java b/runtime/src/main/java/com/spbsu/flamestream/runtime/utils/AwaitCountConsumer.java index b16103f12..c8f482720 100644 --- a/runtime/src/main/java/com/spbsu/flamestream/runtime/utils/AwaitCountConsumer.java +++ b/runtime/src/main/java/com/spbsu/flamestream/runtime/utils/AwaitCountConsumer.java @@ -34,11 +34,15 @@ public int expected() { } public void await(long timeout, TimeUnit unit) throws InterruptedException { - final long stop = System.currentTimeMillis() + unit.toMillis(timeout); synchronized (counter) { - while (counter.longValue() < expectedSize && System.currentTimeMillis() < stop) { + while (counter.longValue() < expectedSize) { counter.wait(unit.toMillis(timeout)); } } } + + @Override + public String toString() { + return "AwaitCountConsumer{" + counter.get() + "/" + expectedSize + "}"; + } }