
fff8 10年前

原文出处: 蘑菇先生  




  1. 背景
  2. Map实现
  3. Reduce实现
  4. 支持分布式
  5. 总结




// Sample text consumed by the word-count snippets: a verbatim (@) string literal
// whose verse lines are separated by double spaces rather than line breaks.
const string hamlet = @"Though yet of Hamlet our dear brother's death  The memory be green, and that it us befitted  To bear our hearts in grief and our whole kingdom  To be contracted in one brow of woe,  Yet so far hath discretion fought with nature  That we with wisest sorrow think on him,  Together with remembrance of ourselves.  Therefore our sometime sister, now our queen,  The imperial jointress to this warlike state,  Have we, as 'twere with a defeated joy,--  With an auspicious and a dropping eye,  With mirth in funeral and with dirge in marriage,  In equal scale weighing delight and dole,--  Taken to wife: nor have we herein barr'd  Your better wisdoms, which have freely gone  With this affair along. For all, our thanks.  Now follows, that you know, young Fortinbras,  Holding a weak supposal of our worth,  Or thinking by our late dear brother's death  Our state to be disjoint and out of frame,  Colleagued with the dream of his advantage,  He hath not fail'd to pester us with message,  Importing the surrender of those lands  Lost by his father, with all bonds of law,  To our most valiant brother. So much for him.  Now for ourself and for this time of meeting:  Thus much the business is: we have here writ  To Norway, uncle of young Fortinbras,--  Who, impotent and bed-rid, scarcely hears  Of this his nephew's purpose,--to suppress  His further gait herein; in that the levies,  The lists and full proportions, are all made  Out of his subject: and we here dispatch  You, good Cornelius, and you, Voltimand,  For bearers of this greeting to old Norway;  Giving to you no further personal power  To business with the king, more than the scope  Of these delated articles allow.  Farewell, and let your haste commend your duty.";


// Baseline (non-MapReduce) word count: split the text on spaces and line
// breaks, dropping empty tokens, then tally how often each token occurs.
var content = hamlet.Split(new[] { " ", Environment.NewLine }, StringSplitOptions.RemoveEmptyEntries);
var wordcount = new Dictionary<string, int>();
foreach (var item in content)
{
    // TryGetValue leaves `seen` at 0 for a token not yet in the dictionary,
    // so a single lookup plus one indexer write replaces the earlier
    // ContainsKey + indexer/Add pair.
    int seen;
    wordcount.TryGetValue(item, out seen);
    wordcount[item] = seen + 1;
}

第一步:先把文本按某个维度(段落、单词或字母)分解,映射成最小的独立单元。




/// <summary>
/// Map step: pairs every input element with an initial count of 1.
/// </summary>
/// <param name="list">Elements to emit, in enumeration order.</param>
/// <returns>
/// A lazily produced sequence holding one (element, 1) tuple per input element;
/// nothing is enumerated until the caller iterates the result.
/// </returns>
public IEnumerable<Tuple<T, int>> Mapping(IEnumerable<T> list)
{
    const int singleOccurrence = 1;

    foreach (T element in list)
    {
        yield return new Tuple<T, int>(element, singleOccurrence);
    }
}

使用,输出为(brow, 1), (brow, 1), (sorrow, 1), (sorrow, 1):

 // Tokenize the sample text, then run only the mapping step over the tokens.
 string[] separators = { " ", Environment.NewLine };
 var spit = hamlet.Split(separators, StringSplitOptions.RemoveEmptyEntries);
 var mp = new MicroMapReduce<string>(new Master<string>());
 var result = mp.Mapping(spit);


为了减少数据通信开销,mapping出的键值对数据在进入真正的reduce前,先进行重复键合并。这也相当于提前完成一部分预计算,加快总体计算速度。输出格式为(brow, 2), (sorrow, 2):

