import threading
import boto3
from typing import Dict, Any

class ThreadSafeBoto3Manager:
    """Thread-safe boto3 클라이언트 매니저"""

    def __init__(self):
        self._local = threading.local()

    def get_client(self, service_name: str, **kwargs) -> Any:
        """스레드별로 독립적인 클라이언트 반환"""
        client_key = f"{service_name}_client"

        if not hasattr(self._local, client_key):
            # 각 스레드마다 새로운 session과 client 생성
            session = boto3.session.Session()
            client = session.client(service_name, **kwargs)
            setattr(self._local, client_key, client)

        return getattr(self._local, client_key)

# 전역 매니저 인스턴스
boto3_manager = ThreadSafeBoto3Manager()

# 사용 예시
def some_service_function():
    s3 = boto3_manager.get_client('s3')
    dynamodb = boto3_manager.get_client('dynamodb')

    # 안전하게 사용 가능
    s3.list_buckets()
    dynamodb.list_tables()
import threading
import boto3
from queue import Queue
from contextlib import contextmanager

class Boto3ClientPool:
    """boto3 클라이언트 연결 풀"""

    def __init__(self, service_name: str, pool_size: int = 10, **client_kwargs):
        self.service_name = service_name
        self.client_kwargs = client_kwargs
        self.pool = Queue(maxsize=pool_size)
        self._lock = threading.Lock()

        # 미리 클라이언트들을 생성해서 풀에 저장
        for _ in range(pool_size):
            session = boto3.session.Session()
            client = session.client(service_name, **client_kwargs)
            self.pool.put(client)

    @contextmanager
    def get_client(self):
        """컨텍스트 매니저로 클라이언트 대여/반납"""
        client = self.pool.get()
        try:
            yield client
        finally:
            self.pool.put(client)

# 사용 예시
s3_pool = Boto3ClientPool('s3', pool_size=5)

def process_s3_task():
    with s3_pool.get_client() as s3:
        s3.list_buckets()