Uploaded image for project: 'JDK'
  1. JDK
  2. JDK-8254060

SubmissionPublisher close hangs if a publication is pending

XMLWordPrintable

      ADDITIONAL SYSTEM INFORMATION :
      MacOS Catalina
      openjdk 14.0.1 2020-04-14
      OpenJDK Runtime Environment (build 14.0.1+14)
      OpenJDK 64-Bit Server VM (build 14.0.1+14, mixed mode, sharing)


      A DESCRIPTION OF THE PROBLEM :
      Submissionpublisher allows to publish elements through the submit method. That method can block if there is no room in the publisher buffer, waiting for a consumer to claim some data.
      Submission publisher offers the close() method. Such method should call the onCompleted() method of the subscriber

      If a blocking publication attempt is in progress, a call to close() do not call the onComplete and hangs


      STEPS TO FOLLOW TO REPRODUCE THE PROBLEM :
      See the attached code

      EXPECTED VERSUS ACTUAL BEHAVIOR :
      EXPECTED -
      - onComplete called according to documentation
      - close should complete without hanging
      ACTUAL -
      - onComplete is not called
      - close hangs
      - submit stay blocked

      ---------- BEGIN SOURCE ----------
          public static void main(String[] args) throws InterruptedException {
              SubmissionPublisher<Integer> sp = new SubmissionPublisher<>(ForkJoinPool.commonPool(), 1);
              sp.subscribe(new Flow.Subscriber<>() {
                  private Flow.Subscription sub;
                  @Override
                  public void onSubscribe(Flow.Subscription subscription) {
                      this.sub = subscription;
                  }

                  @Override
                  public void onNext(Integer item) {

                  }

                  @Override
                  public void onError(Throwable throwable) {
                      System.err.println("Err");
                      this.sub.cancel();
                  }

                  @Override
                  public void onComplete() {
                      System.err.println("Completed");
                      this.sub.cancel();
                  }
              });
              CompletableFuture.runAsync(() -> { sp.submit(1); sp.submit(2); });
              TimeUnit.SECONDS.sleep(1);
              System.out.println("Going to hang...");
              sp.close();
              System.out.println("Not hanged!?");
          }
      ---------- END SOURCE ----------

      FREQUENCY : always


            dl Doug Lea
            webbuggrp Webbug Group
            Votes:
            0 Vote for this issue
            Watchers:
            4 Start watching this issue

              Created:
              Updated: