https://www.mongodb.com/docs/manual/changeStreams/

https://www.mongodb.com/docs/manual/reference/method/db.collection.watch/

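from pprint import pprint

from pymongo import MongoClient

db = MongoClient(online_cluster_config["url"])['test']['users']
change_stream = db.watch(full_document="updateLookup")

# Loop through the change stream and print out all the changes
for change in change_stream:
    # pprint() prints the event itself and returns None, so call it on its own line
    print('Change detected:')
    pprint(change)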

Insert

{'_id': {'_data': '8264EEE395000000032B022C0100296E5A10049C16FAD9236845B4BD9CA08B48620D2C46645F6964006464EEE35C8879F0ADE986277C0004'},
 'clusterTime': Timestamp(1693377429, 3),
 'documentKey': {'_id': ObjectId('64eee35c8879f0ade986277c')},
 'fullDocument': {'_id': ObjectId('64eee35c8879f0ade986277c'),
                  'age': 10,
                  'category': 'ass',
                  'created_at': datetime.datetime(2023, 8, 30, 14, 54, 59, 597000),
                  'is_test': True},
 'ns': {'coll': 'users', 'db': 'test'},
 'operationType': 'insert'}

Update

{'_id': {'_data': '8264EEE529000000082B022C0100296E5A10049C16FAD9236845B4BD9CA08B48620D2C46645F6964006464EEE4C38879F0ADE986277D0004'},
 'clusterTime': Timestamp(1693377833, 8),
 'documentKey': {'_id': ObjectId('64eee4c38879f0ade986277d')},
 'fullDocument': {'_id': ObjectId('64eee4c38879f0ade986277d'),
                  'category': 'ass21',
                  'created_at': datetime.datetime(2023, 8, 30, 14, 54, 59, 597000),
                  'every': 'ass',
                  'is_test': False},
 'ns': {'coll': 'users', 'db': 'test'},
 'operationType': 'update',
 'updateDescription': {'removedFields': ['age'],
                       'updatedFields': {'every': 'ass'}}}

Fields whose values were updated, as well as newly added fields, appear in updatedFields. Fields that were removed are listed in removedFields.
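To make the meaning of updateDescription concrete, here is a minimal sketch that applies updatedFields and removedFields to a local copy of a document. The function name apply_update_description is hypothetical, and the sketch assumes top-level fields only; for nested fields MongoDB reports dotted paths such as 'a.b', which this does not handle.

```python
from copy import deepcopy


def apply_update_description(document, update_description):
    """Return a copy of `document` with an update event's changes applied."""
    result = deepcopy(document)
    # Set or overwrite every field reported as updated or newly added.
    for field, value in update_description.get("updatedFields", {}).items():
        result[field] = value
    # Drop every field reported as removed.
    for field in update_description.get("removedFields", []):
        result.pop(field, None)
    return result


# Values mirror the sample update event above.
before = {"category": "ass21", "age": 10, "is_test": False}
description = {"removedFields": ["age"], "updatedFields": {"every": "ass"}}
after = apply_update_description(before, description)
```

With full_document="updateLookup" the event already carries the full document, so a helper like this is mainly useful when only the delta is available.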

Delete

{'_id': {'_data': '8264EEE5A90000000C2B022C0100296E5A10049C16FAD9236845B4BD9CA08B48620D2C46645F6964006464EEE35C8879F0ADE986277C0004'},
 'clusterTime': Timestamp(1693377961, 12),
 'documentKey': {'_id': ObjectId('64eee35c8879f0ade986277c')},
 'ns': {'coll': 'users', 'db': 'test'},
 'operationType': 'delete'}

Resume Token

https://www.mongodb.com/docs/manual/changeStreams/#resume-a-change-stream

The _id object of each returned event serves as the resume token. Passing it to watch() as resume_after resumes the stream from the event right after that one.

resume_token = {'_data':'8264EEEFF2000000062B022C0100296E5A10049C16FAD9236845B4BD9CA08B48620D2C46645F6964006464EEE4C38879F0ADE986277D0004'}

change_stream = db.watch(
    full_document="updateLookup",
    resume_after=resume_token
)
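In practice the token has to be stored somewhere so that a restarted process can pick it up. The following is a rough sketch that keeps it in a local JSON file, assuming the token is the plain {'_data': ...} dict shown above. The helper names, the file-based storage, and the handle callback are all assumptions for illustration; only watch() with full_document and resume_after comes from the notes.

```python
import json
from pathlib import Path


def load_resume_token(path):
    """Return the saved resume token, or None if nothing was saved yet."""
    file = Path(path)
    if not file.exists():
        return None
    return json.loads(file.read_text())


def save_resume_token(path, change):
    """Persist the `_id` of a processed event as the next resume token."""
    Path(path).write_text(json.dumps(change["_id"]))


def watch_from_last_token(collection, path, handle):
    """Watch `collection`, resuming after the last saved token if there is one."""
    token = load_resume_token(path)
    # resume_after=None starts a fresh stream.
    with collection.watch(full_document="updateLookup", resume_after=token) as stream:
        for change in stream:
            handle(change)
            # Save only after handling, so a crash replays the event instead of skipping it.
            save_resume_token(path, change)
```

Usage would look like watch_from_last_token(db, "resume_token.json", pprint). Because the token is written after the handler runs, an event may be handled twice after a crash, so the handler should tolerate duplicates.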

Kinesis Data Firehose Settings

Add dynamic partition keys to each event so that the file path in S3 is set explicitly.

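# Each event carries its namespace, e.g. 'ns': {'coll': 'users', 'db': 'test'}
change['dynamic_partition_tablename_key'] = (
    change['ns']['db'] + '_' + change['ns']['coll']
)

# str_timestamp is a helper that is not shown in these notes
change['dynamic_partition_time_key'] = str_timestamp(change['clusterTime'])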
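The notes do not show str_timestamp, so the following is only one possible sketch of it, together with a small wrapper that adds both keys. The hourly UTC format "%Y-%m-%d-%H" and the function add_partition_keys are assumptions; the sketch relies only on the `.time` attribute (seconds since the epoch) that a BSON Timestamp exposes, and uses a stand-in object so that it runs without pymongo installed.

```python
from datetime import datetime, timezone
from types import SimpleNamespace


def str_timestamp(cluster_time, fmt="%Y-%m-%d-%H"):
    """Format a BSON Timestamp-like object (seconds in `.time`) as a UTC string."""
    return datetime.fromtimestamp(cluster_time.time, tz=timezone.utc).strftime(fmt)


def add_partition_keys(change):
    """Return a shallow copy of `change` with the two dynamic partition keys added."""
    enriched = dict(change)
    enriched["dynamic_partition_tablename_key"] = (
        change["ns"]["db"] + "_" + change["ns"]["coll"]
    )
    enriched["dynamic_partition_time_key"] = str_timestamp(change["clusterTime"])
    return enriched


# Stand-in for bson.timestamp.Timestamp(1693377429, 3) from the insert event above.
event = {
    "ns": {"coll": "users", "db": "test"},
    "clusterTime": SimpleNamespace(time=1693377429, inc=3),
}
enriched = add_partition_keys(event)
```

Returning a copy rather than mutating the event keeps the original change document intact in case it is also logged or used for the resume token.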
