Skip to main content

pvcs.go

pvcs.go - Overview

This file defines the PvcsRepo struct and its methods for retrieving and processing data related to Persistent Volume Claims (PVCs) in Kubernetes. It handles querying metrics, filtering attributes, and formatting the data for presentation.

Detailed Documentation

Constants

  • metricToUseForVolumes: The metric used to query volume data ("k8s_volume_available").
  • volumeAttrsToEnrich: A list of attributes to enrich volume data with, such as pod UID, name, namespace, node name, statefulset name, cluster name and persistent volume claim name.
  • k8sPersistentVolumeClaimNameAttrKey: The attribute key for the persistent volume claim name ("k8s_persistentvolumeclaim_name").
  • queryNamesForVolumes: A map from each volume property ("available", "capacity", "usage", "inodes", "inodes_free", "inodes_used") to the list of query names that must be run when ordering by that property; the names are drawn from "A", "B", "F1", "C", "D", "E".
  • volumeQueryNames: A list of query names used for volumes ("A", "B", "C", "D", "E", "F1").
  • metricNamesForVolumes: A map associating volume properties ("available", "capacity", "inodes", "inodes_free", "inodes_used") with their corresponding metric names.

PvcsRepo struct

// PvcsRepo bundles the data-access dependencies used by the persistent volume
// claim (PVC) methods in this file.
type PvcsRepo struct {
// reader serves the metric attribute key/value lookups and the metadata
// list query (GetMetricAttributeKeys, GetMetricAttributeValues,
// GetListResultV3).
reader interfaces.Reader
// querierV2 executes the composite range queries via QueryRange.
querierV2 interfaces.Querier
}
  • Purpose: Represents a repository for retrieving PVC-related data.
    • reader: An interfaces.Reader for fetching attribute keys and values.
    • querierV2: An interfaces.Querier for executing queries.

NewPvcsRepo function

// NewPvcsRepo returns a PvcsRepo that uses reader for attribute and metadata
// lookups and querierV2 for range queries. The arguments are stored as given;
// no validation is performed.
func NewPvcsRepo(reader interfaces.Reader, querierV2 interfaces.Querier) *PvcsRepo {
	repo := new(PvcsRepo)
	repo.reader = reader
	repo.querierV2 = querierV2
	return repo
}
  • Purpose: Creates a new PvcsRepo instance.
    • Parameters:
      • reader: An interfaces.Reader instance.
      • querierV2: An interfaces.Querier instance.
    • Returns: A pointer to the newly created PvcsRepo.

(*PvcsRepo) GetPvcAttributeKeys function

// GetPvcAttributeKeys returns the attribute keys available on the volume
// metric, for use as filter suggestions.
//
// The request is always scoped to the metrics data source and to
// metricToUseForVolumes, overriding whatever the caller set. A zero Limit is
// replaced with 50. Keys listed in pointAttrsToIgnore are dropped from the
// result. Any error from the reader is returned unchanged with a nil response.
func (p *PvcsRepo) GetPvcAttributeKeys(ctx context.Context, req v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) {
	const defaultLimit = 50

	req.DataSource, req.AggregateAttribute = v3.DataSourceMetrics, metricToUseForVolumes
	if req.Limit == 0 {
		req.Limit = defaultLimit
	}

	found, err := p.reader.GetMetricAttributeKeys(ctx, &req)
	if err != nil {
		return nil, err
	}

	// TODO(srikanthccv): only return resource attributes when we have a way to
	// distinguish between resource attributes and other attributes.
	resp := &v3.FilterAttributeKeyResponse{AttributeKeys: []v3.AttributeKey{}}
	for _, attr := range found.AttributeKeys {
		if !slices.Contains(pointAttrsToIgnore, attr.Key) {
			resp.AttributeKeys = append(resp.AttributeKeys, attr)
		}
	}
	return resp, nil
}
  • Purpose: Retrieves attribute keys associated with PVC metrics.
    • Parameters:
      • ctx: A context.Context for managing the request's lifecycle.
      • req: A v3.FilterAttributeKeyRequest containing filtering criteria.
    • Returns: A v3.FilterAttributeKeyResponse containing the retrieved attribute keys, or an error if the retrieval fails.

