package monasca.thresh.infrastructure.thresholding;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import monasca.common.configuration.KafkaConsumerProperties;
import monasca.thresh.KafkaSpoutConfig;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.base.BaseRichSpout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:monasca/thresh/infrastructure/thresholding/KafkaSpout.class */
public abstract class KafkaSpout extends BaseRichSpout implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(KafkaSpout.class);
    private static final long serialVersionUID = 744004533863562119L;
    private final KafkaSpoutConfig kafkaSpoutConfig;
    private transient ConsumerConnector consumerConnector;
    private SpoutOutputCollector collector;
    private volatile boolean shouldContinue;
    private byte[] message;
    private Thread readerThread;
    private String spoutName;
    private transient List<KafkaStream<byte[], byte[]>> streams = null;
    private boolean waiting = false;

    /* JADX INFO: Access modifiers changed from: protected */
    public KafkaSpout(KafkaSpoutConfig kafkaSpoutConfig) {
        this.kafkaSpoutConfig = kafkaSpoutConfig;
    }

    public void activate() {
        logger.info("Activated");
        if (this.streams == null) {
            HashMap hashMap = new HashMap();
            hashMap.put(this.kafkaSpoutConfig.kafkaConsumerConfiguration.getTopic(), new Integer(1));
            this.streams = (List) this.consumerConnector.createMessageStreams(hashMap).get(this.kafkaSpoutConfig.kafkaConsumerConfiguration.getTopic());
        }
    }

    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        logger.info("Opened");
        this.collector = spoutOutputCollector;
        logger.info(" topic = " + this.kafkaSpoutConfig.kafkaConsumerConfiguration.getTopic());
        this.spoutName = String.format("%s-%d", topologyContext.getThisComponentId(), Integer.valueOf(topologyContext.getThisTaskId()));
        Properties createKafkaProperties = KafkaConsumerProperties.createKafkaProperties(this.kafkaSpoutConfig.kafkaConsumerConfiguration);
        createKafkaProperties.setProperty("consumer.id", String.valueOf(topologyContext.getThisTaskId()));
        this.consumerConnector = Consumer.createJavaConsumerConnector(new ConsumerConfig(createKafkaProperties));
    }

    public synchronized void deactivate() {
        logger.info("deactivated");
        this.consumerConnector.shutdown();
        this.shouldContinue = false;
        notify();
    }

    @Override // java.lang.Runnable
    public void run() {
        while (this.shouldContinue) {
            ConsumerIterator it = this.streams.get(0).iterator();
            if (it.hasNext()) {
                byte[] bArr = (byte[]) it.next().message();
                synchronized (this) {
                    this.message = bArr;
                    if (this.waiting) {
                        notify();
                    }
                    while (this.message != null && this.shouldContinue) {
                        try {
                            wait();
                        } catch (InterruptedException e) {
                            logger.info("Wait interrupted", e);
                        }
                    }
                }
            }
        }
        logger.info("readerThread {} exited", this.readerThread.getName());
        this.readerThread = null;
    }

    public void nextTuple() {
        logger.debug("nextTuple called");
        checkReaderRunning();
        byte[] message = getMessage();
        if (message != null) {
            logger.debug("streams iterator has next");
            processMessage(message, this.collector);
        }
    }

    private void checkReaderRunning() {
        this.shouldContinue = true;
        if (this.readerThread == null) {
            this.readerThread = new Thread(this, String.format("%s reader", this.spoutName));
            this.readerThread.start();
            logger.info("Started Reader Thread {}", this.readerThread.getName());
        }
    }

    private byte[] tryToGetMessage() {
        byte[] bArr = this.message;
        if (bArr != null) {
            this.message = null;
            notify();
        }
        return bArr;
    }

    private synchronized byte[] getMessage() {
        byte[] tryToGetMessage = tryToGetMessage();
        if (tryToGetMessage != null) {
            return tryToGetMessage;
        }
        this.waiting = true;
        try {
            wait(this.kafkaSpoutConfig.maxWaitTime.intValue());
        } catch (InterruptedException e) {
            logger.info("Sleep interrupted", e);
        }
        this.waiting = false;
        return tryToGetMessage();
    }

    protected abstract void processMessage(byte[] bArr, SpoutOutputCollector spoutOutputCollector);
}
