Fix db_stress for custom env #5122

Closed · wants to merge 7 commits

4 changes: 2 additions & 2 deletions build_tools/build_detect_platform
@@ -518,8 +518,8 @@ if test "$USE_HDFS"; then
echo "JAVA_HOME has to be set for HDFS usage."
exit 1
fi
HDFS_CCFLAGS="$HDFS_CCFLAGS -I$JAVA_HOME/include -I$JAVA_HOME/include/linux -DUSE_HDFS"
HDFS_LDFLAGS="$HDFS_LDFLAGS -lhdfs -L$JAVA_HOME/jre/lib/amd64"
HDFS_CCFLAGS="$HDFS_CCFLAGS -I$JAVA_HOME/include -I$JAVA_HOME/include/linux -DUSE_HDFS -I$HADOOP_HOME/include"
HDFS_LDFLAGS="$HDFS_LDFLAGS -lhdfs -L$JAVA_HOME/jre/lib/amd64 -L$HADOOP_HOME/lib/native"
HDFS_LDFLAGS="$HDFS_LDFLAGS -L$JAVA_HOME/jre/lib/amd64/server -L$GLIBC_RUNTIME_PATH/lib"
HDFS_LDFLAGS="$HDFS_LDFLAGS -ldl -lverify -ljava -ljvm"
COMMON_FLAGS="$COMMON_FLAGS $HDFS_CCFLAGS"
40 changes: 21 additions & 19 deletions env/env_hdfs.cc
@@ -11,13 +11,14 @@
#ifndef ROCKSDB_HDFS_FILE_C
#define ROCKSDB_HDFS_FILE_C

#include <algorithm>
#include <stdio.h>
#include <sys/time.h>
#include <time.h>
#include <algorithm>
#include <iostream>
#include <sstream>
#include "rocksdb/status.h"
#include "util/logging.h"
#include "util/string_util.h"

#define HDFS_EXISTS 0
@@ -222,7 +223,7 @@ class HdfsWritableFile: public WritableFile {
filename_.c_str());
const char* src = data.data();
size_t left = data.size();
size_t ret = hdfsWrite(fileSys_, hfile_, src, left);
size_t ret = hdfsWrite(fileSys_, hfile_, src, static_cast<tSize>(left));
ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Appended %s\n",
filename_.c_str());
if (ret != left) {
@@ -252,7 +253,8 @@ class HdfsWritableFile: public WritableFile {

// This is used by HdfsLogger to write data to the debug log file
virtual Status Append(const char* src, size_t size) {
if (hdfsWrite(fileSys_, hfile_, src, size) != (tSize)size) {
if (hdfsWrite(fileSys_, hfile_, src, static_cast<tSize>(size)) !=
static_cast<tSize>(size)) {
return IOError(filename_, errno);
}
return Status::OK();
@@ -280,11 +282,10 @@ class HdfsLogger : public Logger {
Status HdfsCloseHelper() {
ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsLogger closed %s\n",
file_->getName().c_str());
Status s = file_->Close();
if (mylog != nullptr && mylog == this) {
mylog = nullptr;
}
return s;
return Status::OK();
}

protected:
@@ -297,14 +298,15 @@
file_->getName().c_str());
}

virtual ~HdfsLogger() {
~HdfsLogger() override {
if (!closed_) {
closed_ = true;
HdfsCloseHelper();
}
}

virtual void Logv(const char* format, va_list ap) {
using Logger::Logv;
void Logv(const char* format, va_list ap) override {
const uint64_t thread_id = (*gettid_)();

// We try twice: the first time with a fixed-size stack allocated buffer,
@@ -382,7 +384,7 @@ const std::string HdfsEnv::pathsep = "/";
// open a file for sequential reading
Status HdfsEnv::NewSequentialFile(const std::string& fname,
std::unique_ptr<SequentialFile>* result,
const EnvOptions& options) {
const EnvOptions& /*options*/) {
result->reset();
HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname);
if (f == nullptr || !f->isValid()) {
@@ -397,7 +399,7 @@ Status HdfsEnv::NewSequentialFile(const std::string& fname,
// open a file for random reading
Status HdfsEnv::NewRandomAccessFile(const std::string& fname,
std::unique_ptr<RandomAccessFile>* result,
const EnvOptions& options) {
const EnvOptions& /*options*/) {
result->reset();
HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname);
if (f == nullptr || !f->isValid()) {
@@ -412,7 +414,7 @@ Status HdfsEnv::NewRandomAccessFile(const std::string& fname,
// create a new file for writing
Status HdfsEnv::NewWritableFile(const std::string& fname,
std::unique_ptr<WritableFile>* result,
const EnvOptions& options) {
const EnvOptions& /*options*/) {
result->reset();
Status s;
HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname);
@@ -430,7 +432,9 @@ class HdfsDirectory : public Directory {
explicit HdfsDirectory(int fd) : fd_(fd) {}
~HdfsDirectory() {}

virtual Status Fsync() { return Status::OK(); }
Status Fsync() override { return Status::OK(); }

int GetFd() const { return fd_; }

private:
int fd_;
@@ -475,10 +479,10 @@ Status HdfsEnv::GetChildren(const std::string& path,
pHdfsFileInfo = hdfsListDirectory(fileSys_, path.c_str(), &numEntries);
if (numEntries >= 0) {
for(int i = 0; i < numEntries; i++) {
char* pathname = pHdfsFileInfo[i].mName;
char* filename = std::rindex(pathname, '/');
if (filename != nullptr) {
result->push_back(filename+1);
std::string pathname(pHdfsFileInfo[i].mName);
size_t pos = pathname.rfind("/");
if (std::string::npos != pos) {
result->push_back(pathname.substr(pos + 1));
}
}
if (pHdfsFileInfo != nullptr) {
@@ -569,16 +573,14 @@ Status HdfsEnv::RenameFile(const std::string& src, const std::string& target) {
return IOError(src, errno);
}

Status HdfsEnv::LockFile(const std::string& fname, FileLock** lock) {
Status HdfsEnv::LockFile(const std::string& /*fname*/, FileLock** lock) {
// there isn't a very good way to atomically check and create
// a file via libhdfs
*lock = nullptr;
return Status::OK();
}

Status HdfsEnv::UnlockFile(FileLock* lock) {
return Status::OK();
}
Status HdfsEnv::UnlockFile(FileLock* /*lock*/) { return Status::OK(); }

Status HdfsEnv::NewLogger(const std::string& fname,
std::shared_ptr<Logger>* result) {
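The GetChildren() hunk above replaces the non-portable std::rindex() call on the raw char* returned by hdfsListDirectory() with std::string::rfind(). A minimal standalone sketch of that basename extraction (the paths below are made-up stand-ins for a real HDFS listing):

#include <iostream>
#include <string>
#include <vector>

int main() {
  // Stand-ins for the mName fields a real hdfsListDirectory() call would return.
  std::vector<std::string> entries = {"/rocksdb/000003.log", "/rocksdb/CURRENT"};
  std::vector<std::string> result;
  for (const std::string& pathname : entries) {
    size_t pos = pathname.rfind('/');
    if (pos != std::string::npos) {
      result.push_back(pathname.substr(pos + 1));  // keep only the file name
    }
  }
  for (const std::string& name : result) {
    std::cout << name << "\n";  // prints 000003.log and CURRENT
  }
  return 0;
}
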
99 changes: 48 additions & 51 deletions hdfs/env_hdfs.h
@@ -54,110 +54,109 @@ class HdfsEnv : public Env {
hdfsDisconnect(fileSys_);
}

virtual Status NewSequentialFile(const std::string& fname,
std::unique_ptr<SequentialFile>* result,
const EnvOptions& options);
Status NewSequentialFile(const std::string& fname,
std::unique_ptr<SequentialFile>* result,
const EnvOptions& options) override;

virtual Status NewRandomAccessFile(const std::string& fname,
std::unique_ptr<RandomAccessFile>* result,
const EnvOptions& options);
Status NewRandomAccessFile(const std::string& fname,
std::unique_ptr<RandomAccessFile>* result,
const EnvOptions& options) override;

virtual Status NewWritableFile(const std::string& fname,
std::unique_ptr<WritableFile>* result,
const EnvOptions& options);
Status NewWritableFile(const std::string& fname,
std::unique_ptr<WritableFile>* result,
const EnvOptions& options) override;

virtual Status NewDirectory(const std::string& name,
std::unique_ptr<Directory>* result);
Status NewDirectory(const std::string& name,
std::unique_ptr<Directory>* result) override;

virtual Status FileExists(const std::string& fname);
Status FileExists(const std::string& fname) override;

virtual Status GetChildren(const std::string& path,
std::vector<std::string>* result);
Status GetChildren(const std::string& path,
std::vector<std::string>* result) override;

virtual Status DeleteFile(const std::string& fname);
Status DeleteFile(const std::string& fname) override;

virtual Status CreateDir(const std::string& name);
Status CreateDir(const std::string& name) override;

virtual Status CreateDirIfMissing(const std::string& name);
Status CreateDirIfMissing(const std::string& name) override;

virtual Status DeleteDir(const std::string& name);
Status DeleteDir(const std::string& name) override;

virtual Status GetFileSize(const std::string& fname, uint64_t* size);
Status GetFileSize(const std::string& fname, uint64_t* size) override;

virtual Status GetFileModificationTime(const std::string& fname,
uint64_t* file_mtime);
Status GetFileModificationTime(const std::string& fname,
uint64_t* file_mtime) override;

virtual Status RenameFile(const std::string& src, const std::string& target);
Status RenameFile(const std::string& src, const std::string& target) override;

virtual Status LinkFile(const std::string& src, const std::string& target) {
Status LinkFile(const std::string& /*src*/,
const std::string& /*target*/) override {
return Status::NotSupported(); // not supported
}

virtual Status LockFile(const std::string& fname, FileLock** lock);
Status LockFile(const std::string& fname, FileLock** lock) override;

virtual Status UnlockFile(FileLock* lock);
Status UnlockFile(FileLock* lock) override;

virtual Status NewLogger(const std::string& fname,
std::shared_ptr<Logger>* result);
Status NewLogger(const std::string& fname,
std::shared_ptr<Logger>* result) override;

virtual void Schedule(void (*function)(void* arg), void* arg,
Priority pri = LOW, void* tag = nullptr, void (*unschedFunction)(void* arg) = 0) {
void Schedule(void (*function)(void* arg), void* arg, Priority pri = LOW,
void* tag = nullptr,
void (*unschedFunction)(void* arg) = 0) override {
posixEnv->Schedule(function, arg, pri, tag, unschedFunction);
}

virtual int UnSchedule(void* tag, Priority pri) {
int UnSchedule(void* tag, Priority pri) override {
return posixEnv->UnSchedule(tag, pri);
}

virtual void StartThread(void (*function)(void* arg), void* arg) {
void StartThread(void (*function)(void* arg), void* arg) override {
posixEnv->StartThread(function, arg);
}

virtual void WaitForJoin() { posixEnv->WaitForJoin(); }
void WaitForJoin() override { posixEnv->WaitForJoin(); }

virtual unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const
override {
unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override {
return posixEnv->GetThreadPoolQueueLen(pri);
}

virtual Status GetTestDirectory(std::string* path) {
Status GetTestDirectory(std::string* path) override {
return posixEnv->GetTestDirectory(path);
}

virtual uint64_t NowMicros() {
return posixEnv->NowMicros();
}
uint64_t NowMicros() override { return posixEnv->NowMicros(); }

virtual void SleepForMicroseconds(int micros) {
void SleepForMicroseconds(int micros) override {
posixEnv->SleepForMicroseconds(micros);
}

virtual Status GetHostName(char* name, uint64_t len) {
Status GetHostName(char* name, uint64_t len) override {
return posixEnv->GetHostName(name, len);
}

virtual Status GetCurrentTime(int64_t* unix_time) {
Status GetCurrentTime(int64_t* unix_time) override {
return posixEnv->GetCurrentTime(unix_time);
}

virtual Status GetAbsolutePath(const std::string& db_path,
std::string* output_path) {
Status GetAbsolutePath(const std::string& db_path,
std::string* output_path) override {
return posixEnv->GetAbsolutePath(db_path, output_path);
}

virtual void SetBackgroundThreads(int number, Priority pri = LOW) {
void SetBackgroundThreads(int number, Priority pri = LOW) override {
posixEnv->SetBackgroundThreads(number, pri);
}

virtual int GetBackgroundThreads(Priority pri = LOW) {
int GetBackgroundThreads(Priority pri = LOW) override {
return posixEnv->GetBackgroundThreads(pri);
}

virtual void IncBackgroundThreadsIfNeeded(int number, Priority pri) override {
void IncBackgroundThreadsIfNeeded(int number, Priority pri) override {
posixEnv->IncBackgroundThreadsIfNeeded(number, pri);
}

virtual std::string TimeToString(uint64_t number) {
std::string TimeToString(uint64_t number) override {
return posixEnv->TimeToString(number);
}

@@ -166,9 +165,7 @@ class HdfsEnv : public Env {
return (uint64_t)pthread_self();
}

virtual uint64_t GetThreadID() const override {
return HdfsEnv::gettid();
}
uint64_t GetThreadID() const override { return HdfsEnv::gettid(); }

private:
std::string fsname_; // string of the form "hdfs://hostname:port/"
@@ -206,7 +203,7 @@ class HdfsEnv : public Env {
std::string host(parts[0]);
std::string remaining(parts[1]);

int rem = remaining.find(pathsep);
int rem = static_cast<int>(remaining.find(pathsep));
std::string portStr = (rem == 0 ? remaining :
remaining.substr(0, rem));

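The final hunk above adds a static_cast where fsname_ (documented earlier as a string of the form "hdfs://hostname:port/") is split into host and port. A simplified, self-contained sketch of that kind of parsing, using a made-up URI rather than the class's exact code:

#include <iostream>
#include <string>

int main() {
  const std::string fsname = "hdfs://namenode.example.com:8020/";  // hypothetical URI
  const std::string scheme = "hdfs://";
  std::string remaining = fsname.substr(scheme.size());  // "namenode.example.com:8020/"
  size_t colon = remaining.find(':');
  size_t slash = remaining.find('/', colon == std::string::npos ? 0 : colon);
  std::string host = remaining.substr(0, colon);  // "namenode.example.com"
  std::string port = (colon == std::string::npos)
                         ? ""
                         : remaining.substr(colon + 1, slash - colon - 1);  // "8020"
  std::cout << "host=" << host << " port=" << port << "\n";
  return 0;
}
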
4 changes: 2 additions & 2 deletions hdfs/setup.sh
100644 → 100755
@@ -1,8 +1,8 @@
# shellcheck disable=SC2148
export USE_HDFS=1
export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64:/usr/lib/hadoop/lib/native
export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64:$HADOOP_HOME/lib/native

export CLASSPATH=
export CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath --glob`
for f in `find /usr/lib/hadoop-hdfs | grep jar`; do export CLASSPATH=$CLASSPATH:$f; done
for f in `find /usr/lib/hadoop | grep jar`; do export CLASSPATH=$CLASSPATH:$f; done
for f in `find /usr/lib/hadoop/client | grep jar`; do export CLASSPATH=$CLASSPATH:$f; done
9 changes: 8 additions & 1 deletion tools/db_stress.cc
@@ -1403,7 +1403,14 @@ class StressTest {
FLAGS_env->DeleteFile(FLAGS_db + "/" + files[i]);
}
}
DestroyDB(FLAGS_db, Options());
Options options;
options.env = FLAGS_env;
Status s = DestroyDB(FLAGS_db, options);
if (!s.ok()) {
fprintf(stderr, "Cannot destroy original db: %s\n",
s.ToString().c_str());
exit(1);
}
}
}

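The db_stress change above is the core of this PR: the old code called DestroyDB(FLAGS_db, Options()), so the destroy ran against Env::Default() and never saw files created through a custom Env such as an HdfsEnv. A minimal sketch of the corrected pattern, using Env::Default() as a stand-in for the custom Env and a hypothetical database path:

#include <cstdio>
#include <string>

#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/options.h"

int main() {
  // Stand-in for a custom Env; db_stress would use FLAGS_env here.
  rocksdb::Env* custom_env = rocksdb::Env::Default();

  rocksdb::Options options;
  options.env = custom_env;  // the essential line: DestroyDB must go through the same Env

  // Hypothetical path; db_stress passes FLAGS_db.
  rocksdb::Status s = rocksdb::DestroyDB("/tmp/db_stress_example", options);
  if (!s.ok()) {
    std::fprintf(stderr, "Cannot destroy original db: %s\n", s.ToString().c_str());
    return 1;
  }
  return 0;
}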