我正在创建一种以MongoDB作为数据存储的后台作业队列系统。在催生工作者来处理作业之前,如何“监听” MongoDB集合中的插入内容?我是否需要每隔几秒钟轮询一次,以查看自上次以来是否有任何更改,或者我的脚本有什么方法可以等待插入发生?这是我正在从事的一个PHP项目,但是可以使用Ruby或与语言无关的方式随意回答。

评论

MongoDB 3.6中添加了变更流来解决您的情况。 docs.mongodb.com/manual/changeStreams同样,如果您使用的是MongoDB Atlas,则可以利用Stitch触发器,该触发器可让您执行功能以响应插入/更新/删除/等。 docs.mongodb.com/stitch/triggers/overview不再需要解析操作日志。

#1 楼

您在想什么听起来很像触发器。 MongoDB没有对触发器的任何支持,但是有些人使用了一些技巧来“滚动自己”。关键是操作日志。

在副本集中运行MongoDB时,所有MongoDB操作都记录到操作日志(称为操作日志)中。操作日志基本上只是对数据所做修改的运行清单。副本集功能通过侦听此操作日志上的更改,然后在本地应用更改来实现。

听起来听起来很熟悉吗?

我无法在此处详细说明整个过程,它是几页文档,但是可以使用所需的工具。

首先在oplog上进行一些写作
-简要说明
-local集合的布局(包含oplog)

您还希望利用可尾游标。这些将为您提供一种侦听更改而不是轮询更改的方法。请注意,复制使用可尾游标,因此这是受支持的功能。

评论


嗯...不完全是我的想法。我现在只运行一个实例(没有奴隶)。那么也许是更基本的解决方案?

–安德鲁(Andrew)
2012年3月13日在22:18

您可以使用--replSet选项启动服务器,它将创建/填充操作日志。即使没有中学。这绝对是“监听”数据库更改的唯一方法。

–盖茨副总裁
2012年3月13日22:36

这是一个很好的说明,说明如何设置操作日志以将更改记录本地记录到本地:宽松xaml.wordpress.com/2012/09/03/…

–johndodo
2014年12月30日14:35

酷!那才是我真正想要的。我在npm上找到了一个名为“ mongo-oplog”的库。好开心〜

– pjincz
16 Dec 11'在20:14



