Skip to content

Implement Livestate SDK #5626

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Mar 10, 2025
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pkg/plugin/sdk/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@
toolRegistry *toolregistry.ToolRegistry
}

// withLogger copies the commonFields and sets the logger to the given one.
func (c commonFields) withLogger(logger *zap.Logger) commonFields {
c.logger = logger
return c

Check warning on line 124 in pkg/plugin/sdk/deployment.go

View check run for this annotation

Codecov / codecov/patch

pkg/plugin/sdk/deployment.go#L122-L124

Added lines #L122 - L124 were not covered by tests
}

// DeploymentPluginServiceServer is the gRPC server that handles requests from the piped.
type DeploymentPluginServiceServer[Config, DeployTargetConfig any] struct {
deployment.UnimplementedDeploymentServiceServer
Expand Down
272 changes: 254 additions & 18 deletions pkg/plugin/sdk/livestate.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@
import (
"context"
"encoding/json"
"fmt"
"time"

"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/pipe-cd/pipecd/pkg/model"
"github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1/livestate"
)

Expand All @@ -31,8 +33,7 @@
Plugin

Register(server *grpc.Server)
setCommonFields(commonFields)
setConfig([]byte) error
setFields(commonFields) error
livestate.LivestateServiceServer
}
)
Expand All @@ -46,7 +47,7 @@
// GetLivestate returns the live state of the resources in the given application.
// It returns the resources' live state and the difference between the desired state and the live state.
// It's allowed to return only the resources' live state if the difference is not available, or only the difference if the live state is not available.
GetLivestate(context.Context, *Config, []*DeployTarget[DeployTargetConfig], TODO) (TODO, error)
GetLivestate(context.Context, *Config, []*DeployTarget[DeployTargetConfig], *GetLivestateInput) (*GetLivestateResponse, error)
}

// LivestatePluginServer is a wrapper for LivestatePlugin to satisfy the LivestateServiceServer interface.
Expand All @@ -55,8 +56,9 @@
livestate.UnimplementedLivestateServiceServer
commonFields

base LivestatePlugin[Config, DeployTargetConfig]
config Config
base LivestatePlugin[Config, DeployTargetConfig]
config Config
deployTargets map[string]*DeployTarget[DeployTargetConfig]
}

// RegisterLivestatePlugin registers the given LivestatePlugin to the sdk.
Expand All @@ -79,23 +81,257 @@
livestate.RegisterLivestateServiceServer(server, s)
}

