线程池的封装实现

大耗子 2020年06月08日 203次浏览

文章链接:https://codemouse.online/archives/2020-06-08154227

线程常用函数

创建线程函数———pthread_create函数

#include <pthread.h>
int pthread_create(pthread_t * thread, const pthread_arrt_t* attr,void*(*start_routine)(void *), void* arg);
  • 参数说明:
    (1)thread参数是新线程的标识符,为一个整型。
    (2)attr参数用于设置新线程的属性。给传递NULL表示设置为默认线程属性。
    (3)start_routine和arg参数分别指定新线程将运行的函数和参数。start_routine返回时,这个线程就退出了
    (4)返回值:成功返回0,失败返回错误号。
  • 线程id的类型是thread_t,它只在当前进程中保证是唯一的,在不同的系统中thread_t这个类型有不同的实现,调用pthread_self()可以获得当前线程的id
  • 进程id的类型时pid_t,每个进程的id在整个系统中是唯一的,调用getpid()可以获得当前进程的id,是一个正整数值。

终止线程———pthread_cancel函数和pthread_exit函数

#include <pthread.h>
int pthread_cancel(pthread_t thread);
  • 参数说明:
    (1)thread参数是目标线程的标识符。
    (2)该函数成功返回0,失败返回错误码。
#include <pthread.h>
void pthread_exit(void * retval);
  • 参数说明:
    (1)retval是void *类型其它线程可以调用pthread_join获得这个指针。需要注意,pthread_exit或者return返回的指针所指向的内存单元必须是全局的或者是由malloc分 配的,不能在线程函数的栈上分配,因为当其它线程得到这个返回指针时线程函数已经退出了。
    (2)pthread_exit函数通过retval参数向线程的回收者传递其退出信息。它执行之后不会返回到调用者,且永远不会失败。

线程等待———pthread_join

#include <pthread.h>
void pthread_join(pthread_t thread,void ** retval);
  • 参数说明:
    (1)调用该函数的线程将挂起等待,直到id为thread的线程终止。
    (2)thread线程以不同的方法终止,通过pthread_join得到的终止状态是不同的。

线程分离———pthread_detach

#include <pthread.h>
int pthread_detach(pthread_t tid);
  • 参数说明:
    (1)tid:线程标识符
    (2)返回值:pthread_detach() 在调用成功完成之后返回零。其他任何返回值都表示出现了错误。
    EINVAL:tid是分离线程。
    ESRCH:tid不是当前进程中有效的为分离线程。

线程数量的设置

  • 通常线程池的线程数量设置为40%< idleThreadNum / sumThreadNum < 80%(注重业务)

  • 设置线程数量为cpu的核心数(注重计算)

模块化写法

  • 创建线程池

int nThreadPoolCreate(nThreadPool *pool, int numWorkers);

  • 加入新任务

int nThreadPoolPushJob(nThreadPool *pool, nJob *job);

  • 摧毁线程池

    int nThreadPoolDestory(nThreadPool *pool);

#define LL_ADD(item, list) do {				\
	item->prev = NULL;						\
	item->next = list;						\
	if (list != NULL) list->prev = item;	\
	list = item;							\	
} while (0)

#define LL_REMOVE(item, list) do {			\
	if (item->prev != NULL) item->prev->next = item->next;	\
	if (item->next != NULL) item->next->prev = item->prev;	\
	if (list == item) list = item->next;					\
	item->prev = item->next = NULL;							\
} while (0)


typedef struct NWORKER {
	pthread_t threadid;
	int terminate; // 控制work退出码 1退出 0保持

	struct NMANAGER *pool;
	
	struct NWORKER *next;
	struct NWORKER *prev;
	
} nWorker;


typedef struct NJOB {

	void (*func)(void *arg);
	void *user_data; // 参数

	struct NJOB *next;
	struct NJOB *prev;

} nJob;

typedef struct NMANAGER {
	nWorker *workers; // 线程池
	nJob *jobs; // 工作池

	int sum_thread;  
	int free_thread; //free_thread = sum_thread;

	pthread_mutex_t jobs_mtx;
	pthread_cond_t jobs_cond;
	
} nManager;

typedef nManager nThreadPool;

// 工作线程回调
void *nWorkerCallback(void *arg) {

	nWorker *worker = (nWorker*)arg;
	while (1) {

		// jobs != null
		pthread_mutex_lock(&worker->pool->jobs_mtx);
		while (worker->pool->jobs == NULL) {
			if (worker->terminate == 1) break;
			pthread_cond_wait(&worker->pool->jobs_cond, &worker->pool->jobs_mtx);
		}

		if (worker->terminate == 1) { 
			pthread_mutex_unlock(&worker->pool->jobs_mtx);
			break;
		}
		// ll_remove(item, jobs)
		nJob *job = worker->pool->jobs;
		LL_REMOVE(job, worker->pool->jobs);
		
		// enter 
		worker->pool->free_thread --;
		pthread_mutex_unlock(&worker->pool->jobs_mtx);
		
		job->func(job->user_data);
		// end
		pthread_mutex_lock(&worker->pool->jobs_mtx);
		worker->pool->free_thread ++;
		pthread_mutex_unlock(&worker->pool->jobs_mtx);
		
		free(job);

		// jobs->func(jobs);
	}
	free(worker);
}
// 创建线程池
int nThreadPoolCreate(nThreadPool *pool, int numWorkers) {

	if (pool == NULL) return -1;
	if (numWorkers < 1) numWorkers = 1;
	memset(pool, 0, sizeof(nThreadPool));


	pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
	memcpy(&pool->jobs_mtx, &blank_mutex, sizeof(pthread_mutex_t));

	pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
	memcpy(&pool->jobs_cond, &blank_cond, sizeof(pthread_cond_t));

	int i = 0;
	for (i = 0;i < numWorkers;i ++) {

		nWorker *worker = (nWorker*)malloc(sizeof(nWorker));
		if (worker == NULL) {
			perror("malloc");
			return 1;
		}
		memset(worker, 0, sizeof(nWorker));
		worker->pool = pool;
		int ret = pthread_create(&worker->threadid, NULL, nWorkerCallback, worker);
		if (ret) {
			perror("pthread_create");
			//free(worker);
			nWorker *w = pool->workers;
			for (w = pool->workers; w != NULL; w = w->next) {
				w->terminate = 1;
			}
				
			return 1;
		}
		LL_ADD(worker, pool->workers);
	}

	return 0;
}

// 程序退出时候,退出所有的线程
int nThreadPoolDestory(nThreadPool *pool) {

	nWorker *w = pool->workers;
	for (w = pool->workers; w != NULL; w = w->next) {
		w->terminate = 1;
	}

	pthread_mutex_lock(&pool->jobs_mtx);
	
	pthread_cond_broadcast(&pool->jobs_cond);
	
	pthread_mutex_unlock(&pool->jobs_mtx);
}

// 添加新任务
int nThreadPoolPushJob(nThreadPool *pool, nJob *job) {

	pthread_mutex_lock(&pool->jobs_mtx);
	LL_ADD(job, pool->jobs);
	// 此处如果所有线程都在繁忙,此时signal会无效,待解决的问题
	// 处理方式:
	//  1. 等待有线程空闲时候在发送信号
	//  2. 线程过少的时候去创建线程
	pthread_cond_signal(&pool->jobs_cond);
	pthread_mutex_unlock(&pool->jobs_mtx);

}