编辑
2023-11-15
redis
00
请注意,本文编写于 542 天前,最后修改于 542 天前,其中某些信息可能已经过时。

目录

初始化
任务结构体和相关数据结构
线程栈大小(4M)
提交任务
创建任务(生产者)
线程main函数(消费者)
其他函数

之前讲server启动的时候有个InitServerLast()函数

c
/* Some steps in server initialization need to be done last (after modules * are loaded). * Specifically, creation of threads due to a race bug in ld.so, in which * Thread Local Storage initialization collides with dlopen call. * see: https://sourceware.org/bugzilla/show_bug.cgi?id=19329 */ void InitServerLast(void) { bioInit(); //初始化bio,其实是一个线程池 initThreadedIO(); //也是一个专门用来处理IO的线程池 set_jemalloc_bg_thread(server.jemalloc_bg_thread); server.initial_memory_usage = zmalloc_used_memory(); }

接下来讲解一下Bio,主要是bio.h,bio.c两个文件。

c
#ifndef __BIO_H
#define __BIO_H

/* Callback type for lazy-free jobs: it receives the array of pointers that
 * were queued together with the job. */
typedef void lazy_free_fn(void *args[]);

/* Exported API */

/* Set up the per-worker state and spawn the background threads. */
void bioInit(void);
/* Number of jobs of the given opcode that have not finished yet. */
unsigned long bioPendingJobsOfType(int type);
/* Block until the queue of the worker serving `job_type` is empty. */
void bioDrainWorker(int job_type);
/* Cancel and join the background threads (unclean stop). */
void bioKillThreads(void);
/* Queue a deferred close of `fd`, optionally preceded by fsync and/or a
 * page cache reclaim. */
void bioCreateCloseJob(int fd, int need_fsync, int need_reclaim_cache);
/* Queue a deferred fsync + close of an AOF file descriptor. */
void bioCreateCloseAofJob(int fd, long long offset, int need_reclaim_cache);
/* Queue a deferred fsync of an AOF file descriptor. */
void bioCreateFsyncJob(int fd, long long offset, int need_reclaim_cache);
/* Queue a call to `free_fn` with the `arg_count` pointer arguments that
 * follow. */
void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...);

/* Background job opcodes */
enum {
    BIO_CLOSE_FILE = 0, /* Deferred close(2) syscall. */
    BIO_AOF_FSYNC,      /* Deferred AOF fsync. */
    BIO_LAZY_FREE,      /* Deferred objects freeing. */
    BIO_CLOSE_AOF,      /* Deferred close for AOF files. */
    BIO_NUM_OPS
};

#endif

初始化

上面的 bio.h 主要是定义了一些函数和任务类型;下面的 bioInit() 负责初始化。

