initial multiprocess implementation

This commit is contained in:
Jethro Grassie 2020-09-04 01:48:57 -04:00
parent 48bcb418b0
commit 88b724799a
No known key found for this signature in database
GPG key ID: DE8ED755616565BB
2 changed files with 56 additions and 6 deletions

View file

@ -26,6 +26,7 @@ disable-payouts = 0
data-dir = ./data data-dir = ./data
pid-file = pid-file =
forked = 0 forked = 0
processes = 1
# trusted-listen = 127.0.0.1 # trusted-listen = 127.0.0.1
# trusted-port = 4244 # trusted-port = 4244
# trusted-allowed = 127.0.0.1,127.0.0.2 # trusted-allowed = 127.0.0.1,127.0.0.2

View file

@ -34,6 +34,7 @@ developers.
#include <netinet/in.h> #include <netinet/in.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <sys/wait.h>
#include <fcntl.h> #include <fcntl.h>
#include <event2/event.h> #include <event2/event.h>
@ -182,6 +183,7 @@ typedef struct config_t
char upstream_host[MAX_HOST]; char upstream_host[MAX_HOST];
uint16_t upstream_port; uint16_t upstream_port;
char pool_view_key[64]; char pool_view_key[64];
int processes;
} config_t; } config_t;
typedef struct block_template_t typedef struct block_template_t
@ -3828,6 +3830,12 @@ read_config(const char *config_file)
{ {
config.forked = atoi(val); config.forked = atoi(val);
} }
else if (strcmp(key, "processes") == 0)
{
config.processes = atoi(val);
if (config.processes < -1)
config.processes = -1;
}
else if (strcmp(key, "trusted-listen") == 0) else if (strcmp(key, "trusted-listen") == 0)
{ {
strncpy(config.trusted_listen, val, strncpy(config.trusted_listen, val,
@ -3966,6 +3974,7 @@ static void print_config()
" data-dir = %s\n" " data-dir = %s\n"
" pid-file = %s\n" " pid-file = %s\n"
" forked = %u\n" " forked = %u\n"
" processes = %d\n"
" trusted-listen = %s\n" " trusted-listen = %s\n"
" trusted-port = %u\n" " trusted-port = %u\n"
" trusted-allowed = %s\n" " trusted-allowed = %s\n"
@ -3999,6 +4008,7 @@ static void print_config()
config.data_dir, config.data_dir,
config.pid_file, config.pid_file,
config.forked, config.forked,
config.processes,
config.trusted_listen, config.trusted_listen,
config.trusted_port, config.trusted_port,
display_allowed, display_allowed,
@ -4242,7 +4252,8 @@ print_help(struct option *opts)
{ {
printf("-%c, --%s %s\n", opts->val, opts->name, printf("-%c, --%s %s\n", opts->val, opts->name,
opts->has_arg==required_argument ? opts->has_arg==required_argument ?
strstr(opts->name,"file") ? "<file>" : "<dir>" : strstr(opts->name,"file") ? "<file>" :
strstr(opts->name,"processes") ? "[-1|N]" : "<dir>" :
opts->has_arg==optional_argument ? "[0|1]" : "" ); opts->has_arg==optional_argument ? "[0|1]" : "" );
} }
} }
@ -4257,6 +4268,7 @@ int main(int argc, char **argv)
{"data-dir", required_argument, 0, 'd'}, {"data-dir", required_argument, 0, 'd'},
{"pid-file", required_argument, 0, 'p'}, {"pid-file", required_argument, 0, 'p'},
{"forked", optional_argument, 0, 'f'}, {"forked", optional_argument, 0, 'f'},
{"processes", required_argument, 0, 'm'},
{"help", no_argument, 0, 'h'}, {"help", no_argument, 0, 'h'},
{0, 0, 0, 0} {0, 0, 0, 0}
}; };
@ -4266,11 +4278,12 @@ int main(int argc, char **argv)
char *data_dir = NULL; char *data_dir = NULL;
char *pid_file = NULL; char *pid_file = NULL;
int forked = -1; int forked = -1;
int processes = 1;
int c; int c;
while (1) while (1)
{ {
int option_index = 0; int option_index = 0;
c = getopt_long (argc, argv, "c:l:b::d:p:f::h", c = getopt_long (argc, argv, "c:l:b::d:p:f::m:h",
options, &option_index); options, &option_index);
if (c == -1) if (c == -1)
break; break;
@ -4306,6 +4319,11 @@ int main(int argc, char **argv)
else else
forked = optarg ? atoi(optarg) : 1; forked = optarg ? atoi(optarg) : 1;
break; break;
case 'm':
processes = atoi(optarg);
if (processes < -1)
processes = -1;
break;
case 'h': case 'h':
default: default:
print_help(options); print_help(options);
@ -4313,10 +4331,8 @@ int main(int argc, char **argv)
break; break;
} }
} }
setvbuf(stdout, NULL, _IONBF, 0); setvbuf(stdout, NULL, _IOLBF, 0);
log_set_level(LOG_INFO); log_set_level(LOG_INFO);
log_set_lock(log_lock);
log_set_udata(&mutex_log);
read_config(config_file); read_config(config_file);
if (config_file) if (config_file)
@ -4340,6 +4356,8 @@ int main(int argc, char **argv)
} }
if (forked > -1) if (forked > -1)
config.forked = forked; config.forked = forked;
if (processes != 1)
config.processes = processes;
if (block_notified > -1) if (block_notified > -1)
config.block_notified = block_notified; config.block_notified = block_notified;
@ -4350,7 +4368,10 @@ int main(int argc, char **argv)
if (!fd_log) if (!fd_log)
log_warn("Failed to open log file: %s", config.log_file); log_warn("Failed to open log file: %s", config.log_file);
else else
{
setvbuf(fd_log, NULL, _IOLBF, 0);
log_set_fp(fd_log); log_set_fp(fd_log);
}
} }
print_config(); print_config();
@ -4365,8 +4386,36 @@ int main(int argc, char **argv)
forkoff(pf); forkoff(pf);
} }
int evthread_use_pthreads(void); if (config.processes < 0 || config.processes > 1)
{
int nproc = sysconf(_SC_NPROCESSORS_ONLN);
if (config.processes > nproc)
{
log_warn("Requested more processes than available cores (%d)",
nproc);
config.processes = -1;
}
nproc = config.processes < 0 ? nproc : config.processes;
log_info("Launching processes: %d", nproc);
int pid = 0;
while(nproc--)
{
pid = fork();
if (pid < 1)
break;
if (pid > 0)
continue;
}
if (pid > 0)
{
while (waitpid(-1, 0, 0) > 0)
{}
_exit(0);
}
}
log_set_udata(&mutex_log);
log_set_lock(log_lock);
signal(SIGINT, sigint_handler); signal(SIGINT, sigint_handler);
signal(SIGPIPE, SIG_IGN); signal(SIGPIPE, SIG_IGN);
atexit(cleanup); atexit(cleanup);