go-queue 之 kq 的生产者不支持配置SASL认证信息
go-queue
的生产者写入时,没有支持配置SASL认证,官方的示例:
type ServiceContext struct {Config config.Config.....KqPusherClient *kq.Pusher
}func NewServiceContext(c config.Config) *ServiceContext {return &ServiceContext{Config: c,.....KqPusherClient: kq.NewPusher(c.KqPusherConf.Brokers, c.KqPusherConf.Topic),}
}
NewPusher
的实现:
func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher {producer := &kafka.Writer{Addr: kafka.TCP(addrs...),Topic: topic,Balancer: &kafka.LeastBytes{},Compression: kafka.Snappy,}// ...
}
go-queue
在 segmentio/kafka-go
这个包基础上,使用 go-zero
进行了上层统一封装,看了下segmentio/kafka-go
包的使用说明,在创建&kafka.Writer
对象的时候,支持通过Transport
来配置SASL认证信息:
mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
if err != nil {panic(err)
}// Transports are responsible for managing connection pools and other resources,
// it's generally best to create a few of these and share them across your
// application.
sharedTransport := &kafka.Transport{SASL: mechanism,
}w := kafka.Writer{Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),Topic: "topic-A",Balancer: &kafka.Hash{},Transport: sharedTransport,
}
所以对go-queue
的NewPusher
源码做了部分更改:
func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher {producer := &kafka.Writer{Addr: kafka.TCP(addrs...),Topic: topic,Balancer: &kafka.LeastBytes{},Compression: kafka.Snappy,Transport: &kafka.Transport{ // 添加的部分SASL: plain.Mechanism{Username: "username", // SASL 用户名Password: "password", // SASL 密码},},}// ...
}