(*PvcsRepo) GetPvcAttributeValues function

// GetPvcAttributeValues returns the values seen for an attribute on the volume
// metric, for use as filter suggestions.
//
// The request is always scoped to the metrics data source and to
// metricToUseForVolumes, overriding whatever the caller set. A zero Limit is
// replaced with 50. The reader's response is passed through untouched; on
// error the response is nil.
func (p *PvcsRepo) GetPvcAttributeValues(ctx context.Context, req v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) {
	const defaultLimit = 50

	if req.Limit == 0 {
		req.Limit = defaultLimit
	}
	req.AggregateAttribute = metricToUseForVolumes
	req.DataSource = v3.DataSourceMetrics

	values, err := p.reader.GetMetricAttributeValues(ctx, &req)
	if err != nil {
		return nil, err
	}
	return values, nil
}
  • Purpose: Retrieves attribute values associated with PVC metrics.
    • Parameters:
      • ctx: A context.Context for managing the request's lifecycle.
      • req: A v3.FilterAttributeValueRequest containing filtering criteria.
    • Returns: A v3.FilterAttributeValueResponse containing the retrieved attribute values, or an error if the retrieval fails.

(*PvcsRepo) getMetadataAttributes function

// getMetadataAttributes fetches the enrichment attributes for volumes in the
// request's time range and returns them keyed by PVC name.
//
// The request's GroupBy is extended with every key in volumeAttrsToEnrich that
// is not already present, a time-series filter query is prepared for
// metricToUseForVolumes, and the resulting rows are folded into a
// name -> (attribute key -> value) map. Only columns whose values are string or
// *string are considered; a group-by key without such a column is stored as
// the empty string. Rows sharing a PVC name are merged, later rows overwriting
// earlier ones.
func (p *PvcsRepo) getMetadataAttributes(ctx context.Context, req model.VolumeListRequest) (map[string]map[string]string, error) {
	// Track which group-by keys are already requested so each enrichment key is
	// added at most once.
	present := make(map[string]struct{}, len(req.GroupBy))
	for _, g := range req.GroupBy {
		present[g.Key] = struct{}{}
	}
	for _, attr := range volumeAttrsToEnrich {
		if _, ok := present[attr]; ok {
			continue
		}
		req.GroupBy = append(req.GroupBy, v3.AttributeKey{Key: attr})
		present[attr] = struct{}{}
	}

	builder := v3.BuilderQuery{
		DataSource: v3.DataSourceMetrics,
		AggregateAttribute: v3.AttributeKey{
			Key:      metricToUseForVolumes,
			DataType: v3.AttributeKeyDataTypeFloat64,
		},
		Temporality: v3.Unspecified,
		GroupBy:     req.GroupBy,
	}

	localQuery, err := helpers.PrepareTimeseriesFilterQuery(req.Start, req.End, &builder)
	if err != nil {
		return nil, err
	}

	rows, err := p.reader.GetListResultV3(ctx, localQueryToDistributedQuery(localQuery))
	if err != nil {
		return nil, err
	}

	attrsByVolume := map[string]map[string]string{}
	for _, row := range rows {
		// Flatten the row to plain strings; non-string columns are skipped.
		flat := map[string]string{}
		for column, raw := range row.Data {
			switch v := raw.(type) {
			case string:
				flat[column] = v
			case *string:
				flat[column] = *v
			}
		}

		name := flat[k8sPersistentVolumeClaimNameAttrKey]
		attrs, seen := attrsByVolume[name]
		if !seen {
			attrs = map[string]string{}
			attrsByVolume[name] = attrs
		}
		for _, g := range req.GroupBy {
			attrs[g.Key] = flat[g.Key]
		}
	}

	return attrsByVolume, nil
}
  • Purpose: Retrieves metadata attributes for volumes based on the provided request.
    • Parameters:
      • ctx: A context.Context for managing the request's lifecycle.
      • req: A model.VolumeListRequest containing criteria for filtering and grouping volumes.
    • Returns: A map of volume names to their attribute key-value pairs, or an error if the retrieval fails.

