package com.xiaoenai.app.data.ntcp;

import android.app.Application;
import android.os.Handler;
import android.os.HandlerThread;
import android.util.Log;
import com.mzd.lib.http.BizException;
import com.mzd.lib.log.LogUtil;
import com.tencent.bugly.crashreport.CrashReport;
import com.xiaoenai.app.data.net.nchat.MessageApi;
import com.xiaoenai.app.data.net.nchat.entity.EmptyV2;
import com.xiaoenai.app.data.net.nchat.entity.GetMaxRecvSeqReplyV2;
import com.xiaoenai.app.data.net.nchat.entity.MsgV2;
import com.xiaoenai.app.data.net.nchat.entity.RecvMsgsReplyV2;
import com.xiaoenai.app.data.rxjava.DefaultErrorSubscriber;
import com.xiaoenai.app.utils.cachestore.CacheManager;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import rx.Completable;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.android.schedulers.AndroidSchedulers;
import rx.exceptions.CompositeException;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.schedulers.Schedulers;

/* loaded from: classes9.dex */
public class NTcpManager {
    // When true, DefaultErrorObserver.onError also logs the error and rethrows runtime failures.
    public static boolean DEBUG = false;
    // Log tag constant; not referenced elsewhere in this file.
    public static final String TAG = "NTcpManager";
    // Push most recently taken from xTcpPushBlockingQueue by the worker loop; set to null by unregisterNTcp().
    private static MsgV2 currentPush;
    // Handler bound to handlerThread's looper; created in init(), so it is null before init() runs.
    private static Handler handler;
    // Highest sequence number recorded locally. Loaded from the cache in registerNTcp(), advanced and
    // persisted by saveMaxLocalRecvSeq(), and set to -1 by unregisterNTcp().
    private static volatile long maxLocalSeq;
    // Highest sequence number known to exist remotely. Only ever raised by updateRemoteMaxSeq();
    // reset to 0 by unregisterNTcp().
    private static volatile long maxRemoteSeq;
    // Receiver of every push handled by this class; installed through setPushDispatcher().
    private static NewPushDispatcher pushDispatcher;
    // NOTE(review): never assigned in this file; only unsubscribed in unSubscriptionWhenUnregister().
    private static Subscription syncAllDataSubscription;
    // Subscription of the in-flight "fetch missed messages" request (see processSyncMissMsgRequest()).
    private static Subscription syncMissMsgSubscription;
    // Subscription of the in-flight "query remote seq" request (see processSyncMissMsgRequest()).
    private static Subscription syncRemoteSeqSubscription;
    // Queue the worker loop blocks on; producers are outside this file. Cleared by unregisterNTcp().
    public static BlockingQueue<MsgV2> xTcpPushBlockingQueue = new LinkedBlockingQueue();
    // Cache-store key under which maxLocalSeq is persisted.
    private static String LOCAL_MAX_RECV_SEQ_KEY = "LOCAL_MAX_RECV_SEQ_V2";
    // Sequence number last reported to the server (or the baseline adopted after a remote-seq sync).
    private static long lastUploadMaxSeq = 0;
    // Wall-clock time (ms) of the last completed upload; initialised at class-load time.
    private static long lastUploadMaxSeqTs = System.currentTimeMillis();
    // Last value passed to setXtcpConnectionState(); presumably one of the ConnectionState constants — confirm with callers.
    private static int xtcpConnectionState = 0;
    // Network availability flag; every sync entry point in this class returns early while it is false.
    private static volatile boolean networkIsAvailable = false;
    // Guards the worker loop's processing section, maxRemoteSeq updates and unregisterNTcp().
    private static ReentrantLock workerLock = new ReentrantLock();
    // Condition the worker loop awaits while missed messages are being fetched; signalled by wakePushProcessThread().
    private static Condition finishSyncMissMsg = workerLock.newCondition();
    // True once a remote-seq reply has been processed; cleared to force a new query.
    private static volatile boolean hasSyncRemoteSeq = false;
    // True while a remote-seq query is in flight (prevents duplicate requests).
    private static volatile boolean remoteSeqSyncing = false;
    // True while a missed-message fetch is in flight. NOTE(review): unlike its sibling flag this one is not volatile.
    private static boolean remoteMsgSyncing = false;
    // Background thread whose looper runs the sync requests and their callbacks; started in init().
    private static HandlerThread handlerThread = new HandlerThread("NewSyncMissMsgThread");
    // API client used for the seq query, missed-message fetch and seq upload calls.
    private static MessageApi messageApi = new MessageApi();

    /**
     * Integer codes naming a connection state, listed in ascending order.
     *
     * <p>Nothing in this file references these constants directly; presumably they are the values
     * callers pass to {@code setXtcpConnectionState(int)} — confirm against the call sites.
     */
    public interface ConnectionState {
        // Interface fields are implicitly public static final.
        int DISCONNECTED = 0;
        int CONNECTING = 1;
        int CONNECTED = 2;
    }

