package monasca.persister.pipeline.event;

import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dropwizard.setup.Environment;
import monasca.persister.configuration.PipelineConfig;
import monasca.persister.repository.RepoException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:monasca/persister/pipeline/event/FlushableHandler.class */
/**
 * Base class for pipeline handlers that accumulate processed messages and flush them to a
 * repository, either once a batch-size threshold is reached or, on a heartbeat, once the
 * configured maximum batch time has elapsed since the previous flush.
 *
 * <p>Subclasses supply the actual message processing ({@link #process(String)}), the repository
 * write ({@link #flushRepository()}) and any {@link ObjectMapper} configuration
 * ({@link #initObjectMapper()}).
 *
 * <p>The counters in this class are updated without any synchronization.
 * NOTE(review): presumably each instance is driven by a single thread (see {@code threadId}) —
 * confirm against the callers.
 *
 * @param <T> type parameter declared by the handler; it is not used by this base class itself
 */
public abstract class FlushableHandler<T> {
    private static final Logger logger = LoggerFactory.getLogger(FlushableHandler.class);

    /** Number of pending messages at which {@link #onEvent(String)} triggers a flush. */
    private final int batchSize;
    /** Interval between time-based flushes, in milliseconds. */
    private final long millisBetweenFlushes;
    /** Interval between time-based flushes, in seconds (as read from the pipeline config). */
    private final int secondsBetweenFlushes;
    private final Meter processedMeter;
    private final Meter flushMeter;
    private final Timer flushTimer;
    protected final String threadId;
    protected final String handlerName;
    /**
     * Wall-clock time (epoch millis) at or after which a heartbeat triggers a flush. It starts at
     * construction time, so the first heartbeat is always considered "flush time".
     */
    private long flushTimeMillis = System.currentTimeMillis();
    /** Messages accumulated since the last successful flush. */
    private int msgCount = 0;
    /** Number of flushes that completed without throwing. */
    private long batchCount = 0;
    protected ObjectMapper objectMapper = new ObjectMapper();

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates the handler, registers its meters and timer, and invokes {@link #initObjectMapper()}.
     *
     * @param pipelineConfig source of the maximum batch time, read via {@code getMaxBatchTime()}
     *     and treated as a number of seconds
     * @param environment Dropwizard environment whose metric registry receives the
     *     {@code events-processed-meter}, {@code flush-meter} and {@code flush-timer} metrics
     * @param str thread identifier; used in log messages and as part of the handler name
     * @param i batch size, i.e. the pending message count at which a flush is triggered
     */
    public FlushableHandler(PipelineConfig pipelineConfig, Environment environment, String str, int i) {
        this.threadId = str;
        this.handlerName = String.format("%s[%s]", getClass().getName(), str);
        this.processedMeter = environment.metrics().meter(this.handlerName + ".events-processed-meter");
        this.flushMeter = environment.metrics().meter(this.handlerName + ".flush-meter");
        this.flushTimer = environment.metrics().timer(this.handlerName + ".flush-timer");
        this.secondsBetweenFlushes = pipelineConfig.getMaxBatchTime().intValue();
        // Multiply as long: an int product would overflow for intervals above ~2.1 million seconds.
        this.millisBetweenFlushes = this.secondsBetweenFlushes * 1000L;
        this.batchSize = i;
        initObjectMapper();
    }

    /**
     * Hook for configuring {@link #objectMapper}.
     *
     * <p>This is called from the base-class constructor, i.e. before the subclass's own field
     * initializers and constructor body have run, so implementations must not depend on
     * subclass state.
     */
    protected abstract void initObjectMapper();

    /**
     * Writes the accumulated messages to the repository.
     *
     * @return the value reported by {@link #flush()} and logged as the number of flushed messages
     * @throws RepoException if the repository write fails
     */
    protected abstract int flushRepository() throws RepoException;

    /**
     * Processes a single non-null message.
     *
     * @param str the raw message
     * @return the amount to add to the pending message count
     */
    protected abstract int process(String str);

    /**
     * Handles one pipeline event.
     *
     * <p>A {@code null} message is treated as a heartbeat: nothing is processed, and a flush is
     * performed only if the flush time has been reached. A non-null message is processed, counted,
     * and followed by a flush only if the pending count has reached the batch size.
     *
     * @param str the message, or {@code null} for a heartbeat
     * @return {@code true} only if a flush was performed and it reported more than zero messages
     * @throws RepoException if a triggered flush fails
     */
    public boolean onEvent(String str) throws RepoException {
        if (str == null) {
            return isFlushTime() && flush() > 0;
        }
        this.msgCount += process(str);
        this.processedMeter.mark();
        return isBatchSize() && flush() > 0;
    }

    /** Returns whether the pending message count has reached the configured batch size. */
    private boolean isBatchSize() {
        logger.debug("[{}]: checking batch size", this.threadId);
        if (this.msgCount >= this.batchSize) {
            logger.debug("[{}]: batch sized {} attained", this.threadId, this.batchSize);
            return true;
        }
        logger.debug("[{}]: batch size now at {}, batch size {} not attained", this.threadId, this.msgCount, this.batchSize);
        return false;
    }

    /** Returns whether the current wall-clock time has reached the next scheduled flush time. */
    private boolean isFlushTime() {
        logger.debug("[{}]: got heartbeat message, checking flush time. flush every {} seconds.", this.threadId, this.secondsBetweenFlushes);
        long now = System.currentTimeMillis();
        if (this.flushTimeMillis <= now) {
            logger.debug("[{}]: {} ms past flush time. flushing to repository now.", this.threadId, now - this.flushTimeMillis);
            return true;
        }
        logger.debug("[{}]: {} ms to next flush time. no need to flush at this time.", this.threadId, this.flushTimeMillis - now);
        return false;
    }

    /**
     * Flushes the accumulated messages to the repository and schedules the next time-based flush.
     *
     * <p>On success the flush meter is marked, the pending message count is reset to zero and the
     * batch count is incremented. If {@link #flushRepository()} throws, none of that bookkeeping
     * happens, but the flush timer is still stopped so the failed attempt's duration is recorded.
     *
     * @return the value returned by {@link #flushRepository()}
     * @throws RepoException if the repository write fails
     */
    public int flush() throws RepoException {
        logger.debug("[{}]: flushing", this.threadId);
        int flushedCount;
        Timer.Context timerContext = this.flushTimer.time();
        try {
            flushedCount = flushRepository();
        } finally {
            // Always stop the timer, even when the repository write throws.
            timerContext.stop();
        }
        this.flushMeter.mark();
        this.flushTimeMillis = System.currentTimeMillis() + this.millisBetweenFlushes;
        logger.debug("[{}]: flushed {} msg", this.threadId, flushedCount);
        this.msgCount = 0;
        this.batchCount++;
        return flushedCount;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the number of flushes that have completed without throwing.
     *
     * @return the batch count
     */
    public long getBatchCount() {
        return this.batchCount;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the number of messages accumulated since the last successful flush.
     *
     * @return the pending message count
     */
    public int getMsgCount() {
        return this.msgCount;
    }
}
