Skip to content

Commit

Permalink
multi datadir support
Browse files Browse the repository at this point in the history
  • Loading branch information
ptnapoleon authored and krummas committed Dec 29, 2015
1 parent f997914 commit f780fd7
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 33 deletions.
8 changes: 7 additions & 1 deletion ccmlib/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def __init__(self, path, name, partitioner=None, install_dir=None, create_direct
# Classes that are to follow the respective logging level
self._debug = []
self._trace = []
self.data_dir_count = 1

if self.name.lower() == "current":
raise RuntimeError("Cannot name a cluster 'current'.")
Expand Down Expand Up @@ -75,6 +76,10 @@ def set_partitioner(self, partitioner):
self._update_config()
return self

def set_datadir_count(self, n):
    """Set the number of data directories recorded for this cluster.

    ``n`` is coerced with ``int()``, so numeric strings are accepted.

    Raises ValueError if ``n`` cannot be converted to an integer or is
    smaller than 1.

    Returns ``self`` so that calls can be chained.
    """
    count = int(n)
    # A count below 1 would yield an empty list of data directories.
    if count < 1:
        raise ValueError("datadir count must be at least 1, got {0}".format(count))
    self.data_dir_count = count
    return self

def set_install_dir(self, install_dir=None, version=None, verbose=False):
if version is None:
self.__install_dir = install_dir
Expand Down Expand Up @@ -480,7 +485,8 @@ def _update_config(self):
'config_options': self._config_options,
'dse_config_options': self._dse_config_options,
'log_level': self.__log_level,
'use_vnodes': self.use_vnodes
'use_vnodes': self.use_vnodes,
'datadirs': self.data_dir_count
}, f)

def __update_pids(self, started):
Expand Down
2 changes: 2 additions & 0 deletions ccmlib/cluster_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ def load(path, name):
cluster.__log_level = data['log_level']
if 'use_vnodes' in data:
cluster.use_vnodes = data['use_vnodes']
if 'datadirs' in data:
cluster.data_dir_count = int(data['datadirs'])
except KeyError as k:
raise common.LoadError("Error Loading " + filename + ", missing property:" + k)

Expand Down
9 changes: 8 additions & 1 deletion ccmlib/cmds/cluster_cmds.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,11 @@ def get_parser(self):
help="Enable client authentication (only vaid with --ssl)", default=False)
parser.add_option('--node-ssl', type="string", dest="node_ssl_path",
help="Path to keystore.jks and truststore.jks for internode encryption", default=None)
parser.add_option('--root', action="store_true", dest="allow_root", help="Allow CCM to start cassandra as root", default=False)
parser.add_option('--byteman', action="store_true", dest="install_byteman", help="Start nodes with byteman agent running", default=False)
parser.add_option('--root', action="store_true", dest="allow_root",
help="Allow CCM to start cassandra as root", default=False)
parser.add_option('--datadirs', type="int", dest="datadirs",
help="Number of data directories to use", default=1)
return parser

def validate(self, parser, options, args):
Expand Down Expand Up @@ -174,6 +177,9 @@ def run(self):
if self.options.node_ssl_path:
cluster.enable_internode_ssl(self.options.node_ssl_path)

if self.options.datadirs:
cluster.set_datadir_count(self.options.datadirs)

if self.nodes is not None:
try:
if self.options.debug_log:
Expand Down Expand Up @@ -292,6 +298,7 @@ def get_parser(self):
help="Ipprefix to use to create the ip of a node")
parser.add_option('-I', '--ip-format', type="string", dest="ipformat",
help="Format to use when creating the ip of a node (supports enumerating ipv6-type addresses like fe80::%d%lo0)")

return parser

def validate(self, parser, options, args):
Expand Down
72 changes: 41 additions & 31 deletions ccmlib/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -832,7 +832,7 @@ def update_logback(self, new_logback_config):
common.copy_file(new_logback_config, cassandra_conf_dir)

def clear(self, clear_all=False, only_data=False):
data_dirs = ['data']
data_dirs = ['data{0}'.format(x) for x in xrange(0, self.cluster.data_dir_count)]
if not only_data:
data_dirs.append("commitlogs")
if clear_all:
Expand Down Expand Up @@ -1031,34 +1031,40 @@ def _find_cmd(self, cmd):
return fcmd

def list_keyspaces(self):
    """Return the names of the entries found in this node's first data directory.

    Only ``data0`` under ``self.get_path()`` is inspected, and the
    ``system`` entry is left out of the result.
    """
    data_dir = os.path.join(self.get_path(), 'data0')
    # Filter rather than list.remove(): remove() raises ValueError when
    # 'system' is not present in the directory.
    return [ks for ks in os.listdir(data_dir) if ks != 'system']

