package com.dotloop.mobile.service;

import android.database.SQLException;
import com.dotloop.mobile.core.platform.exceptions.NoNetworkConnectionException;
import com.dotloop.mobile.core.platform.model.messaging.Message;
import com.dotloop.mobile.core.utils.DeeplinkUtils;
import com.dotloop.mobile.messaging.MessageEvent;
import com.dotloop.mobile.messaging.sources.MessageHistorySource;
import com.dotloop.mobile.messaging.sources.MessageSender;
import com.dotloop.mobile.messaging.sources.MessageSource;
import com.dotloop.mobile.messaging.sources.MessageStore;
import com.dotloop.mobile.model.exception.ConversationRequiredException;
import com.dotloop.mobile.model.messaging.Conversation;
import com.dotloop.mobile.model.messaging.NewMessage;
import com.dotloop.mobile.model.messaging.NewMessageStatusBatch;
import com.dotloop.mobile.model.messaging.error.MessageError;
import com.dotloop.mobile.utils.ConversationHelper;
import d.a.a;
import io.reactivex.b.b;
import io.reactivex.c.f;
import io.reactivex.c.k;
import io.reactivex.h;
import io.reactivex.i;
import io.reactivex.j;
import io.reactivex.l;
import io.reactivex.n;
import io.reactivex.o;
import io.reactivex.p;
import io.reactivex.v;
import io.reactivex.z;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import kotlin.d.b.g;
import org.a.d;

/* compiled from: MessagePipe.kt */
/* loaded from: classes2.dex */
public final class MessagePipe implements j<MessageEvent> {
    public static final Companion Companion = new Companion(null);
    public static final int HISTORY_BATCH_SIZE = 20;
    public static final int MISSED_MESSAGE_BATCH_SIZE = 100;
    private final b compositeSubscription;
    private final ConversationHelper conversationHelper;
    private String conversationId;
    private final ConversationService conversationService;
    private boolean fetchingHistory;
    private final Set<MessageStore> messageCaches;
    private i<MessageEvent> messageEmitter;
    private final Set<MessageHistorySource> messageHistorySources;
    private final Set<MessageSender> messageSenders;
    private final Set<MessageSource> messageSources;

    /* compiled from: MessagePipe.kt */
    /* loaded from: classes2.dex */
    public static final class Companion {
        private Companion() {
        }

        public /* synthetic */ Companion(g gVar) {
            this();
        }
    }

    public MessagePipe(Set<MessageHistorySource> set, Set<MessageStore> set2, Set<MessageSource> set3, Set<MessageSender> set4, ConversationService conversationService, ConversationHelper conversationHelper) {
        kotlin.d.b.i.b(set, "messageHistorySources");
        kotlin.d.b.i.b(set2, "messageCaches");
        kotlin.d.b.i.b(set3, "messageSources");
        kotlin.d.b.i.b(set4, "messageSenders");
        kotlin.d.b.i.b(conversationService, "conversationService");
        kotlin.d.b.i.b(conversationHelper, "conversationHelper");
        this.messageHistorySources = set;
        this.messageCaches = set2;
        this.messageSources = set3;
        this.messageSenders = set4;
        this.conversationService = conversationService;
        this.conversationHelper = conversationHelper;
        this.compositeSubscription = new b();
    }

