2013年12月7日土曜日

Google Guava's EventBus programming example.

GoogleGuava には EventBus を簡単に実装できるライブラリが含まれていたので、試してみたので備忘録的に残します。

Publish/Subscribe を使ってEDP(Event Driven Programming)を簡単に実装できるようです。

それではさっそくシンプルに

pub/sub するときに受け渡しするメッセージオブジェクト(POJO)
public class EventMessage {

    final int msgcode;
    final String msg;

    public EventMessage(int msgcode, String msg) {
 this.msgcode = msgcode;
 this.msg = msg;
    }

    public int getMsgcode() {
 return msgcode;
    }

    public String getMsg() {
 return msg;
    }
}

そして、subscriber、@Subscribeアノテーションが付けられたメソッドが呼ばれるようです。
なんとなく、1クラスで1つのSubscribe定義がよさそうな感じ...

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.eventbus.Subscribe;

public class Subscriber {
 
 private static Logger log = LoggerFactory.getLogger(Subscriber.class);

 protected static ExecutorService worker = Executors.newCachedThreadPool();
    
 public static class EventHandler implements Runnable {
 
  final EventMessage message;
 
  public EventHandler( EventMessage message) {
   this.message = message;
  }
 
  public void run() {
   try {
    log.debug("[start] Processing worker thread. / {} {}", message.msgcode, message.msg);
    TimeUnit.MILLISECONDS.sleep(message.msgcode); // do something
    log.debug("[end] worker thread");
   } catch (Exception e) {
    log.error("Event failed", e);
   }
  }
    }
    
 @Subscribe
 public void handleEvent(final EventMessage eventMessage) {
  try {
   log.debug("[start] dispatched event message to worker thread. {}/{}", eventMessage.msgcode, eventMessage.msg);
   worker.execute(new EventHandler(eventMessage));
   log.debug("[end] dispatched events.");
  } catch (Exception e) {
   log.error("event failed.", e);
  }
 }
    
 public void shutdown() {
  worker.shutdown();
  try {
   if (!worker.awaitTermination(1000, TimeUnit.MILLISECONDS))
    worker.shutdownNow();
   log.info("shutdown success");
  } catch (InterruptedException e) {
   log.error("shutdown failed",e);
  }
 }
}

最後に、Main というか Publish するところ、前にあるSubscriberの受け取るメッセージオブジェクトに対応しないメッセージがとどくとDeadEventsSubscriberがハンドルする。

import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.eventbus.DeadEvent;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;

public class Main {
 private static Logger log = LoggerFactory.getLogger(Main.class);

 public static class DeadEventsSubscriver {
  @Subscribe
  public void handleDeadEvent(DeadEvent deadEvent) {
   log.error("DEAD EVENT: {}", deadEvent.getEvent());
  }
 }

 /**
 * example EventBus programming 'Google-Guava'
 * @param args
 */
 public static void main(String[] args) {
 try {
  final EventBus eventBus = new EventBus("example-events");
  final Subscriber subscriver = new Subscriber();
  final DeadEventsSubscriver des = new DeadEventsSubscriver();
      
  eventBus.register(subscriver);
  eventBus.register(des);

  // published
  eventBus.post("This is dead event");
  eventBus.post(new EventMessage(5000, "This message from hoge"));
  eventBus.post(new EventMessage(1000, "This message from fuga"));

  log.debug("waiting...");
  TimeUnit.MILLISECONDS.sleep(10000);

  log.info(">> end of main thread");
  eventBus.unregister(subscriver);
  eventBus.unregister(des);
  subscriver.shutdown();
      
 } catch (InterruptedException e) {
  log.error("fail", e);
 }
 }
}

んで、実行した結果。
[main] 20:44:28.803  ERROR o.horiga.study.eventbus.example.Main[20] - DEAD EVENT: This is dead event
[main] 20:44:28.807  DEBUG o.h.s.eventbus.example.Subscriber[40] - [start] dispatched event message to worker thread. 5000/This message from hoge
[main] 20:44:28.808  DEBUG o.h.s.eventbus.example.Subscriber[42] - [end] dispatched events.
[main] 20:44:28.809  DEBUG o.h.s.eventbus.example.Subscriber[40] - [start] dispatched event message to worker thread. 1000/This message from fuga
[pool-1-thread-1] 20:44:28.809  DEBUG o.h.s.eventbus.example.Subscriber[28] - [start] Processing worker thread. / 5000 This message from hoge
[main] 20:44:28.809  DEBUG o.h.s.eventbus.example.Subscriber[42] - [end] dispatched events.
[pool-1-thread-2] 20:44:28.810  DEBUG o.h.s.eventbus.example.Subscriber[28] - [start] Processing worker thread. / 1000 This message from fuga
[main] 20:44:28.810  DEBUG o.horiga.study.eventbus.example.Main[42] - waiting...
[pool-1-thread-2] 20:44:29.811  DEBUG o.h.s.eventbus.example.Subscriber[30] - [end] worker thread
[pool-1-thread-1] 20:44:33.810  DEBUG o.h.s.eventbus.example.Subscriber[30] - [end] worker thread
[main] 20:44:38.811  INFO  o.horiga.study.eventbus.example.Main[45] - &gt&gt end of main thread
[main] 20:44:38.816  INFO  o.h.s.eventbus.example.Subscriber[54] - shutdown success


