/*
 * Decompiled with CFR 0.152.
 */
package org.jgroups.tests;

import java.io.Serializable;
import junit.framework.Assert;
import junit.framework.TestCase;
import junit.textui.TestRunner;
import org.apache.log4j.Logger;
import org.jgroups.Channel;
import org.jgroups.ChannelClosedException;
import org.jgroups.ChannelException;
import org.jgroups.ChannelNotConnectedException;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.TimeoutException;
import org.jgroups.View;
import org.jgroups.util.Util;

public class ChannelDuo
extends TestCase {
    static final boolean DEBUG = false;
    private Channel channel1 = null;
    private Channel channel2 = null;
    static Logger logger = Logger.getLogger((Class)(class$org$jgroups$tests$ChannelDuo == null ? (class$org$jgroups$tests$ChannelDuo = ChannelDuo.class$("org.jgroups.tests.ChannelDuo")) : class$org$jgroups$tests$ChannelDuo));
    String channelName = "ChannelLog4jTest";
    String protocol = null;
    static /* synthetic */ Class class$org$jgroups$tests$ChannelDuo;

    public ChannelDuo(String Name_) {
        super(Name_);
    }

    public String getProtocol() {
        return this.protocol;
    }

    public void setProtocol(String proto) {
        this.protocol = proto;
    }

    public void setUp() {
        try {
            this.channel1 = new JChannel();
            this.channel1.connect(this.channelName);
        }
        catch (ChannelException e) {
            logger.error((Object)"Channel init problem", (Throwable)e);
        }
    }

    public void tearDown() {
        if (this.channel1 != null) {
            this.channel1.close();
            this.channel1 = null;
        }
    }

    public void testLargeInsertion() {
        int nitems = 10000;
        logger.debug((Object)"Starting testLargeInsertion");
        try {
            logger.info((Object)("Inserting " + nitems + " elements"));
            ReadItems mythread = new ReadItems(this.channel1, 0, nitems);
            mythread.start();
            this.channel2 = new JChannel();
            this.channel2.connect(this.channelName);
            long start = System.currentTimeMillis();
            for (int i = 0; i < nitems; ++i) {
                this.channel2.send(new Message(null, null, ("Msg #" + i).getBytes()));
            }
            mythread.join();
            long stop = System.currentTimeMillis();
            logger.info((Object)("Took " + (stop - start) + " msecs"));
            ChannelDuo.assertEquals((int)nitems, (int)mythread.getNum_items());
            ChannelDuo.assertFalse((boolean)mythread.isAlive());
            this.channel2.close();
            this.channel2 = null;
        }
        catch (Exception ex) {
            logger.error((Object)"Problem", (Throwable)ex);
            ChannelDuo.assertTrue((boolean)false);
        }
        logger.debug((Object)"end testLargeInsertion");
    }

    public void testBarrierWithTimeOut() {
        int i;
        logger.info((Object)"start testBarrierWithTimeOut");
        RemoveOneItemWithTimeout[] removers = new RemoveOneItemWithTimeout[10];
        int num_dead = 0;
        long timeout = 200L;
        for (int i2 = 0; i2 < removers.length; ++i2) {
            removers[i2] = new RemoveOneItemWithTimeout(this.channel1, i2, timeout);
            removers[i2].start();
        }
        try {
            this.channel2 = new JChannel();
            this.channel2.connect(this.channelName);
        }
        catch (Exception e) {
            logger.error((Object)"Problem", (Throwable)e);
        }
        Util.sleep(5000L);
        logger.info((Object)"-- adding element 99");
        try {
            this.channel2.send(null, null, new Long(99L));
        }
        catch (Exception ex) {
            logger.error((Object)"Problem", (Throwable)ex);
        }
        Util.sleep(5000L);
        logger.info((Object)"-- adding element 100");
        try {
            this.channel2.send(null, null, new Long(100L));
        }
        catch (Exception ex) {
            logger.error((Object)"Problem", (Throwable)ex);
        }
        Util.sleep(1000L);
        for (i = 0; i < removers.length; ++i) {
            logger.info((Object)("remover #" + i + " is " + (removers[i].isAlive() ? "alive" : "terminated")));
            if (removers[i].isAlive()) continue;
            ++num_dead;
        }
        ChannelDuo.assertEquals((int)2, (int)num_dead);
        this.channel1.disconnect();
        Util.sleep(2000L);
        for (i = 0; i < removers.length; ++i) {
            try {
                logger.debug((Object)("Waiting for thread " + i + " to join"));
                removers[i].join();
                continue;
            }
            catch (InterruptedException e) {
                logger.error((Object)"Thread joining() interrupted", (Throwable)e);
            }
        }
        num_dead = 0;
        for (i = 0; i < removers.length; ++i) {
            logger.info((Object)("remover #" + i + " is " + (removers[i].isAlive() ? "alive" : "terminated")));
            if (removers[i].isAlive()) continue;
            ++num_dead;
        }
        ChannelDuo.assertEquals((int)removers.length, (int)num_dead);
        for (i = 0; i < removers.length; ++i) {
            removers[i] = null;
        }
        this.channel2.close();
        this.channel2 = null;
        logger.info((Object)"end testBarrierWithTimeOut");
    }

    public void testMultipleWriterOneReader() {
        logger.info((Object)"start testMultipleWriterOneReader");
        AddOneItem[] adders = new AddOneItem[10];
        int num_dead = 0;
        int num_items = 0;
        int items = 200;
        try {
            this.channel2 = new JChannel();
            this.channel2.connect(this.channelName);
        }
        catch (Exception e) {
            logger.error((Object)"Problem", (Throwable)e);
        }
        Util.sleep(1000L);
        for (int i = 0; i < adders.length; ++i) {
            adders[i] = new AddOneItem(this.channel1, i, items);
            adders[i].start();
        }
        while (num_items < adders.length * items) {
            try {
                Object obj = this.channel2.receive(0L);
                if (obj instanceof View) {
                    logger.info((Object)("--> NEW VIEW: " + obj));
                    continue;
                }
                if (obj instanceof Message) {
                    Message msg = (Message)obj;
                    ++num_items;
                    logger.debug((Object)("Received " + msg.getObject()));
                    continue;
                }
                logger.error((Object)("Unexpected object type " + obj.getClass()));
            }
            catch (ChannelNotConnectedException conn) {
                logger.error((Object)"Problem", (Throwable)conn);
                ChannelDuo.assertTrue((boolean)false);
                break;
            }
            catch (TimeoutException e) {
                logger.error((Object)"Main thread timed out but should'nt had...", (Throwable)e);
                ChannelDuo.assertTrue((boolean)false);
                break;
            }
            catch (Exception e) {
                logger.error((Object)"Problem", (Throwable)e);
                break;
            }
        }
        ChannelDuo.assertEquals((int)(adders.length * items), (int)num_items);
        Util.sleep(1000L);
        for (int i = 0; i < adders.length; ++i) {
            try {
                logger.debug((Object)("Waiting for thread " + i + " to join"));
                adders[i].join();
                logger.info((Object)("adder #" + i + " is " + (adders[i].isAlive() ? "alive" : "terminated")));
                if (!adders[i].isAlive()) {
                    ++num_dead;
                }
                adders[i] = null;
                continue;
            }
            catch (InterruptedException e) {
                logger.error((Object)"Thread joining() interrupted", (Throwable)e);
            }
        }
        ChannelDuo.assertEquals((int)adders.length, (int)num_dead);
        this.channel2.close();
        this.channel2 = null;
        logger.info((Object)"end testMultipleWriterOneReader");
    }

    public void testBarrier() {
        int i;
        logger.info((Object)"start testBarrier");
        ReadItems[] removers = new ReadItems[10];
        int num_dead = 0;
        for (int i2 = 0; i2 < removers.length; ++i2) {
            removers[i2] = new ReadItems(this.channel1, i2, 1);
            removers[i2].start();
        }
        try {
            this.channel2 = new JChannel();
            this.channel2.connect(this.channelName);
        }
        catch (Exception ex) {
            logger.error((Object)"Problem", (Throwable)ex);
        }
        Util.sleep(1000L);
        logger.info((Object)"-- adding element 99");
        try {
            this.channel2.send(null, null, (Serializable)"99".getBytes());
        }
        catch (Exception ex) {
            logger.error((Object)"Problem", (Throwable)ex);
        }
        Util.sleep(5000L);
        logger.info((Object)"-- adding element 100");
        try {
            this.channel2.send(null, null, (Serializable)"100".getBytes());
        }
        catch (Exception ex) {
            logger.error((Object)"Problem", (Throwable)ex);
        }
        Util.sleep(2000L);
        for (i = 0; i < removers.length; ++i) {
            logger.info((Object)("remover #" + i + " is " + (removers[i].isAlive() ? "alive" : "terminated")));
            if (removers[i].isAlive()) continue;
            ++num_dead;
        }
        ChannelDuo.assertEquals((int)2, (int)num_dead);
        this.channel1.close();
        for (i = 0; i < removers.length; ++i) {
            try {
                removers[i].join(1000L);
                continue;
            }
            catch (InterruptedException e) {
                logger.error((Object)"Thread joining() interrupted", (Throwable)e);
            }
        }
        num_dead = 0;
        for (i = 0; i < removers.length; ++i) {
            logger.info((Object)("remover #" + i + " is " + (removers[i].isAlive() ? "alive" : "terminated")));
            if (removers[i].isAlive()) continue;
            ++num_dead;
        }
        ChannelDuo.assertEquals((int)removers.length, (int)num_dead);
        this.channel2.close();
        this.channel2 = null;
        logger.info((Object)"stop testBarrier");
    }

    public void testMultipleWriterMultipleReader() {
        int i;
        logger.info((Object)"start testMultipleWriterMultipleReader");
        int nWriters = 10;
        int nReaders = 10;
        Writer[] adders = new Writer[nWriters];
        Reader[] readers = new Reader[nReaders];
        boolean num_dead = false;
        boolean num_items = false;
        int[] writes = new int[nWriters];
        int[] reads = new int[nReaders];
        try {
            this.channel2 = new JChannel();
            this.channel2.connect(this.channelName);
        }
        catch (Exception e) {
            logger.error((Object)"Problem", (Throwable)e);
        }
        for (i = 0; i < readers.length; ++i) {
            readers[i] = new Reader(this.channel2, i, reads);
            readers[i].start();
        }
        Util.sleep(2000L);
        for (i = 0; i < adders.length; ++i) {
            adders[i] = new Writer(this.channel1, i, writes);
            adders[i].start();
        }
        Util.sleep(10000L);
        for (i = 0; i < adders.length; ++i) {
            adders[i].stopThread();
        }
        Util.sleep(1000L);
        for (i = 0; i < adders.length; ++i) {
            try {
                logger.debug((Object)("Waiting for Writer thread " + i + " to join"));
                adders[i].join(1000L);
                logger.info((Object)("adder #" + i + " is " + (adders[i].isAlive() ? "alive" : "terminated")));
                adders[i] = null;
                continue;
            }
            catch (InterruptedException e) {
                logger.error((Object)"Thread joining() interrupted", (Throwable)e);
            }
        }
        Util.sleep(10000L);
        this.channel2.close();
        boolean allStopped = true;
        do {
            allStopped = true;
            Util.sleep(2000L);
            for (int i2 = 0; i2 < readers.length; ++i2) {
                try {
                    logger.debug((Object)("Waiting for Reader thread " + i2 + " to join"));
                    readers[i2].join(1000L);
                    if (readers[i2].isAlive()) {
                        allStopped = false;
                        logger.info((Object)("reader #" + i2 + ' ' + reads[i2] + " read items"));
                    }
                    logger.info((Object)("reader #" + i2 + " is " + (readers[i2].isAlive() ? "alive" : "terminated")));
                    continue;
                }
                catch (InterruptedException e) {
                    logger.error((Object)"Thread joining() interrupted", (Throwable)e);
                }
            }
        } while (!allStopped);
        int total_writes = 0;
        for (int i3 = 0; i3 < writes.length; ++i3) {
            total_writes += writes[i3];
        }
        int total_reads = 0;
        for (int i4 = 0; i4 < reads.length; ++i4) {
            total_reads += reads[i4];
        }
        logger.info((Object)("Total writes:" + total_writes));
        logger.info((Object)("Total reads:" + total_reads));
        ChannelDuo.assertEquals((int)total_writes, (int)total_reads);
        this.channel2.close();
        this.channel2 = null;
        logger.info((Object)"end testMultipleWriterMultipleReader");
    }

    public static void main(String[] args) {
        String[] testCaseName = new String[]{(class$org$jgroups$tests$ChannelDuo == null ? (class$org$jgroups$tests$ChannelDuo = ChannelDuo.class$("org.jgroups.tests.ChannelDuo")) : class$org$jgroups$tests$ChannelDuo).getName()};
        TestRunner.main((String[])testCaseName);
    }

    static /* synthetic */ Class class$(String x0) {
        try {
            return Class.forName(x0);
        }
        catch (ClassNotFoundException x1) {
            throw new NoClassDefFoundError(x1.getMessage());
        }
    }

    class Reader
    extends Thread {
        int rank;
        int num_reads = 0;
        int[] reads = null;
        boolean running = true;
        Channel channel = null;

        Reader(Channel channel, int i, int[] reads) {
            super("Reader thread #" + i);
            this.rank = i;
            this.reads = reads;
            this.setDaemon(true);
            this.channel = channel;
        }

        public void run() {
            Message msg = null;
            while (this.running) {
                try {
                    Object obj = this.channel.receive(0L);
                    if (obj instanceof View) {
                        logger.info((Object)("Reader thread #" + this.rank + ":--> NEW VIEW: " + obj));
                        continue;
                    }
                    if (!(obj instanceof Message)) continue;
                    msg = (Message)obj;
                    Long retval = (Long)msg.getObject();
                    logger.debug((Object)("Reader thread #" + this.rank + ": received " + retval));
                    ++this.num_reads;
                    Assert.assertNotNull((Object)retval);
                }
                catch (ChannelNotConnectedException conn) {
                    logger.error((Object)("Reader thread #" + this.rank + ": problem"), (Throwable)conn);
                    this.running = false;
                }
                catch (TimeoutException e) {
                    logger.error((Object)("Reader thread #" + this.rank + ": channel time out but should'nt have..."), (Throwable)e);
                    this.running = false;
                }
                catch (ChannelClosedException e) {
                    this.running = false;
                }
                catch (Exception e) {
                    logger.error((Object)("Reader thread #" + this.rank + ": problem"), (Throwable)e);
                }
            }
            this.reads[this.rank] = this.num_reads;
        }

        void stopThread() {
            this.running = false;
        }
    }

    class Writer
    extends Thread {
        int rank = 0;
        int num_writes = 0;
        boolean running = true;
        int[] writes = null;
        Channel channel = null;

        Writer(Channel channel, int i, int[] writes) {
            super("Writer thread #" + i);
            this.rank = i;
            this.writes = writes;
            this.setDaemon(true);
            this.channel = channel;
        }

        public void run() {
            while (this.running) {
                try {
                    this.channel.send(null, null, new Long(System.currentTimeMillis()));
                    Util.sleepRandom(50L);
                    ++this.num_writes;
                }
                catch (ChannelException closed) {
                    this.running = false;
                }
                catch (Throwable t) {
                    logger.debug((Object)("ChannelTest.Writer.run(): exception=" + t), t);
                }
            }
            this.writes[this.rank] = this.num_writes;
        }

        void stopThread() {
            this.running = false;
        }
    }

    class RemoveOneItemWithTimeout
    extends Thread {
        Long retval = null;
        int rank = 0;
        long timeout = 0L;
        Channel channel = null;

        RemoveOneItemWithTimeout(Channel channel, int rank, long timeout) {
            super("RemoveOneItemWithTimeout thread #" + rank);
            this.rank = rank;
            this.timeout = timeout;
            this.setDaemon(true);
            this.channel = channel;
        }

        public void run() {
            boolean finished = false;
            while (!finished) {
                try {
                    Object obj = this.channel.receive(this.timeout);
                    if (obj != null) {
                        if (obj instanceof View) {
                            logger.info((Object)("Thread #" + this.rank + ":--> NEW VIEW: " + obj));
                            continue;
                        }
                        if (!(obj instanceof Message)) continue;
                        Message msg = (Message)obj;
                        this.retval = (Long)msg.getObject();
                        finished = true;
                        logger.debug((Object)("Thread #" + this.rank + " received :" + this.retval));
                        continue;
                    }
                    logger.debug((Object)("Thread #" + this.rank + ": channel read NULL"));
                }
                catch (ChannelNotConnectedException conn) {
                    finished = true;
                }
                catch (TimeoutException e) {
                }
                catch (ChannelClosedException e) {
                    logger.debug((Object)("Thread #" + this.rank + ": channel closed"), (Throwable)e);
                    finished = true;
                }
                catch (Exception e) {
                    logger.error((Object)("Thread #" + this.rank + " problem"), (Throwable)e);
                    finished = true;
                }
            }
        }

        Long getRetval() {
            return this.retval;
        }
    }

    class AddOneItem
    extends Thread {
        Long retval = null;
        int rank = 0;
        int iteration = 0;
        Channel channel = null;

        AddOneItem(Channel channel, int rank, int iteration) {
            super("AddOneItem thread #" + rank);
            this.rank = rank;
            this.iteration = iteration;
            this.setDaemon(true);
            this.channel = channel;
        }

        public void run() {
            try {
                for (int i = 0; i < this.iteration; ++i) {
                    this.channel.send(null, null, new Long(this.rank));
                    logger.debug((Object)("Thread #" + this.rank + " added element (" + this.rank + ')'));
                    Util.sleepRandom(100L);
                }
            }
            catch (ChannelException ex) {
                logger.error((Object)("Thread #" + this.rank + ": channel was closed"), (Throwable)ex);
            }
        }
    }

    class RemoveOneItem
    extends Thread {
        private boolean looping = true;
        int rank;
        Long retval = null;

        public RemoveOneItem(int rank) {
            super("RemoveOneItem thread #" + rank);
            this.rank = rank;
            this.setDaemon(true);
        }

        public void stopLooping() {
            this.looping = false;
        }

        public void run() {
            while (this.looping) {
                try {
                    Object obj = ChannelDuo.this.channel1.receive(0L);
                    if (obj instanceof View) {
                        logger.info((Object)("Thread #" + this.rank + ":--> NEW VIEW: " + obj));
                        continue;
                    }
                    if (!(obj instanceof Message)) continue;
                    Message msg = (Message)obj;
                    this.looping = false;
                    this.retval = (Long)msg.getObject();
                    logger.debug((Object)("Thread #" + this.rank + ": received " + this.retval));
                }
                catch (ChannelNotConnectedException conn) {
                    logger.error((Object)("Thread #" + this.rank + ": problem"), (Throwable)conn);
                    this.looping = false;
                }
                catch (TimeoutException e) {
                    logger.error((Object)("Thread #" + this.rank + ": channel time out but should'nt have..."), (Throwable)e);
                    this.looping = false;
                }
                catch (Exception e) {
                    logger.error((Object)("Thread #" + this.rank + ": problem"), (Throwable)e);
                }
            }
        }

        Long getRetval() {
            return this.retval;
        }
    }

    class ReadItems
    extends Thread {
        private boolean looping = true;
        int num_items = 0;
        int max = 0;
        int rank;
        Channel channel;

        public ReadItems(Channel channel, int rank, int num) {
            super("ReadItems thread #" + rank);
            this.rank = rank;
            this.max = num;
            this.channel = channel;
            this.setDaemon(true);
        }

        public void stopLooping() {
            this.looping = false;
        }

        public void run() {
            while (this.looping) {
                try {
                    Object obj = this.channel.receive(0L);
                    if (obj instanceof View) {
                        logger.info((Object)("Thread #" + this.rank + ":--> NEW VIEW: " + obj));
                        continue;
                    }
                    if (!(obj instanceof Message)) continue;
                    Message msg = (Message)obj;
                    ++this.num_items;
                    if (this.num_items < this.max) continue;
                    this.looping = false;
                }
                catch (ChannelNotConnectedException conn) {
                    logger.error((Object)("Thread #" + this.rank + ": problem"), (Throwable)conn);
                    this.looping = false;
                }
                catch (TimeoutException e) {
                    logger.error((Object)("Thread #" + this.rank + ": channel timed out but should'nt have..."), (Throwable)e);
                    this.looping = false;
                }
                catch (ChannelClosedException e) {
                    logger.debug((Object)("Thread #" + this.rank + ": channel closed"), (Throwable)e);
                    this.looping = false;
                }
                catch (Exception e) {
                    logger.error((Object)("Thread #" + this.rank + ": problem"), (Throwable)e);
                    this.looping = false;
                }
            }
        }

        public int getNum_items() {
            return this.num_items;
        }
    }
}

