/*
 * Decompiled with CFR 0.152.
 */
package de.rcenvironment.core.communication.transport.jms.activemq.internal;

import de.rcenvironment.core.communication.channel.ServerContactPoint;
import de.rcenvironment.core.communication.model.NetworkContactPoint;
import de.rcenvironment.core.communication.transport.jms.activemq.internal.ActiveMQConnectionFilterPlugin;
import de.rcenvironment.core.communication.transport.jms.common.InitialInboxConsumer;
import de.rcenvironment.core.communication.transport.jms.common.JmsBroker;
import de.rcenvironment.core.communication.transport.jms.common.JmsProtocolUtils;
import de.rcenvironment.core.communication.transport.jms.common.RemoteInitiatedMessageChannelFactory;
import de.rcenvironment.core.communication.transport.jms.common.RequestInboxConsumer;
import de.rcenvironment.core.communication.transport.spi.MessageChannelEndpointHandler;
import de.rcenvironment.core.toolkitbridge.transitional.ConcurrencyUtils;
import de.rcenvironment.core.utils.common.StringUtils;
import de.rcenvironment.toolkit.modules.concurrency.api.AsyncTaskService;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class ActiveMQBroker
implements JmsBroker {
    private static final int SHUTDOWN_WAIT_AFTER_ANNOUNCE_MSEC = 1000;
    private static final long ACTIVEMQ_TEMPORARY_STORE_LIMIT = 0x3200000L;
    private static final AtomicInteger sharedInboxConsumerIdGenerator = new AtomicInteger();
    private final String brokerName;
    private final String externalUrl;
    private final String jvmLocalUrl;
    private BrokerService brokerService;
    private Connection localBrokerConnection;
    private final ServerContactPoint scp;
    private final RemoteInitiatedMessageChannelFactory remoteInitiatedConnectionFactory;
    private int numRequestConsumers;
    private ActiveMQConnectionFilterPlugin connectionFilterPlugin;
    private final AsyncTaskService threadPool = ConcurrencyUtils.getAsyncTaskService();
    private final Log log = LogFactory.getLog(this.getClass());

    public ActiveMQBroker(ServerContactPoint scp, RemoteInitiatedMessageChannelFactory remoteInitiatedConnectionFactory) {
        this.scp = scp;
        this.remoteInitiatedConnectionFactory = remoteInitiatedConnectionFactory;
        NetworkContactPoint ncp = scp.getNetworkContactPoint();
        int port = ncp.getPort();
        String host = ncp.getHost();
        this.brokerName = "RCE_ActiveMQ_" + host + "_" + port;
        this.externalUrl = "tcp://" + host + ":" + port;
        this.jvmLocalUrl = "vm://" + this.brokerName;
        this.numRequestConsumers = 1;
        String property = System.getProperty("jms.numRequestConsumers");
        if (property != null) {
            try {
                this.numRequestConsumers = Integer.parseInt(property);
            }
            catch (NumberFormatException numberFormatException) {
                this.log.warn((Object)("Ignoring invalid property value: " + property));
            }
        }
    }

    @Override
    public void start() throws Exception {
        this.connectionFilterPlugin = new ActiveMQConnectionFilterPlugin();
        this.connectionFilterPlugin.setFilter(this.scp.getConnectionFilter());
        this.brokerService = ActiveMQBroker.createTransientEmbeddedBroker(this.brokerName, this.connectionFilterPlugin, this.externalUrl, this.jvmLocalUrl);
        this.brokerService.start();
        this.log.info((Object)StringUtils.format((String)"Listening for standard connections on %s:%d", (Object[])new Object[]{this.scp.getNetworkContactPoint().getHost(), this.scp.getNetworkContactPoint().getPort()}));
        ActiveMQConnectionFactory localConnectionFactory = new ActiveMQConnectionFactory(this.jvmLocalUrl);
        this.localBrokerConnection = localConnectionFactory.createConnection();
        this.localBrokerConnection.setExceptionListener(new ExceptionListener(){

            public void onException(JMSException exception) {
                ActiveMQBroker.this.handleAsyncJMSException(exception);
            }
        });
        this.localBrokerConnection.start();
        this.spawnInboxConsumers(this.getLocalConnection());
    }

    @Override
    public void stop() {
        try {
            try {
                try (Session shutdownSession = this.localBrokerConnection.createSession(false, 1);){
                    this.log.debug((Object)"Sending internal queue shutdown commands");
                    String securityToken = "secToken";
                    Message poisonPill = JmsProtocolUtils.createQueueShutdownMessage(shutdownSession, securityToken);
                    MessageProducer producer = shutdownSession.createProducer(null);
                    JmsProtocolUtils.configureMessageProducer(producer);
                    producer.send((Destination)shutdownSession.createQueue("initial/c2b"), poisonPill);
                    int i = 0;
                    while (i < this.numRequestConsumers) {
                        producer.send((Destination)shutdownSession.createQueue("requests/c2b/common"), poisonPill);
                        ++i;
                    }
                }
                Thread.sleep(1000L);
            }
            catch (JMSException e) {
                this.log.error((Object)"Error while shutting down queue listeners", (Throwable)e);
                try {
                    this.localBrokerConnection.close();
                }
                catch (JMSException e2) {
                    this.log.warn((Object)("Error closing local connection to broker " + this.brokerService.getBrokerName()), (Throwable)e2);
                }
                try {
                    this.log.info((Object)("Shutting down server port " + this.scp.getNetworkContactPoint().getPort()));
                    this.brokerService.stop();
                    this.log.debug((Object)("Stopped JMS broker " + this.brokerService.getBrokerName()));
                }
                catch (Exception e3) {
                    this.log.warn((Object)("Error shutting down JMS broker " + this.brokerService.getBrokerName()), (Throwable)e3);
                }
            }
            catch (InterruptedException e) {
                this.log.error((Object)"Interrupted while waiting for queue shutdown", (Throwable)e);
                try {
                    this.localBrokerConnection.close();
                }
                catch (JMSException e4) {
                    this.log.warn((Object)("Error closing local connection to broker " + this.brokerService.getBrokerName()), (Throwable)e4);
                }
                try {
                    this.log.info((Object)("Shutting down server port " + this.scp.getNetworkContactPoint().getPort()));
                    this.brokerService.stop();
                    this.log.debug((Object)("Stopped JMS broker " + this.brokerService.getBrokerName()));
                }
                catch (Exception e5) {
                    this.log.warn((Object)("Error shutting down JMS broker " + this.brokerService.getBrokerName()), (Throwable)e5);
                }
            }
        }
        finally {
            try {
                this.localBrokerConnection.close();
            }
            catch (JMSException e) {
                this.log.warn((Object)("Error closing local connection to broker " + this.brokerService.getBrokerName()), (Throwable)e);
            }
            try {
                this.log.info((Object)("Shutting down server port " + this.scp.getNetworkContactPoint().getPort()));
                this.brokerService.stop();
                this.log.debug((Object)("Stopped JMS broker " + this.brokerService.getBrokerName()));
            }
            catch (Exception e) {
                this.log.warn((Object)("Error shutting down JMS broker " + this.brokerService.getBrokerName()), (Throwable)e);
            }
        }
    }

    @Override
    public Connection getLocalConnection() {
        return this.localBrokerConnection;
    }

    private static BrokerService createTransientEmbeddedBroker(String brokerName, ActiveMQConnectionFilterPlugin filterPlugin, String ... urls) throws Exception {
        BrokerService broker = new BrokerService();
        broker.setBrokerName(brokerName);
        broker.setPersistent(false);
        broker.setUseJmx(false);
        broker.getSystemUsage().getTempUsage().setLimit(0x3200000L);
        broker.setPlugins(new BrokerPlugin[]{filterPlugin});
        String[] stringArray = urls;
        int n = urls.length;
        int n2 = 0;
        while (n2 < n) {
            String url = stringArray[n2];
            broker.addConnector(url);
            ++n2;
        }
        return broker;
    }

    private void spawnInboxConsumers(Connection connection) throws JMSException {
        this.log.debug((Object)("Spawning initial inbox consumer for " + this.scp.toString()));
        this.threadPool.execute((Runnable)new InitialInboxConsumer(connection, this.scp, this.remoteInitiatedConnectionFactory));
        this.log.debug((Object)("Spawning " + this.numRequestConsumers + " request inbox consumer(s) for " + this.scp));
        MessageChannelEndpointHandler endpointHandler = this.scp.getEndpointHandler();
        int i = 1;
        while (i <= this.numRequestConsumers) {
            this.threadPool.execute((Runnable)new RequestInboxConsumer("requests/c2b/common", connection, endpointHandler), StringUtils.format((String)"Shared C2B Request Inbox Consumer #%d (worker #%d for %s')", (Object[])new Object[]{sharedInboxConsumerIdGenerator.incrementAndGet(), i, this.scp.toString()}));
            ++i;
        }
    }

    private void handleAsyncJMSException(JMSException e) {
        boolean isKnownHarmlessMessage;
        String exceptionString = e.toString();
        boolean bl = isKnownHarmlessMessage = exceptionString.contains("The destination temp-queue") || exceptionString.contains("Cannot remove session that had not been registered");
        if (isKnownHarmlessMessage) {
            this.log.debug((Object)("Asynchronous JMS exception (usually a follow-up error of a broken connection): " + exceptionString));
        } else {
            this.log.warn((Object)"Asynchronous JMS exception in local broker connection", (Throwable)e);
        }
    }
}