すげー簡単にでけた。ここまでホントに15分程度。

とりあえず、eventbus#post は、blocking されるようだったので、subscriber側は thread にしてみたこれで asynchronous event driven っぽくなったかな? subscribe をnetwork経由で別のsubscriberに通知できればもっといい感じになりそうだけどそのあたりは、akka とか使ったほうが早いだろうかと

まだ、スレッドセーフとか渡したいEventのPojoとかいろいろな型でためしてどれくらい拡張性(POJO となるEventクラスの継承をした場合、@Subscribeが複数ある場合は対象となる?とか、AnnotationやらGenericsなどを使ったりとか)があるか試したいところですが、ここまでということで。

今回試したコードはこちら

2013年6月14日金曜日

【備忘録】spring + aspectj によるAOP

AOP (Aspect Oriented Programming) はある特定の振る舞い(aspect)を分離し、既存の振る舞いに対し入れ込むような時に利用される。例えば以下のようなものが挙げられます。

  • 特定処理にログを入れる
  • 開始終了の処理時間を計測する
  • 特定処理の呼び出し回数を計測する
今回は、spring mvc を使ってある特定の controller の処理結果をハンドリングし、例外が発生した場合、自動的にエラーオブジェクトを生成して結果を返すような処理を作ってみたので、備忘録的にまとめておきます。

controller は以下、入力を受けて service の処理結果をjsonとして返却します。
@Controller
public class HelloController {
  
  private static Logger logger = LoggerFactory.getLogger(HelloController.class);
  
  @Autowired
  private HelloService service;
  
  @Procedure
  @RequestMapping(value="/hello/{type}", method={RequestMethod.GET})
  public @ResponseBody
  Output<?> hello(
      @PathVariable("type") String type) throws Exception {
    logger.info("- start hello");
    try {
    if ("exception".equalsIgnoreCase(type))
      return new Output<Hello>(UUID.randomUUID().toString(), service.runOnException());
    else if ("failure".equalsIgnoreCase(type)) 
      return new Output<Hello>(UUID.randomUUID().toString(), service.runOnFailure());
    
    return new Output<Hello>(UUID.randomUUID().toString(), service.runOnSuccess());
    } finally {
      logger.info("- end hello");
    }
  }
}

output は以下のようなpojoです。
public class Output<T> {
  
  private String trxId;
  private int statusCode;
  private String statusMessage;
  private T data;

  public Output( String trxId, int statusCode, String statusMessage) {
    this.trxId = trxId;
    this.statusCode = statusCode;
    this.statusMessage = "";
  }
  
  public Output( String trxId, T data) {
    this.trxId = trxId;
    this.statusCode = 200;
    this.data = data;
    this.statusMessage = "";
  }
  
  public String getTrxId() {
    return trxId;
  }

  public void setTrxId(String trxId) {
    this.trxId = trxId;
  }

  public int getStatusCode() {
    return statusCode;
  }

  public void setStatusCode(int statusCode) {
    this.statusCode = statusCode;
  }

  public String getStatusMessage() {
    return statusMessage;
  }

  public void setStatusMessage(String statusMessage) {
    this.statusMessage = statusMessage;
  }

  public T getData() {
    return data;
  }

  public void setData(T data) {
    this.data = data;
  }
  
  public String toString() {
    return new StringBuilder().append("@").append(this.trxId).append("-[")
        .append(this.statusCode).append(": ")
        .append(this.statusMessage).append("], ").append(data)
        .toString();
  }
}

で、AOP を利用しない場合、呼び出し結果は以下のようになります。
10:52:40.811 [http-8080-2] INFO  j.b.h.example.aop.HelloController - - start hello
10:52:40.845 [http-8080-2] INFO  j.b.h.example.aop.HelloServiceImpl - success
10:52:40.845 [http-8080-2] INFO  j.b.h.example.aop.HelloController - - end hello