    /**
     * Base subscriber with empty {@code onNext}/{@code onCompleted} and a shared error policy.
     *
     * <p>Error policy: every {@link RuntimeException} is reported through
     * {@code CrashReport.postCatchedException}. When {@link NTcpManager#DEBUG} is on, the error is
     * additionally logged and runtime failures are rethrown wrapped in a new
     * {@link RuntimeException}; a {@link CompositeException} is only rethrown when at least one of
     * its nested exceptions is a {@link RuntimeException} or an {@link Error}.
     *
     * @param <T> type of the items delivered to {@link #onNext(Object)}
     */
    public static class DefaultErrorObserver<T> extends Subscriber<T> implements Observer<T> {
        private DefaultErrorObserver() {
        }

        @Override
        public void onCompleted() {
            // Nothing to do by default.
        }

        @Override
        public void onError(Throwable th) {
            final boolean runtimeFailure = th instanceof RuntimeException;
            if (runtimeFailure) {
                CrashReport.postCatchedException(th);
            }
            if (!NTcpManager.DEBUG) {
                return;
            }
            Log.e("DefaultErrorObserver", "onError", th);
            if (!runtimeFailure) {
                return;
            }
            if (th instanceof CompositeException) {
                // Only escalate a composite when it actually carries an unchecked failure.
                for (Throwable nested : ((CompositeException) th).getExceptions()) {
                    boolean unchecked = (nested instanceof RuntimeException) || (nested instanceof Error);
                    if (unchecked) {
                        throw new RuntimeException(th);
                    }
                }
                return;
            }
            throw new RuntimeException(th);
        }

        @Override
        public void onNext(T t) {
            // Nothing to do by default.
        }
    }

    /**
     * Forces a new remote-sequence sync by delegating to {@link #syncRemoteSeqWhenNetAvailable()};
     * that method does nothing while the network is flagged unavailable.
     */
    public static void forceSyncRemoteSeq() {
        syncRemoteSeqWhenNetAvailable();
    }

    /**
     * Returns the value most recently stored by {@link #setXtcpConnectionState(int)} (0 until then).
     */
    public static int getXtcpConnectionState() {
        return xtcpConnectionState;
    }

    /**
     * One-time start-up: starts the sync handler thread, creates its {@link Handler}, then launches
     * the push worker loop.
     *
     * <p>The handler is created <em>before</em> the worker is started. The worker loop calls
     * {@link #sendSyncMissMsgRequest()}, which posts to {@code handler}; starting the worker first
     * left a window in which a push already sitting in {@code xTcpPushBlockingQueue} could reach
     * that call while {@code handler} was still null (or not yet visible to the worker thread).
     *
     * <p>Must be called only once: {@code HandlerThread.start()} throws if the thread was already
     * started.
     *
     * @param application unused by this method; kept for interface compatibility
     */
    public static void init(Application application) {
        LogUtil.d("ntcp 初始化", new Object[0]);
        handlerThread.start();
        handler = new Handler(handlerThread.getLooper());
        // Safe to start the consumer now that everything it depends on exists.
        startProcessPushWorkerThread();
    }

    /**
     * Logs both sequence counters and reports whether the local one is behind the remote one.
     *
     * @return {@code true} when {@code maxLocalSeq < maxRemoteSeq}, i.e. there are messages to fetch
     */
    private static boolean isNeedSyncMissMsg() {
        LogUtil.d("ntcp 本地最大序号seq={} 线上最大序号seq={}", Long.valueOf(maxLocalSeq), Long.valueOf(maxRemoteSeq));
        // Both fields are volatile and re-read here, so the result may differ from the values just logged.
        return maxLocalSeq < maxRemoteSeq;
    }

    /**
     * Returns the network-availability flag last set through {@link #setNetworkIsAvailable(boolean)}.
     */
    public static boolean isNetworkIsAvailable() {
        return networkIsAvailable;
    }

    /**
     * Handles one reply of the "fetch missed messages" request: dispatches every returned message,
     * advances the local sequence by the requested page size and reports the new sequence.
     *
     * <p>NOTE(review): the loop is bounded by {@code recv_seqs} but reads from {@code msgs}; a reply
     * whose {@code msgs} list is null or shorter than {@code recv_seqs} makes this method throw.
     *
     * @param j remote push sequence the request was aiming for (only logged)
     * @param j2 local receive sequence the request started from
     * @param i page size that was requested; the local sequence is advanced to {@code j2 + i}
     * @param recvMsgsReplyV2 server reply carrying the sequence list and the message list
     */
    public static /* synthetic */ void lambda$sendSyncMissMsgRequest$2(long j, long j2, int i, RecvMsgsReplyV2 recvMsgsReplyV2) {
        List<Long> seqList = recvMsgsReplyV2.getRecv_seqs();
        List<MsgV2> msgList = recvMsgsReplyV2.getMsgs();
        LogUtil.d("拉漏聊天消息 remotePushSeq = {} localRecvSeq = {} seqList = {},  msgList = {}", Long.valueOf(j), Long.valueOf(j2), seqList, msgList);

        boolean anyDispatchAsksForUpload = false;
        if (seqList != null) {
            final int count = seqList.size();
            for (int idx = 0; idx < count; idx++) {
                // Non-short-circuit OR: every message is dispatched even after the flag is set.
                anyDispatchAsksForUpload |= pushDispatcher.dispatchNTcpPush(msgList.get(idx));
            }
        }

        updateLocalMaxSeq(j2 + i);
        if (!anyDispatchAsksForUpload) {
            uploadLocalMaxSeqIfNeed();
            return;
        }
        uploadMaxRecvSeq();
    }

