一切还是要从这里考试说起
cvoid InitServerLast(void) {
bioInit();
initThreadedIO();
set_jemalloc_bg_thread(server.jemalloc_bg_thread);
server.initial_memory_usage = zmalloc_used_memory();
}
进入这个函数
c/* ==========================================================================
* Threaded I/O
* ========================================================================== */
#define IO_THREADS_MAX_NUM 128 //最多128个线程
#ifndef CACHE_LINE_SIZE
#if defined(__aarch64__) && defined(__APPLE__)
#define CACHE_LINE_SIZE 128
#else
#define CACHE_LINE_SIZE 64
#endif
#endif
typedef struct __attribute__((aligned(CACHE_LINE_SIZE))) threads_pending {
redisAtomic unsigned long value;
} threads_pending;
pthread_t io_threads[IO_THREADS_MAX_NUM];
pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM];
threads_pending io_threads_pending[IO_THREADS_MAX_NUM]; //等待处理的客户端个数
int io_threads_op; /* IO_THREADS_OP_IDLE, IO_THREADS_OP_READ or IO_THREADS_OP_WRITE. */ // TODO: should access to this be atomic??!
//**************************************************************************
/* Initialize the data structures needed for threaded I/O. */
void initThreadedIO(void) {
server.io_threads_active = 0; /* We start with threads not active. */
/* Indicate that io-threads are currently idle */
io_threads_op = IO_THREADS_OP_IDLE;
/* Don't spawn any thread if the user selected a single thread:
* we'll handle I/O directly from the main thread. */
if (server.io_threads_num == 1) return; 默认就一个IO线程
if (server.io_threads_num > IO_THREADS_MAX_NUM) {
serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
"The maximum number is %d.", IO_THREADS_MAX_NUM);
exit(1);
}
/* Spawn and initialize the I/O threads. */
for (int i = 0; i < server.io_threads_num; i++) {
/* Things we do for all the threads including the main thread. */
io_threads_list[i] = listCreate();
if (i == 0) continue; /* Thread 0 is the main thread. */
/* Things we do only for the additional threads. */
pthread_t tid;
pthread_mutex_init(&io_threads_mutex[i],NULL);
setIOPendingCount(i, 0);
pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
exit(1);
}
io_threads[i] = tid;
}
}
cvoid *IOThreadMain(void *myid) {
/* The ID is the thread number (from 0 to server.io_threads_num-1), and is
* used by the thread to just manipulate a single sub-array of clients. */
long id = (unsigned long)myid;
char thdname[16];
snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
redis_set_thread_title(thdname);
redisSetCpuAffinity(server.server_cpulist);
makeThreadKillable();//设置线程为随时可取消状态
// 开始取任务执行
while(1) {
/* Wait for start */
for (int j = 0; j < 1000000; j++) {
if (getIOPendingCount(id) != 0) break;
}
/* Give the main thread a chance to stop this thread. */
if (getIOPendingCount(id) == 0) {
pthread_mutex_lock(&io_threads_mutex[id]);
pthread_mutex_unlock(&io_threads_mutex[id]);
continue;
}
serverAssert(getIOPendingCount(id) != 0);
/* Process: note that the main thread will never touch our list
* before we drop the pending count to 0. */
listIter li;
listNode *ln;
listRewind(io_threads_list[id],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
if (io_threads_op == IO_THREADS_OP_WRITE) {
writeToClient(c,0);
} else if (io_threads_op == IO_THREADS_OP_READ) {
readQueryFromClient(c->conn);
} else {
serverPanic("io_threads_op value is unknown");
}
}
listEmpty(io_threads_list[id]);
setIOPendingCount(id, 0);
}
}
这句是一个简单的线程池结构,上面基本就是消费者逻辑,接下来讲一下生产者逻辑
c list *clients; /* List of active clients */
list *clients_to_close; /* Clients to close asynchronously */
list *clients_pending_write; /* There is to write or install handler. */
list *clients_pending_read; /* Client has pending read socket buffers. */
list *slaves, *monitors; /* List of slaves and MONITORs */
推迟客户端读操作
cvoid readQueryFromClient(connection *conn) {
client *c = connGetPrivateData(conn);
int nread, big_arg = 0;
size_t qblen, readlen;
/* Check if we want to read from the client later when exiting from
* the event loop. This is the case if threaded I/O is enabled. */
if (postponeClientRead(c)) return;
/* Update total number of reads on server */
atomicIncr(server.stat_total_reads_processed, 1);
本文作者:yowayimono
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!