このままでは例外発生時にエラー情報を返却できないです。
なので、hello() に対し、@Procedure というアノテーションを付けて、@Procedure がついた処理について例外が発生した場合、エラーのOutputを生成しレスポンスしたいと考えてみました。
AOP は AspectJ を利用します。Spring+AspectJ はこちらを参考に
@Component
@Aspect
public class ProcedureAspect {

  static Logger logger = LoggerFactory.getLogger(ProcedureAspect.class);

  @Pointcut("execution(* jp.blogspot.horiga3.*.*(..)) ")
  public void targetMethods() {}
  
  @Before("@annotation(jp.blogspot.horiga3.example.aop.Procedure)")
  public void preHandle() {
    logger.info("Aspect :: preHandle");
  }
  
  @AfterReturning(
      pointcut="@annotation(jp.blogspot.horiga3.example.aop.Procedure)",
      returning="retVal")
  public void postHandle(Object retVal) {
    logger.info("Aspect :: postHandle, retVal={}", retVal != null ? retVal.toString() : "null");
  }
  
  @Around("@annotation(jp.blogspot.horiga3.example.aop.Procedure)")
  public Object handle(ProceedingJoinPoint pjp) {

    logger.info("Aspect :: around - start");

    Object[] args;
    try {
      args = pjp.getArgs();
      return args == null ? pjp.proceed() : pjp.proceed(args);
    } catch (Throwable e) {
      logger.info("Aspect :: handleException");
      int statusCode = 500;
      String statusMessage = "unknown";
      if (e instanceof ProcedureException) {
        statusCode = ((ProcedureException) e).getStatusCode();
        statusMessage = ((ProcedureException) e).getStatusMessage();
      } else if (e instanceof IllegalArgumentException) {
        statusCode = 400;
        statusMessage = "Invalid parameter";
      }
      Output<Object> error = new Output<Object>(UUID.randomUUID().toString(), statusCode, statusMessage);
      return error;
    } finally {
      logger.info("Aspect :: around - end");
    }
  }
}

spring の設定は、aop:aspectj-autoproxy を追加しただけ
<?xml version="1.0" encoding="UTF-8"?>
<beans 
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:aop="http://www.springframework.org/schema/aop" 
  xmlns:mvc="http://www.springframework.org/schema/mvc"
  xmlns:context="http://www.springframework.org/schema/context"
  xsi:schemaLocation=
     "http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd" >
  
  <mvc:annotation-driven />

  <aop:aspectj-autoproxy/>
  <context:component-scan base-package="jp.blogspot.horiga3.example.aop"/>
  
</beans>

で、実行すると例外の場合もjsonを返却するようになりました。既存のControllerは何も修正していないですね。
10:52:40.768 [http-8080-2] INFO  j.b.h.example.aop.ProcedureAspect - Aspect :: around - start
10:52:40.772 [http-8080-2] INFO  j.b.h.example.aop.ProcedureAspect - Aspect :: preHandle
10:52:40.811 [http-8080-2] INFO  j.b.h.example.aop.HelloController - - start hello
10:52:40.845 [http-8080-2] INFO  j.b.h.example.aop.HelloServiceImpl - success
10:52:40.845 [http-8080-2] INFO  j.b.h.example.aop.HelloController - - end hello
10:52:40.845 [http-8080-2] INFO  j.b.h.example.aop.ProcedureAspect - Aspect :: around - end
10:52:40.845 [http-8080-2] INFO  j.b.h.example.aop.ProcedureAspect - Aspect :: postHandle, retVal=@11d8c89a-6fc2-4e38-a745-f2ade9c3d6ff-[200: ], jp.blogspot.horiga3.example.aop.Hello@6dbf4a72

@AfterReturningが @Around より後に来ることは予想できたけど、@Around が @Before より先に来るんですね。

2013年6月8日土曜日

springframework を利用した JavaMail 送信 の覚え書き

springframework を利用して Mail を送信することがあったので、覚え書き。spring は何かと設定を xml にする必要があって覚えるのが大変と思って Guice をここ2年程度利用していたが、最近は annotation でほぼ設定できるようになってて結構良い感じだった。まぁ、springframework はそれだけではなく巨大なフレームワークだからいろいろと機能が沢山あって全てを語るには勉強が足りないですww。

個人的に最近気になっているのは、playframework と、vertx かなと JVM + netty をベースにした framework が良い性能をだしていて少しづつ勉強しているところです。

話がずれたので、とりあえず覚え書き。

