package org.jetlinks.core.defaults;

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import org.jetlinks.core.device.DeviceConfigKey;
import org.jetlinks.core.device.DeviceMessageSender;
import org.jetlinks.core.device.DeviceOperationBroker;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.enums.ErrorCode;
import org.jetlinks.core.exception.DeviceOperationException;
import org.jetlinks.core.message.ChildDeviceMessage;
import org.jetlinks.core.message.ChildDeviceMessageReply;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DeviceMessageReply;
import org.jetlinks.core.message.FunctionInvokeMessageSender;
import org.jetlinks.core.message.HeaderKey;
import org.jetlinks.core.message.Headers;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.MessageType;
import org.jetlinks.core.message.ReadPropertyMessageSender;
import org.jetlinks.core.message.RepayableDeviceMessage;
import org.jetlinks.core.message.WritePropertyMessageSender;
import org.jetlinks.core.message.interceptor.DeviceMessageSenderInterceptor;
import org.jetlinks.core.utils.DeviceMessageTracer;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/core/defaults/DefaultDeviceMessageSender.class */
/**
 * Default {@link DeviceMessageSender} bound to a single {@link DeviceOperator}.
 *
 * <p>For every outgoing message the sender resolves three things from the operator: the id of
 * the server the device is connected to, the protocol's sender interceptor (chained with the
 * global interceptor passed to the constructor) and the configured parent gateway id. It then
 * either hands the message to the {@link DeviceOperationBroker}, or wraps it in a
 * {@link ChildDeviceMessage} and sends it through the parent gateway device.
 */
public class DefaultDeviceMessageSender implements DeviceMessageSender {

    private static final Logger log = LoggerFactory.getLogger(DefaultDeviceMessageSender.class);

    /**
     * Default reply timeout in milliseconds. Read once from the system property
     * {@code jetlinks.device.message.default-timeout} (expressed in seconds, 10 when unset).
     */
    private static final long DEFAULT_TIMEOUT =
            TimeUnit.SECONDS.toMillis(Integer.getInteger("jetlinks.device.message.default-timeout", 10));

    private final DeviceOperationBroker handler;
    private final DeviceOperator operator;
    private final DeviceRegistry registry;
    private final DeviceMessageSenderInterceptor globalInterceptor;

    /** Reply timeout (milliseconds) used when a message carries no {@code Headers.timeout} header. */
    private long defaultTimeout = DEFAULT_TIMEOUT;

    /**
     * @param deviceOperationBroker          broker used to send messages and to receive their replies
     * @param deviceOperator                 the device this sender belongs to
     * @param deviceRegistry                 registry used to look up the parent gateway device
     * @param deviceMessageSenderInterceptor interceptor applied after the protocol's own interceptor
     */
    public DefaultDeviceMessageSender(DeviceOperationBroker deviceOperationBroker, DeviceOperator deviceOperator, DeviceRegistry deviceRegistry, DeviceMessageSenderInterceptor deviceMessageSenderInterceptor) {
        this.handler = deviceOperationBroker;
        this.operator = deviceOperator;
        this.registry = deviceRegistry;
        this.globalInterceptor = deviceMessageSenderInterceptor;
    }

    /**
     * Sends the published messages and converts every raw reply with {@link #convertReply(Object)}.
     *
     * @param publisher messages to send
     * @return the converted replies
     */
    @Override // org.jetlinks.core.device.DeviceMessageSender
    public <R extends DeviceMessageReply> Flux<R> send(Publisher<RepayableDeviceMessage<R>> publisher) {
        return send(publisher, this::convertReply);
    }

    /**
     * Converts a raw reply, unwrapping a {@link ChildDeviceMessageReply} when the message that was
     * originally sent was not itself a {@link ChildDeviceMessage}.
     *
     * @param message the message that was originally sent
     * @param obj     the raw reply
     * @return the converted reply
     * @throws DeviceOperationException when the child reply carries a code known to {@link ErrorCode}
     *                                  (checked when the reply is unsuccessful or has no nested
     *                                  message), with {@code NO_REPLY} when there is no nested
     *                                  message and the code is not recognised, or when
     *                                  {@link #convertReply(Object)} rejects the reply
     */
    @SuppressWarnings("unchecked")
    protected <T extends DeviceMessageReply> T convertReply(Message message, Object obj) {
        boolean needsUnwrap = obj instanceof ChildDeviceMessageReply && !(message instanceof ChildDeviceMessage);
        if (!needsUnwrap) {
            return (T) convertReply(obj);
        }
        ChildDeviceMessageReply childReply = (ChildDeviceMessageReply) obj;
        if (!childReply.isSuccess()) {
            // A failed reply with an unrecognised code falls through and is still unwrapped below.
            throwIfRecognizedError(childReply);
        }
        Object nested = childReply.getChildDeviceMessage();
        if (nested != null) {
            return (T) convertReply(nested);
        }
        // No nested reply: surface the reply's own error code if it is a known one, else NO_REPLY.
        throwIfRecognizedError(childReply);
        throw new DeviceOperationException(ErrorCode.NO_REPLY);
    }

