Learn And Life.

PHP中的线程

线程安全ZTS

这个宏只有在编译时指定了相应选项才会生成。configure 文件中有下面这一行:

1
$as_echo "#define ZTS 1" >>confdefs.h

说明只有当TSRM被启用的时候,才会定义这个名为ZTS的宏。

线程安全资源管理器(Thread Safe Resource Manager)TSRM

写PHP扩展的时候经常会用到下面这组宏,先看一下它们的定义:

1
2
3
4
5
6
7
8
9
10
11
/*
 * TSRMLS_* helper macros.
 *
 * When ZTS is defined they spell out a `tsrm_ls` parameter/argument:
 *   TSRMLS_D   - parameter declaration `void ***tsrm_ls`
 *   TSRMLS_DC  - the same declaration preceded by a comma
 *   TSRMLS_C   - the argument `tsrm_ls`
 *   TSRMLS_CC  - the same argument preceded by a comma
 *
 * When ZTS is not defined, TSRMLS_D expands to `void` and the other three
 * expand to nothing.
 */
#ifdef ZTS
#define TSRMLS_D void ***tsrm_ls
#define TSRMLS_DC , TSRMLS_D
#define TSRMLS_C tsrm_ls
#define TSRMLS_CC , TSRMLS_C
#else
#define TSRMLS_D void
#define TSRMLS_DC
#define TSRMLS_C
#define TSRMLS_CC
#endif

发现这些宏都跟tsrm_ls是相关的,找到sapi/fpm/fpm/fpm_main.c中1583行和1612行

1
2
/* Declare the tsrm_ls handle that the TSRMLS_* macros refer to. */
void ***tsrm_ls;
/* Fill it with whatever ts_resource(0) returns. */
tsrm_ls = ts_resource(0);

TSRM/TSRM.h文件112行

1
/* Shorthand: ts_resource(id) calls ts_resource_ex() with NULL as the second argument. */
#define ts_resource(id) ts_resource_ex(id, NULL)

最终定位到ts_resource_ex这个函数,现在看下这个函数是如何实现的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
/* Forward typedef so the table declaration below can name the entry type. */
typedef struct _tsrm_tls_entry tsrm_tls_entry;

/* Per-thread record. */
struct _tsrm_tls_entry {
	void **storage;        /* array of pointers to this thread's own copies of the registered resources */
	int count;             /* number of slots in `storage`, i.e. how many resources the PHP core + extensions have registered */
	THREAD_T thread_id;    /* id of the owning thread */
	tsrm_tls_entry *next;  /* next entry in the same table slot; entries live in a hash-like table and `next` chains collisions */
};

/* Head of the per-thread entry table. */
static tsrm_tls_entry **tsrm_tls_table = NULL;
/* Number of slots in tsrm_tls_table. */
static int tsrm_tls_table_size;
/*
 * The table itself is allocated inside tsrm_startup():
 *   tsrm_tls_table = (tsrm_tls_entry **) calloc(tsrm_tls_table_size, sizeof(tsrm_tls_entry *));
 */

/* Description of one registered resource type. */
typedef struct {
	size_t size;            /* size of the resource in bytes */
	ts_allocate_ctor ctor;  /* constructor, called when the resource is created for a thread */
	ts_allocate_dtor dtor;  /* destructor, called when the resource is released */
	int done;               /* whether the resource has been destroyed: 0 = live, 1 = destroyed */
} tsrm_resource_type;

static tsrm_resource_type *resource_types_table = NULL; /* head of the resource type table */
static int resource_types_table_size;                   /* current capacity of the resource type table */

/*
 * Global resources: registering a resource yields a unique id, and the
 * resource is later fetched by that id.
 * Every thread gets its own copy of all registered resources, held in one
 * malloc()ed array of pointers; the id identifies the slot in that array.
 * Code that needs a resource looks it up by its id.
 */
typedef int ts_rsrc_id;
static ts_rsrc_id id_count;

/*#################### Thread management ####################*/
/*# 1. tsrm_startup()                                       #*/
/*# 2. ts_allocate_id()                                     #*/
/*# 3. ts_resource(id)                                      #*/
/*# 4. allocate_new_resource()                              #*/
/*# 5. tsrm_shutdown()                                      #*/
/*###########################################################*/
/*
 * 1. Process-wide initialisation: tsrm_startup().
 *    Startup TSRM (call once for the entire process).
 *
 * expected_threads    number of slots to allocate in the per-thread table
 * expected_resources  initial capacity of the resource type table
 * debug_level,
 * debug_filename      forwarded unchanged to tsrm_error_set()
 *
 * Returns 1 on success, 0 when either table cannot be allocated.
 */
