package monasca.common.messaging.kafka;

import com.codahale.metrics.health.HealthCheck;
import com.google.common.base.Joiner;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/* loaded from: input_file:monasca/common/messaging/kafka/KafkaHealthCheck.class */
public class KafkaHealthCheck extends HealthCheck {
    private final KafkaConfiguration config;

    public KafkaHealthCheck(KafkaConfiguration kafkaConfiguration) {
        this.config = kafkaConfiguration;
    }

    protected HealthCheck.Result check() throws Exception {
        Producer<String, String> producer = null;
        ConsumerConnector consumerConnector = null;
        ExecutorService executorService = null;
        try {
            try {
                producer = createProducer();
                consumerConnector = createConsumer();
                producer.send(new KeyedMessage(this.config.healthCheckTopic, (Object) null, "test"));
                HashMap hashMap = new HashMap();
                hashMap.put(this.config.healthCheckTopic, 1);
                final ConsumerIterator it = ((KafkaStream) ((List) consumerConnector.createMessageStreams(hashMap).get(this.config.healthCheckTopic)).get(0)).iterator();
                final Thread currentThread = Thread.currentThread();
                executorService = Executors.newSingleThreadExecutor();
                executorService.execute(new Runnable() { // from class: monasca.common.messaging.kafka.KafkaHealthCheck.1
                    @Override // java.lang.Runnable
                    public void run() {
                        while (it.hasNext()) {
                            String str = new String((byte[]) it.next().message());
                            System.out.println("Received " + str);
                            if (str.equals("test")) {
                                currentThread.interrupt();
                                return;
                            }
                        }
                    }
                });
                try {
                    Thread.sleep(5000L);
                } catch (InterruptedException e) {
                }
                HealthCheck.Result healthy = HealthCheck.Result.healthy();
                if (executorService != null) {
                    executorService.shutdownNow();
                }
                if (producer != null) {
                    try {
                        producer.close();
                    } catch (Exception e2) {
                    }
                }
                if (consumerConnector != null) {
                    consumerConnector.commitOffsets();
                    try {
                        consumerConnector.shutdown();
                    } catch (Exception e3) {
                    }
                }
                return healthy;
            } catch (Exception e4) {
                HealthCheck.Result unhealthy = HealthCheck.Result.unhealthy(e4);
                if (executorService != null) {
                    executorService.shutdownNow();
                }
                if (producer != null) {
                    try {
                        producer.close();
                    } catch (Exception e5) {
                    }
                }
                if (consumerConnector != null) {
                    consumerConnector.commitOffsets();
                    try {
                        consumerConnector.shutdown();
                    } catch (Exception e6) {
                    }
                }
                return unhealthy;
            }
        } catch (Throwable th) {
            if (executorService != null) {
                executorService.shutdownNow();
            }
            if (producer != null) {
                try {
                    producer.close();
                } catch (Exception e7) {
                }
            }
            if (consumerConnector != null) {
                consumerConnector.commitOffsets();
                try {
                    consumerConnector.shutdown();
                } catch (Exception e8) {
                }
            }
            throw th;
        }
    }

    Producer<String, String> createProducer() {
        Properties properties = new Properties();
        properties.put("metadata.broker.list", Joiner.on(',').join(this.config.brokerUris));
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        properties.put("request.required.acks", "1");
        return new Producer<>(new ProducerConfig(properties));
    }

    ConsumerConnector createConsumer() {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", Joiner.on(',').join(this.config.zookeeperUris));
        properties.put("group.id", "test");
        properties.put("auto.offset.reset", "largest");
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
    }
}