    /** Throws a {@link DeviceOperationException} when the reply's code resolves to an {@link ErrorCode}. */
    private static void throwIfRecognizedError(ChildDeviceMessageReply reply) {
        ErrorCode.of(reply.getCode())
                .map(DeviceOperationException::new)
                .ifPresent(error -> {
                    throw error;
                });
    }

    /**
     * Converts a raw reply object into a {@link DeviceMessage}.
     *
     * <p>Accepted inputs are a {@link DeviceMessageReply}, any other {@link DeviceMessage}, or a
     * {@link Map} that {@link MessageType#convertMessage} can turn into a message.
     *
     * @param obj the raw reply
     * @return the reply as a device message
     * @throws DeviceOperationException when an unsuccessful reply carries a code known to
     *                                  {@link ErrorCode} (the message is taken from the
     *                                  {@code errorMessage} header, falling back to the code's
     *                                  text), or with {@code SYSTEM_ERROR} when {@code obj} cannot
     *                                  be converted
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    protected <T extends DeviceMessage> T convertReply(Object obj) {
        DeviceMessage result = null;
        if (obj instanceof DeviceMessageReply) {
            DeviceMessageReply reply = (DeviceMessageReply) obj;
            if (!reply.isSuccess()) {
                ErrorCode.of(reply.getCode())
                        .map(code -> new DeviceOperationException(
                                code,
                                reply.getHeader("errorMessage").map(String::valueOf).orElse(code.getText())))
                        .ifPresent(error -> {
                            throw error;
                        });
            }
            result = reply;
        } else if (obj instanceof DeviceMessage) {
            result = (DeviceMessage) obj;
        } else if (obj instanceof Map) {
            result = (DeviceMessage) MessageType.convertMessage((Map) obj).orElse(null);
        }
        if (result == null) {
            throw new DeviceOperationException(ErrorCode.SYSTEM_ERROR, new ClassCastException("can not cast " + obj + " to DeviceMessageReply"));
        }
        return (T) result;
    }

    /** Adds debug logging of received replies, completion and cancellation; a no-op unless debug is enabled. */
    private <R extends DeviceMessage> Flux<R> logReply(DeviceMessage sent, Flux<R> replies) {
        if (!log.isDebugEnabled()) {
            return replies;
        }
        return replies
                .doOnNext(reply -> log.debug("receive device[{}] message[{}]: {}", this.operator.getDeviceId(), reply.getMessageId(), reply))
                .doOnComplete(() -> log.debug("complete receive device[{}] message[{}]", this.operator.getDeviceId(), sent.getMessageId()))
                .doOnCancel(() -> log.debug("cancel receive device[{}] message[{}]", this.operator.getDeviceId(), sent.getMessageId()));
    }

    /**
     * Sends a single message and converts every raw reply with {@link #convertReply(Object)}.
     *
     * @param deviceMessage the message to send
     * @return the converted replies
     */
    @Override // org.jetlinks.core.device.DeviceMessageSender
    public <R extends DeviceMessage> Flux<R> send(DeviceMessage deviceMessage) {
        return send(Mono.just(deviceMessage), this::convertReply);
    }

    /** Refreshes the operator's connection-server-id config entry, then reads the server id again. */
    private Mono<String> refreshAndGetConnectionServerId() {
        return Mono.defer(() -> this.operator
                .refreshConfig(Collections.singleton(DeviceConfigKey.connectionServerId.getKey()))
                .then(this.operator.getConnectionServerId()));
    }