我同意在撰写此答案之时可能无法使用触发器,但对于所有落入此处的人来说,现在有一个可用选项,请查看MongoDB Stitch(docs.mongodb.com/stitch/#stitch)和Stitch触发器(docs。 mongodb.com/stitch/triggers)..

–whoami-fakeFaceTrueSoul
5月11日2:06

#2 楼

MongoDB具有capped collectionstailable cursors,它们允许MongoDB将数据推送到侦听器。

capped collection本质上是一个固定大小的集合,仅允许插入。这就是创建它的样子:

db.createCollection("messages", { capped: true, size: 100000000 })


MongoDB可尾游标(Jonathan H. Wage的原创文章)

Ruby

 coll = db.collection('my_collection')
cursor = Mongo::Cursor.new(coll, :tailable => true)
loop do
  if doc = cursor.next_document
    puts doc
  else
    sleep 1
  end
end
 


PHP

 $mongo = new Mongo();
$db = $mongo->selectDB('my_db')
$coll = $db->selectCollection('my_collection');
$cursor = $coll->find()->tailable(true);
while (true) {
    if ($cursor->hasNext()) {
        $doc = $cursor->getNext();
        print_r($doc);
    } else {
        sleep(1);
    }
}
 


Python(由Robert Stewart提供)

 from pymongo import Connection
import time

db = Connection().my_db
coll = db.my_collection
cursor = coll.find(tailable=True)
while cursor.alive:
    try:
        doc = cursor.next()
        print doc
    except StopIteration:
        time.sleep(1)
 


Perl(由Max编写)

 use 5.010;

use strict;
use warnings;
use MongoDB;

my $db = MongoDB::Connection->new;
my $coll = $db->my_db->my_collection;
my $cursor = $coll->find->tailable(1);
for (;;)
{
    if (defined(my $doc = $cursor->next))
    {
        say $doc;
    }
    else
    {
        sleep 1;
    }
}
 


其他资源:

Ruby / Node.js教程,它将引导您创建一个侦听MongoDB封顶集合中插入内容的应用程序。

更详细地讨论可尾游标的文章。

使用可尾游标的PHP,Ruby,Python和Perl示例。

评论


睡1真?生产代码?那怎么不投票呢?

– rbp
2013年9月13日12:36



@rbp哈哈,我从未说过这是生产代码,但是您是对的,睡一秒钟不是一个好习惯。可以肯定的是,我从其他地方得到了那个例子。虽然不确定如何重构它。

–安德鲁(Andrew)
2013年9月13日16:16

@kroe,因为那些不相关的细节将由可能不理解为什么不好的较新的程序员放入生产代码中。

– fish鱼
15年1月14日在17:35

我理解您的意思,但是期望某些新程序员在生产中添加“ sleep 1”几乎是令人反感的!我的意思是,我不会感到惊讶...但是,如果有人将其投入生产,至少会永远学到艰辛的方式..哈哈哈

–kroe
15年1月14日在18:22

在生产中执行time.sleep(1)有什么问题?

– Al Johri
16年4月30日在0:19

#3 楼

看看这个:Change Streams

2018年1月10日-版本3.6


*编辑:我写了一篇关于如何做到这一点的文章https:// medium .com / riow / mongodb-data-collection-change-85b63d96ff76

https://docs.mongodb.com/v3.6/changeStreams/


它是mongodb 3.6中的新功能
https://docs.mongodb.com/manual/release-notes/3.6/ 2018/01/10

$ mongod --version
db version v3.6.2



为了使用changeStreams,数据库必须是复制集


有关复制集的更多信息:
https://docs.mongodb.com/manual/replication/


默认情况下,您的数据库将是“独立”的。


如何将独立版本转换为副本集:https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set/



以下示例是如何使用此示例的实际应用。
*专门用于Node。

/* file.js */
'use strict'


module.exports = function (
    app,
    io,
    User // Collection Name
) {
    // SET WATCH ON COLLECTION 
    const changeStream = User.watch();  

    // Socket Connection  
    io.on('connection', function (socket) {
        console.log('Connection!');

        // USERS - Change
        changeStream.on('change', function(change) {
            console.log('COLLECTION CHANGED');

            User.find({}, (err, data) => {
                if (err) throw err;

                if (data) {
                    // RESEND ALL USERS
                    socket.emit('users', data);
                }
            });
        });
    });
};
/* END - file.js */



有用的链接:https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-sethttps:// docs.mongodb.com/manual/tutorial/change-streams-example

https://docs.mongodb.com/v3.6/tutorial/change-streams-examplehttp://plusnconsulting.com / post / MongoDB-Change-Streams

评论


对所有编辑表示抱歉,所以不喜欢我的“链接”(说它们的格式不正确。)

–里奥·韦伯
18年2月7日在18:53

您不必查询数据库,我认为可以使用watch()或类似方法将新数据发送到正在监听的服务器

–亚历山大·米尔斯(Alexander Mills)
18 Mar 17'4 at 16



#4 楼

从MongoDB 3.6开始,将有一个名为Change Streams的新通知API,可用于此目的。有关示例,请参见此博客文章。例子:

cursor = client.my_db.my_collection.changes([
    {'$match': {
        'operationType': {'$in': ['insert', 'replace']}
    }},
    {'$match': {
        'newDocument.n': {'$gte': 1}
    }}
])

# Loops forever.
for change in cursor:
    print(change['newDocument'])


评论


为什么?你能详细说明吗?这是现在的标准方法吗?

–米塔尔
17年9月8日于20:39

怎么样?不要使用轮询-您需要一种事件方法,而不是while循环等。

–亚历山大·米尔斯(Alexander Mills)
18 Mar 17 '18 at 4:17



您在哪里看到轮询?

–米塔尔
18 Mar 17 '18 at 15:34

我认为他/她指的是最后一个循环。但是我认为PyMongo仅支持这一点。 Motor可能具有异步/事件侦听器样式的实现。

–许绍恩
19年1月24日15:51



#5 楼

MongoDB 3.6版现在包括变更流,该变更流本质上是OpLog之上的API,允许类似触发/通知的用例。

这里是Java示例的链接:
http://mongodb.github.io/mongo-java-driver/3.6/driver/tutorials/change-streams/

NodeJS示例可能类似于:

 var MongoClient = require('mongodb').MongoClient;
    MongoClient.connect("mongodb://localhost:22000/MyStore?readConcern=majority")
     .then(function(client){
       let db = client.db('MyStore')

       let change_streams = db.collection('products').watch()
          change_streams.on('change', function(change){
            console.log(JSON.stringify(change));
          });
      });


评论


JSON.stringify对于在Android Studio(Android App)中接收此数据非常重要。

–龙火
5月19日9:45

#6 楼

另外,您可以使用标准的Mongo FindAndUpdate方法,并在回调中运行回调时触发EventEmitter事件(在Node中)。

应用程序或体系结构的任何其他部分侦听此事件将会收到更新通知,并且还会向其中发送任何相关数据。这是从Mongo获得通知的一种非常简单的方法。

评论


这是非常低效的..您要为每个FindAndUpdate锁定数据库!

– Yash Gupta
15年11月21日在16:22

我的猜测是,亚历克斯回答的方式略有不同(不是专门针对插入内容),但存在一个相关的问题,例如当排队作业的状态发生变化时如何向客户发出某种通知,我们认为这需要在产生作业时发生,成功或失败。通过使用websocket连接到节点的客户端,可以通过FIndAndUpdate回调上的广播事件将所有更改通知给客户端,当接收状态更改消息时可以调用该事件。我会说这并不是低效率的,因为需要完成更新。

– Peter Scott
15年12月28日在21:20

#7 楼

其中许多答案只会给您新的记录,而不会提供更新,并且/或者效率极低

唯一可靠,高效的方法是在本地db上创建可尾游标:oplog.rs集合以获得对MongoDB的所有更改,并按照您的意愿进行操作。 (MongoDB甚至在内部或多或少地在内部执行此操作以支持复制!)

解释oplog包含的内容:
https://www.compose.com/articles/the-mongodb-oplog -and-node-js /

Node.js库的示例,该库提供了关于使用oplog可以完成的操作的API:
https://github.com/cayasso / mongo-oplog

#8 楼

有一组很棒的服务,称为MongoDB Stitch。查看针迹功能/触发器。请注意,这是基于云的付费服务(AWS)。在您的情况下,您可以在插入代码上调用用javascript编写的自定义函数。



评论


stackoverflow.com/users/486867/manish-jain-您是否有一个示例,该示例说明了如何使用针脚通知REACT应用程序数据已插入表中?

–MLissCetrus
4月10日20:57

#9 楼

有一个有效的Java示例,可以在这里找到。

 MongoClient mongoClient = new MongoClient();
    DBCollection coll = mongoClient.getDatabase("local").getCollection("oplog.rs");

    DBCursor cur = coll.find().sort(BasicDBObjectBuilder.start("$natural", 1).get())
            .addOption(Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA);

    System.out.println("== open cursor ==");

    Runnable task = () -> {
        System.out.println("\tWaiting for events");
        while (cur.hasNext()) {
            DBObject obj = cur.next();
            System.out.println( obj );

        }
    };
    new Thread(task).start();


关键是此处给出的QUERY OPTIONS。

也可以更改查找查询,如果您不需要每次都加载所有数据。

BasicDBObject query= new BasicDBObject();
query.put("ts", new BasicDBObject("$gt", new BsonTimestamp(1471952088, 1))); //timestamp is within some range
query.put("op", "i"); //Only insert operation

DBCursor cur = coll.find(query).sort(BasicDBObjectBuilder.start("$natural", 1).get())
.addOption(Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA);


#10 楼

实际上,与其观察输出,不如为什么不知道使用mongoose模式提供的中间件插入新内容的时间

您可以捕捉到插入新文档并在之后执行某些操作的事件。插入完成

评论


我的错。对不起先生

–阮阮(Duong Nguyen)
18年8月12日在7:34

#11 楼

3.6版允许使用以下数据库触发器类型后:事件驱动触发器-用于自动更新相关文档,通知下游服务,传播数据以支持混合工作负载,数据完整性和审核
计划的触发器-对于计划的数据检索,传播,归档和分析工作负载有用

登录到Atlas帐户并选择Triggers接口并添加新的触发器:



展开每个部分以获取更多设置或详细信息。