Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement DataFrame nunique #1137

Merged
merged 5 commits into from
Apr 9, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
support dataframe nunique
  • Loading branch information
hekaisheng committed Apr 8, 2020
commit a58a1e7b09042f1a5b6af3f823f38c0fa1adad83
9 changes: 6 additions & 3 deletions mars/dataframe/reduction/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from .cumprod import DataFrameCumprod
from .cumsum import DataFrameCumsum

from .nunique import DataFrameNunique

def _install():
from ..core import DATAFRAME_TYPE, SERIES_TYPE
Expand All @@ -41,16 +42,18 @@ def _install():
from .cummin import cummin
from .cumprod import cumprod
from .cumsum import cumsum
from .nunique import nunique_dataframe, nunique_series

func_names = ['sum', 'prod', 'max', 'min', 'count', 'mean', 'var',
'std', 'cummax', 'cummin', 'cumprod', 'cumsum',
'agg', 'aggregate']
'agg', 'aggregate', 'nunique']
series_funcs = [sum_series, prod_series, max_series, min_series,
count_series, mean_series, var_series, std_series,
cummax, cummin, cumprod, cumsum, aggregate, aggregate]
cummax, cummin, cumprod, cumsum, aggregate, aggregate,
nunique_series]
df_funcs = [sum_dataframe, prod_dataframe, max_dataframe, min_dataframe,
count_dataframe, mean_dataframe, var_dataframe, std_dataframe,
cummax, cummin, cumprod, cumsum, aggregate, aggregate]
cummax, cummin, cumprod, cumsum, aggregate, aggregate, nunique_dataframe]
for func_name, series_func, df_func in zip(func_names, series_funcs, df_funcs):
for t in DATAFRAME_TYPE:
setattr(t, func_name, df_func)
Expand Down
27 changes: 21 additions & 6 deletions mars/dataframe/reduction/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ def _tree_reduction(cls, chunks, op, combine_size, idx):
concat_dtypes = chks[0].dtypes
concat_shape = (sum([c.shape[0] for c in chks]), chks[0].shape[1])
else:
concat_index = chks[0].index
concat_index = chks[0].index_value
concat_dtypes = pd.Series([c.dtypes[0] for c in chks])
concat_shape = (chks[0].shape[0], (sum([c.shape[1] for c in chks])))
chk = concat_op.new_chunk(chks, shape=concat_shape, index=(i,),
Expand All @@ -174,12 +174,15 @@ def _tree_reduction(cls, chunks, op, combine_size, idx):
index_value=index_value))
chunks = new_chunks

