Publish/Subscribe を使ってEDP(Event Driven Programming)を簡単に実装できるようです。
それではさっそくシンプルに
pub/sub するときに受け渡しするメッセージオブジェクト(POJO)
public class EventMessage {
final int msgcode;
final String msg;
public EventMessage(int msgcode, String msg) {
this.msgcode = msgcode;
this.msg = msg;
}
public int getMsgcode() {
return msgcode;
}
public String getMsg() {
return msg;
}
}
そして、subscriber、@Subscribeアノテーションが付けられたメソッドが呼ばれるようです。
なんとなく、1クラスで1つのSubscribe定義がよさそうな感じ...
最後に、Main というか Publish するところ、前にあるSubscriberの受け取るメッセージオブジェクトに対応しないメッセージがとどくとDeadEventsSubscriberがハンドルする。
なんとなく、1クラスで1つのSubscribe定義がよさそうな感じ...
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.eventbus.Subscribe;
public class Subscriber {
private static Logger log = LoggerFactory.getLogger(Subscriber.class);
protected static ExecutorService worker = Executors.newCachedThreadPool();
public static class EventHandler implements Runnable {
final EventMessage message;
public EventHandler( EventMessage message) {
this.message = message;
}
public void run() {
try {
log.debug("[start] Processing worker thread. / {} {}", message.msgcode, message.msg);
TimeUnit.MILLISECONDS.sleep(message.msgcode); // do something
log.debug("[end] worker thread");
} catch (Exception e) {
log.error("Event failed", e);
}
}
}
@Subscribe
public void handleEvent(final EventMessage eventMessage) {
try {
log.debug("[start] dispatched event message to worker thread. {}/{}", eventMessage.msgcode, eventMessage.msg);
worker.execute(new EventHandler(eventMessage));
log.debug("[end] dispatched events.");
} catch (Exception e) {
log.error("event failed.", e);
}
}
public void shutdown() {
worker.shutdown();
try {
if (!worker.awaitTermination(1000, TimeUnit.MILLISECONDS))
worker.shutdownNow();
log.info("shutdown success");
} catch (InterruptedException e) {
log.error("shutdown failed",e);
}
}
}
最後に、Main というか Publish するところ、前にあるSubscriberの受け取るメッセージオブジェクトに対応しないメッセージがとどくとDeadEventsSubscriberがハンドルする。
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.eventbus.DeadEvent;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
public class Main {
private static Logger log = LoggerFactory.getLogger(Main.class);
public static class DeadEventsSubscriver {
@Subscribe
public void handleDeadEvent(DeadEvent deadEvent) {
log.error("DEAD EVENT: {}", deadEvent.getEvent());
}
}
/**
* example EventBus programming 'Google-Guava'
* @param args
*/
public static void main(String[] args) {
try {
final EventBus eventBus = new EventBus("example-events");
final Subscriber subscriver = new Subscriber();
final DeadEventsSubscriver des = new DeadEventsSubscriver();
eventBus.register(subscriver);
eventBus.register(des);
// published
eventBus.post("This is dead event");
eventBus.post(new EventMessage(5000, "This message from hoge"));
eventBus.post(new EventMessage(1000, "This message from fuga"));
log.debug("waiting...");
TimeUnit.MILLISECONDS.sleep(10000);
log.info(">> end of main thread");
eventBus.unregister(subscriver);
eventBus.unregister(des);
subscriver.shutdown();
} catch (InterruptedException e) {
log.error("fail", e);
}
}
}
んで、実行した結果。
[main] 20:44:28.803 ERROR o.horiga.study.eventbus.example.Main[20] - DEAD EVENT: This is dead event [main] 20:44:28.807 DEBUG o.h.s.eventbus.example.Subscriber[40] - [start] dispatched event message to worker thread. 5000/This message from hoge [main] 20:44:28.808 DEBUG o.h.s.eventbus.example.Subscriber[42] - [end] dispatched events. [main] 20:44:28.809 DEBUG o.h.s.eventbus.example.Subscriber[40] - [start] dispatched event message to worker thread. 1000/This message from fuga [pool-1-thread-1] 20:44:28.809 DEBUG o.h.s.eventbus.example.Subscriber[28] - [start] Processing worker thread. / 5000 This message from hoge [main] 20:44:28.809 DEBUG o.h.s.eventbus.example.Subscriber[42] - [end] dispatched events. [pool-1-thread-2] 20:44:28.810 DEBUG o.h.s.eventbus.example.Subscriber[28] - [start] Processing worker thread. / 1000 This message from fuga [main] 20:44:28.810 DEBUG o.horiga.study.eventbus.example.Main[42] - waiting... [pool-1-thread-2] 20:44:29.811 DEBUG o.h.s.eventbus.example.Subscriber[30] - [end] worker thread [pool-1-thread-1] 20:44:33.810 DEBUG o.h.s.eventbus.example.Subscriber[30] - [end] worker thread [main] 20:44:38.811 INFO o.horiga.study.eventbus.example.Main[45] - >> end of main thread [main] 20:44:38.816 INFO o.h.s.eventbus.example.Subscriber[54] - shutdown success
すげー簡単にでけた。ここまでホントに15分程度。
とりあえず、eventbus#post は、blocking されるようだったので、subscriber側は thread にしてみたこれで asynchronous event driven っぽくなったかな? subscribe をnetwork経由で別のsubscriberに通知できればもっといい感じになりそうだけどそのあたりは、akka とか使ったほうが早いだろうかと
まだ、スレッドセーフとか渡したいEventのPojoとかいろいろな型でためしてどれくらい拡張性(POJO となるEventクラスの継承をした場合、@Subscribeが複数ある場合は対象となる?とか、AnnotationやらGenericsなどを使ったりとか)があるか試したいところですが、ここまでということで。
今回試したコードはこちら


