/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.xpack.ml.process;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.Message;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.xpack.core.ml.MachineLearningField;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.process.NativeControllerHolder;
import org.elasticsearch.xpack.ml.process.NativeProcess;
import org.elasticsearch.xpack.ml.process.ProcessPipes;
import org.elasticsearch.xpack.ml.process.StateProcessor;
import org.elasticsearch.xpack.ml.process.logging.CppLogMessageHandler;
import org.elasticsearch.xpack.ml.process.writer.LengthEncodedWriter;

/**
 * Base class for {@link NativeProcess} implementations that communicate with an
 * external process through a set of {@link ProcessPipes}.
 *
 * <p>{@code start} connects the pipes and tails the process log stream on an executor;
 * {@code close} and {@code kill} shut the streams down and delete any associated files.
 * Stream handles are held in {@link SetOnce} holders because they only become available
 * once {@code start} has connected the pipes.
 */
public abstract class AbstractNativeProcess implements NativeProcess {
    private static final Logger LOGGER = LogManager.getLogger(AbstractNativeProcess.class);

    /** Passed to the log handler's {@code waitForLogStreamClose} after {@code kill()} has requested the kill. */
    private static final Duration WAIT_FOR_KILL_TIMEOUT = Duration.ofMillis(1000L);

    /** Identifier used as the prefix of every log message emitted by this class. */
    private final String jobId;
    private final ProcessPipes processPipes;

    /** Handler for the process log stream; set by {@code start}. */
    private final SetOnce<CppLogMessageHandler> cppLogHandler = new SetOnce<>();

    /** Flipped by whichever of {@code close()}/{@code kill()} closes the input stream first, so it is closed once. */
    private final AtomicBoolean processInStreamClosed = new AtomicBoolean();

    /** Buffered stream for writing to the process; only set when the pipes provide an input stream. */
    private final SetOnce<OutputStream> processInStream = new SetOnce<>();

    /** Stream for reading the process output; holds {@code null} when the pipes provide none. */
    private final SetOnce<InputStream> processOutStream = new SetOnce<>();

    /** Restore stream of the process; holds {@code null} when the pipes provide none. */
    private final SetOnce<OutputStream> processRestoreStream = new SetOnce<>();

    /** Writer wrapping {@link #processInStream}; only set when that stream is. */
    private final SetOnce<LengthEncodedWriter> recordWriter = new SetOnce<>();

    /** Timestamp captured in the constructor and reported as the process start time. */
    private final ZonedDateTime startTime;
    private final int numberOfFields;

    /** Files removed by {@code deleteAssociatedFiles()}; may be {@code null}. The list is cleared after deletion. */
    private final List<Path> filesToDelete;

    /** Receives the error text when log tailing ends without a close or kill having been requested. */
    private final Consumer<String> onProcessCrash;

    /** Task tailing the process log stream; {@code null} until {@code start} is called. */
    private volatile Future<?> logTailFuture;

    /** Task reading the persist stream; {@code null} unless the two-argument {@code start} is used. */
    private volatile Future<?> stateProcessorFuture;

    /** Set at the beginning of {@code close()}. */
    private volatile boolean processCloseInitiated;

    /** Set at the beginning of {@code kill()}. */
    private volatile boolean processKilled;

    /** Set by {@code setReady()}. */
    private volatile boolean isReady;

    /**
     * Stores the collaborators of this process and records the current time as its start time.
     *
     * @param jobId          identifier used to prefix log messages
     * @param processPipes   pipes used to talk to the native process
     * @param numberOfFields value later returned by {@link #numberOfFields()}
     * @param filesToDelete  files to remove when the process is closed or killed; may be {@code null}
     * @param onProcessCrash callback receiving the error text when the process stops unexpectedly
     * @throws NullPointerException if {@code onProcessCrash} is {@code null}
     */
    protected AbstractNativeProcess(String jobId, ProcessPipes processPipes, int numberOfFields, List<Path> filesToDelete, Consumer<String> onProcessCrash) {
        // Validate the only mandatory argument before touching anything else.
        this.onProcessCrash = Objects.requireNonNull(onProcessCrash);

        this.jobId = jobId;
        this.processPipes = processPipes;
        this.numberOfFields = numberOfFields;
        this.filesToDelete = filesToDelete;
        this.startTime = ZonedDateTime.now();
    }

    /**
     * Returns the name of this process; it is interpolated into the log and error
     * messages emitted by this class.
     *
     * @return the process name
     */
    public abstract String getName();

