Skip to content

Commit

Permalink
added new routing hook queue:route
Browse files Browse the repository at this point in the history
  • Loading branch information
andris9 committed Dec 27, 2016
1 parent c4dc5c0 commit 34e8af5
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 98 deletions.
210 changes: 112 additions & 98 deletions lib/mail-queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const SharedIterator = require('./shared-iterator');
const QueueLocker = require('./queue-locker');
const TtlCache = require('./ttl-cache');
const crypto = require('crypto');
const plugins = require('./plugins');

/**
* MailQueue class for generating mail queue instances. These instances handle
Expand Down Expand Up @@ -220,122 +221,135 @@ class MailQueue {
deliveryZone = 'default';
}

seq++;
let deliverySeq = (seq < 0x100 ? '0' : '') + (seq < 0x10 ? '0' : '') + seq.toString(16);
let delivery = {
id,
seq: deliverySeq,

// Store indexing keys in the delivery object
// sort by Zone
_zoneKey: 'delivery-zone ' + encodeURIComponent(deliveryZone) + ' ' + id + ' ' + encodeURIComponent(recipient),
// sort by Domain
_domainKey: 'delivery-domain ' + encodeURIComponent(recipientDomain) + ' ' + encodeURIComponent(deliveryZone) + ' ' + id + ' ' + encodeURIComponent(recipient),
// reference for ID
_refKey: 'ref ' + id + ' ' + encodeURIComponent(deliveryZone) + ' ' + encodeURIComponent(recipient),
// reference for ID + SEQ
_seqKey: 'seq ' + id + ' ' + deliverySeq,

// Actual delivery data
domain: recipientDomain,
sendingZone: deliveryZone,

// actual recipient address
recipient
let routing = {
recipient,
deliveryZone
};

if (!envelope.deferDelivery) {
// Normal insert
plugins.handler.runHooks('queue:route', [envelope, routing], err => {
if (err) {
return callback(err);
}
recipient = routing.recipient;
deliveryZone = routing.deliveryZone;

seq++;
let deliverySeq = (seq < 0x100 ? '0' : '') + (seq < 0x10 ? '0' : '') + seq.toString(16);
let delivery = {
id,
seq: deliverySeq,

// Store indexing keys in the delivery object
// sort by Zone
_zoneKey: 'delivery-zone ' + encodeURIComponent(deliveryZone) + ' ' + id + ' ' + encodeURIComponent(recipient),
// sort by Domain
_domainKey: 'delivery-domain ' + encodeURIComponent(recipientDomain) + ' ' + encodeURIComponent(deliveryZone) + ' ' + id + ' ' + encodeURIComponent(recipient),
// reference for ID
_refKey: 'ref ' + id + ' ' + encodeURIComponent(deliveryZone) + ' ' + encodeURIComponent(recipient),
// reference for ID + SEQ
_seqKey: 'seq ' + id + ' ' + deliverySeq,

// Actual delivery data
domain: recipientDomain,
sendingZone: deliveryZone,

// actual recipient address
recipient
};

// List all remaining deliveries for a zone sorted by message id
// Needed to find deliveries by Zone
ops.push({
type: 'put',
key: delivery._zoneKey,
value: delivery._refKey
});
if (!envelope.deferDelivery) {
// Normal insert

// List all remaining deliveries for a zone sorted by domain and message id
// Needed to find deliveries by Zone + domain
ops.push({
type: 'put',
key: delivery._domainKey,
value: delivery._refKey
});
} else {
// Deferred insert
// List all remaining deliveries for a zone sorted by message id
// Needed to find deliveries by Zone
ops.push({
type: 'put',
key: delivery._zoneKey,
value: delivery._refKey
});

// Setup defer data
delivery._deferred = delivery._deferred || {
first: Date.now(),
count: 0
};
delivery._deferred.last = Date.now();
delivery._deferred.next = envelope.deferDelivery;
delivery._deferred.response = 'Deferred by router';
// List all remaining deliveries for a zone sorted by domain and message id
// Needed to find deliveries by Zone + domain
ops.push({
type: 'put',
key: delivery._domainKey,
value: delivery._refKey
});
} else {
// Deferred insert

// Add to defer queue
ops.push({
type: 'put',
key: 'deferred:item ' + delivery._deferred.next + ' ' + this.seqIndex.short(),
value: [delivery._zoneKey, delivery._domainKey, delivery._refKey].join('\n')
});
// Setup defer data
delivery._deferred = delivery._deferred || {
first: Date.now(),
count: 0
};
delivery._deferred.last = Date.now();
delivery._deferred.next = envelope.deferDelivery;
delivery._deferred.response = 'Deferred by router';

// Add to defer queue
ops.push({
type: 'put',
key: 'deferred:item ' + delivery._deferred.next + ' ' + this.seqIndex.short(),
value: [delivery._zoneKey, delivery._domainKey, delivery._refKey].join('\n')
});

// Mark key as deferred
// Useful for finding deferred messages for a zone
// Mark key as deferred
// Useful for finding deferred messages for a zone
ops.push({
type: 'put',
key: 'deferred:key ' + delivery._zoneKey,
value: 1
});

// Mark domain key as deferred
// Useful for finding deferred messages for a zone+domain
ops.push({
type: 'put',
key: 'deferred:domain ' + delivery._domainKey,
value: 1
});

}

// Keep references against a message id. Once there are no more references
// against a message ID, the message can be safely deleted. This is also the
// key we use to store the delivery data
ops.push({
type: 'put',
key: 'deferred:key ' + delivery._zoneKey,
value: 1
key: delivery._refKey,
value: delivery,
valueEncoding: 'json'
});

// Mark domain key as deferred
// Useful for finding deferred messages for a zone+domain
// List all remaining deliveries for ID+SEQ
// Needed to find deliveries by ID+SEQ
ops.push({
type: 'put',
key: 'deferred:domain ' + delivery._domainKey,
value: 1
key: delivery._seqKey,
value: delivery._refKey
});

}

// Keep references against a message id. Once there are no more references
// against a message ID, the message can be safely deleted. This is also the
// key we use to store the delivery data
ops.push({
type: 'put',
key: delivery._refKey,
value: delivery,
valueEncoding: 'json'
});

// List all remaining deliveries for ID+SEQ
// Needed to find deliveries by ID+SEQ
ops.push({
type: 'put',
key: delivery._seqKey,
value: delivery._refKey
});
// emit an event about the new element
ev.emit('queued', {
event: 'queued',
id: delivery.id + '.' + delivery.seq,
time: Date.now(),
zone: delivery.sendingZone,
messageId: envelope.messageId,
from: envelope.from,
recipient: delivery.recipient,
source: envelope.origin
});

// emit an event about the new element
ev.emit('queued', {
event: 'queued',
id: delivery.id + '.' + delivery.seq,
time: Date.now(),
zone: delivery.sendingZone,
messageId: envelope.messageId,
from: envelope.from,
recipient: delivery.recipient,
source: envelope.origin
});
inserted.push({
zone: delivery.sendingZone,
domain: recipientDomain,
deferred: !!envelope.deferDelivery
});

inserted.push({
zone: delivery.sendingZone,
domain: recipientDomain,
deferred: !!envelope.deferDelivery
return setImmediate(processRecipients);
});

return setImmediate(processRecipients);
};

processRecipients();
Expand Down
1 change: 1 addition & 0 deletions plugins/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ To use these hooks you need to set `enabled` `'main'` or `['main',...]`
- **'api:mail'** with arguments `envelope`, `session`, called when an email is dropped to HTTP
- **'queue:bounce'** with arguments `bounce` called when a message bounced and is no longer queued for delivery
- **'queue:release'** with arguments `zone`, `data` called when a message was removed from the queue
- **'queue:route'** with arguments `envelope`, `routing` called before a message entry is stored to message index. This is your last chance to edit message routing for a single recipient. Message for this specific recipient is routed to `routing.deliveryZone`. If this zone does not exist, then your message is never sent and sits in the queue forever.

**'receiver' context**

Expand Down
2 changes: 2 additions & 0 deletions test/queue-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ const dbfolder = path.join(__dirname, 'queuetest');
const MailQueue = require('../lib/mail-queue');
const PassThrough = require('stream').PassThrough;
const memdown = require('memdown');
const plugins = require('../lib/plugins');
plugins.init();

let db;
let queue;
Expand Down

0 comments on commit 34e8af5

Please sign in to comment.