/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;

/**
 * Parent-side launcher for child "node" JVMs.
 *
 * <p>{@link #startNode} forks a new JVM that runs JUnit against this class, configures it through
 * {@code -Dtests.nrtreplication.*} system properties, waits until the child prints {@code NODE
 * STARTED}, and then hands the child's remaining output to a background pumper thread.
 */
public class CompileFailure {

  private static final Random random = new Random();

  /** Incremented once per forked child and mixed into that child's {@code tests.seed}. */
  private static final AtomicInteger nodeStartCounter = new AtomicInteger();

  /** Matches child output lines that already start with an elapsed-seconds prefix. */
  private static final Pattern LOG_TIME_START = Pattern.compile("^[0-9\\.]+s .*");

  private static final String PORT_PREFIX = "PORT: ";
  private static final String COMMIT_VERSION_PREFIX = "COMMIT VERSION: ";
  private static final String INFOS_VERSION_PREFIX = "INFOS VERSION: ";

  // Test evilness controls:

  /** Randomly crash the current primary (losing data!) and promote the "next best" replica. */
  static final boolean DO_CRASH_PRIMARY = true;

  /**
   * Randomly crash (JVM core dumps) a replica; it will later randomly be restarted and sync itself.
   */
  static final boolean DO_CRASH_REPLICA = true;

  /** Randomly gracefully close a replica; it will later be restarted and sync itself. */
  static final boolean DO_CLOSE_REPLICA = true;

  /** Randomly gracefully close the primary; it will later be restarted and sync itself. */
  static final boolean DO_CLOSE_PRIMARY = true;

  /** If false, all child + parent output is interleaved into single stdout/err */
  static final boolean SEPARATE_CHILD_OUTPUT = false;

  /** Randomly crash whole cluster and then restart it */
  static final boolean DO_FULL_CLUSTER_CRASH = true;

  /** True if we randomly flip a bit while copying files out */
  static final boolean DO_BIT_FLIPS_DURING_COPY = true;

  /** Set to a non-null value to force exactly that many nodes; else, it's random. */
  static final Integer NUM_NODES = null;

  /**
   * Ids of nodes whose non-zero exit is expected; an id is removed when that node's exit is
   * observed by its pumper thread.
   */
  final Set<Integer> crashingNodes = Collections.synchronizedSet(new HashSet<>());

  /** Set when a node fails to start, or exits with an unexpected non-zero status. */
  final AtomicBoolean failed = new AtomicBoolean();

  final AtomicBoolean stop = new AtomicBoolean();

  /** Compared against the generation a replica was launched with when that replica fails to start. */
  long primaryGen;

  /**
   * Per-node {@link System#nanoTime()} of the most recent {@link #startNode} call, indexed by node
   * id. Must be allocated before {@link #startNode} is called.
   */
  volatile long[] nodeTimeStamps;

  volatile boolean[] starting;

  /** Values the child reported on its stdout before printing {@code NODE STARTED}. */
  private static final class StartupInfo {
    final int tcpPort;
    final long initCommitVersion;
    final long initInfosVersion;
    final boolean willCrash;

    StartupInfo(int tcpPort, long initCommitVersion, long initInfosVersion, boolean willCrash) {
      this.tcpPort = tcpPort;
      this.initCommitVersion = initCommitVersion;
      this.initInfosVersion = initInfosVersion;
      this.willCrash = willCrash;
    }
  }

