commit 99c0965ffe737f1ccb4f8f10d584cbc63cfa8cdf Author: Thomas Cataldo Date: Mon Nov 18 18:55:54 2019 +0100 [replication] FEATBL-971 Feat: shard cyrus replication using a hashcode on mailbox names diff --git a/imap/cyr_synclog.c b/imap/cyr_synclog.c index c1e151cbf..b0e50e36d 100644 --- a/imap/cyr_synclog.c +++ b/imap/cyr_synclog.c @@ -82,6 +82,7 @@ int main(int argc, char *argv[]) char *alt_config = NULL; char cmd = '\0'; int opt; + const char *shard = "dummy"; /* fixed shard key used for raw "as is" log lines */ if ((geteuid()) == 0 && (become_cyrus(/*is_master*/0) != 0)) { fatal("must run as the Cyrus user", EC_USAGE); @@ -173,7 +174,7 @@ int main(int argc, char *argv[]) break; default: /* just as is! */ - sync_log(argv[optind]); + sync_log(shard, argv[optind]); break; } diff --git a/imap/sync_client.c b/imap/sync_client.c index 882928026..94425c569 100644 --- a/imap/sync_client.c +++ b/imap/sync_client.c @@ -584,7 +584,7 @@ enum { RESTART_RECONNECT }; -static int do_daemon_work(const char *channel, const char *sync_shutdown_file, +static int do_daemon_work(const char *channel, int shard, const char *sync_shutdown_file, unsigned long timeout, unsigned long min_delta, int *restartp) { @@ -596,7 +596,7 @@ static int do_daemon_work(const char *channel, const char *sync_shutdown_file, sync_log_reader_t *slr; *restartp = RESTART_NONE; - slr = sync_log_reader_create_with_channel(channel); + slr = sync_log_reader_create_with_channel(channel, shard); session_start = time(NULL); @@ -776,7 +776,7 @@ static void replica_disconnect(void) backend_disconnect(sync_backend); } -static void do_daemon(const char *channel, const char *sync_shutdown_file, +static void do_daemon(const char *channel, int shard, const char *sync_shutdown_file, unsigned long timeout, unsigned long min_delta) { int r = 0; @@ -786,7 +786,7 @@ static void do_daemon(const char *channel, const char *sync_shutdown_file, while (restart) { replica_connect(channel); - r = do_daemon_work(channel, sync_shutdown_file, + r = do_daemon_work(channel, shard, 
sync_shutdown_file, timeout, min_delta, &restart); if (r) { /* See if we're still connected to the server. @@ -883,6 +883,7 @@ int main(int argc, char **argv) int mode = MODE_UNKNOWN; int wait = 0; int timeout = 600; + int shard = 0; int min_delta = 0; const char *channel = NULL; const char *sync_shutdown_file = NULL; @@ -898,7 +899,7 @@ int main(int argc, char **argv) setbuf(stdout, NULL); - while ((opt = getopt(argc, argv, "C:vlLS:F:f:w:t:d:n:rRumsozOAp:")) != EOF) { + while ((opt = getopt(argc, argv, "C:vlLS:F:f:w:t:i:d:n:rRumsozOAp:")) != EOF) { switch (opt) { case 'C': /* alt config file */ alt_config = optarg; @@ -941,6 +942,10 @@ int main(int argc, char **argv) wait = atoi(optarg); break; + case 'i': /* shard index */ + shard = atoi(optarg); + break; + case 't': timeout = atoi(optarg); break; @@ -1201,7 +1206,8 @@ int main(int argc, char **argv) if (!min_delta) min_delta = sync_get_intconfig(channel, "sync_repeat_interval"); - do_daemon(channel, sync_shutdown_file, timeout, min_delta); + syslog(LOG_INFO, "Running in daemon mode for channel %s shard-index %d", channel ? channel : "(none)", shard); + do_daemon(channel, shard, sync_shutdown_file, timeout, min_delta); } break; diff --git a/imap/sync_log.c b/imap/sync_log.c index 4e4764be1..166044ccc 100644 --- a/imap/sync_log.c +++ b/imap/sync_log.c @@ -75,17 +75,21 @@ static int sync_log_suppressed = 0; static strarray_t *channels = NULL; static strarray_t *unsuppressable = NULL; +static int shards = 4; /* number of log shards; sync_log_init() overwrites it from the sync_log_shards option */ + EXPORTED void sync_log_init(void) { const char *conf; int i; /* sync_log_init() may be called more than once */ - if (channels) strarray_free(channels); + if (channels) + strarray_free(channels); conf = config_getstring(IMAPOPT_SYNC_LOG_CHANNELS); - if (!conf) conf = "\"\""; + if (!conf) + conf = "\"\""; channels = strarray_split(conf, " ", 0); /* * The sysadmin can specify "" in the value of sync_log_channels to @@ -102,6 +106,8 @@ EXPORTED void sync_log_init(void) conf = 
config_getstring(IMAPOPT_SYNC_LOG_UNSUPPRESSABLE_CHANNELS); if (conf) unsuppressable = strarray_split(conf, " ", 0); + + shards = config_getint(IMAPOPT_SYNC_LOG_SHARDS); } EXPORTED void sync_log_suppress(void) @@ -118,16 +124,66 @@ EXPORTED void sync_log_done(void) unsuppressable = NULL; } -static char *sync_log_fname(const char *channel) +static int hashcode(const char *shard) /* 31-multiplier string hash of the shard key */ +{ + int len = strlen(shard); + unsigned int hash = 0; /* unsigned so that wrap-around is well defined */ + int i; + for (i = 0; i < len; i++) + { + hash = 31u * hash + (unsigned int)shard[i]; + } + return (int)hash; +} + +static char *normalize_shard(const char *shard) /* returns a malloc'd copy of shard with the sub-folder suffix cut off; caller frees */ +{ + int suffix_cut = 0; + int total_len = strlen(shard); + char *mark = strrchr(shard, '!'); + if (mark) + { + if (strncmp("user.", mark + 1, 5) == 0) + { + // prefixed by user. + char *sec_dot = strchr(mark + 6, '.'); + if (sec_dot != NULL) + { + suffix_cut = strlen(sec_dot); + } + } + else + { + // not prefixed by user. aka mailshare + char *first_dot = strchr(mark + 1, '.'); + if (first_dot) + { + suffix_cut = strlen(first_dot); + } + } + } + return strndup(shard, total_len - suffix_cut); +} + +static int shard_index(const char* shard) { /* maps a shard key to a log file index; falls back to 0 / one shard on bad input */ + char* norm = normalize_shard(shard); + int ret = norm ? abs(hashcode(norm) % (shards > 0 ? shards : 1)) : 0; + free(norm); + return ret; +} + + + +static char *sync_log_fname(const char *channel, int shard) { static char buf[MAX_MAILBOX_PATH]; if (channel) snprintf(buf, MAX_MAILBOX_PATH, - "%s/sync/%s/log", config_dir, channel); + "%s/sync/%s/log.%d", config_dir, channel, shard); else snprintf(buf, MAX_MAILBOX_PATH, - "%s/sync/log", config_dir); + "%s/sync/log.%d", config_dir, shard); return buf; } @@ -135,37 +191,42 @@ static char *sync_log_fname(const char *channel) static int sync_log_enabled(const char *channel) { if (!config_getswitch(IMAPOPT_SYNC_LOG)) - return 0; /* entire mechanism is disabled */ + return 0; /* entire mechanism is disabled */ if (!sync_log_suppressed) - return 1; /* _suppress() wasn't called */ + return 1; /* _suppress() wasn't called */ if (unsuppressable && 
strarray_find(unsuppressable, channel, 0) >= 0) - return 1; /* channel is unsuppressable */ - return 0; /* suppressed */ + return 1; /* channel is unsuppressable */ + return 0; /* suppressed */ } -static void sync_log_base(const char *channel, const char *string) +static void sync_log_base(const char *channel, int shard, const char *string) { int fd; struct stat sbuffile, sbuffd; int retries = 0; const char *fname; - fname = sync_log_fname(channel); + fname = sync_log_fname(channel, shard); - while (retries++ < SYNC_LOG_RETRIES) { - fd = open(fname, O_WRONLY|O_APPEND|O_CREAT, 0640); - if (fd < 0 && errno == ENOENT) { - if (!cyrus_mkdir(fname, 0755)) { - fd = open(fname, O_WRONLY|O_APPEND|O_CREAT, 0640); + while (retries++ < SYNC_LOG_RETRIES) + { + fd = open(fname, O_WRONLY | O_APPEND | O_CREAT, 0640); + if (fd < 0 && errno == ENOENT) + { + if (!cyrus_mkdir(fname, 0755)) + { + fd = open(fname, O_WRONLY | O_APPEND | O_CREAT, 0640); } } - if (fd < 0) { + if (fd < 0) + { syslog(LOG_ERR, "sync_log(): Unable to write to log file %s: %s", fname, strerror(errno)); return; } - if (lock_blocking(fd, fname) == -1) { + if (lock_blocking(fd, fname) == -1) + { syslog(LOG_ERR, "sync_log(): Failed to lock %s for %s: %m", fname, string); xclose(fd); @@ -181,7 +242,8 @@ static void sync_log_base(const char *channel, const char *string) lock_unlock(fd, fname); xclose(fd); } - if (retries >= SYNC_LOG_RETRIES) { + if (retries >= SYNC_LOG_RETRIES) + { xclose(fd); syslog(LOG_ERR, "sync_log(): Failed to lock %s for %s after %d attempts", @@ -200,7 +262,7 @@ static void sync_log_base(const char *channel, const char *string) static const char *sync_quote_name(const char *name) { - static char buf[MAX_MAILBOX_BUFFER+3]; /* "x2 plus \0 */ + static char buf[MAX_MAILBOX_BUFFER + 3]; /* "x2 plus \0 */ char c; int src; int dst = 0; @@ -210,24 +272,28 @@ static const char *sync_quote_name(const char *name) buf[dst++] = '"'; /* degenerate case - no name is the empty string, quote it */ - if (!name 
|| !*name) { + if (!name || !*name) + { need_quote = 1; goto end; } - for (src = 0; name[src]; src++) { + for (src = 0; name[src]; src++) + { c = name[src]; if ((c == '\r') || (c == '\n')) fatal("Illegal line break in folder name", EC_IOERR); /* quoteable characters */ - if ((c == '\\') || (c == '\"') || (c == '{') || (c == '}')) { + if ((c == '\\') || (c == '\"') || (c == '{') || (c == '}')) + { need_quote = 1; buf[dst++] = '\\'; } /* non-atom characters */ - else if ((c == ' ') || (c == '\t') || (c == '(') || (c == ')')) { + else if ((c == ' ') || (c == '\t') || (c == '(') || (c == ')')) + { need_quote = 1; } @@ -238,12 +304,14 @@ static const char *sync_quote_name(const char *name) } end: - if (need_quote) { + if (need_quote) + { buf[dst++] = '\"'; buf[dst] = '\0'; return buf; } - else { + else + { buf[dst] = '\0'; return buf + 1; /* skip initial quote */ } @@ -253,26 +321,29 @@ end: static char *va_format(const char *fmt, va_list ap) { - static char buf[BUFSIZE+1]; + static char buf[BUFSIZE + 1]; size_t len; int ival; const char *sval; const char *p; - for (len = 0, p = fmt; *p && len < BUFSIZE; p++) { - if (*p != '%') { + for (len = 0, p = fmt; *p && len < BUFSIZE; p++) + { + if (*p != '%') + { buf[len++] = *p; continue; } - switch (*++p) { + switch (*++p) + { case 'd': ival = va_arg(ap, int); - len += snprintf(buf+len, BUFSIZE-len, "%d", ival); + len += snprintf(buf + len, BUFSIZE - len, "%d", ival); break; case 's': sval = va_arg(ap, const char *); sval = sync_quote_name(sval); - strlcpy(buf+len, sval, BUFSIZE-len); + strlcpy(buf + len, sval, BUFSIZE - len); len += strlen(sval); break; default: @@ -281,32 +352,35 @@ static char *va_format(const char *fmt, va_list ap) } } - if (buf[len-1] != '\n') buf[len++] = '\n'; + if (buf[len - 1] != '\n') + buf[len++] = '\n'; buf[len] = '\0'; return buf; } -EXPORTED void sync_log(const char *fmt, ...) +EXPORTED void sync_log(const char *shard, const char *fmt, ...) 
{ va_list ap; const char *val; int i; - if (!channels) return; + if (!channels) + return; va_start(ap, fmt); val = va_format(fmt, ap); va_end(ap); - for (i = 0 ; i < channels->count ; i++) { + for (i = 0; i < channels->count; i++) + { const char *channel = channels->data[i]; if (sync_log_enabled(channel)) - sync_log_base(channel, val); + sync_log_base(channel, shard_index(shard), val); } } -EXPORTED void sync_log_channel(const char *channel, const char *fmt, ...) +EXPORTED void sync_log_channel(const char *shard, const char *channel, const char *fmt, ...) { va_list ap; const char *val; @@ -315,7 +389,7 @@ EXPORTED void sync_log_channel(const char *channel, const char *fmt, ...) val = va_format(fmt, ap); va_end(ap); - sync_log_base(channel, val); + sync_log_base(channel, shard_index(shard), val); } /* @@ -368,16 +442,16 @@ static sync_log_reader_t *sync_log_reader_alloc(void) * Returns a new object which must be freed with sync_log_reader_free(). * Does not return NULL. */ -EXPORTED sync_log_reader_t *sync_log_reader_create_with_channel(const char *channel) +EXPORTED sync_log_reader_t *sync_log_reader_create_with_channel(const char *channel, int shard) { sync_log_reader_t *slr = sync_log_reader_alloc(); struct buf buf = BUF_INITIALIZER; - slr->log_file = xstrdup(sync_log_fname(channel)); + slr->log_file = xstrdup(sync_log_fname(channel, shard)); /* Create a work log filename. 
We will process this * first if it exists */ - buf_printf(&buf, "%s-run", slr->log_file); + buf_printf(&buf, "%s-run.%d", slr->log_file, shard); slr->work_file = buf_release(&buf); return slr; @@ -416,9 +490,12 @@ EXPORTED sync_log_reader_t *sync_log_reader_create_with_fd(int fd) */ EXPORTED void sync_log_reader_free(sync_log_reader_t *slr) { - if (!slr) return; - if (slr->input) prot_free(slr->input); - if (slr->fd_is_ours && slr->fd >= 0) close(slr->fd); + if (!slr) + return; + if (slr->input) + prot_free(slr->input); + if (slr->fd_is_ours && slr->fd >= 0) + close(slr->fd); free(slr->log_file); free(slr->work_file); buf_free(&slr->type); @@ -446,47 +523,57 @@ EXPORTED int sync_log_reader_begin(sync_log_reader_t *slr) struct stat sbuf; int r; - if (slr->input) { + if (slr->input) + { r = sync_log_reader_end(slr); - if (r) return r; + if (r) + return r; } - if (stat(slr->work_file, &sbuf) == 0) { + if (stat(slr->work_file, &sbuf) == 0) + { /* Existing work log file - process this first */ syslog(LOG_NOTICE, "Reprocessing sync log file %s", slr->work_file); } - else if (!slr->log_file) { + else if (!slr->log_file) + { syslog(LOG_ERR, "Failed to stat %s: %m", slr->log_file); return IMAP_IOERROR; } - else { + else + { /* Check for sync_log file */ - if (stat(slr->log_file, &sbuf) < 0) { + if (stat(slr->log_file, &sbuf) < 0) + { if (errno == ENOENT) - return IMAP_AGAIN; /* no problem, try again later */ + return IMAP_AGAIN; /* no problem, try again later */ syslog(LOG_ERR, "Failed to stat %s: %m", slr->log_file); return IMAP_IOERROR; } /* Move sync_log to our work file */ - if (rename(slr->log_file, slr->work_file) < 0) { + if (rename(slr->log_file, slr->work_file) < 0) + { syslog(LOG_ERR, "Rename %s -> %s failed: %m", slr->log_file, slr->work_file); return IMAP_IOERROR; } } - if (slr->fd < 0) { + if (slr->fd < 0) + { int fd = open(slr->work_file, O_RDWR, 0); - if (fd < 0) { + if (fd < 0) + { syslog(LOG_ERR, "Failed to open %s: %m", slr->work_file); return 
IMAP_IOERROR; } - if (lock_blocking(fd, slr->work_file) < 0) { + if (lock_blocking(fd, slr->work_file) < 0) + { syslog(LOG_ERR, "Failed to lock %s: %m", slr->work_file); close(fd); return IMAP_IOERROR; @@ -502,7 +589,7 @@ EXPORTED int sync_log_reader_begin(sync_log_reader_t *slr) lock_unlock(slr->fd, slr->work_file); } - slr->input = prot_new(slr->fd, /*write*/0); + slr->input = prot_new(slr->fd, /*write*/ 0); return 0; } @@ -522,23 +609,27 @@ EXPORTED int sync_log_reader_end(sync_log_reader_t *slr) if (!slr->input) return 0; - if (slr->input) { + if (slr->input) + { prot_free(slr->input); slr->input = NULL; } - if (slr->fd_is_ours && slr->fd >= 0) { + if (slr->fd_is_ours && slr->fd >= 0) + { lock_unlock(slr->fd, slr->work_file); close(slr->fd); slr->fd = -1; } - if (slr->log_file) { + if (slr->log_file) + { /* We were initialised with a sync log channel, whose * log file we rename()d to the work file. Now that * we've done with the work file we can unlink it. * Further checks at this point are just paranoia. 
*/ - if (slr->work_file && unlink(slr->work_file) < 0) { + if (slr->work_file && unlink(slr->work_file) < 0) + { syslog(LOG_ERR, "Unlink %s failed: %m", slr->work_file); return IMAP_IOERROR; } @@ -566,32 +657,40 @@ EXPORTED int sync_log_reader_getitem(sync_log_reader_t *slr, if (!slr->input) return EOF; - for (;;) { + for (;;) + { if ((c = getword(slr->input, &slr->type)) == EOF) return EOF; /* Ignore blank lines */ - if (c == '\r') c = prot_getc(slr->input); + if (c == '\r') + c = prot_getc(slr->input); if (c == '\n') continue; - if (c != ' ') { + if (c != ' ') + { syslog(LOG_ERR, "Invalid input"); eatline(slr->input, c); continue; } - if ((c = getastring(slr->input, 0, &slr->arg1)) == EOF) return EOF; + if ((c = getastring(slr->input, 0, &slr->arg1)) == EOF) + return EOF; arg1s = slr->arg1.s; arg2s = NULL; - if (c == ' ') { - if ((c = getastring(slr->input, 0, &slr->arg2)) == EOF) return EOF; + if (c == ' ') + { + if ((c = getastring(slr->input, 0, &slr->arg2)) == EOF) + return EOF; arg2s = slr->arg2.s; } - if (c == '\r') c = prot_getc(slr->input); - if (c != '\n') { + if (c == '\r') + c = prot_getc(slr->input); + if (c != '\n') + { syslog(LOG_ERR, "Garbage at end of input line"); eatline(slr->input, c); continue; diff --git a/imap/sync_log.h b/imap/sync_log.h index 0dcc0a203..70a8be507 100644 --- a/imap/sync_log.h +++ b/imap/sync_log.h @@ -52,79 +52,79 @@ void sync_log_init(void); void sync_log_suppress(void); void sync_log_done(void); -void sync_log(const char *fmt, ...); -void sync_log_channel(const char *channel, const char *fmt, ...); +void sync_log(const char* shard, const char *fmt, ...); +void sync_log_channel(const char* shard, const char *channel, const char *fmt, ...); #define sync_log_user(user) \ - sync_log("USER %s\n", user) + sync_log(user, "USER %s\n", user) #define sync_log_unuser(user) \ - sync_log("UNUSER %s\n", user) + sync_log(user, "UNUSER %s\n", user) #define sync_log_sieve(user) \ - sync_log("META %s\n", user) + sync_log(user, "META %s\n", 
user) #define sync_log_append(name) \ - sync_log("APPEND %s\n", name) + sync_log(name, "APPEND %s\n", name) #define sync_log_mailbox(name) \ - sync_log("MAILBOX %s\n", name) + sync_log(name, "MAILBOX %s\n", name) #define sync_log_unmailbox(name) \ - sync_log("UNMAILBOX %s\n", name) + sync_log(name, "UNMAILBOX %s\n", name) #define sync_log_mailbox_double(name1, name2) \ - sync_log("MAILBOX %s\nMAILBOX %s\n", name1, name2) + sync_log(name1, "MAILBOX %s\nMAILBOX %s\n", name1, name2) #define sync_log_quota(name) \ - sync_log("QUOTA %s\n", name) + sync_log(name, "QUOTA %s\n", name) #define sync_log_annotation(name) \ - sync_log("ANNOTATION %s\n", name) + sync_log(name, "ANNOTATION %s\n", name) #define sync_log_seen(user, name) \ - sync_log("SEEN %s %s\n", user, name) + sync_log(user, "SEEN %s %s\n", user, name) #define sync_log_subscribe(user, name) \ - sync_log("SUB %s %s\n", user, name) + sync_log(user, "SUB %s %s\n", user, name) #define sync_log_channel_user(channel, user) \ - sync_log_channel(channel, "USER %s\n", user) + sync_log_channel(user, channel, "USER %s\n", user) #define sync_log_channel_unuser(channel, user) \ - sync_log_channel(channel, "UNUSER %s\n", user) + sync_log_channel(user, channel, "UNUSER %s\n", user) #define sync_log_channel_sieve(channel, user) \ - sync_log_channel(channel, "META %s\n", user) + sync_log_channel(user, channel, "META %s\n", user) #define sync_log_channel_append(channel, name) \ - sync_log_channel(channel, "APPEND %s\n", name) + sync_log_channel(name, channel, "APPEND %s\n", name) #define sync_log_channel_mailbox(channel, name) \ - sync_log_channel(channel, "MAILBOX %s\n", name) + sync_log_channel(name, channel, "MAILBOX %s\n", name) #define sync_log_channel_unmailbox(channel, name) \ - sync_log_channel(channel, "UNMAILBOX %s\n", name) + sync_log_channel(name, channel, "UNMAILBOX %s\n", name) #define sync_log_channel_mailbox_double(channel, name1, name2) \ - sync_log_channel(channel, "MAILBOX %s\nMAILBOX %s\n", name1, name2) + 
sync_log_channel(name1, channel, "MAILBOX %s\nMAILBOX %s\n", name1, name2) #define sync_log_channel_quota(channel, name) \ - sync_log_channel(channel, "QUOTA %s\n", name) + sync_log_channel(name, channel, "QUOTA %s\n", name) #define sync_log_channel_annotation(channel, name) \ - sync_log_channel(channel, "ANNOTATION %s\n", name) + sync_log_channel(name, channel, "ANNOTATION %s\n", name) #define sync_log_channel_seen(channel, user, name) \ - sync_log_channel(channel, "SEEN %s %s\n", user, name) + sync_log_channel(user, channel, "SEEN %s %s\n", user, name) #define sync_log_channel_subscribe(channel, user, name) \ - sync_log_channel(channel, "SUB %s %s\n", user, name) + sync_log_channel(user, channel, "SUB %s %s\n", user, name) /* read-side sync log code */ typedef struct sync_log_reader sync_log_reader_t; -sync_log_reader_t *sync_log_reader_create_with_channel(const char *channel); +sync_log_reader_t *sync_log_reader_create_with_channel(const char *channel, int shard); sync_log_reader_t *sync_log_reader_create_with_filename(const char *filename); sync_log_reader_t *sync_log_reader_create_with_fd(int fd); void sync_log_reader_free(sync_log_reader_t *slr); diff --git a/lib/imapoptions b/lib/imapoptions index e86300061..60d838e43 100644 --- a/lib/imapoptions +++ b/lib/imapoptions @@ -2107,6 +2107,9 @@ product version in the capabilities /* Enable replication action logging by sync_server as well, allowing chaining of replicas. Use this on 'B' for A => B => C replication layout */ +{ "sync_log_shards", 4, INT } +/* Specifies how many shards of the replication log files will be created. */ + { "sync_log_channels", NULL, STRING } /* If specified, log all events to multiple log files in directories specified by each "channel". Each channel can then be processed