diff --git a/src/main/java/cn/edu/tsinghua/tsfile/file/metadata/TsFileMetaData.java b/src/main/java/cn/edu/tsinghua/tsfile/file/metadata/TsFileMetaData.java index 04eb8cf4..18319b5a 100644 --- a/src/main/java/cn/edu/tsinghua/tsfile/file/metadata/TsFileMetaData.java +++ b/src/main/java/cn/edu/tsinghua/tsfile/file/metadata/TsFileMetaData.java @@ -155,7 +155,7 @@ public void convertToTSF(FileMetaData metadataInThrift) { deltaObjectMap = new HashMap<>(); for (Map.Entry entry : metadataInThrift.getDelta_object_map().entrySet()){ DeltaObject object = entry.getValue(); - deltaObjectMap.put(entry.getKey(), new TsDeltaObject(object.getOffset(), + deltaObjectMap.put(entry.getKey(), new TsDeltaObject(object.getOffset(), object.getMetadata_block_size(), object.getStart_time(), object.getEnd_time())); } } diff --git a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/FileReader.java b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/FileReader.java index 367051ab..de7834bd 100644 --- a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/FileReader.java +++ b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/FileReader.java @@ -59,7 +59,7 @@ public FileReader(ITsRandomAccessFileReader raf) throws IOException { * @param reader * @param rowGroupMetaDataList */ - public FileReader(ITsRandomAccessFileReader reader, List rowGroupMetaDataList) { + public FileReader(ITsRandomAccessFileReader reader, List rowGroupMetaDataList) throws IOException { this.randomAccessFileReader = reader; this.rwLock = new ReentrantReadWriteLock(); this.rowGroupReaderLRUList = new LinkedList<>(); @@ -141,6 +141,10 @@ public List getRowGroupReaderListByDeltaObject(String deltaObjec return this.rowGroupReaderMap.get(deltaObjectUID); } + public List getRowGroupReaderListByDeltaObjectByHadoop(String deltaObjectUID) throws IOException { + return this.rowGroupReaderMap.get(deltaObjectUID); + } + public TSDataType getDataTypeBySeriesName(String deltaObject, String measurement) throws IOException { loadDeltaObj(deltaObject); List rgrList = getRowGroupReaderMap().get(deltaObject); @@ -299,4 +303,33 @@ public TsFileMetaData getFileMetaData() { return this.fileMetaData; } + //used by hadoop + public List getSortedRowGroupMetaDataList() throws IOException{ + List rowGroupMetaDataList = new ArrayList<>(); + Collection deltaObjects = fileMetaData.getDeltaObjectMap().keySet(); + for (String deltaObjectID : deltaObjects) { + this.rwLock.writeLock().lock(); + try { + TsDeltaObject deltaObj = this.fileMetaData.getDeltaObject(deltaObjectID); + TsRowGroupBlockMetaData blockMeta = new TsRowGroupBlockMetaData(); + blockMeta.convertToTSF(ReadWriteThriftFormatUtils.readRowGroupBlockMetaData(this.randomAccessFileReader, + deltaObj.offset, deltaObj.metadataBlockSize)); + rowGroupMetaDataList.addAll(blockMeta.getRowGroups()); + } finally { + this.rwLock.writeLock().unlock(); + } + } + + Comparator comparator = new Comparator() { + @Override + public int compare(RowGroupMetaData o1, RowGroupMetaData o2) { + + return Long.signum(o1.getMetaDatas().get(0).getProperties().getFileOffset() - o2.getMetaDatas().get(0).getProperties().getFileOffset()); + } + + }; + rowGroupMetaDataList.sort(comparator); + return rowGroupMetaDataList; + } + } diff --git a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/RecordReader.java b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/RecordReader.java index 9f9820ef..f8f6d2d6 100644 --- a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/RecordReader.java +++ b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/RecordReader.java @@ -2,6 +2,7 @@ import cn.edu.tsinghua.tsfile.common.exception.UnSupportedDataTypeException; import cn.edu.tsinghua.tsfile.common.utils.ITsRandomAccessFileReader; +import cn.edu.tsinghua.tsfile.file.metadata.RowGroupMetaData; import cn.edu.tsinghua.tsfile.file.metadata.enums.TSDataType; import cn.edu.tsinghua.tsfile.timeseries.filter.definition.FilterFactory; import cn.edu.tsinghua.tsfile.timeseries.filter.definition.SingleSeriesFilterExpression; @@ -32,6 +33,11 @@ public RecordReader(ITsRandomAccessFileReader raf) throws IOException { this.fileReader = new FileReader(raf); } + //for hadoop-connector + public RecordReader(ITsRandomAccessFileReader raf, List rowGroupMetaDataList) throws IOException { + this.fileReader = new FileReader(raf, rowGroupMetaDataList); + } + /** * Read one path without filter. * @@ -44,7 +50,9 @@ public RecordReader(ITsRandomAccessFileReader raf) throws IOException { */ public DynamicOneColumnData getValueInOneColumn(DynamicOneColumnData res, int fetchSize , String deltaObjectUID, String measurementUID) throws IOException { + checkSeries(deltaObjectUID, measurementUID); + List rowGroupReaderList = fileReader.getRowGroupReaderListByDeltaObject(deltaObjectUID); int i = 0; if (res != null) { @@ -61,6 +69,41 @@ public DynamicOneColumnData getValueInOneColumn(DynamicOneColumnData res, int fe return res; } + /** + * Read one path without filter and do not throw exceptino. Used by hadoop. + * + * @param res the iterative result + * @param fetchSize fetch size + * @param deltaObjectUID delta object id + * @param measurementUID measurement Id + * @return the result in means of DynamicOneColumnData + * @throws IOException TsFile read error + */ + public DynamicOneColumnData getValueInOneColumnWithoutException(DynamicOneColumnData res, int fetchSize + , String deltaObjectUID, String measurementUID) throws IOException { + try { + checkSeriesByHadoop(deltaObjectUID, measurementUID); + }catch(IOException ex){ + if(res == null)res = new DynamicOneColumnData(); + res.dataType = fileReader.getRowGroupReaderListByDeltaObject(deltaObjectUID).get(0).getDataTypeBySeriesName(measurementUID); + return res; + } + List rowGroupReaderList = fileReader.getRowGroupReaderListByDeltaObjectByHadoop(deltaObjectUID); + int i = 0; + if (res != null) { + i = res.getRowGroupIndex(); + } + for (; i < rowGroupReaderList.size(); i++) { + RowGroupReader rowGroupReader = rowGroupReaderList.get(i); + res = getValueInOneColumn(res, fetchSize, rowGroupReader, measurementUID); + if (res.valueLength >= fetchSize) { + res.hasReadAll = false; + break; + } + } + return res; + } + private DynamicOneColumnData getValueInOneColumn(DynamicOneColumnData res, int fetchSize, RowGroupReader rowGroupReader, String measurementId) throws IOException { return rowGroupReader.getValueReaders().get(measurementId).readOneColumn(res, fetchSize); @@ -364,6 +407,26 @@ private void checkSeries(String deltaObject, String measurement) throws IOExcept throw new IOException("Series is not exist in current file: " + deltaObject + "#" + measurement); } + private void checkSeriesByHadoop(String deltaObject, String measurement) throws IOException { + if (seriesSchemaMap == null) { + seriesSchemaMap = new HashMap<>(); + Map> seriesSchemaListMap = getAllSeriesSchemasGroupByDeltaObject(); + for (String key : seriesSchemaListMap.keySet()) { + HashMap tmap = new HashMap<>(); + for (SeriesSchema ss : seriesSchemaListMap.get(key)) { + tmap.put(ss.name, ss); + } + seriesSchemaMap.put(key, tmap); + } + } + if (seriesSchemaMap.containsKey(deltaObject)) { + if (seriesSchemaMap.get(deltaObject).containsKey(measurement)) { + return; + } + } + throw new IOException("Series is not exist in current file: " + deltaObject + "#" + measurement); + } + public List getAllRowGroupReaders() { return fileReader.getRowGroupReaderList(); } diff --git a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/HadoopQueryEngine.java b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/HadoopQueryEngine.java new file mode 100644 index 00000000..ada52f77 --- /dev/null +++ b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/HadoopQueryEngine.java @@ -0,0 +1,125 @@ +package cn.edu.tsinghua.tsfile.timeseries.read.query; + +import cn.edu.tsinghua.tsfile.common.exception.ProcessorException; +import cn.edu.tsinghua.tsfile.common.utils.ITsRandomAccessFileReader; +import cn.edu.tsinghua.tsfile.file.metadata.RowGroupMetaData; +import cn.edu.tsinghua.tsfile.file.metadata.TimeSeriesChunkMetaData; +import cn.edu.tsinghua.tsfile.timeseries.filter.definition.CrossSeriesFilterExpression; +import cn.edu.tsinghua.tsfile.timeseries.filter.definition.FilterExpression; +import cn.edu.tsinghua.tsfile.timeseries.filter.definition.SingleSeriesFilterExpression; +import cn.edu.tsinghua.tsfile.timeseries.read.RecordReader; +import cn.edu.tsinghua.tsfile.timeseries.read.support.Path; +import cn.edu.tsinghua.tsfile.timeseries.write.desc.MeasurementDescriptor; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class HadoopQueryEngine extends QueryEngine { + + private static final String SEPARATOR_DEVIDE_SERIES = "."; + private List rowGroupMetaDataList; + + public HadoopQueryEngine(ITsRandomAccessFileReader raf, List rowGroupMetaDataList) throws IOException { + super(raf, rowGroupMetaDataList); + this.rowGroupMetaDataList = rowGroupMetaDataList; + } + + private List initDeviceIdList() { + Set deviceIdSet = new HashSet<>(); + for (RowGroupMetaData rowGroupMetaData : rowGroupMetaDataList) { + deviceIdSet.add(rowGroupMetaData.getDeltaObjectID()); + } + return new ArrayList<>(deviceIdSet); + } + + private List initSensorIdList(){ + Set sensorIdSet = new HashSet<>(); + for(RowGroupMetaData rowGroupMetaData : rowGroupMetaDataList) { + for(TimeSeriesChunkMetaData timeSeriesChunkMetaData : rowGroupMetaData.getTimeSeriesChunkMetaDataList()){ + sensorIdSet.add(timeSeriesChunkMetaData.getProperties().getMeasurementUID()); + } + } + return new ArrayList<>(sensorIdSet); + } + + public QueryDataSet queryWithSpecificRowGroups(List deviceIdList, List sensorIdList, FilterExpression timeFilter, FilterExpression freqFilter, FilterExpression valueFilter) throws IOException{ + if(deviceIdList == null)deviceIdList = initDeviceIdList(); + if(sensorIdList == null)sensorIdList = initSensorIdList(); + + List paths = new ArrayList<>(); + for(String deviceId : deviceIdList){ + for(String sensorId: sensorIdList){ + paths.add(new Path(deviceId + SEPARATOR_DEVIDE_SERIES + sensorId)); + } + } + + if (timeFilter == null && freqFilter == null && valueFilter == null) { + return queryWithoutFilter(paths); + } else if (valueFilter instanceof SingleSeriesFilterExpression || (timeFilter != null && valueFilter == null)) { + return readOneColumnValueUseFilter(paths, (SingleSeriesFilterExpression) timeFilter, (SingleSeriesFilterExpression) freqFilter, + (SingleSeriesFilterExpression) valueFilter); + } else if (valueFilter instanceof CrossSeriesFilterExpression) { + return crossColumnQuery(paths, (SingleSeriesFilterExpression) timeFilter, (SingleSeriesFilterExpression) freqFilter, + (CrossSeriesFilterExpression) valueFilter); + } + throw new IOException("Query Not Support Exception"); + } + + private QueryDataSet queryWithoutFilter(List paths) throws IOException { + return new IteratorQueryDataSet(paths) { + @Override + public DynamicOneColumnData getMoreRecordsForOneColumn(Path p, DynamicOneColumnData res) throws IOException { + return recordReader.getValueInOneColumnWithoutException(res, FETCH_SIZE, p.getDeltaObjectToString(), p.getMeasurementToString()); + } + }; + } + + private QueryDataSet readOneColumnValueUseFilter(List paths, SingleSeriesFilterExpression timeFilter, + SingleSeriesFilterExpression freqFilter, SingleSeriesFilterExpression valueFilter) throws IOException { + logger.debug("start read one column data with filter"); + + return new IteratorQueryDataSet(paths) { + @Override + public DynamicOneColumnData getMoreRecordsForOneColumn(Path p, DynamicOneColumnData res) throws IOException { + return recordReader.getValuesUseFilter(res, FETCH_SIZE, p.getDeltaObjectToString(), p.getMeasurementToString() + , timeFilter, freqFilter, valueFilter); + } + }; + } + + private QueryDataSet crossColumnQuery(List paths, SingleSeriesFilterExpression timeFilter, SingleSeriesFilterExpression freqFilter, + CrossSeriesFilterExpression valueFilter) throws IOException { + CrossQueryTimeGenerator timeQueryDataSet = new CrossQueryTimeGenerator(timeFilter, freqFilter, valueFilter, FETCH_SIZE) { + @Override + public DynamicOneColumnData getDataInNextBatch(DynamicOneColumnData res, int fetchSize, + SingleSeriesFilterExpression valueFilter, int valueFilterNumber) throws ProcessorException, IOException { + return recordReader.getValuesUseFilter(res, fetchSize, valueFilter); + } + }; + + return new CrossQueryIteratorDataSet(timeQueryDataSet) { + @Override + public boolean getMoreRecords() throws IOException { + try { + long[] timeRet = crossQueryTimeGenerator.generateTimes(); + if (timeRet.length == 0) { + return true; + } + for (Path p : paths) { + String deltaObjectUID = p.getDeltaObjectToString(); + String measurementUID = p.getMeasurementToString(); + DynamicOneColumnData oneColDataList = recordReader.getValuesUseTimestamps(deltaObjectUID, measurementUID, timeRet); + mapRet.put(p.getFullPath(), oneColDataList); + } + + } catch (ProcessorException e) { + throw new IOException(e.getMessage()); + } + return false; + } + }; + } +} \ No newline at end of file diff --git a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/IteratorQueryDataSet.java b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/IteratorQueryDataSet.java index 2130b324..d2247d40 100644 --- a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/IteratorQueryDataSet.java +++ b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/IteratorQueryDataSet.java @@ -7,10 +7,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.PriorityQueue; +import java.util.*; public abstract class IteratorQueryDataSet extends QueryDataSet { private static final Logger logger = LoggerFactory.getLogger(IteratorQueryDataSet.class); @@ -36,14 +33,28 @@ public IteratorQueryDataSet(List paths) throws IOException { public abstract DynamicOneColumnData getMoreRecordsForOneColumn(Path colName , DynamicOneColumnData res) throws IOException; + //modified by hadoop public void initForRecord() { - heap = new PriorityQueue<>(retMap.size()); + size = retMap.size(); + heap = new PriorityQueue<>(size); + if (size > 0) { + deltaObjectIds = new String[size]; + measurementIds = new String[size]; + } else { + LOG.error("QueryDataSet init row record occurs error! the size of ret is 0."); + } + + int i = 0; for (Path p : retMap.keySet()) { + deltaObjectIds[i] = p.getDeltaObjectToString(); + measurementIds[i] = p.getMeasurementToString(); + DynamicOneColumnData res = retMap.get(p); if (res != null && res.curIdx < res.valueLength) { heapPut(res.getTime(res.curIdx)); } + i++; } } @@ -58,6 +69,7 @@ public boolean hasNextRecord() { return false; } + //modified by hadoop public RowRecord getNextRecord() { if (!ifInit) { initForRecord(); @@ -94,7 +106,7 @@ public RowRecord getNextRecord() { heapPut(res.getTime(res.curIdx)); } } else { - f = new Field(res.dataType, p.getMeasurementToString()); + f = new Field(res.dataType, p.getDeltaObjectToString(), p.getMeasurementToString()); f.setNull(true); } r.addField(f); diff --git a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/QueryDataSet.java b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/QueryDataSet.java index 1f894dce..b3643c9f 100644 --- a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/QueryDataSet.java +++ b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/QueryDataSet.java @@ -12,8 +12,8 @@ import java.util.PriorityQueue; public class QueryDataSet { - private static final Logger LOG = LoggerFactory.getLogger(QueryDataSet.class); - private static final char PATH_SPLITTER = '.'; + protected static final Logger LOG = LoggerFactory.getLogger(QueryDataSet.class); + protected static final char PATH_SPLITTER = '.'; /** * Time Generator for Cross Query when using batching read @@ -186,6 +186,10 @@ public boolean next() { } public RowRecord getCurrentRecord() { + if (!ifInit) { + initForRecord(); + ifInit = true; + } return currentRecord; } diff --git a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/QueryEngine.java b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/QueryEngine.java index 752684b2..6f62d1f5 100644 --- a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/QueryEngine.java +++ b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/read/query/QueryEngine.java @@ -3,6 +3,7 @@ import cn.edu.tsinghua.tsfile.common.constant.QueryConstant; import cn.edu.tsinghua.tsfile.common.exception.ProcessorException; import cn.edu.tsinghua.tsfile.common.utils.ITsRandomAccessFileReader; +import cn.edu.tsinghua.tsfile.file.metadata.RowGroupMetaData; import cn.edu.tsinghua.tsfile.timeseries.filter.definition.CrossSeriesFilterExpression; import cn.edu.tsinghua.tsfile.timeseries.filter.definition.FilterExpression; import cn.edu.tsinghua.tsfile.timeseries.filter.definition.SingleSeriesFilterExpression; @@ -22,9 +23,9 @@ import java.util.Map; public class QueryEngine { - private static final Logger logger = LoggerFactory.getLogger(QueryEngine.class); - private static int FETCH_SIZE = 20000; - private RecordReader recordReader; + protected static final Logger logger = LoggerFactory.getLogger(QueryEngine.class); + protected static int FETCH_SIZE = 20000; + protected RecordReader recordReader; public QueryEngine(ITsRandomAccessFileReader raf) throws IOException { recordReader = new RecordReader(raf); @@ -35,6 +36,11 @@ public QueryEngine(ITsRandomAccessFileReader raf, int fetchSize) throws IOExcept FETCH_SIZE = fetchSize; } + //for hadoop-connector + public QueryEngine(ITsRandomAccessFileReader raf, List rowGroupMetaDataList) throws IOException { + recordReader = new RecordReader(raf, rowGroupMetaDataList); + } + public static QueryDataSet query(QueryConfig config, String fileName) throws IOException { TsRandomAccessLocalFileReader raf = new TsRandomAccessLocalFileReader(fileName); QueryEngine queryEngine = new QueryEngine(raf); diff --git a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/write/TsFileWriter.java b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/write/TsFileWriter.java index 884b9bb1..1703e1de 100644 --- a/src/main/java/cn/edu/tsinghua/tsfile/timeseries/write/TsFileWriter.java +++ b/src/main/java/cn/edu/tsinghua/tsfile/timeseries/write/TsFileWriter.java @@ -174,7 +174,7 @@ public void write(TSRecord record) throws IOException, WriteProcessException { * @see cn.edu.tsinghua.tsfile.timeseries.write.series.IRowGroupWriter#getDataInMemory(String) * @param deltaObjectId deltaObject id * @param measurementId measurement id - * @return fist object is the current page data, second object is the all pages which is packaged + * @return first object is the current page data, second object is the all pages which is packaged */ public List getDataInMemory(String deltaObjectId, String measurementId) { if (groupWriters.get(deltaObjectId) == null) {