  /**
   * Forks a child JVM for node {@code id} and blocks until it reports {@code NODE STARTED} or exits.
   *
   * @param id node id; also the index written in {@link #nodeTimeStamps}
   * @param indexPath passed to the child as {@code tests.nrtreplication.indexpath}
   * @param isPrimary whether the child is launched as the primary
   * @param forcePrimaryVersion passed to a primary child as {@code forcePrimaryVersion}
   * @param childTempDir working directory of the child; also holds {@code <id>.log} when {@link
   *     #SEPARATE_CHILD_OUTPUT} is set
   * @param primary the current primary, or {@code null} if there is none
   * @return the running pumper {@link Thread} for the child, or {@code null} if the node was not
   *     started (replica without a primary, child exited with status 0, or the primary went away
   *     while a replica was starting)
   * @throws IOException if the log file cannot be opened or the child cannot be forked or read
   * @throws RuntimeException if the child exits with a non-zero status during startup and that is
   *     not attributed to the primary going away; {@link #failed} is set first
   */
  @SuppressWarnings("null")
  Object startNode(final int id, Path indexPath, boolean isPrimary, long forcePrimaryVersion, Path childTempDir, Object primary) throws IOException {

    nodeTimeStamps[id] = System.nanoTime();

    Object curPrimary = primary;
    if (curPrimary == null && isPrimary == false) {
      // We cannot start a replica when there is no primary:
      return null;
    }

    // NOTE(review): hard-coded generation; it is compared against the primaryGen field when a
    // replica fails to start. Looks like a placeholder -- confirm the intended source.
    long myPrimaryGen = 123;
    List<String> cmd =
        buildCommand(id, indexPath, isPrimary, forcePrimaryVersion, curPrimary, myPrimaryGen);

    Path childOut = null;
    Writer childLog = null;
    if (SEPARATE_CHILD_OUTPUT) {
      childOut = childTempDir.resolve(id + ".log");
      message("logging to " + childOut);
      childLog =
          Files.newBufferedWriter(
              childOut,
              StandardCharsets.UTF_8,
              StandardOpenOption.APPEND,
              StandardOpenOption.CREATE);
    }

    Process p = null;
    BufferedReader r = null;
    boolean handedOff = false;
    try {
      if (childLog != null) {
        childLog.write("\n\nSTART NEW CHILD:\n");
      }

      ProcessBuilder pb = new ProcessBuilder(cmd);
      pb.redirectErrorStream(true);

      // Important, so that the scary looking hs_err_<pid>.log appear under our test temp dir:
      pb.directory(childTempDir.toFile());

      p = pb.start();
      r = new BufferedReader(new InputStreamReader(p.getInputStream(), StandardCharsets.UTF_8));

      StartupInfo info = awaitStartup(id, isPrimary, primary, myPrimaryGen, p, r, childLog);
      if (info == null) {
        return null;
      }

      final AtomicBoolean nodeIsClosing = new AtomicBoolean();

      // Baby sits the child process, pulling its stdout and printing to our stdout, calling
      // nodeClosed once it exits:
      Thread pumper =
          CompileFailure.start(
              newExitHandler(id, p, childLog, childOut, info.willCrash),
              r,
              System.out,
              childLog,
              nodeIsClosing);
      // From here on the pumper thread owns the process, the reader and the log writer:
      handedOff = true;
      // The thread is already running, so lines it logged before this point carry its default
      // name:
      pumper.setName("pump" + id);

      message(
          "top: node="
              + id
              + " started at tcpPort="
              + info.tcpPort
              + " initCommitVersion="
              + info.initCommitVersion
              + " initInfosVersion="
              + info.initInfosVersion);
      return pumper;
    } finally {
      if (handedOff == false) {
        // Startup did not complete: make sure no child process, pipe or log writer is left behind.
        if (p != null) {
          p.destroy();
        }
        closeQuietly(r, "child output reader");
        closeQuietly(childLog, "child log");
      }
    }
  }

  /** Builds the full {@code java ...} command line used to fork the child JVM. */
  private static List<String> buildCommand(int id, Path indexPath, boolean isPrimary, long forcePrimaryVersion, Object curPrimary, long myPrimaryGen) {
    List<String> cmd = new ArrayList<>();

    cmd.add(Paths.get(System.getProperty("java.home"), "bin", "java").toString());
    cmd.add("-Xmx512m");

    if (curPrimary != null) {
      // NOTE(review): hashCode() stands in for the primary's TCP port here -- confirm.
      cmd.add("-Dtests.nrtreplication.primaryTCPPort=" + curPrimary.hashCode());
    }

    // tests.nrtreplication.checkonclose is deliberately not passed: it is very costly (takes more
    // time to check than it did to index), so the check is done once at the end instead of each
    // time a replica is restarted.

    cmd.add("-Dtests.nrtreplication.node=true");
    cmd.add("-Dtests.nrtreplication.nodeid=" + id);
    // NOTE(review): passes the literal text "null" as the start time -- looks like a placeholder.
    cmd.add("-Dtests.nrtreplication.startNS=" + null);
    cmd.add("-Dtests.nrtreplication.indexpath=" + indexPath);
    if (isPrimary) {
      cmd.add("-Dtests.nrtreplication.isPrimary=true");
      cmd.add("-Dtests.nrtreplication.forcePrimaryVersion=" + forcePrimaryVersion);
      if (DO_CRASH_PRIMARY) {
        cmd.add("-Dtests.nrtreplication.doRandomCrash=true");
      }
      if (DO_CLOSE_PRIMARY) {
        cmd.add("-Dtests.nrtreplication.doRandomClose=true");
      }
    } else {
      if (DO_CRASH_REPLICA) {
        cmd.add("-Dtests.nrtreplication.doRandomCrash=true");
      }
      if (DO_CLOSE_REPLICA) {
        cmd.add("-Dtests.nrtreplication.doRandomClose=true");
      }
    }

    if (DO_BIT_FLIPS_DURING_COPY) {
      cmd.add("-Dtests.nrtreplication.doFlipBitsDuringCopy=true");
    }

    cmd.add("-Dtests.nrtreplication.primaryGen=" + myPrimaryGen);

    // Mixin our own counter because this is called from a fresh thread which means the seed
    // otherwise isn't changing each time we spawn a new node. The counter must be shared across
    // calls; a freshly created counter would always yield 1.
    long seed = random.nextLong() * nodeStartCounter.incrementAndGet();
    cmd.add("-Dtests.seed=" + seed);
    cmd.add("-ea");
    cmd.add("org.junit.runner.JUnitCore");
    cmd.add(CompileFailure.class.getName());
    return cmd;
  }

