package monasca.thresh.infrastructure.thresholding;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import monasca.common.model.alarm.AlarmExpression;
import monasca.common.model.alarm.AlarmSubExpression;
import monasca.common.model.event.AlarmDefinitionCreatedEvent;
import monasca.common.model.event.AlarmDefinitionDeletedEvent;
import monasca.common.model.metric.Metric;
import monasca.common.streaming.storm.Logging;
import monasca.common.util.Injector;
import monasca.thresh.domain.model.Alarm;
import monasca.thresh.domain.model.AlarmDefinition;
import monasca.thresh.domain.model.MetricDefinitionAndTenantId;
import monasca.thresh.domain.model.MetricDefinitionAndTenantIdMatcher;
import monasca.thresh.domain.model.SubAlarm;
import monasca.thresh.domain.model.SubExpression;
import monasca.thresh.domain.model.TenantIdAndMetricName;
import monasca.thresh.domain.service.AlarmDAO;
import monasca.thresh.domain.service.AlarmDefinitionDAO;
import monasca.thresh.infrastructure.persistence.PersistenceModule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:monasca/thresh/infrastructure/thresholding/MetricFilteringBolt.class */
/**
 * Storm bolt that forwards only those metrics that match at least one known Alarm Definition.
 *
 * <p>Tuples arriving on the {@code "default"} stream carry a metric; if the shared matcher finds
 * an Alarm Definition for it, the metric is re-emitted on the default output stream, and the
 * first time a given metric/Alarm Definition pair is seen it is also announced on
 * {@link #NEW_METRIC_FOR_ALARM_DEFINITION_STREAM}. Tuples arriving on the event streams keep the
 * matcher and the bookkeeping of already-announced metrics up to date.
 *
 * <p>The matcher, the set of Alarm Definitions and the already-found bookkeeping are held in
 * static fields, i.e. they are shared by every instance of this bolt loaded in the same JVM.
 * Mutations of that state are serialized on {@code SENTINAL}.
 */
