This repository was archived by the owner on Jan 17, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathagent.lua
More file actions
94 lines (87 loc) · 3.4 KB
/
agent.lua
File metadata and controls
94 lines (87 loc) · 3.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
-- Project modules. `libs` is not referenced again in the visible code;
-- NOTE(review): presumably required for its load-time side effects — confirm.
local libs = require 'libs'
local util = require 'util.util'
local fileutil = require 'util.fileutil'
local tableutil = require 'util.tableutil'
local kafkaClient = require 'rdkafka.kafkaclient'
-- Configuration tables, each obtained once at load time.
local customLogWatchConfig = (require 'conf.logwatchconfig').getconfig()
local tunningConfig = (require 'conf.tunningconfig' ).getconfig()
-- init() is called on conf.parserconfig before getconfig() on the same module.
-- Its return value (parserConfig) is not used in the visible code.
-- NOTE(review): presumably init() must run before getconfig() — confirm before reordering.
local parserConfig = (require 'conf.parserconfig' ).init()
local customParserConfig = (require 'conf.parserconfig' ).getconfig()
local customKafkaConfig = (require 'conf.kafkaconfig' ).getconfig()
-- Watcher implementations; createTask picks one per task from the
-- task's multiline / javaLog / jsonLog flags.
local singlelineW = require 'watchlog.watchlogfilesingleline'
local multilineW = require 'watchlog.watchlogfilemultiline'
local javalogW = require 'watchlog.watchlogfilejavalog'
local jsonlogW = require 'watchlog.watchlogfilesinglelinejson'
-- container: file path (dirpath .. filename) -> worker coroutine.
-- metrics:   same key -> running sum of the values that worker has yielded.
-- msgCount:  sum of the values yielded during the current scheduler pass.
local container , metrics , msgCount = {} , {} , 0
-- lastCheckTaskTime: time of the last periodic checkTask run; now: refreshed on every pass.
local lastCheckTaskTime , now = os.time() , os.time()
-- The metrics table is handed to the Kafka client by reference; what the
-- client does with it is not visible in this file.
kafkaClient.initKafkaClient(customKafkaConfig , metrics)
--- Build the worker coroutine that reads one log file and forwards it to Kafka.
--
-- The coroutine yields, on every resume, the number returned by the watcher's
-- readFile call. A negative number ends the loop: the watcher is closed and the
-- negative value is yielded once more so the scheduler can drop the task.
--
-- If readFile raises, the watcher is closed before the error is re-raised, so a
-- failing task does not leave its watcher open.
--
-- @param first value forwarded to the watcher constructor (the scheduler passes
--              true for the startup scan and false for later rescans)
-- @param task  task definition; reads dirpath, filename, topic, multiline,
--              javaLog and jsonLog
-- @return coroutine (not yet started)
local function createTask(first, task)
    print("creat task " .. task.dirpath .. task.filename)
    return coroutine.create(function()
        local topic = kafkaClient.getTopicInst(task.topic)

        -- Pick the watcher implementation from the task flags.
        local watchlogFac
        if task.multiline then
            watchlogFac = (task.javaLog and javalogW or multilineW)
        else
            watchlogFac = (task.jsonLog and jsonlogW or singlelineW)
        end

        local wtcLogFile = watchlogFac:new(task, customParserConfig, tunningConfig, first)
        local c
        while true do
            -- Only the read is protected; the yield stays outside pcall so this
            -- also works on Lua 5.1, where pcall is not yieldable.
            -- NOTE(review): assumes readFile itself never yields — confirm.
            local ok, res = pcall(wtcLogFile.readFile, wtcLogFile, kafkaClient, topic)
            if not ok then
                -- Best-effort cleanup; a failure in close must not mask the
                -- original error.
                pcall(wtcLogFile.close, wtcLogFile)
                -- Level 0 keeps the original error value/message unchanged.
                error(res, 0)
            end
            c = res
            if c < 0 then break end
            coroutine.yield(c)
        end
        wtcLogFile:close()
        coroutine.yield(c)
    end)
end
--- Walk the watch configuration and start a worker for every log file that
-- is not tracked in `container` yet.
--
-- A task either names a single file (`filename`, started only once the file
-- exists) or a filter (`filenameFilter`, one worker per file returned by
-- fileutil.filter, each with its own cloned task definition).
--
-- @param first forwarded unchanged to createTask
-- Raises an error when a task lacks `dirpath`, or has neither `filename`
-- nor `filenameFilter`.
local function checkTask(first)
    -- Register a new worker under `key` and start its counter at zero.
    local function track(key, spec)
        container[key], metrics[key] = createTask(first, spec), 0
    end

    for _, task in ipairs(customLogWatchConfig) do
        local dir = task.dirpath
        if dir == nil then error("dirpath define missing") end

        local single, pattern = task.filename, task.filenameFilter
        if not (single or pattern) then
            error("filename define missing")
        end

        if single then
            local key = dir .. single
            if container[key] == nil and fileutil.isExist(key) then
                track(key, task)
            end
        else
            for _, found in ipairs(fileutil.filter(dir, pattern)) do
                local key = dir .. found
                if container[key] == nil then
                    -- Each matched file gets its own copy of the task so the
                    -- shared definition is not modified.
                    local spec = tableutil.clone(task)
                    spec.filename = found
                    track(key, spec)
                end
            end
        end
    end
end
--------------------------------------- starting ---------------------------------------
-- Startup scan: create workers for every configured file that is present.
checkTask(true)

-- Scheduler: resume every worker once per pass, forever.
while true do
    now = os.time()
    -- Periodically rescan the configuration for files that have appeared.
    if now - lastCheckTaskTime >= tunningConfig.CHECK_FILE_EXIST_INTERVAL then
        checkTask(false)
        lastCheckTaskTime = now
    end

    -- Clearing existing keys while traversing with pairs is permitted;
    -- new keys are only added by checkTask, outside this traversal.
    for name, worker in pairs(container) do
        local status, result = coroutine.resume(worker)
        if not status then
            print(result)
        end
        -- A worker is finished when it raised, when its coroutine has ended,
        -- or when it reported a negative result. A failed coroutine is dead
        -- and can never be resumed again, so it must be dropped too;
        -- otherwise every pass would only print "cannot resume dead
        -- coroutine" and the file would never be watched again. Dropping it
        -- lets the next checkTask run create a fresh worker.
        if (not status) or coroutine.status(worker) == "dead" or result < 0 then
            container[name], metrics[name] = nil, nil
            print("delete task for " .. name)
        else
            msgCount = msgCount + result
            metrics[name] = metrics[name] + result
        end
    end

    -- Nothing was reported in this pass: back off before polling again.
    if msgCount == 0 then util.sleep(tunningConfig.NO_DATA_INTERVAL) end
    msgCount = 0
end