まず、spring の applicationContext.xml で設定する bean 設定。mail 設定の部分だけ別ファイルとして設定
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://www.springframework.org/schema/beans 
 http://www.springframework.org/schema/beans/spring-beans.xsd">
 
 <bean id="mailSender" 
  class="org.springframework.mail.javamail.JavaMailSenderImpl">
  <property name="host" value="${smtp.host}"/>
 </bean>
 
 <bean id="velocityEngine" class="org.springframework.ui.velocity.VelocityEngineFactoryBean">
  <property name="resourceLoaderPath" value="classpath:mail" />
  <property name="velocityPropertiesMap">
   <map>
                <entry key="input.encoding" value="UTF-8" />
                <entry key="output.encoding" value="UTF-8" />
            </map>
  </property>
 </bean>
 
 <bean id="velocityJavaMailSender" class="jp.blogspot.horiga3.example.spring.mail.VelocityJavaMailSender">
  <property name="mailSender" ref="mailSender" />
  <property name="velocityEngine" ref="velocityEngine" />
 </bean>
</beans>

SMTPサーバはないと動きません。幸いにも社内にあるのでそれを設定します。それぞれの環境に合わせて変更するひつようがありますのであしからず。

あとは、自前クラスは以下のように
import javax.mail.internet.MimeMessage;

import org.apache.velocity.app.VelocityEngine;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.mail.javamail.MimeMessageHelper;
import org.springframework.mail.javamail.MimeMessagePreparator;
import org.springframework.stereotype.Component;
import org.springframework.ui.velocity.VelocityEngineUtils;

@Component
public class VelocityJavaMailSender {

 @Autowired
 protected JavaMailSender mailSender;
 
 @Autowired
 protected VelocityEngine velocityEngine;

 public static class MailMessage {
  
  private boolean html = false;
  
  private String from;
  private String personal;
  private String mailTemplate;
  private String[] recipients;
  private String subject;
  private Map<String, Object> content;

  public boolean isHtml() {
   return html;
  }

  public void setHtml(boolean html) {
   this.html = html;
  }

  public String getFrom() {
   return from;
  }

  public void setFrom(String from) {
   this.from = from;
  }

  public String getPersonal() {
   return personal;
  }

  public void setPersonal(String personal) {
   this.personal = personal;
  }

  public String getMailTemplate() {
   return mailTemplate;
  }

  public void setMailTemplate(String mailTemplate) {
   this.mailTemplate = mailTemplate;
  }

  public String[] getRecipients() {
   return recipients;
  }

  public void setRecipients(String[] recipients) {
   this.recipients = recipients;
  }

  public String getSubject() {
   return subject;
  }

  public void setSubject(String subject) {
   this.subject = subject;
  }

  public Map<String, Object> getContent() {
   return content;
  }

  public void setContent(Map<String, Object> content) {
   this.content = content;
  }
 }
 
 public void sendMailMessage(MailMessage mail) throws Exception {
  this.mailSender.send(createMailPreparator(mail));
 }
 
 public void setMailSender(JavaMailSender mailSender) {
  this.mailSender = mailSender;
 }
 
 public void setVelocityEngine(VelocityEngine velocityEngine) {
  this.velocityEngine = velocityEngine;
 }
 
 private MimeMessagePreparator createMailPreparator(
   final MailMessage mailMessage) {
  MimeMessagePreparator preparator = new MimeMessagePreparator() {
   @Override
   public void prepare(MimeMessage mimeMessage) throws Exception {
    MimeMessageHelper message = new MimeMessageHelper(mimeMessage);
    message.setTo(mailMessage.getRecipients());
    message.setSubject(mailMessage.getSubject());
    if ( null != mailMessage.getPersonal() && mailMessage.getPersonal().trim().length() > 0)
     message.setFrom(mailMessage.getFrom(), mailMessage.getPersonal());
    else message.setFrom(mailMessage.getFrom());
    message.setText(VelocityEngineUtils.mergeTemplateIntoString(
      velocityEngine, mailMessage.getMailTemplate(), "utf-8",
      mailMessage.getContent()), mailMessage.isHtml());
   }
  };
  return preparator;
 }
}


で、テストケースが以下。
import java.util.HashMap;

import junit.framework.Assert;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import jp.blogspot.horiga3.example.spring.mail.VelocityJavaMailSender.MailMessage;


@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations={
 "file:src/main/webapp/WEB-INF/springframework/mail-context.xml"
})
public class VelocityJavaMailSenderTest {
 
 @Autowired
 @Qualifier("velocityJavaMailSender")
 VelocityJavaMailSender mailSender;
 
