package monasca.persister.pipeline.event;

import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import io.dropwizard.setup.Environment;
import monasca.persister.configuration.PipelineConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:monasca/persister/pipeline/event/FlushableHandler.class */
/**
 * Base class for pipeline handlers that accumulate processed events and flush them to a
 * repository in batches.
 *
 * <p>A flush is triggered either when the running event count reaches {@code batchSize}, or when
 * a {@code null} event (treated as a heartbeat) arrives and more than the configured maximum
 * batch time has elapsed since the previous flush.
 *
 * <p>This class performs no synchronization of its own.
 *
 * @param <T> the type of event handled
 */
public abstract class FlushableHandler<T> {
    private static final Logger logger = LoggerFactory.getLogger(FlushableHandler.class);

    private final int ordinal;
    private final int batchSize;
    private final String handlerName;
    private final long millisBetweenFlushes;
    private final int secondsBetweenFlushes;
    private final Environment environment;
    private final Meter processedMeter;
    private final Meter commitMeter;
    private final Timer commitTimer;

    /** Wall-clock timestamp (epoch millis) of the most recent flush, or of construction. */
    private long millisSinceLastFlush = System.currentTimeMillis();

    /** Sum of the values returned by {@link #process(Object)} since the last flush. */
    private int eventCount = 0;

    /**
     * Creates a handler and registers its meters and timer with the environment's metric registry.
     *
     * @param pipelineConfiguration supplies the maximum batch time, read in seconds
     * @param environment           provides the metric registry
     * @param ordinal               index of this handler; appended to the handler name
     * @param batchSize             event count at which a flush is triggered
     * @param baseName              base name used to build the handler name and metric names
     */
    public FlushableHandler(PipelineConfiguration pipelineConfiguration, Environment environment, int ordinal, int batchSize, String baseName) {
        this.handlerName = String.format("%s[%d]", baseName, ordinal);
        this.environment = environment;
        this.processedMeter = this.environment.metrics().meter(this.handlerName + ".events-processed-processedMeter");
        this.commitMeter = this.environment.metrics().meter(this.handlerName + ".commits-executed-processedMeter");
        this.commitTimer = this.environment.metrics().timer(this.handlerName + ".total-commit-and-flush-timer");
        this.secondsBetweenFlushes = pipelineConfiguration.getMaxBatchTime().intValue();
        // Multiply as long so a large configured batch time cannot overflow int arithmetic.
        this.millisBetweenFlushes = this.secondsBetweenFlushes * 1000L;
        this.ordinal = ordinal;
        this.batchSize = batchSize;
    }

    /** Writes everything accumulated so far to the backing repository. */
    protected abstract void flushRepository();

    /**
     * Processes a single non-null event.
     *
     * @param t the event to process
     * @return the amount to add to the running event count
     * @throws Exception if the event cannot be processed
     */
    protected abstract int process(T t) throws Exception;

    /**
     * Handles one event, or a heartbeat when {@code t} is {@code null}.
     *
     * @param t the event, or {@code null} for a heartbeat
     * @return {@code true} if this call resulted in a flush, {@code false} otherwise
     * @throws Exception if {@link #process(Object)} throws
     */
    public boolean onEvent(T t) throws Exception {
        if (t == null) {
            return onHeartbeat();
        }

        this.processedMeter.mark();
        logger.debug("Ordinal: {} Event: {}", this.ordinal, t);
        this.eventCount += process(t);
        if (this.eventCount < this.batchSize) {
            return false;
        }
        flush();
        return true;
    }

    /**
     * Flushes if more than the configured interval has elapsed since the last flush.
     *
     * @return {@code true} if a flush was performed
     */
    private boolean onHeartbeat() {
        long now = System.currentTimeMillis();
        long flushDeadline = this.millisSinceLastFlush + this.millisBetweenFlushes;
        // Log the real elapsed time rather than the absolute deadline timestamp.
        long secondsSinceLastFlush = (now - this.millisSinceLastFlush) / 1000L;

        logger.debug("{} received heartbeat message, flush every {} seconds.", this.handlerName, this.secondsBetweenFlushes);
        if (flushDeadline >= now) {
            logger.debug("{}: {} seconds since last flush. No need to flush at this time.", this.handlerName, secondsSinceLastFlush);
            return false;
        }
        logger.debug("{}: {} seconds since last flush. Flushing to repository now.", this.handlerName, secondsSinceLastFlush);
        flush();
        return true;
    }

    /**
     * Flushes the repository, records commit metrics, and resets the event count and the
     * last-flush timestamp.
     *
     * <p>The repository is flushed even when the event count is zero; that case is only logged.
     */
    public void flush() {
        if (this.eventCount == 0) {
            // Deliberately falls through: flushRepository() is still invoked below.
            logger.debug("{}: Nothing to flush", this.handlerName);
        }
        Timer.Context time = this.commitTimer.time();
        try {
            flushRepository();
        } finally {
            // Stop the timer even if the repository flush throws.
            time.stop();
        }
        this.commitMeter.mark();
        this.millisSinceLastFlush = System.currentTimeMillis();
        logger.debug("{}: Flushed {} events", this.handlerName, this.eventCount);
        this.eventCount = 0;
    }
}