/// <summary>
/// Combine step: merges mapped pairs that share a key by adding their counts,
/// so fewer pairs have to be handed to the later steps.
/// </summary>
/// <param name="list">Pairs of (key, count) as produced by the mapping step.</param>
/// <returns>A dictionary from each distinct key to the sum of its counts.</returns>
public Dictionary<T, int> Combine(IEnumerable<Tuple<T, int>> list)
{
    var totals = new Dictionary<T, int>();
    foreach (var pair in list)
    {
        // TryGetValue leaves `running` at 0 for a key seen for the first time,
        // so one lookup plus one write covers both the "new key" and the
        // "existing key" case without a separate ContainsKey probe.
        int running;
        totals.TryGetValue(pair.Item1, out running);
        totals[pair.Item1] = running + pair.Item2;
    }
    return totals;
}


其输出格式为: (brow, {(brow,2), (brow,3)}), (sorrow, {(sorrow,10), (sorrow,11)}):

/// <summary>
/// Partition step: wraps every (key, count) entry in a <c>Group</c> keyed by
/// the entry's key, appending the count to that group's value list.
/// </summary>
/// <param name="list">Combined counts, one entry per distinct key.</param>
/// <returns>
/// The groups that were built. Because the input is a dictionary, each key
/// occurs once, so every returned group carries exactly one value.
/// </returns>
public IEnumerable<Group<T, int>> Partitioner(Dictionary<T, int> list)
{
    var groupsByKey = new Dictionary<T, Group<T, int>>();
    foreach (KeyValuePair<T, int> entry in list)
    {
        // Fetch-or-create with a single lookup instead of
        // ContainsKey + indexer set + indexer get.
        Group<T, int> bucket;
        if (!groupsByKey.TryGetValue(entry.Key, out bucket))
        {
            bucket = new Group<T, int>(entry.Key);
            groupsByKey.Add(entry.Key, bucket);
        }
        bucket.Values.Add(entry.Value);
    }
    return groupsByKey.Values;
}


/// <summary>
/// A key together with the mutable list of values collected for it. Both are
/// stored as the two items of the underlying tuple and exposed under
/// friendlier names.
/// </summary>
/// <typeparam name="TKey">Type of the grouping key.</typeparam>
/// <typeparam name="TValue">Type of the values gathered under the key.</typeparam>
public class Group<TKey, TValue> : Tuple<TKey, List<TValue>>
{
    /// <summary>
    /// Creates a group for <paramref name="key"/> that starts with an empty value list.
    /// </summary>
    public Group(TKey key)
        : base(key, new List<TValue>())
    {
    }

    /// <summary>The grouping key (the tuple's first item).</summary>
    public TKey Key
    {
        get { return Item1; }
    }

    /// <summary>The list of values for this key (the tuple's second item).</summary>
    public List<TValue> Values
    {
        get { return Item2; }
    }
}




/// <summary>
/// Reduce step: collapses each group to a single number by summing its values.
/// </summary>
/// <param name="groups">Groups to reduce; each key is added to the result once.</param>
/// <returns>A dictionary from each group's key to the sum of that group's values.</returns>
public Dictionary<T, int> Reducing(IEnumerable<Group<T, int>> groups)
{
    var totals = new Dictionary<T, int>();

    foreach (Group<T, int> bucket in groups)
    {
        T key = bucket.Key;
        int total = bucket.Values.Sum();
        // Add (not the indexer) is used, so a repeated key is rejected rather than overwritten.
        totals.Add(key, total);
    }

    return totals;
}


/// <summary>
/// Map phase: runs Mapping, then Combine, then Partitioner over the input.
/// </summary>
/// <param name="list">Raw input elements.</param>
/// <returns>The groups produced by the partition step.</returns>
public IEnumerable<Group<T, int>> Map(IEnumerable<T> list)
{
    return Partitioner(Combine(Mapping(list)));
}

/// <summary>
/// Reduce phase: forwards the groups to Reducing and returns its result.
/// </summary>
/// <param name="groups">Groups produced by the map phase.</param>
/// <returns>The per-key totals computed by Reducing.</returns>
public Dictionary<T, int> Reduce(IEnumerable<Group<T, int>> groups)
{
    return Reducing(groups);
}
/// <summary>
/// Runs the whole pipeline on the calling thread: the map phase followed by
/// the reduce phase.
/// </summary>
/// <param name="list">Raw input elements.</param>
/// <returns>The per-key totals returned by the reduce phase.</returns>
public Dictionary<T, int> MapReduce(IEnumerable<T> list)
{
    return Reduce(Map(list));
}