    /**
     * Reconciles the locally stored sequence with the server's answer to the "max received seq"
     * query and marks the remote sequence as synced.
     *
     * <p>With {@code ack} = {@code getAck_seq()} and {@code received} = {@code getReceived_seq()}:
     * <ul>
     *   <li>local is -1 (nothing stored): adopt {@code ack} as local and {@code received} as remote;</li>
     *   <li>{@code ack > received} (server ack is inconsistent): upload {@code received} as the new
     *       ack and raise both local and remote to it;</li>
     *   <li>local &lt; {@code ack}: set local to {@code ack}, raise remote;</li>
     *   <li>local &gt; {@code ack}: if local &lt; {@code received} keep local and raise remote; if
     *       local &gt; {@code received} the local value is invalid and is replaced by {@code ack};
     *       if equal, just upload the local value;</li>
     *   <li>local == {@code ack}: raise remote unless it already equals {@code received}.</li>
     * </ul>
     *
     * <p>Fix: the "local invalid, follow remote" branch used {@code updateLocalMaxSeq(ack)}, which
     * only ever raises the local value and was therefore a no-op there (local is greater than
     * {@code ack} in that branch). It now writes the value through
     * {@link #saveMaxLocalRecvSeq(long, long)}, the same call the "local &lt; ack" branch uses, so
     * the stale local cursor is actually lowered.
     *
     * @param getMaxRecvSeqReplyV2 server reply carrying the acknowledged and the received sequence
     */
    public static /* synthetic */ void lambda$sendSyncRemoteSeqRequest$1(GetMaxRecvSeqReplyV2 getMaxRecvSeqReplyV2) {
        hasSyncRemoteSeq = true;
        LogUtil.d("ntcp sendSyncRemoteSeqRequest Success. 应答remoteLocal = {} 最大remoteReal = {} 本地maxLocalSeq = {}", Long.valueOf(getMaxRecvSeqReplyV2.getAck_seq()), Long.valueOf(getMaxRecvSeqReplyV2.getReceived_seq()), Long.valueOf(maxLocalSeq));
        if (maxLocalSeq == -1) {
            LogUtil.d("ntcp sendSyncRemoteSeqRequest maxRecvSeqReply = {}", getMaxRecvSeqReplyV2);
            updateLocalMaxSeq(getMaxRecvSeqReplyV2.getAck_seq());
            updateRemoteMaxSeq(getMaxRecvSeqReplyV2.getReceived_seq());
        } else if (getMaxRecvSeqReplyV2.getAck_seq() <= getMaxRecvSeqReplyV2.getReceived_seq()) {
            long ack_seq = getMaxRecvSeqReplyV2.getAck_seq();
            if (maxLocalSeq < ack_seq) {
                LogUtil.d("ntcp sendSyncRemoteSeqRequest localInServer > maxLocalSeq : 有另一个设备获取了一些推送，并更新了remoteLocalsSeq", new Object[0]);
                saveMaxLocalRecvSeq(maxLocalSeq, getMaxRecvSeqReplyV2.getAck_seq());
                updateRemoteMaxSeq(getMaxRecvSeqReplyV2.getReceived_seq());
            } else if (maxLocalSeq > ack_seq) {
                if (maxLocalSeq < getMaxRecvSeqReplyV2.getReceived_seq()) {
                    LogUtil.d("ntcp sendSyncRemoteSeqRequest getMaxRecvSeqReply.getReceivedSeq() > maxLocalSeq : 以当前设备为准，并拉取服务器遗漏的信息", new Object[0]);
                    updateRemoteMaxSeq(getMaxRecvSeqReplyV2.getReceived_seq());
                } else if (maxLocalSeq > getMaxRecvSeqReplyV2.getReceived_seq()) {
                    LogUtil.d("ntcp sendSyncRemoteSeqRequest getMaxRecvSeqReply.getReceivedSeq() < maxLocalSeq : 本地无效，以远程为准", new Object[0]);
                    // Local is ahead of everything the server knows, so it must be lowered.
                    // updateLocalMaxSeq() refuses to lower the value; write it directly instead.
                    saveMaxLocalRecvSeq(maxLocalSeq, getMaxRecvSeqReplyV2.getAck_seq());
                    updateRemoteMaxSeq(getMaxRecvSeqReplyV2.getReceived_seq());
                } else {
                    LogUtil.d("ntcp sendSyncRemoteSeqRequest getMaxRecvSeqReply.getReceivedSeq() == maxLocalSeq : 没有漏消息", new Object[0]);
                    uploadMaxRecvSeq();
                }
            } else if (maxLocalSeq != getMaxRecvSeqReplyV2.getReceived_seq()) {
                LogUtil.d("ntcp sendSyncRemoteSeqRequest getMaxRecvSeqReply.getAckSeq() == maxLocalSeq : 直接获取MissMsg", new Object[0]);
                updateRemoteMaxSeq(getMaxRecvSeqReplyV2.getReceived_seq());
            } else {
                LogUtil.d("ntcp sendSyncRemoteSeqRequest getMaxRecvSeqReply.getAckSeq() == maxLocalSeq == getReceivedSeq : 没有漏消息", new Object[0]);
            }
        } else {
            // ack > received: the server-side ack is inconsistent, so overwrite it with received.
            LogUtil.d("ntcp sendSyncRemoteSeqRequest getMaxRecvSeqReply.getAckSeq() == maxLocalSeq : 无效的local值，以real作为local", new Object[0]);
            long received_seq = getMaxRecvSeqReplyV2.getReceived_seq();
            LogUtil.d("ntcp 上报 seq uploadMaxRecvSeq localInServer:{}", Long.valueOf(received_seq));
            messageApi.uploadMaxRecvSeq(received_seq).subscribe(new DefaultErrorSubscriber());
            updateLocalMaxSeq(received_seq);
            updateRemoteMaxSeq(received_seq);
        }
        LogUtil.d("ntcp lastUploadMaxSeq lastUploadMaxSeq:{} lastUploadMaxSeq:{}", Long.valueOf(lastUploadMaxSeq), Long.valueOf(maxLocalSeq));
        // After reconciliation the local value is the baseline for the "upload if far behind" check.
        lastUploadMaxSeq = maxLocalSeq;
    }

