package com.edu.classroom.message.repo.datasource;

import androidx.lifecycle.LiveData;
import androidx.lifecycle.u;
import com.bytedance.boost_multidex.Constants;
import com.edu.classroom.base.log.CommonLog;
import com.edu.classroom.base.network.IRetrofit;
import com.edu.classroom.base.rxjava.RxjavaExKt;
import com.edu.classroom.base.sdkmonitor.ESDKMonitor;
import com.edu.classroom.base.settings.ClassroomSettingsManager;
import com.edu.classroom.channel.api.MessageLog;
import com.edu.classroom.channel.api.model.ClassroomMessage;
import com.edu.classroom.message.MessageProcessor;
import com.edu.classroom.message.MsgFetchException;
import com.edu.classroom.message.MsgParseException;
import com.edu.classroom.message.MsgPersistException;
import com.edu.classroom.message.NoStatusMsgException;
import com.edu.classroom.message.repo.cache.MessageCache;
import com.edu.classroom.message.repo.cache.PlaybackChatCache;
import com.edu.classroom.message.repo.cache.PlaybackMessageCache;
import com.edu.classroom.message.repo.db.dao.PlaybackInfoDao;
import com.edu.classroom.message.repo.db.dao.PlaybackMessageDao;
import com.edu.classroom.message.repo.db.entity.MessageEntity;
import com.edu.classroom.message.repo.db.entity.PlaybackInfoEntity;
import com.edu.classroom.message.repo.fetcher.PlaybackChatFetcher;
import com.edu.classroom.message.repo.fetcher.PlaybackMessageDbFetcher;
import com.edu.classroom.message.repo.fetcher.PlaybackMessageDbFetcherKt;
import com.edu.classroom.message.repo.fetcher.PlaybackMessageNetworkFetcher;
import com.edu.classroom.message.repo.model.ChatInfoBlock;
import com.edu.classroom.playback.utils.PlaybackLog;
import com.meituan.robust.ChangeQuickRedirect;
import com.meituan.robust.PatchProxy;
import com.meituan.robust.PatchProxyResult;
import com.squareup.wire.ProtoReader;
import edu.classroom.channel.ChannelMessage;
import io.reactivex.Observable;
import io.reactivex.b;
import io.reactivex.c;
import io.reactivex.e;
import io.reactivex.functions.f;
import io.reactivex.q;
import io.reactivex.r;
import io.reactivex.s;
import io.reactivex.schedulers.a;
import java.io.InputStream;
import java.net.ProtocolException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import kotlin.Lazy;
import kotlin.Metadata;
import kotlin.h;
import kotlin.jvm.functions.Function0;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.internal.Lambda;
import kotlin.jvm.internal.aa;
import kotlin.jvm.internal.n;
import kotlin.jvm.internal.y;
import kotlin.jvm.internal.z;
import kotlin.reflect.KProperty;
import kotlin.w;
import okio.Okio;
import org.json.JSONObject;

