본문 바로가기
Error/problem and solution

카프카 500,000 events/s 처리(2)

by SeleniumBindingProtein 2023. 5. 11.
728x90
반응형

최종 목표는 카프카를 이용하여 초당 50만건의 이벤트 데이터 스트림 처리 및 초당 500개의 프로세스 처리를 목표로 진행하고 있다.

 

현재, 초당 10만건의 이벤트 데이터 스트림 처리 및 500개의 프로세스 처리를 완료했다.

 

구동했던 서버의 사양이 위의 조건에 맞는 최적사양보다는 현저히 낮은 관계로 아마 고사양 하드웨어 스펙에서 실행하면, 결과가 달라질 것으로 사료된다.

 

<하드웨어 스펙>

더보기

최적의 하드웨어 스펙

CPU : 32 core 이상

RAM : 128GB 이상

Network : 25 Gbps Ethernet 이상

 

현 하드웨어 스펙

1. 카프카 프로듀서

CPU : 8코어

Memory : 32GB

2. 주키퍼, 카프카

CPU : 4코어

Memory : 24GB

 

<결과>

더보기

1.68초당 10만건의 이벤트 처리 및 500개의 동시 프로세스 처리 결과
100000 events processed in 1.68 seconds.
500 processes completed in 1.68 seconds, 0.003361 seconds per process.

 

<라이브러리 사용>

concurrent.futures 라이브러리는 ThreadPoolExecutor 클래스를 사용하기 위해, kafka 라이브러리는 KafkaProducer 클래스를 사용하기 위해 import 하였음.

더보기

500개의 프로세스를 동시에 실행하기 위해, 스레드 기반 비동기 실행을 관리하고 제어하는 파이썬 내장 모듈 concurrent.futures의 하위 클래스인 ThreadPoolExecutor 클래스를 사용하였음.

이 클래스를 사용하면 프로그래머는 비동기적으로 실행할 작업을 스레드 풀에 제출할 수 있으며, submit 메서드를 사용하여 이러한 작업을 스레드 풀의 하나 이상의 스레드에서 실행할 수 있음.

 

<kafka producer 옵션 설정>

더보기

1. value_serializer 옵션은 Producer에서 전송할 데이터를 어떤 형식으로 serialize 할 것인지를 지정하며, json 형식으로 serialize 하기 위해 value_serializer=lambda x: json.dumps(x).encode('utf-8') 설정하였음.

2. batch_size 옵션은 메모리를 최적화하기 위해 Producer가 메시지를 배치로 전송할 크기를 지정하며, batch_size=2000으로 설정하였음.

3. linger_ms 옵션은 Producer가 메시지를 보내기 전에 대기할 시간을 지정하며, linger_ms=0으로 설정하였음.

4. 데이터 압축을 통해 시간 단축을 위해,  Producer가 보내는 메시지를 압축할 방식을 지정하는 compression_type 옵션을 사용하였고, gzip 압축을 사용하였음.(compression_type=gzip)

5. acks 옵션을 1로 설정하였고, all을 선택하지 않은 이유는 대용량 데이터 처리 속도가 느려지고, 0으로 처리하면 속도는 빠르지만 데이터 소실이 생길경우 문제가 발생하기 때문에 사용하지 않았음.

(acks=1: Producer는 Broker로부터 ACK를 받을 때까지 기다립니다. 이 경우, Broker에서 메시지를 수신하면 성공적으로 처리됐다는 ACK를 Producer에게 보내고, Producer는 메시지 전송 완료 처리를 마칩니다.)

 

<코드 리뷰>

- 데이터를 생성할 수 있는 함수를 만든 뒤에, 반복문을 통해 총 10만건의 데이터를 만듦.

- 카프카 토픽으로 전달할 수 있는 함수 생성하며, 생성한 데이터인 event를 value로 담아 전달함.

# 이벤트 생성 함수
def generate_event(event_id):
    event = {'id': event_id, 'data': 'test_data'}
    return event

# 이벤트 전송 및 처리 함수
def send_and_process_event(event):
    producer.send('topic', value=event)
    pass

# 이벤트 생성
events = [generate_event(i) for i in range(100000)]

- 500개의 프로세스를 동시에 실행하기 위해, ThreadPoolExecutor 클래스를 이용하여 최대 스레드 개수인 max_worker를 500으로 설정.

- 반복문을 통해 리스트를 200개씩 잘라서 처리하며, 각 이벤트를 send_and_process_event 함수에 submit 

-  processed_event_count 변수를 통해 처리된 이벤트의 수를 카운트

-  processed_process_count = processed_event_count 를 통해 현재까지 처리된 이벤트 수를 200으로 나누어 처리된 프로세스의 수를 계산

 

# 이벤트 전송 및 처리
processed_event_count = 0
start_time = time.time()
with ThreadPoolExecutor(max_workers=500) as executor:
    for i in range(0, len(events), 200):
        event_chunk = events[i:i+200]
        for event in event_chunk:
            executor.submit(send_and_process_event, event)
            processed_event_count += 1

        # 1초마다 처리된 이벤트 및 프로세스 수 출력
        if processed_event_count % 100000 == 0:
            elapsed_time = time.time() - start_time
            print(f"{processed_event_count} events processed in {elapsed_time:.2f} seconds.")
            # 현재까지 처리된 이벤트를 처리하는 데 소요된 시간을 처리된 프로세스 수로 나누어 프로세스 한 개당 평균적으로 소요된 시간

            processed_process_count = processed_event_count
            elapsed_time_per_process = elapsed_time / processed_process_count
            print(f"{processed_process_count} processes completed in {elapsed_time:.2f} seconds, {elapsed_time_per_process:.6f} seconds per process.")
            # 현재까지 처리된 프로세스의 수와 해당 프로세스들을 처리하는 데 걸린 시간, 그리고 각 프로세스 당 평균적으로 소요된 시간
728x90
반응형

댓글