Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ val loggingDependencies = Seq(
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
"com.typesafe.akka" %% "akka-remote" % akkaVersion,
"com.iheart" %% "ficus" % "1.4.2",
"commons-net" % "commons-net" % "3.3"
) ++ apiDependencies ++ loggingDependencies ++ databaseDependencies

fork := true
Expand Down
47 changes: 42 additions & 5 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
explorer {
parseSettings {
nodes = [""]
nodes = []
recoverBatchSize = 15
infinitePing = true // if set to true, explorer will continue to ping node infinetely, otherwise it stops after #numberOfAttempts attempts
askNode = false // if set false, explorer won't ping the node if it stopped working
infinitePing = true # if set to true, explorer will continue to ping node infinetely, otherwise it stops after #numberOfAttempts attempts
askNode = false # if set false, explorer won't ping the node if it stopped working
}
blackListSettings {
banTime = 60m
Expand All @@ -16,10 +16,35 @@ explorer {
maxPoolSize = 5
connectionTimeout = 60000
}
nodeSettings {
maxRollbackDepth = 100
networkSettings {
syncPacketLength = 1000
bindAddressHost = "0.0.0.0"
bindAddressPort = 0
nodeName = "explorer"
appVersion = 0.9.3
handshakeTimeout = 1m
peerForConnectionHost = ""
peerForConnectionPort = 9001
peerForConnectionApiPort = 9051
declaredAddressHost = ""
declaredAddressPort = 0
}
multisigSettings {
checkTxMinedPeriod = 30
numberOfBlocksToCheck = 3
mnemonicKeys = []
}
ntpSettings {
server = "pool.ntp.org"
updateEvery = 30m
timeout = 30s
}
frontendSettings {
host = ""
port = 5150
}
}

parser-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
Expand All @@ -34,11 +59,23 @@ blocking-dispatcher {
}
throughput = 1
}

akka {
log-dead-letters = 0
log-dead-letters-during-shutdown = off
loggers = [ "akka.event.slf4j.Slf4jLogger" ]
logger-startup-timeout = 60s
actor.warn-about-java-serializer-usage = false

actor {
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
port = 0
}
}
}

2 changes: 1 addition & 1 deletion src/main/resources/data_model.sql
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ CREATE INDEX tx_id_inputs_index ON inputs (txId);
CREATE TABLE contracts(
hash VARCHAR(64) PRIMARY KEY,
contract TEXT
)
);

CREATE TABLE accounts(
-- idx SERIAL,
Expand Down
2 changes: 1 addition & 1 deletion src/main/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<maxHistory>2</maxHistory>
</rollingPolicy>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
<level>DEBUG</level>
</filter>
<encoder>
<pattern>[%d{yyyy-MM-dd HH:mm:ss}] >> [%thread] >> [%-5level] >> %msg%n</pattern>
Expand Down
21 changes: 15 additions & 6 deletions src/main/scala/encry/ExplorerApp.scala
Original file line number Diff line number Diff line change
@@ -1,26 +1,30 @@
package encry

import akka.actor.{ActorSystem, Props}
import akka.actor.{ActorRef, ActorSystem, Props}
import akka.stream.ActorMaterializer
import cats.effect.{Blocker, IO}
import cats.effect.{Blocker, ContextShift, IO}
import cats.implicits._
import doobie.hikari.HikariTransactor
import doobie.util.ExecutionContexts
import encry.database.{DBActor, DBService}
import encry.network.{NetworkServer, NetworkTimeProvider}
import encry.parser.ParsersController
import encry.settings.ExplorerSettings
import doobie.implicits._

import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}

