/*
 * Decompiled with CFR 0.152.
 */
package com.dxfeed.api.impl.test;

import com.devexperts.qd.qtp.MessageConnector;
import com.devexperts.qd.qtp.QDEndpoint;
import com.devexperts.test.ThreadCleanCheck;
import com.dxfeed.api.DXEndpoint;
import com.dxfeed.api.DXFeed;
import com.dxfeed.api.DXFeedSubscription;
import com.dxfeed.api.DXPublisher;
import com.dxfeed.api.impl.DXEndpointImpl;
import com.dxfeed.api.osub.WildcardSymbol;
import com.dxfeed.event.market.Trade;
import java.util.Collections;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Connects a publisher endpoint and a feed endpoint over a local socket, each
 * with its own symbol filter in the connection address, and checks which
 * trades are delivered and how many bytes each side writes.
 *
 * <p>NOTE(review): judging by the class name and the byte limits, the intent
 * appears to be verifying that each peer learns the other's filter so that
 * filtered-out subscription and data are not sent over the wire - confirm
 * against the QD protocol documentation.
 */
public class FilterTransferTest {
    /** TCP port the server endpoint listens on and the client connects to. */
    private static final int PORT = 25899;
    /** Filter prefix of the client connection address. */
    private static final String CLIENT_FILTER = "[AC]*";
    /** Filter prefix of the server connection address. */
    private static final String SERVER_FILTER = "[AB]*";
    /** The symbol whose trade is expected to reach the client. */
    private static final String ACTUAL_SYMBOL = "ACTUAL";
    private static final String DXFEED_WILDCARD_ENABLE = "dxfeed.wildcard.enable";
    /** Number of "C"-prefixed subscriptions and "B"-prefixed trades generated. */
    private static final int N = 10000;
    /** Upper bound on the total bytes written by the server connectors. */
    private static final long SERVER_MAX_BYTES_SENT = 4096L;
    /** Upper bound on the total bytes written by the client connectors. */
    private static final long CLIENT_MAX_BYTES_SENT = 8192L;
    /** How long to wait for an asynchronous subscription or trade notification. */
    private static final long WAIT_TIMEOUT_SECONDS = 10L;

    private DXEndpoint server;
    private DXEndpoint client;
    private DXPublisher publisher;
    private DXFeed feed;
    private QDEndpoint serverQDEndpoint;
    private QDEndpoint clientQDEndpoint;
    /** Symbols reported by the publisher-side subscription change listener. */
    private final BlockingQueue<Object> subAdded = new LinkedBlockingDeque<>();
    /** Trades delivered to the feed-side event listener. */
    private final BlockingQueue<Trade> tradesReceived = new LinkedBlockingDeque<>();
    private DXFeedSubscription<Trade> sub;

    /**
     * Builds the two endpoints with wildcard support enabled, connects them using
     * the filtered addresses and installs listeners that record subscription
     * changes (publisher side) and received trades (feed side).
     */
    @Before
    public void setUp() throws Exception {
        ThreadCleanCheck.before();
        server = DXEndpoint.newBuilder()
            .withProperty(DXFEED_WILDCARD_ENABLE, "true")
            .withRole(DXEndpoint.Role.PUBLISHER)
            .build();
        client = DXEndpoint.newBuilder()
            .withProperty(DXFEED_WILDCARD_ENABLE, "true")
            .withRole(DXEndpoint.Role.FEED)
            .build();
        // Addresses are assembled from the constants so they cannot drift apart:
        // "[AB]*@:25899" for the server and "[AC]*@localhost:25899" for the client.
        server.connect(SERVER_FILTER + "@:" + PORT);
        client.connect(CLIENT_FILTER + "@localhost:" + PORT);
        publisher = server.getPublisher();
        feed = client.getFeed();
        serverQDEndpoint = ((DXEndpointImpl) server).getQDEndpoint();
        clientQDEndpoint = ((DXEndpointImpl) client).getQDEndpoint();
        publisher.getSubscription(Trade.class).addChangeListener(subAdded::addAll);
        sub = feed.createSubscription(Trade.class);
        sub.addEventListener(tradesReceived::addAll);
    }

    /**
     * Closes both endpoints and then runs the thread-leak check.
     *
     * <p>The server is closed even when closing the client throws, so the
     * listening port is not left bound. Null guards cover the case where
     * {@link #setUp()} failed before an endpoint was created (JUnit 4 still
     * invokes {@code @After} methods then).
     */
    @After
    public void tearDown() throws Exception {
        try {
            if (client != null) {
                client.close();
            }
        } finally {
            if (server != null) {
                server.close();
            }
        }
        ThreadCleanCheck.after();
    }

    /**
     * Subscribes to the wildcard plus {@code N} "C"-prefixed symbols, publishes
     * {@code N} "B"-prefixed trades followed by one trade for
     * {@link #ACTUAL_SYMBOL}, and asserts that the first trade received is the
     * {@link #ACTUAL_SYMBOL} one and that the bytes written by each side stay
     * within the configured limits.
     *
     * @throws InterruptedException if interrupted while waiting on a queue
     */
    @Test
    public void testFilters() throws InterruptedException {
        sub.addSymbols(WildcardSymbol.ALL);
        Assert.assertEquals(WildcardSymbol.ALL, subAdded.poll(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS));

        // "C*" symbols match only the client filter; "B*" symbols match only the server filter.
        for (int i = 0; i < N; i++) {
            sub.addSymbols("C" + i);
        }
        for (int i = 0; i < N; i++) {
            publishTrade("B" + i);
        }
        publishTrade(ACTUAL_SYMBOL);

        Trade tradeReceived = tradesReceived.poll(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        // poll() returns null on timeout; fail with a clear message instead of an NPE below.
        Assert.assertNotNull("no trade received within " + WAIT_TIMEOUT_SECONDS + " seconds", tradeReceived);
        Assert.assertEquals(ACTUAL_SYMBOL, tradeReceived.getEventSymbol());

        long serverBytesSent = getTotalBytesSent(serverQDEndpoint);
        Assert.assertTrue("serverBytesSent = " + serverBytesSent,
            serverBytesSent > 0L && serverBytesSent <= SERVER_MAX_BYTES_SENT);
        long clientBytesSent = getTotalBytesSent(clientQDEndpoint);
        Assert.assertTrue("clientBytesSent = " + clientBytesSent,
            clientBytesSent > 0L && clientBytesSent <= CLIENT_MAX_BYTES_SENT);
    }

    /**
     * Publishes a single {@link Trade} for the given symbol through the server's publisher.
     *
     * @param symbol event symbol of the trade to publish
     */
    private void publishTrade(String symbol) {
        Trade trade = new Trade(symbol);
        publisher.publishEvents(Collections.singletonList(trade));
    }

    /**
     * Sums the written-bytes statistic over all connectors of the endpoint.
     *
     * @param endpoint endpoint whose connectors are inspected
     * @return total of {@code getWrittenBytes()} across the endpoint's connectors
     */
    private long getTotalBytesSent(QDEndpoint endpoint) {
        long sum = 0L;
        for (MessageConnector connector : endpoint.getConnectors()) {
            sum += connector.retrieveCompleteEndpointStats().getWrittenBytes();
        }
        return sum;
    }
}

