-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathpollServer.cpp
More file actions
363 lines (317 loc) · 10.8 KB
/
pollServer.cpp
File metadata and controls
363 lines (317 loc) · 10.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
#include "pollServer.h"
//打印PID
void printTid()
{
printf("pid = %d, tid = %d\n", getpid(), CurrentThread::tid());
printf("now %s\n", Timestamp::now().toString().c_str());
LOG(INFO) << "PID :"<< getpid();
}
void print(const char* msg)
{
printf("msg %s %s\n", Timestamp::now().toString().c_str(), msg);
}
//添加轮询任务
void addPollEvent(int type, int intervalTime)
{
TimerParam *timerparam = new TimerParam;
timerparam->type = (POLLTYPE)type;
timerparam->state = POLLSTATE::START;
timerparam->intervalTime = intervalTime;
timerparam->paramstr = "";
//开始轮询事件
TimerId fd = g_loop->runEvery(intervalTime,std::bind(handleEvent, timerparam));
timerparam->fd = fd;
//定时器对象入队列
timer_confList->pushList((void*)timerparam);
}
//处理线程池事件
void handleData(POLLTYPE type , shared_ptr<void> vptr)
{
switch(type) {
case POLLTYPE::BROADCAST:
{
printf("直播状态正在轮询中... \n");
shared_ptr <PollLiveState> livestate = static_pointer_cast<PollLiveState>(vptr);
livestate->pollState();
break;
}
case POLLTYPE::COURSE:
{
break;
}
case POLLTYPE::BROADCAST_ROOM:
{
printf("直播间状态正在轮询中... \n");
shared_ptr <PollBroadcastRoom> broadcastState = static_pointer_cast<PollBroadcastRoom>(vptr);
broadcastState->pollBroadcast();
break;
}
default:
break;
}
}
//处理轮询事件
void handleEvent(TimerParam *param)
{
switch(param->type) {
case POLLTYPE::BROADCAST: //轮询直播状态
{
printf("增加直播状态轮询任务 \n");
shared_ptr<void> vptr = shared_ptr<PollLiveState>(new PollLiveState);
//任务放入线程池
pool.run(std::bind(handleData, param->type, vptr));
break;
}
case POLLTYPE::COURSE: //轮询课程状态
{
printf("增加课程状态正在轮询任务 \n");
break;
}
case POLLTYPE::BROADCAST_ROOM: //轮询直播间状态
{
printf("增加直播间状态正在轮询任务 \n");
shared_ptr<void> vptr = shared_ptr<PollBroadcastRoom>(new PollBroadcastRoom);
//任务放入线程池
pool.run(std::bind(handleData, param->type, vptr));
break;
}
default:
break;
}
}
//处理web接口参数
void *manage_fun(void *data)
{
while(manageFlag)
{
void *parmdata = g_commList->popLockList();
if(NULL != parmdata)
{
PollParam *param = (PollParam*)parmdata;
printf("参数: %d %d %s\n",(int)param->type , (int)param->state , param->paramstr.c_str());
LOG(INFO) << "web管理线程获取参数:"<<(int)param->type<<" "<<(int)param->state<<" "<<param->paramstr;
//添加轮询任务事件
if(POLLSTATE::START == param->state)
{
void *timerParam = timer_commList->findList((int)param->type);
if(NULL == timerParam)
{
TimerParam *timerparam = new TimerParam;
timerparam->type = param->type;
timerparam->state = param->state;
timerparam->paramstr = param->paramstr;
TimerId fd = g_loop->runEvery(10,std::bind(handleEvent, timerparam));
timerparam->fd = fd;
//定时器事件对象入队列
timer_commList->pushList((void*)timerparam);
LOG(INFO) << "增加轮询事件成功 type:"<<(int)param->type;
}else
{
printf("该事件已在轮询中\n");
LOG(ERROR) << "该事件已在轮询中 type:"<<(int)param->type;
}
//取消轮询任务事件
}else if(POLLSTATE::STOP == param->state)
{
void *timerParam = timer_commList->findList((int)param->type);
if( NULL != timerParam)
{
TimerParam *timerData = (TimerParam *)timerParam;
TimerId fd = timerData->fd;
g_loop->cancel(fd);
//定时器事件对象出队列
timer_commList->popList(timerParam);
LOG(INFO) << "取消轮询事件成功 type:"<<(int)param->type;
}else
{
printf("未找到该定时器任务事件\n");
LOG(ERROR) << "取消轮询事件失败,未找到该轮询事件 type:"<<(int)param->type;
}
}else
{
printf("收到非法指令!\n");
LOG(ERROR) << "未知的命令 type:"<<(int)param->type;
}
delete param;
param = NULL;
}
}
return data;
}
//创建日志文件夹
int CreateLogFileDir(const char *sPathName)
{
char DirName[256];
strcpy(DirName, sPathName);
int i,len = strlen(DirName);
for(i=1; i<len; i++)
{
if(DirName[i]=='/')
{
DirName[i] = 0;
if(access(DirName, F_OK)!=0)
{
if(mkdir(DirName, 0755)==-1)
{
printf("mkdir log error\n");
LOG(ERROR) << "创建日志文件夹失败";
return -1;
}
}
DirName[i] = '/';
}
}
return 0;
}
//停止运行服务
void StopServer(int sig)
{
//web服务停止
int resCode = stopWebServer();
if(0 != resCode)
{
LOG(ERROR) << "退出web服务异常 "<<" "<<"resCode:"<<resCode;
}
LOG(INFO) << "已退出web服务";
//web参数管理线程
manageFlag = false;
resCode = pthread_join(manage_t, NULL);
if(0 != resCode)
{
printf("web参数管理线程 ret:%d" , resCode);
LOG(ERROR) << "web参数管理线程 "<<" "<<"resCode:"<<resCode;
}
LOG(INFO) << "已退出web参数管理线程";
//回收线程池
pool.stop();
//定时器服务停止
g_loop->quit();
if(!g_loop)
{
delete g_loop;
g_loop = NULL;
}
LOG(INFO) << "已退出定时器服务停止";
//删除队列任务
if(!g_commList)
{
delete g_commList;
g_commList = NULL;
}
if(!timer_commList)
{
delete timer_commList;
timer_commList = NULL;
}
if(!timer_confList)
{
delete timer_confList;
timer_confList = NULL;
}
}
int main()
{
int resCode = 0;
printTid();
//从配置文件读取http接口参数
CConfigFileReader config_file("poll.conf");
ServerIP = config_file.GetConfigName("ServerIP");
ServerAllID = config_file.GetConfigName("ServerAllID");
SelectState = config_file.GetConfigName("SelectState");
UpdataState = config_file.GetConfigName("UpdataState");
PullUrl = config_file.GetConfigName("PullUrl");
string StateTime = config_file.GetConfigName("StatePollTime");
string BroadcastTime = config_file.GetConfigName("BroadcastPollTime");
StatePollTime = std::stoi(StateTime);
BroadcastPollTime = std::stoi(BroadcastTime);
//从配置文件中获取事件数组
int event_count = 0;
event_info_t* event_list = read_event_config(&config_file, "EventType", "EeventTime", event_count);
printf("configInfo: %s %s %s %s %s %d %d\n", ServerIP.c_str(), ServerAllID.c_str(),SelectState.c_str(),
UpdataState.c_str(),PullUrl.c_str(),StatePollTime,BroadcastPollTime);
//停止服务信号
signal(SIGINT, StopServer);
signal(SIGHUP,StopServer);
//创建日志文件夹
CreateLogFileDir(logstr.c_str());
//创建log初始化
google::InitGoogleLogging("");
string logfile = logstr + "poll";
google::SetLogDestination(google::GLOG_INFO,logfile.c_str());
FLAGS_logbufsecs = 0; //缓冲日志输出,默认为30秒,此处改为立即输出
FLAGS_max_log_size = 500; //最大日志大小为 100MB
FLAGS_stop_logging_if_full_disk = true; //当磁盘被写满时,停止日志输出*/
//web接口参数队列
g_commList = new CommonList(true);
if(NULL == g_commList)
{
resCode = 1;
printf("resCode: %d errInfo:%s", resCode, "web参数队列初始化失败!");
LOG(ERROR) << "web参数队列初始化失败 "<<" "<<"resCode:"<<resCode;
return resCode;
}
//定时器事件队列
timer_commList = new CommonList(false);
if(NULL == timer_commList)
{
resCode = 2;
printf("resCode: %d errInfo:%s", resCode, "web接口定时器事件队列初始化失败!");
LOG(ERROR) << "定时器事件队列初始化失败 "<<" "<<"resCode:"<<resCode;
return resCode;
}
//本地可配置事件队列
timer_confList = new CommonList(false);
if(NULL == timer_confList)
{
resCode = 2;
printf("resCode: %d errInfo:%s", resCode, "本地可配置事件队列初始化失败!");
LOG(ERROR) << "本地可配置队列初始化失败 "<<" "<<"resCode:"<<resCode;
return resCode;
}
//启动线程池
pool.setMaxQueueSize(1000);
pool.start(10);
//web参数管理线程
resCode = pthread_create(&manage_t, NULL, manage_fun, NULL);
if(0 != resCode)
{
printf("resCode: %d errInfo:%s", resCode, "管理线程创建失败");
LOG(ERROR) << "管理线程创建失败"<<" "<<"resCode:"<<resCode;
return resCode;
}
LOG(INFO) << "已启动web参数管理线程";
//启动web服务
resCode = startWebServer(g_commList);
if(0 != resCode)
{
printf("resCode: %d errInfo:%s", resCode, "启动web服务失败!");
LOG(ERROR) << "启动web服务失败"<<" "<<"resCode:"<<resCode;
return resCode;
}
LOG(INFO) << "已启动web服务";
//定时器服务
g_loop =new EventLoop();
if(NULL == g_loop)
{
resCode = 3;
LOG(ERROR) << "初始化定时器失败"<<" "<<"resCode:"<<resCode;
printf("resCode: %d errInfo:%s", resCode, "初始化定时器失败");
return resCode;
}
LOG(INFO) << "已启动定时器服务";
g_loop->runEvery(1, std::bind(print, "once1"));
//将配置文件中的事件加入到事件循环中
if(event_count > 0)
{
for(int i = 0; i < event_count;i++)
{
int type = event_list[i].eventType;
int time = event_list[i].intervalTime;
addPollEvent(type,time);
}
}
LOG(INFO) << "已启动配置文件中的所有定时器事件";
//定时器服务开始轮询
g_loop->loop();
return resCode;
}