Publish/Subscribe を使ってEDP(Event Driven Programming)を簡単に実装できるようです。
それではさっそくシンプルに
pub/sub するときに受け渡しするメッセージオブジェクト(POJO)
public class EventMessage { final int msgcode; final String msg; public EventMessage(int msgcode, String msg) { this.msgcode = msgcode; this.msg = msg; } public int getMsgcode() { return msgcode; } public String getMsg() { return msg; } }
そして、subscriber、@Subscribeアノテーションが付けられたメソッドが呼ばれるようです。
なんとなく、1クラスで1つのSubscribe定義がよさそうな感じ...
最後に、Main というか Publish するところ、前にあるSubscriberの受け取るメッセージオブジェクトに対応しないメッセージがとどくとDeadEventsSubscriberがハンドルする。
なんとなく、1クラスで1つのSubscribe定義がよさそうな感じ...
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.eventbus.Subscribe; public class Subscriber { private static Logger log = LoggerFactory.getLogger(Subscriber.class); protected static ExecutorService worker = Executors.newCachedThreadPool(); public static class EventHandler implements Runnable { final EventMessage message; public EventHandler( EventMessage message) { this.message = message; } public void run() { try { log.debug("[start] Processing worker thread. / {} {}", message.msgcode, message.msg); TimeUnit.MILLISECONDS.sleep(message.msgcode); // do something log.debug("[end] worker thread"); } catch (Exception e) { log.error("Event failed", e); } } } @Subscribe public void handleEvent(final EventMessage eventMessage) { try { log.debug("[start] dispatched event message to worker thread. {}/{}", eventMessage.msgcode, eventMessage.msg); worker.execute(new EventHandler(eventMessage)); log.debug("[end] dispatched events."); } catch (Exception e) { log.error("event failed.", e); } } public void shutdown() { worker.shutdown(); try { if (!worker.awaitTermination(1000, TimeUnit.MILLISECONDS)) worker.shutdownNow(); log.info("shutdown success"); } catch (InterruptedException e) { log.error("shutdown failed",e); } } }
最後に、Main というか Publish するところ、前にあるSubscriberの受け取るメッセージオブジェクトに対応しないメッセージがとどくとDeadEventsSubscriberがハンドルする。
import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.eventbus.DeadEvent; import com.google.common.eventbus.EventBus; import com.google.common.eventbus.Subscribe; public class Main { private static Logger log = LoggerFactory.getLogger(Main.class); public static class DeadEventsSubscriver { @Subscribe public void handleDeadEvent(DeadEvent deadEvent) { log.error("DEAD EVENT: {}", deadEvent.getEvent()); } } /** * example EventBus programming 'Google-Guava' * @param args */ public static void main(String[] args) { try { final EventBus eventBus = new EventBus("example-events"); final Subscriber subscriver = new Subscriber(); final DeadEventsSubscriver des = new DeadEventsSubscriver(); eventBus.register(subscriver); eventBus.register(des); // published eventBus.post("This is dead event"); eventBus.post(new EventMessage(5000, "This message from hoge")); eventBus.post(new EventMessage(1000, "This message from fuga")); log.debug("waiting..."); TimeUnit.MILLISECONDS.sleep(10000); log.info(">> end of main thread"); eventBus.unregister(subscriver); eventBus.unregister(des); subscriver.shutdown(); } catch (InterruptedException e) { log.error("fail", e); } } }
んで、実行した結果。
[main] 20:44:28.803 ERROR o.horiga.study.eventbus.example.Main[20] - DEAD EVENT: This is dead event [main] 20:44:28.807 DEBUG o.h.s.eventbus.example.Subscriber[40] - [start] dispatched event message to worker thread. 5000/This message from hoge [main] 20:44:28.808 DEBUG o.h.s.eventbus.example.Subscriber[42] - [end] dispatched events. [main] 20:44:28.809 DEBUG o.h.s.eventbus.example.Subscriber[40] - [start] dispatched event message to worker thread. 1000/This message from fuga [pool-1-thread-1] 20:44:28.809 DEBUG o.h.s.eventbus.example.Subscriber[28] - [start] Processing worker thread. / 5000 This message from hoge [main] 20:44:28.809 DEBUG o.h.s.eventbus.example.Subscriber[42] - [end] dispatched events. [pool-1-thread-2] 20:44:28.810 DEBUG o.h.s.eventbus.example.Subscriber[28] - [start] Processing worker thread. / 1000 This message from fuga [main] 20:44:28.810 DEBUG o.horiga.study.eventbus.example.Main[42] - waiting... [pool-1-thread-2] 20:44:29.811 DEBUG o.h.s.eventbus.example.Subscriber[30] - [end] worker thread [pool-1-thread-1] 20:44:33.810 DEBUG o.h.s.eventbus.example.Subscriber[30] - [end] worker thread [main] 20:44:38.811 INFO o.horiga.study.eventbus.example.Main[45] - >> end of main thread [main] 20:44:38.816 INFO o.h.s.eventbus.example.Subscriber[54] - shutdown success
すげー簡単にでけた。ここまでホントに15分程度。
とりあえず、eventbus#post は、blocking されるようだったので、subscriber側は thread にしてみたこれで asynchronous event driven っぽくなったかな? subscribe をnetwork経由で別のsubscriberに通知できればもっといい感じになりそうだけどそのあたりは、akka とか使ったほうが早いだろうかと
まだ、スレッドセーフとか渡したいEventのPojoとかいろいろな型でためしてどれくらい拡張性(POJO となるEventクラスの継承をした場合、@Subscribeが複数ある場合は対象となる?とか、AnnotationやらGenericsなどを使ったりとか)があるか試したいところですが、ここまでということで。
今回試したコードはこちら