Demo Kết nối Kafka Bizfly Cloud với Producer và Consumer bằng Python

1. Yêu cầu hệ thống

- Python >= 3.7

- Đã khởi tạo Kafka Cluster trên Bizfly Cloud

- Có thông tin kết nối: Bootstrap Servers, Username, Password, Topic name

- Cài đặt thư viện: confluent-kafka

2. Cài đặt thư viện

pip install confluent-kafka

3. Cấu hình kết nối Kafka (SASL_PLAINTEXT)

KAFKA\_CONFIG \= {  
    'bootstrap.servers': 'your-kafka-bootstrap.bizflycloud.vn:9093',  
    'security.protocol': ' SASL\_PLAINTEXT',  
    'sasl.mechanism': 'SCRAM-SHA-256,  
    'sasl.username': 'your-username',  
    'sasl.password': 'your-password'  
    }  
TOPIC\_NAME \= 'your-topic-name'

Lưu ý: Bạn có thể lấy các thông số kết nối này trong mục Credential của Kafka Cluster trên Bizfly Cloud.

Tên Acl phải trùng với username thì mới có hiệu lực với user.

4. Producer – Gửi dữ liệu vào Kafka

from confluent\_kafka import Producer

conf \= {

    'bootstrap.servers': 'IP-Kafka',  \# Thay bang dia chi Kafka thuc te
    'security.protocol': 'SASL\_PLAINTEXT',
    'sasl.mechanism': 'SCRAM-SHA-256',
    'sasl.username': 'your-username',
    'sasl.password': 'your-password',
    }

producer \= Producer(conf)

def delivery\_report(err, msg):
    if err is not None:
        print(f"Loi gui: {err}")
    else:
        print(f"Gui thanh cong den {msg.topic()} \[{msg.partition()}\]")

\# Gui mot tin nhan don gian
producer.produce('your-topic', key='key1', value='Hello Kafka', callback=delivery\_report)
producer.flush()

5. Consumer – Đọc dữ liệu từ Kafka

from confluent\_kafka import Consumer

conf \= KAFKA\_CONFIG.copy()  
conf.update({  
    'group.id': 'demo-group',  
    'auto.offset.reset': 'earliest'  
})

consumer \= Consumer(conf)  
consumer.subscribe(\[TOPIC\_NAME\])

print("🚦 Listening for messages...\\nPress Ctrl+C to exit.\\n")  
try:  
    while True:  
        msg \= consumer.poll(1.0)  
        if msg is None:  
            continue  
        if msg.error():  
            print(f"❌ Consumer error: {msg.error()}")  
        else:  
            print(f"📨 Received message: {msg.value().decode('utf-8')} from partition {msg.partition()}")

except KeyboardInterrupt:  
    print("🛑 Stopped by user.")

finally:  
    consumer.close()

6. Kiểm tra kết quả

- Mở Producer script để gửi dữ liệu.

- Sau đó chạy Consumer script để xác nhận dữ liệu nhận được từ Kafka topic.

- Truy cập giao diện Bizfly Cloud > Kafka > Consumer Group để xác minh trạng thái group (nên hiển thị “Stable”).

7. Ghi chú

- group.id trong Consumer nên là duy nhất cho mỗi nhóm ứng dụng.

- Với Bizfly Cloud, cần sử dụng SASL_PLAINTEXT và SCRAM-SHA-256 để xác thực bảo mật.