Merge pull request #125 from thulab/modify-for-hadoop-connector
Modify the tsfile for hadoop connector
xingtanzjr committed Dec 29, 2017
2 parents 6328bec + 7039b7f commit 6b3b04b
Showing 8 changed files with 257 additions and 14 deletions.
TsFileMetaData.java
@@ -155,7 +155,7 @@ public void convertToTSF(FileMetaData metadataInThrift) {
deltaObjectMap = new HashMap<>();
for (Map.Entry<String, DeltaObject> entry : metadataInThrift.getDelta_object_map().entrySet()){
DeltaObject object = entry.getValue();
-deltaObjectMap.put(entry.getKey(), new TsDeltaObject(object.getOffset(),
+deltaObjectMap.put(entry.getKey(), new TsDeltaObject(object.getOffset(),
object.getMetadata_block_size(), object.getStart_time(), object.getEnd_time()));
}
}
FileReader.java
@@ -59,7 +59,7 @@ public FileReader(ITsRandomAccessFileReader raf) throws IOException {
* @param reader
* @param rowGroupMetaDataList
*/
-public FileReader(ITsRandomAccessFileReader reader, List<RowGroupMetaData> rowGroupMetaDataList) {
+public FileReader(ITsRandomAccessFileReader reader, List<RowGroupMetaData> rowGroupMetaDataList) throws IOException {
this.randomAccessFileReader = reader;
this.rwLock = new ReentrantReadWriteLock();
this.rowGroupReaderLRUList = new LinkedList<>();
@@ -141,6 +141,10 @@ public List<RowGroupReader> getRowGroupReaderListByDeltaObject(String deltaObjec
return this.rowGroupReaderMap.get(deltaObjectUID);
}

public List<RowGroupReader> getRowGroupReaderListByDeltaObjectByHadoop(String deltaObjectUID) throws IOException {
return this.rowGroupReaderMap.get(deltaObjectUID);
}

public TSDataType getDataTypeBySeriesName(String deltaObject, String measurement) throws IOException {
loadDeltaObj(deltaObject);
List<RowGroupReader> rgrList = getRowGroupReaderMap().get(deltaObject);
@@ -299,4 +303,33 @@ public TsFileMetaData getFileMetaData() {
return this.fileMetaData;
}

// Used by the Hadoop connector: collect every row group in the file and
// return the list sorted by file offset, so callers can build contiguous splits.
public List<RowGroupMetaData> getSortedRowGroupMetaDataList() throws IOException {
List<RowGroupMetaData> rowGroupMetaDataList = new ArrayList<>();
Collection<String> deltaObjects = fileMetaData.getDeltaObjectMap().keySet();
for (String deltaObjectID : deltaObjects) {
this.rwLock.writeLock().lock();
try {
TsDeltaObject deltaObj = this.fileMetaData.getDeltaObject(deltaObjectID);
TsRowGroupBlockMetaData blockMeta = new TsRowGroupBlockMetaData();
blockMeta.convertToTSF(ReadWriteThriftFormatUtils.readRowGroupBlockMetaData(this.randomAccessFileReader,
deltaObj.offset, deltaObj.metadataBlockSize));
rowGroupMetaDataList.addAll(blockMeta.getRowGroups());
} finally {
this.rwLock.writeLock().unlock();
}
}

Comparator<RowGroupMetaData> comparator = new Comparator<RowGroupMetaData>() {
@Override
public int compare(RowGroupMetaData o1, RowGroupMetaData o2) {
// Order row groups by the file offset of their first time-series chunk.
return Long.signum(o1.getMetaDatas().get(0).getProperties().getFileOffset() - o2.getMetaDatas().get(0).getProperties().getFileOffset());
}
};
rowGroupMetaDataList.sort(comparator);
return rowGroupMetaDataList;
}

}
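For orientation: the offset-sorted list this method returns is what lets a Hadoop InputFormat pack row groups that sit next to each other on disk into input splits. Below is a minimal sketch of such a consumer, not code from this commit; TsFileInputSplit, computeSplits, the targetSize policy, and the getTotalByteSize() accessor are illustrative assumptions.

import java.util.ArrayList;
import java.util.List;

import cn.edu.tsinghua.tsfile.file.metadata.RowGroupMetaData;

public class SplitSketch {

    // Hypothetical split container: a run of row groups adjacent on disk.
    static class TsFileInputSplit {
        final List<RowGroupMetaData> rowGroups = new ArrayList<>();
    }

    // Greedily pack offset-sorted row groups into splits of roughly targetSize
    // bytes. Correctness relies on getSortedRowGroupMetaDataList() having
    // ordered the groups by file offset, so each split stays contiguous.
    static List<TsFileInputSplit> computeSplits(List<RowGroupMetaData> sorted, long targetSize) {
        List<TsFileInputSplit> splits = new ArrayList<>();
        TsFileInputSplit current = null;
        long bytesInCurrent = 0;
        for (RowGroupMetaData rowGroup : sorted) {
            if (current == null || bytesInCurrent >= targetSize) {
                current = new TsFileInputSplit();
                splits.add(current);
                bytesInCurrent = 0;
            }
            current.rowGroups.add(rowGroup);
            bytesInCurrent += rowGroup.getTotalByteSize(); // assumed accessor for on-disk size
        }
        return splits;
    }
}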
RecordReader.java
@@ -2,6 +2,7 @@

import cn.edu.tsinghua.tsfile.common.exception.UnSupportedDataTypeException;
import cn.edu.tsinghua.tsfile.common.utils.ITsRandomAccessFileReader;
import cn.edu.tsinghua.tsfile.file.metadata.RowGroupMetaData;
import cn.edu.tsinghua.tsfile.file.metadata.enums.TSDataType;
import cn.edu.tsinghua.tsfile.timeseries.filter.definition.FilterFactory;
import cn.edu.tsinghua.tsfile.timeseries.filter.definition.SingleSeriesFilterExpression;
@@ -32,6 +33,11 @@ public RecordReader(ITsRandomAccessFileReader raf) throws IOException {
this.fileReader = new FileReader(raf);
}

// for the Hadoop connector
public RecordReader(ITsRandomAccessFileReader raf, List<RowGroupMetaData> rowGroupMetaDataList) throws IOException {
this.fileReader = new FileReader(raf, rowGroupMetaDataList);
}

/**
* Read one path without filter.
*
@@ -44,7 +50,9 @@ public RecordReader(ITsRandomAccessFileReader raf) throws IOException {
*/
public DynamicOneColumnData getValueInOneColumn(DynamicOneColumnData res, int fetchSize
, String deltaObjectUID, String measurementUID) throws IOException {

checkSeries(deltaObjectUID, measurementUID);

List<RowGroupReader> rowGroupReaderList = fileReader.getRowGroupReaderListByDeltaObject(deltaObjectUID);
int i = 0;
if (res != null) {
@@ -61,6 +69,41 @@ public DynamicOneColumnData getValueInOneColumn(DynamicOneColumnData res, int fe
return res;
}

/**
* Read one path without a filter and do not throw an exception. Used by Hadoop.
*
* @param res the iterative result
* @param fetchSize fetch size
* @param deltaObjectUID delta object id
* @param measurementUID measurement id
* @return the result as a DynamicOneColumnData
* @throws IOException TsFile read error
*/
public DynamicOneColumnData getValueInOneColumnWithoutException(DynamicOneColumnData res, int fetchSize
, String deltaObjectUID, String measurementUID) throws IOException {
try {
checkSeriesByHadoop(deltaObjectUID, measurementUID);
} catch (IOException ex) {
// The series is absent from this split: return an empty column that still
// carries the series' data type instead of failing the Hadoop task.
if (res == null) {
res = new DynamicOneColumnData();
}
res.dataType = fileReader.getRowGroupReaderListByDeltaObject(deltaObjectUID).get(0).getDataTypeBySeriesName(measurementUID);
return res;
}
List<RowGroupReader> rowGroupReaderList = fileReader.getRowGroupReaderListByDeltaObjectByHadoop(deltaObjectUID);
int i = 0;
if (res != null) {
i = res.getRowGroupIndex();
}
for (; i < rowGroupReaderList.size(); i++) {
RowGroupReader rowGroupReader = rowGroupReaderList.get(i);
res = getValueInOneColumn(res, fetchSize, rowGroupReader, measurementUID);
if (res.valueLength >= fetchSize) {
res.hasReadAll = false;
break;
}
}
return res;
}

private DynamicOneColumnData getValueInOneColumn(DynamicOneColumnData res, int fetchSize,
RowGroupReader rowGroupReader, String measurementId) throws IOException {
return rowGroupReader.getValueReaders().get(measurementId).readOneColumn(res, fetchSize);
@@ -364,6 +407,26 @@ private void checkSeries(String deltaObject, String measurement) throws IOExcept
throw new IOException("Series is not exist in current file: " + deltaObject + "#" + measurement);
}

private void checkSeriesByHadoop(String deltaObject, String measurement) throws IOException {
if (seriesSchemaMap == null) {
seriesSchemaMap = new HashMap<>();
Map<String, ArrayList<SeriesSchema>> seriesSchemaListMap = getAllSeriesSchemasGroupByDeltaObject();
for (String key : seriesSchemaListMap.keySet()) {
HashMap<String, SeriesSchema> tmap = new HashMap<>();
for (SeriesSchema ss : seriesSchemaListMap.get(key)) {
tmap.put(ss.name, ss);
}
seriesSchemaMap.put(key, tmap);
}
}
if (seriesSchemaMap.containsKey(deltaObject)) {
if (seriesSchemaMap.get(deltaObject).containsKey(measurement)) {
return;
}
}
throw new IOException("Series is not exist in current file: " + deltaObject + "#" + measurement);
}

public List<RowGroupReader> getAllRowGroupReaders() {
return fileReader.getRowGroupReaderList();
}
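Putting the new RecordReader entry points together, a Hadoop task could read one column roughly as below. This is a hedged sketch, not connector code from this PR: TsRandomAccessLocalFileReader is assumed to be the local ITsRandomAccessFileReader implementation, the file name and series ids are placeholders, and the row-group list would normally be only the subset assigned to the task's split.

import java.io.IOException;
import java.util.List;

import cn.edu.tsinghua.tsfile.common.utils.ITsRandomAccessFileReader;
import cn.edu.tsinghua.tsfile.file.metadata.RowGroupMetaData;
import cn.edu.tsinghua.tsfile.timeseries.read.FileReader;
import cn.edu.tsinghua.tsfile.timeseries.read.RecordReader;
import cn.edu.tsinghua.tsfile.timeseries.read.TsRandomAccessLocalFileReader;
import cn.edu.tsinghua.tsfile.timeseries.read.query.DynamicOneColumnData;

public class ReadOneColumnSketch {
    public static void main(String[] args) throws IOException {
        ITsRandomAccessFileReader raf = new TsRandomAccessLocalFileReader("example.tsfile");

        // In the connector the split's row groups come from the InputFormat;
        // here we simply take every row group in file-offset order.
        List<RowGroupMetaData> rowGroups = new FileReader(raf).getSortedRowGroupMetaDataList();
        RecordReader reader = new RecordReader(raf, rowGroups);

        // A missing series yields an empty, typed column rather than an
        // IOException, which keeps the Hadoop task alive.
        DynamicOneColumnData col =
                reader.getValueInOneColumnWithoutException(null, 10000, "device_1", "sensor_1");
        for (int i = 0; i < col.valueLength; i++) {
            System.out.println(col.getTime(i));
        }
        raf.close();
    }
}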
HadoopQueryEngine.java
@@ -0,0 +1,125 @@
package cn.edu.tsinghua.tsfile.timeseries.read.query;

import cn.edu.tsinghua.tsfile.common.exception.ProcessorException;
import cn.edu.tsinghua.tsfile.common.utils.ITsRandomAccessFileReader;
import cn.edu.tsinghua.tsfile.file.metadata.RowGroupMetaData;
import cn.edu.tsinghua.tsfile.file.metadata.TimeSeriesChunkMetaData;
import cn.edu.tsinghua.tsfile.timeseries.filter.definition.CrossSeriesFilterExpression;
import cn.edu.tsinghua.tsfile.timeseries.filter.definition.FilterExpression;
import cn.edu.tsinghua.tsfile.timeseries.filter.definition.SingleSeriesFilterExpression;
import cn.edu.tsinghua.tsfile.timeseries.read.RecordReader;
import cn.edu.tsinghua.tsfile.timeseries.read.support.Path;
import cn.edu.tsinghua.tsfile.timeseries.write.desc.MeasurementDescriptor;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class HadoopQueryEngine extends QueryEngine {

private static final String SEPARATOR_DEVIDE_SERIES = ".";
private List<RowGroupMetaData> rowGroupMetaDataList;

public HadoopQueryEngine(ITsRandomAccessFileReader raf, List<RowGroupMetaData> rowGroupMetaDataList) throws IOException {
super(raf, rowGroupMetaDataList);
this.rowGroupMetaDataList = rowGroupMetaDataList;
}

private List<String> initDeviceIdList() {
Set<String> deviceIdSet = new HashSet<>();
for (RowGroupMetaData rowGroupMetaData : rowGroupMetaDataList) {
deviceIdSet.add(rowGroupMetaData.getDeltaObjectID());
}
return new ArrayList<>(deviceIdSet);
}

private List<String> initSensorIdList() {
Set<String> sensorIdSet = new HashSet<>();
for (RowGroupMetaData rowGroupMetaData : rowGroupMetaDataList) {
for (TimeSeriesChunkMetaData timeSeriesChunkMetaData : rowGroupMetaData.getTimeSeriesChunkMetaDataList()) {
sensorIdSet.add(timeSeriesChunkMetaData.getProperties().getMeasurementUID());
}
}
return new ArrayList<>(sensorIdSet);
}

public QueryDataSet queryWithSpecificRowGroups(List<String> deviceIdList, List<String> sensorIdList, FilterExpression timeFilter, FilterExpression freqFilter, FilterExpression valueFilter) throws IOException {
if (deviceIdList == null) {
deviceIdList = initDeviceIdList();
}
if (sensorIdList == null) {
sensorIdList = initSensorIdList();
}

List<Path> paths = new ArrayList<>();
for (String deviceId : deviceIdList) {
for (String sensorId : sensorIdList) {
paths.add(new Path(deviceId + SEPARATOR_DEVIDE_SERIES + sensorId));
}
}

if (timeFilter == null && freqFilter == null && valueFilter == null) {
return queryWithoutFilter(paths);
} else if (valueFilter instanceof SingleSeriesFilterExpression || (timeFilter != null && valueFilter == null)) {
return readOneColumnValueUseFilter(paths, (SingleSeriesFilterExpression) timeFilter, (SingleSeriesFilterExpression) freqFilter,
(SingleSeriesFilterExpression) valueFilter);
} else if (valueFilter instanceof CrossSeriesFilterExpression) {
return crossColumnQuery(paths, (SingleSeriesFilterExpression) timeFilter, (SingleSeriesFilterExpression) freqFilter,
(CrossSeriesFilterExpression) valueFilter);
}
throw new IOException("Query Not Support Exception");
}

private QueryDataSet queryWithoutFilter(List<Path> paths) throws IOException {
return new IteratorQueryDataSet(paths) {
@Override
public DynamicOneColumnData getMoreRecordsForOneColumn(Path p, DynamicOneColumnData res) throws IOException {
return recordReader.getValueInOneColumnWithoutException(res, FETCH_SIZE, p.getDeltaObjectToString(), p.getMeasurementToString());
}
};
}

private QueryDataSet readOneColumnValueUseFilter(List<Path> paths, SingleSeriesFilterExpression timeFilter,
SingleSeriesFilterExpression freqFilter, SingleSeriesFilterExpression valueFilter) throws IOException {
logger.debug("start read one column data with filter");

return new IteratorQueryDataSet(paths) {
@Override
public DynamicOneColumnData getMoreRecordsForOneColumn(Path p, DynamicOneColumnData res) throws IOException {
return recordReader.getValuesUseFilter(res, FETCH_SIZE, p.getDeltaObjectToString(), p.getMeasurementToString()
, timeFilter, freqFilter, valueFilter);
}
};
}

private QueryDataSet crossColumnQuery(List<Path> paths, SingleSeriesFilterExpression timeFilter, SingleSeriesFilterExpression freqFilter,
CrossSeriesFilterExpression valueFilter) throws IOException {
CrossQueryTimeGenerator timeQueryDataSet = new CrossQueryTimeGenerator(timeFilter, freqFilter, valueFilter, FETCH_SIZE) {
@Override
public DynamicOneColumnData getDataInNextBatch(DynamicOneColumnData res, int fetchSize,
SingleSeriesFilterExpression valueFilter, int valueFilterNumber) throws ProcessorException, IOException {
return recordReader.getValuesUseFilter(res, fetchSize, valueFilter);
}
};

return new CrossQueryIteratorDataSet(timeQueryDataSet) {
@Override
public boolean getMoreRecords() throws IOException {
try {
long[] timeRet = crossQueryTimeGenerator.generateTimes();
if (timeRet.length == 0) {
return true;
}
for (Path p : paths) {
String deltaObjectUID = p.getDeltaObjectToString();
String measurementUID = p.getMeasurementToString();
DynamicOneColumnData oneColDataList = recordReader.getValuesUseTimestamps(deltaObjectUID, measurementUID, timeRet);
mapRet.put(p.getFullPath(), oneColDataList);
}

} catch (ProcessorException e) {
throw new IOException(e.getMessage());
}
return false;
}
};
}
}
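At the query level, HadoopQueryEngine ties the pieces together. A hedged end-to-end sketch under the same assumptions (the local reader class and file name are placeholders, and QueryDataSet is assumed to expose the hasNextRecord()/getNextRecord() pair seen in IteratorQueryDataSet): passing null id lists triggers the initDeviceIdList()/initSensorIdList() fallbacks, and all-null filters select the queryWithoutFilter branch.

import java.io.IOException;
import java.util.List;

import cn.edu.tsinghua.tsfile.common.utils.ITsRandomAccessFileReader;
import cn.edu.tsinghua.tsfile.file.metadata.RowGroupMetaData;
import cn.edu.tsinghua.tsfile.timeseries.read.FileReader;
import cn.edu.tsinghua.tsfile.timeseries.read.TsRandomAccessLocalFileReader;
import cn.edu.tsinghua.tsfile.timeseries.read.query.HadoopQueryEngine;
import cn.edu.tsinghua.tsfile.timeseries.read.query.QueryDataSet;

public class HadoopQueryEngineSketch {
    public static void main(String[] args) throws IOException {
        ITsRandomAccessFileReader raf = new TsRandomAccessLocalFileReader("example.tsfile");
        List<RowGroupMetaData> rowGroups = new FileReader(raf).getSortedRowGroupMetaDataList();

        HadoopQueryEngine engine = new HadoopQueryEngine(raf, rowGroups);
        // null id lists: derive devices and sensors from the row groups themselves;
        // null filters: no time, frequency, or value filtering.
        QueryDataSet dataSet = engine.queryWithSpecificRowGroups(null, null, null, null, null);
        while (dataSet.hasNextRecord()) {
            System.out.println(dataSet.getNextRecord());
        }
        raf.close();
    }
}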
IteratorQueryDataSet.java
@@ -7,10 +7,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.PriorityQueue;
+import java.util.*;

public abstract class IteratorQueryDataSet extends QueryDataSet {
private static final Logger logger = LoggerFactory.getLogger(IteratorQueryDataSet.class);
@@ -36,14 +33,28 @@ public IteratorQueryDataSet(List<Path> paths) throws IOException {
public abstract DynamicOneColumnData getMoreRecordsForOneColumn(Path colName
, DynamicOneColumnData res) throws IOException;

// modified for the Hadoop connector
public void initForRecord() {
-heap = new PriorityQueue<>(retMap.size());
+size = retMap.size();
+heap = new PriorityQueue<>(size);

if (size > 0) {
deltaObjectIds = new String[size];
measurementIds = new String[size];
} else {
LOG.error("QueryDataSet init row record occurs error! the size of ret is 0.");
}

int i = 0;
for (Path p : retMap.keySet()) {
deltaObjectIds[i] = p.getDeltaObjectToString();
measurementIds[i] = p.getMeasurementToString();

DynamicOneColumnData res = retMap.get(p);
if (res != null && res.curIdx < res.valueLength) {
heapPut(res.getTime(res.curIdx));
}
i++;
}
}

@@ -58,6 +69,7 @@ public boolean hasNextRecord() {
return false;
}

// modified for the Hadoop connector
public RowRecord getNextRecord() {
if (!ifInit) {
initForRecord();
@@ -94,7 +106,7 @@ public RowRecord getNextRecord() {
heapPut(res.getTime(res.curIdx));
}
} else {
-f = new Field(res.dataType, p.getMeasurementToString());
+f = new Field(res.dataType, p.getDeltaObjectToString(), p.getMeasurementToString());
f.setNull(true);
}
r.addField(f);
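The initForRecord()/getNextRecord() pair above is a k-way merge: every column offers its next timestamp to a min-heap, a row is assembled for the smallest timestamp, and columns with no value at that time contribute a null field. A stripped-down illustration of the same idea, independent of the TsFile classes; it also assumes, as the heapPut() calls suggest, that each timestamp enters the heap only once.

import java.util.PriorityQueue;

public class KWayMergeSketch {
    public static void main(String[] args) {
        // Three "columns", each already sorted by timestamp.
        long[][] columns = { {1, 4, 7}, {2, 4, 8}, {3, 9} };
        int[] cursor = new int[columns.length];

        // Seed the heap with each column's first timestamp, skipping duplicates.
        PriorityQueue<Long> heap = new PriorityQueue<>();
        for (long[] col : columns) {
            if (col.length > 0 && !heap.contains(col[0])) {
                heap.add(col[0]);
            }
        }

        while (!heap.isEmpty()) {
            long ts = heap.poll();
            StringBuilder row = new StringBuilder("t=" + ts);
            for (int i = 0; i < columns.length; i++) {
                // Column i contributes a value only if its cursor sits on ts.
                if (cursor[i] < columns[i].length && columns[i][cursor[i]] == ts) {
                    row.append(" col").append(i).append('=').append(columns[i][cursor[i]]);
                    cursor[i]++;
                    if (cursor[i] < columns[i].length && !heap.contains(columns[i][cursor[i]])) {
                        heap.add(columns[i][cursor[i]]);
                    }
                } else {
                    row.append(" col").append(i).append("=null");
                }
            }
            System.out.println(row);
        }
    }
}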
QueryDataSet.java
@@ -12,8 +12,8 @@
import java.util.PriorityQueue;

public class QueryDataSet {
-private static final Logger LOG = LoggerFactory.getLogger(QueryDataSet.class);
-private static final char PATH_SPLITTER = '.';
+protected static final Logger LOG = LoggerFactory.getLogger(QueryDataSet.class);
+protected static final char PATH_SPLITTER = '.';

/**
* Time Generator for Cross Query when using batching read
@@ -186,6 +186,10 @@ public boolean next() {
}

public RowRecord getCurrentRecord() {
// Initialize lazily so getCurrentRecord() is safe before next() is called.
if (!ifInit) {
initForRecord();
ifInit = true;
}
return currentRecord;
}

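The added guard matters for cursor-style consumers that may ask for the current record before ever advancing; with it, iteration works in either order. A small sketch, assuming RowRecord lives in the read.support package alongside Path and Field:

import cn.edu.tsinghua.tsfile.timeseries.read.query.QueryDataSet;
import cn.edu.tsinghua.tsfile.timeseries.read.support.RowRecord;

public class CursorIterationSketch {
    // Hadoop-style cursor iteration: advance, then read the current record.
    // With the lazy-init guard, getCurrentRecord() no longer requires a
    // prior next() call to have populated the internal state.
    static void dump(QueryDataSet dataSet) {
        while (dataSet.next()) {
            RowRecord record = dataSet.getCurrentRecord();
            System.out.println(record);
        }
    }
}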