 @Test
 public void test() {
  
  MailMessage message = new MailMessage();
  
  try {
   message.setMailTemplate("test.vm");
   message.setRecipients(new String[] { "your mail address " });
   message.setSubject("test message");
   message.setFrom("noreply@hogehoge.com");
   message.setPersonal("JUnitさん");
   HashMap<String, Object> content = new HashMap<String, Object>();
   content.put("str", "test");
   content.put("n", System.currentTimeMillis()/1000);
   message.setContent(content);
   mailSender.sendMailMessage(message);
  } catch ( Exception e) {
   e.printStackTrace();
   Assert.fail(e.getMessage());
  }
 }
}

送信先を自分のメールアドレスに設定してテストケースを実行すると
正しく送信されていることが確認できた。
ちなみに少しハマったところが、テンプレートファイルのエンコーディングとかも全て統一しておかないと文字化けします。
テストケースにこのソースがあるとmavenビルドとかで、自動テストするとビルドのたびに毎回メールくるようになります。注意しましょう〜







2013年5月14日火曜日

Consistent Hashing による 分散テスト


大量データを分散させる技術?アルゴリズムで consistent hashing ということが挙げられます。mixi さんのエンジニアブログでも紹介されていましたが、Javaでサンプルコードを書いてみました。

package com.blogspot.horiga3.example;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;

public class ConsistentHashing<T> {

 static final HashFunction DEFAULT_HASH_FUNCTION = Hashing.md5();
 static final int DEFAULT_NUMBER_OF_REPLICAS = 200;
 
 static int numberOfReplicas = DEFAULT_NUMBER_OF_REPLICAS;
 
 final SortedMap<Long, T> shardedNodes = new TreeMap<Long, T>();
 final HashFunction hashFunction;
 
 public ConsistentHashing(Collection<T> nodes, HashFunction hashFunction) {
  this.hashFunction = hashFunction != null ? hashFunction : DEFAULT_HASH_FUNCTION;
  for (T node : nodes) {
   for (int i = 0; i < numberOfReplicas; i++) {
    shardedNodes.put(hashFunction.hashString(String.format("SHARDS-%s-%03d", node.toString(), i)).asLong(), node);
   }
  }
 }

 public T get(Object key) {
  if (shardedNodes.isEmpty()) {
   return null;
  }
  long hash = hashFunction.hashString(key.toString()).asLong();
  if (!shardedNodes.containsKey(hash)) {
   SortedMap<Long, T> tailMap = shardedNodes.tailMap(hash);
   hash = tailMap.isEmpty() ? shardedNodes.firstKey() : tailMap.firstKey();
  }
  return shardedNodes.get(hash);
 }

 public static void main(String[] args) {
  try {
   List<String> shards = new ArrayList<String>();
   
   final int shardsCount = 5;
   final int numberOfUsers = 1000000;
   
   for (int i = 1; i <= shardsCount; i++)
    shards.add("shard" + i);

   List<String> keys = new ArrayList<String>();
   for (int i = 0; i < numberOfUsers; i++) {
    keys.add(UUID.randomUUID().toString().replaceAll("-", ""));
   }
   
   final HashFunction hashFunction = Hashing.md5();
   ConsistentHashing<String> consistentHashing = new ConsistentHashing<String>(shards, hashFunction);

   SortedMap<String, List<String>> keyshards = new TreeMap<String, List<String>>();
   for (String key : keys) {
    String shard = consistentHashing.get(key);
    if (keyshards.containsKey(shard))
     keyshards.get(shard).add(key);
    else {
     keyshards.put(shard, new ArrayList<String>());
     keyshards.get(shard).add(key);
    }
   }
   
   // shard node added
   shards.add(String.format("shard%d", shardsCount + 1));

   ConsistentHashing<String> consistentHashing2 = new ConsistentHashing<String>(shards, hashFunction);
   SortedMap<String, List<String>> keyshards2 = new TreeMap<String, List<String>>();
   for (String key : keys) {
    String k = consistentHashing2.get(key);
    if (keyshards2.containsKey(k))
     keyshards2.get(k).add(key);
    else {
     keyshards2.put(k, new ArrayList<String>());
     keyshards2.get(k).add(key);
    }
   }

   SortedMap<String, AtomicInteger> result = new TreeMap<String, AtomicInteger>();
   final String unchanged = "shard.unchanged";
   for (String key : keys) {
    String shard1 = consistentHashing.get(key);
    String shard2 = consistentHashing2.get(key);
    if (shard1.equals(shard2)) {
     if (!result.containsKey(unchanged))
      result.put(unchanged, new AtomicInteger(1));
     else
      result.get(unchanged).incrementAndGet();
    } else {
     String k = shard1 + "=>" + shard2;
     if (!result.containsKey(k))
      result.put(k, new AtomicInteger(1));
     else
      result.get(k).incrementAndGet();
    }
   }
   
   System.out.println("==========================================");
   System.out.println(":: Consistent hashing sharding Testcase ::");
   
   System.out.println("------------------------------------------");
   for (Map.Entry<String, List<String>> entry: keyshards.entrySet()) {
    System.out.println(String.format("%s: %d", entry.getKey(), entry.getValue().size()));
   }
   System.out.println("------------------------------------------ :: number of key for after adding a shard");
   for (Map.Entry<String, List<String>> entry: keyshards2.entrySet()) {
    if ( !entry.getKey().equals(String.format("shard%d", shardsCount + 1))) {
     int before = keyshards.get(entry.getKey()).size();
     int after = entry.getValue().size();
     System.out.println(String.format("%s: %d=>%d(-%d)", entry.getKey(), before, after, before-after));
    } else {
     System.out.println(String.format("%s: 0=>%d", entry.getKey(), entry.getValue().size()));
    }
   }
   System.out.println("------------------------------------------ :: number of key shard node that has moved");
   for (Map.Entry<String, AtomicInteger> entry : result.entrySet()) {
    if (!entry.getKey().equals(unchanged)) {
     System.out.println(String.format("%s: %d", entry.getKey(), entry.getValue().intValue()));
    }
   }

   System.out.println("=====================================");
   System.out.println("shard.unchanged=" + result.get(unchanged).intValue());
   System.out.println("shard.changeing=" + (keys.size() - result.get(unchanged).intValue()));
   System.out.println("-------------------------------------");
   System.out.println("HIT's=" + Math.round(result.get(unchanged).intValue() * 100 / keys.size()) + "%");

  } catch (Exception e) {
   e.printStackTrace();
  }
 }
}


