package monasca.thresh.infrastructure.thresholding;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import monasca.common.model.metric.Metric;
import monasca.common.streaming.storm.Logging;
import monasca.common.streaming.storm.Tuples;
import monasca.thresh.ThresholdingConfiguration;
import monasca.thresh.domain.model.MetricDefinitionAndTenantId;
import monasca.thresh.domain.model.SubAlarm;
import monasca.thresh.domain.model.SubAlarmStats;
import monasca.thresh.domain.model.SubExpression;
import monasca.thresh.domain.model.TenantIdAndMetricName;
import monasca.thresh.domain.service.SubAlarmStatsRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:monasca/thresh/infrastructure/thresholding/MetricAggregationBolt.class */
public class MetricAggregationBolt extends BaseRichBolt {
    private static final long serialVersionUID = 5624314196838090726L;
    public static final String TICK_TUPLE_SECONDS_KEY = "monasca.thresh.aggregation.tick.seconds";
    public static final String METRIC_AGGREGATION_CONTROL_STREAM = "MetricAggregationControl";
    public static final String METRICS_BEHIND = "MetricsBehind";
    private final ThresholdingConfiguration config;
    private transient Logger logger;
    private OutputCollector collector;
    public static final String[] FIELDS = {"alarmId", "subAlarm"};
    public static final String[] METRIC_AGGREGATION_CONTROL_FIELDS = {"directive"};
    final Map<MetricDefinitionAndTenantId, SubAlarmStatsRepository> metricDefToSubAlarmStatsRepos = new HashMap();
    private final Set<SubAlarmStats> subAlarmStatsSet = new HashSet();
    private final Map<String, SubAlarmStats> subAlarmToSubAlarmStats = new HashMap();
    private Set<String> sporadicMetricNamespaces = Collections.emptySet();
    private boolean upToDate = true;

