# strimzi operator 로 k8s 에 kafka cluster 를 구성한 경우
# 클러스터들이 svc 로컬 호스트 사용으로 k8s 클러스터 외부에서 kafka 9092포트(bootstrap/broker)로 접속이 안된다.
# 우선 kafka 설치가 되어 있어야 테스트할 수 있다.
# /opt/homebrew/opt/kafka/bin 사용할 수 있는 커맨드 스크립트들이 생성된다.
brew install kafka kcat
# nodeport 생성하기
# kafka 리소스 > spec > kafka > listeners 에 다음과 설정을 추가하면 nodeport 가 생성된다.
# service/pod 에 9094 nodeport 설정이 추가된다.
- name: external1 # ^[a-z0-9]{1,11}$' 이고 유니크해야 한다.
port: 9094
type: nodeport
tls: false
configuration:
bootstrap:
nodePort: 32100
brokers:
- broker: 0
nodePort: 32000
- broker: 1
nodePort: 32001
- broker: 2
nodePort: 32002
# nodeport 접속 확인
# 토픽으로 메시지 생성
/opt/homebrew/opt/kafka/bin/kafka-console-producer \
--broker-list ysoftman-node1:32100 \
--topic test
# 토픽으로 들오는 메시지 확인
/opt/homebrew/opt/kafka/bin/kafka-console-consumer \
--bootstrap-server ysoftman-node1:32100 \
--topic test \
--from-beginning
# 또는
kcat -b ysoftman-node1:32100 -t test
#####
# ingress 생성하기
# ingress 는 http 프로토콜을 사용하지만 kafka 는 tcp 프로토콜을 사용한다.
# 따라서 nginx ingress > ssl-passthrough 기능을 사용해 서비스 tcp 로 바로 연결되는 방식을 사용해야 한다.
# kafka 리소스 > spec > kafka > listeners 에 다음과 설정을 추가하면 ingress 가 생성된다.
# service/pod 에 9096 포트 설정이 추가된다.
- name: external1 # ^[a-z0-9]{1,11}$' 이고 유니크해야 한다.
port: 9096
tls: true # Ingress type listener and requires enabled TLS encryption
type: ingress
configuration:
bootstrap:
host: ysoftman-bootstrap.ysoftman.abc
brokers:
- broker: 0
host: ysoftman-0.ysoftman.abc
- broker: 1
host: ysoftman-1.ysoftman.abc
- broker: 2
host: ysoftman-2.ysoftman.abc
class: nginx # kubectl get ingressclass 로 사용 가능한 클래스 이름 파악
# 잠시 후 생성된 인그레스 중 하나를 보면 다음과 같다.
# tls 에 별도의 secretName 이 없다.
# 대신 ssl-passthrough 활성화한다.
# nginx-ingress-controller daemonset(또는 deployment) 에 --enable-ssl-passthrough 설정을 적용해야 ingress ssl-passthrough 이 동작한다.
spec:
template:
- args:
- /nginx-ingress-controller
- --enable-ssl-passthrough=true
# kafka 서버(broker)에서 https 를 받고 tls 인증을 처리하게 된다.
# 참고
metadata:
annotations:
ingress.kubernetes.io/ssl-passthrough: "true"
nginx.ingress.kubernetes.io/backend-protocol: HTTPS
nginx.ingress.kubernetes.io/ssl-passthrough: "true"
... 생략 ...
spec:
tls:
- hosts:
- ysoftman-bootstrap.ysoftman.abc
# 그리고 client, cluster 등의 이름으로 secret 도 생성이 된다.
# 이중 client secret 를 .crt 파일로 다음과 같이 저장한다.
kubectl get secret ysoftman-kafka-cluster-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 -d > ca.crt
# kafka 커맨드에서 사용할 truststore.jks 파일 생성
keytool -import -trustcacerts -alias root -file ca.crt -keystore truststore.jks -storepass password -noprompt
# kafka 클러스터에 접속해서 producing 해보기
/opt/homebrew/opt/kafka/bin/kafka-console-producer --broker-list ysoftman-bootstrap.ysoftman.abc:443 --producer-property security.protocol=SSL --producer-property ssl.truststore.password=password --producer-property ssl.truststore.location=./truststore.jks --topic test
# kafka 클러스터에 접속해서 consume 해보기
/opt/homebrew/opt/kafka/bin/kafka-console-consumer --bootstrap-server ysoftman-bootstrap.ysoftman.abc:443 --topic test
# 인증서 확인
openssl s_client -connect ysoftman-bootstrap.ysoftman.abc:443 \
-servername ysoftman-bootstrap.ysoftman.abc \
-showcerts
# 만약 다음과 같은 ssl 실패 에러가 발생한다면
failed authentication due to: SSL handshake failed
# ssl 디버깅 정보를 보자
export KAFKA_OPTS="-Djavax.net.debug=ssl"
#####
# strimzi operator 로 kafka 를 설치한 경우 broker, controller pod 들은
# strimzipodset(statefulset 과 비슷) 이라는 커스텀 리소스로 관리된다.
# broker pod 1개를 수동 삭제했는데 pod 가 새로 올라 올때 다른 pod 들과 연결 에러가 발생한다.
# 테스트해본 결과 strimzipodset broker, controller 모두 삭제해서 재시작하도록 하면 된다.
managed-kafka-cluster-broker
managed-kafka-cluster-controller