    public /* synthetic */ MessagePipe(HashSet hashSet, HashSet hashSet2, HashSet hashSet3, HashSet hashSet4, ConversationService conversationService, ConversationHelper conversationHelper, int i, g gVar) {
        this((i & 1) != 0 ? new HashSet() : hashSet, (i & 2) != 0 ? new HashSet() : hashSet2, (i & 4) != 0 ? new HashSet() : hashSet3, (i & 8) != 0 ? new HashSet() : hashSet4, conversationService, conversationHelper);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final h<MessageEvent> buildMissedMessages(final boolean z, final boolean z2, String str, final String str2, final boolean z3) {
        if (str2 == null && str == null) {
            h<MessageEvent> b2 = h.b();
            kotlin.d.b.i.a((Object) b2, "empty()");
            return b2;
        }
        if (getUpdateCacheOnly()) {
            h<MessageEvent> b3 = h.b();
            kotlin.d.b.i.a((Object) b3, "empty()");
            return b3;
        }
        h b4 = getHistory(str, z, 100, z2, z3).b((io.reactivex.c.g<? super MessageEvent, ? extends org.a.b<? extends R>>) new io.reactivex.c.g<T, org.a.b<? extends R>>() { // from class: com.dotloop.mobile.service.MessagePipe$buildMissedMessages$1
            @Override // io.reactivex.c.g
            public final h<MessageEvent> apply(MessageEvent messageEvent) {
                kotlin.d.b.i.b(messageEvent, "historyEvent");
                return messageEvent.getType() == MessageEvent.Type.NETWORK_ERROR ? h.a(messageEvent) : h.a((Iterable) messageEvent.getMessages()).b((k) new k<Message>() { // from class: com.dotloop.mobile.service.MessagePipe$buildMissedMessages$1.1
                    @Override // io.reactivex.c.k
                    public final boolean test(Message message) {
                        kotlin.d.b.i.b(message, "message");
                        return kotlin.d.b.i.a((Object) message.index(), (Object) str2);
                    }
                }).i().b(new io.reactivex.c.g<T, org.a.b<? extends R>>() { // from class: com.dotloop.mobile.service.MessagePipe$buildMissedMessages$1.2
                    @Override // io.reactivex.c.g
                    public final h<List<Message>> apply(List<Message> list) {
                        kotlin.d.b.i.b(list, "messages");
                        return list.size() > 0 ? h.a(list) : h.b();
                    }
                }).k(new io.reactivex.c.g<T, org.a.b<? extends R>>() { // from class: com.dotloop.mobile.service.MessagePipe$buildMissedMessages$1.3
                    @Override // io.reactivex.c.g
                    public final h<MessageEvent> apply(List<Message> list) {
                        h buildMissedMessages;
                        kotlin.d.b.i.b(list, "messages");
                        a.b("Might not have all, fetching more missed messages", new Object[0]);
                        if (list.size() < 100) {
                            return h.a(new MessageEvent(null, list, MessageEvent.Type.HISTORY, 1, null));
                        }
                        Message message = list.get(list.size() - 1);
                        h a2 = h.a(new MessageEvent(null, list, MessageEvent.Type.HISTORY, 1, null));
                        buildMissedMessages = MessagePipe.this.buildMissedMessages(z, z2, message.index(), str2, z3);
                        return h.a(a2, buildMissedMessages);
                    }
                });
            }
        });
        kotlin.d.b.i.a((Object) b4, "getHistory(fromMessageId…  }\n                    }");
        return b4;
    }

    private final h<MessageEvent> connectToSources() {
        h<MessageEvent> f = h.a((Iterable) this.messageSources).b(new io.reactivex.c.g<T, org.a.b<? extends R>>() { // from class: com.dotloop.mobile.service.MessagePipe$connectToSources$1
            @Override // io.reactivex.c.g
            public final h<MessageEvent> apply(MessageSource messageSource) {
                kotlin.d.b.i.b(messageSource, "messageSource");
                return messageSource.connect(MessagePipe.this.getConversationId()).a(new f<Throwable>() { // from class: com.dotloop.mobile.service.MessagePipe$connectToSources$1.1
                    @Override // io.reactivex.c.f
                    public final void accept(Throwable th) {
                        a.a(th);
                    }
                }).b(h.b());
            }
        }).a((io.reactivex.c.g) new io.reactivex.c.g<T, K>() { // from class: com.dotloop.mobile.service.MessagePipe$connectToSources$2
            @Override // io.reactivex.c.g
            public final String apply(MessageEvent messageEvent) {
                kotlin.d.b.i.b(messageEvent, "it");
                return messageEvent.index();
            }
        }).b((f) new f<MessageEvent>() { // from class: com.dotloop.mobile.service.MessagePipe$connectToSources$3
            @Override // io.reactivex.c.f
            public final void accept(MessageEvent messageEvent) {
                MessagePipe messagePipe = MessagePipe.this;
                kotlin.d.b.i.a((Object) messageEvent, "it");
                messagePipe.storeMessageFromEvent(messageEvent);
            }
        }).f(new io.reactivex.c.g<T, z<? extends R>>() { // from class: com.dotloop.mobile.service.MessagePipe$connectToSources$4
            @Override // io.reactivex.c.g
            public final v<MessageEvent> apply(final MessageEvent messageEvent) {
                boolean updateCacheOnly;
                Set set;
                kotlin.d.b.i.b(messageEvent, "messageEvent");
                updateCacheOnly = MessagePipe.this.getUpdateCacheOnly();
                if (updateCacheOnly) {
                    return v.m_();
                }
                if (messageEvent.getType() == MessageEvent.Type.NEW_MESSAGE) {
                    return v.a(messageEvent);
                }
                set = MessagePipe.this.messageHistorySources;
                return h.a((Iterable) set).e(new io.reactivex.c.g<T, n<? extends R>>() { // from class: com.dotloop.mobile.service.MessagePipe$connectToSources$4.1
                    @Override // io.reactivex.c.g
                    public final l<Message> apply(MessageHistorySource messageHistorySource) {
                        kotlin.d.b.i.b(messageHistorySource, "historySource");
                        return messageHistorySource.getMessage(MessagePipe.this.getConversationId(), messageEvent.getMessageId()).a(l.a());
                    }
                }).c((h<R>) messageEvent.getMessage()).f(new io.reactivex.c.g<T, R>() { // from class: com.dotloop.mobile.service.MessagePipe$connectToSources$4.2
                    @Override // io.reactivex.c.g
                    public final MessageEvent apply(Message message) {
                        kotlin.d.b.i.b(message, "it");
                        return MessageEvent.Companion.updateMessage(message);
                    }
                });
            }
        });
        kotlin.d.b.i.a((Object) f, "fromIterable(messageSour…          }\n            }");
        return f;
    }

    private final h<MessageEvent> getHistory(String str, boolean z, int i, final boolean z2, boolean z3) {
        if (getUpdateCacheOnly()) {
            h<MessageEvent> b2 = h.b();
            kotlin.d.b.i.a((Object) b2, "empty()");
            return b2;
        }
        LinkedList linkedList = new LinkedList();
        for (final MessageHistorySource messageHistorySource : this.messageHistorySources) {
            linkedList.add(messageHistorySource.getMessages(this.conversationId, str, z, i, z3).a(new f<List<Message>>() { // from class: com.dotloop.mobile.service.MessagePipe$getHistory$1
                @Override // io.reactivex.c.f
                public final void accept(List<Message> list) {
                    if (messageHistorySource.shouldCacheResults()) {
                        MessagePipe messagePipe = MessagePipe.this;
                        kotlin.d.b.i.a((Object) list, "messages");
                        messagePipe.storeMessages(list);
                    }
                }
            }));
        }
        h<MessageEvent> c2 = l.a((Iterable) linkedList).a((k) new k<List<? extends Message>>() { // from class: com.dotloop.mobile.service.MessagePipe$getHistory$2
            @Override // io.reactivex.c.k
            public final boolean test(List<? extends Message> list) {
                kotlin.d.b.i.b(list, "it");
                return !list.isEmpty();
            }
        }).g(new io.reactivex.c.g<T, R>() { // from class: com.dotloop.mobile.service.MessagePipe$getHistory$3
            @Override // io.reactivex.c.g
            public final MessageEvent apply(List<? extends Message> list) {
                kotlin.d.b.i.b(list, "it");
                return new MessageEvent(null, list, MessageEvent.Type.HISTORY, 1, null);
            }
        }).h(new io.reactivex.c.g<Throwable, org.a.b<? extends MessageEvent>>() { // from class: com.dotloop.mobile.service.MessagePipe$getHistory$4
            @Override // io.reactivex.c.g
            public final org.a.b<? extends MessageEvent> apply(Throwable th) {
                kotlin.d.b.i.b(th, "it");
                return (z2 && (th instanceof NoNetworkConnectionException)) ? h.a(new MessageEvent(null, null, MessageEvent.Type.NETWORK_ERROR, 3, null)) : h.a(th);
            }
        }).e().a((k) new k<o<MessageEvent>>() { // from class: com.dotloop.mobile.service.MessagePipe$getHistory$5
            @Override // io.reactivex.c.k
            public final boolean test(o<MessageEvent> oVar) {
                kotlin.d.b.i.b(oVar, "it");
                return (oVar.b() && z2) ? false : true;
            }
        }).c();
        kotlin.d.b.i.a((Object) c2, "mergeDelayError(historyM…         .dematerialize()");
        return c2;
    }

    private final p<MessageSender> getMessageSenders() {
        p<MessageSender> b2 = p.b((Iterable) this.messageSenders);
        kotlin.d.b.i.a((Object) b2, "Observable.fromIterable(messageSenders)");
        return b2;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final boolean getUpdateCacheOnly() {
        return this.conversationId == null;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final void removeMessage(String str) {
        Iterator<MessageStore> it = this.messageCaches.iterator();
        while (it.hasNext()) {
            try {
                it.next().removeMessage(this.conversationId, str);
            } catch (SQLException e) {
                a.b(e, "error removing message from database", new Object[0]);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Removed duplicated region for block: B:22:0x0041  */
    /* JADX WARN: Removed duplicated region for block: B:25:0x0044  */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public final void storeMessageFromEvent(com.dotloop.mobile.messaging.MessageEvent r6) {
        /*
            r5 = this;
            java.util.Set<com.dotloop.mobile.messaging.sources.MessageStore> r0 = r5.messageCaches
            java.lang.Iterable r0 = (java.lang.Iterable) r0
            java.util.Iterator r0 = r0.iterator()
        L8:
            boolean r1 = r0.hasNext()
            if (r1 == 0) goto L65
            java.lang.Object r1 = r0.next()
            com.dotloop.mobile.messaging.sources.MessageStore r1 = (com.dotloop.mobile.messaging.sources.MessageStore) r1
            com.dotloop.mobile.messaging.MessageEvent$Type r2 = r6.getType()     // Catch: android.database.SQLException -> L38
            int[] r3 = com.dotloop.mobile.service.MessagePipe.WhenMappings.$EnumSwitchMapping$0     // Catch: android.database.SQLException -> L38
            int r2 = r2.ordinal()     // Catch: android.database.SQLException -> L38
            r2 = r3[r2]     // Catch: android.database.SQLException -> L38
            switch(r2) {
                case 1: goto L30;
                case 2: goto L24;
                default: goto L23;
            }     // Catch: android.database.SQLException -> L38
        L23:
            goto L8
        L24:
            java.lang.String r2 = r6.getMessageId()     // Catch: android.database.SQLException -> L38
            java.util.List r3 = r6.getStatuses()     // Catch: android.database.SQLException -> L38
            r1.storeMessageStatuses(r2, r3)     // Catch: android.database.SQLException -> L38
            goto L8
        L30:
            com.dotloop.mobile.core.platform.model.messaging.Message r2 = r6.getMessage()     // Catch: android.database.SQLException -> L38
            r1.storeMessage(r2)     // Catch: android.database.SQLException -> L38
            goto L8
        L38:
            r1 = move-exception
            com.dotloop.mobile.messaging.MessageEvent$Type r2 = r6.getType()
            com.dotloop.mobile.messaging.MessageEvent$Type r3 = com.dotloop.mobile.messaging.MessageEvent.Type.NEW_MESSAGE
            if (r2 != r3) goto L44
            java.lang.String r2 = "new message"
            goto L46
        L44:
            java.lang.String r2 = "message update"
        L46:
            java.lang.Throwable r1 = (java.lang.Throwable) r1
            java.lang.StringBuilder r3 = new java.lang.StringBuilder
            r3.<init>()
            java.lang.String r4 = "error saving "
            r3.append(r4)
            r3.append(r2)
            java.lang.String r2 = " to database"
            r3.append(r2)
            java.lang.String r2 = r3.toString()
            r3 = 0
            java.lang.Object[] r3 = new java.lang.Object[r3]
            d.a.a.b(r1, r2, r3)
            goto L8
        L65:
            return
        */
        throw new UnsupportedOperationException("Method not decompiled: com.dotloop.mobile.service.MessagePipe.storeMessageFromEvent(com.dotloop.mobile.messaging.MessageEvent):void");
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final void storeMessages(List<? extends Message> list) {
        Iterator<T> it = this.messageCaches.iterator();
        while (it.hasNext()) {
            try {
                ((MessageStore) it.next()).storeMessages(list);
            } catch (SQLException e) {
                a.b(e, "error saving older messages to database", new Object[0]);
            }
        }
    }

    public final void disconnect() {
        i<MessageEvent> iVar;
        Iterator<T> it = this.messageSources.iterator();
        while (it.hasNext()) {
            ((MessageSource) it.next()).disconnect(this.conversationId);
        }
        i<MessageEvent> iVar2 = this.messageEmitter;
        if ((iVar2 == null || !iVar2.b()) && (iVar = this.messageEmitter) != null) {
            iVar.k_();
        }
        if (this.compositeSubscription.isDisposed()) {
            return;
        }
        this.compositeSubscription.dispose();
    }

    public final h<List<Message>> fetchAllPreviousMessages(String str, boolean z) {
        if (str != null) {
            h g = buildMissedMessages(false, false, null, str, z).a(new k<MessageEvent>() { // from class: com.dotloop.mobile.service.MessagePipe$fetchAllPreviousMessages$1
                @Override // io.reactivex.c.k
                public final boolean test(MessageEvent messageEvent) {
                    kotlin.d.b.i.b(messageEvent, "historyEvent");
                    return messageEvent.getType() == MessageEvent.Type.HISTORY;
                }
            }).g(new io.reactivex.c.g<T, R>() { // from class: com.dotloop.mobile.service.MessagePipe$fetchAllPreviousMessages$2
                @Override // io.reactivex.c.g
                public final List<Message> apply(MessageEvent messageEvent) {
                    kotlin.d.b.i.b(messageEvent, "it");
                    return messageEvent.getMessages();
                }
            });
            kotlin.d.b.i.a((Object) g, "buildMissedMessages(fals…     .map { it.messages }");
            return g;
        }
        h<List<Message>> b2 = h.b();
        kotlin.d.b.i.a((Object) b2, "empty()");
        return b2;
    }

    public final l<Message> fetchMessage(final String str) {
        kotlin.d.b.i.b(str, DeeplinkUtils.URI_PARAM_MESSAGE_ID);
        if (getUpdateCacheOnly()) {
            l<Message> a2 = l.a((Throwable) new ConversationRequiredException());
            kotlin.d.b.i.a((Object) a2, "Maybe.error(ConversationRequiredException())");
            return a2;
        }
        l<Message> d2 = h.a((Iterable) this.messageHistorySources).e(new io.reactivex.c.g<T, n<? extends R>>() { // from class: com.dotloop.mobile.service.MessagePipe$fetchMessage$1
            @Override // io.reactivex.c.g
            public final l<Message> apply(final MessageHistorySource messageHistorySource) {
                kotlin.d.b.i.b(messageHistorySource, "historySource");
                return messageHistorySource.getMessage(MessagePipe.this.getConversationId(), str).a(new f<Message>() { // from class: com.dotloop.mobile.service.MessagePipe$fetchMessage$1.1
                    @Override // io.reactivex.c.f
                    public final void accept(Message message) {
                        if (messageHistorySource.shouldCacheResults()) {
                            MessagePipe.this.storeMessages(kotlin.a.l.a(message));
                        }
                    }
                });
            }
        }).d();
        kotlin.d.b.i.a((Object) d2, "fromIterable(messageHist…          .firstElement()");
        return d2;
    }

    public final h<List<Message>> fetchMissedMessages(String str) {
        if (str != null) {
            h g = buildMissedMessages(false, false, str, null, true).a(new k<MessageEvent>() { // from class: com.dotloop.mobile.service.MessagePipe$fetchMissedMessages$1
                @Override // io.reactivex.c.k
                public final boolean test(MessageEvent messageEvent) {
                    kotlin.d.b.i.b(messageEvent, "historyEvent");
                    return messageEvent.getType() == MessageEvent.Type.HISTORY;
                }
            }).g(new io.reactivex.c.g<T, R>() { // from class: com.dotloop.mobile.service.MessagePipe$fetchMissedMessages$2
                @Override // io.reactivex.c.g
                public final List<Message> apply(MessageEvent messageEvent) {
                    kotlin.d.b.i.b(messageEvent, "it");
                    return messageEvent.getMessages();
                }
            });
            kotlin.d.b.i.a((Object) g, "buildMissedMessages(fals…     .map { it.messages }");
            return g;
        }
        h<List<Message>> b2 = h.b();
        kotlin.d.b.i.a((Object) b2, "empty()");
        return b2;
    }

    public final h<MessageEvent> fetchMissingMessages(String str, String str2) {
        kotlin.d.b.i.b(str2, "newestMessageId");
        return str != null ? buildMissedMessages(true, true, str2, str, true) : fetchOlderHistory(null);
    }

    public final h<MessageEvent> fetchOlderHistory(String str) {
        if (this.fetchingHistory || getUpdateCacheOnly()) {
            h<MessageEvent> b2 = h.b();
            kotlin.d.b.i.a((Object) b2, "empty()");
            return b2;
        }
        a.b("Fetching messages older than " + str, new Object[0]);
        h<MessageEvent> a2 = getHistory(str, true, 20, true, false).b((h<MessageEvent>) new MessageEvent(null, null, MessageEvent.Type.HISTORY, 3, null)).c(new f<d>() { // from class: com.dotloop.mobile.service.MessagePipe$fetchOlderHistory$1
            @Override // io.reactivex.c.f
            public final void accept(d dVar) {
                MessagePipe.this.fetchingHistory = true;
            }
        }).b(new io.reactivex.c.a() { // from class: com.dotloop.mobile.service.MessagePipe$fetchOlderHistory$2
            @Override // io.reactivex.c.a
            public final void run() {
                MessagePipe.this.fetchingHistory = false;
            }
        }).a(new f<Throwable>() { // from class: com.dotloop.mobile.service.MessagePipe$fetchOlderHistory$3
            @Override // io.reactivex.c.f
            public final void accept(Throwable th) {
                MessagePipe.this.fetchingHistory = false;
            }
        }).a(new f<Throwable>() { // from class: com.dotloop.mobile.service.MessagePipe$fetchOlderHistory$4
            @Override // io.reactivex.c.f
            public final void accept(Throwable th) {
                a.a(th);
            }
        });
        kotlin.d.b.i.a((Object) a2, "getHistory(oldestMessage…oOnError { Timber.e(it) }");
        return a2;
    }

    public final String getConversationId() {
        return this.conversationId;
    }

    public final v<MessageEvent> sendMessage(final NewMessage newMessage) {
        kotlin.d.b.i.b(newMessage, "newMessage");
        if (getUpdateCacheOnly()) {
            v<MessageEvent> a2 = v.a((Throwable) new ConversationRequiredException());
            kotlin.d.b.i.a((Object) a2, "Single.error(ConversationRequiredException())");
            return a2;
        }
        v<MessageEvent> b2 = getMessageSenders().h((io.reactivex.c.g) new io.reactivex.c.g<T, z<? extends R>>() { // from class: com.dotloop.mobile.service.MessagePipe$sendMessage$1
            @Override // io.reactivex.c.g
            public final v<Message> apply(MessageSender messageSender) {
                kotlin.d.b.i.b(messageSender, "sender");
                return messageSender.sendMessage(NewMessage.this);
            }
        }).k().a((f) new f<Message>() { // from class: com.dotloop.mobile.service.MessagePipe$sendMessage$2
            @Override // io.reactivex.c.f
            public final void accept(Message message) {
                message.setLid(NewMessage.this.getLid());
            }
        }).a((f) new f<Message>() { // from class: com.dotloop.mobile.service.MessagePipe$sendMessage$3
            @Override // io.reactivex.c.f
            public final void accept(Message message) {
                if (newMessage.getMessageId() != null) {
                    MessagePipe.this.removeMessage(newMessage.getMessageId());
                }
            }
        }).a((f) new f<Message>() { // from class: com.dotloop.mobile.service.MessagePipe$sendMessage$4
            @Override // io.reactivex.c.f
            public final void accept(Message message) {
                a.b("message sent to conversationId " + MessagePipe.this.getConversationId(), new Object[0]);
            }
        }).g(new io.reactivex.c.g<Throwable, z<? extends Message>>() { // from class: com.dotloop.mobile.service.MessagePipe$sendMessage$5
            @Override // io.reactivex.c.g
            public final v<Message> apply(final Throwable th) {
                ConversationService conversationService;
                kotlin.d.b.i.b(th, "it");
                conversationService = MessagePipe.this.conversationService;
                return conversationService.getConversation(MessagePipe.this.getConversationId(), false).k().f(new io.reactivex.c.g<T, R>() { // from class: com.dotloop.mobile.service.MessagePipe$sendMessage$5.1
                    @Override // io.reactivex.c.g
                    public final Message apply(Conversation conversation) {
                        ConversationHelper conversationHelper;
                        kotlin.d.b.i.b(conversation, "conversation");
                        conversationHelper = MessagePipe.this.conversationHelper;
                        return conversationHelper.messageFromNewMessageWithError(conversation, newMessage, MessageError.SERVER_ERROR, th.getMessage());
                    }
                }).a(new f<Message>() { // from class: com.dotloop.mobile.service.MessagePipe$sendMessage$5.2
                    @Override // io.reactivex.c.f
                    public final void accept(Message message) {
                        message.setMessageId(message.getLid());
                    }
                });
            }
        }).f(new io.reactivex.c.g<T, R>() { // from class: com.dotloop.mobile.service.MessagePipe$sendMessage$6
            @Override // io.reactivex.c.g
            public final MessageEvent apply(Message message) {
                kotlin.d.b.i.b(message, "it");
                return MessageEvent.Companion.newMessage(message);
            }
        }).a((f) new f<MessageEvent>() { // from class: com.dotloop.mobile.service.MessagePipe$sendMessage$7
            @Override // io.reactivex.c.f
            public final void accept(MessageEvent messageEvent) {
                MessagePipe messagePipe = MessagePipe.this;
                kotlin.d.b.i.a((Object) messageEvent, "it");
                messagePipe.storeMessageFromEvent(messageEvent);
            }
        }).b((f<? super Throwable>) new f<Throwable>() { // from class: com.dotloop.mobile.service.MessagePipe$sendMessage$8
            @Override // io.reactivex.c.f
            public final void accept(Throwable th) {
                a.a(th);
            }
        });
        kotlin.d.b.i.a((Object) b2, "getMessageSenders().flat…oOnError { Timber.e(it) }");
        return b2;
    }

    public final void setConversationId(String str) {
        this.conversationId = str;
    }

    @Override // io.reactivex.j
    public void subscribe(final i<MessageEvent> iVar) {
        kotlin.d.b.i.b(iVar, "emitter");
        this.messageEmitter = iVar;
        this.compositeSubscription.a(connectToSources().c(new f<d>() { // from class: com.dotloop.mobile.service.MessagePipe$subscribe$disposable$1
            @Override // io.reactivex.c.f
            public final void accept(d dVar) {
                a.b("Started listening for messages in conversationId: " + MessagePipe.this.getConversationId(), new Object[0]);
            }
        }).a(new io.reactivex.c.a() { // from class: com.dotloop.mobile.service.MessagePipe$subscribe$disposable$2
            @Override // io.reactivex.c.a
            public final void run() {
                a.b("Stopped listening for messages in conversationId: " + MessagePipe.this.getConversationId(), new Object[0]);
            }
        }).a(new f<MessageEvent>() { // from class: com.dotloop.mobile.service.MessagePipe$subscribe$disposable$3
            @Override // io.reactivex.c.f
            public final void accept(MessageEvent messageEvent) {
                i.this.a((i) messageEvent);
            }
        }, new f<Throwable>() { // from class: com.dotloop.mobile.service.MessagePipe$subscribe$disposable$4
            @Override // io.reactivex.c.f
            public final void accept(Throwable th) {
                a.a(th);
            }
        }));
    }

    public final io.reactivex.b updateMessageStatus(final NewMessageStatusBatch newMessageStatusBatch) {
        kotlin.d.b.i.b(newMessageStatusBatch, "messageStatusBatch");
        io.reactivex.b e = getMessageSenders().e(new io.reactivex.c.g<MessageSender, io.reactivex.f>() { // from class: com.dotloop.mobile.service.MessagePipe$updateMessageStatus$1
            @Override // io.reactivex.c.g
            public final io.reactivex.b apply(MessageSender messageSender) {
                kotlin.d.b.i.b(messageSender, "sender");
                return messageSender.updateStatus(NewMessageStatusBatch.this);
            }
        });
        kotlin.d.b.i.a((Object) e, "getMessageSenders().flat…tus(messageStatusBatch) }");
        return e;
    }
}