public class MetricFilteringBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1096706128973976599L;
    public static final String NEW_METRIC_FOR_ALARM_DEFINITION_STREAM = "newMetricForAlarmDefinitionStream";
    public static final int MIN_LAG_VALUE_DEFAULT = 10;
    public static final int MAX_LAG_MESSAGES_DEFAULT = 10;
    public static final int LAG_MESSAGE_PERIOD_DEFAULT = 30;
    private transient Logger logger;
    private DataSourceFactory dbConfig;
    private transient AlarmDAO alarmDAO;
    private transient AlarmDefinitionDAO alarmDefDAO;
    private OutputCollector collector;
    /** Smallest lag (current time minus the value carried by the tuple) observed so far. */
    private long minLag = Long.MAX_VALUE;
    /** Time, as returned by {@link #getCurrentTime()}, of the last lag message (0 = none yet). */
    private long lastMinLagMessageSent = 0;
    /** Number of lag messages emitted so far. */
    private long minLagMessageSent = 0;
    /** While true, {@link #checkLag(Long)} keeps evaluating incoming metrics. */
    private boolean lagging = true;
    public static final String[] NEW_METRIC_FOR_ALARM_DEFINITION_FIELDS = {"metricDefinitionAndTenantId", "alarmDefinitionId"};
    public static final String[] FIELDS = {"tenantIdAndMetricName", "metric"};
    public static final String MIN_LAG_VALUE_KEY = "monasca.thresh.filtering.minLagValue";
    private static final int MIN_LAG_VALUE = PropertyFinder.getIntProperty(MIN_LAG_VALUE_KEY, MIN_LAG_VALUE_DEFAULT, 0, Integer.MAX_VALUE);
    public static final String MAX_LAG_MESSAGES_KEY = "monasca.thresh.filtering.maxLagMessages";
    private static final int MAX_LAG_MESSAGES = PropertyFinder.getIntProperty(MAX_LAG_MESSAGES_KEY, MAX_LAG_MESSAGES_DEFAULT, 0, Integer.MAX_VALUE);
    public static final String LAG_MESSAGE_PERIOD_KEY = "monasca.thresh.filtering.lagMessagePeriod";
    private static final int LAG_MESSAGE_PERIOD = PropertyFinder.getIntProperty(LAG_MESSAGE_PERIOD_KEY, LAG_MESSAGE_PERIOD_DEFAULT, 1, 600);
    private static final MetricDefinitionAndTenantIdMatcher matcher = new MetricDefinitionAndTenantIdMatcher();
    private static final ExistingHolder alreadyFound = new ExistingHolder();
    /** Lock object guarding mutations of the shared static state. */
    private static final Object SENTINAL = new Object();
    private static final Map<String, AlarmDefinition> alarmDefinitions = new ConcurrentHashMap<>();

    /**
     * Bookkeeping of which metrics have already been announced for which Alarm Definitions.
     *
     * <p>Two indexes are maintained: metric to Alarm Definition ids, and Alarm Definition id to
     * the metrics recorded for it, so that a whole Alarm Definition can be removed at once.
     */
    public static class ExistingHolder {
        /** Metric -> ids of the Alarm Definitions it has been recorded for. */
        private final Map<MetricDefinitionAndTenantId, Set<String>> metricDefs;
        /** Alarm Definition id -> metrics recorded for it (reverse index of {@code metricDefs}). */
        private static final Map<String, List<MetricDefinitionAndTenantId>> usedMetrics = new ConcurrentHashMap<>();

        private ExistingHolder() {
            this.metricDefs = new ConcurrentHashMap<>();
        }

        /**
         * Records that the metric is in use by the Alarm Definition; a pair that is already
         * recorded is left untouched.
         *
         * @param metricDefinitionAndTenantId the metric
         * @param alarmDefinitionId id of the Alarm Definition using the metric
         */
        public void add(MetricDefinitionAndTenantId metricDefinitionAndTenantId, String alarmDefinitionId) {
            Set<String> ids = this.metricDefs.get(metricDefinitionAndTenantId);
            if (ids == null) {
                ids = new HashSet<>();
                this.metricDefs.put(metricDefinitionAndTenantId, ids);
            }
            if (!ids.add(alarmDefinitionId)) {
                // Pair already known: the reverse index already holds it too.
                return;
            }
            List<MetricDefinitionAndTenantId> metrics = usedMetrics.get(alarmDefinitionId);
            if (metrics == null) {
                metrics = new LinkedList<>();
                usedMetrics.put(alarmDefinitionId, metrics);
            }
            metrics.add(metricDefinitionAndTenantId);
        }

        /**
         * Forgets every metric recorded for the given Alarm Definition.
         *
         * @param alarmDefinitionId id of the Alarm Definition being removed
         */
        public void removeAlarmDefinition(String alarmDefinitionId) {
            final List<MetricDefinitionAndTenantId> metrics = usedMetrics.get(alarmDefinitionId);
            if (metrics == null) {
                return;
            }
            for (MetricDefinitionAndTenantId metric : metrics) {
                removeFromMetricDefs(metric, alarmDefinitionId);
            }
            usedMetrics.remove(alarmDefinitionId);
        }

        /**
         * Forgets a single metric/Alarm Definition pair, dropping index entries that become empty.
         *
         * @param metricDefinitionAndTenantId the metric
         * @param alarmDefinitionId id of the Alarm Definition
         */
        public void remove(MetricDefinitionAndTenantId metricDefinitionAndTenantId, String alarmDefinitionId) {
            removeFromMetricDefs(metricDefinitionAndTenantId, alarmDefinitionId);
            final List<MetricDefinitionAndTenantId> metrics = usedMetrics.get(alarmDefinitionId);
            if (metrics != null && metrics.remove(metricDefinitionAndTenantId) && metrics.isEmpty()) {
                usedMetrics.remove(alarmDefinitionId);
            }
        }

        /** Removes the id from the metric's set and drops the metric once its set is empty. */
        private void removeFromMetricDefs(MetricDefinitionAndTenantId metricDefinitionAndTenantId, String alarmDefinitionId) {
            final Set<String> ids = matches(metricDefinitionAndTenantId);
            if (ids != null && ids.remove(alarmDefinitionId) && ids.isEmpty()) {
                this.metricDefs.remove(metricDefinitionAndTenantId);
            }
        }

        /**
         * Looks up the Alarm Definition ids recorded for a metric.
         *
         * @param metricDefinitionAndTenantId the metric
         * @return the internal (live, mutable) set of ids, or {@code null} if none are recorded
         */
        public Set<String> matches(MetricDefinitionAndTenantId metricDefinitionAndTenantId) {
            return this.metricDefs.get(metricDefinitionAndTenantId);
        }

        /** @return the number of distinct metrics currently recorded */
        public int size() {
            return this.metricDefs.size();
        }

        /** @return {@code true} if no metric is recorded */
        public boolean isEmpty() {
            return this.metricDefs.isEmpty();
        }

        /** Empties both indexes. */
        public void clear() {
            this.metricDefs.clear();
            usedMetrics.clear();
        }
    }

    /**
     * Creates a bolt whose DAOs are obtained through the {@code Injector} in
     * {@link #prepare(Map, TopologyContext, OutputCollector)}.
     *
     * @param dataSourceFactory database configuration handed to the persistence module
     */
    public MetricFilteringBolt(DataSourceFactory dataSourceFactory) {
        this.dbConfig = dataSourceFactory;
    }

    /**
     * Creates a bolt with explicitly supplied DAOs.
     *
     * @param alarmDefinitionDAO source of the existing Alarm Definitions
     * @param alarmDAO source of the existing Alarms
     */
    public MetricFilteringBolt(AlarmDefinitionDAO alarmDefinitionDAO, AlarmDAO alarmDAO) {
        this.alarmDefDAO = alarmDefinitionDAO;
        this.alarmDAO = alarmDAO;
    }

    /**
     * Declares the default output stream plus the new-metric, aggregation-control and
     * alarm-creation streams.
     */
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields(FIELDS));
        outputFieldsDeclarer.declareStream(NEW_METRIC_FOR_ALARM_DEFINITION_STREAM, new Fields(NEW_METRIC_FOR_ALARM_DEFINITION_FIELDS));
        outputFieldsDeclarer.declareStream(MetricAggregationBolt.METRIC_AGGREGATION_CONTROL_STREAM, new Fields(MetricAggregationBolt.METRIC_AGGREGATION_CONTROL_FIELDS));
        outputFieldsDeclarer.declareStream(AlarmCreationBolt.ALARM_CREATION_STREAM, new Fields(AlarmCreationBolt.ALARM_CREATION_FIELDS));
    }

    /**
     * Processes one tuple: a metric when it arrives on the {@code "default"} stream, otherwise
     * an alarm or Alarm Definition event. Exceptions are logged rather than propagated, and the
     * tuple is acknowledged exactly once whatever the outcome.
     *
     * @param tuple the incoming tuple
     */
    public void execute(Tuple tuple) {
        this.logger.debug("tuple: {}", tuple);
        try {
            final String streamId = tuple.getSourceStreamId();
            if ("default".equals(streamId)) {
                processMetric(tuple);
            } else {
                processEvent(tuple, streamId);
            }
        } catch (Exception e) {
            this.logger.error("Error processing tuple {}", tuple, e);
        } finally {
            this.collector.ack(tuple);
        }
    }

    /** Handles a metric tuple: updates lag tracking and forwards the metric if it matches. */
    private void processMetric(Tuple tuple) {
        final TenantIdAndMetricName tenantIdAndMetricName = (TenantIdAndMetricName) tuple.getValue(0);
        // Compared against getCurrentTime() in checkLag; presumably the metric timestamp in
        // seconds -- TODO confirm against the emitting spout/bolt.
        final Long timestamp = (Long) tuple.getValue(1);
        final Metric metric = (Metric) tuple.getValue(2);
        final MetricDefinitionAndTenantId metricDefinitionAndTenantId =
                new MetricDefinitionAndTenantId(metric.definition(), tenantIdAndMetricName.getTenantId());
        checkLag(timestamp);
        this.logger.debug("metric definition and tenant id: {}", metricDefinitionAndTenantId);
        if (checkForMatch(metricDefinitionAndTenantId)) {
            this.collector.emit(new Values(tenantIdAndMetricName, metric));
        }
    }

    /** Handles an alarm-deleted or Alarm Definition created/deleted event tuple. */
    private void processEvent(Tuple tuple, String streamId) {
        final String eventType = tuple.getString(0);
        this.logger.debug("Received {} on {}", eventType, streamId);
        if (EventProcessingBolt.METRIC_ALARM_EVENT_STREAM_ID.equals(streamId)) {
            if (EventProcessingBolt.DELETED.equals(eventType)) {
                removeAlarm((MetricDefinitionAndTenantId) tuple.getValue(2), tuple.getString(3));
            }
        } else if (EventProcessingBolt.ALARM_DEFINITION_EVENT_STREAM_ID.equals(streamId)) {
            if (EventProcessingBolt.CREATED.equals(eventType)) {
                synchronized (SENTINAL) {
                    final AlarmDefinitionCreatedEvent event = (AlarmDefinitionCreatedEvent) tuple.getValue(1);
                    newAlarmDefinition(new AlarmDefinition(event.alarmDefinitionId, event.tenantId, event.alarmName,
                            event.alarmDescription, new AlarmExpression(event.alarmExpression), "LOW", true,
                            createSubExpressions(event.alarmSubExpressions), event.matchBy));
                }
            } else if (EventProcessingBolt.DELETED.equals(eventType)) {
                deleteAlarmDefinition(((AlarmDefinitionDeletedEvent) tuple.getValue(1)).alarmDefinitionId);
            }
        }
    }

    /**
     * Converts the id-to-sub-expression map of a created event into {@link SubExpression}s.
     *
     * @param map sub-expressions keyed by their id
     * @return one {@link SubExpression} per map entry
     */
    private List<SubExpression> createSubExpressions(Map<String, AlarmSubExpression> map) {
        final List<SubExpression> subExpressions = new ArrayList<>(map.size());
        for (Map.Entry<String, AlarmSubExpression> entry : map.entrySet()) {
            subExpressions.add(new SubExpression(entry.getKey(), entry.getValue()));
        }
        return subExpressions;
    }

    /**
     * Removes an Alarm Definition from the shared state: the definition map, the already-found
     * bookkeeping and the matcher. Unknown ids are ignored.
     *
     * @param alarmDefinitionId id of the Alarm Definition to delete
     */
    private void deleteAlarmDefinition(String alarmDefinitionId) {
        synchronized (SENTINAL) {
            final AlarmDefinition alarmDefinition = alarmDefinitions.get(alarmDefinitionId);
            if (alarmDefinition == null) {
                return;
            }
            this.logger.info("Deleting Alarm Definition {}", alarmDefinitionId);
            alarmDefinitions.remove(alarmDefinitionId);
            alreadyFound.removeAlarmDefinition(alarmDefinitionId);
            for (MetricDefinitionAndTenantId metric : getAllMetricDefinitions(alarmDefinition)) {
                matcher.remove(metric, alarmDefinition.getId());
            }
        }
    }

    /**
     * Collects the metric definitions of all sub-expressions of an Alarm Definition, each paired
     * with the definition's tenant id.
     *
     * @param alarmDefinition the Alarm Definition to inspect
     * @return the distinct metric definitions it refers to
     */
    private Set<MetricDefinitionAndTenantId> getAllMetricDefinitions(AlarmDefinition alarmDefinition) {
        final Set<MetricDefinitionAndTenantId> metrics =
                new HashSet<>(alarmDefinition.getAlarmExpression().getSubExpressions().size());
        for (AlarmSubExpression subExpression : alarmDefinition.getAlarmExpression().getSubExpressions()) {
            metrics.add(new MetricDefinitionAndTenantId(subExpression.getMetricDefinition(), alarmDefinition.getTenantId()));
        }
        return metrics;
    }

    /**
     * Registers an Alarm Definition in the shared map and adds each of its metric definitions to
     * the matcher. Callers in this class invoke it while holding {@code SENTINAL}.
     *
     * @param alarmDefinition the Alarm Definition to register
     */
    private void newAlarmDefinition(AlarmDefinition alarmDefinition) {
        alarmDefinitions.put(alarmDefinition.getId(), alarmDefinition);
        for (MetricDefinitionAndTenantId metric : getAllMetricDefinitions(alarmDefinition)) {
            matcher.add(metric, alarmDefinition.getId());
        }
    }

    /**
     * Checks whether the metric matches any Alarm Definition and announces every
     * metric/Alarm Definition pair that has not been announced before.
     *
     * @param metricDefinitionAndTenantId the metric to check
     * @return {@code true} if the matcher returned at least one Alarm Definition id
     */
    private boolean checkForMatch(MetricDefinitionAndTenantId metricDefinitionAndTenantId) {
        final Set<String> alarmDefinitionIds = matcher.match(metricDefinitionAndTenantId);
        if (alarmDefinitionIds.isEmpty()) {
            return false;
        }
        // NOTE(review): matches() hands out the live HashSet that other bolt instances mutate
        // under SENTINAL, while it is read here without the lock -- confirm this is acceptable.
        final Set<String> alreadyAnnounced = alreadyFound.matches(metricDefinitionAndTenantId);
        if (alreadyAnnounced != null) {
            alarmDefinitionIds.removeAll(alreadyAnnounced);
        }
        for (String alarmDefinitionId : alarmDefinitionIds) {
            final AlarmDefinition alarmDefinition = alarmDefinitions.get(alarmDefinitionId);
            if (alarmDefinition == null) {
                // The definition was deleted after the matcher was consulted; dereferencing it
                // would throw and cause the whole metric to be dropped by execute().
                this.logger.debug("Alarm Definition {} no longer exists, not adding metric {}",
                        alarmDefinitionId, metricDefinitionAndTenantId);
                continue;
            }
            this.logger.info("Add metric {} for Alarm Definition id = {} name = {}", metricDefinitionAndTenantId,
                    alarmDefinitionId, alarmDefinition.getName());
            this.collector.emit(NEW_METRIC_FOR_ALARM_DEFINITION_STREAM, new Values(metricDefinitionAndTenantId, alarmDefinitionId));
            synchronized (SENTINAL) {
                alreadyFound.add(metricDefinitionAndTenantId, alarmDefinitionId);
            }
        }
        return true;
    }

    /**
     * Tracks how far incoming metrics lag behind the current time and, while they lag by more
     * than {@code MIN_LAG_VALUE}, periodically emits a {@code METRICS_BEHIND} control message.
     * Tracking stops once the lag is small enough or {@code MAX_LAG_MESSAGES} messages were sent.
     *
     * @param l value to compare with {@link #getCurrentTime()}; {@code null} and 0 are ignored
     */
    private void checkLag(Long l) {
        if (!this.lagging || l == null || l.longValue() == 0) {
            return;
        }
        final long now = getCurrentTime();
        final long lag = now - l.longValue();
        if (lag < this.minLag) {
            this.minLag = lag;
        }
        if (this.minLag <= MIN_LAG_VALUE) {
            this.lagging = false;
            this.logger.info("Metrics no longer lagging, minLag = {}", this.minLag);
            return;
        }
        if (this.minLagMessageSent >= MAX_LAG_MESSAGES) {
            // Multiply as long: both factors are ints whose configured ranges allow the product
            // to exceed Integer.MAX_VALUE.
            final long waitedSeconds = (long) MAX_LAG_MESSAGES * LAG_MESSAGE_PERIOD;
            this.logger.info("Waited for {} seconds for Metrics to catch up. Giving up. minLag = {}", waitedSeconds, this.minLag);
            this.lagging = false;
        } else if (this.lastMinLagMessageSent == 0) {
            // First lagging metric seen: start the period without sending a message yet.
            this.lastMinLagMessageSent = now;
        } else if (now - this.lastMinLagMessageSent >= LAG_MESSAGE_PERIOD) {
            this.logger.info("Sending {} message, minLag = {}", MetricAggregationBolt.METRICS_BEHIND, this.minLag);
            this.collector.emit(MetricAggregationBolt.METRIC_AGGREGATION_CONTROL_STREAM, new Values(MetricAggregationBolt.METRICS_BEHIND));
            this.lastMinLagMessageSent = now;
            this.minLagMessageSent++;
        }
    }

    /**
     * Forgets that the metric was announced for the Alarm Definition, provided the definition is
     * still known.
     *
     * @param metricDefinitionAndTenantId the metric of the deleted alarm
     * @param alarmDefinitionId id of the Alarm Definition the alarm belonged to
     */
    private void removeAlarm(MetricDefinitionAndTenantId metricDefinitionAndTenantId, String alarmDefinitionId) {
        if (!alarmDefinitions.containsKey(alarmDefinitionId)) {
            return;
        }
        synchronized (SENTINAL) {
            alreadyFound.remove(metricDefinitionAndTenantId, alarmDefinitionId);
        }
        this.logger.debug("Removed {} for Alarm Definition {}", metricDefinitionAndTenantId, alarmDefinitionId);
    }

    /**
     * Initializes the logger, collector and DAOs and, if nothing has been recorded yet, loads
     * the existing Alarm Definitions and Alarms into the shared state.
     *
     * @param map configuration map supplied by the caller (not read here)
     * @param topologyContext context used to derive the logger category
     * @param outputCollector collector used for all emits and acks
     */
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.logger = LoggerFactory.getLogger(Logging.categoryFor(getClass(), topologyContext));
        this.logger.info("Preparing");
        this.collector = outputCollector;
        if (this.alarmDefDAO == null) {
            Injector.registerIfNotBound(AlarmDefinitionDAO.class, new PersistenceModule(this.dbConfig));
            this.alarmDefDAO = (AlarmDefinitionDAO) Injector.getInstance(AlarmDefinitionDAO.class);
        }
        if (this.alarmDAO == null) {
            Injector.registerIfNotBound(AlarmDAO.class, new PersistenceModule(this.dbConfig));
            this.alarmDAO = (AlarmDAO) Injector.getInstance(AlarmDAO.class);
        }
        if (alreadyFound.isEmpty()) {
            synchronized (SENTINAL) {
                // Re-check under the lock: another instance may have loaded the state meanwhile.
                if (alreadyFound.isEmpty()) {
                    loadExistingState(outputCollector);
                }
            }
        }
        this.lastMinLagMessageSent = 0L;
    }

    /**
     * Registers every persisted Alarm Definition, records the alarmed metrics of every persisted
     * Alarm and emits a CREATED tuple for each SubAlarm a metric fits. Must be called while
     * holding {@code SENTINAL}.
     */
    private void loadExistingState(OutputCollector outputCollector) {
        for (AlarmDefinition alarmDefinition : this.alarmDefDAO.listAll()) {
            newAlarmDefinition(alarmDefinition);
        }
        for (Alarm alarm : this.alarmDAO.listAll()) {
            final String alarmDefinitionId = alarm.getAlarmDefinitionId();
            if (!alarmDefinitions.containsKey(alarmDefinitionId)) {
                this.logger.error("AlarmDefinition {} does not exist for Alarm {}, ignoring", alarmDefinitionId, alarm.getId());
                continue;
            }
            for (MetricDefinitionAndTenantId metric : alarm.getAlarmedMetrics()) {
                alreadyFound.add(metric, alarmDefinitionId);
                for (SubAlarm subAlarm : alarm.getSubAlarms()) {
                    if (AlarmCreationBolt.metricFitsInAlarmSubExpr(subAlarm.getExpression(), metric.metricDefinition)) {
                        final Values values = new Values(EventProcessingBolt.CREATED, new TenantIdAndMetricName(metric),
                                metric, alarmDefinitionId, subAlarm);
                        this.logger.debug("Emitting new SubAlarm {}", values);
                        outputCollector.emit(AlarmCreationBolt.ALARM_CREATION_STREAM, values);
                    }
                }
            }
        }
        this.logger.info("Found {} Alarmed Metrics", alreadyFound.size());
        this.logger.info("MIN_LAG_VALUE set to {} seconds", MIN_LAG_VALUE);
        this.logger.info("MAX_LAG_MESSAGES set to {}", MAX_LAG_MESSAGES);
        this.logger.info("LAG_MESSAGE_PERIOD set to {} seconds", LAG_MESSAGE_PERIOD);
    }

    /**
     * @return the current wall-clock time in whole seconds since the epoch; overridable so that
     *     subclasses can supply a different clock
     */
    protected long getCurrentTime() {
        return System.currentTimeMillis() / 1000;
    }

    /** Clears all shared static state: the already-found bookkeeping, the matcher and the definitions. */
    public static void clearMetricDefinitions() {
        alreadyFound.clear();
        matcher.clear();
        alarmDefinitions.clear();
    }

    /** @return the number of distinct metrics currently recorded as already found */
    static int sizeMetricDefinitions() {
        return alreadyFound.size();
    }
}
