互斥锁,条件等待,读写锁,自旋锁,原子操作,CAS操作

大耗子 2020年06月09日 183次浏览

文章链接:https://codemouse.online/archives/2020-06-09145446

互斥锁(mutex)

  • 用在执行长的代码块效率较高,如果只是执行一条少的指令,速度不如自旋锁和原子锁。如果被锁住,线程回去休眠等待,不占用系统资源。由于共享区域执行时间长,所以速度快。
  • API:
#include <pthread.h>
//动态初始化:
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
//静态初始化:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
int pthread_mutex_destroy(pthread_mutex_t *mutex);
int pthread_mutex_lock(pthread_mutex_t *mutex)
int pthread_mutex_unlock(pthread_mutex_t *mutex)
int pthread_mutex_trylock(pthread_mutex_t *mutex)

#include <pthread.h>
#include <time.h>
//避免死锁,允许线程阻塞特定时间,如果加锁失败就会返回ETIMEDOUT
int pthread_mutex_timedlock(pthread_mutex_t *restrict mutex, const struct timesec *restrict tsptr);

mutex:互斥锁
attr:互斥锁的属性,NULL表示默认/缺省的属性

条件锁

  • 条件锁与互斥锁一起配合使用的。条件本身是由互斥量保护的。线程在改变条件状态之前必须产生锁住互斥量,其他线程在获得互斥量之前不会到这种改变,因为互斥量必须在锁定以后才能计算条件。
  • API
#include <pthread.h>
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
int pthreead_cond_destroy(pthread_cond_t *cond);
// 进入等待,mutex会被解开,然后进入睡眠,唤醒后mutex继续上锁
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);

// 唤醒,如果线程没有再wait上,那么无法唤醒,实质是从等待队列中唤醒一个
int pthread_cond_signal(pthread_cond_t *cond);
// 这个唤醒有惊群效应,全部唤醒
int pthread_cond_broadcast(pthread_cond_t *cond);

读写锁

  • 读写锁与互斥量类似,但它允许更高的并行性。
  • 互斥量只有两种状态:锁住和未锁住,且一次只有一个线程可以对它加锁。
  • 读写锁可以有三种状态:读模式下加锁状态、写模式下加锁状态和不加锁状态。一次只有一个线程可以占有写模式的读写锁,但多个线程可以同时占有读模式的读写锁。
  • 读写锁非常适合于对数据结构读的次数远远大于写的情况。
    API:
#include <pthread.h>

int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);

int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);

int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);

自旋锁

  • 常用于快速响应的线程情况。自旋锁如果被锁住,其他线程获取锁就会空转等待,消耗CPU资源,不会去休眠,执行语句多和阻塞一般不适用。
  • API:
#include <pthread.h>
int pthread_spin_destroy(pthread_spinlock_t *); 
int pthread_spin_init(pthread_spinlock_t *, int pshared);
int pthread_spin_lock(pthread_spinlock_t *);
int pthread_spin_trylock(pthread_spinlock_t *);
int pthread_spin_unlock(pthread_spinlock_t *);

pshared的取值:
PTHREAD_PROCESS_SHARED:该⾃旋锁可以在多个进程中的线程之间共享。 PTHREAD_PROCESS_PRIVATE:仅初始化本⾃旋锁的线程所在的进程内的线程才能够使⽤该⾃旋锁

原子操作

操作本身就具有原子性,不可拆分。
对于gcc、g++编译器来讲,它们提供了⼀组API来做原⼦操作:

#include <atomic>
type __sync_fetch_and_add (type *ptr, type value, ...) 
type __sync_fetch_and_sub (type *ptr, type value, ...) 
type __sync_fetch_and_or (type *ptr, type value, ...) 
type __sync_fetch_and_and (type *ptr, type value, ...) 
type __sync_fetch_and_xor (type *ptr, type value, ...) 
type __sync_fetch_and_nand (type *ptr, type value, ...) 
bool __sync_bool_compare_and_swap (type *ptr, type oldval type newva l, ...) 
type __sync_val_compare_and_swap (type *ptr, type oldval type newval , ...) 
type __sync_lock_test_and_set (type *ptr, type value, ...)
void __sync_lock_release (type *ptr, ...)

无锁数据结构CAS(compare_and_swap)

解释引致《C++并发编程实战》:

  • 作为无锁结构,就意味着线程可以并发的访问这个数据结构。线程不能做相同的操作;一个 无锁队列可能允许一个线程进行压入数据,另一个线程弹出数据,当有两个线程同时尝试添 加元素时,这个数据结构将被破坏。不仅如此,当其中一个访问线程被调度器中途挂起时, 其他线程必须能够继续完成自己的工作,而无需等待挂起线程。
  • 具有“比较/交换”操作的数据结构,通常在“比较/交换”实现中都有一个循环。使用“比较/交换”操 作的原因:当有其他线程同时对指定数据的修改时,代码将尝试恢复数据。当其他线程被挂 起时,“比较/交换”操作执行成功,那么这样的代码就是无锁的。当执行失败时,就需要一个 自旋锁了,且这个结构就是“非阻塞-有锁”的结构。
  • 无锁算法中的循环会让一些线程处于“饥饿”状态。如有线程在“错误”时间执行,那么第一个线 程将会不停得尝试自己所要完成的操作(其他程序继续执行)。“无锁-无等待”数据结构,就为了 避免这种问题存在的。
  • cas其实这也算一种锁,乐观锁!
    无锁数据结构简单实现
