Skip to content

Commit e9ec16f

Browse files
starsz and SylviaBABY authored
feat: add brokers field to support set broker with the same host in kafka-logger plugin (#7999)
Co-authored-by: Sylvia <39793568+SylviaBABY@users.noreply.github.com>
1 parent 61afafd commit e9ec16f

File tree

5 files changed

+209
-27
lines changed

5 files changed

+209
-27
lines changed

apisix/plugins/kafka-logger.lua

+35-8
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ local schema = {
3939
default = "default",
4040
enum = {"default", "origin"},
4141
},
42+
-- deprecated, use "brokers" instead
4243
broker_list = {
4344
type = "object",
4445
minProperties = 1,
@@ -51,6 +52,27 @@ local schema = {
5152
},
5253
},
5354
},
55+
brokers = {
56+
type = "array",
57+
minItems = 1,
58+
items = {
59+
type = "object",
60+
properties = {
61+
host = {
62+
type = "string",
63+
description = "the host of kafka broker",
64+
},
65+
port = {
66+
type = "integer",
67+
minimum = 1,
68+
maximum = 65535,
69+
description = "the port of kafka broker",
70+
},
71+
},
72+
required = {"host", "port"},
73+
},
74+
uniqueItems = true,
75+
},
5476
kafka_topic = {type = "string"},
5577
producer_type = {
5678
type = "string",
@@ -89,7 +111,10 @@ local schema = {
89111
producer_max_buffering = {type = "integer", minimum = 1, default = 50000},
90112
producer_time_linger = {type = "integer", minimum = 1, default = 1}
91113
},
92-
required = {"broker_list", "kafka_topic"}
114+
oneOf = {
115+
{ required = {"broker_list", "kafka_topic"},},
116+
{ required = {"brokers", "kafka_topic"},},
117+
}
93118
}
94119

