Demo Kết nối Kafka Bizfly Cloud với ngôn ngữ Python
1. Yêu cầu hệ thống
-
Python >= 3.7
-
Đã khởi tạo Kafka Cluster trên Bizfly Cloud
-
Có thông tin kết nối: Bootstrap Servers, Username, Password, Topic name
-
Cài đặt thư viện: confluent-kafka
2. Cài đặt thư viện
pip install confluent-kafka
3. Cấu hình kết nối Kafka (SASL_PLAINTEXT)
KAFKA_CONFIG = {
'bootstrap.servers': 'your-kafka-bootstrap.bizflycloud.vn:9093',
'security.protocol': ' SASL_PLAINTEXT',
'sasl.mechanism': 'SCRAM-SHA-256,
'sasl.username': 'your-username',
'sasl.password': 'your-password'
}
TOPIC_NAME = 'your-topic-name'
Lưu ý: Bạn có thể lấy các thông số kết nối này trong mục Credential của Kafka Cluster trên Bizfly Cloud.
Tên Acl phải trùng với username thì mới có hiệu lực với user.
4. Producer – Gửi dữ liệu vào Kafka
from confluent_kafka import Producer
conf = {
'bootstrap.servers': 'IP-Kafka', # Thay bang dia chi Kafka thuc te
'security.protocol': 'SASL_PLAINTEXT',
'sasl.mechanism': 'SCRAM-SHA-256',
'sasl.username': 'your-username',
'sasl.password': 'your-password',
}
producer = Producer(conf)
def delivery_report(err, msg):
if err is not None:
print(f"Loi gui: {err}")
else:
print(f"Gui thanh cong den {msg.topic()} [{msg.partition()}]")
# Gui mot tin nhan don gian
producer.produce('your-topic', key='key1', value='Hello Kafka', callback=delivery_report)
producer.flush()
5. Consumer – Đọc dữ liệu từ Kafka
from confluent_kafka import Consumer
conf = {
'bootstrap.servers': 'IP-Kafka', # Thay bang dia chi Kafka thuc te
'security.protocol': 'SASL_PLAINTEXT',
'sasl.mechanism': 'SCRAM-SHA-256',
'sasl.username': 'your-username',
'sasl.password': 'your-password',
'group.id': 'demo-group',
'auto.offset.reset': 'earliest'
}
consumer = Consumer(conf)
consumer.subscribe([TOPIC_NAME])
print("🚦 Listening for messages...\nPress Ctrl+C to exit.\n")
try:
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
print(f"❌ Consumer error: {msg.error()}")
else:
print(f"📨 Received message: {msg.value().decode('utf-8')} from partition {msg.partition()}")
except KeyboardInterrupt:
print("🛑 Stopped by user.")
finally:
consumer.close()
6. Kiểm tra kết quả
-
Chạy script Producer để gửi dữ liệu.
-
Sau đó chạy Consumer script để xác nhận dữ liệu nhận được từ Kafka topic.
-
Truy cập giao diện Bizfly Cloud Kafka -> chọn Cluster vừa thao tác -> Consumer Group để xác minh trạng thái group (nên hiển thị “Stable”).
7. Ghi chú
-
group.id trong Consumer nên là duy nhất cho mỗi nhóm ứng dụng.
-
Trong hướng dẫn sử dụng thư viện confluent_kafka để kết nối, ngoài ra có thể sử dụng các thứ viện khác dành cho ngôn ngữ python như kafka-python