    /**
     * Body of the push worker: an endless loop that takes one push at a time from
     * {@code xTcpPushBlockingQueue}, makes sure all earlier messages have been fetched, and then
     * dispatches the push.
     *
     * <p>For each push the loop raises {@code maxRemoteSeq} to {@code seq - 1}, and while the local
     * sequence is behind it requests a missed-message sync and waits on {@code finishSyncMissMsg}.
     * Once caught up, the push is handled by its sequence number:
     * <ul>
     *   <li>0: dispatched without touching the local sequence;</li>
     *   <li>{@code maxLocalSeq + 1}: dispatched and the local sequence advanced by one;</li>
     *   <li>{@code <= maxLocalSeq}: ignored as already seen;</li>
     *   <li>anything larger: only logged.</li>
     * </ul>
     *
     * <p>Fixes: the taken push is held in a local variable, so the unlocked
     * {@code getSeq()} call can no longer hit a {@code currentPush} that
     * {@link #unregisterNTcp()} nulled concurrently (the resulting NullPointerException ended the
     * loop for good). The lock is acquired before the {@code try} and released in {@code finally}.
     *
     * <p>Interruptions are logged and the loop continues, as before; the interrupt flag is
     * deliberately not restored because that would make the next {@code take()} fail immediately.
     */
    public static /* synthetic */ void lambda$startProcessPushWorkerThread$0() {
        while (true) {
            try {
                LogUtil.d("ntcp blockingQueue\u3000开始等待", new Object[0]);
                MsgV2 taken = xTcpPushBlockingQueue.take();
                currentPush = taken;
                LogUtil.d("ntcp blockingQueue\u3000结束等待", new Object[0]);
                // Use the local snapshot: currentPush may be cleared by unregisterNTcp() at any time here.
                updateRemoteMaxSeq(taken.getSeq() - 1);

                workerLock.lock();
                try {
                    LogUtil.d("ntcp 是否需要拉取丢失消息：{}", Boolean.valueOf(isNeedSyncMissMsg()));
                    while (isNeedSyncMissMsg()) {
                        sendSyncMissMsgRequest();
                        LogUtil.d("ntcp sendSyncMissMsgRequest await", new Object[0]);
                        finishSyncMissMsg.await();
                    }
                    // Re-read under the lock: unregisterNTcp() may have dropped the push while we waited.
                    MsgV2 push = currentPush;
                    if (push == null) {
                        LogUtil.d("ntcp continue跳出当前循环", new Object[0]);
                    } else {
                        LogUtil.d("ntcp getUserSeq {} 本地seq {}", Integer.valueOf(push.getSeq()), Long.valueOf(maxLocalSeq));
                        if (push.getSeq() == 0) {
                            LogUtil.d("ntcp 不含有UserSeq的推送消息", new Object[0]);
                            if (pushDispatcher.dispatchNTcpPush(push)) {
                                uploadMaxRecvSeq();
                            } else {
                                uploadLocalMaxSeqIfNeed();
                            }
                        } else if (push.getSeq() == maxLocalSeq + 1) {
                            LogUtil.d("ntcp pushSeq == maxLocalSeq + 1 : 新的消息", new Object[0]);
                            boolean dispatchNTcpPush = pushDispatcher.dispatchNTcpPush(push);
                            updateLocalMaxSeq(maxLocalSeq + 1);
                            if (dispatchNTcpPush) {
                                uploadMaxRecvSeq();
                            } else {
                                uploadLocalMaxSeqIfNeed();
                            }
                        } else if (push.getSeq() <= maxLocalSeq) {
                            LogUtil.d("ntcp pushSeq <= maxLocalSeq : 过期的消息，忽略", new Object[0]);
                        } else {
                            LogUtil.d("ntcp pushSeq > maxLocalSeq + 1 : 可能遗漏部分新消息，需要通过漏消息逻辑拉取", new Object[0]);
                        }
                    }
                } finally {
                    workerLock.unlock();
                }
            } catch (InterruptedException e) {
                // Covers both take() and await(); keep serving pushes.
                e.printStackTrace();
            }
        }
    }