(*PvcsRepo) getTopVolumeGroups function

// getTopVolumeGroups ranks volume groups by the request's order-by column and
// returns two label-set slices: the groups on the requested page
// (Offset/Limit) and every ranked group.
//
// Only the builder queries named in queryNamesForVolumes for the order-by
// column are run. Each is cloned from q, given the step and table hints from
// getParamsForTopVolumes, and has the request's filter items appended. The
// post-processed series of the first result are sorted by the value of their
// first point, descending when req.OrderBy.Order is v3.DirectionDesc and
// ascending otherwise.
//
// When the query yields no series, both slices are nil. When Offset lies
// beyond the last group (or Limit is negative), the page is empty rather than
// causing a slice-bounds panic.
//
// req.OrderBy must be non-nil, and q must contain every builder query named
// for the order-by column.
func (p *PvcsRepo) getTopVolumeGroups(ctx context.Context, req model.VolumeListRequest, q *v3.QueryRangeParamsV3) ([]map[string]string, []map[string]string, error) {
	step, timeSeriesTableName, samplesTableName := getParamsForTopVolumes(req)

	queryNames := queryNamesForVolumes[req.OrderBy.ColumnName]
	topVolumeGroupsQueryRangeParams := &v3.QueryRangeParamsV3{
		Start: req.Start,
		End:   req.End,
		Step:  step,
		CompositeQuery: &v3.CompositeQuery{
			BuilderQueries: map[string]*v3.BuilderQuery{},
			QueryType:      v3.QueryTypeBuilder,
			PanelType:      v3.PanelTypeTable,
		},
	}

	hasRequestFilters := req.Filters != nil && len(req.Filters.Items) > 0
	for _, queryName := range queryNames {
		query := q.CompositeQuery.BuilderQueries[queryName].Clone()
		query.StepInterval = step
		query.MetricTableHints = &v3.MetricTableHints{
			TimeSeriesTableName: timeSeriesTableName,
			SamplesTableName:    samplesTableName,
		}
		if hasRequestFilters {
			if query.Filters == nil {
				query.Filters = &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}}
			}
			query.Filters.Items = append(query.Filters.Items, req.Filters.Items...)
		}
		topVolumeGroupsQueryRangeParams.CompositeQuery.BuilderQueries[queryName] = query
	}

	// NOTE(review): the second return value of QueryRange is discarded, as in
	// the original code — confirm it carries nothing that must be surfaced.
	queryResponse, _, err := p.querierV2.QueryRange(ctx, topVolumeGroupsQueryRangeParams)
	if err != nil {
		return nil, nil, err
	}
	formattedResponse, err := postprocess.PostProcessResult(queryResponse, topVolumeGroupsQueryRangeParams)
	if err != nil {
		return nil, nil, err
	}

	if len(formattedResponse) == 0 || len(formattedResponse[0].Series) == 0 {
		return nil, nil, nil
	}

	// series aliases the response slice, so sorting it orders the response in
	// place exactly as before.
	// NOTE(review): assumes every series carries at least one point — confirm
	// against PostProcessResult for table panels.
	series := formattedResponse[0].Series
	descending := req.OrderBy.Order == v3.DirectionDesc
	sort.Slice(series, func(i, j int) bool {
		left, right := series[i].Points[0].Value, series[j].Points[0].Value
		if descending {
			return left > right
		}
		return left < right
	})

	// Clamp the page window to [0, len(series)] so that an out-of-range Offset
	// or a negative Limit produces an empty page instead of a run-time panic.
	total := len(series)
	start := int(req.Offset)
	if start < 0 {
		start = 0
	}
	if start > total {
		start = total
	}
	end := start + int(req.Limit)
	if end > total {
		end = total
	}
	if end < start {
		end = start
	}

	topVolumeGroups := make([]map[string]string, 0, end-start)
	for _, s := range series[start:end] {
		topVolumeGroups = append(topVolumeGroups, s.Labels)
	}
	allVolumeGroups := make([]map[string]string, 0, total)
	for _, s := range series {
		allVolumeGroups = append(allVolumeGroups, s.Labels)
	}

	return topVolumeGroups, allVolumeGroups, nil
}
  • Purpose: Retrieves the top volume groups based on the provided request and query parameters.
    • Parameters:
      • ctx: A context.Context for managing the request's lifecycle.
      • req: A model.VolumeListRequest containing criteria for filtering, ordering, and pagination.
      • q: A v3.QueryRangeParamsV3 containing query parameters.
    • Returns: Two slices of maps, representing the top volume groups and all volume groups, along with an error if the retrieval fails. Each map contains key-value pairs of attributes.