    /**
     * Connects the process pipes and begins tailing the process log stream.
     *
     * <p>The log stream is connected first and tailed on {@code executorService}; when
     * tailing ends for any reason, {@code detectCrash()} is invoked. The remaining streams
     * are connected afterwards. The input stream and record writer are only populated when
     * the pipes provide an input stream; the output and restore streams are stored as
     * {@code null} when absent.
     *
     * @param executorService executor on which the log tailing task is submitted
     * @throws IOException if connecting the pipes fails
     */
    public void start(ExecutorService executorService) throws IOException {
        processPipes.connectLogStream();
        cppLogHandler.set(processPipes.getLogStreamHandler());

        logTailFuture = executorService.submit(() -> {
            // The handler is closed when tailing finishes, whether normally or not.
            try (CppLogMessageHandler h = cppLogHandler.get()) {
                h.tailStream();
            } catch (IOException e) {
                // I/O errors are expected noise once the process has been killed.
                if (!processKilled) {
                    LOGGER.error(new ParameterizedMessage("[{}] Error tailing {} process logs", jobId, getName()), e);
                }
            } finally {
                detectCrash();
            }
        });

        processPipes.connectOtherStreams();
        if (processPipes.getProcessInStream().isPresent()) {
            processInStream.set(new BufferedOutputStream(processPipes.getProcessInStream().get()));
            recordWriter.set(new LengthEncodedWriter(processInStream.get()));
        }
        processOutStream.set(processPipes.getProcessOutStream().orElse(null));
        processRestoreStream.set(processPipes.getRestoreStream().orElse(null));
    }

    /**
     * Reports an unexpected stop of the process to the crash callback.
     *
     * <p>Nothing is reported when a close or kill has already been requested, or when
     * there is no process input stream.
     * NOTE(review): presumably a process without an input stream may finish on its own
     * and so is not treated as crashed — confirm.
     */
    private void detectCrash() {
        boolean shutdownRequested = processCloseInitiated || processKilled;
        if (shutdownRequested || processInStream() == null) {
            return;
        }

        String loggedErrors = cppLogHandler().getErrors();
        String fullError = String.format(
            Locale.ROOT,
            "[%s] %s process stopped unexpectedly: %s",
            jobId,
            getName(),
            loggedErrors
        );
        LOGGER.error(fullError);
        onProcessCrash.accept(fullError);
    }

    /**
     * Starts the process as {@link #start(ExecutorService)} does and additionally submits
     * a task that feeds the persist stream to {@code stateProcessor}.
     *
     * <p>The persist stream is asserted to be present. It is closed when the task ends.
     * Completion and I/O errors are only logged while the process has not been killed.
     *
     * @param executorService executor running the log tailing and state processing tasks
     * @param stateProcessor  consumer of the persist stream
     * @throws IOException if connecting the pipes fails
     */
    public void start(ExecutorService executorService, StateProcessor stateProcessor) throws IOException {
        start(executorService);
        assert processPipes.getPersistStream().isPresent();

        stateProcessorFuture = executorService.submit(() -> {
            try (InputStream persistStream = processPipes.getPersistStream().get()) {
                stateProcessor.process(persistStream);
                if (processKilled == false) {
                    LOGGER.info("[{}] State output finished", jobId);
                }
            } catch (IOException e) {
                if (processKilled == false) {
                    LOGGER.error(new ParameterizedMessage("[{}] Error reading {} state output", jobId, getName()), e);
                }
            }
        });
    }

    /**
     * @return {@code true} once {@link #setReady()} has been called
     */
    @Override
    public boolean isReady() {
        return isReady;
    }

    /**
     * Marks this process as ready; {@link #isReady()} returns {@code true} afterwards.
     */
    protected void setReady() {
        isReady = true;
    }

    /**
     * Writes one record to the process through the record writer.
     *
     * <p>The record writer is only set by {@code start} when the pipes provide a process
     * input stream, so this call requires such a stream to exist.
     *
     * @param record the fields of the record
     * @throws IOException if the underlying writer fails
     */
    @Override
    public void writeRecord(String[] record) throws IOException {
        LengthEncodedWriter writer = recordWriter();
        writer.writeRecord(record);
    }