    /**
     * Wraps {@code message} in a {@link ChildDeviceMessage} addressed to the parent device and sends
     * it with the parent's own message sender.
     *
     * @return the replies, unwrapped via {@link #convertReply(Message, Object)}; an error Flux with
     *         {@code CYCLIC_DEPENDENCE} when the parent id equals this device's id, or with
     *         {@code UNKNOWN_PARENT_DEVICE} when the registry has no such device
     */
    private Flux<DeviceMessage> sendToParentDevice(String parentId, DeviceMessage message) {
        if (parentId.equals(this.operator.getDeviceId())) {
            return Flux.error(new DeviceOperationException(ErrorCode.CYCLIC_DEPENDENCE, "子设备与父设备不能为相同的设备"));
        }
        ChildDeviceMessage wrapper = new ChildDeviceMessage();
        wrapper.setDeviceId(parentId);
        wrapper.setMessageId(message.getMessageId());
        wrapper.setTimestamp(message.getTimestamp());
        wrapper.setChildDeviceId(this.operator.getDeviceId());
        wrapper.setChildDeviceMessage(message);
        if (null != message.getHeaders()) {
            // Copy the headers before dispatchToParent is added, so the wrapper does not carry it.
            wrapper.setHeaders(new ConcurrentHashMap<>(message.getHeaders()));
        }
        message.addHeader(Headers.dispatchToParent, true);
        wrapper.validate();
        return this.registry
                .getDevice(parentId)
                .switchIfEmpty(Mono.error(() -> new DeviceOperationException(ErrorCode.UNKNOWN_PARENT_DEVICE, "未知的父设备:" + parentId)))
                .flatMapMany(parent -> parent
                        .messageSender()
                        .send(Mono.just(wrapper), reply -> convertReply(message, reply)));
    }

    /**
     * Sends the published messages and maps every raw reply with {@code function}.
     *
     * <p>When the device has no connection server id but has a parent gateway id, messages are
     * sent through the parent device. Otherwise each message is sent, one at a time and in order,
     * to the server the device is connected to.
     *
     * @param publisher messages to send
     * @param function  converts a raw reply into the result type
     * @return the mapped replies; errors are signalled as {@link DeviceOperationException}
     */
    @Override // org.jetlinks.core.device.DeviceMessageSender
    public <R extends DeviceMessage> Flux<R> send(Publisher<? extends DeviceMessage> publisher, Function<Object, R> function) {
        return Mono
                .zip(
                        this.operator.getConnectionServerId()
                                .switchIfEmpty(refreshAndGetConnectionServerId())
                                .defaultIfEmpty(""),
                        this.operator.getProtocol()
                                .flatMap(protocol -> protocol.getSenderInterceptor())
                                .defaultIfEmpty(DeviceMessageSenderInterceptor.DO_NOTING),
                        this.operator.getSelfConfig(DeviceConfigKey.parentGatewayId).defaultIfEmpty(""))
                .<R>flatMapMany(context -> {
                    DeviceMessageSenderInterceptor interceptor = context.getT2().andThen(this.globalInterceptor);
                    String serverId = context.getT1();
                    String parentGatewayId = context.getT3();
                    if (StringUtils.isEmpty(serverId) && StringUtils.hasText(parentGatewayId)) {
                        return this.<R>sendThroughParent(publisher, parentGatewayId, interceptor);
                    }
                    return Flux
                            .from(publisher)
                            .flatMap(message -> interceptor.preSend(this.operator, message))
                            .<R>concatMap(message -> sendToServer(message, serverId, parentGatewayId, interceptor, function));
                });
    }

    /** Runs every message through {@code preSend}, dispatches it via the parent device, then applies {@code afterSent}. */
    @SuppressWarnings("unchecked")
    private <R extends DeviceMessage> Flux<R> sendThroughParent(Publisher<? extends DeviceMessage> publisher,
                                                                String parentGatewayId,
                                                                DeviceMessageSenderInterceptor interceptor) {
        return Flux
                .from(publisher)
                .flatMap(message -> interceptor.preSend(this.operator, message))
                .flatMap(message -> interceptor.afterSent(this.operator, message, sendToParentDevice(parentGatewayId, message)))
                // The parent path yields plain DeviceMessage; narrowing to R is unchecked by design.
                .map(reply -> (R) reply);
    }

    /** Sends one message to the server the device is connected to and returns its reply stream. */
    private <R extends DeviceMessage> Flux<R> sendToServer(DeviceMessage message,
                                                           String serverId,
                                                           String parentGatewayId,
                                                           DeviceMessageSenderInterceptor interceptor,
                                                           Function<Object, R> replyMapping) {
        DeviceMessageTracer.trace(message, "send.before");
        if (StringUtils.isEmpty(serverId)) {
            return interceptor.afterSent(this.operator, message, Flux.<R>error(new DeviceOperationException(ErrorCode.CLIENT_OFFLINE)));
        }
        // The reply stream is obtained from the broker BEFORE the message is handed to it.
        Flux<R> replies = replyStream(message, replyMapping);
        return this.handler
                .send(serverId, Mono.just(message))
                // -1 marks "the broker produced no result".
                .defaultIfEmpty(-1)
                .<R>flatMapMany(sent -> {
                    if (sent == 0) {
                        return this.<R>recoverFromUnavailableServer(message, serverId, parentGatewayId, interceptor);
                    }
                    if (sent == -1) {
                        return interceptor.afterSent(this.operator, message, Flux.<R>error(new DeviceOperationException(ErrorCode.CLIENT_OFFLINE)));
                    }
                    log.debug("send device[{}] message complete", this.operator.getDeviceId());
                    return interceptor.afterSent(this.operator, message, replies);
                })
                .doOnNext(reply -> DeviceMessageTracer.trace(reply, "send.reply"));
    }