    /**
     * Runs one step of the missed-message sync state machine. Does nothing while the network is
     * flagged unavailable.
     *
     * <ol>
     *   <li>If the remote sequence has not been synced yet, start that query (unless one is already
     *       in flight) and re-enter this state machine when it answers.</li>
     *   <li>Otherwise, if the local sequence is not behind, wake the push worker.</li>
     *   <li>Otherwise fetch the next page of missed messages (unless a fetch is already in flight)
     *       and re-enter when it completes.</li>
     * </ol>
     * Failures of either request clear the in-flight flag and schedule a retry through
     * {@link #retrySyncMissMsg(Throwable)} before the default error handling runs.
     *
     * <p>Cleanup: removed the decompiler residue {@code (AnonymousClass2)} cast on the
     * {@code onNext} argument (not a declared type, and the argument is a reply object, not the
     * observer) and the meaningless {@code boolean unused = ...} wrappers around the flag writes.
     */
    public static void processSyncMissMsgRequest() {
        if (!isNetworkIsAvailable()) {
            return;
        }
        LogUtil.d("ntcp processSyncMissMsgRequest 1 拉漏 {}", Boolean.valueOf(hasSyncRemoteSeq));

        if (!hasSyncRemoteSeq) {
            if (remoteSeqSyncing) {
                return;
            }
            remoteSeqSyncing = true;
            syncRemoteSeqSubscription = sendSyncRemoteSeqRequest().subscribe((Subscriber<? super GetMaxRecvSeqReplyV2>) new DefaultErrorObserver<GetMaxRecvSeqReplyV2>() {
                @Override
                public void onError(Throwable th) {
                    LogUtil.e("ntcp sendSyncRemoteSeqRequest Failed {}", th.getMessage());
                    NTcpManager.remoteSeqSyncing = false;
                    NTcpManager.retrySyncMissMsg(th);
                    super.onError(th);
                }

                @Override
                public void onNext(GetMaxRecvSeqReplyV2 getMaxRecvSeqReplyV2) {
                    super.onNext(getMaxRecvSeqReplyV2);
                    LogUtil.d("ntcp getMaxRecvSeqReply {}", getMaxRecvSeqReplyV2);
                    NTcpManager.remoteSeqSyncing = false;
                    // Continue with the next step now that the remote sequence is known.
                    NTcpManager.sendSyncMissMsgRequest();
                }
            });
            return;
        }

        if (!isNeedSyncMissMsg()) {
            wakePushProcessThread();
            return;
        }

        LogUtil.d("ntcp processSyncMissMsgRequest 3 isNeedSyncMissMsg 拉漏聊天消息 {}", Boolean.valueOf(remoteMsgSyncing));
        if (remoteMsgSyncing) {
            return;
        }
        remoteMsgSyncing = true;
        syncMissMsgSubscription = sendSyncMissMsgRequest(maxLocalSeq, maxRemoteSeq).subscribe((Subscriber<? super RecvMsgsReplyV2>) new DefaultErrorObserver<RecvMsgsReplyV2>() {
            @Override
            public void onCompleted() {
                super.onCompleted();
                LogUtil.d("ntcp processSyncMissMsgRequest 3 isNeedSyncMissMsg 拉漏聊天消息", new Object[0]);
                NTcpManager.remoteMsgSyncing = false;
                // Loop: either fetch the next page or wake the worker if we have caught up.
                NTcpManager.sendSyncMissMsgRequest();
            }

            @Override
            public void onError(Throwable th) {
                NTcpManager.remoteMsgSyncing = false;
                NTcpManager.retrySyncMissMsg(th);
                super.onError(th);
            }
        });
    }