// Tokenize the sample text and push it through the complete map + reduce pipeline.
string[] spit = hamlet.Split(
    new[] { " ", Environment.NewLine },
    StringSplitOptions.RemoveEmptyEntries);
MicroMapReduce<string> mp = new MicroMapReduce<string>(new Master<string>());
var result1 = mp.MapReduce(spit);

小张完成后脑洞大开,考虑到以后文本数据量超大。 所以fork了个分支,准备支持分布式计算,以后可以在多个服务器节点跑。


数据分片就是把大量数据拆成一块一块的,分散到各个节点上,方便我们的MapReduce程序去计算。分片主流的有mod、consistent hashing、virtual buckets、range partition等方式。关于consistent hashing,上篇有介绍(探索C#之一致性Hash详解)。在Hadoop中,HDFS和MapReduce是相互关联配合的,一个负责存储,一个负责计算。如果自行实现的话,还需要一个统一的存储,所以这里的数据源可以是数据库,也可以是文件。小张只是为了满足boss的需求;如果需要通用计算框架,可以直接用现成的。


/// <summary>
/// Splits the input into two shards so that each can be handed to a separate
/// worker. Every input element lands in exactly one shard; adding the same
/// sequence twice would make each worker count the full input and double
/// the merged totals.
/// </summary>
/// <param name="list">Input elements to shard.</param>
/// <returns>
/// A list of exactly two sequences: the first half of the input (which takes
/// the extra element when the count is odd) and the remaining half. Either
/// shard may be empty.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="list"/> is null.</exception>
public List<IEnumerable<T>> Partition(IEnumerable<T> list)
{
    if (list == null)
        throw new ArgumentNullException("list");

    // Snapshot once so both shards are cut from the same enumeration pass.
    var snapshot = new List<T>(list);
    int firstSize = (snapshot.Count + 1) / 2;

    var shards = new List<IEnumerable<T>>();
    shards.Add(snapshot.GetRange(0, firstSize));
    shards.Add(snapshot.GetRange(firstSize, snapshot.Count - firstSize));
    return shards;
}


小张定义了Master,worker角色。 master负责汇集输出,即我们的主程序。 每一个worker我们用一个线程来模拟,最后输出到master汇总,master最后可以写到数据库或其他。

/// <summary>
/// Simulates one worker node: starts a new thread that runs the map and
/// reduce phases over <paramref name="list"/> and passes the result to
/// <c>master.Merge</c>.
/// </summary>
/// <param name="list">The elements this worker should process.</param>
/// <remarks>
/// The method returns right after starting the thread; nothing here waits
/// for the worker to finish.
/// </remarks>
public void WorkerNode(IEnumerable<T> list)
{
    ThreadStart job = delegate
    {
        master.Merge(Reduce(Map(list)));
    };

    var worker = new Thread(job);
    worker.Start();
}
/// <summary>
/// Collects the partial word counts reported by workers into one running total.
/// </summary>
/// <typeparam name="T">Type of the keys being counted.</typeparam>
public class Master<T>
{
    /// <summary>Accumulated totals per key across all merged partial results.</summary>
    public Dictionary<T, int> Result = new Dictionary<T, int>();

    // Dedicated lock object: locking on `this` would let any outside code that
    // holds a reference to the master contend for (or deadlock on) the same monitor.
    private readonly object mergeLock = new object();

    /// <summary>
    /// Adds every count in <paramref name="list"/> to <see cref="Result"/>,
    /// inserting keys that are not present yet.
    /// </summary>
    /// <param name="list">Partial per-key counts produced by one worker.</param>
    public void Merge(Dictionary<T, int> list)
    {
        // One lock acquisition per merge keeps each worker's contribution
        // atomic with respect to other Merge calls.
        lock (mergeLock)
        {
            foreach (var item in list)
            {
                // `current` stays 0 for a key not yet in Result, so a single
                // lookup plus one write handles both new and existing keys.
                int current;
                Result.TryGetValue(item.Key, out current);
                Result[item.Key] = current + item.Value;
            }
        }
    }
}





