编辑
2023-11-16
Redis源码阅读
00
请注意,本文编写于 540 天前,最后修改于 540 天前,其中某些信息可能已经过时。

目录

线程运行函数
生产者

一切还是要从这里考试说起

c
void InitServerLast(void) { bioInit(); initThreadedIO(); set_jemalloc_bg_thread(server.jemalloc_bg_thread); server.initial_memory_usage = zmalloc_used_memory(); }

进入这个函数

c
/* ========================================================================== * Threaded I/O * ========================================================================== */ #define IO_THREADS_MAX_NUM 128 //最多128个线程 #ifndef CACHE_LINE_SIZE #if defined(__aarch64__) && defined(__APPLE__) #define CACHE_LINE_SIZE 128 #else #define CACHE_LINE_SIZE 64 #endif #endif typedef struct __attribute__((aligned(CACHE_LINE_SIZE))) threads_pending { redisAtomic unsigned long value; } threads_pending; pthread_t io_threads[IO_THREADS_MAX_NUM]; pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM]; threads_pending io_threads_pending[IO_THREADS_MAX_NUM]; //等待处理的客户端个数 int io_threads_op; /* IO_THREADS_OP_IDLE, IO_THREADS_OP_READ or IO_THREADS_OP_WRITE. */ // TODO: should access to this be atomic??! //************************************************************************** /* Initialize the data structures needed for threaded I/O. */ void initThreadedIO(void) { server.io_threads_active = 0; /* We start with threads not active. */ /* Indicate that io-threads are currently idle */ io_threads_op = IO_THREADS_OP_IDLE; /* Don't spawn any thread if the user selected a single thread: * we'll handle I/O directly from the main thread. */ if (server.io_threads_num == 1) return; 默认就一个IO线程 if (server.io_threads_num > IO_THREADS_MAX_NUM) { serverLog(LL_WARNING,"Fatal: too many I/O threads configured. " "The maximum number is %d.", IO_THREADS_MAX_NUM); exit(1); } /* Spawn and initialize the I/O threads. */ for (int i = 0; i < server.io_threads_num; i++) { /* Things we do for all the threads including the main thread. */ io_threads_list[i] = listCreate(); if (i == 0) continue; /* Thread 0 is the main thread. */ /* Things we do only for the additional threads. */ pthread_t tid; pthread_mutex_init(&io_threads_mutex[i],NULL); setIOPendingCount(i, 0); pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */ if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) { serverLog(LL_WARNING,"Fatal: Can't initialize IO thread."); exit(1); } io_threads[i] = tid; } }
  • 每个线程一个任务队列,每个线程一把锁

线程运行函数

c
void *IOThreadMain(void *myid) { /* The ID is the thread number (from 0 to server.io_threads_num-1), and is * used by the thread to just manipulate a single sub-array of clients. */ long id = (unsigned long)myid; char thdname[16]; snprintf(thdname, sizeof(thdname), "io_thd_%ld", id); redis_set_thread_title(thdname); redisSetCpuAffinity(server.server_cpulist); makeThreadKillable();//设置线程为随时可取消状态 // 开始取任务执行 while(1) { /* Wait for start */ for (int j = 0; j < 1000000; j++) { if (getIOPendingCount(id) != 0) break; } /* Give the main thread a chance to stop this thread. */ if (getIOPendingCount(id) == 0) { pthread_mutex_lock(&io_threads_mutex[id]); pthread_mutex_unlock(&io_threads_mutex[id]); continue; } serverAssert(getIOPendingCount(id) != 0); /* Process: note that the main thread will never touch our list * before we drop the pending count to 0. */ listIter li; listNode *ln; listRewind(io_threads_list[id],&li); while((ln = listNext(&li))) { client *c = listNodeValue(ln); if (io_threads_op == IO_THREADS_OP_WRITE) { writeToClient(c,0); } else if (io_threads_op == IO_THREADS_OP_READ) { readQueryFromClient(c->conn); } else { serverPanic("io_threads_op value is unknown"); } } listEmpty(io_threads_list[id]); setIOPendingCount(id, 0); } }

这句是一个简单的线程池结构,上面基本就是消费者逻辑,接下来讲一下生产者逻辑

c
list *clients; /* List of active clients */ list *clients_to_close; /* Clients to close asynchronously */ list *clients_pending_write; /* There is to write or install handler. */ list *clients_pending_read; /* Client has pending read socket buffers. */ list *slaves, *monitors; /* List of slaves and MONITORs */

生产者

推迟客户端读操作

c
void readQueryFromClient(connection *conn) { client *c = connGetPrivateData(conn); int nread, big_arg = 0; size_t qblen, readlen; /* Check if we want to read from the client later when exiting from * the event loop. This is the case if threaded I/O is enabled. */ if (postponeClientRead(c)) return; /* Update total number of reads on server */ atomicIncr(server.stat_total_reads_processed, 1);

本文作者:yowayimono

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!