From f0ee6adc5edae364b93318f978bb2ba2dc95b7da Mon Sep 17 00:00:00 2001 From: East <952945925@qq.com> Date: Thu, 14 Dec 2017 17:29:20 +0800 Subject: [PATCH 01/12] A little change for the use of hadoop-connector --- .../cn/edu/tsinghua/tsfile/timeseries/read/FileReader.java | 2 +- .../edu/tsinghua/tsfile/timeseries/read/query/QueryEngine.java | 3 ++- .../cn/edu/tsinghua/tsfile/timeseries/write/TsFileWriter.java | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/FileReader.java b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/FileReader.java index 367051ab..3df2d438 100644 --- a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/FileReader.java +++ b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/FileReader.java @@ -78,7 +78,7 @@ private void init() throws IOException { int fileMetaDataLength = randomAccessFileReader.readInt(); randomAccessFileReader.seek(l - MAGIC_LENGTH - FOOTER_LENGTH - fileMetaDataLength); byte[] buf = new byte[fileMetaDataLength]; - randomAccessFileReader.read(buf, 0, buf.length);//FIXME is this a potential bug? 
+ randomAccessFileReader.read(buf, 0, buf.length);//FIXE is this a potential bug?M0 ByteArrayInputStream bais = new ByteArrayInputStream(buf); this.fileMetaData = new TsFileMetaDataConverter().toTsFileMetadata(ReadWriteThriftFormatUtils.readFileMetaData(bais)); diff --git a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/QueryEngine.java b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/QueryEngine.java index 752684b2..b23b2e45 100644 --- a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/QueryEngine.java +++ b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/QueryEngine.java @@ -109,7 +109,8 @@ public QueryDataSet query(List paths, FilterExpression timeFilter, FilterE return queryWithSpecificRowGroups(paths, timeFilter, freqFilter, valueFilter, idxs); } - private QueryDataSet queryWithSpecificRowGroups(List paths, FilterExpression timeFilter, FilterExpression freqFilter + //Used by tsfile-hadoop-connector, do not delete + public QueryDataSet queryWithSpecificRowGroups(List paths, FilterExpression timeFilter, FilterExpression freqFilter , FilterExpression valueFilter, ArrayList rowGroupIndexList) throws IOException { if (timeFilter == null && freqFilter == null && valueFilter == null) { return queryWithoutFilter(paths, rowGroupIndexList); diff --git a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/write/TsFileWriter.java b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/write/TsFileWriter.java index b6368c2e..795b2b24 100644 --- a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/write/TsFileWriter.java +++ b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/write/TsFileWriter.java @@ -173,7 +173,7 @@ public void write(TSRecord record) throws IOException, WriteProcessException { * @see cn.edu.tsinghua.tsfile.timeseries.write.series.IRowGroupWriter#getDataInMemory(String) * @param deltaObjectId deltaObject id * @param measurementId measurement id - * @return fist object is the current page data, second object is the all pages 
which is packaged + * @return first object is the current page data, second object is the all pages which is packaged */ public List getDataInMemory(String deltaObjectId, String measurementId) { if (groupWriters.get(deltaObjectId) == null) { From a5b1007f22dedb1522bd271da0d5ae0bda58c1d6 Mon Sep 17 00:00:00 2001 From: East <952945925@qq.com> Date: Mon, 18 Dec 2017 14:51:04 +0800 Subject: [PATCH 02/12] Add FileReader:getSortedRowGroupMetaDataList() for hadoop connector --- .../tsfile/file/metadata/TsFileMetaData.java | 2 +- .../tsfile/timeseries/read/FileReader.java | 28 +++++++++++++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/src/main/java/cn/edu/tsinghua/tsfile/file/metadata/TsFileMetaData.java b/src/main/java/cn/edu/tsinghua/tsfile/file/metadata/TsFileMetaData.java index 04eb8cf4..18319b5a 100644 --- a/src/main/java/cn/edu/tsinghua/tsfile/file/metadata/TsFileMetaData.java +++ b/src/main/java/cn/edu/tsinghua/tsfile/file/metadata/TsFileMetaData.java @@ -155,7 +155,7 @@ public void convertToTSF(FileMetaData metadataInThrift) { deltaObjectMap = new HashMap<>(); for (Map.Entry entry : metadataInThrift.getDelta_object_map().entrySet()){ DeltaObject object = entry.getValue(); - deltaObjectMap.put(entry.getKey(), new TsDeltaObject(object.getOffset(), + deltaObjectMap.put(entry.getKey(), new TsDeltaObject(object.getOffset(), object.getMetadata_block_size(), object.getStart_time(), object.getEnd_time())); } } diff --git a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/FileReader.java b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/FileReader.java index 3df2d438..62626758 100644 --- a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/FileReader.java +++ b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/FileReader.java @@ -299,4 +299,32 @@ public TsFileMetaData getFileMetaData() { return this.fileMetaData; } + public List getSortedRowGroupMetaDataList() throws IOException{ + List rowGroupMetaDataList = new ArrayList<>(); + 
Collection deltaObjects = fileMetaData.getDeltaObjectMap().keySet(); + for (String deltaObjectID : deltaObjects) { + this.rwLock.writeLock().lock(); + try { + TsDeltaObject deltaObj = this.fileMetaData.getDeltaObject(deltaObjectID); + TsRowGroupBlockMetaData blockMeta = new TsRowGroupBlockMetaData(); + blockMeta.convertToTSF(ReadWriteThriftFormatUtils.readRowGroupBlockMetaData(this.randomAccessFileReader, + deltaObj.offset, deltaObj.metadataBlockSize)); + rowGroupMetaDataList.addAll(blockMeta.getRowGroups()); + } finally { + this.rwLock.writeLock().unlock(); + } + } + + Comparator comparator = new Comparator() { + @Override + public int compare(RowGroupMetaData o1, RowGroupMetaData o2) { + + return Long.signum(o1.getMetaDatas().get(0).getProperties().getFileOffset() - o2.getMetaDatas().get(0).getProperties().getFileOffset()); + } + + }; + rowGroupMetaDataList.sort(comparator); + return rowGroupMetaDataList; + } + } From da13fbae48d05cffdeeda9029fd992e0c3d14282 Mon Sep 17 00:00:00 2001 From: East <952945925@qq.com> Date: Tue, 19 Dec 2017 15:06:49 +0800 Subject: [PATCH 03/12] Add HadoopQueryEngine --- .../tsfile/timeseries/read/RecordReader.java | 6 +++ .../read/query/HadoopQueryEngine.java | 54 +++++++++++++++++++ .../timeseries/read/query/QueryEngine.java | 13 +++-- 3 files changed, 69 insertions(+), 4 deletions(-) create mode 100644 src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/HadoopQueryEngine.java diff --git a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/RecordReader.java b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/RecordReader.java index ae6fe95f..a2e201d5 100644 --- a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/RecordReader.java +++ b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/RecordReader.java @@ -2,6 +2,7 @@ import cn.edu.tsinghua.tsfile.common.exception.UnSupportedDataTypeException; import cn.edu.tsinghua.tsfile.common.utils.ITsRandomAccessFileReader; +import 
cn.edu.tsinghua.tsfile.file.metadata.RowGroupMetaData; import cn.edu.tsinghua.tsfile.file.metadata.enums.TSDataType; import cn.edu.tsinghua.tsfile.timeseries.filter.definition.FilterFactory; import cn.edu.tsinghua.tsfile.timeseries.filter.definition.SingleSeriesFilterExpression; @@ -32,6 +33,11 @@ public RecordReader(ITsRandomAccessFileReader raf) throws IOException { this.fileReader = new FileReader(raf); } + //for hadoop-connector + public RecordReader(ITsRandomAccessFileReader raf, List rowGroupMetaDataList) throws IOException { + this.fileReader = new FileReader(raf, rowGroupMetaDataList); + } + /** * Read one path without filter. * diff --git a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/HadoopQueryEngine.java b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/HadoopQueryEngine.java new file mode 100644 index 00000000..2f7379bf --- /dev/null +++ b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/HadoopQueryEngine.java @@ -0,0 +1,54 @@ +package cn.edu.tsinghua.tsfile.timeseries.read.query; + +import cn.edu.tsinghua.tsfile.common.utils.ITsRandomAccessFileReader; +import cn.edu.tsinghua.tsfile.file.metadata.RowGroupMetaData; +import cn.edu.tsinghua.tsfile.timeseries.filter.definition.CrossSeriesFilterExpression; +import cn.edu.tsinghua.tsfile.timeseries.filter.definition.FilterExpression; +import cn.edu.tsinghua.tsfile.timeseries.filter.definition.SingleSeriesFilterExpression; +import cn.edu.tsinghua.tsfile.timeseries.read.RecordReader; +import cn.edu.tsinghua.tsfile.timeseries.read.support.Path; +import cn.edu.tsinghua.tsfile.timeseries.write.desc.MeasurementDescriptor; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class HadoopQueryEngine extends QueryEngine { + + private static final String SEPARATOR_DEVIDE_SERIES = "."; + private List rowGroupMetaDataList; + + public HadoopQueryEngine(ITsRandomAccessFileReader raf, List rowGroupMetaDataList) throws IOException { + super(raf, 
rowGroupMetaDataList); + this.rowGroupMetaDataList = rowGroupMetaDataList; + } + + public QueryDataSet queryWithSpecificRowGroups(List deviceIdList, List sensorList, FilterExpression timeFilter, FilterExpression freqFilter, FilterExpression valueFilter) throws IOException{ + List paths = new ArrayList<>(); + for(String deviceId : deviceIdList){ + for(MeasurementDescriptor sensor: sensorList){ + paths.add(new Path(deviceId + SEPARATOR_DEVIDE_SERIES + sensor.getMeasurementId())); + } + } + + if (timeFilter == null && freqFilter == null && valueFilter == null) { + return queryWithoutFilter(paths); + } else if (valueFilter instanceof SingleSeriesFilterExpression || (timeFilter != null && valueFilter == null)) { +// return readOneColumnValueUseFilter(paths, (SingleSeriesFilterExpression) timeFilter, (SingleSeriesFilterExpression) freqFilter, +// (SingleSeriesFilterExpression) valueFilter); + } else if (valueFilter instanceof CrossSeriesFilterExpression) { +// return crossColumnQuery(paths, (SingleSeriesFilterExpression) timeFilter, (SingleSeriesFilterExpression) freqFilter, +// (CrossSeriesFilterExpression) valueFilter, rowGroupIndexList); + } + throw new IOException("Query Not Support Exception"); + } + + private QueryDataSet queryWithoutFilter(List paths) throws IOException { + return new IteratorQueryDataSet(paths) { + @Override + public DynamicOneColumnData getMoreRecordsForOneColumn(Path p, DynamicOneColumnData res) throws IOException { + return recordReader.getValueInOneColumn(res, FETCH_SIZE, p.getDeltaObjectToString(), p.getMeasurementToString()); + } + }; + } +} diff --git a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/QueryEngine.java b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/QueryEngine.java index b23b2e45..505119c7 100644 --- a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/QueryEngine.java +++ b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/QueryEngine.java @@ -3,6 +3,7 @@ import 
cn.edu.tsinghua.tsfile.common.constant.QueryConstant; import cn.edu.tsinghua.tsfile.common.exception.ProcessorException; import cn.edu.tsinghua.tsfile.common.utils.ITsRandomAccessFileReader; +import cn.edu.tsinghua.tsfile.file.metadata.RowGroupMetaData; import cn.edu.tsinghua.tsfile.timeseries.filter.definition.CrossSeriesFilterExpression; import cn.edu.tsinghua.tsfile.timeseries.filter.definition.FilterExpression; import cn.edu.tsinghua.tsfile.timeseries.filter.definition.SingleSeriesFilterExpression; @@ -23,8 +24,8 @@ public class QueryEngine { private static final Logger logger = LoggerFactory.getLogger(QueryEngine.class); - private static int FETCH_SIZE = 20000; - private RecordReader recordReader; + protected static int FETCH_SIZE = 20000; + protected RecordReader recordReader; public QueryEngine(ITsRandomAccessFileReader raf) throws IOException { recordReader = new RecordReader(raf); @@ -35,6 +36,11 @@ public QueryEngine(ITsRandomAccessFileReader raf, int fetchSize) throws IOExcept FETCH_SIZE = fetchSize; } + //for hadoop-connector + public QueryEngine(ITsRandomAccessFileReader raf, List rowGroupMetaDataList) throws IOException { + recordReader = new RecordReader(raf, rowGroupMetaDataList); + } + public static QueryDataSet query(QueryConfig config, String fileName) throws IOException { TsRandomAccessLocalFileReader raf = new TsRandomAccessLocalFileReader(fileName); QueryEngine queryEngine = new QueryEngine(raf); @@ -109,8 +115,7 @@ public QueryDataSet query(List paths, FilterExpression timeFilter, FilterE return queryWithSpecificRowGroups(paths, timeFilter, freqFilter, valueFilter, idxs); } - //Used by tsfile-hadoop-connector, do not delete - public QueryDataSet queryWithSpecificRowGroups(List paths, FilterExpression timeFilter, FilterExpression freqFilter + private QueryDataSet queryWithSpecificRowGroups(List paths, FilterExpression timeFilter, FilterExpression freqFilter , FilterExpression valueFilter, ArrayList rowGroupIndexList) throws IOException { if 
(timeFilter == null && freqFilter == null && valueFilter == null) { return queryWithoutFilter(paths, rowGroupIndexList); From bc99e28d4a4524d8301cdc87d0ddc2ce608b604e Mon Sep 17 00:00:00 2001 From: East <952945925@qq.com> Date: Thu, 21 Dec 2017 13:35:28 +0800 Subject: [PATCH 04/12] Update HadoopQueryEngine --- .../read/query/HadoopQueryEngine.java | 85 +++++++++++++++++-- .../timeseries/read/query/QueryEngine.java | 2 +- 2 files changed, 79 insertions(+), 8 deletions(-) diff --git a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/HadoopQueryEngine.java b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/HadoopQueryEngine.java index 2f7379bf..baa17423 100644 --- a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/HadoopQueryEngine.java +++ b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/HadoopQueryEngine.java @@ -1,7 +1,9 @@ package cn.edu.tsinghua.tsfile.timeseries.read.query; +import cn.edu.tsinghua.tsfile.common.exception.ProcessorException; import cn.edu.tsinghua.tsfile.common.utils.ITsRandomAccessFileReader; import cn.edu.tsinghua.tsfile.file.metadata.RowGroupMetaData; +import cn.edu.tsinghua.tsfile.file.metadata.TimeSeriesChunkMetaData; import cn.edu.tsinghua.tsfile.timeseries.filter.definition.CrossSeriesFilterExpression; import cn.edu.tsinghua.tsfile.timeseries.filter.definition.FilterExpression; import cn.edu.tsinghua.tsfile.timeseries.filter.definition.SingleSeriesFilterExpression; @@ -11,7 +13,9 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; public class HadoopQueryEngine extends QueryEngine { @@ -23,22 +27,43 @@ public HadoopQueryEngine(ITsRandomAccessFileReader raf, List r this.rowGroupMetaDataList = rowGroupMetaDataList; } - public QueryDataSet queryWithSpecificRowGroups(List deviceIdList, List sensorList, FilterExpression timeFilter, FilterExpression freqFilter, FilterExpression valueFilter) throws IOException{ + private void 
initDeviceIdList(List deviceIdList) { + Set deviceIdSet = new HashSet<>(); + for (RowGroupMetaData rowGroupMetaData : rowGroupMetaDataList) { + deviceIdSet.add(rowGroupMetaData.getDeltaObjectID()); + } + deviceIdList = new ArrayList<>(deviceIdSet); + } + + private void initSensorIdList(List sensorIdList){ + Set sensorIdSet = new HashSet<>(); + for(RowGroupMetaData rowGroupMetaData : rowGroupMetaDataList) { + for(TimeSeriesChunkMetaData timeSeriesChunkMetaData : rowGroupMetaData.getTimeSeriesChunkMetaDataList()){ + sensorIdSet.add(timeSeriesChunkMetaData.getProperties().getMeasurementUID()); + } + } + sensorIdList = new ArrayList<>(sensorIdSet); + } + + public QueryDataSet queryWithSpecificRowGroups(List deviceIdList, List sensorIdList, FilterExpression timeFilter, FilterExpression freqFilter, FilterExpression valueFilter) throws IOException{ + if(deviceIdList == null)initDeviceIdList(deviceIdList); + if(sensorIdList == null)initSensorIdList(sensorIdList); + List paths = new ArrayList<>(); for(String deviceId : deviceIdList){ - for(MeasurementDescriptor sensor: sensorList){ - paths.add(new Path(deviceId + SEPARATOR_DEVIDE_SERIES + sensor.getMeasurementId())); + for(String sensorId: sensorIdList){ + paths.add(new Path(deviceId + SEPARATOR_DEVIDE_SERIES + sensorId)); } } if (timeFilter == null && freqFilter == null && valueFilter == null) { return queryWithoutFilter(paths); } else if (valueFilter instanceof SingleSeriesFilterExpression || (timeFilter != null && valueFilter == null)) { -// return readOneColumnValueUseFilter(paths, (SingleSeriesFilterExpression) timeFilter, (SingleSeriesFilterExpression) freqFilter, -// (SingleSeriesFilterExpression) valueFilter); + return readOneColumnValueUseFilter(paths, (SingleSeriesFilterExpression) timeFilter, (SingleSeriesFilterExpression) freqFilter, + (SingleSeriesFilterExpression) valueFilter); } else if (valueFilter instanceof CrossSeriesFilterExpression) { -// return crossColumnQuery(paths, (SingleSeriesFilterExpression) 
timeFilter, (SingleSeriesFilterExpression) freqFilter, -// (CrossSeriesFilterExpression) valueFilter, rowGroupIndexList); + return crossColumnQuery(paths, (SingleSeriesFilterExpression) timeFilter, (SingleSeriesFilterExpression) freqFilter, + (CrossSeriesFilterExpression) valueFilter); } throw new IOException("Query Not Support Exception"); } @@ -51,4 +76,50 @@ public DynamicOneColumnData getMoreRecordsForOneColumn(Path p, DynamicOneColumnD } }; } + + private QueryDataSet readOneColumnValueUseFilter(List paths, SingleSeriesFilterExpression timeFilter, + SingleSeriesFilterExpression freqFilter, SingleSeriesFilterExpression valueFilter) throws IOException { + logger.debug("start read one column data with filter"); + + return new IteratorQueryDataSet(paths) { + @Override + public DynamicOneColumnData getMoreRecordsForOneColumn(Path p, DynamicOneColumnData res) throws IOException { + return recordReader.getValuesUseFilter(res, FETCH_SIZE, p.getDeltaObjectToString(), p.getMeasurementToString() + , timeFilter, freqFilter, valueFilter); + } + }; + } + + private QueryDataSet crossColumnQuery(List paths, SingleSeriesFilterExpression timeFilter, SingleSeriesFilterExpression freqFilter, + CrossSeriesFilterExpression valueFilter) throws IOException { + CrossQueryTimeGenerator timeQueryDataSet = new CrossQueryTimeGenerator(timeFilter, freqFilter, valueFilter, FETCH_SIZE) { + @Override + public DynamicOneColumnData getDataInNextBatch(DynamicOneColumnData res, int fetchSize, + SingleSeriesFilterExpression valueFilter, int valueFilterNumber) throws ProcessorException, IOException { + return recordReader.getValuesUseFilter(res, fetchSize, valueFilter); + } + }; + + return new CrossQueryIteratorDataSet(timeQueryDataSet) { + @Override + public boolean getMoreRecords() throws IOException { + try { + long[] timeRet = crossQueryTimeGenerator.generateTimes(); + if (timeRet.length == 0) { + return true; + } + for (Path p : paths) { + String deltaObjectUID = p.getDeltaObjectToString(); + 
String measurementUID = p.getMeasurementToString(); + DynamicOneColumnData oneColDataList = recordReader.getValuesUseTimestamps(deltaObjectUID, measurementUID, timeRet); + mapRet.put(p.getFullPath(), oneColDataList); + } + + } catch (ProcessorException e) { + throw new IOException(e.getMessage()); + } + return false; + } + }; + } } diff --git a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/QueryEngine.java b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/QueryEngine.java index 505119c7..6f62d1f5 100644 --- a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/QueryEngine.java +++ b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/QueryEngine.java @@ -23,7 +23,7 @@ import java.util.Map; public class QueryEngine { - private static final Logger logger = LoggerFactory.getLogger(QueryEngine.class); + protected static final Logger logger = LoggerFactory.getLogger(QueryEngine.class); protected static int FETCH_SIZE = 20000; protected RecordReader recordReader; From c7aa5d14647d2ad6e321070b167bd4bee31869d9 Mon Sep 17 00:00:00 2001 From: East <952945925@qq.com> Date: Thu, 21 Dec 2017 15:10:34 +0800 Subject: [PATCH 05/12] Update HadoopQueryEngine --- .../tsfile/timeseries/read/FileReader.java | 3 ++- .../timeseries/read/query/HadoopQueryEngine.java | 14 +++++++------- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/FileReader.java b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/FileReader.java index 62626758..ac0cc012 100644 --- a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/FileReader.java +++ b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/FileReader.java @@ -59,11 +59,12 @@ public FileReader(ITsRandomAccessFileReader raf) throws IOException { * @param reader * @param rowGroupMetaDataList */ - public FileReader(ITsRandomAccessFileReader reader, List rowGroupMetaDataList) { + public FileReader(ITsRandomAccessFileReader reader, List 
rowGroupMetaDataList) throws IOException { this.randomAccessFileReader = reader; this.rwLock = new ReentrantReadWriteLock(); this.rowGroupReaderLRUList = new LinkedList<>(); initFromRowGroupMetadataList(rowGroupMetaDataList); + init(); } /** diff --git a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/HadoopQueryEngine.java b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/HadoopQueryEngine.java index baa17423..fb3e71e9 100644 --- a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/HadoopQueryEngine.java +++ b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/HadoopQueryEngine.java @@ -27,27 +27,27 @@ public HadoopQueryEngine(ITsRandomAccessFileReader raf, List r this.rowGroupMetaDataList = rowGroupMetaDataList; } - private void initDeviceIdList(List deviceIdList) { + private List initDeviceIdList() { Set deviceIdSet = new HashSet<>(); for (RowGroupMetaData rowGroupMetaData : rowGroupMetaDataList) { deviceIdSet.add(rowGroupMetaData.getDeltaObjectID()); } - deviceIdList = new ArrayList<>(deviceIdSet); + return new ArrayList<>(deviceIdSet); } - private void initSensorIdList(List sensorIdList){ + private List initSensorIdList(){ Set sensorIdSet = new HashSet<>(); for(RowGroupMetaData rowGroupMetaData : rowGroupMetaDataList) { for(TimeSeriesChunkMetaData timeSeriesChunkMetaData : rowGroupMetaData.getTimeSeriesChunkMetaDataList()){ sensorIdSet.add(timeSeriesChunkMetaData.getProperties().getMeasurementUID()); } } - sensorIdList = new ArrayList<>(sensorIdSet); + return new ArrayList<>(sensorIdSet); } public QueryDataSet queryWithSpecificRowGroups(List deviceIdList, List sensorIdList, FilterExpression timeFilter, FilterExpression freqFilter, FilterExpression valueFilter) throws IOException{ - if(deviceIdList == null)initDeviceIdList(deviceIdList); - if(sensorIdList == null)initSensorIdList(sensorIdList); + if(deviceIdList == null)deviceIdList = initDeviceIdList(); + if(sensorIdList == null)sensorIdList = initSensorIdList(); List 
paths = new ArrayList<>(); for(String deviceId : deviceIdList){ @@ -122,4 +122,4 @@ public boolean getMoreRecords() throws IOException { } }; } -} +} \ No newline at end of file From 7cda82adec8995eb370448baba4c4244bf2f55c6 Mon Sep 17 00:00:00 2001 From: East <952945925@qq.com> Date: Thu, 21 Dec 2017 16:01:44 +0800 Subject: [PATCH 06/12] Update HadoopQueryEngine --- .../tsinghua/tsfile/timeseries/read/query/QueryDataSet.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/QueryDataSet.java b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/QueryDataSet.java index 1f894dce..edaff268 100644 --- a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/QueryDataSet.java +++ b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/QueryDataSet.java @@ -186,6 +186,10 @@ public boolean next() { } public RowRecord getCurrentRecord() { + if (!ifInit) { + initForRecord(); + ifInit = true; + } return currentRecord; } From 7ea2768f94eb052dfa58fbc35153600a9ac74b95 Mon Sep 17 00:00:00 2001 From: East <952945925@qq.com> Date: Sun, 24 Dec 2017 15:38:10 +0800 Subject: [PATCH 07/12] Update HadoopQueryEngine --- .../edu/tsinghua/tsfile/timeseries/read/RecordReader.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/RecordReader.java b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/RecordReader.java index a2e201d5..00ee962e 100644 --- a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/RecordReader.java +++ b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/RecordReader.java @@ -38,6 +38,7 @@ public RecordReader(ITsRandomAccessFileReader raf, List rowGro this.fileReader = new FileReader(raf, rowGroupMetaDataList); } + //modified by hadoop /** * Read one path without filter. 
* @@ -50,7 +51,11 @@ public RecordReader(ITsRandomAccessFileReader raf, List rowGro */ public DynamicOneColumnData getValueInOneColumn(DynamicOneColumnData res, int fetchSize , String deltaObjectUID, String measurementUID) throws IOException { - checkSeries(deltaObjectUID, measurementUID); + try { + checkSeries(deltaObjectUID, measurementUID); + }catch(IOException ex){ + return null; + } List rowGroupReaderList = fileReader.getRowGroupReaderListByDeltaObject(deltaObjectUID); int i = 0; if (res != null) { From 1861aaf9aa827d5eac38278f54a6d727dbe3da33 Mon Sep 17 00:00:00 2001 From: East <952945925@qq.com> Date: Tue, 26 Dec 2017 17:24:46 +0800 Subject: [PATCH 08/12] Update HadoopQueryEngine --- .../tsfile/timeseries/read/RecordReader.java | 7 +++++- .../read/query/IteratorQueryDataSet.java | 24 ++++++++++++++----- .../timeseries/read/query/QueryDataSet.java | 4 ++-- 3 files changed, 26 insertions(+), 9 deletions(-) diff --git a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/RecordReader.java b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/RecordReader.java index 00ee962e..acc64e42 100644 --- a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/RecordReader.java +++ b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/RecordReader.java @@ -54,7 +54,10 @@ public DynamicOneColumnData getValueInOneColumn(DynamicOneColumnData res, int fe try { checkSeries(deltaObjectUID, measurementUID); }catch(IOException ex){ - return null; + if(res == null)res = new DynamicOneColumnData(); + res.dataType = fileReader.getDataTypeBySeriesName(deltaObjectUID, measurementUID); + return res; +// fileReader.getFileMetaData().getJsonMetaData(); } List rowGroupReaderList = fileReader.getRowGroupReaderListByDeltaObject(deltaObjectUID); int i = 0; @@ -69,6 +72,8 @@ public DynamicOneColumnData getValueInOneColumn(DynamicOneColumnData res, int fe break; } } +// if(i >= rowGroupReaderList.size())return res; +// res = getValueInOneColumn(res, fetchSize, rowGroupReaderList.get(i), 
measurementUID); return res; } diff --git a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/IteratorQueryDataSet.java b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/IteratorQueryDataSet.java index 2130b324..d2247d40 100644 --- a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/IteratorQueryDataSet.java +++ b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/IteratorQueryDataSet.java @@ -7,10 +7,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.PriorityQueue; +import java.util.*; public abstract class IteratorQueryDataSet extends QueryDataSet { private static final Logger logger = LoggerFactory.getLogger(IteratorQueryDataSet.class); @@ -36,14 +33,28 @@ public IteratorQueryDataSet(List paths) throws IOException { public abstract DynamicOneColumnData getMoreRecordsForOneColumn(Path colName , DynamicOneColumnData res) throws IOException; + //modified by hadoop public void initForRecord() { - heap = new PriorityQueue<>(retMap.size()); + size = retMap.size(); + heap = new PriorityQueue<>(size); + if (size > 0) { + deltaObjectIds = new String[size]; + measurementIds = new String[size]; + } else { + LOG.error("QueryDataSet init row record occurs error! 
the size of ret is 0."); + } + + int i = 0; for (Path p : retMap.keySet()) { + deltaObjectIds[i] = p.getDeltaObjectToString(); + measurementIds[i] = p.getMeasurementToString(); + DynamicOneColumnData res = retMap.get(p); if (res != null && res.curIdx < res.valueLength) { heapPut(res.getTime(res.curIdx)); } + i++; } } @@ -58,6 +69,7 @@ public boolean hasNextRecord() { return false; } + //modified by hadoop public RowRecord getNextRecord() { if (!ifInit) { initForRecord(); @@ -94,7 +106,7 @@ public RowRecord getNextRecord() { heapPut(res.getTime(res.curIdx)); } } else { - f = new Field(res.dataType, p.getMeasurementToString()); + f = new Field(res.dataType, p.getDeltaObjectToString(), p.getMeasurementToString()); f.setNull(true); } r.addField(f); diff --git a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/QueryDataSet.java b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/QueryDataSet.java index edaff268..b3643c9f 100644 --- a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/QueryDataSet.java +++ b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/QueryDataSet.java @@ -12,8 +12,8 @@ import java.util.PriorityQueue; public class QueryDataSet { - private static final Logger LOG = LoggerFactory.getLogger(QueryDataSet.class); - private static final char PATH_SPLITTER = '.'; + protected static final Logger LOG = LoggerFactory.getLogger(QueryDataSet.class); + protected static final char PATH_SPLITTER = '.'; /** * Time Generator for Cross Query when using batching read From 182a08c046b32af73e989b1fbe9595b696706a1b Mon Sep 17 00:00:00 2001 From: East <952945925@qq.com> Date: Thu, 28 Dec 2017 10:18:25 +0800 Subject: [PATCH 09/12] deal with conflicts --- .../tsfile/timeseries/read/FileReader.java | 4 +-- .../tsfile/timeseries/read/RecordReader.java | 35 ++++++++++++++++++- .../read/query/HadoopQueryEngine.java | 2 +- 3 files changed, 37 insertions(+), 4 deletions(-) diff --git 
a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/FileReader.java b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/FileReader.java index ac0cc012..1254b193 100644 --- a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/FileReader.java +++ b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/FileReader.java @@ -64,7 +64,6 @@ public FileReader(ITsRandomAccessFileReader reader, List rowGr this.rwLock = new ReentrantReadWriteLock(); this.rowGroupReaderLRUList = new LinkedList<>(); initFromRowGroupMetadataList(rowGroupMetaDataList); - init(); } /** @@ -79,7 +78,7 @@ private void init() throws IOException { int fileMetaDataLength = randomAccessFileReader.readInt(); randomAccessFileReader.seek(l - MAGIC_LENGTH - FOOTER_LENGTH - fileMetaDataLength); byte[] buf = new byte[fileMetaDataLength]; - randomAccessFileReader.read(buf, 0, buf.length);//FIXE is this a potential bug?M0 + randomAccessFileReader.read(buf, 0, buf.length);//FIXME is this a potential bug? ByteArrayInputStream bais = new ByteArrayInputStream(buf); this.fileMetaData = new TsFileMetaDataConverter().toTsFileMetadata(ReadWriteThriftFormatUtils.readFileMetaData(bais)); @@ -300,6 +299,7 @@ public TsFileMetaData getFileMetaData() { return this.fileMetaData; } + //used by hadoop public List getSortedRowGroupMetaDataList() throws IOException{ List rowGroupMetaDataList = new ArrayList<>(); Collection deltaObjects = fileMetaData.getDeltaObjectMap().keySet(); diff --git a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/RecordReader.java b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/RecordReader.java index acc64e42..870afc64 100644 --- a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/RecordReader.java +++ b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/RecordReader.java @@ -38,7 +38,6 @@ public RecordReader(ITsRandomAccessFileReader raf, List rowGro this.fileReader = new FileReader(raf, rowGroupMetaDataList); } - //modified by hadoop /** * Read one path without filter. 
* @@ -51,6 +50,40 @@ public RecordReader(ITsRandomAccessFileReader raf, List rowGro */ public DynamicOneColumnData getValueInOneColumn(DynamicOneColumnData res, int fetchSize , String deltaObjectUID, String measurementUID) throws IOException { + + checkSeries(deltaObjectUID, measurementUID); + + List rowGroupReaderList = fileReader.getRowGroupReaderListByDeltaObject(deltaObjectUID); + int i = 0; + if (res != null) { + i = res.getRowGroupIndex(); + } + for (; i < rowGroupReaderList.size(); i++) { + RowGroupReader rowGroupReader = rowGroupReaderList.get(i); + res = getValueInOneColumn(res, fetchSize, rowGroupReader, measurementUID); + if (res.valueLength >= fetchSize) { + res.hasReadAll = false; + break; + } + } + if(i >= rowGroupReaderList.size())return res; + res = getValueInOneColumn(res, fetchSize, rowGroupReaderList.get(i), measurementUID); + return res; + } + + //used by hadoop + /** + * Read one path without filter. + * + * @param res the iterative result + * @param fetchSize fetch size + * @param deltaObjectUID delta object id + * @param measurementUID measurement Id + * @return the result in means of DynamicOneColumnData + * @throws IOException TsFile read error + */ + public DynamicOneColumnData getValueInOneColumnWithoutException(DynamicOneColumnData res, int fetchSize + , String deltaObjectUID, String measurementUID) throws IOException { try { checkSeries(deltaObjectUID, measurementUID); }catch(IOException ex){ diff --git a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/HadoopQueryEngine.java b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/HadoopQueryEngine.java index fb3e71e9..ada52f77 100644 --- a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/HadoopQueryEngine.java +++ b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/HadoopQueryEngine.java @@ -72,7 +72,7 @@ private QueryDataSet queryWithoutFilter(List paths) throws IOException { return new IteratorQueryDataSet(paths) { @Override public 
DynamicOneColumnData getMoreRecordsForOneColumn(Path p, DynamicOneColumnData res) throws IOException { - return recordReader.getValueInOneColumn(res, FETCH_SIZE, p.getDeltaObjectToString(), p.getMeasurementToString()); + return recordReader.getValueInOneColumnWithoutException(res, FETCH_SIZE, p.getDeltaObjectToString(), p.getMeasurementToString()); } }; } From 7a56314e4ae9a2fe84ec2cc8f9eef46fecbfd550 Mon Sep 17 00:00:00 2001 From: East <952945925@qq.com> Date: Thu, 28 Dec 2017 11:51:20 +0800 Subject: [PATCH 10/12] deal with conflicts --- .../tsfile/timeseries/read/FileReader.java | 4 +++ .../tsfile/timeseries/read/RecordReader.java | 30 +++++++++++++++---- 2 files changed, 28 insertions(+), 6 deletions(-) diff --git a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/FileReader.java b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/FileReader.java index 1254b193..de7834bd 100644 --- a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/FileReader.java +++ b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/FileReader.java @@ -141,6 +141,10 @@ public List getRowGroupReaderListByDeltaObject(String deltaObjec return this.rowGroupReaderMap.get(deltaObjectUID); } + public List getRowGroupReaderListByDeltaObjectByHadoop(String deltaObjectUID) throws IOException { + return this.rowGroupReaderMap.get(deltaObjectUID); + } + public TSDataType getDataTypeBySeriesName(String deltaObject, String measurement) throws IOException { loadDeltaObj(deltaObject); List rgrList = getRowGroupReaderMap().get(deltaObject); diff --git a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/RecordReader.java b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/RecordReader.java index 49ec29f0..9214f1ee 100644 --- a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/RecordReader.java +++ b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/RecordReader.java @@ -71,9 +71,8 @@ public DynamicOneColumnData getValueInOneColumn(DynamicOneColumnData res, int fe return res; } - //used 
by hadoop /** - * Read one path without filter. + * Read one path without filter and do not throw exception. Used by hadoop. * * @param res the iterative result * @param fetchSize fetch size @@ -85,14 +84,13 @@ public DynamicOneColumnData getValueInOneColumn(DynamicOneColumnData res, int fe public DynamicOneColumnData getValueInOneColumnWithoutException(DynamicOneColumnData res, int fetchSize , String deltaObjectUID, String measurementUID) throws IOException { try { - checkSeries(deltaObjectUID, measurementUID); + checkSeriesByHadoop(deltaObjectUID, measurementUID); }catch(IOException ex){ if(res == null)res = new DynamicOneColumnData(); - res.dataType = fileReader.getDataTypeBySeriesName(deltaObjectUID, measurementUID); + res.dataType = fileReader.getRowGroupReaderListByDeltaObject(deltaObjectUID).get(0).getDataTypeBySeriesName(measurementUID); return res; -// fileReader.getFileMetaData().getJsonMetaData(); } - List rowGroupReaderList = fileReader.getRowGroupReaderListByDeltaObject(deltaObjectUID); + List rowGroupReaderList = fileReader.getRowGroupReaderListByDeltaObjectByHadoop(deltaObjectUID); int i = 0; if (res != null) { i = res.getRowGroupIndex(); @@ -413,6 +411,26 @@ private void checkSeries(String deltaObject, String measurement) throws IOExcept throw new IOException("Series is not exist in current file: " + deltaObject + "#" + measurement); } + private void checkSeriesByHadoop(String deltaObject, String measurement) throws IOException { + if (seriesSchemaMap == null) { + seriesSchemaMap = new HashMap<>(); + Map> seriesSchemaListMap = getAllSeriesSchemasGroupByDeltaObject(); + for (String key : seriesSchemaListMap.keySet()) { + HashMap tmap = new HashMap<>(); + for (SeriesSchema ss : seriesSchemaListMap.get(key)) { + tmap.put(ss.name, ss); + } + seriesSchemaMap.put(key, tmap); + } + } + if (seriesSchemaMap.containsKey(deltaObject)) { + if (seriesSchemaMap.get(deltaObject).containsKey(measurement)) { + return; + } + } + throw new IOException("Series is not
exist in current file: " + deltaObject + "#" + measurement); + } + public List getAllRowGroupReaders() { return fileReader.getRowGroupReaderList(); } From c01e697541213df09eac90d53e4c5af1fab3e463 Mon Sep 17 00:00:00 2001 From: East <952945925@qq.com> Date: Fri, 29 Dec 2017 01:08:18 +0800 Subject: [PATCH 11/12] deal with conflicts --- .../cn/edu/tsinghua/tsfile/timeseries/read/RecordReader.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/RecordReader.java b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/RecordReader.java index 9214f1ee..c8e56b87 100644 --- a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/RecordReader.java +++ b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/RecordReader.java @@ -66,8 +66,6 @@ public DynamicOneColumnData getValueInOneColumn(DynamicOneColumnData res, int fe break; } } - if(i >= rowGroupReaderList.size())return res; - res = getValueInOneColumn(res, fetchSize, rowGroupReaderList.get(i), measurementUID); return res; } From 7039b7f19d918708f9d2ac984247187a74383fef Mon Sep 17 00:00:00 2001 From: East <952945925@qq.com> Date: Fri, 29 Dec 2017 06:18:48 +0800 Subject: [PATCH 12/12] deal with conflicts --- .../cn/edu/tsinghua/tsfile/timeseries/read/RecordReader.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/RecordReader.java b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/RecordReader.java index c8e56b87..f8f6d2d6 100644 --- a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/RecordReader.java +++ b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/RecordReader.java @@ -101,8 +101,6 @@ public DynamicOneColumnData getValueInOneColumnWithoutException(DynamicOneColumn break; } } -// if(i >= rowGroupReaderList.size())return res; -// res = getValueInOneColumn(res, fetchSize, rowGroupReaderList.get(i), measurementUID); return res; }