    /**
     * Builds the reply stream for a sent message: empty when the {@code sendAndForget} header is
     * true, otherwise the broker's replies mapped with {@code replyMapping}, using the message's
     * {@code timeout} header or {@link #getDefaultTimeout()}.
     */
    private <R extends DeviceMessage> Flux<R> replyStream(DeviceMessage message, Function<Object, R> replyMapping) {
        if (message.getHeader(Headers.sendAndForget).orElse(false)) {
            return Flux.empty();
        }
        Duration replyTimeout = Duration.ofMillis(message.getHeader(Headers.timeout).orElse(this.defaultTimeout));
        Flux<R> replies = this.handler
                .handleReply(message.getDeviceId(), message.getMessageId(), replyTimeout)
                .map(replyMapping)
                .onErrorResume(DeviceOperationException.class, error -> {
                    if (error.getCode() == ErrorCode.CLIENT_OFFLINE) {
                        // Re-check the device state before propagating the offline error.
                        return this.operator.checkState().then(Mono.error(error));
                    }
                    return Mono.error(error);
                })
                .onErrorMap(TimeoutException.class, cause -> new DeviceOperationException(ErrorCode.TIME_OUT, cause));
        return logReply(message, replies);
    }

    /**
     * Handles a broker send result of {@code 0}: re-checks the device state, then either reports
     * the device offline, falls back to the parent gateway, or reports the server as unavailable.
     */
    @SuppressWarnings("unchecked")
    private <R extends DeviceMessage> Flux<R> recoverFromUnavailableServer(DeviceMessage message,
                                                                           String serverId,
                                                                           String parentGatewayId,
                                                                           DeviceMessageSenderInterceptor interceptor) {
        return this.operator
                .checkState()
                .<R>flatMapMany(state -> {
                    // NOTE(review): 1 is presumably the "online" state code - confirm against DeviceState.
                    if (1 != state.byteValue()) {
                        return interceptor.afterSent(this.operator, message, Flux.<R>error(new DeviceOperationException(ErrorCode.CLIENT_OFFLINE)));
                    }
                    if (StringUtils.hasText(parentGatewayId)) {
                        log.debug("Device [{}] Cached Server [{}] Not Available,Dispatch To Parent [{}]", this.operator.getDeviceId(), serverId, parentGatewayId);
                        return interceptor
                                .afterSent(this.operator, message, sendToParentDevice(parentGatewayId, message))
                                .map(reply -> (R) reply);
                    }
                    log.warn("Device [{}] Cached Server [{}] Not Available", this.operator.getDeviceId(), serverId);
                    return interceptor.afterSent(this.operator, message, Flux.<R>error(new DeviceOperationException(ErrorCode.SERVER_NOT_AVAILABLE)));
                });
    }

    /**
     * @param str argument forwarded, together with this sender's operator, to the
     *            {@code DefaultFunctionInvokeMessageSender} constructor
     * @return a new function-invoke message sender for this device
     */
    @Override // org.jetlinks.core.device.DeviceMessageSender
    public FunctionInvokeMessageSender invokeFunction(String str) {
        return new DefaultFunctionInvokeMessageSender(this.operator, str);
    }

    /**
     * @param strArr values forwarded to {@code read(...)} on the new sender
     * @return a read-property message sender for this device
     */
    @Override // org.jetlinks.core.device.DeviceMessageSender
    public ReadPropertyMessageSender readProperty(String... strArr) {
        return new DefaultReadPropertyMessageSender(this.operator).read(strArr);
    }

    /** @return a new write-property message sender for this device */
    @Override // org.jetlinks.core.device.DeviceMessageSender
    public WritePropertyMessageSender writeProperty() {
        return new DefaultWritePropertyMessageSender(this.operator);
    }

    /** @param j reply timeout in milliseconds used when a message has no {@code timeout} header */
    public void setDefaultTimeout(long j) {
        this.defaultTimeout = j;
    }

    /** @return the reply timeout in milliseconds used when a message has no {@code timeout} header */
    public long getDefaultTimeout() {
        return this.defaultTimeout;
    }
}
