Skip to content

Commit

Permalink
Add send large file from cli (#101)
Browse files Browse the repository at this point in the history
* finish sendlarge on cli

* use chunksize from core

* add close send

* modify usage
  • Loading branch information
hongjijun233 committed Oct 10, 2023
1 parent d65c91e commit 4081f59
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 0 deletions.
12 changes: 12 additions & 0 deletions cmd/workload/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,18 @@ func Command() *cli.Command {
},
},
},
{
Name: "sendlarge",
Usage: "send single large file to workload(s)",
ArgsUsage: sendArgsUsage,
Action: utils.ExitCoder(cmdWorkloadSendLarge),
Flags: []cli.Flag{
&cli.StringSliceFlag{
Name: "file",
Usage: "copy local file to workload, only can use single time. src_path:dst_path[:mode[:uid:gid]]",
},
},
},
{
Name: "dissociate",
Usage: "dissociate workload(s) from eru, return it resource but not remove it",
Expand Down
123 changes: 123 additions & 0 deletions cmd/workload/sendlarge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package workload

import (
"context"
"fmt"
"io"
"sync"

"github.com/projecteru2/cli/cmd/utils"
corepb "github.com/projecteru2/core/rpc/gen"
"github.com/projecteru2/core/types"

"github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"
)

type sendLargeWorkloadsOptions struct {
client corepb.CoreRPCClient
// workload ids
ids []string
dst string
content []byte
modes *corepb.FileMode
owners *corepb.FileOwner
}

func (o *sendLargeWorkloadsOptions) run(ctx context.Context) error {
stream, err := o.client.SendLargeFile(ctx)

Check failure on line 28 in cmd/workload/sendlarge.go

View workflow job for this annotation

GitHub Actions / publish

o.client.SendLargeFile undefined (type pb.CoreRPCClient has no field or method SendLargeFile)

Check failure on line 28 in cmd/workload/sendlarge.go

View workflow job for this annotation

GitHub Actions / unittests

o.client.SendLargeFile undefined (type pb.CoreRPCClient has no field or method SendLargeFile)

Check failure on line 28 in cmd/workload/sendlarge.go

View workflow job for this annotation

GitHub Actions / lint

o.client.SendLargeFile undefined (type pb.CoreRPCClient has no field or method SendLargeFile)

Check failure on line 28 in cmd/workload/sendlarge.go

View workflow job for this annotation

GitHub Actions / lint

o.client.SendLargeFile undefined (type pb.CoreRPCClient has no field or method SendLargeFile)
if err != nil {
logrus.Errorf("[SendLarge] Failed send %s", o.dst)
return err
}

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for {
msg, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return
}

if msg.Error != "" {
logrus.Errorf("[SendLarge] Failed send %s to %s", msg.Path, msg.Id)
} else {
logrus.Infof("[SendLarge] Send %s to %s success", msg.Path, msg.Id)
}
}
}()

fileOptions := o.toSendLargeFileChunks()
for _, chunk := range fileOptions {
err := stream.Send(chunk)
if err != nil {
logrus.Errorf("[SendLarge] Failed send %s", chunk.Dst)
return err
}
}
stream.CloseSend()
wg.Wait()
return nil
}

func (o *sendLargeWorkloadsOptions) toSendLargeFileChunks() []*corepb.FileOptions {

Check failure on line 68 in cmd/workload/sendlarge.go

View workflow job for this annotation

GitHub Actions / publish

undefined: corepb.FileOptions

Check failure on line 68 in cmd/workload/sendlarge.go

View workflow job for this annotation

GitHub Actions / unittests

undefined: pb.FileOptions

Check failure on line 68 in cmd/workload/sendlarge.go

View workflow job for this annotation

GitHub Actions / lint

undefined: pb.FileOptions

Check failure on line 68 in cmd/workload/sendlarge.go

View workflow job for this annotation

GitHub Actions / lint

undefined: pb.FileOptions
maxChunkSize := types.SendLargeFileChunkSize

Check failure on line 69 in cmd/workload/sendlarge.go

View workflow job for this annotation

GitHub Actions / publish

undefined: types.SendLargeFileChunkSize

Check failure on line 69 in cmd/workload/sendlarge.go

View workflow job for this annotation

GitHub Actions / unittests

undefined: types.SendLargeFileChunkSize

Check failure on line 69 in cmd/workload/sendlarge.go

View workflow job for this annotation

GitHub Actions / lint

undefined: types.SendLargeFileChunkSize

Check failure on line 69 in cmd/workload/sendlarge.go

View workflow job for this annotation

GitHub Actions / lint

undefined: types.SendLargeFileChunkSize
ret := make([]*corepb.FileOptions, 0)

Check failure on line 70 in cmd/workload/sendlarge.go

View workflow job for this annotation

GitHub Actions / publish

undefined: corepb.FileOptions

Check failure on line 70 in cmd/workload/sendlarge.go

View workflow job for this annotation

GitHub Actions / unittests

undefined: pb.FileOptions

Check failure on line 70 in cmd/workload/sendlarge.go

View workflow job for this annotation

GitHub Actions / lint

undefined: pb.FileOptions
for idx := 0; idx < len(o.content); idx += maxChunkSize {
fileOption := &corepb.FileOptions{

Check failure on line 72 in cmd/workload/sendlarge.go

View workflow job for this annotation

GitHub Actions / publish

undefined: corepb.FileOptions

Check failure on line 72 in cmd/workload/sendlarge.go

View workflow job for this annotation

GitHub Actions / unittests

undefined: pb.FileOptions

Check failure on line 72 in cmd/workload/sendlarge.go

View workflow job for this annotation

GitHub Actions / lint

undefined: pb.FileOptions (typecheck)
Ids: o.ids,
Dst: o.dst,
Size: int64(len(o.content)),
Mode: o.modes,
Owner: o.owners,
}
if idx+maxChunkSize > len(o.content) {
fileOption.Chunk = o.content[idx:]
} else {
fileOption.Chunk = o.content[idx : idx+maxChunkSize]
}
ret = append(ret, fileOption)
}
return ret
}

func cmdWorkloadSendLarge(c *cli.Context) error {
client, err := utils.NewCoreRPCClient(c)
if err != nil {
return err
}

content, modes, owners := utils.GenerateFileOptions(c)
if len(content) == 0 {
return fmt.Errorf("files should not be empty")
}
if len(content) >= 2 {
return fmt.Errorf("can not send multiple files at the same time")
}

ids := c.Args().Slice()
if len(ids) == 0 {
return fmt.Errorf("Workload ID(s) should not be empty")
}

targetFileName := func() string {
for key := range content {
return key
}
return ""
}()
o := &sendLargeWorkloadsOptions{
client: client,
ids: ids,
dst: targetFileName,
content: content[targetFileName],
modes: modes[targetFileName],
owners: owners[targetFileName],
}
return o.run(c.Context)
}

0 comments on commit 4081f59

Please sign in to comment.