Kivi

没有什么远大理想,只是永远都不会满足而已


  • 首页

  • 关于

  • 标签

  • 归档

MongoDB聚合操作小节

发表于 2016-10-26 更新于 2017-07-02 分类于 mongodb 阅读次数:
本文字数: 13k 阅读时长 ≈ 11 分钟

为什么使用聚合

  • 聚合操作的作用

    Aggregations operations process data records and return computed results. Aggregation operations group values from multiple documents together, and can perform a variety of operations on the grouped data to return a single result.

    我的理解是,聚合类的操作提供了一个便捷的计算查询结果的方法。例如你如果使用简单的查询语句可能返回的是一组结果,需要代码去把结果进行计算处理,但是如果使用聚合操作,可以通过一个聚合语句直接返回计算结果。

  • 这里举一个官网上提供的最简单的例子

    聚合操作示例

    如果需要从左边的数据得到右边的结果,一个简单的查询是没有办法做到的,但是聚合操作就可以做到。

MongoDB提供了哪些聚合操作

Aggregation Pipeline

MongoDB提供的聚合操作(aggregation)是基于数据处理管道模型的。管道在Unix和Linux中一般用于将当前命令的输出结果作为下一个命令的参数。MongoDB的聚合管道将MongoDB文档在一个管道处理完毕后将结果传递给下一个管道处理。

最基本的聚合管道类似过滤器或者是文档格式转换器。稍微复杂点的,会提供分组,排序,计算等操作。

相比其他的聚合方法,聚合管道这种聚合方式是比较高效的,因为聚合管道是mongodb远程操作去执行的(区别于map reduce),这是mongodb官方推荐的聚合方式

聚合管道可以在分片集合上执行

可以通过添加索引来提高聚合管道的性能,聚合管道自身也有一套性能优化方案

Map-reduce

MongoDB还提供了一种简单聚合操作,map reduce。map reduce的聚合原理分为两个步骤,即map阶段和reduce阶段。这两个操作非常类似es5中,数组原型上的两个方法:map和reduce。不理解的建议先理解一下这两个方法

  • map过程: 将输入的一组文档映射成为另外一组文档(便于统计计算的格式的文档)
  • reduce过程:对map的结果进行聚合,计算等操作

这么些可能不太好理解,还是举一个官网的例子
map-reduce示例

map-reduce可以指定一个最终的数据处理阶段,用于对数据的最后一次整理。而且map reduce也可以指定一个查询语句去过滤数据,或者进行sort和limit操作。
map-reduce使用js方法去执行聚合操作,这样的好处是提供了极大的灵活性,缺点是在性能上会比aggregation pipeline差。
map-reduce在分片的集合上操作,也可以把数据输出到分片集合上。

Single Purpose Aggregation Operations(单一目的聚合操作)

MongoDB提供了一些常用的简单聚合操作的快捷入口,例如db.collection.count(), db.collection.group(), db.collection.distinct()
这些操作使用起来非常简单,但是没有aggregation pipeline 和 map-reduce 灵活。性能上,这些操作是比aggregation pipeline要慢的,这种聚合操作就是方便了调用。

group操作示例

MongoDB三种聚合操作的比较

聚合操作比较表

aggregate pipeline map-reduce group
描述 为了聚合操作的性能和可用性而生,使用多级管道处理数据 适合处理较大数据量的聚合操作,有map,reduce,final(不是必须)三个数据处理阶段 提供函数式的简单聚合操作,但是速度比aggregate慢,灵活性没有aggregate和map-reduce高
主要特点

管道操作可重复多次,没有次数限制

管道不用实现每一个输入文档都对应一个输出文档; 管道也提供生成新的文档,过滤输入文档的功能

除了分组操作,可以执行复杂的聚合任务以及对不断增长的数据集执行增量聚合(增量聚合稍后讨论)

非常方便的对文档现有字段进行分组

也可以通过定制keyf js函数,对计算过的字段进行分组(keyf函数稍后讨论)

灵活性 尽管受mongodb提供的aggregation操作符和表达式的限制,还是能够通过$project实现常用的聚合基本操作(稍后讨论aggregation操作符) 自定义的map,reduce,final等js函数给聚合操作提供了非常大的灵活性 自定义的reduce,final等js函数为group操作提供了灵活性
输出结果

返回一个内联文档(包含一个结集和一个结果集的指针),或者是存储结果到集合中

如果以内联文档的形式返回,那么返回数据受到BSON文档大小的限制(稍后讨论BSON文档大小限制)

