-
Notifications
You must be signed in to change notification settings - Fork 148
/
HFILE-remove-double-write-V2.patch
140 lines (132 loc) · 4.72 KB
/
HFILE-remove-double-write-V2.patch
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
Index: src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java (revision 1068099)
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java (working copy)
@@ -221,9 +221,6 @@
// Used to ensure we write in order.
private final RawComparator<byte []> comparator;
- // A stream made per block written.
- private DataOutputStream out;
-
// Number of uncompressed bytes per block. Reinitialized when we start
// new block.
private int blocksize;
@@ -264,9 +261,9 @@
// Block cache to optionally fill on write
private BlockCache blockCache;
- // Additional byte array output stream used to fill block cache
- private ByteArrayOutputStream baos;
- private DataOutputStream baosDos;
+ // Byte array output stream made per block written.
+ private ByteArrayOutputStream baos = null;
+ private DataOutputStream baosDos = null;
private int blockNumber = 0;
/**
@@ -360,7 +357,7 @@
* @throws IOException
*/
private void checkBlockBoundary() throws IOException {
- if (this.out != null && this.out.size() < blocksize) return;
+ if (baosDos != null && baosDos.size() < blocksize) return;
finishBlock();
newBlock();
}
@@ -370,11 +367,18 @@
* @throws IOException
*/
private void finishBlock() throws IOException {
- if (this.out == null) return;
+ if (baosDos == null) return;
+
+ // Flush Data Output Stream
+ baosDos.flush();
+
+ // Compress Data and write to output stream
+ DataOutputStream compressStream = getCompressingStream();
+ baos.writeTo(compressStream);
+ int size = releaseCompressingStream(compressStream);
+
long now = System.currentTimeMillis();
- int size = releaseCompressingStream(this.out);
- this.out = null;
blockKeys.add(firstKey);
blockOffsets.add(Long.valueOf(blockBegin));
blockDataSizes.add(Integer.valueOf(size));
@@ -384,14 +388,17 @@
writeOps++;
if (blockCache != null) {
- baosDos.flush();
byte [] bytes = baos.toByteArray();
ByteBuffer blockToCache = ByteBuffer.wrap(bytes, DATABLOCKMAGIC.length,
bytes.length - DATABLOCKMAGIC.length);
String blockName = path.toString() + blockNumber;
blockCache.cacheBlock(blockName, blockToCache);
- baosDos.close();
}
+
+ baosDos.close();
+ baosDos = null;
+ baos = null;
+
blockNumber++;
}
@@ -402,14 +409,14 @@
private void newBlock() throws IOException {
// This is where the next block begins.
blockBegin = outputStream.getPos();
- this.out = getCompressingStream();
- this.out.write(DATABLOCKMAGIC);
+
firstKey = null;
- if (blockCache != null) {
- this.baos = new ByteArrayOutputStream();
- this.baosDos = new DataOutputStream(baos);
- this.baosDos.write(DATABLOCKMAGIC);
- }
+
+ // to avoid too many calls to realloc(),
+ // pre-allocates the byte stream to the block size + 25%
+ baos = new ByteArrayOutputStream(blocksize + (blocksize / 25));
+ baosDos = new DataOutputStream(baos);
+ baosDos.write(DATABLOCKMAGIC);
}
/*
@@ -467,7 +474,7 @@
for (i = 0; i < metaNames.size(); ++i) {
// stop when the current key is greater than our own
byte[] cur = metaNames.get(i);
- if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length,
+ if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length,
key, 0, key.length) > 0) {
break;
}
@@ -563,12 +570,12 @@
checkBlockBoundary();
}
// Write length of key and value and then actual key and value bytes.
- this.out.writeInt(klength);
+ this.baosDos.writeInt(klength);
this.keylength += klength;
- this.out.writeInt(vlength);
+ this.baosDos.writeInt(vlength);
this.valuelength += vlength;
- this.out.write(key, koffset, klength);
- this.out.write(value, voffset, vlength);
+ this.baosDos.write(key, koffset, klength);
+ this.baosDos.write(value, voffset, vlength);
// Are we the first key in this block?
if (this.firstKey == null) {
// Copy the key.
@@ -579,13 +586,6 @@
this.lastKeyOffset = koffset;
this.lastKeyLength = klength;
this.entryCount ++;
- // If we are pre-caching blocks on write, fill byte array stream
- if (blockCache != null) {
- this.baosDos.writeInt(klength);
- this.baosDos.writeInt(vlength);
- this.baosDos.write(key, koffset, klength);
- this.baosDos.write(value, voffset, vlength);
- }
}
/*