TSRM_API int tsrm_startup(int expected_threads, int expected_resources, int debug_level, char *debug_filename)
{
	/* Platform-specific set-up of the thread-local-storage key. */
#if defined(GNUPTH)
	pth_init();
#elif defined(PTHREADS)
	pthread_key_create(&tls_key, 0);
#elif defined(TSRM_ST)
	st_init();
	st_key_create(&tls_key, 0);
#elif defined(TSRM_WIN32)
	tls_key = TlsAlloc();
#elif defined(BETHREADS)
	tls_key = tls_allocate();
#endif

	/* ensure singleton */
	in_main_thread = 1;

	/* Errors go to stderr until tsrm_error_set() decides otherwise. */
	tsrm_error_file = stderr;
	tsrm_error_set(debug_level, debug_filename);

	/* Per-thread entry table, zero-filled so every slot starts out empty. */
	tsrm_tls_table_size = expected_threads;
	tsrm_tls_table = calloc(tsrm_tls_table_size, sizeof *tsrm_tls_table);
	if (tsrm_tls_table == NULL) {
		TSRM_ERROR((TSRM_ERROR_LEVEL_ERROR, "Unable to allocate TLS table"));
		return 0;
	}

	/* No resource ids have been handed out yet. */
	id_count = 0;

	/* Resource type table; on failure release the table allocated above. */
	resource_types_table_size = expected_resources;
	resource_types_table = calloc(resource_types_table_size, sizeof *resource_types_table);
	if (resource_types_table == NULL) {
		TSRM_ERROR((TSRM_ERROR_LEVEL_ERROR, "Unable to allocate resource types table"));
		free(tsrm_tls_table);
		tsrm_tls_table = NULL;
		return 0;
	}

	tsmm_mutex = tsrm_mutex_alloc();

	TSRM_ERROR((TSRM_ERROR_LEVEL_CORE, "Started up TSRM, %d expected threads, %d expected resources", expected_threads, expected_resources));
	return 1;
}
/*
 * 2. Register a shared resource: ts_allocate_id().
 *    Allocates a new thread-safe-resource id.
 *
 * rsrc_id  out: receives the new id (set to 0 on failure)
 * size     size in bytes of one instance of the resource
 * ctor     optional constructor, run on each freshly allocated instance
 * dtor     optional destructor, stored for later release of instances
 *
 * Returns the new id, or 0 when the resource type table cannot be grown.
 * The whole operation runs with tsmm_mutex held.
 */
