From 4f08a7a040c7521307cbb82d38aad7f089ab24a7 Mon Sep 17 00:00:00 2001 From: vxmark <16427543+vxmark@users.noreply.github.com> Date: Thu, 19 Sep 2019 14:35:04 +0200 Subject: [PATCH] Fixed missing ReceiveReady event wire-up for PPP worker on re-creation of DealerSocket. --- .../ParanoidPirate.Worker/Program.cs | 133 +++++++++--------- 1 file changed, 69 insertions(+), 64 deletions(-) diff --git a/src/Pirate Pattern/Paranoid Pirate/ParanoidPirate.Worker/Program.cs b/src/Pirate Pattern/Paranoid Pirate/ParanoidPirate.Worker/Program.cs index 881e56e..08c9417 100644 --- a/src/Pirate Pattern/Paranoid Pirate/ParanoidPirate.Worker/Program.cs +++ b/src/Pirate Pattern/Paranoid Pirate/ParanoidPirate.Worker/Program.cs @@ -9,6 +9,15 @@ namespace ParanoidPirate.Worker { internal static class Program { + // if liveliness == 0 -> queue is considered dead/disconnected + private static int liveliness = Commons.HeartbeatLiveliness; + + private static int cycles = 0; + private static int interval = Commons.IntervalInit; + private static bool crash = false; + private static bool verbose; + private static Random rnd; + /// /// ParanoidPirate.Worker [-v] /// @@ -20,71 +29,12 @@ private static void Main(string[] args) { Console.Title = "NetMQ ParanoidPirate Worker"; - var verbose = args.Length > 0 && args[0] == "-v"; - - var rnd = new Random(); + verbose = args.Length > 0 && args[0] == "-v"; + var heartbeatAt = DateTime.UtcNow + TimeSpan.FromMilliseconds(Commons.HeartbeatInterval); + rnd = new Random(); var workerId = rnd.Next(100, 500); var worker = GetWorkerSocket(verbose, workerId); - // if liveliness == 0 -> queue is considered dead/disconnected - var liveliness = Commons.HeartbeatLiveliness; - var interval = Commons.IntervalInit; - var heartbeatAt = DateTime.UtcNow + TimeSpan.FromMilliseconds(Commons.HeartbeatInterval); - var cycles = 0; - var crash = false; - - // upon receiving a message this event handler is called - // read the message, randomly simulate failure/problem - // process message or heartbeat or crash - worker.ReceiveReady += (s, e) => - { - var msg = e.Socket.ReceiveMultipartMessage(); - - // message is a NetMQMessage (!) - // - 3 part envelope + content -> request - // - 1 part HEARTBEAT -> heartbeat - if (msg.FrameCount > 3) - { - // in order to test the robustness we simulate a couple of typical problems - // e.g. worker crushing or running very slow - // that is initiated after multiple cycles to give everything time to stabilize first - cycles++; - - if (cycles > 3 && rnd.Next(5) == 0) - { - Console.WriteLine("[WORKER] simulating crashing!"); - crash = true; - return; - } - - if (cycles > 3 && rnd.Next(3) == 0) - { - Console.WriteLine("[WORKER] Simulating CPU overload!"); - Thread.Sleep(500); - } - - if (verbose) - Console.Write("[WORKER] working ...!"); - - // simulate high workload - Thread.Sleep(10); - - if (verbose) - Console.WriteLine("[WORKER] sending {0}", msg.ToString()); - - // answer - e.Socket.SendMultipartMessage(msg); - // reset liveliness - liveliness = Commons.HeartbeatLiveliness; - } - else if (IsHeartbeatMessage(msg)) - liveliness = Commons.HeartbeatLiveliness; - else - Console.WriteLine("[WORKER] Received invalid message!"); - - interval = Commons.IntervalInit; - }; - while (!crash) { // wait for incoming request for specified milliseconds @@ -132,6 +82,62 @@ private static void Main(string[] args) Console.ReadKey(); } + private static void OnWorkerReceiveReady(object sender, NetMQSocketEventArgs e) + { + // upon receiving a message this event handler is called + // read the message, randomly simulate failure/problem + // process message or heartbeat or crash + + var msg = e.Socket.ReceiveMultipartMessage(); + + // message is a NetMQMessage (!) + // - 3 part envelope + content -> request + // - 1 part HEARTBEAT -> heartbeat + if (msg.FrameCount > 3) + { + // in order to test the robustness we simulate a couple of typical problems + // e.g. worker crushing or running very slow + // that is initiated after multiple cycles to give everything time to stabilize first + cycles++; + + if (cycles > 3 && rnd.Next(5) == 0) + { + Console.WriteLine("[WORKER] simulating crashing!"); + crash = true; + return; + } + + if (cycles > 3 && rnd.Next(3) == 0) + { + Console.WriteLine("[WORKER] Simulating CPU overload!"); + Thread.Sleep(500); + } + + if (verbose) + Console.Write("[WORKER] working ...!"); + + // simulate high workload + Thread.Sleep(10); + + if (verbose) + Console.WriteLine("[WORKER] sending {0}", msg.ToString()); + + // answer + e.Socket.SendMultipartMessage(msg); + // reset liveliness + liveliness = Commons.HeartbeatLiveliness; + } + else if (IsHeartbeatMessage(msg)) + liveliness = Commons.HeartbeatLiveliness; + else + Console.WriteLine("[WORKER] Received invalid message!"); + + interval = Commons.IntervalInit; + + + } + + /// /// Create the DEALER socket and connect it to QUEUE backend. /// Set the identity. @@ -140,8 +146,7 @@ private static void Main(string[] args) private static DealerSocket GetWorkerSocket(bool verbose, int id) { var worker = new DealerSocket { Options = { Identity = Encoding.UTF8.GetBytes("Worker_" + id) } }; - - + worker.ReceiveReady += OnWorkerReceiveReady; worker.Connect(Commons.QueueBackend); if (verbose)