https://www.mongodb.com/docs/manual/changeStreams/
https://www.mongodb.com/docs/manual/reference/method/db.collection.watch/
db = MongoClient(online_cluster_config["url"])['test']['users']
change_stream = db.watch(full_document="updateLookup")
# Loop through the change stream and print out all the changes
for change in change_stream:
print(f'Change detected: {pprint(change)}')
{'_id': {'_data': '8264EEE395000000032B022C0100296E5A10049C16FAD9236845B4BD9CA08B48620D2C46645F6964006464EEE35C8879F0ADE986277C0004'},
'clusterTime': Timestamp(1693377429, 3),
'documentKey': {'_id': ObjectId('64eee35c8879f0ade986277c')},
'fullDocument': {'_id': ObjectId('64eee35c8879f0ade986277c'),
'age': 10,
'category': 'ass',
'created_at': datetime.datetime(2023, 8, 30, 14, 54, 59, 597000),
'is_test': True},
'ns': {'coll': 'users', 'db': 'test'},
'operationType': 'insert'}
Change detected: None
{'_id': {'_data': '8264EEE529000000082B022C0100296E5A10049C16FAD9236845B4BD9CA08B48620D2C46645F6964006464EEE4C38879F0ADE986277D0004'},
'clusterTime': Timestamp(1693377833, 8),
'documentKey': {'_id': ObjectId('64eee4c38879f0ade986277d')},
'fullDocument': {'_id': ObjectId('64eee4c38879f0ade986277d'),
'category': 'ass21',
'created_at': datetime.datetime(2023, 8, 30, 14, 54, 59, 597000),
'every': 'ass',
'is_test': False},
'ns': {'coll': 'users', 'db': 'test'},
'operationType': 'update',
'updateDescription': {'removedFields': ['age'],
'updatedFields': {'every': 'ass'}}}
Change detected: None
값이 updated 되거나 새로 추가된 column들은 updatedFields에 들어간다
{'_id': {'_data': '8264EEE5A90000000C2B022C0100296E5A10049C16FAD9236845B4BD9CA08B48620D2C46645F6964006464EEE35C8879F0ADE986277C0004'},
'clusterTime': Timestamp(1693377961, 12),
'documentKey': {'_id': ObjectId('64eee35c8879f0ade986277c')},
'ns': {'coll': 'users', 'db': 'test'},
'operationType': 'delete'}
Change detected: None
https://www.mongodb.com/docs/manual/changeStreams/#resume-a-change-stream
각 return 값의 _id object값이 resume_token 값을 한다. 요걸 넣으면 요 이벤트 다음으로 resume 할 수 있다.
resume_token = {'_data':'8264EEEFF2000000062B022C0100296E5A10049C16FAD9236845B4BD9CA08B48620D2C46645F6964006464EEE4C38879F0ADE986277D0004'}
change_stream = db.watch(
full_document="updateLookup",
resume_after=resume_token
)


'ns': {'coll': 'users', 'db': 'test'},
change['dynamic_partition_tablename_key'] =
change['ns']['db'] + '_' + change['ns']['coll']
change['dynamic_partition_time_key'] =
str_timestamp(change['clusterTime'])

