/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.client.command.amqp;

import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.tag.Tags;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.Message;
import io.vertx.core.json.DecodeException;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.client.amqp.AbstractRequestResponseServiceClient;
import org.eclipse.hono.client.amqp.RequestResponseClient;
import org.eclipse.hono.client.amqp.connection.HonoConnection;
import org.eclipse.hono.client.amqp.connection.SendMessageSampler;
import org.eclipse.hono.client.command.CommandRouterClient;
import org.eclipse.hono.client.util.CachingClientFactory;
import org.eclipse.hono.client.util.StatusCodeMapper;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.CacheDirective;
import org.eclipse.hono.util.CommandRouterConstants;
import org.eclipse.hono.util.Pair;
import org.eclipse.hono.util.RequestResponseResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProtonBasedCommandRouterClient
extends AbstractRequestResponseServiceClient<JsonObject, RequestResponseResult<JsonObject>>
implements CommandRouterClient {
    protected static final long SET_LAST_KNOWN_GATEWAY_UPDATE_INTERVAL_MILLIS = 400L;
    protected static final int SET_LAST_KNOWN_GATEWAY_UPDATE_MAX_ENTRIES = 100;
    protected static final int SET_LAST_KNOWN_GATEWAY_UPDATE_MAX_PARALLEL_REQ = 50;
    private static final Logger LOG = LoggerFactory.getLogger(ProtonBasedCommandRouterClient.class);
    private final LinkedHashMap<Pair<String, String>, String> lastKnownGatewaysWorkQueue = new LinkedHashMap();
    private Long lastKnownGatewaysUpdateTimerId;
    private boolean stopped = false;
    private Clock clock = Clock.systemUTC();

    public ProtonBasedCommandRouterClient(HonoConnection connection, SendMessageSampler.Factory samplerFactory) {
        super(connection, samplerFactory, new CachingClientFactory(connection.getVertx(), RequestResponseClient::isOpen), null);
        connection.getVertx().eventBus().consumer("tenant.timeout", x$0 -> this.handleTenantTimeout((Message<Object>)x$0));
    }

    void setClock(Clock clock) {
        this.clock = Objects.requireNonNull(clock);
    }

    @Override
    public Future<Void> stop() {
        this.stopped = true;
        Optional.ofNullable(this.lastKnownGatewaysUpdateTimerId).ifPresent(tid -> this.connection.getVertx().cancelTimer((long)tid));
        return this.disconnectOnStop();
    }

    @Override
    protected String getKey(String tenantId) {
        return String.format("%s-%s", "cmd_router", tenantId);
    }

    private Future<RequestResponseClient<RequestResponseResult<JsonObject>>> getOrCreateClient(String tenantId) {
        return this.connection.isConnected(this.getDefaultConnectionCheckTimeout()).compose(v -> this.connection.executeOnContext(result -> this.clientFactory.getOrCreateClient(this.getKey(tenantId), () -> RequestResponseClient.forEndpoint(this.connection, "cmd_router", tenantId, this.samplerFactory.create("cmd_router"), x$0 -> this.removeClient((String)x$0), x$0 -> this.removeClient((String)x$0)), result)));
    }

    @Override
    protected final RequestResponseResult<JsonObject> getResult(int status, String contentType, Buffer payload, CacheDirective cacheDirective, ApplicationProperties applicationProperties) {
        Map props = Optional.ofNullable(applicationProperties).map(ApplicationProperties::getValue).orElse(null);
        if (payload == null) {
            return new RequestResponseResult<Object>(status, null, null, props);
        }
        try {
            return new RequestResponseResult<JsonObject>(status, new JsonObject(payload), CacheDirective.noCacheDirective(), props);
        }
        catch (DecodeException e) {
            LOG.warn("received malformed payload from Command Router service", e);
            return new RequestResponseResult<Object>(500, null, null, props);
        }
    }

    @Override
    public Future<Void> setLastKnownGatewayForDevice(String tenantId, String deviceId, String gatewayId, SpanContext context) {
        Objects.requireNonNull(tenantId);
        Objects.requireNonNull(deviceId);
        Objects.requireNonNull(gatewayId);
        this.lastKnownGatewaysWorkQueue.put(Pair.of(tenantId, deviceId), gatewayId);
        if (this.lastKnownGatewaysUpdateTimerId == null && !this.stopped) {
            this.lastKnownGatewaysUpdateTimerId = this.connection.getVertx().setTimer(400L, tid -> this.processLastKnownGatewaysWorkQueue(null, null, null));
        }
        return Future.succeededFuture();
    }

    private void processLastKnownGatewaysWorkQueue(Instant processingStartParam, Set<String> tenantsToProcess, Span spanParam) {
        this.log.debug("processLastKnownGatewaysWorkQueue; queue size: {}", (Object)this.lastKnownGatewaysWorkQueue.size());
        Instant processingStart = Optional.ofNullable(processingStartParam).orElseGet(() -> Instant.now(this.clock));
        Span currentSpan = Optional.ofNullable(spanParam).orElseGet(() -> this.newFollowingSpan(null, "set last known gateways"));
        if (tenantsToProcess == null) {
            currentSpan.log(Map.of("no_of_device_entries_to_set", this.lastKnownGatewaysWorkQueue.size()));
        }
        LinkedHashMap<String, Map> deviceToGatewayMapPerTenant = new LinkedHashMap<String, Map>();
        HashSet<String> tenantsWithRequestLimitReached = new HashSet<String>();
        Iterator<Map.Entry<Pair<String, String>, String>> iter = this.lastKnownGatewaysWorkQueue.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<Pair<String, String>, String> tenantDeviceToGatewayEntry = iter.next();
            String tenantId2 = tenantDeviceToGatewayEntry.getKey().one();
            if (tenantsToProcess != null && !tenantsToProcess.contains(tenantId2)) continue;
            if (!deviceToGatewayMapPerTenant.containsKey(tenantId2) && deviceToGatewayMapPerTenant.size() >= 50) {
                tenantsWithRequestLimitReached.add(tenantId2);
                continue;
            }
            deviceToGatewayMapPerTenant.putIfAbsent(tenantId2, new HashMap());
            Map deviceToGatewayMap2 = (Map)deviceToGatewayMapPerTenant.get(tenantId2);
            if (deviceToGatewayMap2.size() < 100) {
                deviceToGatewayMap2.put(tenantDeviceToGatewayEntry.getKey().two(), tenantDeviceToGatewayEntry.getValue());
                iter.remove();
                continue;
            }
            tenantsWithRequestLimitReached.add(tenantId2);
        }
        ArrayList resultFutures = new ArrayList();
        deviceToGatewayMapPerTenant.forEach((tenantId, deviceToGatewayMap) -> resultFutures.add(this.setLastKnownGateways((String)tenantId, (Map<String, String>)deviceToGatewayMap, currentSpan.context())));
        Future.join(resultFutures).onComplete(ar -> {
            if (ar.failed()) {
                TracingHelper.logError(currentSpan, ar.cause());
            }
            if (this.stopped) {
                currentSpan.finish();
            } else if (!this.lastKnownGatewaysWorkQueue.isEmpty()) {
                long millisToWaitForNextInvocation = 400L - Duration.between(processingStart, Instant.now(this.clock)).toMillis();
                if (millisToWaitForNextInvocation < 1L) {
                    if (!tenantsWithRequestLimitReached.isEmpty()) {
                        currentSpan.log(String.format("still remaining entries to be set for %d tenants - will be handled in next overall run", tenantsWithRequestLimitReached.size()));
                        this.log.info("processLastKnownGatewaysWorkQueue: not all entries could be set during update interval; current queue size: {}", (Object)this.lastKnownGatewaysWorkQueue.size());
                    }
                    currentSpan.finish();
                    this.processLastKnownGatewaysWorkQueue(null, null, null);
                } else if (!tenantsWithRequestLimitReached.isEmpty()) {
                    currentSpan.log(String.format("starting another round of requests for %d tenants (request size/count limit was reached)", tenantsWithRequestLimitReached.size()));
                    this.processLastKnownGatewaysWorkQueue(processingStart, tenantsWithRequestLimitReached, currentSpan);
                } else {
                    this.log.debug("schedule next processLastKnownGatewaysWorkQueue invocation in {}ms", (Object)millisToWaitForNextInvocation);
                    currentSpan.finish();
                    this.lastKnownGatewaysUpdateTimerId = this.connection.getVertx().setTimer(millisToWaitForNextInvocation, tid -> this.processLastKnownGatewaysWorkQueue(null, null, null));
                }
            } else {
                currentSpan.finish();
                this.lastKnownGatewaysUpdateTimerId = null;
            }
        });
    }

    protected Future<Void> setLastKnownGateways(String tenantId, Map<String, String> deviceToGatewayMap, SpanContext context) {
        Future resultTracker;
        Span currentSpan;
        if (deviceToGatewayMap.size() == 1) {
            Map.Entry<String, String> entry = deviceToGatewayMap.entrySet().iterator().next();
            String deviceId = entry.getKey();
            String gatewayId = entry.getValue();
            Map<String, Object> properties = this.createDeviceIdProperties(deviceId);
            properties.put("gateway_id", gatewayId);
            currentSpan = this.newChildSpan(context, "set last known gateway for device");
            TracingHelper.setDeviceTags(currentSpan, tenantId, deviceId);
            currentSpan.setTag("gateway_id", gatewayId);
            resultTracker = this.getOrCreateClient(tenantId).compose(client -> client.createAndSendRequest(CommandRouterConstants.CommandRouterAction.SET_LAST_KNOWN_GATEWAY.getSubject(), properties, null, null, x$0 -> this.getRequestResponseResult((org.apache.qpid.proton.message.Message)x$0), currentSpan));
        } else {
            currentSpan = this.newChildSpan(context, "set last known gateways for tenant devices");
            TracingHelper.setDeviceTags(currentSpan, tenantId, null);
            currentSpan.log(Map.of("no_of_entries", deviceToGatewayMap.size()));
            JsonObject payload = new JsonObject();
            deviceToGatewayMap.forEach(payload::put);
            resultTracker = this.getOrCreateClient(tenantId).compose(client -> client.createAndSendRequest(CommandRouterConstants.CommandRouterAction.SET_LAST_KNOWN_GATEWAY.getSubject(), null, payload.toBuffer(), "application/json", x$0 -> this.getRequestResponseResult((org.apache.qpid.proton.message.Message)x$0), currentSpan));
        }
        return this.mapResultAndFinishSpan(resultTracker, result -> {
            switch (result.getStatus()) {
                case 204: {
                    return null;
                }
            }
            throw StatusCodeMapper.from(result);
        }, currentSpan).onFailure(thr -> this.log.debug("failed to set last known gateway(s) for tenant [{}]", (Object)tenantId, thr)).mapEmpty();
    }

    @Override
    public Future<Void> registerCommandConsumer(String tenantId, String deviceId, boolean sendEvent, String adapterInstanceId, Duration lifespan, SpanContext context) {
        Objects.requireNonNull(tenantId);
        Objects.requireNonNull(deviceId);
        Objects.requireNonNull(adapterInstanceId);
        int lifespanSeconds = lifespan != null && lifespan.getSeconds() <= Integer.MAX_VALUE ? (int)lifespan.getSeconds() : -1;
        Map<String, Object> properties = this.createDeviceIdProperties(deviceId);
        properties.put("adapter_instance_id", adapterInstanceId);
        properties.put("lifespan", lifespanSeconds);
        Span currentSpan = this.newChildSpan(context, "register command consumer");
        TracingHelper.setDeviceTags(currentSpan, tenantId, deviceId);
        TracingHelper.TAG_ADAPTER_INSTANCE_ID.set(currentSpan, adapterInstanceId);
        currentSpan.setTag("lifespan", lifespanSeconds);
        properties.put("send_event", sendEvent);
        Future resultTracker = this.getOrCreateClient(tenantId).compose(client -> client.createAndSendRequest(CommandRouterConstants.CommandRouterAction.REGISTER_COMMAND_CONSUMER.getSubject(), properties, null, null, x$0 -> this.getRequestResponseResult((org.apache.qpid.proton.message.Message)x$0), currentSpan));
        return this.mapResultAndFinishSpan(resultTracker, result -> {
            switch (result.getStatus()) {
                case 204: {
                    return null;
                }
            }
            throw StatusCodeMapper.from(result);
        }, currentSpan).mapEmpty();
    }

    @Override
    public Future<Void> unregisterCommandConsumer(String tenantId, String deviceId, boolean sendEvent, String adapterInstanceId, SpanContext context) {
        Objects.requireNonNull(tenantId);
        Objects.requireNonNull(deviceId);
        Objects.requireNonNull(adapterInstanceId);
        Map<String, Object> properties = this.createDeviceIdProperties(deviceId);
        properties.put("adapter_instance_id", adapterInstanceId);
        Span currentSpan = this.newChildSpan(context, "unregister command consumer");
        TracingHelper.setDeviceTags(currentSpan, tenantId, deviceId);
        TracingHelper.TAG_ADAPTER_INSTANCE_ID.set(currentSpan, adapterInstanceId);
        properties.put("send_event", sendEvent);
        return this.getOrCreateClient(tenantId).compose(client -> client.createAndSendRequest(CommandRouterConstants.CommandRouterAction.UNREGISTER_COMMAND_CONSUMER.getSubject(), properties, null, null, x$0 -> this.getRequestResponseResult((org.apache.qpid.proton.message.Message)x$0), currentSpan)).recover(t -> {
            Tags.HTTP_STATUS.set(currentSpan, ServiceInvocationException.extractStatusCode(t));
            TracingHelper.logError(currentSpan, t);
            return Future.failedFuture(t);
        }).map(resultValue -> {
            Tags.HTTP_STATUS.set(currentSpan, resultValue.getStatus());
            if (resultValue.isError() && resultValue.getStatus() != 412) {
                Tags.ERROR.set(currentSpan, Boolean.TRUE);
            }
            switch (resultValue.getStatus()) {
                case 204: {
                    return null;
                }
            }
            throw StatusCodeMapper.from(resultValue);
        }).onComplete(v -> currentSpan.finish()).mapEmpty();
    }

    @Override
    public Future<Void> enableCommandRouting(List<String> tenantIds, SpanContext context) {
        Objects.requireNonNull(tenantIds);
        if (tenantIds.isEmpty()) {
            return Future.succeededFuture();
        }
        Span currentSpan = this.newChildSpan(context, "enable command routing");
        currentSpan.log(Map.of("no_of_tenants", tenantIds.size()));
        JsonArray payload = new JsonArray(tenantIds);
        Future resultTracker = this.getOrCreateClient(tenantIds.get(0)).compose(client -> client.createAndSendRequest(CommandRouterConstants.CommandRouterAction.ENABLE_COMMAND_ROUTING.getSubject(), null, payload.toBuffer(), "application/json", x$0 -> this.getRequestResponseResult((org.apache.qpid.proton.message.Message)x$0), currentSpan));
        return this.mapResultAndFinishSpan(resultTracker, result -> {
            switch (result.getStatus()) {
                case 204: {
                    this.log.info("successfully enabled routing of commands for {} tenants in Command Router", (Object)tenantIds.size());
                    return null;
                }
            }
            ServiceInvocationException e = StatusCodeMapper.from(result);
            this.log.info("failed to enable routing of commands in Command Router", e);
            throw e;
        }, currentSpan).mapEmpty();
    }
}