    /**
     * Loads the persisted local sequence from the user cache store (-1 when nothing is stored) and
     * uses it as the initial "last uploaded" baseline.
     */
    public static void registerNTcp() {
        maxLocalSeq = CacheManager.getUserCacheStore().getLong(LOCAL_MAX_RECV_SEQ_KEY, -1L);
        lastUploadMaxSeq = maxLocalSeq;
    }

    /**
     * Resets the persisted local sequence to the "nothing stored" marker (-1).
     *
     * <p>The marker is written as a {@code long}, matching how this key is written in
     * {@link #saveMaxLocalRecvSeq(long, long)} and read back with {@code getLong} in
     * {@link #registerNTcp()}. The previous int literal selected a different {@code save} overload
     * for the same key. NOTE(review): whether the cache store distinguishes int and long values is
     * not visible from this file; using one type consistently is safe either way.
     */
    private static void resetMaxLocalRecvSeq() {
        CacheManager.getUserCacheStore().save(LOCAL_MAX_RECV_SEQ_KEY, -1L);
    }

    /**
     * Schedules another sync attempt after a failed request, provided the network is still flagged
     * available.
     *
     * <p>Delay: 60 seconds for a {@link BizException} whose error code is in the 5xx range,
     * 5 seconds for any other {@link BizException}, and no delay for every other error type.
     *
     * @param th the failure that ended the request
     */
    public static void retrySyncMissMsg(Throwable th) {
        if (!isNetworkIsAvailable()) {
            return;
        }
        final int delaySeconds;
        if (!(th instanceof BizException)) {
            delaySeconds = 0;
        } else {
            int codeClass = ((BizException) th).getErrorBean().getCode() / 100;
            if (codeClass == 5) {
                delaySeconds = 60;
            } else {
                delaySeconds = 5;
            }
        }
        sendSyncMissMsgRequestWithDelay(delaySeconds);
    }

    /**
     * Compare-and-set for the local sequence: replaces it with {@code j2} and persists the new value,
     * but only if it currently equals {@code j}.
     *
     * <p>The method is {@code static synchronized}, so it already holds the class monitor; no
     * additional synchronized block is needed.
     *
     * @param j value the caller expects {@code maxLocalSeq} to hold
     * @param j2 value to store
     * @return {@code true} if the value was stored, {@code false} if the expectation did not match
     */
    private static synchronized boolean saveMaxLocalRecvSeq(long j, long j2) {
        LogUtil.d("ntcp saveMaxLocalRecvSeq expectMaxSeq:{} updateMaxSeq:{}", Long.valueOf(j), Long.valueOf(j2));
        if (maxLocalSeq == j) {
            maxLocalSeq = j2;
            CacheManager.getUserCacheStore().save(LOCAL_MAX_RECV_SEQ_KEY, maxLocalSeq);
            LogUtil.d("ntcp saveMaxSeq success. maxLocalSeq = {}", Long.valueOf(maxLocalSeq));
            return true;
        }
        // Someone else changed the value in the meantime; the caller decides whether to retry.
        LogUtil.d("ntcp saveMaxSeq failed current = {}, expect = {}, update = {}, retry!", Long.valueOf(maxLocalSeq), Long.valueOf(j), Long.valueOf(j2));
        return false;
    }

    /**
     * Builds the "fetch missed messages" request for the gap between the two sequences.
     *
     * <p>At most 20 messages are requested per call. The reply is delivered on the handler thread's
     * looper, where it is processed before the subscriber sees it.
     *
     * @param j local receive sequence to start from
     * @param j2 remote push sequence to catch up to
     * @return an observable that performs the request when subscribed
     */
    private static Observable<RecvMsgsReplyV2> sendSyncMissMsgRequest(final long j, final long j2) {
        final int gap = (int) (j2 - j);
        LogUtil.d("limit = {}", Integer.valueOf(gap));
        LogUtil.d("sendSyncMissMsgRequest isNeedSyncMissMsg 拉漏聊天消息 localRecvSeq:{} remotePushSeq:{} limit:{}", Long.valueOf(j), Long.valueOf(j2), Integer.valueOf(gap));
        final int pageSize = Math.min(gap, 20);
        LogUtil.d("localRecvSeq:{} limit:{}", Long.valueOf(j), Integer.valueOf(pageSize));

        Observable<RecvMsgsReplyV2> replies = messageApi.getRecvMsg(j, pageSize)
                .observeOn(AndroidSchedulers.from(handlerThread.getLooper()));
        return replies.doOnNext(new Action1<RecvMsgsReplyV2>() {
            @Override
            public void call(RecvMsgsReplyV2 reply) {
                // Argument order of the handler is (remote, local, pageSize, reply).
                NTcpManager.lambda$sendSyncMissMsgRequest$2(j2, j, pageSize, reply);
            }
        });
    }