/* compiled from: PlaybackMessageDataSourceImpl.kt */
@Metadata(bv = {1, 0, 3}, d1 = {"\u0000À\u0001\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0000\n\u0002\u0010\u000e\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\u0010\u000b\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0010\"\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0006\n\u0002\u0018\u0002\n\u0002\u0010 \n\u0002\u0018\u0002\n\u0000\n\u0002\u0010\t\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0000\n\u0002\u0010\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\r\u0018\u00002\u00020\u0001B#\b\u0007\u0012\b\b\u0001\u0010\u0002\u001a\u00020\u0003\u0012\b\b\u0001\u0010\u0004\u001a\u00020\u0003\u0012\u0006\u0010\u0005\u001a\u00020\u0006¢\u0006\u0002\u0010\u0007J\u0010\u00109\u001a\u00020:2\u0006\u0010;\u001a\u00020<H\u0016J\u0010\u0010=\u001a\u00020:2\u0006\u0010;\u001a\u00020<H\u0016J\u000e\u0010>\u001a\u00020:2\u0006\u0010;\u001a\u00020<J\u000e\u0010?\u001a\u00020:2\u0006\u0010;\u001a\u00020<J\u000e\u0010@\u001a\u00020:2\u0006\u0010;\u001a\u00020<J\u000e\u0010A\u001a\u00020:2\u0006\u0010;\u001a\u00020<J$\u0010B\u001a\u000e\u0012\n\u0012\b\u0012\u0004\u0012\u00020E0D0C2\u0006\u0010F\u001a\u00020G2\u0006\u0010H\u001a\u00020GH\u0016J$\u0010I\u001a\u000e\u0012\n\u0012\b\u0012\u0004\u0012\u00020E0D0C2\u0006\u0010H\u001a\u00020G2\u0006\u0010\r\u001a\u00020JH\u0002J\u0016\u0010K\u001a\u00020L2\f\u0010M\u001a\b\u0012\u0004\u0012\u00020N0DH\u0016J\b\u0010O\u001a\u00020:H\u0016J \u0010P\u001a\u00020:2\u0006\u0010Q\u001a\u00020R2\u0006\u0010;\u001a\u00020<2\u0006\u0010S\u001a\u00020\nH\u0002J\u001e\u0010T\u001a\u0010\u0012\f\u0012\n 
W*\u0004\u0018\u00010V0V0U2\u0006\u0010X\u001a\u00020RH\u0002J0\u0010Y\u001a\u001c\u0012\u0018\u0012\u0016\u0012\u0004\u0012\u00020Z W*\n\u0012\u0004\u0012\u00020Z\u0018\u00010D0D0U2\f\u0010[\u001a\b\u0012\u0004\u0012\u00020Z0DH\u0002J\u0016\u0010\\\u001a\u00020:2\f\u0010[\u001a\b\u0012\u0004\u0012\u00020Z0DH\u0002J\u001c\u0010]\u001a\u000e\u0012\n\u0012\b\u0012\u0004\u0012\u00020E0D0C2\u0006\u0010^\u001a\u00020GH\u0016J\u001e\u0010_\u001a\u0010\u0012\f\u0012\n W*\u0004\u0018\u00010Z0Z0C2\u0006\u0010`\u001a\u00020GH\u0002J\u001c\u0010a\u001a\u000e\u0012\n\u0012\b\u0012\u0004\u0012\u00020Z0D0C2\u0006\u0010`\u001a\u00020GH\u0002J\b\u0010b\u001a\u00020LH\u0016J\u0010\u0010c\u001a\u00020L2\u0006\u0010;\u001a\u00020<H\u0002J\u0010\u0010d\u001a\u00020Z2\u0006\u0010e\u001a\u00020VH\u0002J\u0010\u0010f\u001a\u00020L2\u0006\u0010;\u001a\u00020<H\u0002R\u0017\u0010\b\u001a\b\u0012\u0004\u0012\u00020\n0\t¢\u0006\b\n\u0000\u001a\u0004\b\u000b\u0010\fR\u001b\u0010\r\u001a\u00020\u000e8BX\u0082\u0084\u0002¢\u0006\f\n\u0004\b\u0011\u0010\u0012\u001a\u0004\b\u000f\u0010\u0010R\u000e\u0010\u0013\u001a\u00020\u0014X\u0082.¢\u0006\u0002\n\u0000R\u001e\u0010\u0015\u001a\u00020\u00168\u0006@\u0006X\u0087.¢\u0006\u000e\n\u0000\u001a\u0004\b\u0017\u0010\u0018\"\u0004\b\u0019\u0010\u001aR\u001e\u0010\u001b\u001a\u00020\u001c8\u0006@\u0006X\u0087.¢\u0006\u000e\n\u0000\u001a\u0004\b\u001d\u0010\u001e\"\u0004\b\u001f\u0010 
R\u001a\u0010!\u001a\b\u0012\u0004\u0012\u00020\n0\"X\u0096\u0004¢\u0006\b\n\u0000\u001a\u0004\b#\u0010$R\u000e\u0010\u0005\u001a\u00020\u0006X\u0082\u0004¢\u0006\u0002\n\u0000R\u001e\u0010%\u001a\u00020&8\u0006@\u0006X\u0087.¢\u0006\u000e\n\u0000\u001a\u0004\b'\u0010(\"\u0004\b)\u0010*R)\u0010+\u001a\r\u0012\t\u0012\u00070-¢\u0006\u0002\b.0,8\u0006@\u0006X\u0087.¢\u0006\u000e\n\u0000\u001a\u0004\b/\u00100\"\u0004\b1\u00102R\u001e\u00103\u001a\u0002048\u0006@\u0006X\u0087.¢\u0006\u000e\n\u0000\u001a\u0004\b5\u00106\"\u0004\b7\u00108R\u000e\u0010\u0002\u001a\u00020\u0003X\u0082\u0004¢\u0006\u0002\n\u0000R\u000e\u0010\u0004\u001a\u00020\u0003X\u0082\u0004¢\u0006\u0002\n\u0000¨\u0006g"}, d2 = {"Lcom/edu/classroom/message/repo/datasource/PlaybackMessageDataSourceImpl;", "Lcom/edu/classroom/message/repo/datasource/PlaybackMessageDataSource;", "roomId", "", "userId", "messageNetworkFetcher", "Lcom/edu/classroom/message/repo/fetcher/PlaybackMessageNetworkFetcher;", "(Ljava/lang/String;Ljava/lang/String;Lcom/edu/classroom/message/repo/fetcher/PlaybackMessageNetworkFetcher;)V", "_messageLoadState", "Landroidx/lifecycle/MutableLiveData;", "", "get_messageLoadState", "()Landroidx/lifecycle/MutableLiveData;", "cache", "Lcom/edu/classroom/message/repo/cache/PlaybackMessageCache;", "getCache", "()Lcom/edu/classroom/message/repo/cache/PlaybackMessageCache;", "cache$delegate", "Lkotlin/Lazy;", "chatCache", "Lcom/edu/classroom/message/repo/cache/PlaybackChatCache;", "messageDao", "Lcom/edu/classroom/message/repo/db/dao/PlaybackMessageDao;", "getMessageDao", "()Lcom/edu/classroom/message/repo/db/dao/PlaybackMessageDao;", "setMessageDao", "(Lcom/edu/classroom/message/repo/db/dao/PlaybackMessageDao;)V", "messageDbFetcher", "Lcom/edu/classroom/message/repo/fetcher/PlaybackMessageDbFetcher;", "getMessageDbFetcher", "()Lcom/edu/classroom/message/repo/fetcher/PlaybackMessageDbFetcher;", "setMessageDbFetcher", "(Lcom/edu/classroom/message/repo/fetcher/PlaybackMessageDbFetcher;)V", 
"messageLoadState", "Landroidx/lifecycle/LiveData;", "getMessageLoadState", "()Landroidx/lifecycle/LiveData;", "playbackInfoDao", "Lcom/edu/classroom/message/repo/db/dao/PlaybackInfoDao;", "getPlaybackInfoDao", "()Lcom/edu/classroom/message/repo/db/dao/PlaybackInfoDao;", "setPlaybackInfoDao", "(Lcom/edu/classroom/message/repo/db/dao/PlaybackInfoDao;)V", "processors", "", "Lcom/edu/classroom/message/MessageProcessor;", "Lkotlin/jvm/JvmSuppressWildcards;", "getProcessors", "()Ljava/util/Set;", "setProcessors", "(Ljava/util/Set;)V", "retrofit", "Lcom/edu/classroom/base/network/IRetrofit;", "getRetrofit", "()Lcom/edu/classroom/base/network/IRetrofit;", "setRetrofit", "(Lcom/edu/classroom/base/network/IRetrofit;)V", "downloadRoomMessage", "Lio/reactivex/Completable;", "entity", "Lcom/edu/classroom/message/repo/db/entity/PlaybackInfoEntity;", "downloadSelfMessage", "fetchRoomMessages", "fetchRoomMessagesOld", "fetchSelfMessages", "fetchSelfMessagesOld", "getMessages", "Lio/reactivex/Single;", "", "Lcom/edu/classroom/channel/api/model/ClassroomMessage;", "start", "", "end", "getMessagesFromCache", "Lcom/edu/classroom/message/repo/cache/MessageCache;", "init", "", "chatInfoBlocks", "Lcom/edu/classroom/message/repo/model/ChatInfoBlock;", "onComplete", "parseAndPersistMessages", "r", "Ljava/io/InputStream;", "isFetchRoomMsg", "parseMessages", "Lio/reactivex/Observable;", "Ledu/classroom/channel/ChannelMessage;", "kotlin.jvm.PlatformType", "inputStream", "persistMessages", "Lcom/edu/classroom/message/repo/db/entity/MessageEntity;", "messages", "persistMessagesOld", "prefetchMessageToCache", Constants.KEY_TIME_STAMP, "queryLatestFsmMessage", "ts", "queryLatestMessage", "release", "savePlaybackInfoEntity", "transformToMessageEntity", "msg", "updateMessageState", "message_release"}, k = 1, mv = {1, 1, 16})
/* loaded from: classes9.dex */
public final class PlaybackMessageDataSourceImpl implements PlaybackMessageDataSource {

    /* renamed from: a, reason: collision with root package name */
    // Robust hot-fix redirect handle; handed to every PatchProxy.proxy(...) call made by this class.
    public static ChangeQuickRedirect f17251a;

    /* renamed from: b, reason: collision with root package name */
    // Kotlin property metadata for the delegated "cache" property (its Lazy delegate is field j).
    static final /* synthetic */ KProperty[] f17252b = {aa.a(new y(aa.a(PlaybackMessageDataSourceImpl.class), "cache", "getCache()Lcom/edu/classroom/message/repo/cache/PlaybackMessageCache;"))};

    /* renamed from: c, reason: collision with root package name */
    // "messageDao" in the Kotlin metadata; assigned from outside the constructor (presumably injected).
    public PlaybackMessageDao f17253c;

    /* renamed from: d, reason: collision with root package name */
    // "playbackInfoDao" in the Kotlin metadata; g() fails with n.b("playbackInfoDao") while it is still null.
    public PlaybackInfoDao f17254d;
    // "messageDbFetcher" in the Kotlin metadata.
    public PlaybackMessageDbFetcher e;
    // "retrofit" in the Kotlin metadata.
    public IRetrofit f;
    // "processors" in the Kotlin metadata.
    public Set<MessageProcessor> g;
    // "_messageLoadState": mutable load-state holder, seeded in the constructor and set to true by h(...).
    private final u<Boolean> h;
    // "messageLoadState": same object as h, exposed through the read-only LiveData type.
    private final LiveData<Boolean> i;
    // "cache$delegate": Lazy delegate that produces the PlaybackMessageCache returned by f().
    private final Lazy j;
    // "chatCache": not set by the constructor; the static accessor c(impl) calls n.b("chatCache") while it is null.
    private PlaybackChatCache k;
    // "roomId" constructor argument.
    private final String l;
    // "userId" constructor argument.
    private final String m;
    // "messageNetworkFetcher" constructor argument.
    private final PlaybackMessageNetworkFetcher n;

    /**
     * Creates the data source for one room / user pair.
     *
     * <p>The load-state holder is seeded with the negation of the playback setting flag
     * {@code getF13299a()}, and the message cache is wrapped in a lazy delegate.
     *
     * @param str the room id (Kotlin parameter "roomId"); must not be null
     * @param str2 the user id (Kotlin parameter "userId"); must not be null
     * @param playbackMessageNetworkFetcher the network fetcher; must not be null
     */
    public PlaybackMessageDataSourceImpl(String str, String str2, PlaybackMessageNetworkFetcher playbackMessageNetworkFetcher) {
        // Kotlin parameter checks (presumably Intrinsics.checkParameterIsNotNull).
        n.b(str, "roomId");
        n.b(str2, "userId");
        n.b(playbackMessageNetworkFetcher, "messageNetworkFetcher");

        this.l = str;
        this.m = str2;
        this.n = playbackMessageNetworkFetcher;

        u<Boolean> loadState = new u<>();
        boolean settingFlag = ClassroomSettingsManager.f13256b.c().getClassroomPlaybackSettings().getF13299a();
        loadState.b((u<Boolean>) Boolean.valueOf(!settingFlag));
        this.h = loadState;
        // The read-only view is the very same instance as the mutable holder.
        this.i = loadState;

        this.j = h.a((Function0) new PlaybackMessageDataSourceImpl$cache$2(this));
    }

    /**
     * Synthetic accessor that lets nested classes call the private
     * {@code a(ChannelMessage)} conversion on the given instance.
     */
    public static final /* synthetic */ MessageEntity a(PlaybackMessageDataSourceImpl playbackMessageDataSourceImpl, ChannelMessage channelMessage) {
        Object[] args = {playbackMessageDataSourceImpl, channelMessage};
        PatchProxyResult patch = PatchProxy.proxy(args, null, f17251a, true, 7490);
        if (patch.isSupported) {
            // A hot-fix patch handled the call; use its result.
            return (MessageEntity) patch.result;
        }
        return playbackMessageDataSourceImpl.a(channelMessage);
    }

    /**
     * Converts a wire-level {@link ChannelMessage} into a {@link MessageEntity}
     * (Kotlin name per metadata: transformToMessageEntity).
     *
     * <p>{@code msg_id} is parsed with {@link Long#parseLong(String)}, so a non-numeric id
     * raises {@link NumberFormatException}.
     */
    private final MessageEntity a(ChannelMessage channelMessage) {
        PatchProxyResult patch = PatchProxy.proxy(new Object[]{channelMessage}, this, f17251a, false, 7479);
        if (patch.isSupported) {
            return (MessageEntity) patch.result;
        }

        // Each field is read, checked and converted in the same order as before so that
        // the first failing field determines which exception surfaces.
        String rawId = channelMessage.msg_id;
        n.a((Object) rawId, "msg_id");
        long id = Long.parseLong(rawId);

        String type = channelMessage.msg_type;
        n.a((Object) type, "msg_type");

        Long boxedTimestamp = channelMessage.send_timestamp;
        n.a((Object) boxedTimestamp, "send_timestamp");
        long timestamp = boxedTimestamp.longValue();

        String room = channelMessage.room_id;
        n.a((Object) room, "room_id");

        // Trailing (null, 32, null) arguments belong to the Kotlin default-argument constructor.
        return new MessageEntity(id, type, timestamp, room, channelMessage.payload.toByteArray(), null, 32, null);
    }

    /**
     * Synthetic accessor that lets nested classes call the private
     * {@code a(InputStream)} parser on the given instance.
     */
    public static final /* synthetic */ Observable a(PlaybackMessageDataSourceImpl playbackMessageDataSourceImpl, InputStream inputStream) {
        Object[] args = {playbackMessageDataSourceImpl, inputStream};
        PatchProxyResult patch = PatchProxy.proxy(args, null, f17251a, true, 7489);
        if (patch.isSupported) {
            return (Observable) patch.result;
        }
        return playbackMessageDataSourceImpl.a(inputStream);
    }

    /**
     * Builds an observable that decodes {@link ChannelMessage}s from a protobuf stream
     * (Kotlin name per metadata: parseMessages).
     *
     * <p>Every tag read from the stream yields one decoded message; completion is signalled once
     * {@code nextTag()} returns -1. A {@link ProtocolException} is rethrown as
     * {@code MsgParseException}, any other throwable as {@code MsgFetchException}.
     * The stream is not closed here.
     *
     * @param inputStream source of the encoded messages
     */
    private final Observable<ChannelMessage> a(final InputStream inputStream) {
        PatchProxyResult patch = PatchProxy.proxy(new Object[]{inputStream}, this, f17251a, false, 7476);
        if (patch.isSupported) {
            return (Observable) patch.result;
        }
        Observable<ChannelMessage> parsed = Observable.a(new r<T>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$parseMessages$1

            /* renamed from: a, reason: collision with root package name */
            public static ChangeQuickRedirect f17322a;

            @Override // io.reactivex.r
            public final void a(q<ChannelMessage> qVar) {
                boolean patched = PatchProxy.proxy(new Object[]{qVar}, this, f17322a, false, 7524).isSupported;
                if (patched) {
                    return;
                }
                n.b(qVar, "emitter");
                ProtoReader reader = new ProtoReader(Okio.buffer(Okio.source(inputStream)));
                try {
                    long messageToken = reader.beginMessage();
                    while (reader.nextTag() != -1) {
                        ChannelMessage decoded = ChannelMessage.ADAPTER.decode(reader);
                        qVar.a((q<ChannelMessage>) decoded);
                    }
                    reader.endMessageAndGetUnknownFields(messageToken);
                    qVar.a();
                } catch (Throwable failure) {
                    // Malformed protobuf data is a parse error; everything else counts as a fetch error.
                    if (failure instanceof ProtocolException) {
                        throw new MsgParseException(failure);
                    }
                    throw new MsgFetchException(failure);
                }
            }
        });
        n.a((Object) parsed, "Observable.create<Channe…mitter.onComplete()\n    }");
        return parsed;
    }

    /**
     * Synthetic accessor that lets nested classes call the private
     * {@code a(InputStream, PlaybackInfoEntity, boolean)} method on the given instance.
     */
    public static final /* synthetic */ b a(PlaybackMessageDataSourceImpl playbackMessageDataSourceImpl, InputStream inputStream, PlaybackInfoEntity playbackInfoEntity, boolean z) {
        Object[] args = {playbackMessageDataSourceImpl, inputStream, playbackInfoEntity, new Byte(z ? (byte) 1 : (byte) 0)};
        PatchProxyResult patch = PatchProxy.proxy(args, null, f17251a, true, 7488);
        if (patch.isSupported) {
            return (b) patch.result;
        }
        return playbackMessageDataSourceImpl.a(inputStream, playbackInfoEntity, z);
    }

    /**
     * Synthetic accessor that lets nested classes call the private
     * {@code c(List)} persistence method on the given instance.
     */
    public static final /* synthetic */ b a(PlaybackMessageDataSourceImpl playbackMessageDataSourceImpl, List list) {
        Object[] args = {playbackMessageDataSourceImpl, list};
        PatchProxyResult patch = PatchProxy.proxy(args, null, f17251a, true, 7491);
        if (patch.isSupported) {
            return (b) patch.result;
        }
        return playbackMessageDataSourceImpl.c((List<MessageEntity>) list);
    }

    /**
     * Parses the messages contained in {@code inputStream}, converts them to
     * {@link MessageEntity}s and persists them in groups (Kotlin name per metadata:
     * parseAndPersistMessages; the returned {@code b} is built by what the embedded source hint
     * calls {@code Completable.create}).
     *
     * <p>The returned completable can finish in one of two ways, guarded by a shared boolean flag:
     * <ul>
     *   <li>early, as soon as a persisted group contains an element whose {@code getF17388d()}
     *       exceeds {@code playbackInfoEntity.getF() + 60000}; the pipeline itself keeps running;</li>
     *   <li>otherwise when the callback handed to {@code RxjavaExKt.a} fires.</li>
     * </ul>
     * Errors are logged and forwarded to the emitter only if the early completion did not happen.
     *
     * @param inputStream stream holding the encoded messages
     * @param playbackInfoEntity playback bookkeeping entity; one of its flags is set once the pipeline finishes
     * @param z Kotlin parameter "isFetchRoomMsg"; selects which entity flag and which monitor key are used
     */
    private final b a(final InputStream inputStream, final PlaybackInfoEntity playbackInfoEntity, final boolean z) {
        PatchProxyResult proxy = PatchProxy.proxy(new Object[]{inputStream, playbackInfoEntity, new Byte(z ? (byte) 1 : (byte) 0)}, this, f17251a, false, 7475);
        if (proxy.isSupported) {
            // A hot-fix patch handled the call; use its result.
            return (b) proxy.result;
        }
        b a2 = b.a(new e() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$parseAndPersistMessages$1

            /* renamed from: a, reason: collision with root package name */
            public static ChangeQuickRedirect f17302a;

            // Callback passed to RxjavaExKt.a below; it receives a long that is reported as a
            // "..._database_duration" metric (presumably the elapsed time of the pipeline - confirm).
            /* compiled from: PlaybackMessageDataSourceImpl.kt */
            @Metadata(bv = {1, 0, 3}, d1 = {"\u0000\u000e\n\u0000\n\u0002\u0010\u0002\n\u0000\n\u0002\u0010\t\n\u0000\u0010\u0000\u001a\u00020\u00012\u0006\u0010\u0002\u001a\u00020\u0003H\n¢\u0006\u0002\b\u0004"}, d2 = {"<anonymous>", "", "it", "", "invoke"}, k = 3, mv = {1, 1, 16})
            /* renamed from: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$parseAndPersistMessages$1$4, reason: invalid class name */
            /* loaded from: classes9.dex */
            static final class AnonymousClass4 extends Lambda implements Function1<Long, w> {

                /* renamed from: a, reason: collision with root package name */
                public static ChangeQuickRedirect f17314a;

                /* renamed from: c, reason: collision with root package name */
                // Shared "already completed early" flag.
                final /* synthetic */ z.a f17316c;

                /* renamed from: d, reason: collision with root package name */
                // Emitter of the enclosing completable.
                final /* synthetic */ c f17317d;

                /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
                AnonymousClass4(z.a aVar, c cVar) {
                    super(1);
                    this.f17316c = aVar;
                    this.f17317d = cVar;
                }

                public final void a(long j) {
                    if (PatchProxy.proxy(new Object[]{new Long(j)}, this, f17314a, false, 7522).isSupported) {
                        return;
                    }
                    // Complete the emitter now unless the early-completion path already did so.
                    if (!this.f17316c.f33370a) {
                        this.f17317d.a();
                    }
                    // NOTE(review): `z` and `playbackInfoEntity` are the captured parameters of the outer
                    // method (decompiler artifact: they are not fields of this nested class).
                    if (z) {
                        playbackInfoEntity.a(true);
                        ESDKMonitor.a(ESDKMonitor.f13180b, "classroom_playback_service", null, new JSONObject().put("playback_message_database_duration", j), null, 8, null);
                    } else {
                        playbackInfoEntity.b(true);
                        ESDKMonitor.a(ESDKMonitor.f13180b, "classroom_playback_service", null, new JSONObject().put("playback_self_message_database_duration", j), null, 8, null);
                    }
                    // Accessor a(...) forwards to h(entity) and accessor b(...) forwards to g(entity).
                    PlaybackMessageDataSourceImpl.a(PlaybackMessageDataSourceImpl.this, playbackInfoEntity);
                    PlaybackMessageDataSourceImpl.b(PlaybackMessageDataSourceImpl.this, playbackInfoEntity);
                    CommonLog.a(PlaybackLog.f17683a, "playback messages download finish completely: " + j, null, 2, null);
                }

                @Override // kotlin.jvm.functions.Function1
                public /* synthetic */ w invoke(Long l) {
                    a(l.longValue());
                    return w.f35730a;
                }
            }

            @Override // io.reactivex.e
            public final void a(final c cVar) {
                if (PatchProxy.proxy(new Object[]{cVar}, this, f17302a, false, 7518).isSupported) {
                    return;
                }
                n.b(cVar, "emitter");
                // Becomes true once the emitter has been completed early (see the third stage below).
                final z.a aVar = new z.a();
                aVar.f33370a = false;
                // Stage 1: parse the stream, with .b(a.b()) applied (presumably subscribeOn on a
                // scheduler, per the "subscri…" source hint) and map each ChannelMessage to a MessageEntity.
                Observable a3 = PlaybackMessageDataSourceImpl.a(PlaybackMessageDataSourceImpl.this, inputStream).b(a.b()).h(new f<T, R>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$parseAndPersistMessages$1.1

                    /* renamed from: a, reason: collision with root package name */
                    public static ChangeQuickRedirect f17306a;

                    @Override // io.reactivex.functions.f
                    /* renamed from: a, reason: merged with bridge method [inline-methods] */
                    public final MessageEntity apply(ChannelMessage channelMessage) {
                        PatchProxyResult proxy2 = PatchProxy.proxy(new Object[]{channelMessage}, this, f17306a, false, 7519);
                        if (proxy2.isSupported) {
                            return (MessageEntity) proxy2.result;
                        }
                        n.b(channelMessage, "it");
                        return PlaybackMessageDataSourceImpl.a(PlaybackMessageDataSourceImpl.this, channelMessage);
                    }
                    // Stage 2: .a(50) groups the entities into lists (presumably buffer(50) - confirm);
                    // each list is then persisted through b(List).
                }).a(50).a(new f<T, s<? extends R>>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$parseAndPersistMessages$1.2

                    /* renamed from: a, reason: collision with root package name */
                    public static ChangeQuickRedirect f17308a;

                    @Override // io.reactivex.functions.f
                    /* renamed from: a, reason: merged with bridge method [inline-methods] */
                    public final Observable<List<MessageEntity>> apply(List<MessageEntity> list) {
                        PatchProxyResult proxy2 = PatchProxy.proxy(new Object[]{list}, this, f17308a, false, 7520);
                        if (proxy2.isSupported) {
                            return (Observable) proxy2.result;
                        }
                        n.b(list, "it");
                        return PlaybackMessageDataSourceImpl.b(PlaybackMessageDataSourceImpl.this, list);
                    }
                    // Stage 3: side-effect consumer run for every persisted list.
                }).a(new io.reactivex.functions.e<List<? extends MessageEntity>>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$parseAndPersistMessages$1.3

                    /* renamed from: a, reason: collision with root package name */
                    public static ChangeQuickRedirect f17310a;

                    @Override // io.reactivex.functions.e
                    /* renamed from: a, reason: merged with bridge method [inline-methods] */
                    public final void accept(List<MessageEntity> list) {
                        // Nothing to do once the emitter has already been completed early.
                        if (PatchProxy.proxy(new Object[]{list}, this, f17310a, false, 7521).isSupported || aVar.f33370a) {
                            return;
                        }
                        n.a((Object) list, "it");
                        // kotlin.collections.n.e picks one element of the list (presumably first or last -
                        // TODO confirm). Once its getF17388d() is more than 60000 past entity.getF(),
                        // the completable is completed while the remaining messages keep being persisted.
                        if (((MessageEntity) kotlin.collections.n.e((List) list)).getF17388d() > playbackInfoEntity.getF() + 60000) {
                            cVar.a();
                            aVar.f33370a = true;
                        }
                    }
                });
                n.a((Object) a3, "parseMessages(r).subscri…      }\n                }");
                // Attach the finish callback and subscribe: items are ignored, errors are handled below.
                RxjavaExKt.a(a3, new AnonymousClass4(aVar, cVar)).a(new io.reactivex.functions.e<List<? extends MessageEntity>>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$parseAndPersistMessages$1.5
                    @Override // io.reactivex.functions.e
                    /* renamed from: a, reason: merged with bridge method [inline-methods] */
                    public final void accept(List<MessageEntity> list) {
                    }
                }, new io.reactivex.functions.e<Throwable>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$parseAndPersistMessages$1.6

                    /* renamed from: a, reason: collision with root package name */
                    public static ChangeQuickRedirect f17319a;

                    @Override // io.reactivex.functions.e
                    /* renamed from: a, reason: merged with bridge method [inline-methods] */
                    public final void accept(Throwable th) {
                        if (PatchProxy.proxy(new Object[]{th}, this, f17319a, false, 7523).isSupported) {
                            return;
                        }
                        CommonLog.a(PlaybackLog.f17683a, "playback messages download failed", th, null, 4, null);
                        // NOTE(review): `z.a.this` is a decompiler artifact for the captured flag `aVar`.
                        // After an early completion the error is only logged, not forwarded.
                        if (z.a.this.f33370a) {
                            return;
                        }
                        cVar.a(th);
                    }
                });
            }
        });
        n.a((Object) a2, "Completable.create { emi…)\n                }\n    }");
        return a2;
    }

    /**
     * Drains from {@code messageCache} every message whose {@code l()} value is not greater than
     * {@code j} and returns them as a single list (Kotlin name per metadata: getMessagesFromCache,
     * with parameters "end" and "cache").
     *
     * <p>The head of the cache is inspected with {@code c()} and taken with {@code b()}
     * (presumably peek / poll - confirm against {@code MessageCache}). After draining,
     * {@code messageCache.b(j)} is invoked and the stream completes.
     *
     * <p>Fix: the decompiled body referred to the cache as {@code MessageCache.this}, which is not
     * valid Java because {@code MessageCache} is not an enclosing class; the captured final
     * parameter is used instead. The anonymous source is also typed with the concrete element type
     * rather than an undefined type variable.
     *
     * @param j upper bound (inclusive) compared against each message's {@code l()}
     * @param messageCache cache to drain
     * @return the drained messages collected into a list
     */
    private final io.reactivex.w<List<ClassroomMessage>> a(final long j, final MessageCache messageCache) {
        PatchProxyResult proxy = PatchProxy.proxy(new Object[]{new Long(j), messageCache}, this, f17251a, false, 7484);
        if (proxy.isSupported) {
            return (io.reactivex.w) proxy.result;
        }
        io.reactivex.w<List<ClassroomMessage>> k = Observable.a(new r<ClassroomMessage>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$getMessagesFromCache$1

            /* renamed from: a, reason: collision with root package name */
            public static ChangeQuickRedirect f17297a;

            @Override // io.reactivex.r
            public final void a(q<ClassroomMessage> qVar) {
                if (PatchProxy.proxy(new Object[]{qVar}, this, f17297a, false, 7516).isSupported) {
                    return;
                }
                n.b(qVar, "emitter");
                ClassroomMessage head = messageCache.c();
                while (head != null && head.l() <= j) {
                    ClassroomMessage taken = messageCache.b();
                    if (taken == null) {
                        // Kotlin `!!` failure path: n.a() presumably throws a NullPointerException.
                        n.a();
                    }
                    qVar.a((q<ClassroomMessage>) taken);
                    head = messageCache.c();
                }
                messageCache.b(j);
                qVar.a();
            }
        }).k();
        n.a((Object) k, "Observable.create<Classr…lete()\n        }.toList()");
        return k;
    }

    /**
     * Synthetic accessor that lets nested classes call the private {@code h(PlaybackInfoEntity)}
     * method on the given instance.
     */
    public static final /* synthetic */ void a(PlaybackMessageDataSourceImpl playbackMessageDataSourceImpl, PlaybackInfoEntity playbackInfoEntity) {
        Object[] args = {playbackMessageDataSourceImpl, playbackInfoEntity};
        boolean patched = PatchProxy.proxy(args, null, f17251a, true, 7492).isSupported;
        if (!patched) {
            playbackMessageDataSourceImpl.h(playbackInfoEntity);
        }
    }

    /**
     * Synthetic accessor that lets nested classes read the lazily created message cache
     * through the private {@code f()} getter.
     */
    public static final /* synthetic */ PlaybackMessageCache b(PlaybackMessageDataSourceImpl playbackMessageDataSourceImpl) {
        Object[] args = {playbackMessageDataSourceImpl};
        PatchProxyResult patch = PatchProxy.proxy(args, null, f17251a, true, 7495);
        if (patch.isSupported) {
            return (PlaybackMessageCache) patch.result;
        }
        return playbackMessageDataSourceImpl.f();
    }

    /**
     * Synthetic accessor that lets nested classes call the private {@code b(List)}
     * persistence method on the given instance.
     */
    public static final /* synthetic */ Observable b(PlaybackMessageDataSourceImpl playbackMessageDataSourceImpl, List list) {
        Object[] args = {playbackMessageDataSourceImpl, list};
        PatchProxyResult patch = PatchProxy.proxy(args, null, f17251a, true, 7494);
        if (patch.isSupported) {
            return (Observable) patch.result;
        }
        return playbackMessageDataSourceImpl.b((List<MessageEntity>) list);
    }

    /**
     * Wraps the insertion of {@code list} through the message DAO in an observable that emits
     * the same list on success (Kotlin name per metadata: persistMessages).
     *
     * <p>The callable throws {@code MsgPersistException} when the array returned by the DAO does
     * not have one entry per message.
     *
     * @param list entities to persist
     */
    private final Observable<List<MessageEntity>> b(final List<MessageEntity> list) {
        PatchProxyResult patch = PatchProxy.proxy(new Object[]{list}, this, f17251a, false, 7477);
        if (patch.isSupported) {
            return (Observable) patch.result;
        }
        Observable<List<MessageEntity>> persisted = Observable.b(new Callable<T>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$persistMessages$1

            /* renamed from: a, reason: collision with root package name */
            public static ChangeQuickRedirect f17324a;

            @Override // java.util.concurrent.Callable
            /* renamed from: a, reason: merged with bridge method [inline-methods] */
            public final List<MessageEntity> call() {
                PatchProxyResult innerPatch = PatchProxy.proxy(new Object[0], this, f17324a, false, 7525);
                if (innerPatch.isSupported) {
                    return (List) innerPatch.result;
                }
                int insertedCount = PlaybackMessageDataSourceImpl.this.c().a(list).length;
                if (insertedCount != list.size()) {
                    // Not every entity produced a result entry: treat the batch as failed.
                    throw new MsgPersistException();
                }
                return list;
            }
        });
        n.a((Object) persisted, "Observable.fromCallable …()\n        messages\n    }");
        return persisted;
    }

    /**
     * Looks up the stored state messages for this room at timestamp {@code j} and returns them
     * as a list (Kotlin name per the log line: queryLatestMessage).
     *
     * <p>An "fsm" message is mandatory: when the DAO returns none, {@code NoStatusMsgException}
     * is thrown. "user_state" and "text_clicker_review" messages are appended, in that order,
     * only when present.
     *
     * @param j timestamp passed to each DAO lookup
     */
    private final io.reactivex.w<List<MessageEntity>> b(final long j) {
        PatchProxyResult patch = PatchProxy.proxy(new Object[]{new Long(j)}, this, f17251a, false, 7482);
        if (patch.isSupported) {
            return (io.reactivex.w) patch.result;
        }
        MessageLog.f14559a.b("PlaybackMessageDataSourceImpl.queryLatestMessage ts=" + j);
        io.reactivex.w<List<MessageEntity>> collected = Observable.a(new r<T>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$queryLatestMessage$1

            /* renamed from: a, reason: collision with root package name */
            public static ChangeQuickRedirect f17339a;

            @Override // io.reactivex.r
            public final void a(q<MessageEntity> qVar) {
                boolean patched = PatchProxy.proxy(new Object[]{qVar}, this, f17339a, false, 7531).isSupported;
                if (patched) {
                    return;
                }
                n.b(qVar, "emitter");

                // Mandatory state message.
                MessageEntity fsm = PlaybackMessageDataSourceImpl.this.c().a(PlaybackMessageDataSourceImpl.this.l, "fsm", j);
                if (fsm == null) {
                    throw new NoStatusMsgException(j);
                }
                qVar.a((q<MessageEntity>) fsm);

                // Optional messages, queried and emitted in this fixed order.
                String[] optionalTypes = {"user_state", "text_clicker_review"};
                for (String type : optionalTypes) {
                    MessageEntity latest = PlaybackMessageDataSourceImpl.this.c().a(PlaybackMessageDataSourceImpl.this.l, type, j);
                    if (latest != null) {
                        qVar.a((q<MessageEntity>) latest);
                    }
                }
                qVar.a();
            }
        }).k();
        n.a((Object) collected, "Observable.create<Messag…lete()\n        }.toList()");
        return collected;
    }

    /**
     * Synthetic accessor that lets nested classes call the private {@code g(PlaybackInfoEntity)}
     * method on the given instance.
     */
    public static final /* synthetic */ void b(PlaybackMessageDataSourceImpl playbackMessageDataSourceImpl, PlaybackInfoEntity playbackInfoEntity) {
        Object[] args = {playbackMessageDataSourceImpl, playbackInfoEntity};
        boolean patched = PatchProxy.proxy(args, null, f17251a, true, 7493).isSupported;
        if (!patched) {
            playbackMessageDataSourceImpl.g(playbackInfoEntity);
        }
    }

    /**
     * Synthetic accessor that lets nested classes read the {@code chatCache} field (k).
     *
     * <p>While the field is still null, {@code n.b("chatCache")} is invoked first
     * (presumably Kotlin's uninitialized-lateinit failure - confirm).
     */
    public static final /* synthetic */ PlaybackChatCache c(PlaybackMessageDataSourceImpl playbackMessageDataSourceImpl) {
        Object[] args = {playbackMessageDataSourceImpl};
        PatchProxyResult patch = PatchProxy.proxy(args, null, f17251a, true, 7496);
        if (patch.isSupported) {
            return (PlaybackChatCache) patch.result;
        }
        PlaybackChatCache chatCache = playbackMessageDataSourceImpl.k;
        if (chatCache == null) {
            n.b("chatCache");
        }
        return chatCache;
    }

    /**
     * Wraps the insertion of {@code list} through the message DAO in a completable built from an
     * action (Kotlin name per metadata: persistMessagesOld).
     *
     * <p>The action throws {@code MsgPersistException} when the array returned by the DAO does
     * not have one entry per message.
     *
     * @param list entities to persist
     */
    private final b c(final List<MessageEntity> list) {
        PatchProxyResult patch = PatchProxy.proxy(new Object[]{list}, this, f17251a, false, 7478);
        if (patch.isSupported) {
            return (b) patch.result;
        }
        b persistAction = b.a(new io.reactivex.functions.a() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$persistMessagesOld$1

            /* renamed from: a, reason: collision with root package name */
            public static ChangeQuickRedirect f17327a;

            @Override // io.reactivex.functions.a
            public final void run() {
                if (PatchProxy.proxy(new Object[0], this, f17327a, false, 7526).isSupported) {
                    return;
                }
                int insertedCount = PlaybackMessageDataSourceImpl.this.c().a(list).length;
                if (insertedCount != list.size()) {
                    throw new MsgPersistException();
                }
            }
        });
        n.a((Object) persistAction, "Completable.fromAction {…gPersistException()\n    }");
        return persistAction;
    }

    /**
     * Returns the message cache held by the lazy delegate in field {@code j}.
     *
     * @return the patch proxy's result when it handled the call, otherwise {@code j.getValue()},
     *         cast to {@link PlaybackMessageCache} in both cases
     */
    private final PlaybackMessageCache f() {
        PatchProxyResult patched = PatchProxy.proxy(new Object[0], this, f17251a, false, 7467);
        if (patched.isSupported) {
            return (PlaybackMessageCache) patched.result;
        }
        Lazy cacheDelegate = this.j;
        // Unused property-metadata read emitted by the Kotlin compiler; kept so evaluation stays identical.
        KProperty delegatedProperty = f17252b[0];
        return (PlaybackMessageCache) cacheDelegate.getValue();
    }

    /**
     * Hands the entity to the playback-info DAO when at least one of its two flags
     * ({@code getE()} or {@code getF17392d()}) is set; otherwise does nothing.
     *
     * @param playbackInfoEntity entity to store
     */
    private final void g(PlaybackInfoEntity playbackInfoEntity) {
        if (PatchProxy.proxy(new Object[]{playbackInfoEntity}, this, f17251a, false, 7486).isSupported) {
            return;
        }
        // Neither flag set: nothing worth storing yet.
        if (!playbackInfoEntity.getE() && !playbackInfoEntity.getF17392d()) {
            return;
        }
        PlaybackInfoDao infoDao = this.f17254d;
        if (infoDao == null) {
            n.b("playbackInfoDao");
        }
        // NOTE(review): the trailing b() presumably triggers/awaits the DAO operation — confirm.
        infoDao.a(playbackInfoEntity).b();
    }

    /**
     * Publishes {@code true} on field {@code h} when {@code getF17392d()} is set and,
     * in addition, either {@code getE()} is set or {@code getJ()} is an empty string.
     *
     * @param playbackInfoEntity entity whose flags are inspected
     */
    private final void h(PlaybackInfoEntity playbackInfoEntity) {
        if (PatchProxy.proxy(new Object[]{playbackInfoEntity}, this, f17251a, false, 7487).isSupported) {
            return;
        }
        if (!playbackInfoEntity.getF17392d()) {
            return;
        }
        // getE() unset while getJ() is non-empty: do not signal yet.
        boolean stillPending = !playbackInfoEntity.getE() && playbackInfoEntity.getJ().length() != 0;
        if (stillPending) {
            return;
        }
        this.h.a((u<Boolean>) true);
    }

    /**
     * Exposes the boolean live data stored in field {@code i}.
     *
     * @return the value of field {@code i}
     */
    @Override // com.edu.classroom.message.repo.datasource.PlaybackMessageDataSource
    public LiveData<Boolean> a() {
        LiveData<Boolean> exposed = this.i;
        return exposed;
    }

    /**
     * Entry point for loading the messages addressed by {@code getI()} of the entity.
     * When {@code getF17392d()} is already set, only {@code h(entity)} runs and a completed
     * completable is returned. Otherwise the work is delegated to {@code c(entity)} or
     * {@code e(entity)} depending on the playback settings flag {@code getF13299a()}.
     *
     * @param playbackInfoEntity playback info to load messages for
     * @return completable representing the fetch, or an already-completed one
     */
    @Override // com.edu.classroom.message.repo.datasource.PlaybackMessageDataSource
    public b a(PlaybackInfoEntity playbackInfoEntity) {
        PatchProxyResult patched = PatchProxy.proxy(new Object[]{playbackInfoEntity}, this, f17251a, false, 7469);
        if (patched.isSupported) {
            return (b) patched.result;
        }
        n.b(playbackInfoEntity, "entity");
        if (playbackInfoEntity.getF17392d()) {
            h(playbackInfoEntity);
            b alreadyDone = b.a();
            n.a((Object) alreadyDone, "Completable.complete()");
            return alreadyDone;
        }
        // The settings flag selects between the two fetch implementations.
        if (ClassroomSettingsManager.f13256b.c().getClassroomPlaybackSettings().getF13299a()) {
            return c(playbackInfoEntity);
        }
        return e(playbackInfoEntity);
    }

    /**
     * Prefetches messages for the given position (anonymous class name: {@code prefetchMessageToCache}).
     * Logs the request, queries entities through {@code b(j)}, passes {@code getF17388d()} of the
     * first returned entity to both the message cache and the chat cache, and finally converts
     * every entity with {@code PlaybackMessageDbFetcherKt.a}.
     *
     * @param j position passed to the query and written to the log
     * @return the patch proxy's result when it handled the call, otherwise the converted message list
     */
    @Override // com.edu.classroom.message.repo.datasource.PlaybackMessageDataSource
    public io.reactivex.w<List<ClassroomMessage>> a(long j) {
        PatchProxyResult patched = PatchProxy.proxy(new Object[]{new Long(j)}, this, f17251a, false, 7480);
        if (patched.isSupported) {
            return (io.reactivex.w) patched.result;
        }
        String logLine = "prefetch message to cache: " + j;
        CommonLog.a(MessageLog.f14559a, logLine, null, 2, null);
        io.reactivex.w<List<ClassroomMessage>> result = RxjavaExKt.a(b(j)).c(new io.reactivex.functions.e<List<? extends MessageEntity>>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$prefetchMessageToCache$1

            /* renamed from: a, reason: collision with root package name */
            public static ChangeQuickRedirect f17330a;

            @Override // io.reactivex.functions.e
            /* renamed from: a, reason: merged with bridge method [inline-methods] */
            public final void accept(List<MessageEntity> list) {
                if (!PatchProxy.proxy(new Object[]{list}, this, f17330a, false, 7527).isSupported) {
                    // NOTE(review): list.get(0) throws on an empty list — confirm the upstream never emits one.
                    PlaybackMessageDataSourceImpl.b(PlaybackMessageDataSourceImpl.this).c(list.get(0).getF17388d());
                    PlaybackMessageDataSourceImpl.c(PlaybackMessageDataSourceImpl.this).c(list.get(0).getF17388d());
                }
            }
        }).d(new f<T, R>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$prefetchMessageToCache$2

            /* renamed from: a, reason: collision with root package name */
            public static ChangeQuickRedirect f17332a;

            @Override // io.reactivex.functions.f
            /* renamed from: a, reason: merged with bridge method [inline-methods] */
            public final List<ClassroomMessage> apply(List<MessageEntity> list) {
                PatchProxyResult innerPatched = PatchProxy.proxy(new Object[]{list}, this, f17332a, false, 7528);
                if (innerPatched.isSupported) {
                    return (List) innerPatched.result;
                }
                n.b(list, "list");
                // Element-wise conversion of stored entities into classroom messages.
                ArrayList mapped = new ArrayList(kotlin.collections.n.a((Iterable) list, 10));
                for (MessageEntity entity : list) {
                    mapped.add(PlaybackMessageDbFetcherKt.a(entity));
                }
                return mapped;
            }
        });
        n.a((Object) result, "queryLatestMessage(times…)\n            }\n        }");
        return result;
    }

    /**
     * Returns the messages of the message cache followed by those of the chat cache
     * (anonymous class name: {@code getMessages}). Both lookups receive only {@code j2};
     * {@code j} is used solely in the log line.
     *
     * @param j  start value, only logged
     * @param j2 end value handed to both cache lookups
     * @return the patch proxy's result when it handled the call, otherwise the concatenated message list
     */
    @Override // com.edu.classroom.message.repo.datasource.PlaybackMessageDataSource
    public io.reactivex.w<List<ClassroomMessage>> a(long j, long j2) {
        PatchProxyResult patched = PatchProxy.proxy(new Object[]{new Long(j), new Long(j2)}, this, f17251a, false, 7483);
        if (patched.isSupported) {
            return (io.reactivex.w) patched.result;
        }
        MessageLog.f14559a.b("getMessages start:" + j + " end:" + j2);
        io.reactivex.w<List<ClassroomMessage>> fromMessageCache = a(j2, f());
        PlaybackChatCache chatCache = this.k;
        if (chatCache == null) {
            n.b("chatCache");
        }
        io.reactivex.w combined = fromMessageCache.a(a(j2, chatCache), new io.reactivex.functions.b<List<? extends ClassroomMessage>, List<? extends ClassroomMessage>, List<? extends ClassroomMessage>>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$getMessages$1

            /* renamed from: a, reason: collision with root package name */
            public static ChangeQuickRedirect f17295a;

            @Override // io.reactivex.functions.b
            public final List<ClassroomMessage> a(List<? extends ClassroomMessage> list, List<? extends ClassroomMessage> list2) {
                PatchProxyResult innerPatched = PatchProxy.proxy(new Object[]{list, list2}, this, f17295a, false, 7515);
                if (!innerPatched.isSupported) {
                    n.b(list, "t1");
                    n.b(list2, "t2");
                    // "t1 + t2" in the original expression: first list followed by the second.
                    return kotlin.collections.n.d((Collection) list, (Iterable) list2);
                }
                return (List) innerPatched.result;
            }
        });
        n.a((Object) combined, "getMessagesFromCache(end…on { t1, t2 -> t1 + t2 })");
        return combined;
    }

    /**
     * Replaces the chat cache (field {@code k}) with a new one backed by a
     * {@link PlaybackChatFetcher} for the given blocks and a fresh {@link LinkedBlockingQueue}.
     *
     * @param list chat info blocks handed to the new fetcher
     */
    @Override // com.edu.classroom.message.repo.datasource.PlaybackMessageDataSource
    public void a(List<ChatInfoBlock> list) {
        boolean handledByPatch = PatchProxy.proxy(new Object[]{list}, this, f17251a, false, 7468).isSupported;
        if (handledByPatch) {
            return;
        }
        n.b(list, "chatInfoBlocks");
        IRetrofit retrofit = this.f;
        if (retrofit == null) {
            n.b("retrofit");
        }
        PlaybackChatFetcher chatFetcher = new PlaybackChatFetcher(retrofit, list);
        this.k = new PlaybackChatCache(chatFetcher, new LinkedBlockingQueue());
    }

    /**
     * Builds a completable that, when run, calls {@code a()} on every registered
     * {@link MessageProcessor} (anonymous class name: {@code onComplete}).
     *
     * <p>The previous body collected one {@code Unit} value per processor into a list that was
     * never read; the processors are now simply iterated.
     *
     * @return the patch proxy's result when it handled the call, otherwise the new completable
     */
    @Override // com.edu.classroom.message.repo.datasource.PlaybackMessageDataSource
    public b b() {
        PatchProxyResult proxy = PatchProxy.proxy(new Object[0], this, f17251a, false, 7485);
        if (proxy.isSupported) {
            return (b) proxy.result;
        }
        b a2 = b.a(new io.reactivex.functions.a() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$onComplete$1

            /* renamed from: a, reason: collision with root package name */
            public static ChangeQuickRedirect f17300a;

            @Override // io.reactivex.functions.a
            public final void run() {
                if (PatchProxy.proxy(new Object[0], this, f17300a, false, 7517).isSupported) {
                    return;
                }
                // Side effects only: no result list is needed.
                for (MessageProcessor processor : PlaybackMessageDataSourceImpl.this.e()) {
                    processor.a();
                }
            }
        });
        n.a((Object) a2, "Completable.fromAction {…p { it.complete() }\n    }");
        return a2;
    }

    /**
     * Entry point for loading the messages addressed by {@code getJ()} of the entity.
     * A fetch is started only when {@code getE()} is unset and {@code getJ()} is non-empty;
     * it is delegated to {@code d(entity)} or {@code f(entity)} depending on the playback
     * settings flag {@code getF13299a()}. In every other case {@code h(entity)} runs and a
     * completed completable is returned.
     *
     * @param playbackInfoEntity playback info to load messages for
     * @return completable representing the fetch, or an already-completed one
     */
    @Override // com.edu.classroom.message.repo.datasource.PlaybackMessageDataSource
    public b b(PlaybackInfoEntity playbackInfoEntity) {
        PatchProxyResult patched = PatchProxy.proxy(new Object[]{playbackInfoEntity}, this, f17251a, false, 7470);
        if (patched.isSupported) {
            return (b) patched.result;
        }
        n.b(playbackInfoEntity, "entity");
        boolean fetchRequired = !playbackInfoEntity.getE() && playbackInfoEntity.getJ().length() != 0;
        if (!fetchRequired) {
            h(playbackInfoEntity);
            b alreadyDone = b.a();
            n.a((Object) alreadyDone, "Completable.complete()");
            return alreadyDone;
        }
        // The settings flag selects between the two fetch implementations.
        if (ClassroomSettingsManager.f13256b.c().getClassroomPlaybackSettings().getF13299a()) {
            return d(playbackInfoEntity);
        }
        return f(playbackInfoEntity);
    }

    /**
     * Accessor for the message DAO stored in field {@code f17253c}.
     *
     * @return the patch proxy's result when it handled the call, otherwise the field value
     */
    public final PlaybackMessageDao c() {
        PatchProxyResult patched = PatchProxy.proxy(new Object[0], this, f17251a, false, 7457);
        if (!patched.isSupported) {
            PlaybackMessageDao dao = this.f17253c;
            if (dao == null) {
                n.b("messageDao");
            }
            return dao;
        }
        return (PlaybackMessageDao) patched.result;
    }

    /**
     * Fetches the stream addressed by {@code getI()} through the network fetcher (field {@code n})
     * and hands it, together with the entity and the flag {@code true}, to the static helper
     * {@code a(impl, InputStream, PlaybackInfoEntity, boolean)}
     * (anonymous class name: {@code fetchRoomMessages}). Per the embedded expression text the
     * chain is subscribed on {@code Schedulers.io()}.
     *
     * @param playbackInfoEntity playback info describing what to fetch
     * @return the patch proxy's result when it handled the call, otherwise the fetch completable
     */
    public final b c(final PlaybackInfoEntity playbackInfoEntity) {
        PatchProxyResult patched = PatchProxy.proxy(new Object[]{playbackInfoEntity}, this, f17251a, false, 7471);
        if (patched.isSupported) {
            return (b) patched.result;
        }
        n.b(playbackInfoEntity, "entity");
        b fetchCompletable = this.n.a(playbackInfoEntity.getI()).c(new f<InputStream, io.reactivex.f>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$fetchRoomMessages$1

            /* renamed from: a, reason: collision with root package name */
            public static ChangeQuickRedirect f17257a;

            @Override // io.reactivex.functions.f
            /* renamed from: a, reason: merged with bridge method [inline-methods] */
            public final b apply(InputStream inputStream) {
                PatchProxyResult innerPatched = PatchProxy.proxy(new Object[]{inputStream}, this, f17257a, false, 7498);
                if (!innerPatched.isSupported) {
                    n.b(inputStream, "it");
                    return PlaybackMessageDataSourceImpl.a(PlaybackMessageDataSourceImpl.this, inputStream, playbackInfoEntity, true);
                }
                return (b) innerPatched.result;
            }
        }).b(a.b());
        n.a((Object) fetchCompletable, "messageNetworkFetcher.fe…scribeOn(Schedulers.io())");
        return fetchCompletable;
    }

    /**
     * Accessor for the database message fetcher stored in field {@code e}.
     *
     * @return the patch proxy's result when it handled the call, otherwise the field value
     */
    public final PlaybackMessageDbFetcher d() {
        PatchProxyResult patched = PatchProxy.proxy(new Object[0], this, f17251a, false, 7461);
        if (!patched.isSupported) {
            PlaybackMessageDbFetcher dbFetcher = this.e;
            if (dbFetcher == null) {
                n.b("messageDbFetcher");
            }
            return dbFetcher;
        }
        return (PlaybackMessageDbFetcher) patched.result;
    }

    /**
     * Fetches the stream addressed by {@code getJ()} through the network fetcher (field {@code n})
     * and hands it, together with the entity and the flag {@code false}, to the static helper
     * {@code a(impl, InputStream, PlaybackInfoEntity, boolean)}
     * (anonymous class name: {@code fetchSelfMessages}). Per the embedded expression text the
     * chain is subscribed on {@code Schedulers.io()}.
     *
     * @param playbackInfoEntity playback info describing what to fetch
     * @return the patch proxy's result when it handled the call, otherwise the fetch completable
     */
    public final b d(final PlaybackInfoEntity playbackInfoEntity) {
        PatchProxyResult patched = PatchProxy.proxy(new Object[]{playbackInfoEntity}, this, f17251a, false, 7472);
        if (patched.isSupported) {
            return (b) patched.result;
        }
        n.b(playbackInfoEntity, "entity");
        b fetchCompletable = this.n.a(playbackInfoEntity.getJ()).c(new f<InputStream, io.reactivex.f>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$fetchSelfMessages$1

            /* renamed from: a, reason: collision with root package name */
            public static ChangeQuickRedirect f17275a;

            @Override // io.reactivex.functions.f
            /* renamed from: a, reason: merged with bridge method [inline-methods] */
            public final b apply(InputStream inputStream) {
                PatchProxyResult innerPatched = PatchProxy.proxy(new Object[]{inputStream}, this, f17275a, false, 7506);
                if (!innerPatched.isSupported) {
                    n.b(inputStream, "it");
                    return PlaybackMessageDataSourceImpl.a(PlaybackMessageDataSourceImpl.this, inputStream, playbackInfoEntity, false);
                }
                return (b) innerPatched.result;
            }
        }).b(a.b());
        n.a((Object) fetchCompletable, "messageNetworkFetcher.fe…scribeOn(Schedulers.io())");
        return fetchCompletable;
    }

    /**
     * Older fetch path for the stream addressed by {@code getI()}
     * (anonymous class names: {@code fetchRoomMessagesOld$1..$7}).
     *
     * <p>Stages, in chain order:
     * <ol>
     *   <li>the network fetcher (field {@code n}) provides an {@link InputStream}, which the static
     *       helper {@code a(impl, InputStream)} turns into an observable of {@link ChannelMessage};</li>
     *   <li>every message is handed to each registered {@link MessageProcessor};</li>
     *   <li>messages whose {@code msg_type} is {@code "fsm_version"} or {@code "user_state_version"}
     *       are dropped;</li>
     *   <li>each remaining message is converted to a {@link MessageEntity};</li>
     *   <li>entities are grouped with {@code a(50)} and each group is handed to the static helper
     *       {@code a(impl, List)};</li>
     *   <li>on completion the entity gets {@code a(true)} and is passed to the static helpers
     *       {@code a(impl, entity)} and {@code b(impl, entity)}.</li>
     * </ol>
     * Per the embedded expression text the chain is subscribed on {@code Schedulers.io()}.
     *
     * <p>The processor callback previously stored one {@code Unit} value per processor in a list
     * that was never read, for every message; that allocation has been removed.
     *
     * @param playbackInfoEntity playback info describing what to fetch; mutated on completion
     * @return the patch proxy's result when it handled the call, otherwise the fetch completable
     */
    public final b e(final PlaybackInfoEntity playbackInfoEntity) {
        PatchProxyResult proxy = PatchProxy.proxy(new Object[]{playbackInfoEntity}, this, f17251a, false, 7473);
        if (proxy.isSupported) {
            return (b) proxy.result;
        }
        n.b(playbackInfoEntity, "entity");
        b b2 = this.n.a(playbackInfoEntity.getI()).b((f<? super InputStream, ? extends s<? extends R>>) new f<T, s<? extends R>>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$fetchRoomMessagesOld$1

            /* renamed from: a, reason: collision with root package name */
            public static ChangeQuickRedirect f17260a;

            @Override // io.reactivex.functions.f
            /* renamed from: a, reason: merged with bridge method [inline-methods] */
            public final Observable<ChannelMessage> apply(InputStream inputStream) {
                PatchProxyResult proxy2 = PatchProxy.proxy(new Object[]{inputStream}, this, f17260a, false, 7499);
                if (proxy2.isSupported) {
                    return (Observable) proxy2.result;
                }
                n.b(inputStream, "it");
                return PlaybackMessageDataSourceImpl.a(PlaybackMessageDataSourceImpl.this, inputStream);
            }
        }).a(new io.reactivex.functions.e<ChannelMessage>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$fetchRoomMessagesOld$2

            /* renamed from: a, reason: collision with root package name */
            public static ChangeQuickRedirect f17262a;

            @Override // io.reactivex.functions.e
            /* renamed from: a, reason: merged with bridge method [inline-methods] */
            public final void accept(ChannelMessage channelMessage) {
                if (PatchProxy.proxy(new Object[]{channelMessage}, this, f17262a, false, 7500).isSupported) {
                    return;
                }
                // Attached before the filter below, so processors also receive the version messages.
                for (MessageProcessor messageProcessor : PlaybackMessageDataSourceImpl.this.e()) {
                    n.a((Object) channelMessage, "msg");
                    messageProcessor.a(channelMessage);
                }
            }
        }).a(new io.reactivex.functions.h<ChannelMessage>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$fetchRoomMessagesOld$3

            /* renamed from: a, reason: collision with root package name */
            public static ChangeQuickRedirect f17264a;

            @Override // io.reactivex.functions.h
            public final boolean a(ChannelMessage channelMessage) {
                PatchProxyResult proxy2 = PatchProxy.proxy(new Object[]{channelMessage}, this, f17264a, false, 7501);
                if (proxy2.isSupported) {
                    return ((Boolean) proxy2.result).booleanValue();
                }
                n.b(channelMessage, "it");
                // Keep everything except the two "*_version" message types.
                return (n.a((Object) channelMessage.msg_type, (Object) "fsm_version") ^ true) && (n.a((Object) channelMessage.msg_type, (Object) "user_state_version") ^ true);
            }
        }).h(new f<T, R>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$fetchRoomMessagesOld$4

            /* renamed from: a, reason: collision with root package name */
            public static ChangeQuickRedirect f17266a;

            @Override // io.reactivex.functions.f
            /* renamed from: a, reason: merged with bridge method [inline-methods] */
            public final MessageEntity apply(ChannelMessage channelMessage) {
                PatchProxyResult proxy2 = PatchProxy.proxy(new Object[]{channelMessage}, this, f17266a, false, 7502);
                if (proxy2.isSupported) {
                    return (MessageEntity) proxy2.result;
                }
                n.b(channelMessage, "it");
                return PlaybackMessageDataSourceImpl.a(PlaybackMessageDataSourceImpl.this, channelMessage);
            }
        }).a(50).e(new f<List<MessageEntity>, io.reactivex.f>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$fetchRoomMessagesOld$5

            /* renamed from: a, reason: collision with root package name */
            public static ChangeQuickRedirect f17268a;

            @Override // io.reactivex.functions.f
            /* renamed from: a, reason: merged with bridge method [inline-methods] */
            public final b apply(List<MessageEntity> list) {
                PatchProxyResult proxy2 = PatchProxy.proxy(new Object[]{list}, this, f17268a, false, 7503);
                if (proxy2.isSupported) {
                    return (b) proxy2.result;
                }
                n.b(list, "it");
                // NOTE(review): a(impl, List) presumably persists the batch — confirm against the helper.
                return PlaybackMessageDataSourceImpl.a(PlaybackMessageDataSourceImpl.this, list);
            }
        }).b(new io.reactivex.functions.a() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$fetchRoomMessagesOld$6

            /* renamed from: a, reason: collision with root package name */
            public static ChangeQuickRedirect f17270a;

            @Override // io.reactivex.functions.a
            public final void run() {
                if (PatchProxy.proxy(new Object[0], this, f17270a, false, 7504).isSupported) {
                    return;
                }
                // NOTE(review): a(true) presumably sets the flag read via getF17392d() — confirm on the entity.
                playbackInfoEntity.a(true);
                PlaybackMessageDataSourceImpl.a(PlaybackMessageDataSourceImpl.this, playbackInfoEntity);
                PlaybackMessageDataSourceImpl.b(PlaybackMessageDataSourceImpl.this, playbackInfoEntity);
            }
        });
        n.a((Object) b2, "messageNetworkFetcher.fe…ty)\n                    }");
        // NOTE(review): RxjavaExKt.a(completable, handler) is not visible here; its effect should be confirmed.
        b b3 = RxjavaExKt.a(b2, PlaybackMessageDataSourceImpl$fetchRoomMessagesOld$7.f17274b).b(a.b());
        n.a((Object) b3, "messageNetworkFetcher.fe…scribeOn(Schedulers.io())");
        return b3;
    }

    /**
     * Accessor for the registered message processors stored in field {@code g}.
     *
     * @return the patch proxy's result when it handled the call, otherwise the field value
     */
    public final Set<MessageProcessor> e() {
        PatchProxyResult patched = PatchProxy.proxy(new Object[0], this, f17251a, false, 7465);
        if (!patched.isSupported) {
            Set<MessageProcessor> processors = this.g;
            if (processors == null) {
                n.b("processors");
            }
            return processors;
        }
        return (Set) patched.result;
    }

    /**
     * Older fetch path for the stream addressed by {@code getJ()}
     * (anonymous class names: {@code fetchSelfMessagesOld$1..$8}).
     *
     * <p>Stages, in chain order:
     * <ol>
     *   <li>the network fetcher (field {@code n}) provides an {@link InputStream}, which the static
     *       helper {@code a(impl, InputStream)} turns into an observable of {@link ChannelMessage};</li>
     *   <li>every message is handed to each registered {@link MessageProcessor};</li>
     *   <li>messages whose {@code msg_type} is {@code "fsm_version"} or {@code "user_state_version"}
     *       are dropped;</li>
     *   <li>each remaining message is converted to a {@link MessageEntity};</li>
     *   <li>field {@code m} is written into each entity through {@code MessageEntity.a(String)};</li>
     *   <li>entities are grouped with {@code a(50)} and each group is handed to the static helper
     *       {@code a(impl, List)};</li>
     *   <li>on completion the entity gets {@code b(true)} and is passed to the static helpers
     *       {@code a(impl, entity)} and {@code b(impl, entity)}.</li>
     * </ol>
     * Per the embedded expression text the chain is subscribed on {@code Schedulers.io()}.
     *
     * <p>The processor callback previously stored one {@code Unit} value per processor in a list
     * that was never read, for every message; that allocation has been removed.
     *
     * @param playbackInfoEntity playback info describing what to fetch; mutated on completion
     * @return the patch proxy's result when it handled the call, otherwise the fetch completable
     */
    public final b f(final PlaybackInfoEntity playbackInfoEntity) {
        PatchProxyResult proxy = PatchProxy.proxy(new Object[]{playbackInfoEntity}, this, f17251a, false, 7474);
        if (proxy.isSupported) {
            return (b) proxy.result;
        }
        n.b(playbackInfoEntity, "entity");
        b b2 = this.n.a(playbackInfoEntity.getJ()).b((f<? super InputStream, ? extends s<? extends R>>) new f<T, s<? extends R>>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$fetchSelfMessagesOld$1

            /* renamed from: a, reason: collision with root package name */
            public static ChangeQuickRedirect f17278a;

            @Override // io.reactivex.functions.f
            /* renamed from: a, reason: merged with bridge method [inline-methods] */
            public final Observable<ChannelMessage> apply(InputStream inputStream) {
                PatchProxyResult proxy2 = PatchProxy.proxy(new Object[]{inputStream}, this, f17278a, false, 7507);
                if (proxy2.isSupported) {
                    return (Observable) proxy2.result;
                }
                n.b(inputStream, "it");
                return PlaybackMessageDataSourceImpl.a(PlaybackMessageDataSourceImpl.this, inputStream);
            }
        }).a(new io.reactivex.functions.e<ChannelMessage>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$fetchSelfMessagesOld$2

            /* renamed from: a, reason: collision with root package name */
            public static ChangeQuickRedirect f17280a;

            @Override // io.reactivex.functions.e
            /* renamed from: a, reason: merged with bridge method [inline-methods] */
            public final void accept(ChannelMessage channelMessage) {
                if (PatchProxy.proxy(new Object[]{channelMessage}, this, f17280a, false, 7508).isSupported) {
                    return;
                }
                // Attached before the filter below, so processors also receive the version messages.
                for (MessageProcessor messageProcessor : PlaybackMessageDataSourceImpl.this.e()) {
                    n.a((Object) channelMessage, "msg");
                    messageProcessor.a(channelMessage);
                }
            }
        }).a(new io.reactivex.functions.h<ChannelMessage>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$fetchSelfMessagesOld$3

            /* renamed from: a, reason: collision with root package name */
            public static ChangeQuickRedirect f17282a;

            @Override // io.reactivex.functions.h
            public final boolean a(ChannelMessage channelMessage) {
                PatchProxyResult proxy2 = PatchProxy.proxy(new Object[]{channelMessage}, this, f17282a, false, 7509);
                if (proxy2.isSupported) {
                    return ((Boolean) proxy2.result).booleanValue();
                }
                n.b(channelMessage, "it");
                // Keep everything except the two "*_version" message types.
                return (n.a((Object) channelMessage.msg_type, (Object) "fsm_version") ^ true) && (n.a((Object) channelMessage.msg_type, (Object) "user_state_version") ^ true);
            }
        }).h(new f<T, R>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$fetchSelfMessagesOld$4

            /* renamed from: a, reason: collision with root package name */
            public static ChangeQuickRedirect f17284a;

            @Override // io.reactivex.functions.f
            /* renamed from: a, reason: merged with bridge method [inline-methods] */
            public final MessageEntity apply(ChannelMessage channelMessage) {
                PatchProxyResult proxy2 = PatchProxy.proxy(new Object[]{channelMessage}, this, f17284a, false, 7510);
                if (proxy2.isSupported) {
                    return (MessageEntity) proxy2.result;
                }
                n.b(channelMessage, "it");
                return PlaybackMessageDataSourceImpl.a(PlaybackMessageDataSourceImpl.this, channelMessage);
            }
        }).a(new io.reactivex.functions.e<MessageEntity>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$fetchSelfMessagesOld$5

            /* renamed from: a, reason: collision with root package name */
            public static ChangeQuickRedirect f17286a;

            @Override // io.reactivex.functions.e
            /* renamed from: a, reason: merged with bridge method [inline-methods] */
            public final void accept(MessageEntity messageEntity) {
                String str;
                if (PatchProxy.proxy(new Object[]{messageEntity}, this, f17286a, false, 7511).isSupported) {
                    return;
                }
                // NOTE(review): field m is presumably the current user's id, tagging the entity as "self" — confirm.
                str = PlaybackMessageDataSourceImpl.this.m;
                messageEntity.a(str);
            }
        }).a(50).e(new f<List<MessageEntity>, io.reactivex.f>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$fetchSelfMessagesOld$6

            /* renamed from: a, reason: collision with root package name */
            public static ChangeQuickRedirect f17288a;

            @Override // io.reactivex.functions.f
            /* renamed from: a, reason: merged with bridge method [inline-methods] */
            public final b apply(List<MessageEntity> list) {
                PatchProxyResult proxy2 = PatchProxy.proxy(new Object[]{list}, this, f17288a, false, 7512);
                if (proxy2.isSupported) {
                    return (b) proxy2.result;
                }
                n.b(list, "it");
                // NOTE(review): a(impl, List) presumably persists the batch — confirm against the helper.
                return PlaybackMessageDataSourceImpl.a(PlaybackMessageDataSourceImpl.this, list);
            }
        }).b(new io.reactivex.functions.a() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$fetchSelfMessagesOld$7

            /* renamed from: a, reason: collision with root package name */
            public static ChangeQuickRedirect f17290a;

            @Override // io.reactivex.functions.a
            public final void run() {
                if (PatchProxy.proxy(new Object[0], this, f17290a, false, 7513).isSupported) {
                    return;
                }
                // NOTE(review): b(true) presumably sets the flag read via getE() — confirm on the entity.
                playbackInfoEntity.b(true);
                PlaybackMessageDataSourceImpl.a(PlaybackMessageDataSourceImpl.this, playbackInfoEntity);
                PlaybackMessageDataSourceImpl.b(PlaybackMessageDataSourceImpl.this, playbackInfoEntity);
            }
        });
        n.a((Object) b2, "messageNetworkFetcher.fe…ty)\n                    }");
        // NOTE(review): RxjavaExKt.a(completable, handler) is not visible here; its effect should be confirmed.
        b b3 = RxjavaExKt.a(b2, PlaybackMessageDataSourceImpl$fetchSelfMessagesOld$8.f17294b).b(a.b());
        n.a((Object) b3, "messageNetworkFetcher.fe…scribeOn(Schedulers.io())");
        return b3;
    }
}