bool CAS( int * pAddr, int nExpected, int nNew ) 
{ 
	if ( *pAddr == nExpected ) { 
		*pAddr = nNew ; 
		return true ; 
	} 
	else 
		return false ; 
}

用例

int num = 0;
void add(int cout)
{
    int temp;
    do
    {
        temp = num;
    }
    while (cas(&num, temp, temp+cout)==true)
}

使用demo

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <time.h>
#include <atomic>

#define MAX_THREAD_NUM 2
#define FOR_LOOP_COUNT 2000
#define FOR_ADD_COUNT 100000
static int counter = 0;
static pthread_spinlock_t spinlock;
static pthread_mutex_t mutex;

typedef void *(*thread_func_t)(void *argv);


void do_for_add(int count)
{
    long sum = 0;
    for(int i = 0; i < count; i++)
    {
        sum += i;
    }
}


// 简单的自旋锁定义
class atomic_flag_spinlock
{
    std::atomic_flag flag;
public:
    atomic_flag_spinlock():
        flag(ATOMIC_FLAG_INIT)
    {}
    void lock()
    {
        while(flag.test_and_set(std::memory_order_acquire));
    }
    void unlock()
    {
        flag.clear(std::memory_order_release);
    }
};

static atomic_flag_spinlock s_atomic_flag_spinlock;

// x86架构下的,指令前缀lock⽤于锁定,让代码具有原子性
// 并发访问(Read/Write)被禁⽌
static int lxx_atomic_add(int *ptr, int increment)
{
    int old_value = *ptr;
    __asm__ volatile("lock; xadd %0, %1 \n\t"
                     : "=r"(old_value), "=m"(*ptr)
                     : "0"(increment), "m"(*ptr)
                     : "cc", "memory");
    return *ptr;
}


// mutex锁
void *mutex_thread_main(void *argv)
{
    for (int i = 0; i < FOR_LOOP_COUNT; i++)
    {
        pthread_mutex_lock(&mutex);
        // counter++;
        do_for_add(FOR_ADD_COUNT);
        pthread_mutex_unlock(&mutex);
    }
    return NULL;
}

// 自定义的原子锁
void *atomic_thread_main(void *argv)
{
    for (int i = 0; i < FOR_LOOP_COUNT; i++)
    {
        lxx_atomic_add(&counter, 1);
        // counter++;
    }
    return NULL;
}

// 自旋锁
void *spin_thread_main(void *argv)
{
    for (int i = 0; i < FOR_LOOP_COUNT; i++)
    {
        pthread_spin_lock(&spinlock);
        // counter++;
        do_for_add(FOR_ADD_COUNT);
        pthread_spin_unlock(&spinlock);
    }
    return NULL;
}

// 原子锁
void *atomic_flag_spinlock_thread_main(void *argv)
{
    for (int i = 0; i < FOR_LOOP_COUNT; i++)
    {
        s_atomic_flag_spinlock.lock();
        // counter++;
        do_for_add(FOR_ADD_COUNT);
        s_atomic_flag_spinlock.unlock();
    }
    return NULL;
}

// 锁测试
int test_lock(thread_func_t func, char **argv)
{
    clock_t start = clock();
    pthread_t tid[MAX_THREAD_NUM] = {0};
    for (int i = 0; i < MAX_THREAD_NUM; i++)
    {
        int ret = pthread_create(&tid[i], NULL, func, argv);
        if (0 != ret)
        {
            printf("create thread failed\n");
        }
    }
    for (int i = 0; i < MAX_THREAD_NUM; i++)
    {
        pthread_join(tid[i], NULL);
    }
    clock_t end = clock();
    printf("spend clock : %ld, ", (end - start) / CLOCKS_PER_SEC);
    return 0;
}


int main(int argc, char **argv)
{
    printf("THREAD_NUM:%d\n\n", MAX_THREAD_NUM);
    counter = 0;
    printf("use mutex ----------->\n");
    test_lock(mutex_thread_main, NULL);
    printf("counter = %d\n", counter);

    // counter = 0;
    // printf("\nuse atomic ----------->\n");
    // test_lock(atomic_thread_main, NULL);
    // printf("counter = %d\n", counter);

    counter = 0;
    printf("\nuse spin ----------->\n");
    pthread_spin_init(&spinlock, PTHREAD_PROCESS_PRIVATE);
    test_lock(spin_thread_main, NULL);
    printf("counter = %d\n", counter);

    counter = 0;
    printf("\nuse atomic_flag_spinlock ----------->\n");
    test_lock(atomic_flag_spinlock_thread_main, NULL);
    printf("counter = %d\n", counter);

    return 0;
}