import threading
import boto3
from typing import Dict, Any
class ThreadSafeBoto3Manager:
"""Thread-safe boto3 클라이언트 매니저"""
def __init__(self):
self._local = threading.local()
def get_client(self, service_name: str, **kwargs) -> Any:
"""스레드별로 독립적인 클라이언트 반환"""
client_key = f"{service_name}_client"
if not hasattr(self._local, client_key):
# 각 스레드마다 새로운 session과 client 생성
session = boto3.session.Session()
client = session.client(service_name, **kwargs)
setattr(self._local, client_key, client)
return getattr(self._local, client_key)
# 전역 매니저 인스턴스
boto3_manager = ThreadSafeBoto3Manager()
# 사용 예시
def some_service_function():
s3 = boto3_manager.get_client('s3')
dynamodb = boto3_manager.get_client('dynamodb')
# 안전하게 사용 가능
s3.list_buckets()
dynamodb.list_tables()
import threading
import boto3
from queue import Queue
from contextlib import contextmanager
class Boto3ClientPool:
"""boto3 클라이언트 연결 풀"""
def __init__(self, service_name: str, pool_size: int = 10, **client_kwargs):
self.service_name = service_name
self.client_kwargs = client_kwargs
self.pool = Queue(maxsize=pool_size)
self._lock = threading.Lock()
# 미리 클라이언트들을 생성해서 풀에 저장
for _ in range(pool_size):
session = boto3.session.Session()
client = session.client(service_name, **client_kwargs)
self.pool.put(client)
@contextmanager
def get_client(self):
"""컨텍스트 매니저로 클라이언트 대여/반납"""
client = self.pool.get()
try:
yield client
finally:
self.pool.put(client)
# 사용 예시
s3_pool = Boto3ClientPool('s3', pool_size=5)
def process_s3_task():
with s3_pool.get_client() as s3:
s3.list_buckets()