aports/main/nginx/nchan~fix-redis-race-condition.patch
Last changed by Jakub Jirutka in aports commit f3dbbda9f6 (2023-05-26 14:23:20 +00:00):
main/nginx: backport bugfixes for multiple third-party modules
The most important are bugfixes for http-shibboleth (segfault)
and http-lua (probably a segfault too).

Patch-Source: https://github.com/slact/nchan/commit/cc705a948cf6797f669b64649605f007f6f26a42
--
From cc705a948cf6797f669b64649605f007f6f26a42 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?F=C3=A1bio=20Urquiza?= <fabiorush@gmail.com>
Date: Mon, 20 Mar 2023 10:31:33 -0300
Subject: [PATCH] Fix redis connection race condition on many workers (#666)
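
Backport note (added for context; not part of the upstream commit message):
judging from the diff below, the old code answered 503 Service Unavailable
whenever the local worker's Redis connection was not yet ready. With many
workers, the worker that accepts a request and the worker that owns the
channel can become ready at different times, so requests could be refused
even though the channel owner was already connected. The patch removes the
early local-worker check in nchan_pubsub_handler() and instead checks
readiness on the channel-owner worker: directly when the current worker is
the owner, otherwise via the new redis_conn_ready / redis_conn_ready_reply
IPC round-trip added to nchan_store_subscribe().
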
diff --git a/src/nchan_module.c b/src/nchan_module.c
index ad6f3534..95b51521 100644
--- a/src/nchan_module.c
+++ b/src/nchan_module.c
@@ -39,7 +39,6 @@ int nchan_redis_stats_enabled = 0;
static void nchan_publisher_body_handler(ngx_http_request_t *r);
-static void nchan_publisher_unavailable_body_handler(ngx_http_request_t *r);
//#define DEBUG_LEVEL NGX_LOG_WARN
//#define DEBUG_LEVEL NGX_LOG_DEBUG
@@ -746,18 +745,6 @@ ngx_int_t nchan_pubsub_handler(ngx_http_request_t *r) {
return NGX_OK;
}
- if(cf->redis.enabled && !nchan_store_redis_ready(cf)) {
- //using redis, and it's not ready yet
- if(r->method == NGX_HTTP_POST || r->method == NGX_HTTP_PUT) {
- //discard request body before responding
- nchan_http_publisher_handler(r, nchan_publisher_unavailable_body_handler);
- }
- else {
- nchan_respond_status(r, NGX_HTTP_SERVICE_UNAVAILABLE, NULL, NULL, 0);
- }
- return NGX_OK;
- }
-
if(cf->pub.websocket || cf->pub.http) {
char *err;
if(!nchan_parse_message_buffer_config(r, cf, &err)) {
@@ -841,6 +828,13 @@ ngx_int_t nchan_pubsub_handler(ngx_http_request_t *r) {
#if FAKESHARD
memstore_sub_debug_start();
#endif
+
+ if(cf->redis.enabled && ngx_process_slot == memstore_channel_owner(channel_id) && !nchan_store_redis_ready(cf)) {
+ //using redis, and it's not ready yet
+ nchan_respond_status(r, NGX_HTTP_SERVICE_UNAVAILABLE, NULL, NULL, 0);
+ return NGX_OK;
+ }
+
if((msg_id = nchan_subscriber_get_msg_id(r)) == NULL) {
goto bad_msgid;
}
@@ -1249,11 +1243,6 @@ static ngx_int_t nchan_publisher_body_authorize_handler(ngx_http_request_t *r, v
return NGX_OK;
}
-static void nchan_publisher_unavailable_body_handler(ngx_http_request_t *r) {
- nchan_http_finalize_request(r, NGX_HTTP_SERVICE_UNAVAILABLE);
- return;
-}
-
static void nchan_publisher_body_handler(ngx_http_request_t *r) {
ngx_str_t *channel_id;
nchan_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_nchan_module);
diff --git a/src/store/memory/ipc-handlers.c b/src/store/memory/ipc-handlers.c
index 55df2fd3..bee7c1d9 100644
--- a/src/store/memory/ipc-handlers.c
+++ b/src/store/memory/ipc-handlers.c
@@ -47,6 +47,8 @@
L(benchmark_finish_reply) \
L(redis_stats_request) \
L(redis_stats_reply) \
+ L(redis_conn_ready) \
+ L(redis_conn_ready_reply) \
@@ -1205,6 +1207,35 @@ static void receive_redis_stats_reply(ngx_int_t sender, redis_stats_request_data
shm_free(nchan_store_memory_shmem, data->stats);
}
+////////// REDIS CONN READY DATA ////////////////
+typedef struct {
+ ngx_int_t ready;
+ nchan_loc_conf_t *cf;
+ callback_pt callback;
+ void *privdata;
+} redis_conn_ready_data_t;
+
+ngx_int_t memstore_ipc_send_redis_conn_ready(ngx_int_t dst, nchan_loc_conf_t *cf, callback_pt callback, void* privdata) {
+ DBG("send redis_conn_ready to %i", dst);
+ redis_conn_ready_data_t data;
+ DEBUG_MEMZERO(&data);
+ data.ready = 0;
+ data.cf = cf;
+ data.callback = callback;
+ data.privdata = privdata;
+
+ return ipc_cmd(redis_conn_ready, dst, &data);
+}
+
+static void receive_redis_conn_ready(ngx_int_t sender, redis_conn_ready_data_t *d) {
+ DBG("received redis_conn_ready request for privdata %p", d->privdata);
+ d->ready = nchan_store_redis_ready(d->cf);
+ ipc_cmd(redis_conn_ready_reply, sender, d);
+}
+
+static void receive_redis_conn_ready_reply(ngx_int_t sender, redis_conn_ready_data_t *d) {
+ d->callback(d->ready, NULL, d->privdata);
+}
#define MAKE_ipc_cmd_handler(val) [offsetof(ipc_handlers_t, val)/sizeof(ipc_handler_pt)] = (ipc_handler_pt )receive_ ## val,
static ipc_handler_pt ipc_cmd_handler[] = {
diff --git a/src/store/memory/ipc-handlers.h b/src/store/memory/ipc-handlers.h
index e22c4a0d..7fd9c19c 100644
--- a/src/store/memory/ipc-handlers.h
+++ b/src/store/memory/ipc-handlers.h
@@ -26,3 +26,5 @@ ngx_int_t memstore_ipc_broadcast_benchmark_finish(void);
ngx_int_t memstore_ipc_broadcast_benchmark_abort(void);
ngx_int_t memstore_ipc_broadcast_redis_stats_request(void *nodeset, callback_pt cb, void *pd);
+
+ngx_int_t memstore_ipc_send_redis_conn_ready(ngx_int_t dst, nchan_loc_conf_t *cf, callback_pt callback, void* privdata);
\ No newline at end of file
diff --git a/src/store/memory/memstore.c b/src/store/memory/memstore.c
index f8b4ee8b..e1e1c7a8 100755
--- a/src/store/memory/memstore.c
+++ b/src/store/memory/memstore.c
@@ -2079,12 +2079,13 @@ static void subscribe_data_free(subscribe_data_t *d) {
#define SUB_CHANNEL_NOTSURE 2
static ngx_int_t nchan_store_subscribe_channel_existence_check_callback(ngx_int_t channel_status, void* _, subscribe_data_t *d);
-static ngx_int_t nchan_store_subscribe_continued(ngx_int_t channel_status, void* _, subscribe_data_t *d);
+static ngx_int_t nchan_store_subscribe_stage2(ngx_int_t continue_subscription, void* _, subscribe_data_t *d);
+static ngx_int_t nchan_store_subscribe_stage3(ngx_int_t channel_status, void* _, subscribe_data_t *d);
static ngx_int_t nchan_store_subscribe(ngx_str_t *channel_id, subscriber_t *sub) {
ngx_int_t owner = memstore_channel_owner(channel_id);
subscribe_data_t *d = subscribe_data_alloc(sub->cf->redis.enabled ? -1 : owner);
-
+
assert(d != NULL);
d->channel_owner = owner;
@@ -2095,24 +2096,51 @@ static ngx_int_t nchan_store_subscribe(ngx_str_t *channel_id, subscriber_t *sub)
d->channel_exists = 0;
d->group_channel_limit_pass = 0;
d->msg_id = sub->last_msgid;
-
- if(sub->cf->subscribe_only_existing_channel || sub->cf->max_channel_subscribers > 0) {
+
+ if(sub->cf->redis.enabled && memstore_slot() != owner) {
+ ngx_int_t rc;
sub->fn->reserve(sub);
d->reserved = 1;
- if(memstore_slot() != owner) {
- ngx_int_t rc;
- rc = memstore_ipc_send_channel_existence_check(owner, channel_id, sub->cf, (callback_pt )nchan_store_subscribe_channel_existence_check_callback, d);
- if(rc == NGX_DECLINED) { // out of memory
- nchan_store_subscribe_channel_existence_check_callback(SUB_CHANNEL_UNAUTHORIZED, NULL, d);
- return NGX_ERROR;
+ rc = memstore_ipc_send_redis_conn_ready(d->channel_owner, sub->cf, (callback_pt)nchan_store_subscribe_stage2, d);
+ if(rc == NGX_DECLINED) { // out of memory
+ nchan_store_subscribe_stage2(0, NULL, d);
+ return NGX_ERROR;
+ }
+ } else {
+ return nchan_store_subscribe_stage2(1, NULL, d);
+ }
+ return NGX_OK;
+}
+
+static ngx_int_t nchan_store_subscribe_stage2(ngx_int_t continue_subscription, void* _, subscribe_data_t *d) {
+ if(continue_subscription) {
+ if(d->sub->cf->subscribe_only_existing_channel || d->sub->cf->max_channel_subscribers > 0) {
+ if(!d->reserved) {
+ d->sub->fn->reserve(d->sub);
+ d->reserved = 1;
+ }
+ if(memstore_slot() != d->channel_owner) {
+ ngx_int_t rc;
+ rc = memstore_ipc_send_channel_existence_check(d->channel_owner, d->channel_id, d->sub->cf, (callback_pt )nchan_store_subscribe_channel_existence_check_callback, d);
+ if(rc == NGX_DECLINED) { // out of memory
+ nchan_store_subscribe_channel_existence_check_callback(SUB_CHANNEL_UNAUTHORIZED, NULL, d);
+ return NGX_ERROR;
+ }
+ }
+ else {
+ return nchan_store_subscribe_stage3(SUB_CHANNEL_NOTSURE, NULL, d);
}
}
else {
- return nchan_store_subscribe_continued(SUB_CHANNEL_NOTSURE, NULL, d);
+ return nchan_store_subscribe_stage3(SUB_CHANNEL_AUTHORIZED, NULL, d);
}
- }
- else {
- return nchan_store_subscribe_continued(SUB_CHANNEL_AUTHORIZED, NULL, d);
+ } else {
+ //using redis, and it's not ready yet
+ if(d->sub->fn->release(d->sub, 0) == NGX_OK) {
+ d->reserved = 0;
+ nchan_respond_status(d->sub->request, NGX_HTTP_SERVICE_UNAVAILABLE, NULL, NULL, 0);
+ }
+ subscribe_data_free(d);
}
return NGX_OK;
}
@@ -2120,7 +2148,7 @@ static ngx_int_t nchan_store_subscribe(ngx_str_t *channel_id, subscriber_t *sub)
static ngx_int_t nchan_store_subscribe_channel_existence_check_callback(ngx_int_t channel_status, void* _, subscribe_data_t *d) {
if(d->sub->fn->release(d->sub, 0) == NGX_OK) {
d->reserved = 0;
- return nchan_store_subscribe_continued(channel_status, _, d);
+ return nchan_store_subscribe_stage3(channel_status, _, d);
}
else {//don't go any further, the sub has been deleted
subscribe_data_free(d);
@@ -2143,7 +2171,7 @@ static ngx_int_t redis_subscribe_channel_existence_callback(ngx_int_t status, vo
/*
else if (cf->max_channel_subscribers > 0) {
// don't check this anymore -- a total subscribers count check is less
- // useful as a per-instance check, which is handled in nchan_store_subscribe_continued
+ // useful as a per-instance check, which is handled in nchan_store_subscribe_stage3
// shared total subscriber count check can be re-enabled with another config setting
channel_status = channel->subscribers >= cf->max_channel_subscribers ? SUB_CHANNEL_UNAUTHORIZED : SUB_CHANNEL_AUTHORIZED;
}
@@ -2152,7 +2180,7 @@ static ngx_int_t redis_subscribe_channel_existence_callback(ngx_int_t status, vo
channel_status = SUB_CHANNEL_AUTHORIZED;
}
- nchan_store_subscribe_continued(channel_status, NULL, data);
+ nchan_store_subscribe_stage3(channel_status, NULL, data);
}
else {
//error!!
@@ -2196,7 +2224,7 @@ static ngx_int_t group_subscribe_channel_limit_reached(ngx_int_t rc, nchan_chann
if(d->sub->status != DEAD) {
if(chaninfo) {
//ok, channel already exists.
- nchan_store_subscribe_continued(SUB_CHANNEL_AUTHORIZED, NULL, d);
+ nchan_store_subscribe_stage3(SUB_CHANNEL_AUTHORIZED, NULL, d);
}
else {
//nope. no channel, no subscribing.
@@ -2219,14 +2247,14 @@ static ngx_int_t group_subscribe_channel_limit_check(ngx_int_t _, nchan_group_t
if(shm_group) {
if(!shm_group->limit.channels || (shm_group->channels < shm_group->limit.channels)) {
d->group_channel_limit_pass = 1;
- rc = nchan_store_subscribe_continued(SUB_CHANNEL_AUTHORIZED, NULL, d);
+ rc = nchan_store_subscribe_stage3(SUB_CHANNEL_AUTHORIZED, NULL, d);
}
else if (shm_group->limit.channels && shm_group->channels == shm_group->limit.channels){
//no new channels!
rc = nchan_store_find_channel(d->channel_id, d->sub->cf, (callback_pt )group_subscribe_channel_limit_reached, d);
}
else {
- rc = nchan_store_subscribe_continued(SUB_CHANNEL_UNAUTHORIZED, NULL, d);
+ rc = nchan_store_subscribe_stage3(SUB_CHANNEL_UNAUTHORIZED, NULL, d);
}
}
@@ -2246,7 +2274,7 @@ static ngx_int_t group_subscribe_channel_limit_check(ngx_int_t _, nchan_group_t
return rc;
}
-static ngx_int_t nchan_store_subscribe_continued(ngx_int_t channel_status, void* _, subscribe_data_t *d) {
+static ngx_int_t nchan_store_subscribe_stage3(ngx_int_t channel_status, void* _, subscribe_data_t *d) {
memstore_channel_head_t *chanhead = NULL;
int retry_null_chanhead = 1;
//store_message_t *chmsg;