TSRM_API ts_rsrc_id ts_allocate_id(ts_rsrc_id *rsrc_id, size_t size, ts_allocate_ctor ctor, ts_allocate_dtor dtor)
{
	int i;

	/* size is a size_t: convert explicitly so the argument matches the conversion. */
	TSRM_ERROR((TSRM_ERROR_LEVEL_CORE, "Obtaining a new resource id, %lu bytes", (unsigned long) size));

	tsrm_mutex_lock(tsmm_mutex);

	/* obtain a resource id */
	*rsrc_id = TSRM_SHUFFLE_RSRC_ID(id_count++);
	TSRM_ERROR((TSRM_ERROR_LEVEL_CORE, "Obtained resource id %d", *rsrc_id));

	/* store the new resource type in the resource sizes table */
	if (resource_types_table_size < id_count) {
		/* Grow through a temporary so the existing table survives a failed realloc(). */
		tsrm_resource_type *grown;

		grown = (tsrm_resource_type *) realloc(resource_types_table, sizeof(tsrm_resource_type)*id_count);
		if (!grown) {
			/* The id was never registered: give it back so id_count stays
			 * consistent with the (unchanged) table. */
			id_count--;
			tsrm_mutex_unlock(tsmm_mutex);
			TSRM_ERROR((TSRM_ERROR_LEVEL_ERROR, "Unable to allocate storage for resource"));
			*rsrc_id = 0;
			return 0;
		}
		resource_types_table = grown;
		resource_types_table_size = id_count;
	}
	resource_types_table[TSRM_UNSHUFFLE_RSRC_ID(*rsrc_id)].size = size;
	resource_types_table[TSRM_UNSHUFFLE_RSRC_ID(*rsrc_id)].ctor = ctor;
	resource_types_table[TSRM_UNSHUFFLE_RSRC_ID(*rsrc_id)].dtor = dtor;
	resource_types_table[TSRM_UNSHUFFLE_RSRC_ID(*rsrc_id)].done = 0;

	/* enlarge the arrays for the already active threads */
	for (i=0; i<tsrm_tls_table_size; i++) {
		tsrm_tls_entry *p = tsrm_tls_table[i];

		while (p) {
			if (p->count < id_count) {
				int j;

				/* NOTE(review): these per-thread allocations are not checked
				 * for failure; there is no obvious way to unwind here. */
				p->storage = (void *) realloc(p->storage, sizeof(void *)*id_count);
				for (j=p->count; j<id_count; j++) {
					p->storage[j] = (void *) malloc(resource_types_table[j].size);
					if (resource_types_table[j].ctor) {
						resource_types_table[j].ctor(p->storage[j]);
					}
				}
				p->count = id_count;
			}
			p = p->next;
		}
	}
	tsrm_mutex_unlock(tsmm_mutex);

	TSRM_ERROR((TSRM_ERROR_LEVEL_CORE, "Successfully allocated new resource id %d", *rsrc_id));
	return *rsrc_id;
}
/*
 * 3. Read a shared resource: ts_resource(id) / ts_resource_ex().
 *    Fetches the requested resource for a thread.
 *
 * id     resource id to fetch
 * th_id  NULL to use the calling thread, otherwise a pointer to the id of
 *        the thread whose resources should be consulted
 *
 * The value handed back is produced by TSRM_SAFE_RETURN_RSRC() from the
 * thread's storage array, the id and the thread's resource count.
 */
TSRM_API void *ts_resource_ex(ts_rsrc_id id, THREAD_T *th_id)
{
	THREAD_T thread_id;
	int hash_value;
	tsrm_tls_entry *thread_resources;

	if (!th_id) {
		/* Fast path for looking up the resources for the current
		 * thread. Its used by just about every call to
		 * ts_resource_ex(). This avoids the need for a mutex lock
		 * and our hashtable lookup.
		 */
		thread_resources = tsrm_tls_get();

		if (thread_resources) {
			/* thread_id is cast to long, so the conversion must be %ld. */
			TSRM_ERROR((TSRM_ERROR_LEVEL_INFO, "Fetching resource id %d for current thread %ld", id, (long) thread_resources->thread_id));
			/* Read a specific resource from the thread's resources.
			 * This is called outside of a mutex, so have to be aware about external
			 * changes to the structure as we read it.
			 */
			TSRM_SAFE_RETURN_RSRC(thread_resources->storage, id, thread_resources->count);
		}
		thread_id = tsrm_thread_id();
	} else {
		thread_id = *th_id;
	}

	TSRM_ERROR((TSRM_ERROR_LEVEL_INFO, "Fetching resource id %d for thread %ld", id, (long) thread_id));
	tsrm_mutex_lock(tsmm_mutex);

	hash_value = THREAD_HASH_OF(thread_id, tsrm_tls_table_size);
	thread_resources = tsrm_tls_table[hash_value];

	if (!thread_resources) {
		/* Empty slot: create the entry in place. allocate_new_resource()
		 * unlocks tsmm_mutex itself, so simply start the lookup again. */
		allocate_new_resource(&tsrm_tls_table[hash_value], thread_id);
		return ts_resource_ex(id, &thread_id);
	} else {
		/* Walk the collision chain looking for this thread's entry. */
		do {
			if (thread_resources->thread_id == thread_id) {
				break;
			}
			if (thread_resources->next) {
				thread_resources = thread_resources->next;
			} else {
				/* End of chain: append a new entry (this also unlocks
				 * tsmm_mutex) and start the lookup again. */
				allocate_new_resource(&thread_resources->next, thread_id);
				return ts_resource_ex(id, &thread_id);
			}
		} while (thread_resources);
	}
	tsrm_mutex_unlock(tsmm_mutex);
	/* Read a specific resource from the thread's resources.
	 * This is called outside of a mutex, so have to be aware about external
	 * changes to the structure as we read it.
	 */
	TSRM_SAFE_RETURN_RSRC(thread_resources->storage, id, thread_resources->count);
}
/*
 * 4. Initialise a thread's record: allocate_new_resource().
 *
 * Creates a tsrm_tls_entry for thread_id, stores it through
 * thread_resources_ptr, registers it via tsrm_tls_set() and allocates one
 * instance of every registered resource type that is not marked done.
 * The optional new-thread begin/end handlers bracket the resource set-up.
 *
 * The function finishes by unlocking tsmm_mutex.
 */
