artemis와 kafka로 2차 queue 설계
29 December 2019
activemq artemis와 kafka를 이용해 local queue와 remote queue를 동시에 쓰는 예제를 진행해보도록 하겠습니다.
artemis [https://shashaka.github.io/springboot/2019/12/22/SPRING-ARTEMIS-AUTO-CONFIG.html]와
kafka [ https://shashaka.github.io/springboot/2019/12/28/SPRING-KAFKA.html ] 설정 참고 부탁드립니다.
컨셉은 아래와 같습니다.
1. request가 들어오면 서버 내 activemq로 enqueue
2. activemq에서 다시 외부 kafka로 enqueue
3. kafka에서 서버 activemq consumer가 dequeue
4. activemq consumer가 마지막 처리
해당 logic에서 kafka가 healthy 상황이 아닐 경우, 처리하는 logic을 추가해보도록 하겠습니다.
@Bean
public ProducerListener<String, String> kafkaProducerListener() {
return new ProducerListener<String, String>() {
@Override
public void onError(String topic, Integer partition, String key, String value, Exception exception) {
log.error("kafkaProducerListener error ", exception);
jmsTemplate.send("final", session -> session.createTextMessage(value));
}
};
}
위와 같이 ProducerListener를 선언해줍니다.kafka 설정에서 request.timeout.ms와 delivery.timeout.ms를 짧게 조정해주면 해당 시간 동안
처리되지 못한 produce message에 대해서는 timeout exception이 발생하게 됩니다.
해당 메세지는 kafka에 enqueue되지 않고 바로 activemq consumer에서 처리되게 됩니다.
위 순서에서 1번 이후, 바로 4번으로 routing되게 됩니다.
아래와 같이 적용되는 것을 확인할 수 있습니다.
// kafka 정상 동작시
14:25:43.253 [DefaultMessageListenerContainer-1] INFO org.shashaka.io.JmsConfig - jms : message_send
14:25:43.290 [default-0-C-1] INFO org.shashaka.io.KafkaConfig - kafka : message_send
14:25:43.301 [DefaultMessageListenerContainer-1] INFO org.shashaka.io.JmsConfig - final : message_send
// kafka disconnected 상황 발생시
14:26:03.542 [DefaultMessageListenerContainer-2] INFO org.shashaka.io.JmsConfig - jms : message_send
14:26:06.544 [kafka-producer-network-thread | producer-1] ERROR org.shashaka.io.KafkaConfig - kafkaProducerListener error
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for myTopic-0:3001 ms has passed since batch creation
14:26:06.555 [DefaultMessageListenerContainer-2] INFO org.shashaka.io.JmsConfig - final : message_send