95120
local metadata_schema = {
@@ -199,15 +224,17 @@ function _M.log(conf, ctx)
199224
end
200225

201226
-- reuse producer via lrucache to avoid unbalanced partitions of messages in kafka
202-
local broker_list = core.table.new(core.table.nkeys(conf.broker_list), 0)
227+
local broker_list = core.table.clone(conf.brokers or {})
203228
local broker_config = {}
204229

205-
for host, port in pairs(conf.broker_list) do
206-
local broker = {
207-
host = host,
208-
port = port
209-
}
210-
core.table.insert(broker_list, broker)
230+
if conf.broker_list then
231+
for host, port in pairs(conf.broker_list) do
232+
local broker = {
233+
host = host,
234+
port = port
235+
}
236+
core.table.insert(broker_list, broker)
237+
end
211238
end
212239

213240
broker_config["request_timeout"] = conf.timeout * 1000

docs/en/latest/plugins/kafka-logger.md

+19-9
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,10 @@ It might take some time to receive the log data. It will be automatically sent a
3737

3838
| Name | Type | Required | Default | Valid values | Description |
3939
| ---------------------- | ------- | -------- | -------------- | --------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
40-
| broker_list | object | True | | | List of Kafka brokers (nodes). |
40+
| broker_list | object | True | | | Deprecated, use `brokers` instead. List of Kafka brokers (nodes). |
41+
| brokers | array | True | | | List of Kafka brokers (nodes). |
42+
| brokers.host | string | True | | | The host of Kafka broker, e.g., `192.168.1.1`. |
43+
| brokers.port | integer | True | | [1, 65535] | The port of Kafka broker. |
4144
| kafka_topic | string | True | | | Target topic to push the logs for organisation. |
4245
| producer_type | string | False | async | ["async", "sync"] | Message sending mode of the producer. |
4346
| required_acks | integer | False | 1 | [0, 1, -1] | Number of acknowledgements the leader needs to receive for the producer to consider the request complete. This controls the durability of the sent records. The attribute follows the same configuration as the Kafka `acks` attribute. See [Apache Kafka documentation](https://kafka.apache.org/documentation/#producerconfigs_acks) for more. |
@@ -164,10 +167,12 @@ curl http://127.0.0.1:9180/apisix/admin/routes/5 -H 'X-API-KEY: edd1c9f034335f13
164167
{
165168
"plugins": {
166169
"kafka-logger": {
167-
"broker_list" :
170+
"brokers" : [
168171
{
169-
"127.0.0.1":9092
170-
},
172+
"host" :"127.0.0.1",
173+
"port" : 9092
174+
}
175+
],
171176
"kafka_topic" : "test2",
172177
"key" : "key1",
173178
"batch_max_size": 1,
@@ -187,11 +192,16 @@ curl http://127.0.0.1:9180/apisix/admin/routes/5 -H 'X-API-KEY: edd1c9f034335f13
187192
This Plugin also supports pushing to more than one broker at a time. You can specify multiple brokers in the Plugin configuration as shown below:
188193

189194
```json
190-
"broker_list" :
191-
{
192-
"127.0.0.1":9092,
193-
"127.0.0.1":9093
194-
},
195+
"brokers" : [
196+
{
197+
"host" :"127.0.0.1",
198+
"port" : 9092
199+
},
200+
{
201+
"host" :"127.0.0.1",
202+
"port" : 9093
203+
}
204+
],
195205
```
196206

197207
## Example usage

docs/zh/latest/plugins/kafka-logger.md

+20-9
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,10 @@ description: API 网关 Apache APISIX 的 kafka-logger 插件用于将日志作
3535

3636
| 名称 | 类型 | 必选项 | 默认值 | 有效值 | 描述 |
3737
| ---------------------- | ------- | ------ | -------------- | --------------------- | ------------------------------------------------ |
38-
| broker_list | object || | | 需要推送的 Kafka 的 broker 列表。 |
38+
| broker_list | object || | | 已废弃,现使用 `brokers` 属性代替。原指需要推送的 Kafka 的 broker 列表。 |
39+
| brokers | array || | | 需要推送的 Kafka 的 broker 列表。 |
40+
| brokers.host | string || | | Kafka broker 的节点 host 配置,例如 `192.168.1.1` |
41+
| brokers.port | integer || | | Kafka broker 的节点端口配置。 |
3942
| kafka_topic | string || | | 需要推送的 topic。 |
4043
| producer_type | string || async | ["async", "sync"] | 生产者发送消息的模式。 |
4144
| required_acks | integer || 1 | [0, 1, -1] | 生产者在确认一个请求发送完成之前需要收到的反馈信息的数量。该参数是为了保证发送请求的可靠性。该属性的配置与 Kafka `acks` 属性相同,具体配置请参考 [Apache Kafka 文档](https://kafka.apache.org/documentation/#producerconfigs_acks)|
@@ -162,10 +165,12 @@ curl http://127.0.0.1:9180/apisix/admin/routes/1 \
162165
{
163166
"plugins": {
164167
"kafka-logger": {
165-
"broker_list" :
166-
{
167-
"127.0.0.1":9092
168-
},
168+
"brokers" : [
169+
{
170+
"host": "127.0.0.1",
171+
"port": 9092
172+
}
173+
],
169174
"kafka_topic" : "test2",
170175
"key" : "key1"
171176
}
@@ -183,10 +188,16 @@ curl http://127.0.0.1:9180/apisix/admin/routes/1 \
183188
该插件还支持一次推送到多个 Broker,示例如下:
184189

185190
```json
186-
{
187-
"127.0.0.1":9092,
188-
"127.0.0.1":9093
189-
}
191+
"brokers" : [
192+
{
193+
"host" :"127.0.0.1",
194+
"port" : 9092
195+
},
196+
{
197+
"host" :"127.0.0.1",
198+
"port" : 9093
199+
}
200+
],
190201
```
191202

192203
## 测试插件

t/plugin/kafka-logger.t

+1-1
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ done
7272
}
7373
}
7474
--- response_body
75-
property "broker_list" is required
75+
value should match only one schema, but matches none
7676
done
7777
7878

t/plugin/kafka-logger2.t

+134
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,72 @@ qr/not found topic, retryable: true, topic: undefined_topic, partition_id: -1/
340340
key= "key1",
341341
},
342342
},
343+
{
344+
input = {
345+
brokers = {
346+
},
347+
kafka_topic = "test",
348+
key = "key1",
349+
},
350+
},
351+
{
352+
input = {
353+
brokers = {
354+
{
355+
host = "127.0.0.1",
356+
}
357+
},
358+
kafka_topic = "test",
359+
key = "key1",
360+
},
361+
},
362+
{
363+
input = {
364+
brokers = {
365+
{
366+
port = 9092,
367+
}
368+
},
369+
kafka_topic = "test",
370+
key = "key1",
371+
},
372+
},
373+
{
374+
input = {
375+
brokers = {
376+
{
377+
host = "127.0.0.1",
378+
port = "9093",
379+
},
380+
},
381+
kafka_topic = "test",
382+
key = "key1",
383+
},
384+
},
385+
{
386+
input = {
387+
brokers = {
388+
{
389+
host = "127.0.0.1",
390+
port = 0,
391+
},
392+
},
393+
kafka_topic = "test",
394+
key = "key1",
395+
},
396+
},
397+
{
398+
input = {
399+
brokers = {
400+
{
401+
host = "127.0.0.1",
402+
port = 65536,
403+
},
404+
},
405+
kafka_topic = "test",
406+
key = "key1",
407+
},
408+
},
343409
}
344410
345411
local plugin = require("apisix.plugins.kafka-logger")
@@ -361,6 +427,12 @@ property "broker_list" validation failed: expect object to have at least 1 prope
361427
property "broker_list" validation failed: failed to validate 127.0.0.1 (matching ".*"): wrong type: expected integer, got string
362428
property "broker_list" validation failed: failed to validate 127.0.0.1 (matching ".*"): expected 0 to be at least 1
363429
property "broker_list" validation failed: failed to validate 127.0.0.1 (matching ".*"): expected 65536 to be at most 65535
430+
property "brokers" validation failed: expect array to have at least 1 items
431+
property "brokers" validation failed: failed to validate item 1: property "port" is required
432+
property "brokers" validation failed: failed to validate item 1: property "host" is required
433+
property "brokers" validation failed: failed to validate item 1: property "port" validation failed: wrong type: expected integer, got string
434+
property "brokers" validation failed: failed to validate item 1: property "port" validation failed: expected 0 to be at least 1
435+
property "brokers" validation failed: failed to validate item 1: property "port" validation failed: expected 65536 to be at most 65535
364436
365437
366438
@@ -715,3 +787,65 @@ hello world
715787
[qr/send data to kafka: \{.*"body":"abcdef"/,
716788
qr/send data to kafka: \{.*"body":"hello world\\n"/]
717789
--- wait: 2
790+
791+
792+
793+
=== TEST 20: update route(id: 1,include_req_body = true,include_req_body_expr = array)
794+
--- config
795+
location /t {
796+
content_by_lua_block {
797+
local t = require("lib.test_admin").test
798+
local code, body = t('/apisix/admin/routes/1',
799+
ngx.HTTP_PUT,
800+
[=[{
801+
"plugins": {
802+
"kafka-logger": {
803+
"brokers" :
804+
[{
805+
"host":"127.0.0.1",
806+
"port": 9092
807+
}],
808+
"kafka_topic" : "test2",
809+
"key" : "key1",
810+
"timeout" : 1,
811+
"include_req_body": true,
812+
"include_req_body_expr": [
813+
[
814+
"arg_name",
815+
"==",
816+
"qwerty"
817+
]
818+
],
819+
"batch_max_size": 1
820+
}
821+
},
822+
"upstream": {
823+
"nodes": {
824+
"127.0.0.1:1980": 1
825+
},
826+
"type": "roundrobin"
827+
},
828+
"uri": "/hello"
829+
}]=]
830+
)
831+
if code >= 300 then
832+
ngx.status = code
833+
end
834+
ngx.say(body)
835+
}
836+
}
837+
838+
--- response_body
839+
passed
840+
841+
842+
843+
=== TEST 21: hit route, expr eval success
844+
--- request
845+
POST /hello?name=qwerty
846+
abcdef
847+
--- response_body
848+
hello world
849+
--- error_log eval
850+
qr/send data to kafka: \{.*"body":"abcdef"/
851+
--- wait: 2

0 commit comments

Comments
 (0)