    /**
     * Posts the shared sync task onto the handler thread without delay.
     *
     * <p>The task is a compiler-generated lambda singleton that is not part of this file;
     * presumably it runs {@code processSyncMissMsgRequest()} — confirm against the synthetic class.
     * Requires {@code init(...)} to have run, because {@code handler} is null before that.
     */
    public static void sendSyncMissMsgRequest() {
        handler.post($$Lambda$NTcpManager$UVLRF2gwmph5pBMLDv38uV0D2F4.INSTANCE);
    }

    /**
     * Posts the shared sync task onto the handler thread after a delay.
     *
     * <p>The seconds-to-milliseconds conversion is done in {@code long} arithmetic; the previous
     * {@code i * 1000} was evaluated as an {@code int} before being widened for
     * {@code postDelayed(Runnable, long)} and could overflow for large inputs.
     *
     * @param i delay in seconds
     */
    private static void sendSyncMissMsgRequestWithDelay(int i) {
        handler.postDelayed($$Lambda$NTcpManager$UVLRF2gwmph5pBMLDv38uV0D2F4.INSTANCE, i * 1000L);
    }

    /**
     * Builds the "query max received seq" request. The reply is delivered on the handler thread's
     * looper, where the sequences are reconciled before the subscriber sees it.
     *
     * @return an observable that performs the query when subscribed
     */
    private static Observable<GetMaxRecvSeqReplyV2> sendSyncRemoteSeqRequest() {
        LogUtil.d("ntcp sendSyncRemoteSeqRequest Start", new Object[0]);
        Observable<GetMaxRecvSeqReplyV2> replies = messageApi.queryMaxRecv()
                .observeOn(AndroidSchedulers.from(handlerThread.getLooper()));
        return replies.doOnNext(new Action1<GetMaxRecvSeqReplyV2>() {
            @Override
            public void call(GetMaxRecvSeqReplyV2 reply) {
                NTcpManager.lambda$sendSyncRemoteSeqRequest$1(reply);
            }
        });
    }

    /**
     * Updates the network-availability flag. When the value actually changes, a remote-sequence
     * sync is attempted (which only proceeds if the network is now available).
     *
     * @param z the new availability state
     */
    public static void setNetworkIsAvailable(boolean z) {
        LogUtil.d("current networkAvailable = {}, networkIsAvailable = {}", Boolean.valueOf(networkIsAvailable), Boolean.valueOf(z));
        if (networkIsAvailable == z) {
            // No transition, nothing to do.
            return;
        }
        networkIsAvailable = z;
        syncRemoteSeqWhenNetAvailable();
    }

    /**
     * Installs the dispatcher that receives every push handled by this class.
     *
     * @param newPushDispatcher dispatcher to use from now on; the dispatch sites call it without a
     *     null check
     */
    public static void setPushDispatcher(NewPushDispatcher newPushDispatcher) {
        pushDispatcher = newPushDispatcher;
    }

    /**
     * Records the connection state and derives network availability from it: any positive state
     * counts as available, everything else as unavailable.
     *
     * @param i the new connection state code
     */
    public static void setXtcpConnectionState(int i) {
        LogUtil.d("ntcp 连接状态：{}", Integer.valueOf(i));
        xtcpConnectionState = i;
        setNetworkIsAvailable(i > 0);
    }

    /**
     * Launches the push worker loop on the RxJava I/O scheduler. Errors escaping the loop go
     * through the default {@link DefaultErrorObserver} handling.
     *
     * <p>The method is {@code static synchronized}, so it already holds the class monitor.
     */
    private static synchronized void startProcessPushWorkerThread() {
        Action0 workerLoop = new Action0() {
            @Override
            public void call() {
                NTcpManager.lambda$startProcessPushWorkerThread$0();
            }
        };
        // The inherited callbacks are exactly what the worker needs, so no overrides are required.
        Subscriber<Object> errorSink = new DefaultErrorObserver<Object>();
        Completable.fromAction(workerLoop)
                .subscribeOn(Schedulers.io())
                .<Object>toObservable()
                .subscribe(errorSink);
    }

    /**
     * Invalidates the cached "remote sequence synced" state and kicks off the sync task, but only
     * while the network is flagged available.
     */
    private static void syncRemoteSeqWhenNetAvailable() {
        if (!isNetworkIsAvailable()) {
            return;
        }
        LogUtil.d("ntcp syncRemoteSeqWhenNetAvailable 拉漏", new Object[0]);
        // Clearing the flag makes the next sync step query the server again.
        hasSyncRemoteSeq = false;
        sendSyncMissMsgRequest();
    }

    /**
     * Cancels every outstanding sync subscription and clears the matching in-flight flag, so that a
     * later sync is not blocked by a request that was cancelled here.
     */
    private static void unSubscriptionWhenUnregister() {
        final Subscription allData = syncAllDataSubscription;
        if (allData != null) {
            allData.unsubscribe();
        }

        final Subscription remoteSeq = syncRemoteSeqSubscription;
        if (remoteSeq != null) {
            remoteSeq.unsubscribe();
            remoteSeqSyncing = false;
        }

        final Subscription missedMsgs = syncMissMsgSubscription;
        if (missedMsgs != null) {
            missedMsgs.unsubscribe();
            remoteMsgSyncing = false;
        }
    }