// setCommonFields sets the common fields to the plugin server.
func (s *LivestatePluginServer[Config, DeployTargetConfig]) setCommonFields(c commonFields) {
s.commonFields = c
}
// setFields sets the common fields and configs to the server.
func (s *LivestatePluginServer[Config, DeployTargetConfig]) setFields(fields commonFields) error {
s.commonFields = fields

Check warning on line 86 in pkg/plugin/sdk/livestate.go

View check run for this annotation

Codecov / codecov/patch

pkg/plugin/sdk/livestate.go#L85-L86

Added lines #L85 - L86 were not covered by tests

// setConfig sets the configuration to the plugin server.
func (s *LivestatePluginServer[Config, DeployTargetConfig]) setConfig(bytes []byte) error {
if bytes == nil {
return nil
cfg := fields.config
if cfg.Config != nil {
if err := json.Unmarshal(cfg.Config, &s.config); err != nil {
s.logger.Fatal("failed to unmarshal the plugin config", zap.Error(err))
return err
}

Check warning on line 93 in pkg/plugin/sdk/livestate.go

View check run for this annotation

Codecov / codecov/patch

pkg/plugin/sdk/livestate.go#L88-L93

Added lines #L88 - L93 were not covered by tests
}
if err := json.Unmarshal(bytes, &s.config); err != nil {
return fmt.Errorf("failed to unmarshal config: %w", err)

s.deployTargets = make(map[string]*DeployTarget[DeployTargetConfig], len(cfg.DeployTargets))
for _, dt := range cfg.DeployTargets {
var sdkDt DeployTargetConfig
if err := json.Unmarshal(dt.Config, &sdkDt); err != nil {
s.logger.Fatal("failed to unmarshal deploy target config", zap.Error(err))
return err
}
s.deployTargets[dt.Name] = &DeployTarget[DeployTargetConfig]{
Name: dt.Name,
Labels: dt.Labels,
Config: sdkDt,
}

Check warning on line 107 in pkg/plugin/sdk/livestate.go

View check run for this annotation

Codecov / codecov/patch

pkg/plugin/sdk/livestate.go#L96-L107

Added lines #L96 - L107 were not covered by tests
}

return nil
}

// GetLivestate returns the live state of the resources in the given application.
func (s *LivestatePluginServer[Config, DeployTargetConfig]) GetLivestate(context.Context, *livestate.GetLivestateRequest) (*livestate.GetLivestateResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetLivestate not implemented")
func (s *LivestatePluginServer[Config, DeployTargetConfig]) GetLivestate(ctx context.Context, request *livestate.GetLivestateRequest) (*livestate.GetLivestateResponse, error) {
// Get the deploy targets set on the deployment from the piped plugin config.
deployTargets := make([]*DeployTarget[DeployTargetConfig], 0, len(request.GetDeployTargets()))
for _, name := range request.GetDeployTargets() {
dt, ok := s.deployTargets[name]
if !ok {
return nil, status.Errorf(codes.Internal, "the deploy target %s is not found in the piped plugin config", name)
}

deployTargets = append(deployTargets, dt)
}
Comment on lines +116 to +124
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[refactor] Let's convert this block into a function in another PR since the same logic appears in ExecuteStage(), etc.


client := &Client{
base: s.client,
pluginName: s.Name(),
applicationID: request.GetApplicationId(),
toolRegistry: s.toolRegistry,
}

response, err := s.base.GetLivestate(ctx, &s.config, deployTargets, &GetLivestateInput{
Request: GetLivestateRequest{
ApplicationID: request.ApplicationId,
DeploymentSource: newDeploymentSource(request.GetDeploySource()),
},
Client: client,
Logger: s.logger,
})
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to get the live state: %v", err)
}

return response.toModel(time.Now()), nil
}

// GetLivestateInput is the input for the GetLivestate method.
type GetLivestateInput struct {
// Request is the request for getting the live state.
Request GetLivestateRequest
// Client is the client for accessing the piped API.
Client *Client
// Logger is the logger for logging.
Logger *zap.Logger
}

// GetLivestateResponse is the response for the GetLivestate method.
type GetLivestateRequest struct {
// ApplicationID is the ID of the application.
ApplicationID string
// DeploymentSource is the source of the deployment.
DeploymentSource DeploymentSource
}

// GetLivestateResponse is the response for the GetLivestate method.
type GetLivestateResponse struct {
// LiveState is the live state of the application.
LiveState ApplicationLiveState
// SyncState is the sync state of the application.
SyncState ApplicationSyncState
}

// toModel converts the GetLivestateResponse to the model.GetLivestateResponse.
func (r *GetLivestateResponse) toModel(now time.Time) *livestate.GetLivestateResponse {
return &livestate.GetLivestateResponse{
ApplicationLiveState: r.LiveState.toModel(now),
SyncState: r.SyncState.toModel(now),
}
}

// ApplicationLiveState represents the live state of an application.
type ApplicationLiveState struct {
Resources []ResourceState
HealthStatus ApplicationHealthStatus
}

// toModel converts the ApplicationLiveState to the model.ApplicationLiveState.
func (s *ApplicationLiveState) toModel(now time.Time) *model.ApplicationLiveState {
resources := make([]*model.ResourceState, 0, len(s.Resources))
for _, rs := range s.Resources {
resources = append(resources, rs.toModel(now))
}
return &model.ApplicationLiveState{
Resources: resources,
HealthStatus: s.HealthStatus.toModel(),
}
}

// ResourceState represents the live state of a resource.
type ResourceState struct {
// ID is the unique identifier of the resource.
ID string
// ParentIDs is the list of the parent resource's IDs.
ParentIDs []string
// Name is the name of the resource.
Name string
// ResourceType is the type of the resource.
ResourceType string
// ResourceMetadata is the metadata of the resource.
ResourceMetadata map[string]string
// HealthStatus is the health status of the resource.
HealthStatus ResourceHealthStatus
// HealthDescription is the description of the health status.
HealthDescription string
// DeployTarget is the target where the resource is deployed.
DeployTarget string
// PluginName is the name of the plugin that provides the resource.
PluginName string
// CreatedAt is the time when the resource was created.
CreatedAt time.Time
}

// toModel converts the ResourceState to the model.ResourceState.
func (s *ResourceState) toModel(now time.Time) *model.ResourceState {
return &model.ResourceState{
Id: s.ID,
ParentIds: s.ParentIDs,
Name: s.Name,
ResourceType: s.ResourceType,
ResourceMetadata: s.ResourceMetadata,
HealthStatus: s.HealthStatus.toModel(),
HealthDescription: s.HealthDescription,
DeployTarget: s.DeployTarget,
PluginName: s.PluginName,
CreatedAt: s.CreatedAt.Unix(),
UpdatedAt: now.Unix(),
}
}

// ApplicationHealthStatus represents the health status of an application.
type ApplicationHealthStatus int

const (
// ApplicationHealthStateUnknown represents the unknown health status of an application.
ApplicationHealthStateUnknown ApplicationHealthStatus = iota
// ApplicationHealthStateHealthy represents the healthy health status of an application.
ApplicationHealthStateHealthy
// ApplicationHealthStateOther represents the other health status of an application.
ApplicationHealthStateOther
)

// toModel converts the ApplicationHealthStatus to the model.ApplicationLiveState_Status.
func (s ApplicationHealthStatus) toModel() model.ApplicationLiveState_Status {
switch s {
case ApplicationHealthStateHealthy:
return model.ApplicationLiveState_HEALTHY
case ApplicationHealthStateOther:
return model.ApplicationLiveState_OTHER
default:
return model.ApplicationLiveState_UNKNOWN
}
}

// ResourceHealthStatus represents the health status of a resource.
type ResourceHealthStatus int

const (
// ResourceHealthStateUnknown represents the unknown health status of a resource.
ResourceHealthStateUnknown ResourceHealthStatus = iota
// ResourceHealthStateHealthy represents the healthy health status of a resource.
ResourceHealthStateHealthy
// ResourceHealthStateUnhealthy represents the unhealthy health status of a resource.
ResourceHealthStateUnhealthy
)

// toModel converts the ResourceHealthStatus to the model.ResourceState_HealthStatus.
func (s ResourceHealthStatus) toModel() model.ResourceState_HealthStatus {
switch s {
case ResourceHealthStateHealthy:
return model.ResourceState_HEALTHY
case ResourceHealthStateUnhealthy:
return model.ResourceState_UNHEALTHY
default:
return model.ResourceState_UNKNOWN
}
}

// ApplicationSyncState represents the sync state of an application.
type ApplicationSyncState struct {
// Status is the sync status of the application.
Status ApplicationSyncStatus
// ShortReason is the short reason of the sync status.
// for example, "The service manifest doesn't be synced"
ShortReason string
// Reason is the reason of the sync status.
// actually, it's the difference between the desired state and the live state.
Reason string
}

// toModel converts the ApplicationSyncState to the model.ApplicationSyncState.
func (s *ApplicationSyncState) toModel(now time.Time) *model.ApplicationSyncState {
return &model.ApplicationSyncState{
Status: s.Status.toModel(),
ShortReason: s.ShortReason,
Reason: s.Reason,
Timestamp: now.Unix(),
}
}

// ApplicationSyncStatus represents the sync status of an application.
type ApplicationSyncStatus int

const (
// ApplicationSyncStateUnknown represents the unknown sync status of an application.
ApplicationSyncStateUnknown ApplicationSyncStatus = iota
// ApplicationSyncStateSynced represents the synced sync status of an application.
ApplicationSyncStateSynced
// ApplicationSyncStateOutOfSync represents the out-of-sync sync status of an application.
ApplicationSyncStateOutOfSync
// ApplicationSyncStateInvalidConfig represents the invalid-config sync status of an application.
ApplicationSyncStateInvalidConfig
)

// toModel converts the ApplicationSyncStatus to the model.ApplicationSyncStatus.
func (s ApplicationSyncStatus) toModel() model.ApplicationSyncStatus {
switch s {
case ApplicationSyncStateSynced:
return model.ApplicationSyncStatus_SYNCED
case ApplicationSyncStateOutOfSync:
return model.ApplicationSyncStatus_OUT_OF_SYNC
case ApplicationSyncStateInvalidConfig:
return model.ApplicationSyncStatus_INVALID_CONFIG
default:
return model.ApplicationSyncStatus_UNKNOWN
}
}
Loading