Skip to content

Commit

Permalink
add comment for QueryDataSet, OnePassQueryDataSet, Field, OldRowRecor…
Browse files Browse the repository at this point in the history
…d, RowRecord
  • Loading branch information
mdf369 committed Nov 12, 2018
1 parent 745d18a commit 921b4db
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

public class OnePassQueryDataSet implements QueryDataSet{
protected static final Logger LOG = LoggerFactory.getLogger(OnePassQueryDataSet.class);
/** default path splitter **/
protected static final char PATH_SPLITTER = '.';

/**
Expand Down Expand Up @@ -55,21 +56,34 @@ public class OnePassQueryDataSet implements QueryDataSet{
**/
protected int[] emptyTimeIdxs;

/**
* deltaObjectIds[i] stores the deltaObjectId of cols[i]
**/
protected String[] deltaObjectIds;
/**
* measurementIds[i] stores the measurementId of cols[i]
**/
protected String[] measurementIds;
protected HashMap<Long, Integer> timeMap; // timestamp occurs time
protected int size;
protected boolean ifInit = false;
protected int size; // size of query result
protected boolean ifInit = false; // flag of whether this dataset has been inited or not
protected OldRowRecord currentRecord = null;
private Map<String, Object> deltaMap; // this variable is used for IoTDb

/**
* init {@code mapRet}
*/
public OnePassQueryDataSet() {
mapRet = new LinkedHashMap<>();
}

/**
*
*/
public void initForRecord() {
size = mapRet.keySet().size();

// init all Array
if (size > 0) {
heap = new PriorityQueue<>(size);
cols = new DynamicOneColumnData[size];
Expand All @@ -83,6 +97,7 @@ public void initForRecord() {
heap = new PriorityQueue<>();
}

// loop this.mapRet and init value of all Array
int i = 0;
for (String key : mapRet.keySet()) {
cols[i] = mapRet.get(key);
Expand All @@ -91,108 +106,163 @@ public void initForRecord() {
timeIdxs[i] = 0;
emptyTimeIdxs[i] = 0;

// check if current data is valid
if (cols[i] != null && (cols[i].valueLength > 0 || cols[i].timeLength > 0 || cols[i].emptyTimeLength > 0)) {
// get min value of time and empty time at index 0
long minTime = Long.MAX_VALUE;
if (cols[i].timeLength > 0) {
minTime = cols[i].getTime(0);
}
if (cols[i].emptyTimeLength > 0) {
minTime = Math.min(minTime, cols[i].getEmptyTime(0));
}
// update this.heap and this.timeMap with min time
heapPut(minTime);
}
i++;
}
}

/**
* update this.heap and this.timeMap by input time
* @param t input time
*/
protected void heapPut(long t) {
if (!timeMap.containsKey(t)) {
heap.add(t);
timeMap.put(t, 1);
}
}

/**
* poll one time value as next candidate
* @return
*/
protected Long heapGet() {
Long t = heap.poll();
timeMap.remove(t);
return t;
}

/**
* judge if unread data still existed
* @return
*/
public boolean hasNextRecord() {
// make sure this dataset is inited
if (!ifInit) {
initForRecord();
ifInit = true;
}
// check if there is data in queue
if (heap.peek() != null) {
return true;
}
return false;
}

/**
* get next unread record
* @return
*/
public OldRowRecord getNextRecord() {
// make sure this dataset is inited
if (!ifInit) {
initForRecord();
ifInit = true;
}

// get next time
Long minTime = heapGet();
if (minTime == null) {
return null;
}

// construct a record to store all data in {@code cols}
OldRowRecord record = new OldRowRecord(minTime, null, null);
for (int i = 0; i < size; i++) {
// init deltaObjectId
if (i == 0) {
record.setDeltaObjectId(deltaObjectIds[i]);
}

// init field which stores data
Field field = new Field(cols[i].dataType, deltaObjectIds[i], measurementIds[i]);
if (timeIdxs[i] < cols[i].timeLength && minTime == cols[i].getTime(timeIdxs[i])) {
// put data into {@code cols[i]}
field.setNull(false);
putValueToField(cols[i], timeIdxs[i], field);
// accumulate counter
timeIdxs[i]++;

// get min value of time and empty time at index i
long nextTime = Long.MAX_VALUE;
if (timeIdxs[i] < cols[i].timeLength) {
nextTime = cols[i].getTime(timeIdxs[i]);
}
if (emptyTimeIdxs[i] < cols[i].emptyTimeLength) {
nextTime = Math.min(nextTime, cols[i].getEmptyTime(emptyTimeIdxs[i]));
}
// update this.heap and this.timeMap with min time
if (nextTime != Long.MAX_VALUE) {
heapPut(nextTime);
}
} else if (emptyTimeIdxs[i] < cols[i].emptyTimeLength && minTime == cols[i].getEmptyTime(emptyTimeIdxs[i])) {
// set field as null
field.setNull(true);
// accumulate counter
emptyTimeIdxs[i]++;

// get min value of time and empty time at index i
long nextTime = Long.MAX_VALUE;
if (emptyTimeIdxs[i] < cols[i].emptyTimeLength) {
nextTime = cols[i].getEmptyTime(emptyTimeIdxs[i]);
}
if (timeIdxs[i] < cols[i].timeLength) {
nextTime = Math.min(nextTime, cols[i].getTime(timeIdxs[i]));
}
// update this.heap and this.timeMap with min time
if (nextTime != Long.MAX_VALUE) {
heapPut(nextTime);
}
} else {
// just set field as null
field.setNull(true);
}

// add this field to record
record.addField(field);
}
return record;
}


/**
* check if unread record still exists
* @return
* @throws IOException
*/
@Override
public boolean hasNext() throws IOException {
return hasNextRecord();
}

/**
* get next record in new format
* @return
* @throws IOException
*/
@Override
public RowRecord next() throws IOException {
OldRowRecord oldRowRecord = getNextRecord();
return OnePassQueryDataSet.convertToNew(oldRowRecord);
}

/**
* convert record from old format to new format
* @param oldRowRecord record in old format
* @return record in new format
*/
public static RowRecord convertToNew(OldRowRecord oldRowRecord) {
RowRecord rowRecord = new RowRecord(oldRowRecord.timestamp);
for(Field field: oldRowRecord.fields) {
Expand Down Expand Up @@ -230,6 +300,10 @@ public static RowRecord convertToNew(OldRowRecord oldRowRecord) {
return rowRecord;
}

/**
* get this.currentRecord
* @return
*/
public OldRowRecord getCurrentRecord() {
if (!ifInit) {
initForRecord();
Expand All @@ -238,6 +312,12 @@ public OldRowRecord getCurrentRecord() {
return currentRecord;
}

/**
* put value at index {@code idx} in {@code col} into value field {@code f}
* @param col datas
* @param idx index of data
* @param f target to put data
*/
public void putValueToField(DynamicOneColumnData col, int idx, Field f) {
switch (col.dataType) {
case BOOLEAN:
Expand Down Expand Up @@ -266,6 +346,9 @@ public void putValueToField(DynamicOneColumnData col, int idx, Field f) {
}
}

/**
* clear all data in memory
*/
public void clear() {
this.ifInit = false;
for (DynamicOneColumnData col : mapRet.values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,44 @@
* @author Jinrui Zhang
*/
public class Field {

/** data type of this field **/
public TSDataType dataType;
/** delta object ID of this field **/
public String deltaObjectId;
/** measurement ID of this field **/
public String measurementId;
/** value in boolean **/
private boolean boolV;
/** value in int **/
private int intV;
/** value in long **/
private long longV;
/** value in float **/
private float floatV;
/** value in double **/
private double doubleV;
/** value in Binary **/
private Binary binaryV;
/** if is true, then this field contains no value. vice versa **/
private boolean isNull;

/**
* init field and set {@code deltaObjectId} to default value
* @param dataType
* @param measurementId
*/
public Field(TSDataType dataType, String measurementId) {
this.dataType = dataType;
this.measurementId = measurementId;
this.deltaObjectId = "default";
}

/**
* init field
* @param dataType
* @param deltaObjectId
* @param measurementId
*/
public Field(TSDataType dataType, String deltaObjectId, String measurementId) {
this.dataType = dataType;
this.deltaObjectId = deltaObjectId;
Expand Down Expand Up @@ -83,6 +103,10 @@ public void setBinaryV(Binary binaryV) {
this.binaryV = binaryV;
}

/**
* get value in String format
* @return
*/
public String getStringValue() {
if (isNull) {
return "null";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,19 @@
* @author Jinrui Zhang
*/
public class OldRowRecord {
/** time stamp of this record **/
public long timestamp;
/** delta object ID of this record **/
public String deltaObjectId;
/** all value fields of this record **/
public List<Field> fields;

/**
* set timestamp and deltaObjectId
* @param timestamp
* @param deltaObjectId
* @param deltaObjectType
*/
public OldRowRecord(long timestamp, String deltaObjectId, String deltaObjectType) {
this.timestamp = timestamp;
this.deltaObjectId = deltaObjectId;
Expand All @@ -44,6 +53,11 @@ public void setDeltaObjectId(String did) {
this.deltaObjectId = did;
}

/**
* add one value field
* @param f
* @return
*/
public int addField(Field f) {
this.fields.add(f);
return fields.size();
Expand All @@ -59,6 +73,10 @@ public String toString() {
return sb.toString();
}

/**
* convert this format of record to TSRecord
* @return
*/
public TSRecord toTSRecord() {
TSRecord r = new TSRecord(timestamp, deltaObjectId);
for (Field f : fields) {
Expand All @@ -70,6 +88,13 @@ public TSRecord toTSRecord() {
return r;
}

/**
* convert {@code Field} to {@code DataPoint}
* @param dataType
* @param measurementId
* @param f
* @return
*/
private DataPoint createDataPoint(TSDataType dataType, String measurementId, Field f) {
switch (dataType) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,32 @@
* Created by zhangjinrui on 2017/12/26.
*/
public class RowRecord {
/** time stamp of this record **/
private long timestamp;
/** all value fields of this record **/
private LinkedHashMap<Path, TsPrimitiveType> fields;

/**
* init this.fields
*/
public RowRecord() {
fields = new LinkedHashMap<>();
}

/**
* init this.fields and set time stamp
* @param timestamp
*/
public RowRecord(long timestamp) {
this();
this.timestamp = timestamp;
}

/**
* add one <path, field>
* @param path
* @param tsPrimitiveType
*/
public void putField(Path path, TsPrimitiveType tsPrimitiveType) {
fields.put(path, tsPrimitiveType);
}
Expand Down
Loading

0 comments on commit 921b4db

Please sign in to comment.