NOTE:2.6版本之后可以只返回一个指针或者是存储数据到集合中

多种数据输出方式,可以内联文档形式返回,可以存入collection,可以merge,replace,reduce

返回一个内联数组(分过组的数据组成的数组),返回数据受到BSON文档大小的限制

返回的数组最多包含20,000 条数据

对分片的支持 支持分片集合 支持分片集合 不支持分片集合

聚合操作比较补充说明


Map-reduce的增量聚合

如果当前map-reduce数据集合处于持续增长的状态,我们会希望能够实现增量的map-reduce(每次都只对新增数据进行map-reduce),而不是每次聚合都需要对所有数据进行map-reduce。

怎样进行增量map-reduce?

  • 在一个collection中启动map-reduce任务,输出结果到另外一个独立的collection中
  • 当有新的数据需要处理的时候,在下面的前提下执行之后的map-reduce任务
    1. query参数要能匹配出新增的数据
    2. out参数要能使reduce产出的新的结果正确的合并到已存在的结果集中

下面通过一个例子说明:

  • 场景:每天结束的时候增量聚合一下用户的session集合
  • 数据:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    { userid: "a", ts: ISODate('2011-11-03 14:17:00'), length: 95 }
    { userid: "b", ts: ISODate('2011-11-03 14:23:00'), length: 110 }
    { userid: "c", ts: ISODate('2011-11-03 15:02:00'), length: 120 }
    { userid: "d", ts: ISODate('2011-11-03 16:45:00'), length: 45 }
    /****************************************************************/
    { userid: "a", ts: ISODate('2011-11-04 11:05:00'), length: 105 }
    { userid: "b", ts: ISODate('2011-11-04 13:14:00'), length: 120 }
    { userid: "c", ts: ISODate('2011-11-04 17:00:00'), length: 130 }
    { userid: "d", ts: ISODate('2011-11-04 15:37:00'), length: 65 }
  • 初始化session集合的map-reduce:
    1.定义map方法,实现userid到用户信息的映射

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    var mapFunction = function() {
    var key = this.userid;
    var value = {
    userid: this.userid,
    total_time: this.length,
    count: 1,
    avg_time: 0
    };

    emit( key, value );
    };

    2.定义与map对应的reduce方法,实现通过key和value去计算total_time和count

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    var reduceFunction = function(key, values) {
    var reducedObject = {
    userid: key,
    total_time: 0,
    count:0,
    avg_time:0
    };

    values.forEach( function(value) {
    reducedObject.total_time += value.total_time;
    reducedObject.count += value.count;
    }
    );
    return reducedObject;
    };

    3.定义finalize方法,接收key和reducedObject两个参数,修改reducedObject为最终想要的格式

    1
    2
    3
    4
    5
    6
    7
    var finalizeFunction = function (key, reducedValue) {

    if (reducedValue.count > 0)
    reducedValue.avg_time = reducedValue.total_time / reducedValue.count;

    return reducedValue;
    };

    4.在session集合执行map-reduce操作通过上面定义的三个方法,输出数据到session-stat集合中,如果session-stat集合已存在,则此操作将会替换一下内容:

    1
    2
    3
    4
    5
    6
    7
    db.sessions.mapReduce( mapFunction,
    reduceFunction,
    {
    out: "session_stat",
    finalize: finalizeFunction
    }
    )
  • 实现增量map-reduce
    加入今天又出现了一下几条数据:

    1
    2
    3
    4
    db.sessions.save( { userid: "a", ts: ISODate('2011-11-05 14:17:00'), length: 100 } );
    db.sessions.save( { userid: "b", ts: ISODate('2011-11-05 14:23:00'), length: 115 } );
    db.sessions.save( { userid: "c", ts: ISODate('2011-11-05 15:02:00'), length: 125 } );
    db.sessions.save( { userid: "d", ts: ISODate('2011-11-05 16:45:00'), length: 55 } );

    那么接下来的map-reduce操作应该这么做:

    1
    2
    3
    4
    5
    6
    7
    8
    db.sessions.mapReduce( mapFunction,
    reduceFunction,
    {
    query: { ts: { $gt: ISODate('2011-11-05 00:00:00') } },
    out: { reduce: "session_stat" },
    finalize: finalizeFunction
    }
    );

group的keyf函数

group的$keyf参数,是可选的,是key参数的一种取代方案,它指定一个函数,返回一个key Object作为分组的key。用$keyf代替key的意义在于,可以实现用计算得出的字段去分组,而不是用已经存在的字段去分组

