共享記憶體實現多程序低延遲佇列 10us

2021-10-07 16:53:02 字數 4262 閱讀 8414

之前的部落格寫過通過inotify 加檔案的形式來實現多程序佇列的文章。這種方式在通常情況下表現不錯,但是這裡存在乙個問題就是當消費者過慢,會產生大量的擊穿核心高速緩衝區io,導致消費者卡在讀取資料的瓶頸上,無法使用負載均衡等手段來提高處理能力。

為了解決上述問題,引入了共享記憶體,眾所周知,這是所有ipc中最快的通訊方式,從根本上解決這個問題。下面通過實現乙個producer 和 consumer 程式,來展示我的設計思路。

由於物理記憶體有限,生產者會使用乙個環形緩衝區來保證熱點資料始終在記憶體中。同時為了保證消費者的接入配置最小化,生產者將配置通過乙個固定大小的結構體對映到記憶體中,消費者首先對映結構體讀取配置資訊,從結構體中的得知緩衝區大小後執行mremap進行重新調整大小,這樣消費者只需要知道共享記憶體的位址(乙個檔名),就可以實現消費。同時採用了訊息計數,來標識消費者是否已經處理所有訊息,觸發等待。當消費者在等待新資料時,喚醒消費者我們選擇了通過向指定檔案寫入乙個位元組的內容觸發inotify,雖然通過訊號量也可以實現,但是使用訊號量會導致生產者要多開乙個執行緒實現管理,引入額外的複雜度。

#include #include #include #include #include #include #include #include #include #include #include #include define_int64(shm_size, 6, "shm_size m");

define_string(inotify_file, "/tmp/writer.txt", "inotify file path");

define_string(shm_file, "/test", "shm file path");

define_string(shm_key, "", "shm key");

class producer

bool init(const std::string &key)

// 開啟共享記憶體

shm_fd_ = shm_open(shm_path_.c_str(), o_creat | o_rdwr | o_trunc, 0777);

if (shm_fd_ < 0)

uint64_t size = shm_size_ + sizeof(shm_data);

if (ftruncate(shm_fd_, size) == -1)

shm_data_ = (shm_data *)mmap(null, size, prot_read | prot_write, map_shared, shm_fd_, 0);

if (shm_data_ == map_failed)

shm_data_->total = 0;

shm_data_->size = shm_size_;

memcpy(shm_data_->inotify_name, inotify_path_.c_str(), inotify_path_.size());

memcpy(shm_data_->key, key.c_str(), key.size());

return true;

}void write(const char *line)

shm_data_->buffer[current_offset_++] = line[i];

}if (current_offset_ >= shm_size_)

shm_data_->buffer[current_offset_++] = '\0';

shm_data_->total++;

write(fd_, "8", 1);

private:

struct shm_data

;shm_data *shm_data_ = nullptr; // 共享記憶體

int fd_;

int shm_fd_;

uint64_t shm_size_ = 0;

uint64_t buffer_size_ = 0;

uint64_t total_read = 0;

uint64_t current_offset_ = 0;

std::string inotify_path_;

std::string shm_path_;

};int main(int argc, char *ar**)

消費者實現就相對簡單一些,讀取配置結構體,執行mremap調整大小, 如果機器效能足夠,可以選擇不等待inotify,類似自旋鎖的方式。這種方式測試發現新訊息能在10us左右被消費者感知,使用inoitfy新訊息感知需要40us左右。

#include #include #include #include #include #include #include #include #include #include #include define_string(shm_file, "/test", "shm file path");

define_bool(shm_nowait, false, "shm no wait mode");

#define event_size (sizeof(struct inotify_event))

#define buf_len (10 * (event_size + filename_max + 1))

class consumer

~tail()

bool init(const std::string& key)

shm_data *shm_info_ = (shm_data *)mmap(null, sizeof(shm_data), prot_read | prot_write, map_shared, shm_fd_, 0);

if (shm_info_ == map_failed)

printf("info size=%ld inotify_name=%s,key=%s\n", shm_info_->size, shm_info_->inotify_name, shm_info_->key);

if (strcasecmp(shm_info_->key, key.c_str()) != 0)

// 開始監聽檔案變化

inotify_fd_ = inotify_init();

if (inotify_fd_ < 0)

shm_size_ = shm_info_->size;

uint64_t real_size = shm_info_->size + sizeof(shm_data);

inotify_add_watch(inotify_fd_, shm_info_->inotify_name, in_modify | in_create | in_delete);

shm_data_ = (shm_data *)mremap(shm_info_, sizeof(shm_data), real_size, mremap_maymove);

if (shm_data_ == map_failed)

return true;

}void loop()

line_[i] = shm_data_->buffer[current_offset_++];

if (line_[i] == '\0')

}total_read++;

printf("current_offset=%d, total=%d, read=%d, %s", current_offset_, shm_data_->total, total_read, line_);

}if (!flags_shm_nowait)}}

private:

struct shm_data

;shm_data *shm_data_ = nullptr; // 共享物件指標

uint64_t shm_size_ = 0; // 共享記憶體大小

uint64_t line_size_ = 0; // 每條資料最大值

uint64_t total_read = 0; // 當前讀取總記錄數

uint64_t current_offset_ = 0; // 當前讀取的偏移量

std::string shm_path_;

int inotify_fd_;

int shm_fd_;

int tag_;

char *line_;

char *inotify_buffer_;

};define_string(shm_key, "", "shm key");

int main(int argc, char *ar**)

consumer.loop();

}

多程序通訊(IPC) 共享記憶體

1 共享記憶體介紹 共享記憶體可以說是最有用的程序間通訊方式,也是最快的ipc形式。兩個不同程序a b共享記憶體的意思是,同一塊物理記憶體被對映到程序a b各自的程序位址空間。程序a可以即時看到程序b對共享記憶體中資料的更新,反之亦然。由於多個程序共享同一塊記憶體區域,必然需要某種同步機制,互斥鎖和...

多程序鎖和共享記憶體

當我們用多程序來讀寫檔案的時候,如果乙個程序是寫檔案,乙個程序是讀檔案,如果兩個檔案同時進行,肯定是不行的,必須是檔案寫結束以後,才可以進行讀操作。或者是多個程序在共享一些資源的時候,同時只能有乙個程序進行訪問,那就要有乙個鎖機制進行控制。需求 乙個程序寫入乙個檔案,乙個程序追加檔案,乙個程序讀檔案...

linuxC多程序通訊 POSIX共享記憶體

簡單的共享記憶體讀寫示例 訊號量和共享記憶體 使用示例 函式功能 改變檔案大小 相關函式 open truncate 表頭檔案 include 函式原型 int ftruncate int fd,off t length 函式說明 ftruncate 會將引數fd指定的檔案大小改為引數length指...