Vert.x Blocking Code 처리하기

🗓 ⏰ 소요시간 13 분

이번 포스팅에서는 Vert.x 에서 Blocking 코드를 어떻게 처리하는지 확인해보겠습니다.

Thread Blocking Warning

Vert.x 는 single thread 모델로 하나의 thread(event loop)에서 모든 API 가 Non-blocking 으로 처리됩니다. 물론 Node.js 와 달리 event loop 를 여러 개 띄울 수 있지만 각각 event loop 가 독립적인 single thread 로 작동합니다. 즉, 특정 작업으로 인해 event loop thread 가 block 되면 (해당 event loop 에 할당된 작업들) 전체가 뻗어버리는 것과 같습니다. 따라서 event loop 가 block 되는 것을 막아야 하며, BlockedThreadChecker 가 해당 작업에 어느 정도 시간이 소요되면 다음과 같이 경고를 띄워주고 error 를 발생시킵니다.

1
2
3
4
5
6
7
8
9
10
11
12
May 29, 2018 5:56:16 AM io.vertx.core.impl.BlockedThreadChecker
WARNING: Thread Thread[vert.x-eventloop-thread-1,5,main] has been blocked for 2149 ms, time limit is 2000
May 29, 2018 5:56:17 AM io.vertx.core.impl.BlockedThreadChecker
WARNING: Thread Thread[vert.x-eventloop-thread-1,5,main] has been blocked for 3150 ms, time limit is 2000
May 29, 2018 5:56:18 AM io.vertx.core.impl.BlockedThreadChecker
WARNING: Thread Thread[vert.x-eventloop-thread-1,5,main] has been blocked for 4151 ms, time limit is 2000
May 29, 2018 5:56:19 AM io.vertx.core.impl.BlockedThreadChecker
WARNING: Thread Thread[vert.x-eventloop-thread-1,5,main] has been blocked for 5151 ms, time limit is 2000
io.vertx.core.VertxException: Thread blocked
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Object.java:502)
...

blocking 되는 경우

  • Thread.sleep()
  • Thread lock
  • 시간이 오래 걸리는 DB 작업
  • 시간이 오래 걸리는 복잡한 연산
  • 오래 걸리는 반복문
  • Blocking 코드로 짜여진 API 를 호출하는 경우

경고 잠재우기

blocking 이 크게 문제되지 않는 경우인데 이러한 경고가 거슬린다면 다음 두 가지 방법으로 제한 시간을 늘릴 수 있습니다.

시스템 프로퍼티로 옵션 주기

CLI 로 Vert.x 를 실행할 경우 다음과 같이 원하는 시간만큼 옵션을 줍니다. (milliseconds)

1
-Dvertx.options.blockedThreadCheckInterval=200000000

VertxOptions 으로 옵션 주기

Vert.x 생성 시에 VertxOptions 객체를 이용해 옵션을 전달할 수 있습니다.

1
2
VertxOptions vxOptions = new VertxOptions().setblockedThreadCheckInterval(200000000);
Vertx vertx = Vertx.vertx(vxOptions);

Blocking code 처리하기

하지만 event loop 가 blocking 되는 근본적인 문제를 해결한 건 아닙니다.

Vert.x 에서 blocking 코드를 처리하는 방법에는 세 가지 방법이 있습니다.

  1. Vert.x 에서 재작성한 API를 사용 (blocking -> non-blocking)
  2. executeBlocking 메소드로 처리 (각 작업들의 순서를 지킬 지, 병렬로 처리할지 설정 가능)
  3. Worker verticle 사용 (Worker thread pool)

executeBlocking 메소드

버텍스 에서는 blocking 작업들을 non-blocking 처럼 처리하기 위해서 내부적으로 worker pool 을 가지고 있습니다. event loop 에서 blocking 코드를 실행하면 내부적으로 thread를 이용해서 처리하고 callback 을 실행해 event loop 입장에서는 마치 non-blocking 처럼 처리할 수 있습니다. 이 때 사용하는 메소드가 executeBlocking 메소드입니다.

1
2
3
4
5
// Safely execute some blocking code.
// Executes the blocking code in the handler blockingCodeHandler using a thread from the worker pool.
<T> void executeBlocking(Handler<Future<T>> blockingCodeHandler,
boolean ordered,
Handler<AsyncResult<T>> resultHandler)

파라미터 중 boolean 값인 ordered 를 이용해서 executeBlocking 메소드가 여러 번 호출되었을 때 순서를 지켜서 호출할 것인지 병렬적으로 실행할 것인지 정해줄 수 있습니다. (기본값 true)

기본적인 사용법은 다음과 같습니다.

1
2
3
4
5
6
7
vertx.executeBlocking(future -> {
// Call some blocking API that takes a significant amount of time to return
String result = someAPI.blockingMethod("hello");
future.complete(result);
}, res -> {
System.out.println("The result is: " + res.result());
});

WorkerExecutor

위 코드처럼 executeBlocking 메소드를 vertx 인스턴스를 통해서 바로 호출할 수도 있지만 WorkerExecutor 인스턴스를 이용해서 다양하게 활용도 가능합니다.

WorkerExecutor 를 생성할 때 worker pool 이름을 지정해줄 수 있는데, 이름이 같으면 같은 wroker pool 을 공유하고, 이름이 다르면 다른 worker pool 을 사용합니다. 이를 이용해서 목적에 맞는 worker pool 을 만들어 사용할 수 있습니다.

1
2
3
4
5
6
7
8
WorkerExecutor executor = vertx.createSharedWorkerExecutor("my-worker-pool");
executor.executeBlocking(future -> {
// Call some blocking API that takes a significant amount of time to return
String result = someAPI.blockingMethod("hello");
future.complete(result);
}, res -> {
System.out.println("The result is: " + res.result());
});

추가 파라미터로 poolSize 와 maxExecuteTime 도 설정할 수 있습니다.

1
2
3
4
int poolSize = 10;
long maxExecuteTime = 120000; // 2분

WorkerExecutor executor = vertx.createSharedWorkerExecutor("my-worker-pool", poolSize, maxExecuteTime);

Worker pool 과 executor 종료

Verticle 내에서 생성된 executor 는 verticle 이 종료(undeployed) 되면 자동으로 닫히거나, 사용이 끝난 executor 를 수동으로 닫을 수 있습니다.

1
executor.close();

모든 executor 가 닫히면, 해당 worker pool 은 삭제됩니다.

Worker Verticle

Worker verticle 은 일반적인 verticle 처럼 event loop 에서 실행되는 것이 아니라 worker pool 의 thread 들을 이용해서 처리되는 verticle 입니다. 위에서 살펴본 executeBlocking 메서드는 해당 작업만 thread pool 을 이용해서 작업했다면, worker verticle 은 verticle 자체를 thread pool 을 이용해서 처리한다고 보시면 될 것 같습니다.

1
2
DeploymentOptions options = new DeploymentOptions().setWorker(true);
vertx.deployVerticle("com.future.creator.MyOrderProcessorVerticle", options);

Vert.x 의 장점이 thread에 안전한 것이죠. Worker verticle 도 마찬가지입니다. Worker verticle 인스턴스는 여러 개의 thread에서 동시에 실행되는 것은 아닙니다. 하지만 하나의 thread에서만 동작하는 event loop와는 다르게 다른 시간에 다른 thread에서 실행될 수는 있습니다.

Vert.x 에서는 이러한 방법으로 blocking 코드를 안전하게 처리할 수 있습니다.