举个例子:

  • 以下是测试数据的数据类型

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    {
    _id: ObjectId("5085a95c8fada716c89d0021"),
    ord_dt: ISODate("2012-07-01T04:00:00Z"),
    ship_dt: ISODate("2012-07-02T04:00:00Z"),
    item:
    {
    sku: "abc123",
    price: 1.99,
    uom: "pcs",
    qty: 25
    }
    }
  • 目标是计算总和,总数,平均值

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    db.runCommand(
    {
    group:
    {
    ns: 'orders', // collection name
    $keyf: function(doc) {
    return { day_of_week: doc.ord_dt.getDay() };
    },
    cond: { ord_dt: { $gt: new Date( '01/01/2012' ) } }, // 相当于过滤条件,缺省的话就是group全部
    $reduce: function( curr, result ) {
    result.total += curr.item.qty;
    result.count++;
    },
    initial: { total : 0, count: 0 },
    finalize: function(result) {
    var weekdays = [
    "Sunday", "Monday", "Tuesday",
    "Wednesday", "Thursday",
    "Friday", "Saturday"
    ];
    result.day_of_week = weekdays[result.day_of_week];
    result.avg = Math.round(result.total / result.count);
    }
    }
    }
    )
  • 得到以下数据

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    {
    "retval" :
    [
    { "day_of_week" : "Sunday", "total" : 70, "count" : 4, "avg" : 18 },
    { "day_of_week" : "Friday", "total" : 110, "count" : 6, "avg" : 18 },
    { "day_of_week" : "Tuesday", "total" : 70, "count" : 3, "avg" : 23 }
    ],
    "count" : 13,
    "keys" : 3,
    "ok" : 1
    }

Aggregation Pipeline 操作符


聚合阶段操作符

操作符 说明
$project 改变数据流中的文档结构
$match 数据过滤器,只聚合match到的数据,match语句格式是mongdb标准查询语句格式
$redact 基于文档自身内容去调整文档机构,包含了$match和$project的功能,可以用于文档的字段编辑。对于每一个输入,可以有0个或者是1哥输出
$limit 限制通过的文档的数量
$skip 跳过指定数量的文档
$unwind 解构输入文档的数组字段,为数组中的每一元素指定一个新的输出文档
$group 分组操作的功能实现,需要指定一个id表达式和一些计算表达式(非必需)。都指定之后,分组操作会输入每一组所有的文档,然后输出一个经过计算之后的输出文档。注意输出文档只包含了id表达式指定的字段,以及计算表达式指定的字段(如果有计算表达式的话)
$sample 随机选择指定数量的输入文档
$sort 根据指定字段给输入结果排序,输出排序后的结果,一个输入对应一个输出
$geoNear 输出接近某一地理位置的有序文档
$lookup 类似一个left outer join 的操作
$out 输出聚合结果到一个collection中
$indexStats 返回集合中索引的使用统计结果


表达式操作符

操作符 说明
$and 逻辑与
$or 逻辑或
$not 逻辑非


集合操作符

操作符 说明
$setEquals 比较两个数组是否包含相同的不重复元素
$setIntersection 返回所有输入数组共同拥有的不重复元素
$setUnion 返回所有输入数组的所有不重复元素组成的集合
$setDifference 返回第一个数组中有的,第二个数组中没有的不重复元素组成的集合
$setIsSubset 如果第一个数组所有元素在第二个数组中都出现,则返回true,否则返回false
$anyElementTrue 数组只要有一个元素经过if语句判断都能通过的时候,返回true,否则返回false
$allElementsTrue 数组所有元素经过if语句判断都能通过的时候,返回true,否则返回false


比较操作符

操作符 说明
$cmp 相等,返回0,第一个值大于第二个,返回1,第一个值小于第二个,返回-1
$eq 两值相等,返回true
$gt 第一个值大于第二个值的时候,返回true
$gte 第一个值大于或者等于第二个值的时候,返回true
$lt 第一个值小于第二个值的时候,返回true
$lte 第一个值大于或等于第二个值的时候,返回true
$ne 两值不想等的时候返回true


数学操作符

操作符 说明
$abs 取绝对值
$add 计算两数之和或者是日期加上一个数字得到的一个新的日期,数字被当作毫秒看待。可以有任意参数,但是最多只有一个参数会被当作日期处理
$ceil 返回一个比给出值大或相等的最小整数
$divide 做除法,只接收两个参数
$exp 指定e的指数为n,计算e的n次方
$floor 返回一个比给出值小或相等的最大整数
$ln
$log
$log10
$mod 取余
$multiply 做乘法
$pow 指数运算
$sqrt 开方运算
$subtract 减法运算
$trunc 截去一个数值的小数部分,返回小数部分


