MongoDB集群与LBS应用系列(二)--与Hadoop集成

作者:web前端    发布时间:2020-02-11 09:18     浏览次数 :

[返回]

Hadoop擅长分析和处理大型数据集,而MongoDB擅长存储应用程序的大型数据集,这两者结合到一起,就诞生了Mongo-Hadoop——MongoDB主要负责存储和查询,Hadoop主要负责批处理。今天Mongo开发团队发布了Mongo-Hadoop 1.1版本。项目地址:可以很容易地使用Mongo数据库库以及.bson格式的mongoDB备份文件,并将此作为Hadoop Map/Reduce任务的数据输入源或者输出目标。Mongo-Hadoop先检测数据并计算数据分割,然后交给Hadoop并行处理,这样非常大的数据集就可以快速被处理。Mongo-Hadoop支持Pig和Hive,这两个开源项目允许编写非常简单的脚本来执行非常复杂的MapReduce工作流。Mongo-Hadoop还支持Hadoop streaming,这样你可以使用Java以及其他任何编程语言来编写map/reduce函数。目前Mongo-Hadoop支持Ruby、Node.js和Python中的streaming。Mongo-Hadoop的工作流程如下:Mongo-Hadoop首先检查MongoDB Collection,并计算数据分割每个分割部分被分配到Hadoop集群中的一个节点同时,Hadoop节点从MongoDB获取数据,并进行本地处理Hadoop合并结果,并输出到MongoDB或BSON详细信息:

长期以来,我每开个系列,只有兴趣写一篇,很难持之与恒。为了克服这个长久以来的性格弱点,以及梳理工作半年的积累。最近一个月会写两篇关于Mongo在地理大数据方面的实践和应用,一篇关于推荐系统的初期准备过程,一篇用户行为矩阵的可视化。希望能够立言为证,自我监督。

最后更新时间:2017-07-13 11:10:49

1.驱动准备

言归正传,前文MongoDB集群部署完毕之后,CRUD就是主要需求。NoSQL与普通关系数据库不同的是,避免采用ORM框架对数据库做操作,这样会带来明显的性能下降[1]。使用原生的Driver是一个较为合理的选择,Mongo支持的语言非常多,包括JS,Java,C,C++,Python,Scala等[2]。

如果是单纯的MongoDB项目,我们会用NodeJS Driver,方便快捷,示例规范,值得推荐。在本文我使用Java Driver,主要是集成Hadoop工程方便。同时还会用到Mongo Hadoop Adapter 可以选择到Github 下载源码编译,或者直接根据自己Hadoop集群版本选择下载Jar包,添加到Hadoop安装目录的lib文件夹下[3]。但是在不少公有云平台上,普通用户是没有修改Hadoop系统的权限,无法添加Jar包,所以在本文的示例代码中,采用分布式缓存的方法添加这两个Jar包。

原始文章链接:

2.实现原理与过程

其实Hadoop和MongoDB的集成,很大程度上是将Mongo作为Hadoop的输入和输出源,而Mongo Hadoop Adapter也是主要实现了BSONWritable,MongoInputformat等这些类,也就是说需要自定义Hadoop的序列化类以及输入输出格式。

官网:

2.1 Hadoop序列化与反序列化

序列化(serialization)将结构化对象转化为二进制字节流,以便网络传输和写入磁盘。反序列化(deserialization)则是它的逆过程,将字节流转化为结构化对象。分布式系统通常在进程通讯和持久化时候会使用序列化。Hadoop系统节点进程通信使用RPC,该协议存活时间非常短,因此需要其序列化格式具备以下特点:紧凑、快速、可扩展等。Hadoop提供了Writable接口,它定义了对数据的IO流,即需要实现readFields 和 Write两个方法[4]。

MongoDB 是一个基于分布式文件存储的数据库,由 C++ 语言编写,旨在为 WEB 应用提供可扩展的高性能数据存储解决方案。

2.2 Mongo Adapter的源码实现

Mongo Hadoop Adater所实现的BSONWritable等类,源码实现体现了上述的规范:

//输出
public void write( DataOutput out ) throws IOException{
        BSONEncoder enc = new BasicBSONEncoder();
        BasicOutputBuffer buf = new BasicOutputBuffer();
        enc.set( buf );
        …………
    }