  /**
   * Reads the child's output until it prints {@code NODE STARTED} or its output ends.
   *
   * @return what the child reported, or {@code null} if the start should be skipped
   * @throws RuntimeException if the child exited with a non-zero status and the failure is not
   *     attributed to the primary going away
   */
  private StartupInfo awaitStartup(int id, boolean isPrimary, Object primary, long myPrimaryGen, Process p, BufferedReader r, Writer childLog) throws IOException {
    int tcpPort = -1;
    long initCommitVersion = -1;
    long initInfosVersion = -1;
    boolean willCrash = false;

    while (true) {
      String l = r.readLine();
      if (l == null) {
        // End of output before NODE STARTED: the child did not come up.
        message("top: node=" + id + " failed to start");
        try {
          p.waitFor();
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          throw new RuntimeException(ie);
        }
        message("exit value=" + p.exitValue());
        if (p.exitValue() == 0) {
          message("zero exit status; assuming failed to remove segments_N; skipping");
          return null;
        }

        // Hackity hack, in case primary crashed/closed and we haven't noticed (reaped the process)
        // yet:
        if (isPrimary == false) {
          for (int i = 0; i < 100; i++) {
            Object primary2 = primary;
            if (primaryGen != myPrimaryGen || primary2 == null) {
              // OK: primary crashed while we were trying to start, so it's expected/allowed that we
              // could not start the replica:
              message("primary crashed/closed while replica R" + id + " tried to start; skipping");
              return null;
            }
            try {
              Thread.sleep(10);
            } catch (InterruptedException ie) {
              Thread.currentThread().interrupt();
              throw new RuntimeException(ie);
            }
          }
        }

        // Should fail the test:
        message("top: now fail test replica R" + id + " failed to start");
        failed.set(true);
        throw new RuntimeException("replica R" + id + " failed to start");
      }

      echoChildLine(l, childLog, System.out);

      if (l.startsWith(PORT_PREFIX)) {
        tcpPort = Integer.parseInt(l.substring(PORT_PREFIX.length()).trim());
      } else if (l.startsWith(COMMIT_VERSION_PREFIX)) {
        initCommitVersion = parseLongAfterPrefix(l, COMMIT_VERSION_PREFIX);
      } else if (l.startsWith(INFOS_VERSION_PREFIX)) {
        initInfosVersion = parseLongAfterPrefix(l, INFOS_VERSION_PREFIX);
      } else if (l.contains("will crash after")) {
        willCrash = true;
      } else if (l.startsWith("NODE STARTED")) {
        return new StartupInfo(tcpPort, initCommitVersion, initInfosVersion, willCrash);
      }
    }
  }

  /**
   * Parses the {@code long} that follows {@code prefix} in {@code line}, ignoring surrounding
   * whitespace. Versions are {@code long}s, so parsing them as {@code int} would fail for values
   * above {@link Integer#MAX_VALUE}.
   *
   * @throws NumberFormatException if the remainder of the line is not a valid {@code long}
   */
  static long parseLongAfterPrefix(String line, String prefix) {
    return Long.parseLong(line.substring(prefix.length()).trim());
  }