    public MetricAggregationBolt(ThresholdingConfiguration thresholdingConfiguration) {
        this.config = thresholdingConfiguration;
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields(FIELDS));
    }

    public void execute(Tuple tuple) {
        this.logger.debug("tuple: {}", tuple);
        try {
            try {
                if (Tuples.isTickTuple(tuple)) {
                    evaluateAlarmsAndSlideWindows();
                } else if ("default".equals(tuple.getSourceStreamId())) {
                    TenantIdAndMetricName tenantIdAndMetricName = (TenantIdAndMetricName) tuple.getValue(0);
                    Metric metric = (Metric) tuple.getValueByField("metric");
                    aggregateValues(new MetricDefinitionAndTenantId(metric.definition(), tenantIdAndMetricName.getTenantId()), metric);
                } else if (METRIC_AGGREGATION_CONTROL_STREAM.equals(tuple.getSourceStreamId())) {
                    processControl(tuple.getString(0));
                } else {
                    String string = tuple.getString(0);
                    if (EventProcessingBolt.METRIC_ALARM_EVENT_STREAM_ID.equals(tuple.getSourceStreamId())) {
                        MetricDefinitionAndTenantId metricDefinitionAndTenantId = (MetricDefinitionAndTenantId) tuple.getValue(2);
                        String string2 = tuple.getString(4);
                        if (EventProcessingBolt.DELETED.equals(string)) {
                            handleAlarmDeleted(metricDefinitionAndTenantId, string2);
                        } else if (EventProcessingBolt.RESEND.equals(string)) {
                            handleAlarmResend(metricDefinitionAndTenantId, string2);
                        }
                    } else if (EventProcessingBolt.METRIC_SUB_ALARM_EVENT_STREAM_ID.equals(tuple.getSourceStreamId())) {
                        if (EventProcessingBolt.UPDATED.equals(string)) {
                            handleAlarmSubExpressionUpdated((SubExpression) tuple.getValue(1));
                        }
                    } else if (AlarmCreationBolt.ALARM_CREATION_STREAM.equals(tuple.getSourceStreamId())) {
                        MetricDefinitionAndTenantId metricDefinitionAndTenantId2 = (MetricDefinitionAndTenantId) tuple.getValue(2);
                        SubAlarm subAlarm = (SubAlarm) tuple.getValue(4);
                        if (EventProcessingBolt.CREATED.equals(string)) {
                            handleAlarmCreated(metricDefinitionAndTenantId2, subAlarm);
                        }
                    }
                }
                this.collector.ack(tuple);
            } catch (Exception e) {
                this.logger.error("Error processing tuple {}", tuple, e);
                this.collector.ack(tuple);
            }
        } catch (Throwable th) {
            this.collector.ack(tuple);
            throw th;
        }
    }

    private void processControl(String str) {
        if (!METRICS_BEHIND.equals(str)) {
            this.logger.error("Unknown directive '{}'", str);
        } else {
            this.logger.debug("Received {}", str);
            this.upToDate = false;
        }
    }

    public Map<String, Object> getComponentConfiguration() {
        HashMap hashMap = new HashMap();
        hashMap.put("topology.tick.tuple.freq.secs", Integer.valueOf(Integer.valueOf(System.getProperty(TICK_TUPLE_SECONDS_KEY, "60")).intValue()));
        return hashMap;
    }

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.logger = LoggerFactory.getLogger(Logging.categoryFor(getClass(), topologyContext));
        this.logger.info("Preparing");
        this.collector = outputCollector;
    }

    void aggregateValues(MetricDefinitionAndTenantId metricDefinitionAndTenantId, Metric metric) {
        SubAlarmStatsRepository orCreateSubAlarmStatsRepo = getOrCreateSubAlarmStatsRepo(metricDefinitionAndTenantId);
        if (orCreateSubAlarmStatsRepo == null || metric == null) {
            return;
        }
        for (SubAlarmStats subAlarmStats : orCreateSubAlarmStatsRepo.get()) {
            long j = metric.timestamp / 1000;
            if (subAlarmStats.getStats().addValue(metric.value, j)) {
                this.logger.trace("Aggregated value {} at {} for {}. Updated {}", Double.valueOf(metric.value), Long.valueOf(metric.timestamp), metricDefinitionAndTenantId, subAlarmStats.getStats());
            } else {
                this.logger.warn("Metric is too old, age {} seconds: timestamp {} for {}, {}", Long.valueOf(currentTimeSeconds() - j), Long.valueOf(j), metricDefinitionAndTenantId, subAlarmStats.getStats());
            }
        }
    }

    void evaluateAlarmsAndSlideWindows() {
        this.logger.debug("evaluateAlarmsAndSlideWindows called");
        long currentTimeSeconds = currentTimeSeconds();
        for (SubAlarmStats subAlarmStats : this.subAlarmStatsSet) {
            if (this.upToDate) {
                this.logger.debug("Evaluating {}", subAlarmStats);
                if (subAlarmStats.evaluateAndSlideWindow(currentTimeSeconds, this.config.alarmDelay.intValue())) {
                    this.logger.debug("Alarm state changed for {}", subAlarmStats);
                    this.collector.emit(new Values(new Object[]{subAlarmStats.getSubAlarm().getAlarmId(), subAlarmStats.getSubAlarm()}));
                }
            } else {
                subAlarmStats.slideWindow(currentTimeSeconds, this.config.alarmDelay.intValue());
            }
        }
        if (this.upToDate) {
            return;
        }
        this.logger.info("Did not evaluate SubAlarms because Metrics are not up to date");
        this.upToDate = true;
    }

    protected long currentTimeSeconds() {
        return System.currentTimeMillis() / 1000;
    }

    SubAlarmStatsRepository getOrCreateSubAlarmStatsRepo(MetricDefinitionAndTenantId metricDefinitionAndTenantId) {
        SubAlarmStatsRepository subAlarmStatsRepository = this.metricDefToSubAlarmStatsRepos.get(metricDefinitionAndTenantId);
        if (subAlarmStatsRepository == null) {
            this.logger.debug("Failed to find sub alarms for {}", metricDefinitionAndTenantId);
        }
        return subAlarmStatsRepository;
    }

    void handleAlarmCreated(MetricDefinitionAndTenantId metricDefinitionAndTenantId, SubAlarm subAlarm) {
        this.logger.info("Received AlarmCreatedEvent for {}", subAlarm);
        addSubAlarm(metricDefinitionAndTenantId, subAlarm);
    }

    void handleAlarmResend(MetricDefinitionAndTenantId metricDefinitionAndTenantId, String str) {
        SubAlarmStats findExistingSubAlarmStats = findExistingSubAlarmStats(metricDefinitionAndTenantId, str);
        if (findExistingSubAlarmStats == null) {
            return;
        }
        SubAlarm subAlarm = findExistingSubAlarmStats.getSubAlarm();
        subAlarm.setNoState(true);
        this.logger.info("Forcing SubAlarm {} to send state at next evaluation", subAlarm);
    }

    private SubAlarmStats findExistingSubAlarmStats(MetricDefinitionAndTenantId metricDefinitionAndTenantId, String str) {
        SubAlarmStatsRepository subAlarmStatsRepository = this.metricDefToSubAlarmStatsRepos.get(metricDefinitionAndTenantId);
        if (subAlarmStatsRepository == null) {
            this.logger.error("Did not find SubAlarmStatsRepository for MetricDefinition {}", metricDefinitionAndTenantId);
            return null;
        }
        SubAlarmStats subAlarmStats = subAlarmStatsRepository.get(str);
        if (subAlarmStats != null) {
            return subAlarmStats;
        }
        this.logger.error("Did not find existing SubAlarm {} in SubAlarmStatsRepository", str);
        return null;
    }

    private void addSubAlarm(MetricDefinitionAndTenantId metricDefinitionAndTenantId, SubAlarm subAlarm) {
        SubAlarmStats subAlarmStats = this.subAlarmToSubAlarmStats.get(subAlarm.getId());
        if (subAlarmStats == null) {
            subAlarmStats = new SubAlarmStats(subAlarm, currentTimeSeconds() + subAlarm.getExpression().getPeriod());
            this.subAlarmToSubAlarmStats.put(subAlarm.getId(), subAlarmStats);
            this.subAlarmStatsSet.add(subAlarmStats);
        }
        SubAlarmStatsRepository subAlarmStatsRepository = this.metricDefToSubAlarmStatsRepos.get(metricDefinitionAndTenantId);
        if (subAlarmStatsRepository == null) {
            subAlarmStatsRepository = new SubAlarmStatsRepository();
            this.metricDefToSubAlarmStatsRepos.put(metricDefinitionAndTenantId, subAlarmStatsRepository);
        }
        subAlarmStatsRepository.add(subAlarm.getId(), subAlarmStats);
    }

    protected boolean subAlarmRemoved(String str, MetricDefinitionAndTenantId metricDefinitionAndTenantId) {
        if (this.subAlarmToSubAlarmStats.containsKey(str)) {
            return false;
        }
        if (this.metricDefToSubAlarmStatsRepos.get(metricDefinitionAndTenantId) != null && this.metricDefToSubAlarmStatsRepos.containsKey(str)) {
            return false;
        }
        Iterator<SubAlarmStats> it = this.subAlarmStatsSet.iterator();
        while (it.hasNext()) {
            if (it.next().getSubAlarm().getId().equals(str)) {
                return false;
            }
        }
        return true;
    }

    void handleAlarmSubExpressionUpdated(SubExpression subExpression) {
        this.logger.debug("Processing SubExpression updated for {}", subExpression);
        int i = 0;
        for (SubAlarmStats subAlarmStats : this.subAlarmStatsSet) {
            if (subAlarmStats.getSubAlarm().getAlarmSubExpressionId().equals(subExpression.getId())) {
                subAlarmStats.updateSubAlarm(subExpression.getAlarmSubExpression(), currentTimeSeconds() + subExpression.getAlarmSubExpression().getPeriod());
                i++;
            }
        }
        this.logger.debug("Updated {} SubAlarms", Integer.valueOf(i));
    }

    void handleAlarmDeleted(MetricDefinitionAndTenantId metricDefinitionAndTenantId, String str) {
        this.logger.debug("Received AlarmDeletedEvent for subAlarm id {}", str);
        SubAlarmStatsRepository subAlarmStatsRepository = this.metricDefToSubAlarmStatsRepos.get(metricDefinitionAndTenantId);
        if (subAlarmStatsRepository != null) {
            subAlarmStatsRepository.remove(str);
            if (subAlarmStatsRepository.isEmpty()) {
                this.metricDefToSubAlarmStatsRepos.remove(metricDefinitionAndTenantId);
            }
        }
        SubAlarmStats remove = this.subAlarmToSubAlarmStats.remove(str);
        if (remove != null) {
            this.subAlarmStatsSet.remove(remove);
        }
    }
}