    /**
     * Flushes the record writer.
     *
     * <p>The record writer is only set by {@code start} when the pipes provide a process
     * input stream, so this call requires such a stream to exist.
     *
     * @throws IOException if the underlying writer fails
     */
    @Override
    public void flushStream() throws IOException {
        LengthEncodedWriter writer = recordWriter();
        writer.flush();
    }

    /**
     * Closes the process input stream and waits for the background tasks to finish.
     *
     * <p>Steps, in order: flag that a close was initiated; close the input stream if it
     * exists and has not been closed yet; wait for the state processor task (if any) for
     * {@code STATE_PERSIST_RESTORE_TIMEOUT} minutes; wait up to five seconds for the log
     * tailing task (if any); finally, if the log handler saw a fatal error, throw a server
     * error carrying the logged errors. Failures or interruption while waiting are logged
     * as warnings, with the interrupt flag restored. Associated files are always deleted.
     *
     * @throws IOException if closing the input stream or deleting the files fails
     */
    @Override
    public void close() throws IOException {
        try {
            processCloseInitiated = true;

            OutputStream inStream = processInStream();
            if (inStream != null && processInStreamClosed.compareAndSet(false, true)) {
                inStream.close();
            }

            Future<?> stateTask = stateProcessorFuture;
            if (stateTask != null) {
                stateTask.get(MachineLearningField.STATE_PERSIST_RESTORE_TIMEOUT.getMinutes(), TimeUnit.MINUTES);
            }

            Future<?> logTask = logTailFuture;
            if (logTask != null) {
                logTask.get(5L, TimeUnit.SECONDS);
            }

            CppLogMessageHandler logHandler = cppLogHandler();
            if (logHandler != null) {
                if (logHandler.seenFatalError()) {
                    throw ExceptionsHelper.serverError(logHandler.getErrors());
                }
                LOGGER.debug("[{}] {} process exited", jobId, getName());
            }
        } catch (ExecutionException | TimeoutException | InterruptedException e) {
            LOGGER.warn(new ParameterizedMessage("[{}] Exception closing the running {} process", jobId, getName()), e);
            if (e instanceof InterruptedException) {
                // Preserve the interrupt for callers further up the stack.
                Thread.currentThread().interrupt();
            }
        } finally {
            deleteAssociatedFiles();
        }
    }

    /**
     * Kills the native process and cleans up after it.
     *
     * <p>The process is flagged as killed, then the native controller is asked to kill the
     * PID obtained from the log handler (waiting up to the pipes' timeout for it). The log
     * handler is then asked to wait up to {@code WAIT_FOR_KILL_TIMEOUT} for the log stream
     * to close. A {@link TimeoutException} is logged as a failure to obtain the PID.
     *
     * <p>Regardless of the outcome, the input stream is closed (if present and not already
     * closed) and the associated files are deleted. Both steps are best-effort: an
     * {@link IOException} from either is logged at debug level and not propagated.
     *
     * @throws IOException if obtaining the native controller or killing the process fails
     */
    @Override
    public void kill() throws IOException {
        LOGGER.debug("[{}] Killing {} process", jobId, getName());
        processKilled = true;
        try {
            NativeControllerHolder.getNativeController().killProcess(cppLogHandler().getPid(processPipes.getTimeout()));
            cppLogHandler().waitForLogStreamClose(WAIT_FOR_KILL_TIMEOUT);
        } catch (TimeoutException e) {
            LOGGER.warn("[{}] Failed to get PID of {} process to kill", jobId, getName());
        } finally {
            try {
                if (processInStream() != null && processInStreamClosed.compareAndSet(false, true)) {
                    processInStream().close();
                }
            } catch (IOException e) {
                // We are shutting down, so do not propagate; keep a trace for diagnostics.
                LOGGER.debug(
                    new ParameterizedMessage("[{}] Ignoring error closing input stream of killed {} process", jobId, getName()),
                    e
                );
            }
            try {
                deleteAssociatedFiles();
            } catch (IOException e) {
                // We are shutting down, so do not propagate; keep a trace for diagnostics.
                LOGGER.debug(
                    new ParameterizedMessage("[{}] Ignoring error deleting files of killed {} process", jobId, getName()),
                    e
                );
            }
        }
    }