    /**
     * Tears down all per-user state: drops queued and current pushes, resets both sequence
     * counters (in memory and in the cache store) and cancels outstanding sync requests.
     *
     * <p>Runs under {@code workerLock} so the worker loop never observes a half-reset state. The
     * lock is acquired before the {@code try} block; acquiring it inside meant the {@code finally}
     * clause could attempt to unlock a lock that was never obtained. Exceptions raised during the
     * reset are logged and swallowed, as before.
     */
    public static void unregisterNTcp() {
        workerLock.lock();
        try {
            xTcpPushBlockingQueue.clear();
            currentPush = null;
            hasSyncRemoteSeq = false;
            maxRemoteSeq = 0L;
            maxLocalSeq = -1L;
            resetMaxLocalRecvSeq();
            unSubscriptionWhenUnregister();
        } catch (Exception e) {
            // Best effort: unregistering must not crash the caller.
            e.printStackTrace();
        } finally {
            workerLock.unlock();
        }
    }

    /**
     * Raises the local sequence to {@code j}; never lowers it. Retries the compare-and-set until it
     * succeeds or the stored value has reached {@code j} by other means.
     *
     * @param j candidate new local sequence
     */
    private static void updateLocalMaxSeq(long j) {
        LogUtil.d("ntcp updateLocalMaxSeq update:{}", Long.valueOf(j));
        long observed = maxLocalSeq;
        while (j > observed) {
            if (saveMaxLocalRecvSeq(observed, j)) {
                return;
            }
            // Lost the race against another writer; look at the fresh value and try again.
            observed = maxLocalSeq;
        }
    }

    /**
     * Raises the known remote sequence to {@code j}; never lowers it. The update is performed under
     * {@code workerLock}.
     *
     * <p>The lock is acquired before the {@code try} block so that {@code finally} only ever
     * releases a lock that is actually held.
     *
     * @param j candidate new remote sequence
     */
    private static void updateRemoteMaxSeq(long j) {
        LogUtil.d("ntcp 更新远程seq：{} {}", Long.valueOf(maxRemoteSeq), Long.valueOf(j));
        workerLock.lock();
        try {
            if (maxRemoteSeq < j) {
                maxRemoteSeq = j;
            }
        } finally {
            workerLock.unlock();
        }
    }

    /**
     * Reports the local sequence to the server when it is worthwhile: only after the remote
     * sequence has been synced, and only if the local value is more than 20 ahead of the last
     * reported one or the last report is more than 600000 ms (10 minutes) old.
     */
    private static void uploadLocalMaxSeqIfNeed() {
        if (!hasSyncRemoteSeq) {
            return;
        }
        final boolean farAhead = maxLocalSeq - lastUploadMaxSeq > 20;
        if (farAhead || System.currentTimeMillis() - lastUploadMaxSeqTs > 600000) {
            uploadMaxRecvSeq();
        }
    }

    /**
     * Reports the current local sequence to the server on the I/O scheduler. When the request
     * completes, the reported value and the completion time become the new baseline for
     * {@link #uploadLocalMaxSeqIfNeed()}.
     *
     * <p>The sequence is read once, before logging, so the logged value is exactly the uploaded
     * one. Decompiler residue was removed: an instance-initializer {@code super()} call, which is
     * not legal Java, and the {@code long unused = ...} wrappers around the field writes.
     */
    public static void uploadMaxRecvSeq() {
        final long seqToUpload = maxLocalSeq;
        LogUtil.d("上报 seq uploadMaxRecvSeq:{}", Long.valueOf(seqToUpload));
        messageApi.uploadMaxRecvSeq(seqToUpload).subscribeOn(Schedulers.io()).subscribe((Subscriber<? super EmptyV2>) new DefaultErrorObserver<EmptyV2>() {
            @Override
            public void onCompleted() {
                super.onCompleted();
                NTcpManager.lastUploadMaxSeqTs = System.currentTimeMillis();
                NTcpManager.lastUploadMaxSeq = seqToUpload;
            }
        });
    }

    /**
     * Wakes every thread waiting on {@code finishSyncMissMsg} (the push worker waits there while
     * missed messages are being fetched).
     *
     * <p>Uses the standard {@code lock(); try { ... } finally { unlock(); }} shape. The previous
     * version locked inside the {@code try} and duplicated the release in a catch-and-rethrow
     * block, which could unlock a lock that was never acquired.
     */
    private static void wakePushProcessThread() {
        LogUtil.d("ntcp 获取当前锁，唤醒其他锁", new Object[0]);
        workerLock.lock();
        try {
            finishSyncMissMsg.signalAll();
        } finally {
            LogUtil.d("ntcp 释放锁", new Object[0]);
            workerLock.unlock();
        }
    }
}
