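Consistent hashing is one of the techniques (or algorithms) used to distribute large amounts of data across nodes. It has also been covered on the mixi engineering blog, and I wrote some sample code for it in Java.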
    package com.blogspot.horiga3.example;

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;
    import java.util.Map;
    import java.util.SortedMap;
    import java.util.TreeMap;
    import java.util.UUID;
    import java.util.concurrent.atomic.AtomicInteger;

    import com.google.common.hash.HashFunction;
    import com.google.common.hash.Hashing;

    public class ConsistentHashing<T> {

        static final HashFunction DEFAULT_HASH_FUNCTION = Hashing.md5();
        static final int DEFAULT_NUMBER_OF_REPLICAS = 200;

        // Number of virtual nodes placed on the ring for each real node.
        static int numberOfReplicas = DEFAULT_NUMBER_OF_REPLICAS;

        // The hash ring: position on the ring -> node that owns that position.
        final SortedMap<Long, T> shardedNodes = new TreeMap<Long, T>();
        final HashFunction hashFunction;

        public ConsistentHashing(Collection<T> nodes, HashFunction hashFunction) {
            // Fall back to MD5 when no hash function is given.
            this.hashFunction = hashFunction != null ? hashFunction : DEFAULT_HASH_FUNCTION;
            for (T node : nodes) {
                for (int i = 0; i < numberOfReplicas; i++) {
                    shardedNodes.put(hash(String.format("SHARDS-%s-%03d", node.toString(), i)), node);
                }
            }
        }

        // Always hashes through the field, so passing null to the constructor is safe.
        // hashUnencodedChars is the Guava 15+ name for the one-argument hashString(CharSequence).
        private long hash(String value) {
            return hashFunction.hashUnencodedChars(value).asLong();
        }

        public T get(Object key) {
            if (shardedNodes.isEmpty()) {
                return null;
            }
            long hash = hash(key.toString());
            if (!shardedNodes.containsKey(hash)) {
                // Walk clockwise to the next virtual node; wrap to the first one at the end of the ring.
                SortedMap<Long, T> tailMap = shardedNodes.tailMap(hash);
                hash = tailMap.isEmpty() ? shardedNodes.firstKey() : tailMap.firstKey();
            }
            return shardedNodes.get(hash);
        }

        public static void main(String[] args) {
            try {
                List<String> shards = new ArrayList<String>();
                final int shardsCount = 5;
                final int numberOfUsers = 1000000;
                for (int i = 1; i <= shardsCount; i++) {
                    shards.add("shard" + i);
                }

                // Random keys: UUIDs with the hyphens removed.
                List<String> keys = new ArrayList<String>();
                for (int i = 0; i < numberOfUsers; i++) {
                    keys.add(UUID.randomUUID().toString().replaceAll("-", ""));
                }

                final HashFunction hashFunction = Hashing.md5();

                // Key placement before the new shard is added.
                ConsistentHashing<String> consistentHashing = new ConsistentHashing<String>(shards, hashFunction);
                SortedMap<String, List<String>> keyshards = new TreeMap<String, List<String>>();
                for (String key : keys) {
                    String shard = consistentHashing.get(key);
                    if (!keyshards.containsKey(shard)) {
                        keyshards.put(shard, new ArrayList<String>());
                    }
                    keyshards.get(shard).add(key);
                }

                // shard node added
                shards.add(String.format("shard%d", shardsCount + 1));
                ConsistentHashing<String> consistentHashing2 = new ConsistentHashing<String>(shards, hashFunction);
                SortedMap<String, List<String>> keyshards2 = new TreeMap<String, List<String>>();
                for (String key : keys) {
                    String k = consistentHashing2.get(key);
                    if (!keyshards2.containsKey(k)) {
                        keyshards2.put(k, new ArrayList<String>());
                    }
                    keyshards2.get(k).add(key);
                }

                // Count unchanged keys and, for moved keys, each "from=>to" pair.
                SortedMap<String, AtomicInteger> result = new TreeMap<String, AtomicInteger>();
                final String unchanged = "shard.unchanged";
                for (String key : keys) {
                    String shard1 = consistentHashing.get(key);
                    String shard2 = consistentHashing2.get(key);
                    String k = shard1.equals(shard2) ? unchanged : shard1 + "=>" + shard2;
                    if (!result.containsKey(k)) {
                        result.put(k, new AtomicInteger(1));
                    } else {
                        result.get(k).incrementAndGet();
                    }
                }

                System.out.println("==========================================");
                System.out.println(":: Consistent hashing sharding Testcase ::");
                System.out.println("------------------------------------------");
                for (Map.Entry<String, List<String>> entry : keyshards.entrySet()) {
                    System.out.println(String.format("%s: %d", entry.getKey(), entry.getValue().size()));
                }
                System.out.println("------------------------------------------ :: number of key for after adding a shard");
                for (Map.Entry<String, List<String>> entry : keyshards2.entrySet()) {
                    if (!entry.getKey().equals(String.format("shard%d", shardsCount + 1))) {
                        int before = keyshards.get(entry.getKey()).size();
                        int after = entry.getValue().size();
                        System.out.println(String.format("%s: %d=>%d(-%d)", entry.getKey(), before, after, before - after));
                    } else {
                        System.out.println(String.format("%s: 0=>%d", entry.getKey(), entry.getValue().size()));
                    }
                }
                System.out.println("------------------------------------------ :: number of key shard node that has moved");
                for (Map.Entry<String, AtomicInteger> entry : result.entrySet()) {
                    if (!entry.getKey().equals(unchanged)) {
                        System.out.println(String.format("%s: %d", entry.getKey(), entry.getValue().intValue()));
                    }
                }
                System.out.println("=====================================");
                System.out.println("shard.unchanged=" + result.get(unchanged).intValue());
                System.out.println("shard.changeing=" + (keys.size() - result.get(unchanged).intValue()));
                System.out.println("-------------------------------------");
                // Integer division: the percentage is truncated, not rounded.
                System.out.println("HIT's=" + Math.round(result.get(unchanged).intValue() * 100 / keys.size()) + "%");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
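The test adds one node to a five-node setup and checks how 1,000,000 keys move. The hit rate (keys that stay on the same shard) came out at about 83%, so about 17% of the keys moved. That is close to the 1/6 share the new shard would take if the keys were spread evenly over six shards.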
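For comparison, here is a minimal sketch of the naive alternative, where the shard is chosen as `hash % numberOfShards`. It is not part of the original sample: the class name `ModuloBaseline`, the `user-N` keys, and the MD5-via-`MessageDigest` hash are assumptions made only for illustration. With a reasonably uniform hash, a key keeps its shard index only when `h % 5 == h % 6`, which holds for 5 of every 30 hash values, so roughly five out of six keys should change shard when going from 5 to 6 shards.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class ModuloBaseline {

    // First 8 bytes of the MD5 digest as a long (illustrative; not the Guava call used in the post).
    static long hash(String s) {
        try {
            byte[] d = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (d[i] & 0xffL);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    // Counts keys whose shard index differs between "hash mod before" and "hash mod after".
    static int movedKeys(int keyCount, int before, int after) {
        int moved = 0;
        for (int i = 0; i < keyCount; i++) {
            long h = hash("user-" + i); // hypothetical key naming
            if (Math.floorMod(h, (long) before) != Math.floorMod(h, (long) after)) {
                moved++;
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        int keyCount = 1_000_000;
        int moved = movedKeys(keyCount, 5, 6);
        System.out.printf("moved with modulo sharding: %d of %d keys%n", moved, keyCount);
    }
}
```

Set against the roughly 17% measured above, this is the main reason to prefer a hash ring over plain modulo when shards are added.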
    ==========================================
    :: Consistent hashing sharding Testcase ::
    ------------------------------------------
    shard1: 185436
    shard2: 218328
    shard3: 207460
    shard4: 170488
    shard5: 218288
    ------------------------------------------ :: number of key for after adding a shard
    shard1: 185436=>144072(-41364)
    shard2: 218328=>185908(-32420)
    shard3: 207460=>177256(-30204)
    shard4: 170488=>144561(-25927)
    shard5: 218288=>179190(-39098)
    shard6: 0=>169013
    ------------------------------------------ :: number of key shard node that has moved
    shard1=>shard6: 41364
    shard2=>shard6: 32420
    shard3=>shard6: 30204
    shard4=>shard6: 25927
    shard5=>shard6: 39098
    =====================================
    shard.unchanged=830987
    shard.changeing=169013
    -------------------------------------
    HIT's=83%
I also ran a few more patterns with the sample code above. The results were as follows.
Number of keys | Nodes before | Nodes after | Keys moved | Hit rate |
---|---|---|---|---|
1000000 | 5 | 6 | 169013 | 83% |
1000000 | 3 | 4 | 344433 | 65% |
10000 | 5 | 6 | 1692 | 83% |
10000 | 3 | 4 | 2448 | 75% |
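With consistent hashing, adding a node does not seem to require moving any keys between the existing nodes: in the output above, every key that moved went to the new shard6. So it looks like a good approach.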
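The observation that keys move only to the new node can be checked directly. The following is a dependency-free sketch under stated assumptions: the names `RingMoveCheck`, `buildRing`, `lookup`, and `countMoves`, the `node#i` virtual-node labels, and the `user-N` keys are all hypothetical, and it hashes with MD5 through `MessageDigest` instead of Guava, so its counts will not match the numbers above. It builds one ring before and one after adding a node, then counts the keys that moved and the moved keys that ended up somewhere other than the new node.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class RingMoveCheck {

    // First 8 bytes of the MD5 digest as a long (illustrative hash choice).
    static long hash(String s) {
        try {
            byte[] d = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (d[i] & 0xffL);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    // Places virtualNodes points on the ring for every node.
    static TreeMap<Long, String> buildRing(List<String> nodes, int virtualNodes) {
        TreeMap<Long, String> ring = new TreeMap<>();
        for (String node : nodes) {
            for (int i = 0; i < virtualNodes; i++) {
                ring.put(hash(node + "#" + i), node);
            }
        }
        return ring;
    }

    // First point at or after the key's hash, wrapping around to the start of the ring.
    static String lookup(TreeMap<Long, String> ring, String key) {
        Map.Entry<Long, String> e = ring.ceilingEntry(hash(key));
        return (e != null ? e : ring.firstEntry()).getValue();
    }

    // Returns {keys that moved, moved keys that did NOT land on the added node}.
    static int[] countMoves(int keyCount, List<String> before, String added, int virtualNodes) {
        List<String> after = new ArrayList<>(before);
        after.add(added);
        TreeMap<Long, String> ring1 = buildRing(before, virtualNodes);
        TreeMap<Long, String> ring2 = buildRing(after, virtualNodes);
        int moved = 0;
        int stray = 0;
        for (int i = 0; i < keyCount; i++) {
            String key = "user-" + i; // hypothetical key naming
            String a = lookup(ring1, key);
            String b = lookup(ring2, key);
            if (!a.equals(b)) {
                moved++;
                if (!b.equals(added)) {
                    stray++;
                }
            }
        }
        return new int[] {moved, stray};
    }

    public static void main(String[] args) {
        List<String> shards = Arrays.asList("shard1", "shard2", "shard3", "shard4", "shard5");
        int[] r = countMoves(1_000_000, shards, "shard6", 200);
        System.out.println("moved=" + r[0] + ", moved between existing shards=" + r[1]);
    }
}
```

The second count should always be zero: adding a node only adds points to the ring, so a key either keeps its old point or is claimed by a nearer point that belongs to the new node.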