  /**
   * Forwards one line of child output: to {@code childLog} when it is non-null, otherwise to {@code
   * out} if the line already carries a time prefix, otherwise through {@link #message(String)}.
   */
  private static void echoChildLine(String line, Writer childLog, PrintStream out) throws IOException {
    if (childLog != null) {
      childLog.write(line);
      childLog.write("\n");
      childLog.flush();
    } else if (LOG_TIME_START.matcher(line).matches()) {
      // Already a well-formed log output:
      out.println(line);
    } else {
      message(line);
    }
  }

  /**
   * Creates the callback run by the pumper thread once the child's output has ended: it waits for
   * the process, closes the child log, flags unexpected non-zero exits and calls {@link
   * #nodeClosed}.
   */
  private Runnable newExitHandler(final int id, final Process p, final Writer childLog, final Path childLogPath, final boolean willCrash) {
    return new Runnable() {
      @Override
      public void run() {
        message("now wait for process " + p);
        try {
          p.waitFor();
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          throw new RuntimeException(ie);
        }

        message("done wait for process " + p);
        int exitValue = p.exitValue();
        message("exit value=" + exitValue + " willCrash=" + willCrash);
        if (childLog != null) {
          try {
            try {
              childLog.write("process done; exitValue=" + exitValue + "\n");
            } finally {
              childLog.close();
            }
          } catch (IOException ioe) {
            throw new RuntimeException(ioe);
          }
        }
        if (exitValue != 0 && willCrash == false && crashingNodes.remove(id) == false) {
          // should fail test
          failed.set(true);
          if (childLog != null) {
            // Point at the log file itself; the writer's toString() is not useful to a reader.
            throw new RuntimeException(
                "node "
                    + id
                    + " process had unexpected non-zero exit status="
                    + exitValue
                    + "; see "
                    + childLogPath
                    + " for details");
          } else {
            throw new RuntimeException(
                "node " + id + " process had unexpected non-zero exit status=" + exitValue);
          }
        }
        nodeClosed(id);
      }
    };
  }

  /** Closes {@code c} if non-null, reporting (not propagating) a failure to do so. */
  private static void closeQuietly(java.io.Closeable c, String what) {
    if (c == null) {
      return;
    }
    try {
      c.close();
    } catch (IOException ioe) {
      // Cleanup on an already-failing path: do not mask the original outcome.
      message("top: ignoring failure to close " + what + ": " + ioe);
    }
  }

  /** Hook invoked from the pumper thread after node {@code id}'s process has exited. */
  protected void nodeClosed(int id) {
    // foo
  }

  /**
   * Starts a thread that drains {@code r} line by line (forwarding each line to {@code childLog},
   * or to {@code out} when there is no log), then runs {@code runnable} and finally closes {@code
   * r}.
   *
   * @param nodeIsClosing not used here. NOTE(review): its intended role is not visible in this
   *     file -- confirm before wiring it up.
   * @return the already started thread
   */
  private static Thread start(final Runnable runnable, final BufferedReader r, final PrintStream out, final Writer childLog, AtomicBoolean nodeIsClosing) {
    Thread pumper =
        new Thread(
            new Runnable() {
              @Override
              public void run() {
                try {
                  String line;
                  while ((line = r.readLine()) != null) {
                    echoChildLine(line, childLog, out);
                  }
                } catch (IOException ioe) {
                  message("ignoring IOException while pumping child output: " + ioe);
                } finally {
                  try {
                    runnable.run();
                  } finally {
                    closeQuietly(r, "child output reader");
                  }
                }
              }
            });
    pumper.start();
    return pumper;
  }

  /**
   * Prints {@code message} to stdout, prefixed with the current {@link System#nanoTime()} in
   * seconds and the calling thread's name.
   */
  static void message(String message) {
    long now = System.nanoTime();
    System.out.println(
        String.format(
            Locale.ROOT,
            "%5.3fs       :     parent [%11s] %s",
            now / (double) TimeUnit.SECONDS.toNanos(1),
            Thread.currentThread().getName(),
            message));
  }

  /**
   * Like {@link #message(String)}, but additionally prints the seconds elapsed since {@code
   * localStartNS} (a {@link System#nanoTime()} value).
   */
  static void message(String message, long localStartNS) {
    long now = System.nanoTime();
    System.out.println(
        String.format(
            Locale.ROOT,
            "%5.3fs %5.1fs:     parent [%11s] %s",
            now / (double) TimeUnit.SECONDS.toNanos(1),
            (now - localStartNS) / (double) TimeUnit.SECONDS.toNanos(1),
            Thread.currentThread().getName(),
            message));
  }
}