テストは、ノードが5台から1台追加した場合の、100万件のキーの移動についてテストしてみたけど、ヒット率は、およそ80%程度、キーの移動はおよそ20%程度になりました。
==========================================
:: Consistent hashing sharding Testcase ::
------------------------------------------
shard1: 185436
shard2: 218328
shard3: 207460
shard4: 170488
shard5: 218288
------------------------------------------ :: number of key for after adding a shard
shard1: 185436=>144072(-41364)
shard2: 218328=>185908(-32420)
shard3: 207460=>177256(-30204)
shard4: 170488=>144561(-25927)
shard5: 218288=>179190(-39098)
shard6: 0=>169013
------------------------------------------ :: number of key shard node that has moved
shard1=>shard6: 41364
shard2=>shard6: 32420
shard3=>shard6: 30204
shard4=>shard6: 25927
shard5=>shard6: 39098
=====================================
shard.unchanged=830987
shard.changeing=169013
-------------------------------------
HIT's=83%

また、上記サンプルコードを利用して数パターン試した結果は以下のようになりました
キーの数変更前node数変更後node数key移動件数ヒット率
10000005616901383%
10000003434443365%
1000056169283%
1000034244875%

Consistent Hashing を利用することで、ノードを追加しても、既存のノード間でのキーの移動は考慮しなくてよさそうなので、良い方法でした。

2013年5月3日金曜日

Template Engine - mustache : How to import another template.

以前にメモした {{mustache}} テンプレートについて書きましたが、mustache を使って対象のテンプレートに共通のヘッダやらフッターやらをimportしたいって思いますよね。
以下のようにできます。まぁ本家のマニュアルにも記載してありましたが,,

まずは、元になるテンプレートを用意します。
import したいテンプレートの部分は {{ > 対象となるテンプレートのパス }} のように設定します。

