From 34e8af560d1f240db7c5c42e51e6d7989a065545 Mon Sep 17 00:00:00 2001 From: Andris Reinman Date: Tue, 27 Dec 2016 14:53:27 +0200 Subject: [PATCH] added new routing hook queue:route --- lib/mail-queue.js | 210 ++++++++++++++++++++++++--------------------- plugins/README.md | 1 + test/queue-test.js | 2 + 3 files changed, 115 insertions(+), 98 deletions(-) diff --git a/lib/mail-queue.js b/lib/mail-queue.js index 97ac2a9..5add539 100644 --- a/lib/mail-queue.js +++ b/lib/mail-queue.js @@ -11,6 +11,7 @@ const SharedIterator = require('./shared-iterator'); const QueueLocker = require('./queue-locker'); const TtlCache = require('./ttl-cache'); const crypto = require('crypto'); +const plugins = require('./plugins'); /** * MailQueue class for generating mail queue instances. These instances handle @@ -220,122 +221,135 @@ class MailQueue { deliveryZone = 'default'; } - seq++; - let deliverySeq = (seq < 0x100 ? '0' : '') + (seq < 0x10 ? '0' : '') + seq.toString(16); - let delivery = { - id, - seq: deliverySeq, - - // Store indexing keys in the delivery object - // sort by Zone - _zoneKey: 'delivery-zone ' + encodeURIComponent(deliveryZone) + ' ' + id + ' ' + encodeURIComponent(recipient), - // sort by Domain - _domainKey: 'delivery-domain ' + encodeURIComponent(recipientDomain) + ' ' + encodeURIComponent(deliveryZone) + ' ' + id + ' ' + encodeURIComponent(recipient), - // reference for ID - _refKey: 'ref ' + id + ' ' + encodeURIComponent(deliveryZone) + ' ' + encodeURIComponent(recipient), - // reference for ID + SEQ - _seqKey: 'seq ' + id + ' ' + deliverySeq, - - // Actual delivery data - domain: recipientDomain, - sendingZone: deliveryZone, - - // actual recipient address - recipient + let routing = { + recipient, + deliveryZone }; - if (!envelope.deferDelivery) { - // Normal insert + plugins.handler.runHooks('queue:route', [envelope, routing], err => { + if (err) { + return callback(err); + } + recipient = routing.recipient; + deliveryZone = routing.deliveryZone; + + seq++; + let deliverySeq = (seq < 0x100 ? '0' : '') + (seq < 0x10 ? '0' : '') + seq.toString(16); + let delivery = { + id, + seq: deliverySeq, + + // Store indexing keys in the delivery object + // sort by Zone + _zoneKey: 'delivery-zone ' + encodeURIComponent(deliveryZone) + ' ' + id + ' ' + encodeURIComponent(recipient), + // sort by Domain + _domainKey: 'delivery-domain ' + encodeURIComponent(recipientDomain) + ' ' + encodeURIComponent(deliveryZone) + ' ' + id + ' ' + encodeURIComponent(recipient), + // reference for ID + _refKey: 'ref ' + id + ' ' + encodeURIComponent(deliveryZone) + ' ' + encodeURIComponent(recipient), + // reference for ID + SEQ + _seqKey: 'seq ' + id + ' ' + deliverySeq, + + // Actual delivery data + domain: recipientDomain, + sendingZone: deliveryZone, + + // actual recipient address + recipient + }; - // List all remaining deliveries for a zone sorted by message id - // Needed to find deliveries by Zone - ops.push({ - type: 'put', - key: delivery._zoneKey, - value: delivery._refKey - }); + if (!envelope.deferDelivery) { + // Normal insert - // List all remaining deliveries for a zone sorted by domain and message id - // Needed to find deliveries by Zone + domain - ops.push({ - type: 'put', - key: delivery._domainKey, - value: delivery._refKey - }); - } else { - // Deferred insert + // List all remaining deliveries for a zone sorted by message id + // Needed to find deliveries by Zone + ops.push({ + type: 'put', + key: delivery._zoneKey, + value: delivery._refKey + }); - // Setup defer data - delivery._deferred = delivery._deferred || { - first: Date.now(), - count: 0 - }; - delivery._deferred.last = Date.now(); - delivery._deferred.next = envelope.deferDelivery; - delivery._deferred.response = 'Deferred by router'; + // List all remaining deliveries for a zone sorted by domain and message id + // Needed to find deliveries by Zone + domain + ops.push({ + type: 'put', + key: delivery._domainKey, + value: delivery._refKey + }); + } else { + // Deferred insert - // Add to defer queue - ops.push({ - type: 'put', - key: 'deferred:item ' + delivery._deferred.next + ' ' + this.seqIndex.short(), - value: [delivery._zoneKey, delivery._domainKey, delivery._refKey].join('\n') - }); + // Setup defer data + delivery._deferred = delivery._deferred || { + first: Date.now(), + count: 0 + }; + delivery._deferred.last = Date.now(); + delivery._deferred.next = envelope.deferDelivery; + delivery._deferred.response = 'Deferred by router'; + + // Add to defer queue + ops.push({ + type: 'put', + key: 'deferred:item ' + delivery._deferred.next + ' ' + this.seqIndex.short(), + value: [delivery._zoneKey, delivery._domainKey, delivery._refKey].join('\n') + }); - // Mark key as deferred - // Useful for finding deferred messages for a zone + // Mark key as deferred + // Useful for finding deferred messages for a zone + ops.push({ + type: 'put', + key: 'deferred:key ' + delivery._zoneKey, + value: 1 + }); + + // Mark domain key as deferred + // Useful for finding deferred messages for a zone+domain + ops.push({ + type: 'put', + key: 'deferred:domain ' + delivery._domainKey, + value: 1 + }); + + } + + // Keep references against a message id. Once there are no more references + // against a message ID, the message can be safely deleted. This is also the + // key we use to store the delivery data ops.push({ type: 'put', - key: 'deferred:key ' + delivery._zoneKey, - value: 1 + key: delivery._refKey, + value: delivery, + valueEncoding: 'json' }); - // Mark domain key as deferred - // Useful for finding deferred messages for a zone+domain + // List all remaining deliveries for ID+SEQ + // Needed to find deliveries by ID+SEQ ops.push({ type: 'put', - key: 'deferred:domain ' + delivery._domainKey, - value: 1 + key: delivery._seqKey, + value: delivery._refKey }); - } - - // Keep references against a message id. Once there are no more references - // against a message ID, the message can be safely deleted. This is also the - // key we use to store the delivery data - ops.push({ - type: 'put', - key: delivery._refKey, - value: delivery, - valueEncoding: 'json' - }); - - // List all remaining deliveries for ID+SEQ - // Needed to find deliveries by ID+SEQ - ops.push({ - type: 'put', - key: delivery._seqKey, - value: delivery._refKey - }); + // emit an event about the new element + ev.emit('queued', { + event: 'queued', + id: delivery.id + '.' + delivery.seq, + time: Date.now(), + zone: delivery.sendingZone, + messageId: envelope.messageId, + from: envelope.from, + recipient: delivery.recipient, + source: envelope.origin + }); - // emit an event about the new element - ev.emit('queued', { - event: 'queued', - id: delivery.id + '.' + delivery.seq, - time: Date.now(), - zone: delivery.sendingZone, - messageId: envelope.messageId, - from: envelope.from, - recipient: delivery.recipient, - source: envelope.origin - }); + inserted.push({ + zone: delivery.sendingZone, + domain: recipientDomain, + deferred: !!envelope.deferDelivery + }); - inserted.push({ - zone: delivery.sendingZone, - domain: recipientDomain, - deferred: !!envelope.deferDelivery + return setImmediate(processRecipients); }); - - return setImmediate(processRecipients); }; processRecipients(); diff --git a/plugins/README.md b/plugins/README.md index b3a022d..2c468b0 100644 --- a/plugins/README.md +++ b/plugins/README.md @@ -86,6 +86,7 @@ To use these hooks you need to set `enabled` `'main'` or `['main',...]` - **'api:mail'** with arguments `envelope`, `session`, called when an email is dropped to HTTP - **'queue:bounce'** with arguments `bounce` called when a message bounced and is no longer queued for delivery - **'queue:release'** with arguments `zone`, `data` called when a message was removed from the queue +- **'queue:route'** with arguments `envelope`, `routing` called before a message entry is stored to message index. This is your last chance to edit message routing for a single recipient. Message for this specific recipient is routed to `routing.deliveryZone`. If this zone does not exist, then your message is never sent and sits in the queue forever. **'receiver' context** diff --git a/test/queue-test.js b/test/queue-test.js index 6e8e3c3..271a3f8 100644 --- a/test/queue-test.js +++ b/test/queue-test.js @@ -6,6 +6,8 @@ const dbfolder = path.join(__dirname, 'queuetest'); const MailQueue = require('../lib/mail-queue'); const PassThrough = require('stream').PassThrough; const memdown = require('memdown'); +const plugins = require('../lib/plugins'); +plugins.init(); let db; let queue;