🛠 Как использовать кафку в GoПредставьте, что у вас в руках инструмент, который позволяет строить крутой обмен сообщениями между сервисами. Представили? Давайте попробуем его применить.1. УстановкаДля начала проверьте, что у вас установлен Go версии 1.15 и выше и введите команду:go get github.com/segmentio/kafka-goЭто подтянет для вас либу kafka-go после чего её можно будет использовать в проекте:import "github.com/segmentio/kafka-go"2. Используем низкоуровневый коннектКогда нужно отправить сообщение:ctx := context.Background()conn, err := kafka.DialLeader(ctx, "tcp", "localhost:9092", "my-topic", 0)if err != nil { log.Fatal(err)}defer conn.Close()// Отправляем два сообщения подрядconn.SetWriteDeadline(time.Now().Add(10 * time.Second))_, err = conn.WriteMessages( kafka.Message{Value: []byte("Первое сообщение")}, kafka.Message{Value: []byte("Второе сообщение")},)if err != nil { log.Fatal(err)}Когда сообщение нужно принять:batch := conn.ReadBatch(10e3, 1e6)defer batch.Close()buf := make([]byte, 10e3)for { n, err := batch.Read(buf) if err != nil { break } fmt.Println("→", string(buf[:n]))}3. Используем высокоуровневый ReaderReader сам следит за смещениями, повторными подключениями и балансировкой в группе:r := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{"localhost:9092", "localhost:9093"}, GroupID: "example-group", Topic: "events", MinBytes: 10e3, MaxBytes: 10e6, CommitInterval: time.Second,})defer r.Close()for { m, err := r.ReadMessage(context.Background()) if err != nil { log.Println("Завершение чтения:", err) break } fmt.Printf("Получено: ключ=%s, значение=%s, смещение=%d\n", m.Key, m.Value, m.Offset)}Если нужно ручное управление, замените ReadMessage на FetchMessage + CommitMessages.4. Просто пишем с высокоуровневым WriterWriter сам позаботится о повторных попытках, балансировке и «гладком» завершении:w := kafka.NewWriter(kafka.WriterConfig{ Brokers: []string{"localhost:9092", "localhost:9093"}, Topic: "events", Balancer: &kafka.LeastBytes{},})defer w.Close()messages := []kafka.Message{ {Key: []byte("order123"), Value: []byte(`{"status":"created"}`)}, {Key: []byte("order124"), Value: []byte(`{"status":"paid"}`)},}if err := w.WriteMessages(context.Background(), messages...); err != nil { log.Fatal("Ошибка при записи:", err)}Бонус: советы по использованию— всегда context.WithTimeout или WithCancel — это ваша страховка от «зависаний».— ловите SIGINT/SIGTERM и аккуратно закрывайте Reader и Writer.— подключайте Prometheus/OpenTelemetry — мониторьте throughput, задержки и ошибки.Запустите пару консьюмеров и продюсеров — и почувствуйте, как работают Kafka и Go.Библиотека Go-разработчика #буст