字符串操作符

操作符 说明
$concat 拼接人意数量的字符串
$substr 字符串截取功能
$toLower 字符串转小写
$toUpper 字符串转大写
$strcasecmp 字符串比较(区分大小写)


文字操作符

操作符 说明
$meta 结合$text一起使用,例子看这里


数组操作符

操作符 说明
$arrayElemAt 返回数组指定位置的元素
$concatArrays 合并数组
$filter 根据输入数组和过滤条件返回一个子数组
$isArray 判断指定字段是不是数组
$size 返回数组长度
$slice 根据条件生成子数组


变量操作符

操作符 说明
$map 类似数组的map操作
$let 现在var表达式里面定义一组变量,然后再in表达式去操作


文字操作符

操作符 说明
$literal 一个用途是将操作符’$’作为普通字符使用,另外一个是创建一个新的字段


日期操作符

操作符 说明
$dayOfYear 返回一年中的第几天) 1~366
$dayOfMonth 返回一个月中的第几天 1~31
$dayOfWeek 返回一周的第几天 1~7
$year 返回年份 e.g 2014
$month 返回月份 1~12
$week 一年中的第几周 0~53
$hour 返回日期中的小时 0~23
$minute 返回日期中的分钟数 0~59
$second 返回日期中的秒数 0~60
$millisecond 返回日期的毫秒数 0~999
$dateToString 根据指定的格式返回时间字符串


条件操作符

操作符 说明
$cond 条件操作符,支持if/else,三目运算
$ifNull 如果第一个值是null,那么用第二个值替换它


计算操作符

操作符 说明
$sum 求和
$avg 计算平均值
$first 返回每一组中的第一个文档
$last 返回每一组中的最后一个文档
$max 返回每一组中的最大的文档
$min 返回每一组中的最小的文档
$push 只能在$group中使用,将对应表达式生成的数据压入一个数组返回
$addToSet 类似push,不过集合中的元素不能重复
$stdDevPop 返回输入值的总体标准偏差(没太理解)
$stdDevSamp 返回输入值的样本标准差(没太理解)

BSON文档大小限制

BSON文档大小限制是16MB,嵌套层数限制是100层。
如果需要存储大于16MB的文档,可以使用MongoDB提供的GridFS API。

Aggregation Pipeline优化

MongoDB中,在一个集合上执行的聚合操作,理论上是会传递整个集合的数据到聚合管道中的。为了优化这个操作,尽可能地使用以下几个策略去避免出现操作整个集合数据的情况。

  • 利用索引
    在管道入口(第一节管道)利用$match和$sort操作符匹配索引字段。这样后来管道的输入数据就都是过滤过的而且有序的数据。

    2.4之后的版本,$geoNear操作符可以利用地理位置索引操作数据,注意,使用$geoNear操作符的时候,必须在管道第一节中使用。
    Changed in version 3.2: Starting in MongoDB 3.2, indexes can cover an aggregation pipeline. In MongoDB 2.6 and 3.0, indexes could not cover an aggregation pipeline since even when the pipeline uses an index, aggregation still requires access to the actual documents.(这段我还没有理解)

  • 尽量早的过滤数据
    尽量在管道入口执行数据过滤操作,例如$match,$limit,$skip等。

  • 附加功能
    聚合管道内置优化程序,这部分会在下面进行详细讨论

管道顺序优化

  • 优化程序会自动将$match操作放在在$sort前(当$match紧跟在$sort之后时)
  • 优化程序会将$limit操作放在$skip操作前(skip操作比较耗时,先limit减少输入数据的数量)
  • 优化程序会将$match操作放在$redact操作前
  • 优化程序会将$skip和$limit操作放在$project前

管道合并

  • 当$limit操作紧跟在$sort之后的时候,优化程序会将$limit操作合并到$sort中
  • 当一个$limit紧跟在一个$limit之后时候,优化程序会合并两个$limit操作
  • $skip合并(同上)
  • $match合并(同上)
  • 当$unwind操作紧跟在$lookup操作,并且$unwind操作的正好是$lookup对应的as字段的时候,优化程序会将这两个操作合并,避免创建过大的內联文档

上述优化策略都是聚合管道内置的优化策略来提高 聚合查询性能的

Aggregation Pipeline限制

