RSB Participants stuck on deactivation #4

@DivineThreepwood

Description

Release: rsb 0.18

In most cases, the RSB participants cannot be deactivated when the deactivation methods of different participants are called simultaneously. After checking the stack trace, it looks like an unlucky copy-paste mistake in the SpreadReceiver.leave(...) method.

            // join the group
            SpreadReceiver.this.spread.leave(group);

            // wait until the membership message confirms the join
            membershipHandler.waitForMessage();

At least the comments are wrong, because we are leaving the spread group, not joining it.
The thread blocks forever in the waitForMessage() method. Calling it after joining the group makes sense, but after leaving the group it is probably not the best idea to wait for new incoming messages, right? Because the waiting thread still holds the subscriptions lock that the other threads try to acquire at rsb.transport.spread.SpreadMultiReceiver.unsubscribe(SpreadMultiReceiver.java:341), all other participants calling the deactivation method block as well.
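
To make the mechanism described above easier to follow, here is a minimal, self-contained sketch. It is not the RSB code: the class LeaveBlockSketch, its fields and its methods are hypothetical stand-ins, and a CountDownLatch replaces the membership handler. It only shows that a thread which waits for a signal while holding a monitor keeps every other thread that needs the same monitor in the BLOCKED state.

```java
import java.util.concurrent.CountDownLatch;

public class LeaveBlockSketch {

    // hypothetical stand-in for the subscriptions map that unsubscribe() synchronizes on
    private final Object subscriptionsLock = new Object();
    // hypothetical stand-in for the membership message; nobody ever signals it here
    private final CountDownLatch membershipMessage = new CountDownLatch(1);

    // mimics the reported pattern: wait for a message while still holding the lock
    void leaveAndWait() throws InterruptedException {
        synchronized (subscriptionsLock) {
            membershipMessage.await();
        }
    }

    // what every other participant wants to do: take the same lock for a moment
    void leaveOnly() {
        synchronized (subscriptionsLock) {
            // nothing to wait for
        }
    }

    // starts both threads and returns the state of the second one while the first is stuck
    public static Thread.State observeSecondThread() throws InterruptedException {
        final LeaveBlockSketch sketch = new LeaveBlockSketch();
        Thread first = new Thread(() -> {
            try {
                sketch.leaveAndWait();
            } catch (InterruptedException e) {
                // interrupted by the cleanup below so that the lock gets released
            }
        }, "first");
        Thread second = new Thread(sketch::leaveOnly, "second");

        first.start();
        while (first.getState() != Thread.State.WAITING) {
            Thread.sleep(10); // wait until "first" is parked inside the synchronized block
        }
        second.start();
        while (second.getState() != Thread.State.BLOCKED) {
            Thread.sleep(10); // wait until "second" is stuck on the lock
        }
        Thread.State observed = second.getState();

        first.interrupt(); // cleanup: release the lock so both threads can finish
        first.join();
        second.join();
        return observed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("second thread is " + observeSecondThread());
    }
}
```

In this sketch the second thread only gets the lock after the first one is interrupted. In the reported case nothing interrupts the waiting thread, so the other participants stay blocked.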

I would suggest simply removing the membershipHandler.waitForMessage(); call.
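
A rough sketch of what the suggestion amounts to, next to one possible alternative. Everything here is hypothetical (class name, fields, methods) and the CountDownLatch only stands in for the membership confirmation, so please do not read it as the actual SpreadReceiver implementation. Variant 1 is the suggestion above. Variant 2, a bounded wait, is my own assumption for the case that the confirmation should still be observed when it does arrive.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LeaveVariantsSketch {

    // hypothetical stand-in for the membership confirmation; never signalled here,
    // which mirrors the situation reported in this issue
    private final CountDownLatch confirmation = new CountDownLatch(1);
    private boolean joined = true;

    // variant 1 (the suggestion above): leave and return without waiting
    public synchronized void leaveWithoutWaiting() {
        joined = false;
    }

    // variant 2 (an assumption, not proposed in this issue): wait, but only for a limited time;
    // returns true if the confirmation arrived before the timeout elapsed
    public synchronized boolean leaveWithBoundedWait(long timeoutMillis) throws InterruptedException {
        joined = false;
        return confirmation.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public synchronized boolean isJoined() {
        return joined;
    }
}
```

Either way the calling thread returns, so a lock held further up the call stack is released again. Whether the confirmation is needed at all after leaving is a question for the maintainers.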

Stacktrace of the thread leaving the spread group:

"ShutdownDaemon[UnitRemotePool]" #299 prio=5 os_prio=31 cpu=1.95ms elapsed=478.97s tid=0x00007ff8038e5800 nid=0xe5007 in Object.wait()  [0x000070002a4f9000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(java.base@11.0.4/Native Method)
	- waiting on 
	at java.lang.Object.wait(java.base@11.0.4/Object.java:328)
	at rsb.transport.spread.SpreadReceiver$OneTimeWaitingMembershipHander.waitForMessage(SpreadReceiver.java:106)
	- waiting to re-lock in wait() <0x00000007896579f0> (a java.lang.Object)
	at rsb.transport.spread.SpreadReceiver$StateActive.leave(SpreadReceiver.java:322)
	at rsb.transport.spread.SpreadReceiver.leave(SpreadReceiver.java:415)
	at rsb.transport.spread.SpreadMultiReceiver.manageLeave(SpreadMultiReceiver.java:316)
	- locked <0x0000000782be5208> (a java.util.HashMap)
	at rsb.transport.spread.SpreadMultiReceiver.unsubscribe(SpreadMultiReceiver.java:360)
	- locked <0x0000000782be5208> (a java.util.HashMap)
	at rsb.transport.spread.MultiSpreadInPushConnector.deactivate(MultiSpreadInPushConnector.java:128)
	at rsb.eventprocessing.RouteConfiguratorUtility.deactivate(RouteConfiguratorUtility.java:119)
	- locked <0x0000000783b38f30> (a rsb.eventprocessing.RouteConfiguratorUtility)
	at rsb.eventprocessing.DefaultPushInRouteConfigurator.deactivate(DefaultPushInRouteConfigurator.java:79)
	- locked <0x0000000783b38f30> (a rsb.eventprocessing.RouteConfiguratorUtility)
	at rsb.Listener$StateActive.deactivate(Listener.java:97)
	at rsb.Listener.deactivate(Listener.java:196)
	at org.openbase.jul.extension.rsb.com.RSBSynchronizedParticipant.deactivate(RSBSynchronizedParticipant.java:217)

Example Stacktrace of all other participants:

"pool-1-thread-1" #37 prio=5 os_prio=31 cpu=152.51ms elapsed=779.29s tid=0x00007ff7fc1f1000 nid=0x6703 waiting for monitor entry  [0x000070000f801000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at rsb.transport.spread.SpreadMultiReceiver.unsubscribe(SpreadMultiReceiver.java:341)
	- waiting to lock <0x0000000782be5208> (a java.util.HashMap)
	at rsb.transport.spread.MultiSpreadInPushConnector.deactivate(MultiSpreadInPushConnector.java:128)
	at rsb.eventprocessing.RouteConfiguratorUtility.deactivate(RouteConfiguratorUtility.java:119)
	- locked <0x0000000783c36438> (a rsb.eventprocessing.RouteConfiguratorUtility)
	at rsb.eventprocessing.DefaultPushInRouteConfigurator.deactivate(DefaultPushInRouteConfigurator.java:79)
	- locked <0x0000000783c36438> (a rsb.eventprocessing.RouteConfiguratorUtility)
	at rsb.Listener$StateActive.deactivate(Listener.java:97)
	at rsb.Listener.deactivate(Listener.java:196)
	at rsb.patterns.Method$StateActive.deactivate(Method.java:83)
	at rsb.patterns.Method.deactivate(Method.java:207)
	at rsb.patterns.Server$StateActive.deactivate(Server.java:77)
	at rsb.patterns.Server.deactivate(Server.java:225)
	- locked <0x0000000783c33858> (a rsb.patterns.LocalServer)
	at org.openbase.jul.extension.rsb.com.RSBSynchronizedParticipant.lambda$deactivate$0(RSBSynchronizedParticipant.java:205)
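
Dumps like the two above can be taken with jstack. If it helps with reproducing, the same information can also be collected from inside the JVM. The following is a minimal sketch using the standard java.lang.management API; the class name BlockedThreadReport and the report format are made up for illustration.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;

public class BlockedThreadReport {

    // lists every thread that is BLOCKED on a monitor together with the thread that owns it
    public static List<String> blockedThreads() {
        ThreadMXBean bean = ManagementFactory.getThreadMXBean();
        List<String> report = new ArrayList<>();
        // false/false: no locked monitor or synchronizer details needed for this report
        for (ThreadInfo info : bean.dumpAllThreads(false, false)) {
            if (info != null && info.getThreadState() == Thread.State.BLOCKED) {
                report.add(info.getThreadName()
                        + " waits for " + info.getLockName()
                        + " held by " + info.getLockOwnerName());
            }
        }
        return report;
    }

    public static void main(String[] args) {
        for (String line : blockedThreads()) {
            System.out.println(line);
        }
    }
}
```

Called from the test while the shutdown is stuck, this should name the thread that owns the HashMap monitor, which is then the one to look at in the full dump.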

Code Example to reproduce:

import rsb.Event;
import rsb.Factory;
import rsb.RSBException;
import rsb.config.ParticipantConfig;
import rsb.patterns.Callback;
import rsb.patterns.LocalServer;
import rsb.patterns.RemoteServer;
import rsb.transport.spread.InPushConnectorFactoryRegistry;
import rsb.transport.spread.SharedInPushConnectorFactory;

import java.util.logging.Level;
import java.util.logging.Logger;

public class DeactivationOnEventLoadTest {

    // ----- setup test parameter ---------------------------------------
    public static final boolean EXIT_ON_ERROR               = false;
    public static final boolean STOP_PRINT_ON_ERROR         = true;
    public static final boolean ENABLE_CONNECTION_SHARING   = true;
    public static final long    SLEEP_TIME_BETWEEN_MESSES   = 10;
    public static final Level   LOG_LEVEL                   = Level.ALL;
    // ------------------------------------------------------------------

    // other static fields
    private static final String KEY_IN_PUSH_FACTORY = "shareIfPossible";
    private static final String METTHOD = "testmethod";

    public static void main(String[] args) throws InterruptedException {

        Logger.getLogger("").setLevel(LOG_LEVEL);

        // setup participant config
        int runCounter = 0;
        final ParticipantConfig participantConfig = Factory.getInstance().getDefaultParticipantConfig();
        if (ENABLE_CONNECTION_SHARING) {
            InPushConnectorFactoryRegistry.getInstance().registerFactory(KEY_IN_PUSH_FACTORY, new SharedInPushConnectorFactory());

            // instruct the spread transport to use your newly registered factory
            // for creating in push connector instances
            participantConfig.getOrCreateTransport("spread")
                    .getOptions()
                    .setProperty("transport.spread.java.infactory", KEY_IN_PUSH_FACTORY);
        }

        // start test
        mainLoop:
        while (!Thread.interrupted()) {

            // prepare run
            runCounter++;
            int serverRemotePairCount = runCounter * 10;
            final ShutdownHandler shutdownHandler = new ShutdownHandler(serverRemotePairCount);

            System.out.println("Start run " + runCounter + " with " + serverRemotePairCount * 2 + " participants...");
            for (int i = 0; i < serverRemotePairCount; i++) {
                final String id = Integer.toString(i);

                // create server
                new Thread("ServerThread-" + id) {
                    @Override
                    public void run() {
                        try {
                            LocalServer server = Factory.getInstance().createLocalServer("/test/scope/" + id, participantConfig);
                            server.addMethod("testmethod", new Callback() {
                                @Override
                                public Event internalInvoke(Event request) {
                                    return new Event(Void.class, null);
                                }
                            });
                            server.activate();
                            shutdownHandler.activeParticipants++;

                            while (!shutdownHandler.shutdown) {
                                Thread.sleep(SLEEP_TIME_BETWEEN_MESSES);
                            }

                            server.deactivate();
                            shutdownHandler.activeParticipants--;
                        } catch (RSBException | InterruptedException e) {
                            e.printStackTrace();
                        }

                    }
                }.start();

                // create remote
                new Thread("RemoteThread-" + id) {

                    @Override
                    public void run() {
                        try {
                            RemoteServer remote = Factory.getInstance().createRemoteServer("/test/scope/" + id, participantConfig);

                            remote.activate();
                            shutdownHandler.activeParticipants++;

                            while (!shutdownHandler.shutdown) {
                                remote.callAsync(METTHOD, new Event(Void.class, null));
                                Thread.sleep(SLEEP_TIME_BETWEEN_MESSES);
                            }

                            remote.deactivate();
                            shutdownHandler.activeParticipants--;
                        } catch (Exception e) {
                            e.printStackTrace();
                        }

                    }
                }.start();
            }

            System.out.println("Wait until pool is active...");
            while (shutdownHandler.activeParticipants != serverRemotePairCount * 2) {
                Thread.yield();
            }

            // wait some additional time to increase message load
            Thread.sleep(2000);

            System.out.println("Initiate Shutdown...");
            shutdownHandler.shutdown = true;

            int lastCount = -1;
            while (shutdownHandler.activeParticipants != 0) {

                System.out.println("Shutdown progress: " + shutdownHandler.getProgress() + "% " + (shutdownHandler.activeParticipants == lastCount ? "(deactivation stuck detected)" : ""));

                if (lastCount == shutdownHandler.activeParticipants) {
                    Thread.sleep(2000);
                    if ((STOP_PRINT_ON_ERROR || EXIT_ON_ERROR) && lastCount == shutdownHandler.activeParticipants) {
                        System.err.println("Shutdown failed in run " + runCounter + " with " + serverRemotePairCount * 2 + " participants where " + shutdownHandler.activeParticipants + " could not be deactivated!");
                        break mainLoop;
                    }
                }
                lastCount = shutdownHandler.activeParticipants;
                Thread.sleep(200);
            }
            System.out.println("Shutdown progress: 100% (successful)");
        }
        System.out.println("Exit test. Probably some threads do not terminate because of the deactivation failure.");

        if (EXIT_ON_ERROR) {
            System.exit(200);
        }
    }

    static class ShutdownHandler {
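        // volatile so that writes from one thread are visible to the others;
        // note that ++/-- on a volatile int is still not atomic
        private volatile boolean shutdown = false;
        private volatile int activeParticipants = 0;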
        private final int totalParticipantCount;

        public ShutdownHandler(int totalParticipantCount) {
            this.totalParticipantCount = totalParticipantCount;
        }

        public int getProgress() {
            return (100 - (int) ((activeParticipants / (totalParticipantCount * 2d)) * 100d));
        }
    }
}
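
A side note on the test itself: ShutdownHandler is updated with activeParticipants++ and activeParticipants-- from many threads, and ++/-- on an int field can lose updates under contention. That does not explain the stack traces in this issue, but it can make the progress counter unreliable. A possible thread-safe variant is sketched below; AtomicShutdownHandler and its method names are hypothetical and not part of the test above.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicShutdownHandler {

    // volatile: the flag is written by the main thread and read by all participant threads
    private volatile boolean shutdown = false;
    // atomic counter: increments and decrements from different threads cannot get lost
    private final AtomicInteger activeParticipants = new AtomicInteger();
    private final int serverRemotePairCount;

    public AtomicShutdownHandler(int serverRemotePairCount) {
        this.serverRemotePairCount = serverRemotePairCount;
    }

    public void participantActivated() {
        activeParticipants.incrementAndGet();
    }

    public void participantDeactivated() {
        activeParticipants.decrementAndGet();
    }

    public int getActiveParticipants() {
        return activeParticipants.get();
    }

    public void requestShutdown() {
        shutdown = true;
    }

    public boolean isShutdownRequested() {
        return shutdown;
    }

    // same formula as in the test: each pair contributes two participants
    public int getProgress() {
        return 100 - (int) ((activeParticipants.get() / (serverRemotePairCount * 2d)) * 100d);
    }
}
```

The participant threads would then call participantActivated() and participantDeactivated() instead of touching the field directly.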

Code Example Log

/Library/Java/JavaVirtualMachines/zulu11.33.15-ca-jdk11.0.4-macosx_x64/bin/java -Dvisualvm.id=60730495710092 "-javaagent:/Users/divine/Library/Application Support/JetBrains/Toolbox/apps/IDEA-U/ch-0/192.7142.36/IntelliJ IDEA.app/Contents/lib/idea_rt.jar=59640:/Users/divine/Library/Application Support/JetBrains/Toolbox/apps/IDEA-U/ch-0/192.7142.36/IntelliJ IDEA.app/Contents/bin" -Dfile.encoding=UTF-8 -classpath /Users/divine/workspace/openbase/bco/module/jul/extension/rsb/com/target/classes:/Users/divine/workspace/openbase/bco/module/jul/extension/type/interface/target/classes:/Users/divine/.m2/repository/org/openbase/type/1.0-SNAPSHOT/type-1.0-SNAPSHOT.jar:/Users/divine/workspace/openbase/bco/module/jul/extension/type/util/target/classes:/Users/divine/workspace/openbase/bco/module/jul/extension/rsb/interface/target/classes:/Users/divine/workspace/openbase/bco/module/jul/extension/rsb/scope/target/classes:/Users/divine/workspace/openbase/bco/module/jul/extension/type/processing/target/classes:/Users/divine/workspace/openbase/bco/module/jul/extension/protobuf/target/classes:/Users/divine/workspace/openbase/bco/module/jul/processing/json/target/classes:/Users/divine/workspace/openbase/bco/module/jul/processing/default/target/classes:/Users/divine/.m2/repository/commons-lang/commons-lang/2.6/commons-lang-2.6.jar:/Users/divine/.m2/repository/java3d/vecmath/1.3.1/vecmath-1.3.1.jar:/Users/divine/.m2/repository/com/fasterxml/jackson/core/jackson-core/2.10.0.pr3/jackson-core-2.10.0.pr3.jar:/Users/divine/.m2/repository/com/fasterxml/jackson/core/jackson-databind/2.10.0.pr3/jackson-databind-2.10.0.pr3.jar:/Users/divine/.m2/repository/com/fasterxml/jackson/core/jackson-annotations/2.10.0.pr3/jackson-annotations-2.10.0.pr3.jar:/Users/divine/.m2/repository/com/googlecode/protobuf-java-format/protobuf-java-format/1.4/protobuf-java-format-1.4.jar:/Users/divine/.m2/repository/com/google/code/gson/gson/2.6.2/gson-2.6.2.jar:/Users/divine/workspace/openbase/bco/module/jul/interface/targ
et/classes:/Users/divine/workspace/openbase/bco/module/jul/annotation/target/classes:/Users/divine/workspace/openbase/bco/module/jul/schedule/target/classes:/Users/divine/workspace/openbase/bco/module/jul/pattern/default/target/classes:/Users/divine/workspace/openbase/bco/module/jul/pattern/controller/target/classes:/Users/divine/.m2/repository/rsb/rsb/0.18.4/rsb-0.18.4.jar:/Users/divine/.m2/repository/com/google/protobuf/protobuf-java/3.0.2/protobuf-java-3.0.2.jar:/Users/divine/.m2/repository/com/github/jnr/jnr-ffi/2.0.9/jnr-ffi-2.0.9.jar:/Users/divine/.m2/repository/com/github/jnr/jffi/1.2.11/jffi-1.2.11.jar:/Users/divine/.m2/repository/com/github/jnr/jffi/1.2.11/jffi-1.2.11-native.jar:/Users/divine/.m2/repository/org/ow2/asm/asm/5.0.3/asm-5.0.3.jar:/Users/divine/.m2/repository/org/ow2/asm/asm-commons/5.0.3/asm-commons-5.0.3.jar:/Users/divine/.m2/repository/org/ow2/asm/asm-analysis/5.0.3/asm-analysis-5.0.3.jar:/Users/divine/.m2/repository/org/ow2/asm/asm-tree/5.0.3/asm-tree-5.0.3.jar:/Users/divine/.m2/repository/org/ow2/asm/asm-util/5.0.3/asm-util-5.0.3.jar:/Users/divine/.m2/repository/com/github/jnr/jnr-x86asm/1.0.2/jnr-x86asm-1.0.2.jar:/Users/divine/workspace/openbase/bco/module/jul/exception/target/classes:/Users/divine/.m2/repository/org/openbase/jps/3.4.16-SNAPSHOT/jps-3.4.16-SNAPSHOT.jar:/Users/divine/.m2/repository/org/fusesource/jansi/jansi/1.11/jansi-1.11.jar:/Users/divine/.m2/repository/ch/qos/logback/logback-classic/1.2.3/logback-classic-1.2.3.jar:/Users/divine/.m2/repository/ch/qos/logback/logback-core/1.2.3/logback-core-1.2.3.jar:/Users/divine/.m2/repository/org/slf4j/slf4j-api/1.7.25/slf4j-api-1.7.25.jar:/Users/divine/.m2/repository/commons-io/commons-io/2.6/commons-io-2.6.jar:/Users/divine/.m2/repository/org/apache/commons/commons-lang3/3.9/commons-lang3-3.9.jar org.openbase.jul.extension.rsb.com.test.DeactivationOnEventLoadTest
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by com.google.protobuf.UnsafeUtil (file:/Users/divine/.m2/repository/com/google/protobuf/protobuf-java/3.0.2/protobuf-java-3.0.2.jar) to field java.nio.Buffer.address
WARNING: Please consider reporting this to the maintainers of com.google.protobuf.UnsafeUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Start run 1 with 20 participants...
Wait until pool is active...
Initiate Shutdown...
Shutdown progress: 0% 
Nov 08, 2019 3:57:25 PM rsb.introspection.IntrospectionParticipantObserver destroyed
INFO: Deactivating introspection protocol handler since no more participants exist
Shutdown progress: 100% (successful)
Start run 2 with 40 participants...
Wait until pool is active...
Initiate Shutdown...
Shutdown progress: 0% 
Nov 08, 2019 3:57:27 PM rsb.introspection.IntrospectionParticipantObserver destroyed
INFO: Deactivating introspection protocol handler since no more participants exist
Shutdown progress: 100% (successful)
Start run 3 with 60 participants...
Wait until pool is active...
Initiate Shutdown...
Shutdown progress: 0% 
Nov 08, 2019 3:57:30 PM rsb.introspection.IntrospectionParticipantObserver destroyed
INFO: Deactivating introspection protocol handler since no more participants exist
Shutdown progress: 100% (successful)
Start run 4 with 80 participants...
Wait until pool is active...
Initiate Shutdown...
Shutdown progress: 0% 
Nov 08, 2019 3:57:32 PM rsb.introspection.IntrospectionParticipantObserver destroyed
INFO: Deactivating introspection protocol handler since no more participants exist
Shutdown progress: 100% (successful)
Start run 5 with 100 participants...
Wait until pool is active...
Initiate Shutdown...
Shutdown progress: 0% 
Shutdown progress: 12% 
Shutdown progress: 12% (deactivation stuck detected)
Exit test. Probably some threads do not terminate because of the deactivation failure.
Shutdown failed in run 5 with 100 participants where 88 could not be deactivated!

Critical Stacktrace

"RemoteThread-14" #455 prio=5 os_prio=31 cpu=32.33ms elapsed=280.49s tid=0x00007fe32eb74800 nid=0x1bd0b in Object.wait()  [0x0000700006fa9000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(java.base@11.0.4/Native Method)
	- waiting on 
	at java.lang.Object.wait(java.base@11.0.4/Object.java:328)
	at rsb.transport.spread.SpreadReceiver$OneTimeWaitingMembershipHander.waitForMessage(SpreadReceiver.java:106)
	- waiting to re-lock in wait() <0x00000007871e3718> (a java.lang.Object)
	at rsb.transport.spread.SpreadReceiver$StateActive.leave(SpreadReceiver.java:322)
	at rsb.transport.spread.SpreadReceiver.leave(SpreadReceiver.java:415)
	at rsb.transport.spread.SpreadMultiReceiver.manageLeave(SpreadMultiReceiver.java:316)
	- locked <0x000000078027a4e8> (a java.util.HashMap)
	at rsb.transport.spread.SpreadMultiReceiver.unsubscribe(SpreadMultiReceiver.java:360)
	- locked <0x000000078027a4e8> (a java.util.HashMap)
	at rsb.transport.spread.MultiSpreadInPushConnector.deactivate(MultiSpreadInPushConnector.java:128)
	at rsb.eventprocessing.RouteConfiguratorUtility.deactivate(RouteConfiguratorUtility.java:119)
	- locked <0x00000007836374f0> (a rsb.eventprocessing.RouteConfiguratorUtility)
	at rsb.eventprocessing.DefaultPushInRouteConfigurator.deactivate(DefaultPushInRouteConfigurator.java:79)
	- locked <0x00000007836374f0> (a rsb.eventprocessing.RouteConfiguratorUtility)
	at rsb.Listener$StateActive.deactivate(Listener.java:97)
	at rsb.Listener.deactivate(Listener.java:196)
	at rsb.patterns.Method$StateActive.deactivate(Method.java:83)
	at rsb.patterns.Method.deactivate(Method.java:207)
	at rsb.patterns.Server$StateActive.deactivate(Server.java:77)
	at rsb.patterns.Server.deactivate(Server.java:225)
	- locked <0x0000000783630bd8> (a rsb.patterns.RemoteServer)
	at org.openbase.jul.extension.rsb.com.test.DeactivationOnEventLoadTest$2.run(DeactivationOnEventLoadTest.java:104)
