package kafka.controller;

import java.util.concurrent.BlockingQueue;
import kafka.api.LeaderAndIsrResponse$;
import kafka.api.RequestKeys$;
import kafka.api.RequestOrResponse;
import kafka.api.StopReplicaResponse$;
import kafka.api.UpdateMetadataResponse$;
import kafka.network.BlockingChannel;
import kafka.network.Receive;
import kafka.utils.ShutdownableThread;
import kafka.utils.ShutdownableThread$;
import org.apache.log4j.Logger;
import scala.Function1;
import scala.MatchError;
import scala.Predef$;
import scala.ScalaObject;
import scala.Tuple2;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: ControllerChannelManager.scala */
@ScalaSignature(bytes = "\u0006\u0001a4A!\u0001\u0002\u0001\u000f\t\t\"+Z9vKN$8+\u001a8e)\"\u0014X-\u00193\u000b\u0005\r!\u0011AC2p]R\u0014x\u000e\u001c7fe*\tQ!A\u0003lC\u001a\\\u0017m\u0001\u0001\u0014\u0007\u0001Aa\u0002\u0005\u0002\n\u00195\t!B\u0003\u0002\f\t\u0005)Q\u000f^5mg&\u0011QB\u0003\u0002\u0013'\",H\u000fZ8x]\u0006\u0014G.\u001a+ie\u0016\fG\r\u0005\u0002\u0010%5\t\u0001CC\u0001\u0012\u0003\u0015\u00198-\u00197b\u0013\t\u0019\u0002CA\u0006TG\u0006d\u0017m\u00142kK\u000e$\b\u0002C\u000b\u0001\u0005\u000b\u0007I\u0011\u0001\f\u0002\u0019\r|g\u000e\u001e:pY2,'/\u00133\u0016\u0003]\u0001\"a\u0004\r\n\u0005e\u0001\"aA%oi\"A1\u0004\u0001B\u0001B\u0003%q#A\u0007d_:$(o\u001c7mKJLE\r\t\u0005\t;\u0001\u0011)\u0019!C\u0001=\u0005\t2m\u001c8ue>dG.\u001a:D_:$X\r\u001f;\u0016\u0003}\u0001\"\u0001I\u0011\u000e\u0003\tI!A\t\u0002\u0003#\r{g\u000e\u001e:pY2,'oQ8oi\u0016DH\u000f\u0003\u0005%\u0001\t\u0005\t\u0015!\u0003 \u0003I\u0019wN\u001c;s_2dWM]\"p]R,\u0007\u0010\u001e\u0011\t\u0011\u0019\u0002!Q1A\u0005\u0002Y\t!\u0002^8Ce>\\WM]%e\u0011!A\u0003A!A!\u0002\u00139\u0012a\u0003;p\u0005J|7.\u001a:JI\u0002B\u0001B\u000b\u0001\u0003\u0006\u0004%\taK\u0001\u0006cV,W/Z\u000b\u0002YA\u0019Q\u0006\u000e\u001c\u000e\u00039R!a\f\u0019\u0002\u0015\r|gnY;se\u0016tGO\u0003\u00022e\u0005!Q\u000f^5m\u0015\u0005\u0019\u0014\u0001\u00026bm\u0006L!!\u000e\u0018\u0003\u001b\tcwnY6j]\u001e\fV/Z;f!\u0011yq'O 
\n\u0005a\u0002\"A\u0002+va2,'\u0007\u0005\u0002;{5\t1H\u0003\u0002=\t\u0005\u0019\u0011\r]5\n\u0005yZ$!\u0005*fcV,7\u000f^(s%\u0016\u001c\bo\u001c8tKB!q\u0002Q\u001dC\u0013\t\t\u0005CA\u0005Gk:\u001cG/[8ocA\u0011qbQ\u0005\u0003\tB\u0011A!\u00168ji\"Aa\t\u0001B\u0001B\u0003%A&\u0001\u0004rk\u0016,X\r\t\u0005\t\u0011\u0002\u0011)\u0019!C\u0001\u0013\u000691\r[1o]\u0016dW#\u0001&\u0011\u0005-sU\"\u0001'\u000b\u00055#\u0011a\u00028fi^|'o[\u0005\u0003\u001f2\u0013qB\u00117pG.LgnZ\"iC:tW\r\u001c\u0005\t#\u0002\u0011\t\u0011)A\u0005\u0015\u0006A1\r[1o]\u0016d\u0007\u0005C\u0003T\u0001\u0011\u0005A+\u0001\u0004=S:LGO\u0010\u000b\u0007+Z;\u0006,\u0017.\u0011\u0005\u0001\u0002\u0001\"B\u000bS\u0001\u00049\u0002\"B\u000fS\u0001\u0004y\u0002\"\u0002\u0014S\u0001\u00049\u0002\"\u0002\u0016S\u0001\u0004a\u0003\"\u0002%S\u0001\u0004Q\u0005b\u0002/\u0001\u0005\u0004%I!X\u0001\u0005Y>\u001c7.F\u0001_!\ty&-D\u0001a\u0015\t\t''\u0001\u0003mC:<\u0017BA2a\u0005\u0019y%M[3di\"1Q\r\u0001Q\u0001\ny\u000bQ\u0001\\8dW\u0002Bqa\u001a\u0001C\u0002\u0013%\u0001.A\tti\u0006$Xm\u00115b]\u001e,Gj\\4hKJ,\u0012!\u001b\t\u0003UFl\u0011a\u001b\u0006\u0003Y6\fQ\u0001\\8hi)T!A\\8\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005\u0001\u0018aA8sO&\u0011!o\u001b\u0002\u0007\u0019><w-\u001a:\t\rQ\u0004\u0001\u0015!\u0003j\u0003I\u0019H/\u0019;f\u0007\"\fgnZ3M_\u001e<WM\u001d\u0011\t\u000bY\u0004A\u0011I<\u0002\r\u0011|wk\u001c:l)\u0005\u0011\u0005")
/* loaded from: input_file:kafka/controller/RequestSendThread.class */
/**
 * Worker thread that drains a queue of controller requests and sends each one
 * over a single {@link BlockingChannel}, decoding the reply and handing it to
 * the optional per-request callback.
 *
 * <p>This source is decompiler output for a Scala class; the accessor methods
 * below mirror the Scala {@code val} members.
 */
