package io.r2dbc.postgresql.client;

import io.netty.buffer.ByteBufAllocator;
import io.r2dbc.postgresql.message.backend.BackendKeyData;
import io.r2dbc.postgresql.message.backend.BackendMessage;
import io.r2dbc.postgresql.message.backend.BackendMessageDecoder;
import io.r2dbc.postgresql.message.backend.ErrorResponse;
import io.r2dbc.postgresql.message.backend.Field;
import io.r2dbc.postgresql.message.backend.NoticeResponse;
import io.r2dbc.postgresql.message.backend.ParameterStatus;
import io.r2dbc.postgresql.message.backend.ReadyForQuery;
import io.r2dbc.postgresql.message.frontend.FrontendMessage;
import io.r2dbc.postgresql.util.PredicateUtils;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SynchronousSink;
import reactor.netty.ByteBufFlux;
import reactor.netty.Connection;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.tcp.TcpClient;

/* loaded from: input_file:io/r2dbc/postgresql/client/ReactorNettyClient.class */
/**
 * A {@link Client} implementation that exchanges frontend and backend messages with a server over a
 * Reactor Netty TCP {@link Connection}.
 *
 * <p>Inbound bytes are decoded into {@link BackendMessage}s, passed through a chain of handlers that
 * record connection state (parameter status, backend key data, transaction status) and log notices
 * and errors, and are then split into windows delimited by {@link ReadyForQuery}. Each call to
 * {@link #exchange(Publisher)} consumes one such window.
 */
public final class ReactorNettyClient implements Client {

    // The class is final, so this yields the same logger name as getLogger(getClass()).
    private static final Logger LOGGER = LoggerFactory.getLogger(ReactorNettyClient.class);

    private final AtomicReference<ByteBufAllocator> byteBufAllocator = new AtomicReference<>();

    /** The live connection; swapped to {@code null} by {@link #close()} so that closing happens only once. */
    private final AtomicReference<Connection> connection = new AtomicReference<>();

    /** Logs an {@link ErrorResponse} at error level and forwards it downstream. */
    private final BiConsumer<BackendMessage, SynchronousSink<BackendMessage>> handleErrorResponse =
        handleBackendMessage(ErrorResponse.class, (message, sink) -> {
            LOGGER.error("Error: {}", toString(message.getFields()));
            sink.next(message);
        });

    /** Logs a {@link NoticeResponse} at warn level; the message is not forwarded downstream. */
    private final BiConsumer<BackendMessage, SynchronousSink<BackendMessage>> handleNoticeResponse =
        handleBackendMessage(NoticeResponse.class, (message, sink) ->
            LOGGER.warn("Notice: {}", toString(message.getFields())));

    private final AtomicBoolean isClosed = new AtomicBoolean(false);

    private final ConcurrentMap<String, String> parameterStatus = new ConcurrentHashMap<>();

    /** Records a {@link ParameterStatus} name/value pair; the message is not forwarded downstream. */
    private final BiConsumer<BackendMessage, SynchronousSink<BackendMessage>> handleParameterStatus =
        handleBackendMessage(ParameterStatus.class, (message, sink) ->
            this.parameterStatus.put(message.getName(), message.getValue()));

    private final AtomicReference<Integer> processId = new AtomicReference<>();

    // autoCancel is disabled so the processors are not cancelled when a subscriber cancels.
    private final EmitterProcessor<FrontendMessage> requestProcessor = EmitterProcessor.create(false);

    private final FluxSink<FrontendMessage> requests = this.requestProcessor.sink();

    private final EmitterProcessor<Flux<BackendMessage>> responseProcessor = EmitterProcessor.create(false);

    private final AtomicReference<Integer> secretKey = new AtomicReference<>();

    /** Records the process id and secret key from {@link BackendKeyData}; the message is not forwarded downstream. */
    private final BiConsumer<BackendMessage, SynchronousSink<BackendMessage>> handleBackendKeyData =
        handleBackendMessage(BackendKeyData.class, (message, sink) -> {
            this.processId.set(message.getProcessId());
            this.secretKey.set(message.getSecretKey());
        });

    private final AtomicReference<TransactionStatus> transactionStatus = new AtomicReference<>(TransactionStatus.IDLE);

    /** Updates the tracked transaction status from {@link ReadyForQuery} and forwards the message downstream. */
    private final BiConsumer<BackendMessage, SynchronousSink<BackendMessage>> handleReadyForQuery =
        handleBackendMessage(ReadyForQuery.class, (message, sink) -> {
            this.transactionStatus.set(TransactionStatus.valueOf(message.getTransactionStatus()));
            sink.next(message);
        });

    /**
     * Wires the inbound and outbound sides of {@code connection} to the response and request processors.
     *
     * @param connection the established connection
     * @throws NullPointerException if {@code connection} is {@code null}
     */
    private ReactorNettyClient(Connection connection) {
        Objects.requireNonNull(connection, "Connection must not be null");

        BackendMessageDecoder decoder = new BackendMessageDecoder();
        FluxSink<Flux<BackendMessage>> responses = this.responseProcessor.sink();

        this.byteBufAllocator.set(connection.outbound().alloc());

        // Inbound: decode, run the state-tracking handlers, then cut the stream into one window per
        // ReadyForQuery-delimited run of messages and publish each window to the response processor.
        connection.inbound().receive()
            .retain()
            .concatMap(decoder::decode)
            .doOnNext(message -> LOGGER.debug("Response: {}", message))
            .handle(this.handleNoticeResponse)
            .handle(this.handleErrorResponse)
            .handle(this.handleBackendKeyData)
            .handle(this.handleParameterStatus)
            .handle(this.handleReadyForQuery)
            .windowWhile(PredicateUtils.not(ReadyForQuery.class::isInstance))
            .subscribe(responses::next, responses::error, responses::complete);

        // Outbound: encode and send queued frontend messages one at a time, in order.
        this.requestProcessor
            .doOnNext(message -> LOGGER.debug("Request:  {}", message))
            .concatMap(message -> connection.outbound().send(message.encode(connection.outbound().alloc())))
            .subscribe();

        this.connection.set(connection);
    }

    /**
     * Connects to {@code host}:{@code port} using a new, non-pooled connection provider.
     *
     * @param host the host to connect to
     * @param port the port to connect to
     * @return a {@link Mono} emitting the connected client
     * @throws NullPointerException if {@code host} is {@code null}
     */
    public static Mono<ReactorNettyClient> connect(String host, int port) {
        Objects.requireNonNull(host, "host must not be null");

        return connect(ConnectionProvider.newConnection(), host, port);
    }

    /**
     * Connects to {@code host}:{@code port} using the supplied connection provider.
     *
     * @param connectionProvider the provider used to create the TCP client
     * @param host               the host to connect to
     * @param port               the port to connect to
     * @return a {@link Mono} emitting the connected client
     * @throws NullPointerException if {@code connectionProvider} or {@code host} is {@code null}
     */
    public static Mono<ReactorNettyClient> connect(ConnectionProvider connectionProvider, String host, int port) {
        Objects.requireNonNull(connectionProvider, "connectionProvider must not be null");
        Objects.requireNonNull(host, "host must not be null");

        return TcpClient.create(connectionProvider)
            .host(host)
            .port(port)
            .connect()
            .map(ReactorNettyClient::new);
    }

    /**
     * Runs the termination message flow and then disposes the connection.
     *
     * <p>The connection reference is cleared atomically on subscription, so a second subscription
     * (or a second call) completes empty without doing anything. The closed flag is only set once the
     * termination flow completes.
     *
     * @return a {@link Mono} that completes when the client has been closed
     */
    @Override // io.r2dbc.postgresql.client.Client
    public Mono<Void> close() {
        return Mono.defer(() -> {
            Connection current = this.connection.getAndSet(null);

            if (current == null) {
                return Mono.empty();
            }

            return TerminationMessageFlow.exchange(this)
                .doOnComplete(() -> {
                    current.disposeNow();
                    this.isClosed.set(true);
                })
                .then();
        });
    }

    /**
     * Sends the given frontend messages and returns the next window of backend messages.
     *
     * <p>On subscription, {@code publisher} is subscribed and its elements and error are forwarded to
     * the request sink; the first window subsequently emitted by the response processor is returned.
     * If the client is already closed, the returned {@link Flux} signals an {@link IllegalStateException}.
     *
     * @param publisher the frontend messages to send
     * @return the backend messages of the next response window
     * @throws NullPointerException if {@code publisher} is {@code null}
     */
    @Override // io.r2dbc.postgresql.client.Client
    public Flux<BackendMessage> exchange(Publisher<FrontendMessage> publisher) {
        Objects.requireNonNull(publisher, "requests must not be null");

        return Flux.defer(() -> {
            if (this.isClosed.get()) {
                return Flux.error(new IllegalStateException("Cannot exchange messages because the connection is closed"));
            }

            return this.responseProcessor
                .doOnSubscribe(subscription -> Flux.from(publisher).subscribe(this.requests::next, this.requests::error))
                .next()
                .flatMapMany(Function.identity());
        });
    }

    /** Returns the allocator obtained from the connection's outbound side at construction time. */
    @Override // io.r2dbc.postgresql.client.Client
    public ByteBufAllocator getByteBufAllocator() {
        return this.byteBufAllocator.get();
    }

    /** Returns a mutable snapshot copy of the parameter statuses received so far. */
    @Override // io.r2dbc.postgresql.client.Client
    public Map<String, String> getParameterStatus() {
        return new HashMap<>(this.parameterStatus);
    }

    /** Returns the process id from the last {@link BackendKeyData}, or empty if none has been received. */
    @Override // io.r2dbc.postgresql.client.Client
    public Optional<Integer> getProcessId() {
        return Optional.ofNullable(this.processId.get());
    }

    /** Returns the secret key from the last {@link BackendKeyData}, or empty if none has been received. */
    @Override // io.r2dbc.postgresql.client.Client
    public Optional<Integer> getSecretKey() {
        return Optional.ofNullable(this.secretKey.get());
    }

    /** Returns the transaction status from the last {@link ReadyForQuery}; initially {@link TransactionStatus#IDLE}. */
    @Override // io.r2dbc.postgresql.client.Client
    public TransactionStatus getTransactionStatus() {
        return this.transactionStatus.get();
    }

    /**
     * Builds a {@code Flux.handle} callback that routes messages of one type to {@code biConsumer} and
     * passes every other message through unchanged.
     *
     * @param cls        the message type to intercept
     * @param biConsumer invoked with the (cast) message and the sink when the type matches; it decides
     *                   whether the message is forwarded
     * @param <T>        the intercepted message type
     * @return the handler
     */
    private static <T extends BackendMessage> BiConsumer<BackendMessage, SynchronousSink<BackendMessage>> handleBackendMessage(
        Class<T> cls, BiConsumer<T, SynchronousSink<BackendMessage>> biConsumer) {

        return (message, sink) -> {
            if (cls.isInstance(message)) {
                biConsumer.accept(cls.cast(message), sink);
            } else {
                sink.next(message);
            }
        };
    }

    /** Renders fields as a comma-separated list of {@code TYPE=value} pairs for logging. */
    private static String toString(List<Field> list) {
        return list.stream()
            .map(field -> String.format("%s=%s", field.getType().name(), field.getValue()))
            .collect(Collectors.joining(", "));
    }
}
