ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Kafka 사용법
    Devops/Kafka 2019. 11. 20. 14:58
    반응형

    카프카에 메세지를 생성하고 소비하는 방식은 간단한편에 속한다. 아래는 타 사이트에 있는 소스들을 참고하여 정리한 내용들이다. 기본적인 단순 메세지 생성후 소비하여 화면에 출력하는 소스이며, 간단히 설정을 확인 할수 있다. spirng boot 에서 kafka 관련 라이브러리를 제공하는걸로 알고 있으나 ,일단 일반적인 형태로 구성하여 사용하는걸 기준으로 기록 하였다.  옵션값은 귀찮니즘(?) 으로 영문버전을 그냥 번역기로 돌렸다. 옵션별 기준값이 존재하는데 알아두면 좋을것 같다. 적어놓은것 말고도 옵션값이 굉장히 많다. 필요한 경우가 생기면 참고 사이트를 보고 별도 지정하여 사용해도 무방해 보인다. (https://kafka.apache.org/documentation/

     아래 사항은 기본적으로 카프카가 구축 되있고 구축된 카프카에 토픽이 존재한다는 가정하에 사용이 가능하다. 카프카 구축은 별도로 글을 작성 할 예정.

     

    사용을 위한 준비 사항

     

    Kafka Producer API + Consumer API 사용을 위한  Maven Dependency 를 추가

    <!-- Kafka client for producer/consumer operations -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.3.0</version>
    </dependency>
    
    

     

    Producer 만들기 

     

    - Producer 란 ? 

     메세지를 생산하는 주체 

     

    - 소스 

    /**
    * 연결 설정 값
    **/
          // create instance for properties to access producer configs   
          Properties props = new Properties();
          
          //Assign localhost id
          props.put("bootstrap.servers", “localhost:9092");
          
          //Set acknowledgements for producer requests.      
          props.put("acks", “all");
          
          //If the request fails, the producer can automatically retry,
          props.put("retries", 0);
          
          //Specify buffer size in config
          props.put("batch.size", 16384);
          
          //Reduce the no of requests less than 0   
          props.put("linger.ms", 1);
          
          //The buffer.memory controls the total amount of memory available to the producer for buffering.   
          props.put("buffer.memory", 33554432);
          
          props.put("key.serializer", 
             "org.apache.kafka.common.serializa-tion.StringSerializer");
             
          props.put("value.serializer", 
             "org.apache.kafka.common.serializa-tion.StringSerializer");
    
    /**
    * 실제 연동 로직
    **/
    	   KafkaProducer kafkaProducer = new KafkaProducer(properties);
            try{
                for(int i = 0; i < 100; i++){
                    System.out.println(i);
                    kafkaProducer.send(new ProducerRecord("devglan-test", Integer.toString(i), "test message - " + i ));
                }
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                kafkaProducer.close();
            }
    }

     

     

    - 옵션 

      client.id :   생산자 응용 프로그램을 식별합니다
     producer.type : 동기화 또는 비동기  ( 기본값 : sync ) 
     acks : acks 구성은 생산자 요청이 완료된 것으로 간주되는 기준을 제어합니다.  

        - 0 : 카프카로부터 ack 확인 하지 않음

        - 1 : 리더로부터 ack 만 확인 

        - all : 모든 replica 의 ack 를 확인
     retries : 생산자 요청이 실패하면 특정 값으로 자동 재 시도하십시오.
     bootstrap.servers : 서버 정보들
     linger.ms : 요청 수를 줄이려면 linger.ms를 어떤 값보다 큰 값으로 설정할 수 있습니다.
     key.serializer : 시리얼 라이저 인터페이스의 키
     value.serializer : 시리얼 라이저 인터페이스의 값.
     batch.size : 버퍼 크기.
     buffer.memory : 버퍼링을 위해 생산자가 사용할 수있는 총 메모리 양을 제어합니다.


     

    Consumer 만들기 

     

    - Consumer 란 ?

      메세지를 소비하는 주체 

     

    - 소스 

    /**
    * 연결 설정 값
    **/
    	  Properties props = new Properties();
          
          props.put("bootstrap.servers", "localhost:9092");
          props.put("group.id", "test");
          props.put("enable.auto.commit", "true");
          props.put("auto.commit.interval.ms", "1000");
          props.put("session.timeout.ms", "30000");
          props.put("key.deserializer", 
             "org.apache.kafka.common.serializa-tion.StringDeserializer");
          props.put("value.deserializer", 
             "org.apache.kafka.common.serializa-tion.StringDeserializer");
          KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    /**
    * 실제 연동 로직
    **/
    	 KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
            List topics = new ArrayList();
            topics.add("devglan-test");
            kafkaConsumer.subscribe(topics);
            try{
                while (true){
                    ConsumerRecords records = kafkaConsumer.poll(10);
                    for (ConsumerRecord record: records){
                        System.out.println(String.format("Topic - %s, Partition - %d, Value: %s", record.topic(), record.partition(), record.value()));
                    }
                }
            }catch (Exception e){
                System.out.println(e.getMessage());
            }finally {
                kafkaConsumer.close();
            }

     

    - 옵션 

     

    bootstrap.servers : 접속 서버 정보 ( ip, port ) 
    group.id : 개별 소비자를 그룹에 할당합니다.
    enable.auto.commit : 값이 true이면 오프셋에 대해 자동 커밋을 활성화하고 그렇지 않으면 커밋되지 않습니다.
    auto.commit.interval.ms : 업데이트 된 소비 된 오프셋이 ZooKeeper에 기록되는 빈도를 반환합니다.
    session.timeout.ms : 메시지를 포기하고 계속하기 전에 Kafka가 ZooKeeper가 요청 (읽기 또는 쓰기)에 응답하기를 기다리는 시간 (밀리 초)을 나타냅니다.


     

     



      

    카프카 관련 글작성 많이 지연되고 있다... 이것저것 하다보니 여력이 없다 ... ㅠㅠ Elasticsearch , spring batch 등등... 급하게 해야하는것들 먼저 보다 보니 참 많이 지연된거 같다. 틈틈히 계속 글을 남기도록 노력해야겠다.

     

    반응형

    'Devops > Kafka' 카테고리의 다른 글

    Kafka manager(CMAK) 설치  (0) 2021.06.25
    Kafka Docker-compose ( feat. single node ) 만들기  (0) 2021.03.19
    Kafka 사용이유 ( vs RabbitMQ )  (8) 2019.05.10
    Kafka 기본  (0) 2019.03.11
Designed by Tistory.