public class RequestSendThread extends ShutdownableThread implements ScalaObject {
    private final int controllerId;
    private final ControllerContext controllerContext;
    private final int toBrokerId;
    private final BlockingQueue<Tuple2<RequestOrResponse, Function1<RequestOrResponse, BoxedUnit>>> queue;
    private final BlockingChannel channel;
    /** Monitor held for the whole connect/send/receive/callback exchange in {@link #doWork()}. */
    private final Object lock;
    private final Logger stateChangeLogger;

    /** @return the controller id supplied at construction */
    public int controllerId() {
        return this.controllerId;
    }

    /** @return the controller context supplied at construction */
    public ControllerContext controllerContext() {
        return this.controllerContext;
    }

    /** @return the id of the broker supplied at construction */
    public int toBrokerId() {
        return this.toBrokerId;
    }

    /** @return the queue of (request, callback) pairs this thread consumes */
    public BlockingQueue<Tuple2<RequestOrResponse, Function1<RequestOrResponse, BoxedUnit>>> queue() {
        return this.queue;
    }

    /** @return the channel used to send requests and receive responses */
    public BlockingChannel channel() {
        return this.channel;
    }

    private Object lock() {
        return this.lock;
    }

    private Logger stateChangeLogger() {
        return this.stateChangeLogger;
    }