c
/* Initialize the background system, spawning the thread. */ void bioInit(void) { pthread_attr_t attr; pthread_t thread; size_t stacksize; unsigned long j; /* Initialization of state vars and objects */ for (j = 0; j < BIO_WORKER_NUM; j++) { pthread_mutex_init(&bio_mutex[j],NULL); pthread_cond_init(&bio_newjob_cond[j],NULL); bio_jobs[j] = listCreate(); } /* Set the stack size as by default it may be small in some system */ pthread_attr_init(&attr); pthread_attr_getstacksize(&attr,&stacksize); if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */ while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2; pthread_attr_setstacksize(&attr, stacksize); /* Ready to spawn our threads. We use the single argument the thread * function accepts in order to pass the job ID the thread is * responsible for. */ for (j = 0; j < BIO_WORKER_NUM; j++) { //创建线程,默认三个 void *arg = (void*)(unsigned long) j; if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) { serverLog(LL_WARNING, "Fatal: Can't initialize Background Jobs. Error message: %s", strerror(errno)); exit(1); } bio_threads[j] = thread; } } //******************************************************************************* static char* bio_worker_title[] = { "bio_close_file", "bio_aof", "bio_lazy_free", }; #define BIO_WORKER_NUM (sizeof(bio_worker_title) / sizeof(*bio_worker_title))

任务结构体和相关数据结构

c
* file as the API does not expose the internals at all. */ typedef union bio_job { struct { int type; /* Job-type tag. This needs to appear as the first element in all union members. */ } header; /* Job specific arguments.*/ struct { int type; int fd; /* Fd for file based background jobs */ long long offset; /* A job-specific offset, if applicable */ unsigned need_fsync:1; /* A flag to indicate that a fsync is required before * the file is closed. */ unsigned need_reclaim_cache:1; /* A flag to indicate that reclaim cache is required before * the file is closed. */ } fd_args; struct { int type; lazy_free_fn *free_fn; /* Function that will free the provided arguments */ void *free_args[]; /* List of arguments to be passed to the free function */ } free_args; } bio_job; //********************************************************************************** static unsigned int bio_job_to_worker[] = { [BIO_CLOSE_FILE] = 0, [BIO_AOF_FSYNC] = 1, [BIO_CLOSE_AOF] = 1, [BIO_LAZY_FREE] = 2, }; static pthread_t bio_threads[BIO_WORKER_NUM]; static pthread_mutex_t bio_mutex[BIO_WORKER_NUM]; static pthread_cond_t bio_newjob_cond[BIO_WORKER_NUM]; static list *bio_jobs[BIO_WORKER_NUM]; //任务链表,默认三个双向链表 static unsigned long bio_jobs_counter[BIO_NUM_OPS] = {0};

线程栈大小(4M)

c
/* Minimum stack size requested for spawned threads (4 MiB), so that they
 * have enough stack to perform all the things we do in the main thread. */
#define REDIS_THREAD_STACK_SIZE (4*1024*1024)

提交任务

c
/* Stamp `job` with its opcode and hand it to the worker that serves that
 * opcode: the job is appended to the worker's list, the per-type counter is
 * bumped, and the worker's condition variable is signalled. All of this is
 * done while holding the worker's mutex. */
void bioSubmitJob(int type, bio_job *job) {
    unsigned long idx;
    pthread_mutex_t *lock;

    job->header.type = type;
    idx = bio_job_to_worker[type];
    lock = &bio_mutex[idx];

    pthread_mutex_lock(lock);
    listAddNodeTail(bio_jobs[idx], job);
    bio_jobs_counter[type] += 1;
    pthread_cond_signal(&bio_newjob_cond[idx]);
    pthread_mutex_unlock(lock);
}

创建任务(生产者)

c
void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...) { va_list valist; /* Allocate memory for the job structure and all required * arguments */ bio_job *job = zmalloc(sizeof(*job) + sizeof(void *) * (arg_count)); job->free_args.free_fn = free_fn; va_start(valist, arg_count); for (int i = 0; i < arg_count; i++) { job->free_args.free_args[i] = va_arg(valist, void *); } va_end(valist); bioSubmitJob(BIO_LAZY_FREE, job); } void bioCreateCloseJob(int fd, int need_fsync, int need_reclaim_cache) { bio_job *job = zmalloc(sizeof(*job)); job->fd_args.fd = fd; job->fd_args.need_fsync = need_fsync; job->fd_args.need_reclaim_cache = need_reclaim_cache; bioSubmitJob(BIO_CLOSE_FILE, job); } void bioCreateCloseAofJob(int fd, long long offset, int need_reclaim_cache) { bio_job *job = zmalloc(sizeof(*job)); job->fd_args.fd = fd; job->fd_args.offset = offset; job->fd_args.need_fsync = 1; job->fd_args.need_reclaim_cache = need_reclaim_cache; bioSubmitJob(BIO_CLOSE_AOF, job); } void bioCreateFsyncJob(int fd, long long offset, int need_reclaim_cache) { bio_job *job = zmalloc(sizeof(*job)); job->fd_args.fd = fd; job->fd_args.offset = offset; job->fd_args.need_reclaim_cache = need_reclaim_cache; bioSubmitJob(BIO_AOF_FSYNC, job); }

线程main函数(消费者)

线程主函数就是一个循环:不断地从任务队列中取出任务,再根据任务类型分别处理。

c
void *bioProcessBackgroundJobs(void *arg) { bio_job *job; unsigned long worker = (unsigned long) arg; sigset_t sigset; /* Check that the worker is within the right interval. */ serverAssert(worker < BIO_WORKER_NUM); redis_set_thread_title(bio_worker_title[worker]); redisSetCpuAffinity(server.bio_cpulist); makeThreadKillable(); pthread_mutex_lock(&bio_mutex[worker]); /* Block SIGALRM so we are sure that only the main thread will * receive the watchdog signal. */ sigemptyset(&sigset); sigaddset(&sigset, SIGALRM); if (pthread_sigmask(SIG_BLOCK, &sigset, NULL)) serverLog(LL_WARNING, "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno)); while(1) { listNode *ln; /* The loop always starts with the lock hold. */ if (listLength(bio_jobs[worker]) == 0) { pthread_cond_wait(&bio_newjob_cond[worker], &bio_mutex[worker]); continue; } /* Get the job from the queue. */ ln = listFirst(bio_jobs[worker]); job = ln->value; /* It is now possible to unlock the background system as we know have * a stand alone job structure to process.*/ pthread_mutex_unlock(&bio_mutex[worker]); /* Process the job accordingly to its type. */ int job_type = job->header.type; if (job_type == BIO_CLOSE_FILE) { if (job->fd_args.need_fsync && redis_fsync(job->fd_args.fd) == -1 && errno != EBADF && errno != EINVAL) { serverLog(LL_WARNING, "Fail to fsync the AOF file: %s",strerror(errno)); } if (job->fd_args.need_reclaim_cache) { if (reclaimFilePageCache(job->fd_args.fd, 0, 0) == -1) { serverLog(LL_NOTICE,"Unable to reclaim page cache: %s", strerror(errno)); } } close(job->fd_args.fd); } else if (job_type == BIO_AOF_FSYNC || job_type == BIO_CLOSE_AOF) { /* The fd may be closed by main thread and reused for another * socket, pipe, or file. We just ignore these errno because * aof fsync did not really fail. 
*/ if (redis_fsync(job->fd_args.fd) == -1 && errno != EBADF && errno != EINVAL) { int last_status; atomicGet(server.aof_bio_fsync_status,last_status); atomicSet(server.aof_bio_fsync_status,C_ERR); atomicSet(server.aof_bio_fsync_errno,errno); if (last_status == C_OK) { serverLog(LL_WARNING, "Fail to fsync the AOF file: %s",strerror(errno)); } } else { atomicSet(server.aof_bio_fsync_status,C_OK); atomicSet(server.fsynced_reploff_pending, job->fd_args.offset); } if (job->fd_args.need_reclaim_cache) { if (reclaimFilePageCache(job->fd_args.fd, 0, 0) == -1) { serverLog(LL_NOTICE,"Unable to reclaim page cache: %s", strerror(errno)); } } if (job_type == BIO_CLOSE_AOF) close(job->fd_args.fd); } else if (job_type == BIO_LAZY_FREE) { job->free_args.free_fn(job->free_args.free_args); } else { serverPanic("Wrong job type in bioProcessBackgroundJobs()."); } zfree(job); /* Lock again before reiterating the loop, if there are no longer * jobs to process we'll block again in pthread_cond_wait(). */ pthread_mutex_lock(&bio_mutex[worker]); listDelNode(bio_jobs[worker], ln); bio_jobs_counter[job_type]--; pthread_cond_signal(&bio_newjob_cond[worker]); } }

