import java.util.concurrent.*;

public class Main {

    public static void main(String[] args) throws InterruptedException {
        SubmissionPublisher<Integer> sp = new SubmissionPublisher<>(ForkJoinPool.commonPool(), 1);
        sp.subscribe(new Flow.Subscriber<>() {
            private Flow.Subscription sub;
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                this.sub = subscription;
            }

            @Override
            public void onNext(Integer item) {

            }

            @Override
            public void onError(Throwable throwable) {
                System.err.println("Err");
                this.sub.cancel();
            }

            @Override
            public void onComplete() {
                System.err.println("Completed");
                this.sub.cancel();
            }
        });
        CompletableFuture.runAsync(() -> { sp.submit(1); sp.submit(2); });
        TimeUnit.SECONDS.sleep(1);
        System.out.println("Going to hang...");
        sp.close();
        System.out.println("Not hanged!?");
    }
}