    /**
     * Deletes every file in {@code filesToDelete} and then clears the list.
     *
     * <p>Does nothing when the list is {@code null}. Every file is attempted even if an
     * earlier deletion throws; the first {@link IOException} is rethrown once the loop has
     * finished, with any later ones attached as suppressed exceptions. When a deletion
     * threw, the list is left untouched so that a later call can retry.
     *
     * <p>Note that {@link Files#deleteIfExists} returns {@code false} when the file did
     * not exist, which is logged here as a failure to delete.
     *
     * @throws IOException if at least one deletion failed with an I/O error
     */
    private synchronized void deleteAssociatedFiles() throws IOException {
        if (filesToDelete == null) {
            return;
        }

        IOException failure = null;
        for (Path fileToDelete : filesToDelete) {
            try {
                if (Files.deleteIfExists(fileToDelete)) {
                    LOGGER.debug("[{}] Deleted file {}", jobId, fileToDelete.toString());
                } else {
                    LOGGER.warn("[{}] Failed to delete file {}", jobId, fileToDelete.toString());
                }
            } catch (IOException e) {
                // Keep going so one bad file does not leave the others behind.
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }

        if (failure != null) {
            throw failure;
        }
        filesToDelete.clear();
    }

    /**
     * @return the time captured when this object was constructed
     */
    @Override
    public ZonedDateTime getProcessStartTime() {
        return startTime;
    }

    /**
     * Reports whether the process is considered alive, judged by its log stream.
     *
     * @return {@code false} when there is no log handler or the handler reports that the
     *         log stream has ended; {@code true} otherwise
     */
    @Override
    public boolean isProcessAlive() {
        CppLogMessageHandler handler = cppLogHandler();
        if (handler == null) {
            return false;
        }
        return handler.hasLogStreamEnded() == false;
    }

    /**
     * Like {@link #isProcessAlive()}, but first asks the log handler (if any) to wait up
     * to 45 milliseconds for the log stream to close.
     *
     * @return the result of {@link #isProcessAlive()} after the wait
     */
    @Override
    public boolean isProcessAliveAfterWaiting() {
        CppLogMessageHandler handler = cppLogHandler();
        if (handler != null) {
            handler.waitForLogStreamClose(Duration.ofMillis(45L));
        }
        return isProcessAlive();
    }

    /**
     * @return the errors collected by the log handler, or an empty string when there is
     *         no log handler
     */
    @Override
    public String readError() {
        CppLogMessageHandler handler = cppLogHandler();
        if (handler == null) {
            return "";
        }
        return handler.getErrors();
    }

    /**
     * @return the job identifier supplied at construction
     */
    protected String jobId() {
        return jobId;
    }

    /**
     * @return the stream for reading the process output; {@code null} when {@code start}
     *         has not stored one or the pipes provided none
     */
    protected InputStream processOutStream() {
        return processOutStream.get();
    }

    /**
     * @return the buffered stream for writing to the process, or {@code null} when
     *         {@code start} has not stored one
     */
    @Nullable
    private OutputStream processInStream() {
        return processInStream.get();
    }

    /**
     * @return the restore stream of the process, or {@code null} when {@code start} has
     *         not stored one or the pipes provided none
     */
    @Nullable
    protected OutputStream processRestoreStream() {
        return processRestoreStream.get();
    }

    /**
     * @return the number of fields supplied at construction
     */
    protected int numberOfFields() {
        return numberOfFields;
    }

    /**
     * @return the writer wrapping the process input stream, or {@code null} when
     *         {@code start} has not stored one
     */
    protected LengthEncodedWriter recordWriter() {
        return recordWriter.get();
    }

    /**
     * @return the handler of the process log stream, or {@code null} when {@code start}
     *         has not stored one
     */
    protected CppLogMessageHandler cppLogHandler() {
        return cppLogHandler.get();
    }

    /**
     * @return {@code true} once {@link #kill()} has been called
     */
    protected boolean isProcessKilled() {
        return processKilled;
    }

    /**
     * Reads the process output stream until end-of-stream, discarding the data, and then
     * closes it.
     *
     * <p>Does nothing when there is no output stream. The stream is closed even when
     * reading fails. I/O errors are deliberately ignored rather than propagated.
     */
    public void consumeAndCloseOutputStream() {
        InputStream outStream = processOutStream();
        if (outStream == null) {
            // No output stream was stored (the pipes provided none, or start was not called).
            return;
        }
        try (InputStream in = outStream) {
            byte[] buff = new byte[512];
            while (in.read(buff) >= 0) {
                // Discard the bytes; the goal is only to drain the stream.
            }
        } catch (IOException ignored) {
            // Best-effort drain: this method deliberately does not propagate I/O failures.
        }
    }
}