{{> common/common_header }}
  {{!This is comment of Mustache!!}}
  <h1>pojo.str</h1>
  <h2>{{str}}</h2>
  
  <h1>pojo.num</h1>
  <h2>{{num}}</h2>
  
  <h1>pojo.flag</h1>
  {{#flag}}flag = true {{/flag}}
  {{^flag}}flag = false {{/flag}}
  
  <h1>pojo.array</h1>
  {{#array}}
  {{.}}</br>
  {{/array}}
  
  <h1>pojo.data</h1>
  {{#data}}
  <p>upper: {{a}} , {{b}}</p></br>
  {{/data}}
  
  <h1>Escaped Characters</h1>
  {{escape}}
{{> common/common_footer }}

で import されるテンプレートは以下のように定義

<!DOCTYPE html>
<html>
<head>
<title>{{title}}</title>
<meta charset="UTF-8">
</head><body>

<p>This is footer</p>
</body>
</html>

プログラム側は、以下のように ※前回のとほぼ変わりません。テンプレートに渡すオブジェクトを2つ渡しているだけです

package com.blogspot.agiroh.netty.template.html;

import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import com.github.mustachejava.DefaultMustacheFactory;
import com.github.mustachejava.Mustache;
import com.github.mustachejava.MustacheFactory;

public class ExampleMustache {

 final static String template_path = "template/mustache/example.mustache";

 public static class ExamplePojo {
  String str;
  long num;
  boolean flag;
  Map<String, Object> data;
  List<String> array;
  String escape;
 }

 static Map<String, Mustache> templates = new HashMap<>();

 public void run() throws Exception {

  // default is classpath.
  MustacheFactory factory = new DefaultMustacheFactory(); 
  
  Mustache mustache = null;
  if (!templates.containsKey(template_path)) {
   mustache = factory.compile(template_path);
   templates.put(template_path, mustache);
  } else {
   System.out.println("templates from cache");
   mustache = templates.get(template_path);
  }

  ExamplePojo pojo = new ExamplePojo();
  pojo.str = UUID.randomUUID().toString();
  pojo.num = new Date().getTime() / 1000;
  pojo.flag = true;
  pojo.data = new HashMap<String, Object>();
  pojo.data.put("a", "A");
  pojo.data.put("b", "B");
  pojo.array = new ArrayList<>();
  pojo.array.add("hoge");
  pojo.array.add("fuga");
  pojo.escape = "<p>\"te&st\"</p>";

  Map<String, String> headerPojo = new HashMap<>();
  headerPojo.put("title", "Import another templtes");

  mustache.execute(new PrintWriter(System.out), new Object[] { pojo, headerPojo }).flush();
 }

 public static void main(String[] args) {
  try {
   new ExampleMustache().run();
  } catch (Exception e) {
   e.printStackTrace();
  }
 }
}

これで思ったとおり動作できました。

2013年4月22日月曜日

Installed groovy for Mac OS X

Groovyが気になって自分のMacにインストールしました。少しこの週末にみましたが、rubyの影響を大きく受けている言語なのがよくわかります。wikipediaさんによくまとまってます。

ダウンロードは、groovyの公式サイトからzip形式でダウンロードできます。

http://dist.groovy.codehaus.org/distributions/groovy-binary-2.1.3.zip

インストールは至って簡単、zipを任意の場所に解凍するだけ。
※ただし、javaが事前にインストールされていること。また、PATHが通っている必要があります。

とりあえず、/usr/local に解凍して以下のようにしておく。
ln -s /usr/local/groovy-2.1.3 /usr/local/groovy

~/.bashrc とかに以下を追記
export GROOVY_HOME=/usr/local/groovy
export PATH=$GROOVY_HOME/bin:$PATH

以下のコマンドで起動する確認

[horiga@local]: groovysh
Groovy Shell (2.1.3, JVM: 1.7.0_11)
Type 'help' or '\h' for help.
--------------------------------------------------------------------------------------
groovy:000>

よし、とりあえず確認まで。これからいろいろ試します。

2013年3月15日金曜日

nginx>fluentd>mongodb

スマートフォンからのアクセスをサービスのAPサーバに影響なく指標値として収集したくて、アクセスログやアプリケーションログを使おうと思って、まずは、nginxのアクセスログを保存する方法を検討して、fluentdmongodb を試してみました。

まず、fluentd に mongodb の plugin をインストールします。
[horiga@bin]: fluent-gem install fluent-plugin-mongo
Successfully installed fluent-plugin-mongo-0.6.13
1 gem installed
Installing ri documentation for fluent-plugin-mongo-0.6.13...
Installing RDoc documentation for fluent-plugin-mongo-0.6.13...
で、fluentd の設定ファイルを以下のようにしました。
<source>
 type tail
 path /usr/local/nginx/logs/access_heartbeat.log
 tag nginx.hb
 format /^(?<ipaddr>[^ ]*) \[(?<ts>[^\]]*)\] "(?<httpm>\S+)(?: +(?<path>[^ ]*) +\S*)?" (?<stat>[^ ]*) "(?<ua>[^\"]*)" (?<chid>[^ ]*)$/
 time_format %d/%b/%Y:%H:%M:%S %z
 pos_file /usr/local/fluent/logs/nginx_access_heartbeat.log.pos
</source>
<match nginx.**>
 type mongo
 database heartbeatdb
 collection heartbeat
 host localhost
 port 27017
 flush_interval 10s
</match>
そして、fluentd を起動しておきます。ext_bson とかいうpluginの警告ログがでますが、まぁBSONは使わないので無視しておきました。
次に、mongodb を起動します。とりあえずオプションなしのシングルモードで起動します。
defaultポートが27017で起動しました。fluent2mongo.conf に設定した port になりますね。
[horiga@bin]: ./mongod
./mongod --help for help and startup options
Fri Mar 15 21:03:56 [initandlisten] MongoDB starting : pid=51694 port=27017 dbpath=/data/db/ 64-bit host=hiroyuki-no-MacBook-Air.local
Fri Mar 15 21:03:56 [initandlisten] 
Fri Mar 15 21:03:56 [initandlisten] ** WARNING: soft rlimits too low. Number of files is 256, should be at least 1000
Fri Mar 15 21:03:56 [initandlisten] db version v2.2.2, pdfile version 4.5
Fri Mar 15 21:03:56 [initandlisten] git version: d1b43b61a5308c4ad0679d34b262c5af9d664267
Fri Mar 15 21:03:56 [initandlisten] build info: Darwin bs-osx-106-x86-64-1.local 10.8.0 Darwin Kernel Version 10.8.0: Tue Jun  7 16:33:36 PDT 2011; root:xnu-1504.15.3~1/RELEASE_I386 i386 BOOST_LIB_VERSION=1_49
Fri Mar 15 21:03:56 [initandlisten] options: {}
Fri Mar 15 21:03:56 [initandlisten] journal dir=/data/db/journal
Fri Mar 15 21:03:56 [initandlisten] recover : no journal files present, no recovery needed
Fri Mar 15 21:03:56 [websvr] admin web console waiting for connections on port 28017
Fri Mar 15 21:03:56 [initandlisten] waiting for connections on port 27017
nginx の設定はとりあえず静的ファイルを変換するように以下のように設定
#user  nobody;
worker_processes  1;

#error_log  logs/error.log;
#error_log  logs/error.log  notice;
#error_log  logs/error.log  info;

#pid        logs/nginx.pid;


events {
    worker_connections  1024;
}


http {
    include       mime.types;
    default_type  application/octet-stream;

    log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
                      '$status $body_bytes_sent "$http_referer" '
                      '"$http_user_agent" "$http_x_forwarded_for"';

 log_format fluent '$remote_addr [$time_local] "$request" $status "$http_user_agent" $http_x_test_chid'; 
    
 access_log  logs/access.log  main;

    sendfile        on;
    #tcp_nopush     on;

    #keepalive_timeout  0;
    keepalive_timeout  65;

    #gzip  on;

    server {
        listen       9001;
        server_name  localhost;

        #charset koi8-r;

        #access_log  logs/host.access.log  main;

        location / {
            root   html;
            index  index.html index.htm;
        }

  location /hb/  {
    root html;
    index success.html;
    access_log logs/access_heartbeat.log fluent;
  }

        #error_page  404              /404.html;

        # redirect server error pages to the static page /50x.html
        #
        error_page   500 502 503 504  /50x.html;
        location = /50x.html {
            root   html;
        }
    }
}
nginx を起動して、"http://localhost:9001/hb/?q=123" へアクセスしてみるとnginxのアクセスログに確かにアクセスログが残っていることを確認
[horiga@bin]: curl -H 'X-TEST-ChId:123456' http://localhost:9001/hb/?q=123
ok
[horiga@logs]: tail -F access_heartbeat.log 
127.0.0.1 [15/Mar/2013:21:15:25 +0900] "GET /hb/?q=123 HTTP/1.1" 200 "curl/7.24.0 (x86_64-apple-darwin12.0) libcurl/7.24.0 OpenSSL/0.9.8r zlib/1.2.5" 123456
で、問題のmongodbにはどうか?
> use heartbeatdb 
> db.heartbeat.find();
{ "_id" : ObjectId("514310dc0840f7cb14000001"), "ipaddr" : "127.0.0.1", "ts" : "15/Mar/2013:21:14:27 +0900", "httpm" : "GET", "path" : "/hb/?q=123", "stat" : "200", "ua" : "curl/7.24.0 (x86_64-apple-darwin12.0) libcurl/7.24.0 OpenSSL/0.9.8r zlib/1.2.5", "chid" : "123456", "time" : ISODate("2013-03-15T12:15:13Z") }
おおっ!!たしかにmongodbにも保存されてました。ここまで特にプログラミングなし、設定のみでできます。
あとは、このデータを使ってデータ分析だな。mongodb からデータを hdfs 上にもってきて分析結果をまた mongodb に格納するとか検討してみます。