2013年12月7日土曜日

Google Guava's EventBus programming example.

GoogleGuava には EventBus を簡単に実装できるライブラリが含まれていたので、試してみたので備忘録的に残します。

Publish/Subscribe を使ってEDP(Event Driven Programming)を簡単に実装できるようです。

それではさっそくシンプルに

pub/sub するときに受け渡しするメッセージオブジェクト(POJO)
public class EventMessage {

    final int msgcode;
    final String msg;

    public EventMessage(int msgcode, String msg) {
 this.msgcode = msgcode;
 this.msg = msg;
    }

    public int getMsgcode() {
 return msgcode;
    }

    public String getMsg() {
 return msg;
    }
}

そして、subscriber、@Subscribeアノテーションが付けられたメソッドが呼ばれるようです。
なんとなく、1クラスで1つのSubscribe定義がよさそうな感じ...

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.eventbus.Subscribe;

public class Subscriber {
 
 private static Logger log = LoggerFactory.getLogger(Subscriber.class);

 protected static ExecutorService worker = Executors.newCachedThreadPool();
    
 public static class EventHandler implements Runnable {
 
  final EventMessage message;
 
  public EventHandler( EventMessage message) {
   this.message = message;
  }
 
  public void run() {
   try {
    log.debug("[start] Processing worker thread. / {} {}", message.msgcode, message.msg);
    TimeUnit.MILLISECONDS.sleep(message.msgcode); // do something
    log.debug("[end] worker thread");
   } catch (Exception e) {
    log.error("Event failed", e);
   }
  }
    }
    
 @Subscribe
 public void handleEvent(final EventMessage eventMessage) {
  try {
   log.debug("[start] dispatched event message to worker thread. {}/{}", eventMessage.msgcode, eventMessage.msg);
   worker.execute(new EventHandler(eventMessage));
   log.debug("[end] dispatched events.");
  } catch (Exception e) {
   log.error("event failed.", e);
  }
 }
    
 public void shutdown() {
  worker.shutdown();
  try {
   if (!worker.awaitTermination(1000, TimeUnit.MILLISECONDS))
    worker.shutdownNow();
   log.info("shutdown success");
  } catch (InterruptedException e) {
   log.error("shutdown failed",e);
  }
 }
}

最後に、Main というか Publish するところ、前にあるSubscriberの受け取るメッセージオブジェクトに対応しないメッセージがとどくとDeadEventsSubscriberがハンドルする。

import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.eventbus.DeadEvent;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;

public class Main {
 private static Logger log = LoggerFactory.getLogger(Main.class);

 public static class DeadEventsSubscriver {
  @Subscribe
  public void handleDeadEvent(DeadEvent deadEvent) {
   log.error("DEAD EVENT: {}", deadEvent.getEvent());
  }
 }

 /**
 * example EventBus programming 'Google-Guava'
 * @param args
 */
 public static void main(String[] args) {
 try {
  final EventBus eventBus = new EventBus("example-events");
  final Subscriber subscriver = new Subscriber();
  final DeadEventsSubscriver des = new DeadEventsSubscriver();
      
  eventBus.register(subscriver);
  eventBus.register(des);

  // published
  eventBus.post("This is dead event");
  eventBus.post(new EventMessage(5000, "This message from hoge"));
  eventBus.post(new EventMessage(1000, "This message from fuga"));

  log.debug("waiting...");
  TimeUnit.MILLISECONDS.sleep(10000);

  log.info(">> end of main thread");
  eventBus.unregister(subscriver);
  eventBus.unregister(des);
  subscriver.shutdown();
      
 } catch (InterruptedException e) {
  log.error("fail", e);
 }
 }
}

んで、実行した結果。
[main] 20:44:28.803  ERROR o.horiga.study.eventbus.example.Main[20] - DEAD EVENT: This is dead event
[main] 20:44:28.807  DEBUG o.h.s.eventbus.example.Subscriber[40] - [start] dispatched event message to worker thread. 5000/This message from hoge
[main] 20:44:28.808  DEBUG o.h.s.eventbus.example.Subscriber[42] - [end] dispatched events.
[main] 20:44:28.809  DEBUG o.h.s.eventbus.example.Subscriber[40] - [start] dispatched event message to worker thread. 1000/This message from fuga
[pool-1-thread-1] 20:44:28.809  DEBUG o.h.s.eventbus.example.Subscriber[28] - [start] Processing worker thread. / 5000 This message from hoge
[main] 20:44:28.809  DEBUG o.h.s.eventbus.example.Subscriber[42] - [end] dispatched events.
[pool-1-thread-2] 20:44:28.810  DEBUG o.h.s.eventbus.example.Subscriber[28] - [start] Processing worker thread. / 1000 This message from fuga
[main] 20:44:28.810  DEBUG o.horiga.study.eventbus.example.Main[42] - waiting...
[pool-1-thread-2] 20:44:29.811  DEBUG o.h.s.eventbus.example.Subscriber[30] - [end] worker thread
[pool-1-thread-1] 20:44:33.810  DEBUG o.h.s.eventbus.example.Subscriber[30] - [end] worker thread
[main] 20:44:38.811  INFO  o.horiga.study.eventbus.example.Main[45] - &gt&gt end of main thread
[main] 20:44:38.816  INFO  o.h.s.eventbus.example.Subscriber[54] - shutdown success


すげー簡単にでけた。ここまでホントに15分程度。

とりあえず、eventbus#post は、blocking されるようだったので、subscriber側は thread にしてみたこれで asynchronous event driven っぽくなったかな? subscribe をnetwork経由で別のsubscriberに通知できればもっといい感じになりそうだけどそのあたりは、akka とか使ったほうが早いだろうかと

まだ、スレッドセーフとか渡したいEventのPojoとかいろいろな型でためしてどれくらい拡張性(POJO となるEventクラスの継承をした場合、@Subscribeが複数ある場合は対象となる?とか、AnnotationやらGenericsなどを使ったりとか)があるか試したいところですが、ここまでということで。

今回試したコードはこちら