linux多线程同步

linux多线程


进程特性

Linux进程创建一个新线程时,线程将拥有自己的栈(因为线程有自己的局部变量),但与它的创建者共享全局变量、文件描述符、信号句柄和当前目录状态。
Linux通过fork创建子进程与创建线程之间是有区别的:fork创建出该进程的一份拷贝,这个新进程拥有自己的变量和自己的PID,它的时间调度是独立的,它的执行几乎完全独立于父进程。
进程可以看成一个资源的基本单位,而线程是程序调度的基本单位,一个进程内部的线程之间共享进程获得的时间片。
介绍线程的基本函数,主要了解文章中的_REENTRANT宏,以及线程中的基本函数到线程属性
另一篇文章

多线程面试题集锦

多线程面试题

生产者-消费者问题

生产者消费者
模型:
生产者—->有限的缓冲区—->消费者
问题描述:生产者产生项目并把他们插入到一个有限的缓冲区中。需要消费者从缓冲区中取出这些项目,然后消费他们。
因为插入和取出项目都涉及更新共享变量,所以我们必须保证对缓冲区的访问是互斥的。只保证互斥访问是不够的,我们还需要更新调度对缓冲区的访问。如果缓冲区是满的,那么生产者必须等待直到一个槽位变为可用。与之相似,如果缓冲区是空的,那么消费者必须等待直到有一个项目变为可用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>
#include <time.h>

#define USER 2 //生产者数量,消费者数量
#define NUMS 10 //缓冲区长度

typedef struct{
int *buf; //缓冲区
int n; //缓冲区最大长度
int front; //队头 (front+1)%n 是第一个元素
int rear; //队尾 (rear % n)是最后一个元素
sem_t mutex; //保护对缓冲区的访问
sem_t slots; //Counts the slot
sem_t items; //Counts the item
}sbuf_t;

void sbuf_init(sbuf_t *sp, int n){
int result;
sp->buf = (int*)calloc(n, sizeof(int));
if(sp->buf == NULL){
fprintf(stderr, "calloc error\n");
exit(EXIT_FAILURE);
}
sp->n = n;
sp->front = sp->rear = 0;

result = sem_init(&sp->mutex, 0, 1);
if (result != 0){
fprintf(stderr, "sem_init mutex error\n");
exit(EXIT_FAILURE);
}

result = sem_init(&sp->slots, 0, n);
if (result != 0){
fprintf(stderr, "sem_init slots error\n");
exit(EXIT_FAILURE);
}

result = sem_init(&sp->items, 0, 0);
if ( result != 0){
fprintf(stderr, "sem_init items error\n");
exit(EXIT_FAILURE);
}
}

void sbuf_deinit(sbuf_t *sp){
free(sp->buf);
sp->buf = NULL;
}

void sbuf_insert(sbuf_t *sp, int item){
int ret;

ret = sem_wait(&sp->slots);
if (ret != 0){
fprintf(stderr, "sem_wait slots error\n");
exit(EXIT_FAILURE);
}

ret = sem_wait(&sp->mutex);
if (ret != 0){
fprintf(stderr, "sem_wait mutex error\n");
exit(EXIT_FAILURE);
}

pthread_t tid;
tid = pthread_self();
sp->buf[sp->rear] = item;
sp->rear = ++(sp->rear) % sp->n;

printf("thread[%u] insert an item :%d\n", (unsigned int)tid, item);

ret = sem_post(&sp->mutex);
if(ret != 0){
fprintf(stderr, "sem_post mutex error\n");
exit(EXIT_FAILURE);
}
ret = sem_post(&sp->items);
if(ret != 0){
fprintf(stderr, "sem_post items error\n");
exit(EXIT_FAILURE);
}
}

int sbuf_remove(sbuf_t * sp){
int item = -1;
int ret;

ret = sem_wait(&sp->items);
if (ret != 0){
fprintf(stderr, "sem_wait items error\n");
exit(EXIT_FAILURE);
}

ret = sem_wait(&sp->mutex);
if (ret != 0){
fprintf(stderr, "sem_wait mutex error\n");
exit(EXIT_FAILURE);
}

pthread_t tid;
tid = pthread_self();
item = sp->buf[sp->front];
sp->front = (++sp->front) % sp->n;
printf("thread[%u] delete an item :%d\n", (unsigned int)tid, item);

ret = sem_post(&sp->mutex);
if(ret != 0){
fprintf(stderr, "sem_post mutex error\n");
exit(EXIT_FAILURE);
}
ret = sem_post(&sp->slots);
if(ret != 0){
fprintf(stderr, "sem_post items error\n");
exit(EXIT_FAILURE);
}
return item;
}

void *producer(void *arg){
sbuf_t *sbp = (sbuf_t *) arg;
int randb;
srand((unsigned)time(NULL));
while(1){
randb = rand();
sleep(rand()%2);
//printf("now produce want to produce %d \n",randb);
//if(sbuf_insert(sbp, randb)){
//printf("report error condition of producer\n\n");
//}else{
sbuf_insert(sbp, randb);
//printf("producer produced %d\n\n",randb);
//}
}
}

void *consumer(void *arg){
sbuf_t *sbp = (sbuf_t *) arg;
int item;
srand((unsigned)time(NULL));
while(1){
sleep(rand()%2);
//printf("now consume want to consume an item\n");
if((item = sbuf_remove(sbp)) < 0)
printf("report error condition of consume\n\n");
//else
//printf("consume consumed %d\n\n",item);
}
}

int main(int argc, char *argv[]){
pthread_t tidProducer[USER], tidConsumer[USER];
int i;
int err;

sbuf_t sb;

sbuf_init(&sb, NUMS);

for( i = 0 ; i < USER; ++i){
err = pthread_create(&tidProducer[i], NULL, producer, (void*)(&sb));
if (err != 0){
fprintf(stderr, "pthread_create error\n");
exit(EXIT_FAILURE);
}
}

for(i = 0; i < USER; ++i){
err = pthread_create(&tidConsumer[i], NULL, consumer, (void*)(&sb));
if(err != 0){
fprintf(stderr, "pthread_create error\n");
exit(EXIT_FAILURE);
}
}

sleep(10);
for (i = 0; i < USER; i++){
err = pthread_kill(tidProducer[i], 0);
if(err != 0){
fprintf(stderr, "pthread_kill error\n");\
exit(EXIT_FAILURE);
}
}
for (i = 0; i < USER; i++){
err = pthread_kill(tidConsumer[i], 0);
if(err != 0){
fprintf(stderr, "pthread_kill error\n");\
exit(EXIT_FAILURE);
}
}

sbuf_deinit(&sb);
return 0;
}

线程池

我们知道应用程序创建一个对象,然后销毁对象是很耗费资源的。创建线程,销毁线程,也是如此。因此,我们就预先生成一些线程,等到我们使用的时候在进行调度,于是,一些”池化资源”技术就这样的产生了。
一般一个简单线程池至少包含下列组成部分。

  1. 线程池管理器(ThreadPoolManager):用于创建并管理线程池
  2. 工作线程(WorkThread): 线程池中线程
  3. 任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行。
  4. 任务队列:用于存放没有处理的任务。提供一种缓冲机制。
    实现的具体代码见