//输入
public void readFields( DataInput in ) throws IOException{
        BSONDecoder dec = new BasicBSONDecoder();
        BSONCallback cb = new BasicBSONCallback();
        // Read the BSON length from the start of the record
       //字节流长度
        byte[] l = new byte[4];
        try {
            in.readFully( l );
            …………
            byte[] data = new byte[dataLen + 4];
            System.arraycopy( l, 0, data, 0, 4 );
            in.readFully( data, 4, dataLen - 4 );
            dec.decode( data, cb );
            _doc = (BSONObject) cb.get();
           ………………
    }

 

因此我们在编写MapReduce程序的时候可以传递BsonWritable的key,value键值对,而Mongo构建于Bson之上,也就是说可以将MongoDB视为HDFS同性质的存储节点即可。

MongoDB 是一个介于关系数据库和非关系数据库之间的产品,是非关系数据库当中功能最丰富,最像关系数据库的。

3. 代码实现

在Mongo-Hadoop网站有数个例子,但是讲得不够详细,本文主要对它的金矿产量的例子做一个补充。完整的Hadoop项目一般包括Mapper,Reduceer,Job三个Java Class,以及一个一个配置文件(configuration.xml)来定义项目的输入输出等。Mongo-Hadoop项目会多一个mongo-defaults.xml,当然可以将两者融合起来。

图片 1

3.1  数据准备

从github中下载源码包,它会包含examples/treasury_yield/src/main/resources/yield_historical_in.json文件,将该json文件上传到Mongo所在的服务器,使用以下命令将它导入Mongo的testmr数据库中的example collection中。

mongoimport --host 127.0.0.1 --port 27017 -d testmr -c example --file ./yield_historical_in.json

查看一下数据结构

use testmr
db.example.find().limit(1).pretty()

如下:

{
  "_id": ISODate("1990-01-25T19:00:00-0500"),
  "dayOfWeek": "FRIDAY", "bc3Year": 8.38,
  "bc10Year": 8.49,
  …
}
  1. 下载

3.2  Mapper和Reducer还有Job以及mongo-defaults.xml

Mapper是从Mongo中读取BSONObject

public class MongoTestMapper extends Mapper<Object,BSONObject, IntWritable, DoubleWritable>

以及处理读过来的<key,value>键值对,并发到Reducer中汇总计算。注意value的类型。

public void map(final Object pkey, final BSONObject pvalue,final Context context)
        {
            final int year = ((Date)pvalue.get("_id")).getYear()+1990;
            double bdyear  = ((Number)pvalue.get("bc10Year")).doubleValue();
            try {
                context.write( new IntWritable( year ), new DoubleWritable( bdyear ));
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

Reducer会接受Mapper传过来的键值对

public class MongoTestReducer extends Reducer<IntWritable,DoubleWritable,IntWritable,BSONWritable>

进行计算并将结果写入MongoDB.请注意输出的Value的类型是BSONWritable.

public void reduce( final IntWritable pKey,
            final Iterable<DoubleWritable> pValues,
            final Context pContext ) throws IOException, InterruptedException{
      int count = 0;
      double sum = 0.0;
      for ( final DoubleWritable value : pValues ){
          sum += value.get();
          count++;
      }

      final double avg = sum / count;

        BasicBSONObject out = new BasicBSONObject();
        out.put("avg", avg);
        pContext.write(pKey, new BSONWritable(out));
    }

 

Job作为MapReudce主类,主要使用DistributedCache分布式缓存来添加驱动包,并定义了任务的输入配置等。如下所示:

//Using Distribute Cache,call it before job define.
        DistributedCache.createSymlink(conf);
//………………
//Using DistributedCache to add Driver Jar File
        DistributedCache.addFileToClassPath(new Path("/user/amap/data/mongo/mongo-2.10.1.jar"), conf);
        DistributedCache.addFileToClassPath(new Path("/user/amap/data/mongo/mongo-hadoop-core_cdh4.3.0-1.1.0.jar"), conf);

// job conf
        Job job = new Job(conf,"VentLam:Mongo-Test-Job");

mongo-defaults.xml 配置文件中定义了非常多的参数,我们只需要修改输入输出URI

   <!-- If you are reading from mongo, the URI -->
    <name>mongo.input.uri</name>
    <value>mongodb://127.0.0.1/testmr.example</value>
  </property>
  <property>
    <!-- If you are writing to mongo, the URI -->
    <name>mongo.output.uri</name>
    <value>mongodb://127.0.0.1/testmr.mongotest</value>
  </property>
  <property>

 

将整个java项目打包为名为mongotest的jar包,上传到Hadoop集群,执行命令:

hadoop jar mongotest.jar org.ventlam.MongoTestJob

以后会将我的博客涉及到源码都发布在 中,这篇文章对应的是mongohadoop文件夹。

07/05/2017 Current Stable Release

4.参考文献

[1] What the overhead of Java ORM for MongoDB

[2] MongoDB Drivers and Client Libraries

[3]Getting Started with Hadoop

[4] Interface Writable    

图片 2

本作品由VentLam创作,采用知识共享署名-非商业性使用-相同方式共享 2.5 中国大陆许可协议进行许可。

  1. 创建数据目录

MongoDB 将数据目录存储在 db 目录下,需手动创建。

E:MongoDBdatadb
  1. 运行 MongoDB 服务器

为了从命令提示符下运行MongoDB服务器,你必须从MongoDBbin目录中执行mongod.exe文件,不要关闭服务。ctrl + c关闭。

mongod.exe --dbpath E:MongoDBdatadb
  1. MongoDB 后台管理

运行 mongo.exe

MongoDB Shell是MongoDB自带的交互式Javascript shell,用来对MongoDB进行操作和管理的交互式环境。

  1. 将 MongoDB 服务器作为 Windows 服务运行

添加系统环境 path E:MongoDBServer3.4bin

检测:cmd 中输入 mongod --help

新建文件:E:MongoDBlogslogs.log

将 MongoDB 服务器作为 Windows 服务随 Windows 启动而开启:

mongod.exe --logpath "E:MongoDBlogslogs.log" --logappend --dbpath "E:MongoDBdata" --directoryperdb --serviceName MongoDB --install

开启 MongoDB 服务:net start MongoDB

停止 MongoDB 服务:net stop MongoDB

删除 MongoDB 服务:sc delete MongoDB

接下来就可以在 cmd 中运行 E:MongoDBServer3.4bin 里面的 *.exe 程序了

  • shell 控制台 mongo
  • 数据库的还原 mongorestore
  • 备份 mongodump
  1. mongodb 启动的参数

图片 3

  • MongoDB安装简单。
  • MongoDB的提供了一个面向文档存储,没有表结构的概念,每天记录可以有完全不同的结构,操作起来比较简单和容易。
  • 完全的索引支持(单键索引、数组索引、全文索引、地理位置索引 等)
  • 你可以通过本地或者网络创建数据镜像,这使得MongoDB有更强的扩展性。
  • 如果负载的增加(需要更多的存储空间和更强的处理能力) ,它可以分布在计算机网络中的其他节点上这就是所谓的分片。
  • Mongo支持丰富的查询表达式。查询指令使用JSON形式的标记,可轻易查询文档中内嵌的对象及数组。
  • MongoDb 使用update()命令可以实现替换完成的文档或者一些指定的数据字段 。
  • Mongodb中的Map/reduce主要是用来对数据进行批量处理和聚合操作。
  • Map和Reduce。Map函数调用emit(key,value)遍历集合中所有的记录,将key与value传给Reduce函数进行处理。
  • Map函数和Reduce函数是使用Javascript编写的,并可以通过db.runCommand或mapreduce命令来执行MapReduce操作。
  • GridFS是MongoDB中的一个内置功能,可以用于存放大量小文件。
  • MongoDB允许在服务端执行脚本,可以用Javascript编写某个函数,直接在服务端执行,也可以把函数的定义存储在服务端,下次直接调用即可。
  • MongoDB 支持多种编程语言:C C++ C# .NET Erlang Haskell Java JavaScript Lisp node.JS Perl PHP Python Ruby Scala 等

监控

  • Munin:网络和系统监控工具
  • Gangila:网络和系统监控工具
  • Cacti:用于查看CPU负载, 网络带宽利用率,它也提供了一个应用于监控 MongoDB 的插件。

GUI

  • Robomongo
  • Fang of Mongo – 网页式,由Django和jQuery所构成。
  • Futon4Mongo – 一个CouchDB Futon web的mongodb山寨版。
  • Mongo3 – Ruby写成。
  • MongoHub – 适用于OSX的应用程序。
  • Opricot – 一个基于浏览器的MongoDB控制台, 由PHP撰写而成。
  • Database Master — Windows的mongodb管理工具
  • RockMongo — 最好的PHP语言的MongoDB管理工具,轻量级, 支持多国语言.

图片 4

多个集合逻辑上组织在一起,就是数据库。

数据库命名规范:

  • 不能是空字符串。
  • 不得含有' '、.、$、/、和 。
  • 应全部小写。
  • 最多64字节。

有一些数据库名是保留的,可以直接访问这些有特殊作用的数据库。

  • admin: 从权限的角度来看,这是"root"数据库。要是将一个用户添加到这个数据库,这个用户自动继承所有数据库的权限。一些特定的服务器端命令也只能从这个数据库运行,比如列出所有的数据库或者关闭服务器。
  • local: 这个数据永远不会被复制,可以用来存储限于本地单台服务器的任意集合
  • config: 当Mongo用于分片设置时,config数据库在内部使用,用于保存分片的相关信息。

多个文档组成一个集合,相当于关系数据库的表。

所有存储在集合中的数据都是 BSON 格式,BSON 是类 JSON 的一种二进制形式的存储格式,简称 Binary JSON。