AWS Lambda는 이벤트에 대한 응답으로 코드를 실행하고 자동으로 기본 컴퓨팅 리소스를 관리하는 서버리스 컴퓨팅 서비스입니다. 이러한 이벤트에는 전자 상거래 웹 사이트에서 사용자가 장바구니에 항목을 배치하는 것과 같은 상태 변경 또는 업데이트가 포함될 수 있습니다. AWS Lambda를 사용하면 사용자 지정 로직을 통해 다른 AWS 서비스를 확장하거나, AWS 규모, 성능 및 보안으로 작동하는 자체 백엔드 서비스를 만들 수 있습니다. AWS Lambda는 Amazon API Gateway를 통한 HTTP 요청, Amazon Simple Storage Service(Amazon S3) 버킷에 있는 객체에 대한 변경 사항, Amazon DynamoDB의 테이블 업데이트 또는 AWS Step Functions의 상태 전환과 같은 다양한 이벤트에 대한 응답으로 코드를 자동 실행할 수 있습니다.
Lambda는 트리거에 따른 조건이 충족되면 저장된 코드를 한 번 실행시켜주는 서비스입니다. 특정 인원이 특정 API에 접근하거나, S3에 데이터가 업로드될 때와 같이 어떠한 조건을 만족하면 저장된 소스 파일이 실행됩니다.
프로젝트에 사용한 이유 : 스케쥴러를 모든 서버에서 돌아가게 한다면 스케일아웃한 상태에선 스케일아웃한 모든 서버에서 스케쥴러를 실행하게 되어서 리소스가 낭비됩니다.
→ 그런 리소스 낭비를 막기 위해서 스케쥴러를 위한 서버를 배포해야 합니다.
→ 스케쥴러만을 위한 서버는 되려 리소스 낭비가 심하기 때문에 스케쥴러와 같이 특정 시간에 1번 코드를 실행해주는 것이 필요합니다.
→ AWS의 lambda와 EventBridge를 활용해서 스케쥴러와 같은 역할을 하는 코드를 구축했습니다.
import redis
import os
import json
import pymysql
import random
# 환경변수 세팅
redis_host = os.environ['ELASTICACHE_HOST']
redis_port = int(os.environ['ELASTICACHE_PORT'])
db_host = os.environ['DB_HOST']
db_username = os.environ['DB_USERNAME']
db_password = os.environ['DB_PASSWORD']
db_name = os.environ['DB_NAME']
db_port = int(os.environ['DB_PORT'])
connection = pymysql.connect(
host = db_host,
user = db_username,
passwd = db_password,
db = db_name,
port = db_port
)
def lambda_handler(event, context):
# Redis 연결
redis_client = redis.StrictRedis(host=redis_host, port=redis_port)
with connection:
with connection.cursor() as cur:
if not connection.open:
connection.ping(reconnect=True) # 연결이 닫혔다면 다시 연결을 시도
cur.execute("UPDATE stock SET stock = '1000' WHERE id IN (SELECT id FROM (SELECT DISTINCT s1.id FROM study.stock s1 LEFT JOIN study.order_product op ON op.product_id = s1.product_id WHERE stock < 100) AS subquery);")
connection.commit()
with connection:
if not connection.open:
connection.ping(reconnect=True) # 연결이 닫혔다면 다시 연결을 시도
with connection.cursor() as cur:
cur.execute("TRUNCATE table event;")
connection.commit()
if not connection.open:
connection.ping(reconnect=True) # 연결이 닫혔다면 다시 연결을 시도
cursor = connection.cursor()
cursor.execute("SELECT p.product_id, title FROM product p WHERE p.product_id IN (SELECT product_id FROM (SELECT op.product_id, COUNT(op.product_id) AS product_count FROM order_product op GROUP BY op.product_id ORDER BY product_count DESC LIMIT 5) subquery);")
rows = cursor.fetchall()
for index,row in enumerate(rows):
id = row[0]
title = row[1]
json_data = make_product_rank_dto(id,title)
redis_client.set("product:rank:{}".format(index+1),json_data)
total = redis_client.get("product:total")
if total == None:
# product count cache miss
cursor.execute("SELECT count(*) FROM product;")
total = cursor.fetchall()
redis_client.set("product:total",str(total[0][0]))
total = total[0][0]
else:
total = int(total)
random_numbers = set()
while len(random_numbers) < 20:
random_number = random.randint(1, total)
random_numbers.add(random_number)
event_id_list = list(random_numbers)
#save event_id_list
result = json.dumps(event_id_list)
redis_client.set("product:sale:list",result)
# 10 이상 40 이하의 무작위 정수 생성
cursor.execute("SELECT * FROM product WHERE product_id IN {}".format(tuple(event_id_list)))
product_list = cursor.fetchall()
sql = "INSERT INTO event (sale_rate, product_id) VALUES ({}, {})"
#stock
cursor.execute("SELECT s.stock FROM stock s WHERE s.product_id IN {}".format(tuple(event_id_list)))
stock_list = cursor.fetchall()
print(stock_list)
for index, product in enumerate(product_list):
sale_rate = 10 + (int(random.random() * 7) * 5)
product_id = product[0]
price = product[4]
product_string = make_product_dto(product)
print(product_string)
## stock redis
redis_client.setex("product:sale:{}:stock".format(product_id),24*60*60,stock_list[index][0])
redis_client.setex("product:sale:{}:price".format(product_id),24*60*60,price)
redis_client.setex("product:sale:{}".format(product_id),24*60*60,product_string)
with connection:
if not connection.open:
connection.ping(reconnect=True) # 연결이 닫혔다면 다시 연결을 시도
with connection.cursor() as cur:
cursor.execute(sql.format(sale_rate, product_id))
connection.commit()
return {
'statusCode': 200,
'body': 'Redis 명령 실행 완료'
}
def make_product_rank_dto(id,title):
data = {
"id": id,
"title": title
}
return json.dumps(data)
def make_product_dto(product):
id = product[0]
category1 = product[1]
category2 = product[2]
image = product[3]
price = product[4]
stock = product[5]
title = product[6]
data = {
"id": id,
"category1" : category1,
"category2" : category2,
"image" : image,
"price" : price,
"stock" : stock,
"title": title
}
return json.dumps(data)
위와 같은 코드로 DB와 ElasticCache에 접근해 기존 스케쥴러가 하던 일을 대신 수행하고 있습니다.