package com.zhulong.eduvideo.module_video.view.cc.view.live;

import android.os.Handler;
import android.os.Looper;
import com.baidu.mobstat.Config;
import com.orhanobut.logger.Logger;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.schedulers.Schedulers;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.concurrent.TimeUnit;

/* loaded from: classes2.dex */
public class RxJavaSocketUtil {
    private static final int HEARTBEAT_INTERVAL = 10;
    private static final int MAX_RETRY_TIMES = 30;
    private Disposable disposable;
    private Disposable disposable2;
    private Handler handler;
    private String host;
    private boolean isConnected;
    private OnSocketConnectionListener onSocketConnectionListener;
    private OnSocketMessageListener onSocketMessageListener;
    private int port;
    private BufferedReader reader;
    private int retryTimes;
    private Socket socket;
    private BufferedWriter writer;

    /* loaded from: classes2.dex */
    public interface OnSocketConnectionListener {
        void onConnected();

        void onDisconnected(boolean z);
    }

    /* loaded from: classes2.dex */
    public interface OnSocketMessageListener {
        void onMessage(String str);
    }

    /* loaded from: classes2.dex */
    private static class SingletonHolder {
        private static final RxJavaSocketUtil INSTANCE = new RxJavaSocketUtil();

        private SingletonHolder() {
        }
    }

    private RxJavaSocketUtil() {
        this.handler = new Handler(Looper.getMainLooper());
    }

    private void connectInternal() {
        Logger.v("Connecting to " + this.host + Config.TRACE_TODAY_VISIT_SPLIT + this.port + "  【剩余重连次数】：" + this.retryTimes, new Object[0]);
        this.disposable = Observable.create(new ObservableOnSubscribe() { // from class: com.zhulong.eduvideo.module_video.view.cc.view.live.-$$Lambda$RxJavaSocketUtil$Yy-A-6ZHeswzaj_3cTHD_1a8ps8
            @Override // io.reactivex.ObservableOnSubscribe
            public final void subscribe(ObservableEmitter observableEmitter) {
                RxJavaSocketUtil.this.lambda$connectInternal$0$RxJavaSocketUtil(observableEmitter);
            }
        }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer() { // from class: com.zhulong.eduvideo.module_video.view.cc.view.live.-$$Lambda$RxJavaSocketUtil$Fu72_Y2he-NO10w-w4y2HsGNgU4
            @Override // io.reactivex.functions.Consumer
            public final void accept(Object obj) {
                RxJavaSocketUtil.this.lambda$connectInternal$1$RxJavaSocketUtil(obj);
            }
        }, new Consumer() { // from class: com.zhulong.eduvideo.module_video.view.cc.view.live.-$$Lambda$RxJavaSocketUtil$d07OtQK8NlGg4G6BMRKEOcjMGgQ
            @Override // io.reactivex.functions.Consumer
            public final void accept(Object obj) {
                RxJavaSocketUtil.this.lambda$connectInternal$2$RxJavaSocketUtil((Throwable) obj);
            }
        });
    }

    public static RxJavaSocketUtil getInstance() {
        return SingletonHolder.INSTANCE;
    }

    private void startHeartbeat() {
        this.disposable = Observable.interval(10L, TimeUnit.SECONDS).subscribeOn(Schedulers.io()).observeOn(Schedulers.io()).subscribe(new Consumer() { // from class: com.zhulong.eduvideo.module_video.view.cc.view.live.-$$Lambda$RxJavaSocketUtil$a9Y-QwjeqpmOhPQ2_ULOc79NQ4E
            @Override // io.reactivex.functions.Consumer
            public final void accept(Object obj) {
                RxJavaSocketUtil.this.lambda$startHeartbeat$3$RxJavaSocketUtil((Long) obj);
            }
        }, new Consumer() { // from class: com.zhulong.eduvideo.module_video.view.cc.view.live.-$$Lambda$RxJavaSocketUtil$2Gj6E5RmQVmleeaEOCFXZmft4dA
            @Override // io.reactivex.functions.Consumer
            public final void accept(Object obj) {
                RxJavaSocketUtil.this.lambda$startHeartbeat$4$RxJavaSocketUtil((Throwable) obj);
            }
        });
    }