static void allocate_new_resource(tsrm_tls_entry **thread_resources_ptr, THREAD_T thread_id)
{
	tsrm_tls_entry *entry;
	int slot;

	TSRM_ERROR((TSRM_ERROR_LEVEL_CORE, "Creating data structures for thread %x", thread_id));

	/* Publish the new entry through the caller's pointer straight away. */
	entry = (tsrm_tls_entry *) malloc(sizeof(tsrm_tls_entry));
	*thread_resources_ptr = entry;

	/* The storage array is only allocated when at least one resource is registered. */
	entry->storage = NULL;
	if (id_count > 0) {
		entry->storage = (void **) malloc(sizeof(void *)*id_count);
	}
	entry->count = id_count;
	entry->thread_id = thread_id;
	entry->next = NULL;

	/* Set thread local storage to this new thread resources structure */
	tsrm_tls_set(entry);

	if (tsrm_new_thread_begin_handler) {
		tsrm_new_thread_begin_handler(thread_id);
	}

	for (slot = 0; slot < id_count; slot++) {
		tsrm_resource_type *type = &resource_types_table[slot];

		/* Resource types flagged as done get an empty slot. */
		if (type->done) {
			entry->storage[slot] = NULL;
			continue;
		}

		entry->storage[slot] = (void *) malloc(type->size);
		if (type->ctor) {
			type->ctor(entry->storage[slot]);
		}
	}

	if (tsrm_new_thread_end_handler) {
		tsrm_new_thread_end_handler(thread_id);
	}

	tsrm_mutex_unlock(tsmm_mutex);
}
/*
 * 5. Tear everything down: tsrm_shutdown().
 *    Shutdown TSRM (call once for the entire process).
 *
 * Does nothing unless in_main_thread is set. Otherwise it destroys and frees
 * every thread's resources, frees both tables and the mutex, closes the error
 * file when it is not stderr, releases the TLS key, runs the shutdown handler
 * if one is installed and finally clears all three handler pointers.
 */
TSRM_API void tsrm_shutdown(void)
{
	int bucket;

	/* ensure singleton */
	if (!in_main_thread) {
		return;
	}

	if (tsrm_tls_table) {
		for (bucket = 0; bucket < tsrm_tls_table_size; bucket++) {
			tsrm_tls_entry *entry, *following;

			/* `following` is saved before the entry is freed. */
			for (entry = tsrm_tls_table[bucket]; entry; entry = following) {
				int slot;

				following = entry->next;
				for (slot = 0; slot < entry->count; slot++) {
					if (!entry->storage[slot]) {
						continue;
					}
					/* Run the destructor only for types still marked live. */
					if (resource_types_table && !resource_types_table[slot].done && resource_types_table[slot].dtor) {
						resource_types_table[slot].dtor(entry->storage[slot]);
					}
					free(entry->storage[slot]);
				}
				free(entry->storage);
				free(entry);
			}
		}
		free(tsrm_tls_table);
		tsrm_tls_table = NULL;
	}

	/* free(NULL) is a no-op, so no guard is needed here. */
	free(resource_types_table);
	resource_types_table = NULL;

	tsrm_mutex_free(tsmm_mutex);
	tsmm_mutex = NULL;

	TSRM_ERROR((TSRM_ERROR_LEVEL_CORE, "Shutdown TSRM"));
	if (tsrm_error_file != stderr) {
		fclose(tsrm_error_file);
	}

	/* Platform-specific release of the thread-local-storage key. */
#if defined(GNUPTH)
	pth_kill();
#elif defined(PTHREADS)
	pthread_setspecific(tls_key, 0);
	pthread_key_delete(tls_key);
#elif defined(TSRM_WIN32)
	TlsFree(tls_key);
#endif

	if (tsrm_shutdown_handler) {
		tsrm_shutdown_handler();
	}

	tsrm_new_thread_begin_handler = NULL;
	tsrm_new_thread_end_handler = NULL;
	tsrm_shutdown_handler = NULL;
}