Cold shard eviction #219

Merged: 128 commits from james/evict into main, merged May 25, 2023

Commits
5de5d30
Refactor shard states array/lock usage.
knighton Apr 6, 2023
fae0611
Rename: _PartitionState -> _IterState.
knighton Apr 6, 2023
a1c1f86
Move state_dict(), load_state_dict() up.
knighton Apr 7, 2023
8374c8a
Redo shard states, implement evict_shard(), redo download_shard().
knighton Apr 7, 2023
b2841bf
Initialize shard_states, make shared barrier take changeable num_procs.
knighton Apr 8, 2023
75a9723
Fix (lint).
knighton Apr 8, 2023
c663f21
Merge branch 'main' into james/evict
knighton Apr 20, 2023
9decddc
init_local_dir -> are_shards_present -> _shard_states.
knighton Apr 20, 2023
d8ea1dd
Fix typo in ticks.
knighton Apr 20, 2023
c788634
Update StreamingDataset arguments: drop keep_raw and add cache_limit.
knighton Apr 21, 2023
c326deb
worker_barrier -> shared_barrier
knighton Apr 21, 2023
a6bdd07
Split shared.py int three: prefix, memory, and barrier.
knighton Apr 22, 2023
c1ed176
Rename CreateSharedMemory (a class) -> SharedMemory.
knighton Apr 22, 2023
e9d2e8f
Add and use SharedArray.
knighton Apr 22, 2023
5f0d08b
Create and populate _shard_raw_sizes, _shard_zip_sizes shm arrays.
knighton Apr 22, 2023
5b32b0e
Merge branch 'main' into james/evict
knighton Apr 22, 2023
8cd0531
Stream.validate_weights
knighton Apr 23, 2023
9f250ef
Purportedly do the rest of cold shard eviction.
knighton Apr 23, 2023
d8d5617
Merge branch 'james/evict' of github.com:mosaicml/streaming into jame…
knighton Apr 23, 2023
54add55
Fix (docstring).
knighton Apr 23, 2023
c7b5f37
SharedScalar.
knighton Apr 23, 2023
a9e08fb
Add comments.
knighton Apr 24, 2023
4d91193
Pull weight derivations out of StreamingDataset to Stream.apply_weights
knighton Apr 24, 2023
afb78a9
Rename _sample_ids -> _work in line with StreamingDataset._get_work
knighton Apr 24, 2023
7dc25df
Fix (docstrings -- cache usage in bytes).
knighton Apr 24, 2023
c0d3871
Update streaming/base/format/base/reader.py
knighton Apr 24, 2023
8c5aaf8
Refactor shard init_local_dir, move eviction from stream to shard.
knighton Apr 24, 2023
ec545ce
Merge branch 'james/evict' of github.com:mosaicml/streaming into jame…
knighton Apr 24, 2023
c7e16a1
Rename variable: ls -> fileames_present.
knighton Apr 24, 2023
7f3cfc0
Docstring improvement.
knighton Apr 24, 2023
24542e0
Improve docstrings.
knighton Apr 24, 2023
53f2191
Merge branch 'main' into james/evict
knighton Apr 24, 2023
a7ce3e9
Include the index(es) in the calculation of cache usage.
knighton Apr 25, 2023
cbbf52e
Merge branch 'james/evict' of github.com:mosaicml/streaming into jame…
knighton Apr 25, 2023
cdd35f7
Forgetting something?
knighton Apr 25, 2023
3077885
Merge branch 'main' into james/evict
knighton Apr 25, 2023
1d6511a
Harden _IterState shutdown, improve docstrings.
knighton Apr 25, 2023
05fad12
Prototype test_eviction.
knighton Apr 25, 2023
54afedb
Merge branch 'james/evict' of github.com:mosaicml/streaming into jame…
knighton Apr 25, 2023
2bfbfaf
Merge branch 'main' into james/evict
knighton Apr 25, 2023
257893a
Fix (docstring).
knighton Apr 25, 2023
1e6def8
Merge branch 'james/evict' of github.com:mosaicml/streaming into jame…
knighton Apr 25, 2023
41a7eba
Merge branch 'main' into james/evict
knighton Apr 25, 2023
80d671b
Merge branch 'main' into james/evict
knighton Apr 25, 2023
959f6d8
Merge branch 'main' into james/evict
knighton Apr 25, 2023
74ee2ff
Unfuck the merge.
knighton Apr 26, 2023
72ad378
Fix (lint).
knighton Apr 26, 2023
e8dd0c4
Merge branch 'main' into james/evict
knighton Apr 26, 2023
c226171
Merge branch 'main' into james/evict
knighton Apr 26, 2023
adec1e5
Merge branch 'main' into james/evict
knighton Apr 26, 2023
b63e2fa
Unfuck the merge.
knighton Apr 26, 2023
ef7d968
Fix (missing paren).
knighton Apr 26, 2023
d5936a9
Expose mt/mp-safe download_shard(), evict_shard().
knighton Apr 27, 2023
a729839
Merge branch 'james/evict' of github.com:mosaicml/streaming into jame…
knighton Apr 27, 2023
33cf070
Improve docstring.
knighton Apr 27, 2023
738d1b6
Several small fixes.
knighton Apr 28, 2023
5afca91
Change tick back.
knighton Apr 28, 2023
4ac2cf2
Sleepy __del__.
knighton Apr 28, 2023
44ef7bf
10x.
knighton Apr 28, 2023
80be2fb
Break parts of tests into their own methods.
knighton Apr 28, 2023
2b12d13
Add a grace period for the old when registering the new.
knighton Apr 28, 2023
a562ae4
Fix stray utf-8.
knighton Apr 28, 2023
aad7c62
ValueError -> RuntimeError.
knighton Apr 28, 2023
0cee886
Docstring.
knighton Apr 28, 2023
bbcd066
Improve eviction tests.
knighton Apr 28, 2023
b8fe9c5
Merge branch 'james/evict' of github.com:mosaicml/streaming into jame…
knighton Apr 28, 2023
5975be4
Also call exit() at the end of SD.__iter__
knighton Apr 28, 2023
742f22d
Update usage.
knighton Apr 28, 2023
9e00574
Get specific about exception types.
knighton Apr 28, 2023
9921c09
Fix (docstrings).
knighton Apr 28, 2023
dd0b7f3
Fix (docstrings).
knighton Apr 28, 2023
93474cc
Merge branch 'main' into james/evict
knighton Apr 28, 2023
39e6f71
Merge branch 'james/evict' of github.com:mosaicml/streaming into jame…
knighton Apr 28, 2023
a74530a
zip_nokeep, zip_keep.
knighton Apr 28, 2023
57053d0
Fix (lint).
knighton Apr 28, 2023
70781cc
cache_limit check in SD init.
knighton Apr 28, 2023
378d176
Tweak logic.
knighton May 8, 2023
895b5c4
Merge branch 'james/evict' of github.com:mosaicml/streaming into jame…
knighton May 8, 2023
16b148b
Up get_prefix retries to account for tightened TICK.
knighton May 8, 2023
eff7a40
f64 time() -> u64 time_ns() (better properties).
knighton May 8, 2023
4876758
Merge branch 'james/evict' of github.com:mosaicml/streaming into jame…
knighton May 8, 2023
28232e4
Merge branch 'main' into james/evict
knighton May 8, 2023
7ad7de1
Merge branch 'james/evict' of github.com:mosaicml/streaming into jame…
knighton May 8, 2023
f7836a3
Replace while with for.
knighton May 8, 2023
50d45f0
While -> for.
knighton May 8, 2023
65d640e
Merge branch 'james/evict' of github.com:mosaicml/streaming into jame…
knighton May 8, 2023
f8bb998
Update streaming/base/shared/memory.py
knighton May 8, 2023
298b2ba
SharedMemory.buf property
knighton May 8, 2023
f865b67
Merge branch 'james/evict' of github.com:mosaicml/streaming into jame…
knighton May 8, 2023
c9b3cf3
Remove spurious imports.
knighton May 8, 2023
e4678e1
Merge branch 'james/evict' of github.com:mosaicml/streaming into jame…
knighton May 9, 2023
77e0231
Harden concurrent download/evict logic.
knighton May 9, 2023
ca48983
Switch shared barrier usage in SD init to torch dist barrier.
knighton May 10, 2023
ab85bd1
SD _cache_filelock -> SD torch multiprocesing _cache_lock.
knighton May 10, 2023
173d480
Do the same to SharedBarrier.
knighton May 10, 2023
dc146f6
Fancy exception handling in __iter__ threads.
knighton May 10, 2023
bde5ea2
Merge branch 'main' into james/evict
knighton May 10, 2023
bc04cab
Fix (get_shards call).
knighton May 11, 2023
e41a269
Merge branch 'james/evict' of github.com:mosaicml/streaming into jame…
knighton May 11, 2023
08cfead
Merge branch 'main' into james/evict
knighton May 11, 2023
3fea380
Merge branch 'james/evict' of github.com:mosaicml/streaming into jame…
knighton May 11, 2023
b44ea24
Switch torch.multiprocessing.Manager().Lock() _cache_lock to FileLock…
knighton May 11, 2023
5b1f80b
Switch SharedBarrer torch mp lock to FileLock.
knighton May 11, 2023
f768883
Merge branch 'james/evict' of github.com:mosaicml/streaming into jame…
knighton May 11, 2023
6a3e818
Fix (yapf).
knighton May 11, 2023
91a576f
Speed up test_eviction.
knighton May 11, 2023
1bca9f1
Fix (typo).
knighton May 11, 2023
27e855e
Switch dist barrier -> wait_for_file_to_exist.
knighton May 12, 2023
29c9f64
Merge branch 'main' into james/evict
knighton May 14, 2023
b37c9cc
Fixed boto3 interpreter shutdown issue and fixed waiting for shard in…
karan6181 May 18, 2023
cb419a1
Add support for human readble size format
karan6181 May 18, 2023
966237e
Check for a correct number of thread exits
karan6181 May 18, 2023
2de1628
Removed extra eviction test
karan6181 May 19, 2023
cede2ca
Merge remote-tracking branch 'origin/main' into cold_shard_eviction
karan6181 May 19, 2023
1a46fc2
Draft shuffling docs.
knighton May 24, 2023
96422e1
Add missing dirs.
knighton May 24, 2023
49bf2f8
Attempt to harden logic.
knighton May 24, 2023
8a31ae6
EOF line.
knighton May 24, 2023
7412e14
Tweak docs.
knighton May 24, 2023
7749595
Lower the predownload value and updated the default shuffle_algo
karan6181 May 24, 2023
5db6ef6
Update readme with cold shard eviction details and update predownload…
karan6181 May 24, 2023
fc0b555
Lingo.
knighton May 25, 2023
58334e0
NCN x64.
knighton May 25, 2023
e162ab6
Merge branch 'main' into james/evict
knighton May 25, 2023
7c5cbca
Tweak lingo.
knighton May 25, 2023
990f093
Merge branch 'james/evict' of github.com:mosaicml/streaming into jame…
knighton May 25, 2023
5e719c2
"Dynamically"
knighton May 25, 2023
f8498ba
revert NCN 64 changes
karan6181 May 25, 2023
Files changed (the diff view below shows changes from 76 of the 128 commits)

651 changes: 448 additions & 203 deletions streaming/base/dataset.py

Large diffs are not rendered by default.
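Since the dataset.py diff is too large to render here, the following is a minimal usage sketch of the feature it implements, pieced together from the commit messages above (keep_raw dropped, cache_limit added, human-readable size strings supported). The remote path and the exact accepted value formats are assumptions for illustration, not taken from the diff:

from streaming import StreamingDataset

# Cap the local shard cache. Once downloading another shard would push disk
# usage past the limit, cold shards are evicted to make room.
dataset = StreamingDataset(
    remote='s3://my-bucket/my-dataset',  # hypothetical remote
    local='/tmp/my-dataset',
    cache_limit='10gb',  # assumed to also accept a raw byte count, e.g. 10 * 1024**3
)

for sample in dataset:
    pass  # train on the sample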

149 changes: 148 additions & 1 deletion streaming/base/format/base/reader.py
@@ -3,9 +3,10 @@

"""Read and decode sample from shards."""

+import os
from abc import ABC, abstractmethod
from dataclasses import dataclass
-from typing import Any, Dict, Iterator, List, Optional
+from typing import Any, Dict, Iterator, List, Optional, Set

from streaming.base.array import Array

@@ -74,6 +75,152 @@ def __len__(self) -> int:
"""
return self.samples

    def _evict_raw(self) -> None:
        """Remove all raw files belonging to this shard."""
        for raw_info, _ in self.file_pairs:
            filename = os.path.join(self.dirname, self.split, raw_info.basename)
            if os.path.exists(filename):
                os.remove(filename)

    def _evict_zip(self) -> None:
        """Remove all zip files belonging to this shard."""
        for _, zip_info in self.file_pairs:
            if zip_info:
                filename = os.path.join(self.dirname, self.split, zip_info.basename)
                if os.path.exists(filename):
                    os.remove(filename)

    def evict(self) -> None:
        """Remove all files belonging to this shard."""
        self._evict_raw()
        self._evict_zip()

    def init_local_dir(self, filenames_present: Set[str], keep_zip: bool) -> bool:
        """Bring this shard's files to a consistent state, returning whether the shard is present.

        Args:
            filenames_present (Set[str]): The listing of all files under dirname/[split/]. This is
                listed once and then saved because there could potentially be very many shard
                files.
            keep_zip (bool): Whether to keep zip files when decompressing. Possible when
                compression was used. Necessary when local is the remote or there is no remote.

        Returns:
            bool: Whether the shard is present.
        """
        # For raw/zip to be considered present, each raw/zip file must be present.
        raw_files_present = 0
        zip_files_present = 0
        for raw_info, zip_info in self.file_pairs:
            if raw_info:
                filename = os.path.join(self.dirname, self.split, raw_info.basename)
                if filename in filenames_present:
                    raw_files_present += 1
            if zip_info:
                filename = os.path.join(self.dirname, self.split, zip_info.basename)
                if filename in filenames_present:
                    zip_files_present += 1

        # If the shard's raw files are only partially present, garbage collect the present ones
        # and mark the shard raw as not present, in order to achieve consistency.
        if not raw_files_present:
            is_raw_present = False
        elif raw_files_present < len(self.file_pairs):
            is_raw_present = False
            self._evict_raw()
        else:
            is_raw_present = True

        # Same as the above, but for the shard's zip files.
        if not zip_files_present:
            is_zip_present = False
        elif zip_files_present < len(self.file_pairs):
            is_zip_present = False
            self._evict_zip()
        else:
            is_zip_present = True

        # Do we keep_zip?
        if keep_zip:
            # If we keep_zip and have either the raw or zip form, we must have the other one too,
            # because they are downloaded and decompressed together.
            if self.compression and (is_zip_present != is_raw_present):
                if is_raw_present:
                    is_raw_present = False
                    self._evict_raw()
                elif is_zip_present:
                    is_zip_present = False
                    self._evict_zip()
        else:
            # If we don't keep_zip, drop any zip files.
            if is_zip_present:
                is_zip_present = False
                self._evict_zip()

        # Now, the shard is either entirely present or not at all, given keep_zip.
        return is_raw_present

    def get_raw_size(self) -> int:
        """Get the raw (uncompressed) size of this shard.

        Returns:
            int: Size in bytes.
        """
        size = 0
        for info, _ in self.file_pairs:
            size += info.bytes
        return size

    def get_zip_size(self) -> Optional[int]:
        """Get the zip (compressed) size of this shard, if compression was used.

        Returns:
            Optional[int]: Size in bytes, or ``None`` if the shard is not compressed.
        """
        size = 0
        for _, info in self.file_pairs:
            if info is None:
                return None
            size += info.bytes
        return size

    def get_full_size(self) -> int:
        """Get the full size of this shard.

        "Full" in this case means both the raw (decompressed) and zip (compressed) versions are
        resident (assuming it has a zip form). This is the maximum disk usage the shard can reach.
        When compression was used, even if keep_zip is ``False``, the zip form must still be
        resident at the same time as the raw form during shard decompression.

        Returns:
            int: Size in bytes.
        """
        raw_size = self.get_raw_size()
        zip_size = self.get_zip_size() or 0
        return raw_size + zip_size

    def get_persistent_size(self, keep_zip: bool) -> int:
        """Get the persistent size of this shard.

        "Persistent" in this case means that whether both raw and zip are present is subject to
        keep_zip. If we are not keeping zip files after decompression, they don't count toward the
        shard's persistent size on disk.

        Args:
            keep_zip (bool): Whether to keep zip files after decompressing.

        Returns:
            int: Size in bytes.
        """
        if self.compression:
            if keep_zip:
                size = self.get_full_size()
            else:
                size = self.get_raw_size()
        else:
            size = self.get_raw_size()
        return size

    @abstractmethod
    def decode_sample(self, data: bytes) -> Dict[str, Any]:
        """Decode a sample dict from bytes.
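The size accounting above is easiest to see with concrete numbers. A small sketch with hypothetical shard sizes, mirroring the logic of get_full_size() and get_persistent_size() rather than calling them:

# Suppose a shard is 80 MB compressed (zip) and 200 MB decompressed (raw).
zip_size = 80 * 1024**2
raw_size = 200 * 1024**2

# Full size: peak disk usage, with both forms resident at once. This peak is
# reached during decompression even when keep_zip is False.
full_size = raw_size + zip_size  # 280 MB

# Persistent size: what remains on disk afterward, subject to keep_zip.
persistent_if_keep_zip = raw_size + zip_size  # 280 MB
persistent_if_not = raw_size  # 200 MB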
6 changes: 4 additions & 2 deletions streaming/base/shared/__init__.py
@@ -7,8 +7,10 @@
we are coordinating separately instantiated pytorch worker processes.
"""

+from streaming.base.shared.array import SharedArray as SharedArray
from streaming.base.shared.barrier import SharedBarrier as SharedBarrier
-from streaming.base.shared.memory import CreateSharedMemory as CreateSharedMemory
+from streaming.base.shared.memory import SharedMemory as SharedMemory
from streaming.base.shared.prefix import get_shm_prefix as get_shm_prefix
+from streaming.base.shared.scalar import SharedScalar as SharedScalar

-__all__ = ['SharedBarrier', 'CreateSharedMemory', 'get_shm_prefix']
+__all__ = ['SharedArray', 'SharedBarrier', 'SharedMemory', 'get_shm_prefix', 'SharedScalar']
58 changes: 58 additions & 0 deletions streaming/base/shared/array.py
@@ -0,0 +1,58 @@
# Copyright 2023 MosaicML Streaming authors
# SPDX-License-Identifier: Apache-2.0

"""A numpy array of predetermined shape and dtype that lives in shared memory."""

from typing import Any, Tuple, Union

import numpy as np
from numpy.typing import NDArray

from streaming.base.shared.memory import SharedMemory


class SharedArray:
    """A numpy array of predetermined shape and dtype that lives in shared memory.

    Args:
        shape (Union[int, Tuple[int]]): Shape of the array.
        dtype (type): Dtype of the array.
        name (str): Its name in shared memory.
    """

    def __init__(self, shape: Union[int, Tuple[int]], dtype: type, name: str) -> None:
        self.shape = np.empty(shape).shape
        self.dtype = dtype
        self.name = name
        size = np.prod(shape) * dtype(0).nbytes
        self.shm = SharedMemory(name=name, size=size)

    def numpy(self) -> NDArray:
        """Get as a numpy array.

        We can't just internally store and use this numpy array shared memory wrapper because it's
        not compatible with spawn.
        """
        return np.ndarray(self.shape, buffer=self.shm.buf, dtype=self.dtype)

    def __getitem__(self, index: Any) -> Any:
        """Get the scalar(s) at the given index, slice, or array of indices.

        Args:
            index (Any): The index, slice, or array of indices.

        Returns:
            The scalar(s) at the given location(s).
        """
        arr = self.numpy()
        return arr[index]

    def __setitem__(self, index: Any, value: Any) -> Any:
        """Set the scalar(s) at the given index, slice, or array of indices.

        Args:
            index (Any): The index, slice, or array of indices.
            value (Any): The scalar(s) at the given location(s).
        """
        arr = self.numpy()
        arr[index] = value
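A short usage sketch of the new class; the shared memory block name below is made up for illustration:

import numpy as np

from streaming.base.shared import SharedArray

# Every process that constructs a SharedArray with the same name attaches to
# the same block, so writes in one process are visible in the others.
states = SharedArray(4, np.int32, 'example_shard_states')  # hypothetical name
states[0] = 1
states[1:3] = 2
total = states.numpy().sum()  # take a numpy view for full array operations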
31 changes: 11 additions & 20 deletions streaming/base/shared/barrier.py
@@ -1,22 +1,20 @@
# Copyright 2023 MosaicML Streaming authors
# SPDX-License-Identifier: Apache-2.0

"""Synchronization primitives that live in shared memory.
"""Barrier that lives in shared memory.

For when using `threading` or `multiprocessing` from the python standard library won't do, because
we are coordinating separately instantiated pytorch worker processes.
Implemented with shared array and a filelock.
"""

import atexit
import os
import shutil
from multiprocessing import resource_tracker # pyright: ignore
from time import sleep

import numpy as np
from filelock import FileLock

from streaming.base.shared.memory import CreateSharedMemory
from streaming.base.shared.array import SharedArray

# Time to wait, in seconds.
TICK = 0.07
@@ -39,24 +37,19 @@ class SharedBarrier:

    def __init__(self, filelock_path: str, shm_path: str) -> None:
        self.filelock_path = filelock_path
-        self.created_shms = []
-        self.opened_shms = []

        # Create three int32 fields in shared memory: num_enter, num_exit, flag.
-        size = 3 * np.int32(0).nbytes
-        shared_barrier_shm = CreateSharedMemory(name=shm_path, size=size)
-        self._shm = shared_barrier_shm.shm
+        self._arr = SharedArray(3, np.int32, shm_path)
+        self.num_enter = 0
+        self.num_exit = -1
+        self.flag = True

        # Create filelock.
        dirname = os.path.dirname(filelock_path)
-        os.makedirs(dirname, exist_ok=True)
+        if dirname:
+            os.makedirs(dirname, exist_ok=True)
        self.lock = FileLock(filelock_path)

-        self._arr = np.ndarray(3, buffer=self._shm.buf, dtype=np.int32)
-        self._arr[0] = 0
-        self._arr[1] = -1
-        self._arr[2] = True

        def cleanup():
            """Directory clean up."""
            if os.path.islink(dirname):
@@ -125,10 +118,6 @@ def __call__(self, num_procs: int) -> None:
        Args:
            num_procs (int): How many processes are sharing this barrier.
        """
-        # Re-init the numpy array pointing to shared memory. Necessary when spawn is the
-        # multiprocessing method used.
-        self._arr = np.ndarray(3, buffer=self._shm.buf, dtype=np.int32)
-
        # Initialize num_exit to the number of processes.
        with self.lock:
            if self.num_exit == -1:
@@ -160,3 +149,5 @@ def __call__(self, num_procs: int) -> None:
        # Note that we exited.
        with self.lock:
            self.num_exit += 1
+            if self.num_exit == num_procs:
+                self.num_exit = -1
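A sketch of how the reworked barrier is used; the filelock path, shared memory name, and process count are illustrative:

from streaming.base.shared import SharedBarrier

# Each participating process constructs the barrier against the same paths.
barrier = SharedBarrier('/tmp/example/barrier.filelock', 'example_barrier')

# ... per-process work ...

# Blocks until all four processes arrive. Per the added reset above, num_exit
# returns to -1 once the last process leaves, making the barrier reusable.
barrier(4)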
35 changes: 14 additions & 21 deletions streaming/base/shared/memory.py
@@ -1,43 +1,35 @@
# Copyright 2023 MosaicML Streaming authors
# SPDX-License-Identifier: Apache-2.0

"""Synchronization primitives that live in shared memory.

For when using `threading` or `multiprocessing` from the python standard library won't do, because
we are coordinating separately instantiated pytorch worker processes.
"""
"""Improved quiet implementation of shared memory in pure python."""

import atexit
from multiprocessing import resource_tracker # pyright: ignore
-from multiprocessing.shared_memory import SharedMemory
+from multiprocessing.shared_memory import SharedMemory as BuiltinSharedMemory
from time import sleep
from typing import Any, Optional

# Time to wait, in seconds.
TICK = 0.07

# Time out to wait before raising exception
TIMEOUT = 60


-class CreateSharedMemory:
-    """Create a new Shared Memory block or attach to an existing shared memory block.
+class SharedMemory:
+    """Improved quiet implementation of shared memory.

    Args:
-        name (Optional[str], optional): A unique shared memory block name. Defaults to ``None``.
-        create (Optional[bool], optional): Creates a new shared memory block or attaches to an
-            existing shared memory block. Defaults to ``None``.
+        name (str, optional): A unique shared memory block name. Defaults to ``None``.
+        create (bool, optional): Creates a new shared memory block or attaches to an existing
+            shared memory block. Defaults to ``None``.
        size (int, optional): A size of a shared memory block. Defaults to ``0``.
-        auto_cleanup (bool, optional): Register atexit handler for cleanup or not.
-            Defaults to ``True``.
+        auto_cleanup (bool, optional): Register atexit handler for cleanup or not. Defaults to
+            ``True``.
    """

    def __init__(self,
                 name: Optional[str] = None,
                 create: Optional[bool] = None,
                 size: int = 0,
                 auto_cleanup: bool = True):

        self.created_shms = []
        self.opened_shms = []
        shm = None
@@ -47,28 +39,29 @@ def __init__(self,
        try:
            if create is True:
                # Creates a new shared memory block
-                shm = SharedMemory(name, create, size)
+                shm = BuiltinSharedMemory(name, create, size)
                self.created_shms.append(shm)
            elif create is False:
                # Avoid tracking shared memory resources in a process who attaches to an existing
                # shared memory block because the process who created the shared memory is
                # responsible for destroying the shared memory block.
                resource_tracker.register = self.fix_register
                # Attaches to an existing shared memory block
-                shm = SharedMemory(name, create, size)
+                shm = BuiltinSharedMemory(name, create, size)
                self.opened_shms.append(shm)
            else:
                try:
                    # Creates a new shared memory block
-                    shm = SharedMemory(name, True, size)
+                    shm = BuiltinSharedMemory(name, True, size)
                    self.created_shms.append(shm)
                except FileExistsError:
                    sleep(TICK)
                    resource_tracker.register = self.fix_register
                    # Attaches to an existing shared memory block
-                    shm = SharedMemory(name, False, size)
+                    shm = BuiltinSharedMemory(name, False, size)
                    self.opened_shms.append(shm)
            self.shm = shm
+            self.buf = shm.buf
        finally:
            resource_tracker.register = original_rtracker_reg
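Finally, a usage sketch of the renamed wrapper; the block name and size are illustrative. Leaving create as None means the first process to arrive creates the block and later processes quietly attach, which is the race the try/except above handles:

from streaming.base.shared import SharedMemory

shm = SharedMemory(name='example_block', size=64)  # hypothetical name
shm.buf[:5] = b'hello'  # raw bytes, visible to every attached process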