Skip to content

Commit

Permalink
JDBC使用阻塞IO时使用SingleThreadAsyncCallback
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed Aug 26, 2024
1 parent ad17ffd commit a8b6d89
Show file tree
Hide file tree
Showing 12 changed files with 128 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

Expand Down Expand Up @@ -128,7 +129,7 @@ public void executeCheckpointOnClose() {
forceCheckpointTasks.add(() -> executeCheckpoint(true));
wakeUp();
try {
latchOnClose.await();
latchOnClose.await(3000L, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,67 @@

import java.sql.SQLException;

import com.lealone.db.async.ConcurrentAsyncCallback;
import com.lealone.db.async.AsyncCallback;
import com.lealone.db.async.AsyncHandler;
import com.lealone.db.async.AsyncResult;
import com.lealone.db.async.Future;
import com.lealone.db.session.Session;

public class JdbcAsyncCallback<T> extends ConcurrentAsyncCallback<T> {
public class JdbcAsyncCallback<T> implements Future<T> {

private AsyncCallback<T> ac;

private JdbcAsyncCallback(boolean isSingleThread) {
ac = AsyncCallback.create(isSingleThread);
}

public T get(JdbcWrapper jw) throws SQLException {
try {
return get();
return ac.get();
} catch (Exception e) {
throw jw.logAndConvert(e); // 抛出SQLException
}
}

@Override
public T get() {
return ac.get();
}

@Override
public T get(long timeoutMillis) {
return ac.get(timeoutMillis);
}

@Override
public Future<T> onSuccess(AsyncHandler<T> handler) {
return ac.onSuccess(handler);
}

@Override
public Future<T> onFailure(AsyncHandler<Throwable> handler) {
return ac.onFailure(handler);
}

@Override
public Future<T> onComplete(AsyncHandler<AsyncResult<T>> handler) {
return ac.onComplete(handler);
}

public void setAsyncResult(Throwable cause) {
ac.setAsyncResult(cause);
}

public void setAsyncResult(T result) {
ac.setAsyncResult(result);
}

public void setAsyncResult(AsyncResult<T> asyncResult) {
ac.setAsyncResult(asyncResult);
}

public static <T> JdbcAsyncCallback<T> create(Session session) {
// JDBC使用阻塞IO时使用SingleThreadAsyncCallback
return new JdbcAsyncCallback<>(session.isBio());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ private JdbcAsyncCallback<JdbcPreparedStatement> prepareStatementInternal(boolea
private JdbcAsyncCallback<JdbcPreparedStatement> prepareStatementInternal(boolean async, String sql,
int id, int resultSetType, int resultSetConcurrency, boolean closedByResultSet,
int fetchSize) {
JdbcAsyncCallback<JdbcPreparedStatement> ac = new JdbcAsyncCallback<>();
JdbcAsyncCallback<JdbcPreparedStatement> ac = createJdbcAsyncCallback();
try {
checkClosed();
checkTypeConcurrency(resultSetType, resultSetConcurrency);
Expand All @@ -219,6 +219,10 @@ private JdbcAsyncCallback<JdbcPreparedStatement> prepareStatementInternal(boolea
return ac;
}

protected <T> JdbcAsyncCallback<T> createJdbcAsyncCallback() {
return JdbcAsyncCallback.create(session);
}

/**
* Creates a new prepared statement.
*
Expand Down Expand Up @@ -498,7 +502,7 @@ public Future<Boolean> commitAsync() throws SQLException {
}

private JdbcAsyncCallback<Boolean> commitInternal() throws SQLException {
JdbcAsyncCallback<Boolean> ac = new JdbcAsyncCallback<>();
JdbcAsyncCallback<Boolean> ac = createJdbcAsyncCallback();
prepareStatement0("COMMIT", commit).onComplete(ar0 -> {
checkClosed();
if (ar0.isFailed()) {
Expand Down Expand Up @@ -537,7 +541,7 @@ public Future<Boolean> rollbackAsync() throws SQLException {
}

private JdbcAsyncCallback<Boolean> rollbackInternal() throws SQLException {
JdbcAsyncCallback<Boolean> ac = new JdbcAsyncCallback<>();
JdbcAsyncCallback<Boolean> ac = createJdbcAsyncCallback();
prepareStatement0("ROLLBACK", rollback).onComplete(ar0 -> {
checkClosed();
if (ar0.isFailed()) {
Expand Down Expand Up @@ -882,7 +886,7 @@ private JdbcAsyncCallback<JdbcCallableStatement> prepareCallInternal(boolean asy
private JdbcAsyncCallback<JdbcCallableStatement> prepareCallInternal(boolean async, String sql,
int id, int resultSetType, int resultSetConcurrency, boolean closedByResultSet,
int fetchSize) {
JdbcAsyncCallback<JdbcCallableStatement> ac = new JdbcAsyncCallback<>();
JdbcAsyncCallback<JdbcCallableStatement> ac = createJdbcAsyncCallback();
try {
checkClosed();
checkTypeConcurrency(resultSetType, resultSetConcurrency);
Expand Down Expand Up @@ -1066,7 +1070,7 @@ private void executeUpdateSync(String sql) throws SQLException {
private JdbcAsyncCallback<JdbcPreparedStatement> prepareStatement0(String sql,
JdbcPreparedStatement old) {
if (old != null) {
JdbcAsyncCallback<JdbcPreparedStatement> ac = new JdbcAsyncCallback<>();
JdbcAsyncCallback<JdbcPreparedStatement> ac = createJdbcAsyncCallback();
ac.setAsyncResult(old);
return ac;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ private JdbcAsyncCallback<ResultSet> executeQueryInternal(boolean async) {
debugCodeAssign(TraceObjectType.RESULT_SET, id,
async ? "executeQueryAsync()" : "executeQuery()");
}
JdbcAsyncCallback<ResultSet> ac = new JdbcAsyncCallback<>();
JdbcAsyncCallback<ResultSet> ac = createJdbcAsyncCallback();
try {
checkAndClose();
setExecutingStatement(command);
Expand Down Expand Up @@ -185,7 +185,7 @@ public Future<Integer> executeUpdateAsync() {
}

private JdbcAsyncCallback<Integer> executeUpdateInternal(boolean async) {
JdbcAsyncCallback<Integer> ac = new JdbcAsyncCallback<>();
JdbcAsyncCallback<Integer> ac = createJdbcAsyncCallback();
try {
checkAndClose();
setExecutingStatement(command);
Expand All @@ -204,7 +204,7 @@ private JdbcAsyncCallback<Integer> executeUpdateInternal(boolean async) {
}

private JdbcAsyncCallback<Integer> executeUpdateInternal(Value[] set) {
JdbcAsyncCallback<Integer> ac = new JdbcAsyncCallback<>();
JdbcAsyncCallback<Integer> ac = createJdbcAsyncCallback();
try {
checkAndClose();
setExecutingStatement(command);
Expand Down Expand Up @@ -309,7 +309,7 @@ public boolean execute() throws SQLException {
if (isDebugEnabled()) {
debugCodeCall("execute");
}
JdbcAsyncCallback<Boolean> ac = new JdbcAsyncCallback<>();
JdbcAsyncCallback<Boolean> ac = createJdbcAsyncCallback();
try {
checkAndClose();
executeInternal(ac, command, true);
Expand Down Expand Up @@ -397,7 +397,7 @@ public int[] executeBatch() throws SQLException {
debugCodeCall("executeBatch");
ArrayList<Value[]> batchParameters = this.batchParameters;
this.batchParameters = null;
JdbcAsyncCallback<int[]> ac = new JdbcAsyncCallback<>();
JdbcAsyncCallback<int[]> ac = createJdbcAsyncCallback();
try {
checkAndClose();
if (batchParameters == null || batchParameters.isEmpty()) {
Expand Down Expand Up @@ -1323,7 +1323,7 @@ public void setURL(int parameterIndex, URL x) throws SQLException {
@Override
public ResultSetMetaData getMetaData() throws SQLException {
debugCodeCall("getMetaData");
JdbcAsyncCallback<ResultSetMetaData> ac = new JdbcAsyncCallback<>();
JdbcAsyncCallback<ResultSetMetaData> ac = createJdbcAsyncCallback();
try {
checkClosed();
command.getMetaData().onComplete(ar -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ private JdbcAsyncCallback<ResultSet> executeQueryInternal(String sql, boolean as
debugCodeAssign(TraceObjectType.RESULT_SET, id,
"executeQuery" + (async ? "Async" : "") + "(" + quote(sql) + ")");
}
JdbcAsyncCallback<ResultSet> ac = new JdbcAsyncCallback<>();
JdbcAsyncCallback<ResultSet> ac = createJdbcAsyncCallback();
try {
SQLCommand command = createSQLCommand(sql, false);
command.executeQuery(maxRows, isScrollable()).onComplete(ar -> {
Expand Down Expand Up @@ -227,7 +227,7 @@ private int executeUpdateSync(String sql) throws SQLException {
}

private JdbcAsyncCallback<Integer> executeUpdateInternal(String sql) {
JdbcAsyncCallback<Integer> ac = new JdbcAsyncCallback<>();
JdbcAsyncCallback<Integer> ac = createJdbcAsyncCallback();
try {
SQLCommand command = createSQLCommand(sql, false);
command.executeUpdate().onComplete(ar -> {
Expand All @@ -245,6 +245,10 @@ private JdbcAsyncCallback<Integer> executeUpdateInternal(String sql) {
return ac;
}

protected <T> JdbcAsyncCallback<T> createJdbcAsyncCallback() {
return JdbcAsyncCallback.create(session);
}

/**
* Executes an arbitrary statement. If another result set exists for this
* statement, this will be closed (even if this statement fails).
Expand Down Expand Up @@ -350,7 +354,7 @@ private boolean executeInternal(String sql) throws SQLException {
// }
// }
// }
JdbcAsyncCallback<Boolean> ac = new JdbcAsyncCallback<>();
JdbcAsyncCallback<Boolean> ac = createJdbcAsyncCallback();
try {
SQLCommand command = createSQLCommand(sql, true);
command.prepare(false).onComplete(ar -> {
Expand Down Expand Up @@ -483,7 +487,7 @@ public int[] executeBatch() throws SQLException {
debugCodeCall("executeBatch");
ArrayList<String> batchCommands = this.batchCommands;
this.batchCommands = null;
JdbcAsyncCallback<int[]> ac = new JdbcAsyncCallback<>();
JdbcAsyncCallback<int[]> ac = createJdbcAsyncCallback();
try {
checkAndClose();
if (batchCommands == null || batchCommands.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import com.lealone.common.exceptions.DbException;
import com.lealone.common.trace.TraceObject;
import com.lealone.db.async.AsyncCallback;

public class JdbcWrapper extends TraceObject implements Wrapper {

Expand All @@ -37,7 +36,7 @@ public SQLException logAndConvert(Exception ex) { // 只是把protected变成pub
return super.logAndConvert(ex);
}

public static void setAsyncResult(AsyncCallback<?> ac, Throwable cause) {
public static void setAsyncResult(JdbcAsyncCallback<?> ac, Throwable cause) {
// 转换成SQLException
ac.setAsyncResult(DbException.toSQLException(cause));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ protected void sendClose() {
protected void sendFetch(int fetchSize) throws IOException {
// 释放buffer
in.closeInputStream();
JdbcAsyncCallback<Boolean> ac = new JdbcAsyncCallback<>();
JdbcAsyncCallback<Boolean> ac = JdbcAsyncCallback.create(session);
session.<ResultFetchRowsAck> send(new ResultFetchRows(resultId, fetchSize)).onComplete(ar -> {
if (ar.isSucceeded()) {
in = (TransferInputStream) ar.getResult().in;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class ClientSession extends SessionBase implements LobLocalStorage.LobRea
private final int id;
private final LocalDataHandler dataHandler;
private final Trace trace;
private final boolean isBio;

ClientSession(TcpClientConnection tcpConnection, ConnectionInfo ci, String server, Session parent,
int id) {
Expand All @@ -77,6 +78,7 @@ public class ClientSession extends SessionBase implements LobLocalStorage.LobRea

initTraceSystem(ci);
trace = traceSystem == null ? Trace.NO_TRACE : traceSystem.getTrace(TraceModuleType.JDBC);
isBio = tcpConnection.getWritableChannel().isBio();
}

@Override
Expand Down Expand Up @@ -308,7 +310,7 @@ public void runInternal(NetInputStream in) throws Exception {
out.writeRequestHeader(packetId, packet.getType());
packet.encode(out, getProtocolVersion());
out.flush();
if (ac != null && tcpConnection.getWritableChannel().isBio())
if (ac != null && isBio)
tcpConnection.getWritableChannel().read(tcpConnection);
} catch (Throwable e) {
if (ac != null) {
Expand Down Expand Up @@ -355,4 +357,9 @@ public boolean isSingleThreadCallback() {
public <T> AsyncCallback<T> createCallback() {
return AsyncCallback.create(singleThreadCallback);
}

@Override
public boolean isBio() {
return isBio;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
import com.lealone.server.protocol.AckPacket;
import com.lealone.server.protocol.AckPacketHandler;
import com.lealone.server.protocol.Packet;
import com.lealone.sql.SQLCommand;
import com.lealone.sql.PreparedSQLStatement.YieldableCommand;
import com.lealone.sql.SQLCommand;

public class DelegatedSession implements Session {

Expand Down Expand Up @@ -247,4 +247,9 @@ public YieldableCommand getYieldableCommand(boolean checkTimeout, TimeoutListene
public void init() {
session.init();
}

@Override
public boolean isBio() {
return session.isBio();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import com.lealone.server.protocol.AckPacket;
import com.lealone.server.protocol.AckPacketHandler;
import com.lealone.server.protocol.Packet;
import com.lealone.sql.SQLCommand;
import com.lealone.sql.PreparedSQLStatement.YieldableCommand;
import com.lealone.sql.SQLCommand;
import com.lealone.storage.page.IPage;
import com.lealone.transaction.Transaction;

Expand Down Expand Up @@ -261,4 +261,8 @@ public static interface TimeoutListener {
}

void init();

default boolean isBio() {
return false;
}
}
33 changes: 26 additions & 7 deletions lealone-test/src/test/java/com/lealone/test/misc/CRUDExample.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,36 @@
public class CRUDExample {

public static void main(String[] args) throws Exception {
TestBase test = new TestBase();
test.setEmbedded(true);
test.setInMemory(true);
Connection conn = test.getConnection(LealoneDatabase.NAME);
Connection conn = null;

conn = getBioConnection();
crud(conn);

conn = getNioConnection();
crud(conn);

conn = getEmbeddedConnection();
crud(conn);
test = new TestBase();
}

public static Connection getBioConnection() throws Exception {
TestBase test = new TestBase();
test.setNetFactoryName(BioNetFactory.NAME);
return test.getConnection(LealoneDatabase.NAME);
}

public static Connection getNioConnection() throws Exception {
TestBase test = new TestBase();
test.setNetFactoryName(NioNetFactory.NAME);
test.addConnectionParameter(ConnectionSetting.MAX_PACKET_SIZE.name(), 16 * 1024 * 1024);
conn = test.getConnection(LealoneDatabase.NAME);
crud(conn);
return test.getConnection(LealoneDatabase.NAME);
}

public static Connection getEmbeddedConnection() throws Exception {
TestBase test = new TestBase();
test.setEmbedded(true);
test.setInMemory(true);
return test.getConnection(LealoneDatabase.NAME);
}

public static void crud(Connection conn) throws Exception {
Expand Down
2 changes: 1 addition & 1 deletion lealone-test/src/test/resources/lealone-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ listen_address: 127.0.0.1

scheduler:
parameters: {
scheduler_count: 4,
scheduler_count: 8,
max_packet_count_per_loop: 10, # 每次循环最多读取多少个数据包,默认20
}

Expand Down

0 comments on commit a8b6d89

Please sign in to comment.