package com.dxfeed.api.impl.test;

import com.devexperts.qd.qtp.MessageConnector;
import com.devexperts.qd.qtp.QDEndpoint;
import com.devexperts.test.ThreadCleanCheck;
import com.dxfeed.api.DXEndpoint;
import com.dxfeed.api.DXFeed;
import com.dxfeed.api.DXFeedSubscription;
import com.dxfeed.api.DXPublisher;
import com.dxfeed.api.osub.ObservableSubscription;
import com.dxfeed.api.osub.WildcardSymbol;
import com.dxfeed.event.market.Trade;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;

/* loaded from: input_file:com/dxfeed/api/impl/test/FilterTransferTest.class */
public class FilterTransferTest extends TestCase {
    private static final int PORT = 25899;
    private static final String CLIENT_FILTER = "[AC]*";
    private static final String SERVER_FILTER = "[AB]*";
    private static final String ACTUAL_SYMBOL = "ACTUAL";
    private static final String DXFEED_WILDCARD_ENABLE = "dxfeed.wildcard.enable";
    private static final int N = 10000;
    private static final long MAX_BYTES_SENT = 4098;
    private DXEndpoint server;
    private DXEndpoint client;
    private DXPublisher publisher;
    private DXFeed feed;
    private QDEndpoint serverQDEndpoint;
    private QDEndpoint clientQDEndpoint;
    private final BlockingQueue<Object> subAdded = new LinkedBlockingDeque();
    private final BlockingQueue<Trade> tradesReceived = new LinkedBlockingDeque();
    private DXFeedSubscription<Trade> sub;

    protected void setUp() throws Exception {
        ThreadCleanCheck.before();
        this.server = DXEndpoint.newBuilder().withProperty(DXFEED_WILDCARD_ENABLE, "true").withRole(DXEndpoint.Role.PUBLISHER).build();
        this.client = DXEndpoint.newBuilder().withProperty(DXFEED_WILDCARD_ENABLE, "true").withRole(DXEndpoint.Role.FEED).build();
        this.server.connect("[AB]*@:25899");
        this.client.connect("[AC]*@localhost:25899");
        this.publisher = this.server.getPublisher();
        this.feed = this.client.getFeed();
        this.serverQDEndpoint = this.server.getQDEndpoint();
        this.clientQDEndpoint = this.client.getQDEndpoint();
        ObservableSubscription subscription = this.publisher.getSubscription(Trade.class);
        BlockingQueue<Object> blockingQueue = this.subAdded;
        blockingQueue.getClass();
        subscription.addChangeListener((v1) -> {
            r1.addAll(v1);
        });
        this.sub = this.feed.createSubscription(Trade.class);
        DXFeedSubscription<Trade> dXFeedSubscription = this.sub;
        BlockingQueue<Trade> blockingQueue2 = this.tradesReceived;
        blockingQueue2.getClass();
        dXFeedSubscription.addEventListener((v1) -> {
            r1.addAll(v1);
        });
    }

    protected void tearDown() throws Exception {
        this.client.close();
        this.server.close();
        ThreadCleanCheck.after();
    }

    public void testFilters() throws InterruptedException {
        this.sub.addSymbols(WildcardSymbol.ALL);
        assertEquals(WildcardSymbol.ALL, this.subAdded.poll(10L, TimeUnit.SECONDS));
        for (int i = 0; i < N; i++) {
            this.sub.addSymbols("C" + i);
        }
        for (int i2 = 0; i2 < N; i2++) {
            publishTrade("B" + i2);
        }
        publishTrade(ACTUAL_SYMBOL);
        assertEquals(ACTUAL_SYMBOL, this.tradesReceived.poll(10L, TimeUnit.SECONDS).getEventSymbol());
        long totalBytesSent = getTotalBytesSent(this.serverQDEndpoint);
        assertTrue("serverBytesSent = " + totalBytesSent, totalBytesSent > 0 && totalBytesSent <= MAX_BYTES_SENT);
        long totalBytesSent2 = getTotalBytesSent(this.clientQDEndpoint);
        assertTrue("clientBytesSent = " + totalBytesSent2, totalBytesSent2 > 0 && totalBytesSent2 <= MAX_BYTES_SENT);
    }

    private void publishTrade(String str) {
        this.publisher.publishEvents(Collections.singletonList(new Trade(str)));
    }

    private long getTotalBytesSent(QDEndpoint qDEndpoint) {
        long j = 0;
        Iterator it = qDEndpoint.getConnectors().iterator();
        while (it.hasNext()) {
            j += ((MessageConnector) it.next()).retrieveCompleteEndpointStats().getWrittenBytes();
        }
        return j;
    }
}