聚合管道有以下两个查询限制

  • 返回结果大小限制
    从2.6版本开始,聚合管道操作可以返回结果指针或者存储结果到集合中,此时每个文档大小受到BSON文档大小限制,即单个文档不得超过16MB,如果哪个文档超过大小,会返回错误。这个限制只针对返回结果,数据在管道流动过程中,可能会超出这个大小限制。2.6之后聚合管道操作默认返回结果指针
  • 内存占用限制
    聚合管道操作限制内存为100MB,超出会报错。如果要改变这个大小限制,使用allowDiskUse选项让聚合管道操作可以写入数据到临时文件中

Aggregation Pipeline和Sharded Collections

聚合管道支持操作分片集合
如果管道是以$match开始,而且匹配了片键,那么聚合管道操作将只会操作匹配到的分片的数据。在3.2之前,管道聚合操作会分开,然后在主分片上合并结果。
对于必须在多个分片上执行的聚合管道操作,如果操作没有要求必须在主分片上执行,那么管道操作会将结果路由到一个随机的分片上去执行合并操作,避免数据库主分片过载。$out和$lookup操作需要数据库主分片上执行。

Aggregation Pipeline例子

Map-Reduce和Sharded Collections

分片集合作为输入

当用分片集合作为map-reduce的输入时,mongos会自动并行的分发map-reduce任务到各个分片上。mongos会等待所有分片上的任务完成。

分片集合作为输出

如果map-reduce的out字段包含了分片的值,MongoDB以_id作为片键对输出集合进行分片,为了将结果输到分片集合中,执行以下操作:

  • 如果输出集合不存在,MongoDB会创建集合并以_id为片键进行分片
  • 对于一个新的,或者是空的分片集合,MongoDB使用map-reduce操作第一阶段的操作结果去初始化分给分片的chunks
  • mongos并行分发map-reduce后续处理任务到每一个拥有chunk的分片上,在处理期间,每个分片都会从其他分片上拉取处理结果,执行最后的reduce/finalize方法,然后从本地写数据到输出集合

在map-reduce任务处理期间,分片会根据需要切分chunks
MongoDB为了避免map-reduce过程中高并发带来的问题,会自动禁用输出集合chunk的平衡操作(可能数据不平均)

在MongoDB 2.0中:

  • mongos从各个集合分片中查询数据,然后合并,排序,然后使用reduce/finalize方法处理结果,然后用分片模式写入结果到输出集合中
  • map-reduce方法是需要使用少量内存,即使在处理大量数据的时候
  • 插入的时候不会自动切割chunks,需要手动操作。

2.2版本之后,为了达到最佳效果,最好只使用map-reduce的分片输出选项

# mongodb
web端用户行为分析初探
javascript原型
  • 文章目录
  • 站点概览
kivi

kivi

nodejs | server
58 日志
17 分类
32 标签
RSS
  1. 1. 为什么使用聚合
  2. 2. MongoDB提供了哪些聚合操作
    1. 2.1. Aggregation Pipeline
    2. 2.2. Map-reduce
    3. 2.3. Single Purpose Aggregation Operations(单一目的聚合操作)
  3. 3. MongoDB三种聚合操作的比较
    1. 3.1. 聚合操作比较表
    2. 3.2. 聚合操作比较补充说明
      1. 3.2.1. Map-reduce的增量聚合
    3. 3.3. group的keyf函数
    4. 3.4. Aggregation Pipeline 操作符
      1. 3.4.1. 聚合阶段操作符
      2. 3.4.2. 表达式操作符
      3. 3.4.3. 集合操作符
      4. 3.4.4. 比较操作符
      5. 3.4.5. 数学操作符
      6. 3.4.6. 字符串操作符
      7. 3.4.7. 文字操作符
      8. 3.4.8. 数组操作符
      9. 3.4.9. 变量操作符
      10. 3.4.10. 文字操作符
      11. 3.4.11. 日期操作符
      12. 3.4.12. 条件操作符
      13. 3.4.13. 计算操作符
    5. 3.5. BSON文档大小限制
  4. 4. Aggregation Pipeline优化
    1. 4.1. 管道顺序优化
    2. 4.2. 管道合并
  5. 5. Aggregation Pipeline限制
  6. 6. Aggregation Pipeline和Sharded Collections
  7. 7. Aggregation Pipeline例子
  8. 8. Map-Reduce和Sharded Collections
    1. 8.1. 分片集合作为输入
    2. 8.2. 分片集合作为输出
© 2019 kivi | 173k | 2:37
由 Hexo 强力驱动 v3.9.0
|
主题 – NexT.Pisces v7.3.0
|