其他函数

c
/* Return the number of pending jobs of the specified type. */ unsigned long bioPendingJobsOfType(int type) { unsigned int worker = bio_job_to_worker[type]; pthread_mutex_lock(&bio_mutex[worker]); unsigned long val = bio_jobs_counter[type]; pthread_mutex_unlock(&bio_mutex[worker]); return val; } /* Wait for the job queue of the worker for jobs of specified type to become empty. */ void bioDrainWorker(int job_type) { unsigned long worker = bio_job_to_worker[job_type]; pthread_mutex_lock(&bio_mutex[worker]); while (listLength(bio_jobs[worker]) > 0) { pthread_cond_wait(&bio_newjob_cond[worker], &bio_mutex[worker]); } pthread_mutex_unlock(&bio_mutex[worker]); } /* Kill the running bio threads in an unclean way. This function should be * used only when it's critical to stop the threads for some reason. * Currently Redis does this only on crash (for instance on SIGSEGV) in order * to perform a fast memory check without other threads messing with memory. */ void bioKillThreads(void) { int err; unsigned long j; for (j = 0; j < BIO_WORKER_NUM; j++) { if (bio_threads[j] == pthread_self()) continue; if (bio_threads[j] && pthread_cancel(bio_threads[j]) == 0) { if ((err = pthread_join(bio_threads[j],NULL)) != 0) { serverLog(LL_WARNING, "Bio worker thread #%lu can not be joined: %s", j, strerror(err)); } else { serverLog(LL_WARNING, "Bio worker thread #%lu terminated",j); } } } }

本文作者:yowayimono

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!