    /**
     * Takes one (request, callback) pair from the queue, sends the request on
     * the channel, reads and decodes the response, logs a trace line and, when
     * the callback is non-null, applies it to the decoded response.
     *
     * <p>Any {@link Throwable} raised during the exchange is logged through
     * {@code warn} and the channel is disconnected; nothing is rethrown.
     */
    @Override // kafka.utils.ShutdownableThread
    public void doWork() {
        Tuple2<RequestOrResponse, Function1<RequestOrResponse, BoxedUnit>> queueItem = queue().take();
        RequestOrResponse request = queueItem.mo3267_1();
        Function1<RequestOrResponse, BoxedUnit> callback = queueItem.mo3266_2();
        try {
            synchronized (lock()) {
                channel().connect();
                channel().send(request);
                Receive receive = channel().receive();

                short requestKey = BoxesRunTime.unboxToShort(request.requestId().get());
                RequestOrResponse response = decodeResponse(requestKey, receive);

                Object[] traceArgs = {
                    BoxesRunTime.boxToInteger(controllerId()),
                    BoxesRunTime.boxToInteger(controllerContext().epoch()),
                    BoxesRunTime.boxToInteger(response.correlationId()),
                    BoxesRunTime.boxToInteger(toBrokerId())
                };
                stateChangeLogger().trace(Predef$.MODULE$.augmentString("Controller %d epoch %d received response correlationId %d for a request sent to broker %d").format(Predef$.MODULE$.genericWrapArray(traceArgs)));

                // The callback is optional; it runs while the lock is still held.
                if (callback != null) {
                    callback.mo1213apply(response);
                }
            }
        } catch (Throwable th) {
            warn(new RequestSendThread$$anonfun$doWork$1(this), new RequestSendThread$$anonfun$doWork$2(this, th));
            channel().disconnect();
        }
    }

    /**
     * Decodes the received payload with the response reader that matches the
     * request key.
     *
     * @param requestKey key of the request that was sent
     * @param receive    the data received from the channel
     * @return the decoded response
     * @throws MatchError if the key is none of LeaderAndIsr, StopReplica or UpdateMetadata
     */
    private RequestOrResponse decodeResponse(short requestKey, Receive receive) {
        if (requestKey == RequestKeys$.MODULE$.LeaderAndIsrKey()) {
            return LeaderAndIsrResponse$.MODULE$.readFrom(receive.buffer());
        }
        if (requestKey == RequestKeys$.MODULE$.StopReplicaKey()) {
            return StopReplicaResponse$.MODULE$.readFrom(receive.buffer());
        }
        if (requestKey == RequestKeys$.MODULE$.UpdateMetadataKey()) {
            return UpdateMetadataResponse$.MODULE$.readFrom(receive.buffer());
        }
        throw new MatchError(BoxesRunTime.boxToShort(requestKey));
    }

    /**
     * Creates the send thread; the thread name is built from the controller id
     * and the target broker id.
     *
     * <p>NOTE: the decompiler reported that it moved the {@code super} call to
     * the top of this constructor, which it warns can differ from the original
     * bytecode ordering.
     *
     * @param i                 controller id
     * @param controllerContext controller context, read for the epoch when tracing
     * @param i2                id of the broker this thread sends to
     * @param blockingQueue     queue of (request, callback) pairs to send
     * @param blockingChannel   channel used for the request/response exchange
     */
    public RequestSendThread(int i, ControllerContext controllerContext, int i2, BlockingQueue<Tuple2<RequestOrResponse, Function1<RequestOrResponse, BoxedUnit>>> blockingQueue, BlockingChannel blockingChannel) {
        super(Predef$.MODULE$.augmentString("Controller-%d-to-broker-%d-send-thread").format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(i), BoxesRunTime.boxToInteger(i2)})), ShutdownableThread$.MODULE$.init$default$2());
        this.controllerId = i;
        this.controllerContext = controllerContext;
        this.toBrokerId = i2;
        this.queue = blockingQueue;
        this.channel = blockingChannel;
        this.lock = new Object();
        this.stateChangeLogger = Logger.getLogger(KafkaController$.MODULE$.stateChangeLogger());
    }
}
