Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix the actor metric recording #415

Merged
merged 1 commit into from
Aug 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
fix: fix the actor metric recording
  • Loading branch information
Tochemey committed Aug 10, 2024
commit 1a04aba0ee1e314389a6580f7b85cc593128da98
68 changes: 34 additions & 34 deletions actors/actor_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ type actorSystem struct {
traceEnabled atomic.Bool
tracer trace.Tracer

// specifies whether metric is enabled
// specifies whether metrics is enabled
metricEnabled atomic.Bool

registry types.Registry
Expand Down Expand Up @@ -352,33 +352,33 @@ func (x *actorSystem) Register(ctx context.Context, actor Actor) error {
// This will send the given message to the actor after the given interval specified.
// The message will be sent once
func (x *actorSystem) ScheduleOnce(ctx context.Context, message proto.Message, pid PID, interval time.Duration) error {
spanCtx, span := x.tracer.Start(ctx, "ScheduleOnce")
ctx, span := x.tracer.Start(ctx, "ScheduleOnce")
defer span.End()
return x.scheduler.ScheduleOnce(spanCtx, message, pid, interval)
return x.scheduler.ScheduleOnce(ctx, message, pid, interval)
}

// RemoteScheduleOnce schedules a message to be sent to a remote actor in the future.
// This requires remoting to be enabled on the actor system.
// This will send the given message to the actor after the given interval specified
// The message will be sent once
func (x *actorSystem) RemoteScheduleOnce(ctx context.Context, message proto.Message, address *goaktpb.Address, interval time.Duration) error {
spanCtx, span := x.tracer.Start(ctx, "RemoteScheduleOnce")
ctx, span := x.tracer.Start(ctx, "RemoteScheduleOnce")
defer span.End()
return x.scheduler.RemoteScheduleOnce(spanCtx, message, address, interval)
return x.scheduler.RemoteScheduleOnce(ctx, message, address, interval)
}

// ScheduleWithCron schedules a message to be sent to an actor in the future using a cron expression.
func (x *actorSystem) ScheduleWithCron(ctx context.Context, message proto.Message, pid PID, cronExpression string) error {
spanCtx, span := x.tracer.Start(ctx, "ScheduleWithCron")
ctx, span := x.tracer.Start(ctx, "ScheduleWithCron")
defer span.End()
return x.scheduler.ScheduleWithCron(spanCtx, message, pid, cronExpression)
return x.scheduler.ScheduleWithCron(ctx, message, pid, cronExpression)
}

// RemoteScheduleWithCron schedules a message to be sent to an actor in the future using a cron expression.
func (x *actorSystem) RemoteScheduleWithCron(ctx context.Context, message proto.Message, address *goaktpb.Address, cronExpression string) error {
spanCtx, span := x.tracer.Start(ctx, "RemoteScheduleWithCron")
ctx, span := x.tracer.Start(ctx, "RemoteScheduleWithCron")
defer span.End()
return x.scheduler.RemoteScheduleWithCron(spanCtx, message, address, cronExpression)
return x.scheduler.RemoteScheduleWithCron(ctx, message, address, cronExpression)
}

// Subscribe help receive dead letters whenever there are available
Expand Down Expand Up @@ -422,7 +422,7 @@ func (x *actorSystem) NumActors() uint64 {

// Spawn creates or returns the instance of a given actor in the system
func (x *actorSystem) Spawn(ctx context.Context, name string, actor Actor) (PID, error) {
spanCtx, span := x.tracer.Start(ctx, "Spawn")
ctx, span := x.tracer.Start(ctx, "Spawn")
defer span.End()

if !x.started.Load() {
Expand All @@ -445,7 +445,7 @@ func (x *actorSystem) Spawn(ctx context.Context, name string, actor Actor) (PID,
}
}

pid, err := x.configPID(spanCtx, name, actor)
pid, err := x.configPID(ctx, name, actor)
if err != nil {
span.SetStatus(codes.Error, "Spawn")
span.RecordError(err)
Expand All @@ -460,7 +460,7 @@ func (x *actorSystem) Spawn(ctx context.Context, name string, actor Actor) (PID,
// SpawnNamedFromFunc creates an actor with the given receive function and provided name. One can set the PreStart and PostStop lifecycle hooks
// in the given optional options
func (x *actorSystem) SpawnNamedFromFunc(ctx context.Context, name string, receiveFunc ReceiveFunc, opts ...FuncOption) (PID, error) {
spanCtx, span := x.tracer.Start(ctx, "SpawnFromFunc")
ctx, span := x.tracer.Start(ctx, "SpawnFromFunc")
defer span.End()

if !x.started.Load() {
Expand All @@ -470,7 +470,7 @@ func (x *actorSystem) SpawnNamedFromFunc(ctx context.Context, name string, recei
}

actor := newFuncActor(name, receiveFunc, opts...)
pid, err := x.configPID(spanCtx, name, actor)
pid, err := x.configPID(ctx, name, actor)
if err != nil {
span.SetStatus(codes.Error, "Spawn")
span.RecordError(err)
Expand Down Expand Up @@ -498,7 +498,7 @@ func (x *actorSystem) SpawnRouter(ctx context.Context, poolSize int, routeesKind

// Kill stops a given actor in the system
func (x *actorSystem) Kill(ctx context.Context, name string) error {
spanCtx, span := x.tracer.Start(ctx, "Kill")
ctx, span := x.tracer.Start(ctx, "Kill")
defer span.End()

if !x.started.Load() {
Expand All @@ -517,7 +517,7 @@ func (x *actorSystem) Kill(ctx context.Context, name string) error {
if exist {
// stop the given actor. No need to record error in the span context
// because the shutdown method is taking care of that
return pid.Shutdown(spanCtx)
return pid.Shutdown(ctx)
}

err := ErrActorNotFound(actorPath.String())
Expand All @@ -528,7 +528,7 @@ func (x *actorSystem) Kill(ctx context.Context, name string) error {

// ReSpawn recreates a given actor in the system
func (x *actorSystem) ReSpawn(ctx context.Context, name string) (PID, error) {
spanCtx, span := x.tracer.Start(ctx, "ReSpawn")
ctx, span := x.tracer.Start(ctx, "ReSpawn")
defer span.End()

if !x.started.Load() {
Expand All @@ -545,7 +545,7 @@ func (x *actorSystem) ReSpawn(ctx context.Context, name string) (PID, error) {

pid, exist := x.actors.get(actorPath)
if exist {
if err := pid.Restart(spanCtx); err != nil {
if err := pid.Restart(ctx); err != nil {
return nil, errors.Wrapf(err, "failed to restart actor=%s", actorPath.String())
}

Expand Down Expand Up @@ -597,7 +597,7 @@ func (x *actorSystem) PeerAddress() string {
// When remoting is enabled this method will return and error
// An actor not found error is return when the actor is not found.
func (x *actorSystem) ActorOf(ctx context.Context, actorName string) (addr *goaktpb.Address, pid PID, err error) {
spanCtx, span := x.tracer.Start(ctx, "ActorOf")
ctx, span := x.tracer.Start(ctx, "ActorOf")
defer span.End()

x.locker.Lock()
Expand All @@ -619,7 +619,7 @@ func (x *actorSystem) ActorOf(ctx context.Context, actorName string) (addr *goak

// check in the cluster
if x.clusterEnabled.Load() {
actor, err := x.cluster.GetActor(spanCtx, actorName)
actor, err := x.cluster.GetActor(ctx, actorName)
if err != nil {
if errors.Is(err, cluster.ErrActorNotFound) {
x.logger.Infof("actor=%s not found", actorName)
Expand Down Expand Up @@ -676,7 +676,7 @@ func (x *actorSystem) LocalActor(actorName string) (PID, error) {
// When the cluster mode is not enabled an actor not found error will be returned
// One can always check whether cluster is enabled before calling this method or just use the ActorOf method.
func (x *actorSystem) RemoteActor(ctx context.Context, actorName string) (addr *goaktpb.Address, err error) {
spanCtx, span := x.tracer.Start(ctx, "RemoteActor")
ctx, span := x.tracer.Start(ctx, "RemoteActor")
defer span.End()

x.locker.Lock()
Expand All @@ -696,7 +696,7 @@ func (x *actorSystem) RemoteActor(ctx context.Context, actorName string) (addr *
return nil, e
}

actor, err := x.cluster.GetActor(spanCtx, actorName)
actor, err := x.cluster.GetActor(ctx, actorName)
if err != nil {
if errors.Is(err, cluster.ErrActorNotFound) {
x.logger.Infof("actor=%s not found", actorName)
Expand All @@ -717,22 +717,22 @@ func (x *actorSystem) RemoteActor(ctx context.Context, actorName string) (addr *

// Start starts the actor system
func (x *actorSystem) Start(ctx context.Context) error {
spanCtx, span := x.tracer.Start(ctx, "Start")
ctx, span := x.tracer.Start(ctx, "Start")
defer span.End()

x.started.Store(true)

if x.remotingEnabled.Load() {
x.enableRemoting(spanCtx)
x.enableRemoting(ctx)
}

if x.clusterEnabled.Load() {
if err := x.enableClustering(spanCtx); err != nil {
if err := x.enableClustering(ctx); err != nil {
return err
}
}

x.scheduler.Start(spanCtx)
x.scheduler.Start(ctx)

actorName := x.getSystemActorName(supervisorType)
pid, err := x.configPID(ctx, actorName, newSystemSupervisor(x.logger))
Expand All @@ -751,7 +751,7 @@ func (x *actorSystem) Start(ctx context.Context) error {

// Stop stops the actor system
func (x *actorSystem) Stop(ctx context.Context) error {
spanCtx, span := x.tracer.Start(ctx, "Stop")
ctx, span := x.tracer.Start(ctx, "Stop")
defer span.End()

x.logger.Infof("%s shutting down...", x.name)
Expand All @@ -768,17 +768,17 @@ func (x *actorSystem) Stop(ctx context.Context) error {
x.logger.Infof("%s is shutting down..:)", x.name)

x.started.Store(false)
x.scheduler.Stop(spanCtx)
x.scheduler.Stop(ctx)

if x.eventsStream != nil {
x.eventsStream.Shutdown()
}

ctx, cancel := context.WithTimeout(spanCtx, x.shutdownTimeout)
ctx, cancel := context.WithTimeout(ctx, x.shutdownTimeout)
defer cancel()

if x.remotingEnabled.Load() {
if err := x.remotingServer.Shutdown(spanCtx); err != nil {
if err := x.remotingServer.Shutdown(ctx); err != nil {
span.SetStatus(codes.Error, "Stop")
span.RecordError(err)
return err
Expand All @@ -789,7 +789,7 @@ func (x *actorSystem) Stop(ctx context.Context) error {
}

if x.clusterEnabled.Load() {
if err := x.cluster.Stop(spanCtx); err != nil {
if err := x.cluster.Stop(ctx); err != nil {
span.SetStatus(codes.Error, "Stop")
span.RecordError(err)
return err
Expand Down Expand Up @@ -1124,16 +1124,16 @@ func (x *actorSystem) GetKinds(_ context.Context, request *connect.Request[inter
// handleRemoteAsk handles a synchronous message to another actor and expect a response.
// This block until a response is received or timed out.
func (x *actorSystem) handleRemoteAsk(ctx context.Context, to PID, message proto.Message, timeout time.Duration) (response proto.Message, err error) {
spanCtx, span := x.tracer.Start(ctx, "HandleRemoteAsk")
ctx, span := x.tracer.Start(ctx, "HandleRemoteAsk")
defer span.End()
return Ask(spanCtx, to, message, timeout)
return Ask(ctx, to, message, timeout)
}

// handleRemoteTell handles an asynchronous message to an actor
func (x *actorSystem) handleRemoteTell(ctx context.Context, to PID, message proto.Message) error {
spanCtx, span := x.tracer.Start(ctx, "HandleRemoteTell")
ctx, span := x.tracer.Start(ctx, "HandleRemoteTell")
defer span.End()
return Tell(spanCtx, to, message)
return Tell(ctx, to, message)
}

// getSupervisor return the system supervisor
Expand Down
2 changes: 1 addition & 1 deletion actors/actor_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1044,7 +1044,7 @@ func TestActorSystem(t *testing.T) {
}
// sort the array
sort.Strings(expected)
// get the metric names
// get the metrics names
actual := make([]string, len(got.ScopeMetrics[0].Metrics))
for i, metric := range got.ScopeMetrics[0].Metrics {
actual[i] = metric.Name
Expand Down
Loading
Loading