if op.axis == 0:
concat_shape = (sum([c.shape[0] for c in chunks]), chunks[0].shape[1])
else:
concat_shape = (chunks[0].shape[0], (sum([c.shape[1] for c in chunks])))
concat_op = DataFrameConcat(axis=op.axis, object_type=ObjectType.dataframe)
chk = concat_op.new_chunk(chunks, index=(idx,))
chk = concat_op.new_chunk(chunks, index=(idx,), shape=concat_shape)
empty_df = build_empty_df(chunks[0].dtypes)
reduced_df = getattr(empty_df, getattr(cls, '_func_name'))(axis=op.axis, level=op.level,
numeric_only=op.numeric_only)
reduced_shape = (np.nan,) if op.axis == 1 else reduced_df.shape
reduced_df = cls._execute_reduction(empty_df, op)
reduced_shape = (chk.shape[0],) if op.axis == 1 else reduced_df.shape
new_op = op.copy().reset_key()
new_op._stage = OperandStage.agg
return new_op.new_chunk([chk], shape=reduced_shape, index=(idx,), dtype=reduced_df.dtype,
Expand Down Expand Up @@ -419,11 +422,21 @@ def _call_dataframe(self, df):
func_name = getattr(self, '_func_name')
if func_name == 'count':
reduced_df = getattr(empty_df, func_name)(axis=axis, level=level, numeric_only=numeric_only)
dtype = reduced_df.dtype
elif func_name == 'nunique':
reduced_df = getattr(empty_df, func_name)(axis=axis)
if axis == 1:
# fix dtype when axis is 1
dtype = np.dtype('int')
else:
dtype = reduced_df.dtype
else:
reduced_df = getattr(empty_df, func_name)(axis=axis, level=level, skipna=skipna,
numeric_only=numeric_only)
dtype = reduced_df.dtype

reduced_shape = (df.shape[0],) if axis == 1 else reduced_df.shape
return self.new_series([df], shape=reduced_shape, dtype=reduced_df.dtype,
return self.new_series([df], shape=reduced_shape, dtype=dtype,
index_value=parse_index(reduced_df.index, store_data=axis == 0))

def _call_series(self, series):
Expand All @@ -442,6 +455,8 @@ def _call_series(self, series):
func_name = getattr(self, '_func_name')
if func_name == 'count':
reduced_series = empty_series.count(level=level)
elif func_name == 'nunique':
reduced_series = empty_series.nunique()
else:
reduced_series = getattr(empty_series, func_name)(axis=axis, level=level, skipna=skipna,
numeric_only=numeric_only)
Expand Down
99 changes: 99 additions & 0 deletions mars/dataframe/reduction/nunique.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# Copyright 1999-2020 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from collections import OrderedDict

import pandas as pd

from ... import opcodes as OperandDef
from ...serialize import BoolField
from ...utils import lazy_import
from .core import DataFrameReductionOperand, DataFrameReductionMixin, ObjectType


cudf = lazy_import('cudf', globals=globals())


class DataFrameNunique(DataFrameReductionOperand, DataFrameReductionMixin):
    """Operand counting distinct values along an axis of a DataFrame or Series.

    The chunked reduction runs in three stages:

    * ``map`` collects the distinct values of each input chunk;
    * ``combine`` merges per-chunk value lists and de-duplicates again;
    * ``agg`` computes the final counts with ``nunique``.

    ``dropna`` is only applied at the final ``agg``/reduction step; the
    intermediate stages keep at most one NaN per column/row, which is enough
    for the final count to be correct either way.
    """
    _op_type_ = OperandDef.NUNIQUE
    _func_name = 'nunique'

    # whether NaN is excluded from the distinct-value count (pandas default: True)
    _dropna = BoolField('dropna')

    def __init__(self, dropna=None, **kw):
        super(DataFrameNunique, self).__init__(_dropna=dropna, **kw)

    @property
    def dropna(self):
        return self._dropna

    @classmethod
    def _execute_map(cls, ctx, op, reduction_func=None):
        xdf = cudf if op.gpu else pd
        in_data = ctx[op.inputs[0].key]
        # NOTE(review): a DataFrame chunk whose op carries
        # ``object_type == ObjectType.series`` would also take this branch and
        # then fail on ``in_data.name`` -- presumably map-stage chunks of a
        # DataFrame input use a different object_type; confirm against the
        # reduction tiling logic.
        if isinstance(in_data, xdf.Series) or op.object_type == ObjectType.series:
            unique_values = in_data.drop_duplicates()
            ctx[op.outputs[0].key] = xdf.Series(unique_values, name=in_data.name)
        else:
            if op.axis == 0:
                # build a single-row DataFrame where each column holds the list
                # of its distinct values, so later stages can explode and
                # de-duplicate across chunks
                df = xdf.DataFrame(OrderedDict(
                    (d, [v.drop_duplicates().to_list()]) for d, v in in_data.iteritems()))
            else:
                df = xdf.DataFrame(in_data.apply(xdf.Series.drop_duplicates, axis=1))
            ctx[op.outputs[0].key] = df

    @classmethod
    def _execute_combine(cls, ctx, op):
        xdf = cudf if op.gpu else pd
        in_data = ctx[op.inputs[0].key]
        if isinstance(in_data, xdf.Series):
            # map stage emitted lists of values; explode flattens them before
            # de-duplicating across the combined chunks
            unique_values = in_data.explode().drop_duplicates()
            ctx[op.outputs[0].key] = xdf.Series(unique_values, name=in_data.name)
        else:
            if op.axis == 0:
                df = xdf.DataFrame(OrderedDict(
                    (d, [v.explode().drop_duplicates().to_list()])
                    for d, v in in_data.iteritems()))
            else:
                df = xdf.DataFrame(in_data.apply(lambda x: x.explode().drop_duplicates(), axis=1))
            ctx[op.outputs[0].key] = df

    @classmethod
    def _execute_agg(cls, ctx, op):
        xdf = cudf if op.gpu else pd
        in_data = ctx[op.inputs[0].key]
        if isinstance(in_data, xdf.Series):
            ctx[op.outputs[0].key] = in_data.explode().nunique(dropna=op.dropna)
        else:
            # dropna takes effect only here, in the final per-column/per-row count
            ctx[op.outputs[0].key] = in_data.apply(
                lambda x: x.explode().nunique(dropna=op.dropna), axis=op.axis)

    @classmethod
    def _execute_reduction(cls, in_data, op, min_count=None, reduction_func=None):
        # single-chunk path: delegate directly to pandas/cudf nunique.
        # axis is omitted for Series input, which has no ``axis`` keyword.
        kwargs = dict()
        if op.axis is not None:
            kwargs['axis'] = op.axis
        return in_data.nunique(dropna=op.dropna, **kwargs)


def nunique_dataframe(df, axis=0, dropna=True, combine_size=None):
    """Count distinct values over the requested axis of *df*.

    Returns a Mars Series holding one count per column (``axis=0``)
    or per row (``axis=1``); ``dropna=False`` includes NaN in the counts.
    """
    nunique_op = DataFrameNunique(axis=axis, dropna=dropna,
                                  combine_size=combine_size,
                                  object_type=ObjectType.series)
    return nunique_op(df)


def nunique_series(df, dropna=True, combine_size=None):
    """Count distinct values in a Series, returning a scalar.

    ``dropna=False`` includes NaN in the count.
    """
    nunique_op = DataFrameNunique(dropna=dropna,
                                  combine_size=combine_size,
                                  object_type=ObjectType.scalar)
    return nunique_op(df)


36 changes: 34 additions & 2 deletions mars/dataframe/reduction/tests/test_reduction.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from mars.dataframe.core import DataFrame, IndexValue, Series
from mars.dataframe.reduction import DataFrameSum, DataFrameProd, DataFrameMin, \
DataFrameMax, DataFrameCount, DataFrameMean, DataFrameVar, DataFrameCummin, \
DataFrameCummax, DataFrameCumprod, DataFrameCumsum
DataFrameCummax, DataFrameCumprod, DataFrameCumsum, DataFrameNunique
from mars.dataframe.merge import DataFrameConcat
from mars.dataframe.datasource.series import from_pandas as from_pandas_series
from mars.dataframe.datasource.dataframe import from_pandas as from_pandas_df
Expand Down Expand Up @@ -199,7 +199,7 @@ def testDataFrameReduction(self):
reduction_df = reduction_df.tiles()

self.assertEqual(len(reduction_df.chunks), 5)
self.assertEqual(reduction_df.nsplits, ((np.nan,) * 5,))
self.assertEqual(reduction_df.nsplits, ((4,) * 5,))
self.assertIsInstance(reduction_df.chunks[0].op, self.op)
self.assertIsInstance(reduction_df.chunks[0].inputs[0].op, DataFrameConcat)
self.assertEqual(len(reduction_df.chunks[0].inputs[0].inputs), 2)
Expand Down Expand Up @@ -369,6 +369,38 @@ def testDataFrameReduction(self):
self.assertEqual(reduction_df.chunks[-1].inputs[-1].op.stage, OperandStage.map)
self.assertEqual(len(reduction_df.chunks[-1].inputs), 7)

def testNunique(self):
    """Check tiling metadata of DataFrame.nunique for both axes."""
    # axis=0 (default): one count per column -> result length equals n_columns
    data = pd.DataFrame(np.random.randint(0, 6, size=(20, 10)),
                        columns=['c' + str(i) for i in range(10)])
    df = from_pandas_df(data, chunk_size=3)
    result = df.nunique()

    self.assertEqual(result.shape, (10,))
    self.assertEqual(result.op.object_type, ObjectType.series)
    self.assertIsInstance(result.op, DataFrameNunique)

    # 10 columns at chunk_size=3 -> 4 column chunks after tiling
    tiled = result.tiles()
    self.assertEqual(tiled.shape, (10,))
    self.assertEqual(len(tiled.chunks), 4)
    self.assertEqual(tiled.nsplits, ((3, 3, 3, 1,),))
    self.assertEqual(tiled.chunks[0].op.stage, OperandStage.agg)
    self.assertIsInstance(tiled.chunks[0].op, DataFrameNunique)

    # axis=1: one count per row -> result length equals n_rows
    data2 = data.copy()
    df2 = from_pandas_df(data2, chunk_size=3)
    result2 = df2.nunique(axis=1)

    self.assertEqual(result2.shape, (20,))
    self.assertEqual(result2.op.object_type, ObjectType.series)
    self.assertIsInstance(result2.op, DataFrameNunique)

    # 20 rows at chunk_size=3 -> 7 row chunks after tiling
    tiled = result2.tiles()
    self.assertEqual(tiled.shape, (20,))
    self.assertEqual(len(tiled.chunks), 7)
    self.assertEqual(tiled.nsplits, ((3, 3, 3, 3, 3, 3, 2,),))
    self.assertEqual(tiled.chunks[0].op.stage, OperandStage.agg)
    self.assertIsInstance(tiled.chunks[0].op, DataFrameNunique)


class TestAggregate(TestBase):
def testDataFrameAggregate(self):
Expand Down
69 changes: 69 additions & 0 deletions mars/dataframe/reduction/tests/test_reduction_execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,75 @@ def testDataFrameCount(self):
expected = data.count(axis='columns', numeric_only=True)
pd.testing.assert_series_equal(result, expected)

def testNunique(self):
    """Execute nunique on Series and DataFrame and compare against pandas."""
    # --- Series: single chunk, then multiple chunks ---
    data1 = pd.Series(np.random.randint(0, 5, size=(20,)))

    series = from_pandas_series(data1)
    result = self.executor.execute_dataframe(series.nunique(), concat=True)[0]
    expected = data1.nunique()
    self.assertEqual(result, expected)

    series = from_pandas_series(data1, chunk_size=6)
    result = self.executor.execute_dataframe(series.nunique(), concat=True)[0]
    expected = data1.nunique()
    self.assertEqual(result, expected)

    # test dropna: inject NaNs, check default (dropna=True) and dropna=False
    data2 = data1.copy()
    data2[[2, 9, 18]] = np.nan

    series = from_pandas_series(data2)
    result = self.executor.execute_dataframe(series.nunique(), concat=True)[0]
    expected = data2.nunique()
    self.assertEqual(result, expected)

    series = from_pandas_series(data2, chunk_size=3)
    result = self.executor.execute_dataframe(series.nunique(dropna=False), concat=True)[0]
    expected = data2.nunique(dropna=False)
    self.assertEqual(result, expected)

    # test dataframe: both axes, single chunk and multi-chunk
    data1 = pd.DataFrame(np.random.randint(0, 6, size=(20, 20)),
                         columns=['c' + str(i) for i in range(20)])
    df = from_pandas_df(data1)
    result = self.executor.execute_dataframe(df.nunique(), concat=True)[0]
    expected = data1.nunique()
    pd.testing.assert_series_equal(result, expected)

    df = from_pandas_df(data1, chunk_size=6)
    result = self.executor.execute_dataframe(df.nunique(), concat=True)[0]
    expected = data1.nunique()
    pd.testing.assert_series_equal(result, expected)

    df = from_pandas_df(data1)
    result = self.executor.execute_dataframe(df.nunique(axis=1), concat=True)[0]
    expected = data1.nunique(axis=1)
    pd.testing.assert_series_equal(result, expected)

    df = from_pandas_df(data1, chunk_size=3)
    result = self.executor.execute_dataframe(df.nunique(axis=1), concat=True)[0]
    expected = data1.nunique(axis=1)
    pd.testing.assert_series_equal(result, expected)

    # test dropna on the DataFrame path
    data2 = data1.copy()
    data2.iloc[[2, 9, 18], [2, 9, 18]] = np.nan

    df = from_pandas_df(data2)
    result = self.executor.execute_dataframe(df.nunique(), concat=True)[0]
    expected = data2.nunique()
    pd.testing.assert_series_equal(result, expected)

    df = from_pandas_df(data2, chunk_size=3)
    result = self.executor.execute_dataframe(df.nunique(dropna=False), concat=True)[0]
    expected = data2.nunique(dropna=False)
    pd.testing.assert_series_equal(result, expected)

    df = from_pandas_df(data1, chunk_size=3)
    result = self.executor.execute_dataframe(df.nunique(axis=1), concat=True)[0]
    expected = data1.nunique(axis=1)
    pd.testing.assert_series_equal(result, expected)


cum_reduction_functions = dict(
cummax=dict(func_name='cummax'),
Expand Down
1 change: 1 addition & 0 deletions mars/serialize/protos/operand.proto
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ message OperandDef {
TRANSFORM = 718;
CHECK_NA = 719;
DROP_NA = 720;
NUNIQUE = 721;

FUSE = 801;

Expand Down