object ExplorerApp extends App {
object ExplorerApp extends App {

implicit val system: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val ec: ExecutionContextExecutor = system.dispatcher

val settings = ExplorerSettings.read

implicit val cs = IO.contextShift(ExecutionContext.global)
val frontRemoteActor =
system.actorSelection(s"akka.tcp://application@${settings.frontendSettings.host}:${settings.frontendSettings.port}/user/receiver")

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

val pgTransactor = for {
ce <- ExecutionContexts.fixedThreadPool[IO](settings.databaseSettings.maxPoolSize)
Expand All @@ -44,7 +48,12 @@ object ExplorerApp extends App {
} *> IO {
val dbService = DBService(xa)
val dbActor = system.actorOf(Props(new DBActor(dbService)), s"dbActor")
system.actorOf(Props(new ParsersController(settings.parseSettings, settings.blackListSettings, dbActor)), s"parserController")

val timeProvider: NetworkTimeProvider = new NetworkTimeProvider(settings.ntpSettings)
val networkServer: ActorRef = system.actorOf(NetworkServer.props(settings.networkSettings, timeProvider, frontRemoteActor), "networkServer")

system.actorOf(Props(new ParsersController(settings.parseSettings, settings.blackListSettings, dbActor, networkServer)),
s"parserController")
} *> IO.never
}.unsafeRunSync()
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import com.google.common.primitives.Ints
import io.circe.syntax._
import encry.blockchain.modifiers.Directive.DTypeId
import encry.blockchain.modifiers.boxes.{AssetBox, EncryBaseBox, EncryProposition}
import encry.utils.CoreTaggedTypes.ModifierId
import encry.utils.Utils
import io.circe.{Decoder, Encoder, HCursor}
import org.encryfoundation.common.utils.Algos
Expand Down
59 changes: 59 additions & 0 deletions src/main/scala/encry/network/NetworkMessagesHandler.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package encry.network

import TransactionProto.TransactionProtoMessage
import akka.actor.{Actor, ActorRef, Props}
import com.typesafe.scalalogging.StrictLogging
import encry.network.NetworkMessagesHandler.MessageFromNetwork
import encry.network.PeerHandler.ConnectedPeer
import org.encryfoundation.common.modifiers.mempool.transaction.{Transaction, TransactionProtoSerializer}
import org.encryfoundation.common.network.BasicMessagesRepo._
import org.encryfoundation.common.utils.Algos

class NetworkMessagesHandler(networkServer: ActorRef) extends Actor with StrictLogging {

override def receive: Receive = {

case MessageFromNetwork(message, peerOpt) => message match {

case InvNetworkMessage((modifierTypeId, modifierIds)) =>
peerOpt.foreach { peer =>
if (Transaction.modifierTypeId == modifierTypeId) {
logger.debug(s"Request modifier: $modifierTypeId ${modifierIds.map(Algos.encode).mkString(",")}")
peer.handlerRef ! RequestModifiersNetworkMessage((modifierTypeId, modifierIds))
}
}

case ModifiersNetworkMessage((modifierTypeId, modifierMap)) =>
logger.debug(s"Response modifiers: $modifierTypeId size ${modifierMap.size}")
modifierMap.foreach { case (modifierId, bytes) =>
modifierTypeId match {
case Transaction.modifierTypeId =>
val tx = TransactionProtoSerializer.fromProto(TransactionProtoMessage.parseFrom(bytes))
tx.foreach(networkServer ! _)

case _ =>
}
}

case _ =>
}
case _ =>
}
}

object NetworkMessagesHandler {

case class Transaction(tx: Transaction)
case class Block(block: Block)

/**
* @param message - message, received from network
* @param source - sender of received message
*
* This case class transfers network message from PeerConnectionHandler actor to the NetworkController.
* Main duty is to transfer message from network with sender of it message to the NetworkController as an end point.
*/
case class MessageFromNetwork(message: NetworkMessage, source: Option[ConnectedPeer])

def props(networkServer: ActorRef) = Props(new NetworkMessagesHandler(networkServer))
}
96 changes: 96 additions & 0 deletions src/main/scala/encry/network/NetworkServer.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package encry.network

import java.net.InetSocketAddress

import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}
import akka.io.Tcp.SO.KeepAlive
import akka.io.Tcp._
import akka.io.{IO, Tcp}
import com.typesafe.scalalogging.StrictLogging
import NetworkServer.{CheckConnection, ConnectionSetupSuccessfully}
import PeerHandler._
import encry.parser.NodeParser.BlockFromNode
import encry.settings.NetworkSettings
import org.encryfoundation.common.modifiers.mempool.transaction.Transaction

import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.duration._

class NetworkServer(settings: NetworkSettings, timeProvider: NetworkTimeProvider,
frontRemoteActor: ActorSelection) extends Actor with StrictLogging {

implicit val system: ActorSystem = context.system
implicit val ec: ExecutionContextExecutor = context.dispatcher

var isConnected = false

val messagesHandler: ActorRef = context.actorOf(NetworkMessagesHandler.props(self))

var tmpConnectionHandler: Option[ActorRef] = None

val selfPeer: InetSocketAddress =
new InetSocketAddress(settings.bindAddressHost, settings.bindAddressPort)

val connectingPeer: InetSocketAddress =
new InetSocketAddress(settings.peerForConnectionHost, settings.peerForConnectionPort)

IO(Tcp) ! Bind(self, selfPeer)

override def receive: Receive = {

case Bound(localAddress) =>
logger.info(s"Local app was successfully bound to $localAddress!")
context.system.scheduler.schedule(5.seconds, 30.seconds, self, CheckConnection)

case CommandFailed(add: Bind) =>
logger.info(s"Failed to bind to ${add.localAddress}.")
context.stop(self)

case Connected(remote, _) if !isConnected && remote.getAddress == connectingPeer.getAddress =>
val handler: ActorRef = context.actorOf(
PeerHandler.props(remote, sender(), settings, timeProvider, messagesHandler)
)
logger.info(s"Successfully connected to $remote. Creating handler: $handler.")
isConnected = true
tmpConnectionHandler = Some(handler)
sender ! Register(handler)
sender ! ResumeReading

case Connected(remote, _) => logger.info(s"Remote: $remote try to connect but isConnected: $isConnected.")

case CommandFailed(c: Connect) =>
isConnected = false
tmpConnectionHandler = None
logger.info(s"Failed to connect to: ${c.remoteAddress}")

case CheckConnection if !isConnected =>
IO(Tcp) ! Connect(connectingPeer, options = KeepAlive(true) :: Nil, timeout = Some(5.seconds))
logger.info(s"Trying to connect to $connectingPeer.")

case CheckConnection =>
logger.info(s"Triggered CheckConnection. Current connection is: $isConnected")

case RemovePeerFromConnectionList(peer) =>
isConnected = false
tmpConnectionHandler = None
logger.info(s"Disconnected from $peer.")

case tx: Transaction =>
frontRemoteActor ! tx

case BlockFromNode(block, nodeAddr, nodeInfo) =>
val txIds = block.payload.txs.map(_.id)
frontRemoteActor ! txIds

case msg =>
logger.info(s"Got strange message on NetworkServer: $msg.")
}
}

object NetworkServer {
case object CheckConnection
case object ConnectionSetupSuccessfully

def props(settings: NetworkSettings, timeProvider: NetworkTimeProvider, frontRemoteActor: ActorSelection): Props =
Props(new NetworkServer(settings, timeProvider, frontRemoteActor))
}
Loading