    private void startReading() {
        this.disposable = Observable.create(new ObservableOnSubscribe() { // from class: com.zhulong.eduvideo.module_video.view.cc.view.live.-$$Lambda$RxJavaSocketUtil$xKrQVJ2ERZtXH14LynNfQr72S4A
            @Override // io.reactivex.ObservableOnSubscribe
            public final void subscribe(ObservableEmitter observableEmitter) {
                RxJavaSocketUtil.this.lambda$startReading$9$RxJavaSocketUtil(observableEmitter);
            }
        }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe();
    }

    public void connect(String str, int i) {
        if (this.isConnected) {
            Logger.v("Socket is already connected.", new Object[0]);
            return;
        }
        this.host = str;
        this.port = i;
        this.retryTimes = 0;
        connectInternal();
    }

    public void disconnect() {
        if (!this.isConnected) {
            Logger.v("Socket is not connected.", new Object[0]);
            return;
        }
        this.isConnected = false;
        this.disposable.dispose();
        try {
            this.socket.close();
            if (this.writer != null) {
                this.writer.close();
            }
            if (this.reader != null) {
                this.reader.close();
            }
            Logger.v("Socket disconnected.", new Object[0]);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public /* synthetic */ void lambda$connectInternal$0$RxJavaSocketUtil(ObservableEmitter observableEmitter) throws Exception {
        try {
            this.socket = new Socket();
            this.socket.connect(new InetSocketAddress(this.host, this.port), 5000);
            this.socket.setTcpNoDelay(true);
            this.socket.setKeepAlive(true);
            this.reader = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
            this.writer = new BufferedWriter(new OutputStreamWriter(this.socket.getOutputStream()));
            this.isConnected = true;
            this.retryTimes = 0;
            Logger.v("Socket connected.", new Object[0]);
            observableEmitter.onNext(true);
        } catch (IOException e) {
            Logger.v("Socket connection error: " + e.getMessage(), new Object[0]);
            this.isConnected = false;
            int i = this.retryTimes;
            if (i >= 30) {
                Logger.v("Max retry times reached, failed to connect.", new Object[0]);
                observableEmitter.onError(e);
            } else {
                this.retryTimes = i + 1;
                Logger.v("Retry connecting...", new Object[0]);
                connectInternal();
            }
        }
    }

    public /* synthetic */ void lambda$connectInternal$1$RxJavaSocketUtil(Object obj) throws Exception {
        OnSocketConnectionListener onSocketConnectionListener = this.onSocketConnectionListener;
        if (onSocketConnectionListener != null) {
            onSocketConnectionListener.onConnected();
        }
        startHeartbeat();
        startReading();
    }

    public /* synthetic */ void lambda$connectInternal$2$RxJavaSocketUtil(Throwable th) throws Exception {
        OnSocketConnectionListener onSocketConnectionListener = this.onSocketConnectionListener;
        if (onSocketConnectionListener != null) {
            onSocketConnectionListener.onDisconnected(false);
        }
    }

    public /* synthetic */ void lambda$null$8$RxJavaSocketUtil(String str) {
        this.onSocketMessageListener.onMessage(str);
    }

    public /* synthetic */ void lambda$sendMessage$5$RxJavaSocketUtil(String str, ObservableEmitter observableEmitter) throws Exception {
        if (this.isConnected) {
            this.writer.write(str);
            this.writer.flush();
        }
        observableEmitter.onNext(true);
    }

    public /* synthetic */ void lambda$sendMessage$7$RxJavaSocketUtil(Throwable th) throws Exception {
        this.retryTimes++;
        connectInternal();
    }

    public /* synthetic */ void lambda$startHeartbeat$3$RxJavaSocketUtil(Long l) throws Exception {
        if (!this.isConnected) {
            this.retryTimes++;
            connectInternal();
        } else {
            Logger.v("Socket Sending heartbeat...", new Object[0]);
            this.writer.write("heartbeat");
            this.writer.flush();
        }
    }

    public /* synthetic */ void lambda$startHeartbeat$4$RxJavaSocketUtil(Throwable th) throws Exception {
        this.retryTimes++;
        connectInternal();
        Logger.v("Socket Heartbeat error: " + th.getMessage(), new Object[0]);
    }

    public /* synthetic */ void lambda$startReading$9$RxJavaSocketUtil(ObservableEmitter observableEmitter) throws Exception {
        try {
            this.reader = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
            while (true) {
                final String readLine = this.reader.readLine();
                if (readLine == null) {
                    return;
                }
                Logger.v("Received message: " + readLine, new Object[0]);
                if (this.onSocketMessageListener != null) {
                    this.handler.post(new Runnable() { // from class: com.zhulong.eduvideo.module_video.view.cc.view.live.-$$Lambda$RxJavaSocketUtil$qCg429h7XmtU2IRm2BVk5xRMm5E
                        @Override // java.lang.Runnable
                        public final void run() {
                            RxJavaSocketUtil.this.lambda$null$8$RxJavaSocketUtil(readLine);
                        }
                    });
                }
            }
        } catch (IOException e) {
            Logger.v("Socket read error: " + e.getMessage(), new Object[0]);
            this.isConnected = false;
            OnSocketConnectionListener onSocketConnectionListener = this.onSocketConnectionListener;
            if (onSocketConnectionListener != null) {
                onSocketConnectionListener.onDisconnected(true);
            }
        }
    }

    public void sendMessage(final String str) {
        if (!this.isConnected) {
            throw new RuntimeException("Socket is not connected.");
        }
        Disposable subscribe = Observable.create(new ObservableOnSubscribe() { // from class: com.zhulong.eduvideo.module_video.view.cc.view.live.-$$Lambda$RxJavaSocketUtil$Jeq3aPJjrQBqacVTJKJxK8zdcC4
            @Override // io.reactivex.ObservableOnSubscribe
            public final void subscribe(ObservableEmitter observableEmitter) {
                RxJavaSocketUtil.this.lambda$sendMessage$5$RxJavaSocketUtil(str, observableEmitter);
            }
        }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer() { // from class: com.zhulong.eduvideo.module_video.view.cc.view.live.-$$Lambda$RxJavaSocketUtil$Q_Sk9XzGz6CfUPtHJj_a2ZrWlJM
            @Override // io.reactivex.functions.Consumer
            public final void accept(Object obj) {
                Logger.v("发送消息状态：" + obj, new Object[0]);
            }
        }, new Consumer() { // from class: com.zhulong.eduvideo.module_video.view.cc.view.live.-$$Lambda$RxJavaSocketUtil$U4iyVM6R-_j5K5qZSyDBvHoHE30
            @Override // io.reactivex.functions.Consumer
            public final void accept(Object obj) {
                RxJavaSocketUtil.this.lambda$sendMessage$7$RxJavaSocketUtil((Throwable) obj);
            }
        });
        this.disposable = subscribe;
        this.disposable2 = subscribe;
    }

    public void setOnSocketConnectionListener(OnSocketConnectionListener onSocketConnectionListener) {
        this.onSocketConnectionListener = onSocketConnectionListener;
    }

    public void setOnSocketMessageListener(OnSocketMessageListener onSocketMessageListener) {
        this.onSocketMessageListener = onSocketMessageListener;
    }
}