(*PvcsRepo) GetPvcList function

// GetPvcList returns one page of persistent volume claims (or groups of them)
// with their volume metrics and metadata.
//
// Defaults applied to the request: Limit 10 when zero, ordering by "usage"
// descending when OrderBy is nil, and grouping by the PVC name when GroupBy is
// nil. The response type is model.ResponseTypeList in that last case and
// model.ResponseTypeGroupedList otherwise.
//
// The method:
//  1. clones PvcsTableListQuery and applies the time range, step, request
//     filters and group-by to every builder query;
//  2. loads enrichment metadata via getMetadataAttributes;
//  3. ranks the groups via getTopVolumeGroups and restricts the main query to
//     the groups on the requested page with IN filters (skipped for keys the
//     request already filters on);
//  4. runs the main query and converts each table row into a record.
//
// Each metric field of a record is -1 when the corresponding column is absent
// or not a float64. VolumeUsage is capacity minus available and stays -1
// unless both are present. resp.Total is the number of ranked groups across
// all pages. On error the partially initialised response is returned together
// with the error.
func (p *PvcsRepo) GetPvcList(ctx context.Context, req model.VolumeListRequest) (model.VolumeListResponse, error) {
	resp := model.VolumeListResponse{}

	if req.Limit == 0 {
		req.Limit = 10
	}

	if req.OrderBy == nil {
		req.OrderBy = &v3.OrderBy{ColumnName: "usage", Order: v3.DirectionDesc}
	}

	if req.GroupBy == nil {
		req.GroupBy = []v3.AttributeKey{{Key: k8sPersistentVolumeClaimNameAttrKey}}
		resp.Type = model.ResponseTypeList
	} else {
		resp.Type = model.ResponseTypeGroupedList
	}

	// Never query with a step finer than 60.
	step := int64(math.Max(float64(common.MinAllowedStepInterval(req.Start, req.End)), 60))

	query := PvcsTableListQuery.Clone()
	query.Start = req.Start
	query.End = req.End
	query.Step = step

	hasRequestFilters := req.Filters != nil && len(req.Filters.Items) > 0
	for _, bq := range query.CompositeQuery.BuilderQueries {
		bq.StepInterval = step
		if hasRequestFilters {
			if bq.Filters == nil {
				bq.Filters = &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}}
			}
			bq.Filters.Items = append(bq.Filters.Items, req.Filters.Items...)
		}
		bq.GroupBy = req.GroupBy
	}

	volumeAttrs, err := p.getMetadataAttributes(ctx, req)
	if err != nil {
		return resp, err
	}

	topVolumeGroups, allVolumeGroups, err := p.getTopVolumeGroups(ctx, req, query)
	if err != nil {
		return resp, err
	}

	// Groups were ranked but none fall on the requested page: the offset is at
	// or past the end. Without this guard no group filter would be added below
	// and the main query would return every group instead of an empty page.
	if len(topVolumeGroups) == 0 && len(allVolumeGroups) > 0 {
		resp.Total = len(allVolumeGroups)
		resp.Records = []model.VolumeListRecord{}
		return resp, nil
	}

	// Collect, per group-by key, the values that appear on the requested page.
	groupFilters := map[string][]string{}
	for _, topVolumeGroup := range topVolumeGroups {
		for k, v := range topVolumeGroup {
			groupFilters[k] = append(groupFilters[k], v)
		}
	}

	for groupKey, groupValues := range groupFilters {
		// Leave keys alone when the caller already filters on them.
		hasGroupFilter := false
		if req.Filters != nil {
			for _, filter := range req.Filters.Items {
				if filter.Key.Key == groupKey {
					hasGroupFilter = true
					break
				}
			}
		}
		if hasGroupFilter {
			continue
		}

		for _, bq := range query.CompositeQuery.BuilderQueries {
			// Filters is still nil when the request carried no filters and the
			// template query defines none; create it before appending.
			if bq.Filters == nil {
				bq.Filters = &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}}
			}
			bq.Filters.Items = append(bq.Filters.Items, v3.FilterItem{
				Key:      v3.AttributeKey{Key: groupKey},
				Value:    groupValues,
				Operator: v3.FilterOperatorIn,
			})
		}
	}

	// NOTE(review): the second return value of QueryRange is discarded, as in
	// the original code — confirm it carries nothing that must be surfaced.
	queryResponse, _, err := p.querierV2.QueryRange(ctx, query)
	if err != nil {
		return resp, err
	}

	formattedResponse, err := postprocess.PostProcessResult(queryResponse, query)
	if err != nil {
		return resp, err
	}

	records := []model.VolumeListRecord{}

	for _, result := range formattedResponse {
		for _, row := range result.Table.Rows {
			// -1 marks a metric for which the row has no usable value.
			record := model.VolumeListRecord{
				VolumeUsage:      -1,
				VolumeAvailable:  -1,
				VolumeCapacity:   -1,
				VolumeInodes:     -1,
				VolumeInodesFree: -1,
				VolumeInodesUsed: -1,
				Meta:             map[string]string{},
			}

			if volumeName, ok := row.Data[k8sPersistentVolumeClaimNameAttrKey].(string); ok {
				record.PersistentVolumeClaimName = volumeName
			}

			volumeAvailable, hasAvailable := row.Data["A"].(float64)
			if hasAvailable {
				record.VolumeAvailable = volumeAvailable
			}
			volumeCapacity, hasCapacity := row.Data["B"].(float64)
			if hasCapacity {
				record.VolumeCapacity = volumeCapacity
			}
			if volumeInodes, ok := row.Data["C"].(float64); ok {
				record.VolumeInodes = volumeInodes
			}
			if volumeInodesFree, ok := row.Data["D"].(float64); ok {
				record.VolumeInodesFree = volumeInodesFree
			}
			if volumeInodesUsed, ok := row.Data["E"].(float64); ok {
				record.VolumeInodesUsed = volumeInodesUsed
			}

			// Usage is only meaningful when both inputs exist; otherwise keep
			// the -1 sentinel instead of subtracting sentinels.
			if hasAvailable && hasCapacity {
				record.VolumeUsage = volumeCapacity - volumeAvailable
			}

			// Copy the enrichment attributes rather than aliasing the shared
			// map, so the label writes below stay local to this record.
			if name := record.PersistentVolumeClaimName; name != "" {
				for k, v := range volumeAttrs[name] {
					record.Meta[k] = v
				}
			}

			// Every string column that is not a query result is a label.
			for k, v := range row.Data {
				if slices.Contains(volumeQueryNames, k) {
					continue
				}
				if labelValue, ok := v.(string); ok {
					record.Meta[k] = labelValue
				}
			}

			records = append(records, record)
		}
	}
	resp.Total = len(allVolumeGroups)
	resp.Records = records

	resp.SortBy(req.OrderBy)

	return resp, nil
}
  • Purpose: Retrieves a list of PVCs based on the provided request.
    • Parameters:
      • ctx: A context.Context for managing the request's lifecycle.
      • req: A model.VolumeListRequest containing criteria for filtering, ordering, and pagination.
    • Returns: A model.VolumeListResponse containing the list of PVCs, or an error if the retrieval fails.

Code Examples

Not applicable.

Include in Getting Started: NO