


# ———————————————————————




import gzip
import json
import base64
import os
import urllib3
# Reference: https://nyyang.tistory.com/117
# Slack incoming-webhook URL that log messages are posted to.
# The value must be a bare URL: the previous literal was wrapped in "<" and ">"
# (chat-paste markup), which made it an invalid request target.
# The SLACK_WEBHOOK_URL environment variable, when set, overrides the built-in
# default so the secret does not have to live in source control.
# NOTE(review): the fallback below embeds a webhook token in the source;
# consider rotating it and supplying it only through the environment.
SLACK_URL = os.environ.get(
    "SLACK_WEBHOOK_URL",
    "https://hooks.slack.com/services/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",
)
# Shared urllib3 pool manager, created once at import time and used for every
# outgoing webhook request made by this module.
http = urllib3.PoolManager()
def decode_cloudwatch_event(event):
    """Decode the payload carried by a CloudWatch Logs subscription event.

    The value at ``event['awslogs']['data']`` is base64-decoded, then
    gzip-decompressed, and the resulting bytes are parsed as JSON.

    Args:
        event: Mapping containing an ``'awslogs'`` mapping with a ``'data'``
            entry holding the base64-encoded, gzip-compressed JSON document.

    Returns:
        The object produced by parsing the decompressed JSON document.

    Raises:
        KeyError: If ``'awslogs'`` or ``'data'`` is missing from ``event``.
    """
    encoded = event['awslogs']['data']
    raw_json = gzip.decompress(base64.b64decode(encoded))
    return json.loads(raw_json)
def post_to_slack(message, webhook_url):
    """POST a text message to a Slack incoming webhook.

    Args:
        message: Text placed in the ``text`` field of the Slack payload.
        webhook_url: URL of the webhook to post to.

    Returns:
        The response object returned by ``http.request``. The status is not
        inspected here; callers that care about delivery should check it.
    """
    # Only the message text is sent; optional fields (e.g. "icon_emoji")
    # are intentionally left out.
    slack_payload = {'text': message}
    data = json.dumps(slack_payload).encode('utf-8')
    return http.request(
        method='POST',
        url=webhook_url,
        body=data,
        # Declare the body as JSON, as Slack's webhook documentation specifies;
        # the request previously carried no Content-Type at all.
        headers={'Content-Type': 'application/json'},
    )
def lambda_handler(event, context):
    """Forward every log event in a CloudWatch Logs event to Slack.

    The incoming event is decoded with ``decode_cloudwatch_event`` and the
    ``'message'`` of each entry in its ``'logEvents'`` list is posted, one
    request per entry, to ``SLACK_URL``.

    Example of a decoded payload:
        {'messageType': 'DATA_MESSAGE', 'owner': '497217256558',
         'logGroup': 'etl-cdc-ec2-log-group', 'logStream': 'output.log',
         'subscriptionFilters': ['cdc_log_filter'],
         'logEvents': [{'id': '37882772', 'timestamp': 1698722427994,
                        'message': '[2023-10-31 03:20:22,870 ass.py:8] [INFO] '
                                   'Current Time = 2023-10-31 03:20:22'}]}

    Args:
        event: Event mapping passed to ``decode_cloudwatch_event``.
        context: Unused.

    Returns:
        None.
    """
    decoded = decode_cloudwatch_event(event)
    # Every log stream currently goes to the same webhook; there is no
    # per-stream routing.
    for entry in decoded['logEvents']:
        post_to_slack(entry['message'], SLACK_URL)
    return None