package monasca.persister.consumer;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:monasca/persister/consumer/KafkaConsumer.class */
public abstract class KafkaConsumer<T> {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
    private static final int WAIT_TIME = 10;
    private ExecutorService executorService;
    private final KafkaChannel kafkaChannel;
    private final int threadNum;
    private KafkaConsumerRunnableBasic<T> kafkaConsumerRunnableBasic;

    public KafkaConsumer(KafkaChannel kafkaChannel, int i) {
        this.kafkaChannel = kafkaChannel;
        this.threadNum = i;
    }

    protected abstract KafkaConsumerRunnableBasic<T> createRunnable(KafkaChannel kafkaChannel, int i);

    public void start() {
        this.executorService = Executors.newFixedThreadPool(1);
        this.executorService.submit(createRunnable(this.kafkaChannel, this.threadNum));
    }

    public void stop() {
        this.kafkaConsumerRunnableBasic.stop();
        if (this.executorService != null) {
            this.executorService.shutdown();
            try {
                if (!this.executorService.awaitTermination(10L, TimeUnit.SECONDS)) {
                    logger.warn("Did not shut down in {} seconds", Integer.valueOf(WAIT_TIME));
                }
            } catch (InterruptedException e) {
                logger.info("awaitTerminiation interrupted", e);
            }
        }
    }
}