def get_sstables_per_data_directory(self, keyspace, column_family):
    """Return the sstable data files of a table, grouped by data directory.

    The result holds one list per data directory (``data0`` up to
    ``data<N-1>``, where N is ``self.cluster.data_dir_count``), in that
    order. Each inner list contains the paths of the matching
    ``*-Data.db`` files, excluding any file that has a sibling
    ``Compacted`` marker.

    If ``column_family`` is falsy, every table directory of the keyspace
    is globbed.

    Raises common.ArgumentError if the keyspace directory is missing from
    any of the data directories.
    """
    keyspace_dirs = [os.path.join(self.get_path(), 'data{0}'.format(x), keyspace)
                     for x in range(self.cluster.data_dir_count)]

    cf_glob = '*'
    if column_family:
        # account for changes in data dir layout from CASSANDRA-5202
        if self.get_base_cassandra_version() < 2.1:
            cf_glob = column_family
        else:
            cf_glob = column_family + '-*'

    for keyspace_dir in keyspace_dirs:
        if not os.path.exists(keyspace_dir):
            raise common.ArgumentError("Unknown keyspace {0}".format(keyspace))

    # data directory layout is changed from 1.1
    # NOTE(review): the two older layouts interpolate column_family into the
    # file pattern even when it is None -- confirm whether that is intended.
    version = self.get_base_cassandra_version()
    if version < 1.1:
        pattern = ("{0}*-Data.db".format(column_family),)
    elif version < 2.2:
        pattern = (cf_glob, "%s-%s*-Data.db" % (keyspace, column_family))
    else:
        pattern = (cf_glob, "*big-Data.db")

    # Build fresh inner lists instead of removing while iterating: the
    # compacted files must be dropped from the per-directory list, not from
    # the outer list of lists.
    return [
        [f for f in glob.glob(os.path.join(keyspace_dir, *pattern))
         if not os.path.exists(f.replace('Data.db', 'Compacted'))]
        for keyspace_dir in keyspace_dirs
    ]

def get_sstables(self, keyspace, column_family):
    """Return the sstable data files of a table as a single flat list.

    Concatenates, in order, the per-directory lists produced by
    ``get_sstables_per_data_directory``.
    """
    flattened = []
    for per_directory in self.get_sstables_per_data_directory(keyspace, column_family):
        for sstable in per_directory:
            flattened.append(sstable)
    return flattened

def stress(self, stress_options=None, capture_output=False, **kwargs):
if stress_options is None:
stress_options = []
Expand Down Expand Up @@ -1288,11 +1294,12 @@ def __update_yaml(self):
if self.network_interfaces['binary'] is not None and self.get_base_cassandra_version() >= 1.2:
_, data['native_transport_port'] = self.network_interfaces['binary']

data['data_file_directories'] = [os.path.join(self.get_path(), 'data')]
data['data_file_directories'] = [os.path.join(self.get_path(), 'data{0}'.format(x)) for x in xrange(0, self.cluster.data_dir_count)]
data['commitlog_directory'] = os.path.join(self.get_path(), 'commitlogs')
data['saved_caches_directory'] = os.path.join(self.get_path(), 'saved_caches')

if self.get_cassandra_version() > '3.0' and 'hints_directory' in yaml_text:
data['hints_directory'] = os.path.join(self.get_path(), 'data', 'hints')
data['hints_directory'] = os.path.join(self.get_path(), 'hints')

if self.cluster.partitioner:
data['partitioner'] = self.cluster.partitioner
Expand Down Expand Up @@ -1491,8 +1498,10 @@ def _find_pid_on_windows(self):

def _get_directories(self):
    """Return the paths of the directories that belong to this node.

    The list contains the fixed sub-directories (commitlogs, saved_caches,
    logs, conf, bin, hints) followed by one ``data<i>`` directory per
    configured data directory, all joined onto ``self.get_path()``.
    """
    root = self.get_path()
    dirs = [os.path.join(root, name)
            for name in ('commitlogs', 'saved_caches', 'logs', 'conf', 'bin', 'hints')]
    # One data directory per index: data0 .. data<count-1>.
    dirs.extend(os.path.join(root, 'data{0}'.format(x))
                for x in range(self.cluster.data_dir_count))
    return dirs

def __get_status_string(self):
Expand Down Expand Up @@ -1600,26 +1609,27 @@ def __gather_sstables(self, datafiles=None, keyspace=None, columnfamilies=None):
if not columnfamilies or len(columnfamilies) > 1:
raise common.ArgumentError("Exactly one column family must be specified with datafiles")

cf_dir = os.path.join(os.path.realpath(self.get_path()), 'data', keyspace, columnfamilies[0])
for x in xrange(0, self.cluster.data_dir_count):
cf_dir = os.path.join(os.path.realpath(self.get_path()), 'data{0}'.format(x), keyspace, columnfamilies[0])

sstables = set()
for datafile in datafiles:
if not os.path.isabs(datafile):
datafile = os.path.join(os.getcwd(), datafile)
sstables = set()
for datafile in datafiles:
if not os.path.isabs(datafile):
datafile = os.path.join(os.getcwd(), datafile)

if not datafile.startswith(cf_dir + '-') and not datafile.startswith(cf_dir + os.sep):
raise NodeError("File doesn't appear to belong to the specified keyspace and column family: " + datafile)
if not datafile.startswith(cf_dir + '-') and not datafile.startswith(cf_dir + os.sep):
raise NodeError("File doesn't appear to belong to the specified keyspace and column familily: " + datafile)

sstable = _sstable_regexp.match(os.path.basename(datafile))
if not sstable:
raise NodeError("File doesn't seem to be a valid sstable filename: " + datafile)
sstable = _sstable_regexp.match(os.path.basename(datafile))
if not sstable:
raise NodeError("File doesn't seem to be a valid sstable filename: " + datafile)

sstable = sstable.groupdict()
if not sstable['tmp'] and sstable['number'] not in sstables:
if not os.path.exists(datafile):
raise IOError("File doesn't exist: " + datafile)
sstables.add(sstable['number'])
files.append(datafile)
sstable = sstable.groupdict()
if not sstable['tmp'] and sstable['number'] not in sstables:
if not os.path.exists(datafile):
raise IOError("File doesn't exist: " + datafile)
sstables.add(sstable['number'])
files.append(datafile)

return files

Expand Down

0 comments on commit f780fd7

Please sign in to comment.