Kubernetes Cluster Connectivity Guide
This guide explains how Image Factory connects to Kubernetes clusters for hybrid infrastructure execution, covering both local development and multi-cluster production patterns.
Use it as a technical reference for connectivity patterns and operational concerns. Validate implementation details against the current infrastructure provider code and configuration model.
Overview
This guide covers how the Image Factory system connects to Kubernetes clusters for hybrid infrastructure execution, supporting both local development and production multi-cluster deployments.
Connection Methods
- Local Development: Direct kubeconfig access
- Production: Secure service account tokens, IAM authentication
- Multi-Cluster: Dynamic cluster discovery and routing
Local Development Cluster Connection
Method 1: Kubeconfig File (Most Common)
// internal/infrastructure/k8s/client.go
type K8sClient struct {
config *rest.Config
clientset *kubernetes.Clientset
tektonClient *tektonclientset.Clientset
}
func NewK8sClient(kubeconfigPath string) (*K8sClient, error) {
// Load kubeconfig
config, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath)
if err != nil {
return nil, fmt.Errorf("failed to load kubeconfig: %w", err)
}
// Create clients
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, fmt.Errorf("failed to create kubernetes client: %w", err)
}
tektonClient, err := tektonclientset.NewForConfig(config)
if err != nil {
return nil, fmt.Errorf("failed to create tekton client: %w", err)
}
return &K8sClient{
config: config,
clientset: clientset,
tektonClient: tektonClient,
}, nil
}
Configuration:
# config/local.yaml
kubernetes:
clusters:
- name: "local-dev"
kubeconfig: "~/.kube/config"
default: true
context: "docker-desktop" # or "minikube", "kind"
Method 2: In-Cluster Configuration (When Running in K8s)
func NewInClusterClient() (*K8sClient, error) {
// Use in-cluster config when running as a pod
config, err := rest.InClusterConfig()
if err != nil {
return nil, fmt.Errorf("failed to get in-cluster config: %w", err)
}
// Create clients with in-cluster config
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, fmt.Errorf("failed to create kubernetes client: %w", err)
}
return &K8sClient{
config: config,
clientset: clientset,
}, nil
}
Production Multi-Cluster Connectivity
Architecture Overview
Image Factory Backend
│
├── Cluster Registry API
│ ├── Cluster Discovery
│ ├── Health Monitoring
│ └── Load Balancing
│
└── Cluster Connectors
├── Service Account Auth
├── IAM Authentication
└── Certificate-Based Auth
Method 1: Service Account Tokens (Recommended for Production)
// internal/infrastructure/k8s/cluster_manager.go
type ClusterManager struct {
clusters map[string]*ClusterConnection
registry *ClusterRegistry
}
type ClusterConnection struct {
Name string
Endpoint string
CAData []byte
Token string // Service account token
Namespace string
client *K8sClient
lastHealth time.Time
}
func (cm *ClusterManager) ConnectToCluster(clusterName string) (*K8sClient, error) {
cluster, exists := cm.clusters[clusterName]
if !exists {
return nil, fmt.Errorf("cluster %s not found", clusterName)
}
// Create REST config from service account token
config := &rest.Config{
Host: cluster.Endpoint,
BearerToken: cluster.Token,
TLSClientConfig: rest.TLSClientConfig{
CAData: cluster.CAData,
},
}
// Create and cache client
client, err := NewK8sClientFromConfig(config)
if err != nil {
return nil, fmt.Errorf("failed to connect to cluster %s: %w", clusterName, err)
}
cluster.client = client
cluster.lastHealth = time.Now()
return client, nil
}
Service Account Setup:
# Create service account for Image Factory
kubectl create serviceaccount image-factory-sa -n image-factory
# Create cluster role with necessary permissions
kubectl apply -f - <<EOF
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: image-factory-role
rules:
- apiGroups: [""]
resources: ["pods", "pods/log", "configmaps", "secrets"]
verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
- apiGroups: ["tekton.dev"]
resources: ["pipelineruns", "taskruns", "pipelines", "tasks"]
verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
EOF
# Bind role to service account
kubectl create clusterrolebinding image-factory-binding \
--clusterrole=image-factory-role \
--serviceaccount=image-factory:image-factory-sa
# Get service account token
kubectl get secret $(kubectl get serviceaccount image-factory-sa -o jsonpath='{.secrets[0].name}') -o jsonpath='{.data.token}' | base64 --decode
Method 2: Cloud Provider IAM Authentication
AWS EKS with IAM
// internal/infrastructure/k8s/aws_connector.go
type AWSClusterConnector struct {
region string
clusterName string
roleARN string
}
func (c *AWSClusterConnector) Connect(ctx context.Context) (*K8sClient, error) {
// Get EKS cluster details
eksClient := eks.New(sess.Must(sess.NewSession(&aws.Config{
Region: aws.String(c.region),
})))
cluster, err := eksClient.DescribeCluster(&eks.DescribeClusterInput{
Name: &c.clusterName,
})
if err != nil {
return nil, fmt.Errorf("failed to describe EKS cluster: %w", err)
}
// Create AWS IAM authenticator
config := &rest.Config{
Host: cluster.Cluster.Endpoint,
TLSClientConfig: rest.TLSClientConfig{
CAData: []byte(*cluster.Cluster.CertificateAuthority.Data),
},
}
// Use AWS IAM authenticator
config.WrapTransport = func(rt http.RoundTripper) http.RoundTripper {
return &awsAuthTransport{
RoundTripper: rt,
region: c.region,
clusterName: c.clusterName,
roleARN: c.roleARN,
}
}
return NewK8sClientFromConfig(config)
}
GCP GKE with Workload Identity
// internal/infrastructure/k8s/gcp_connector.go
type GCPClusterConnector struct {
projectID string
clusterName string
location string
serviceAccount string
}
func (c *GCPClusterConnector) Connect(ctx context.Context) (*K8sClient, error) {
// Use GCP Workload Identity
config, err := google.DefaultClient(ctx, container.CloudPlatformScope)
if err != nil {
return nil, fmt.Errorf("failed to get GCP client: %w", err)
}
// Get GKE cluster details
service, err := container.NewService(ctx)
if err != nil {
return nil, fmt.Errorf("failed to create GKE service: %w", err)
}
cluster, err := service.Projects.Locations.Clusters.Get(
fmt.Sprintf("projects/%s/locations/%s/clusters/%s",
c.projectID, c.location, c.clusterName)).Do()
if err != nil {
return nil, fmt.Errorf("failed to get GKE cluster: %w", err)
}
// Create REST config with GCP auth
restConfig := &rest.Config{
Host: fmt.Sprintf("https://%s", cluster.Endpoint),
TLSClientConfig: rest.TLSClientConfig{
CAData: []byte(cluster.MasterAuth.ClusterCaCertificate),
},
}
// GCP authentication is handled by the client
restConfig.WrapTransport = func(rt http.RoundTripper) http.RoundTripper {
return &gcpAuthTransport{
RoundTripper: rt,
tokenSource: config,
}
}
return NewK8sClientFromConfig(restConfig)
}
Azure AKS with Managed Identity
// internal/infrastructure/k8s/azure_connector.go
type AzureClusterConnector struct {
subscriptionID string
resourceGroupName string
clusterName string
clientID string
}
func (c *AzureClusterConnector) Connect(ctx context.Context) (*K8sClient, error) {
// Azure authentication using managed identity
cred, err := azidentity.NewManagedIdentityCredential(&azidentity.ManagedIdentityCredentialOptions{
ID: azidentity.ClientID(c.clientID),
})
if err != nil {
return nil, fmt.Errorf("failed to create Azure credential: %w", err)
}
// Get AKS cluster details
clustersClient, err := armcontainerservice.NewManagedClustersClient(c.subscriptionID, cred, nil)
if err != nil {
return nil, fmt.Errorf("failed to create AKS client: %w", err)
}
cluster, err := clustersClient.Get(ctx, c.resourceGroupName, c.clusterName, nil)
if err != nil {
return nil, fmt.Errorf("failed to get AKS cluster: %w", err)
}
// Create REST config with Azure auth
config := &rest.Config{
Host: *cluster.Properties.Fqdn,
TLSClientConfig: rest.TLSClientConfig{
CAData: []byte(*cluster.Properties.KubeConfig.CertificateAuthorityData),
},
}
config.WrapTransport = func(rt http.RoundTripper) http.RoundTripper {
return &azureAuthTransport{
RoundTripper: rt,
cred: cred,
}
}
return NewK8sClientFromConfig(config)
}
Method 3: Certificate-Based Authentication
// internal/infrastructure/k8s/cert_connector.go
type CertificateClusterConnector struct {
endpoint string
caCert []byte
clientCert []byte
clientKey []byte
}
func (c *CertificateClusterConnector) Connect(ctx context.Context) (*K8sClient, error) {
config := &rest.Config{
Host: c.endpoint,
TLSClientConfig: rest.TLSClientConfig{
CAData: c.caCert,
CertData: c.clientCert,
KeyData: c.clientKey,
},
}
return NewK8sClientFromConfig(config)
}
Cluster Registry And Discovery
Cluster Registry API
// internal/infrastructure/k8s/registry.go
type ClusterRegistry struct {
clusters map[string]*ClusterInfo
discovery ClusterDiscovery
healthCheck *HealthChecker
}
type ClusterInfo struct {
Name string
Provider string // aws, gcp, azure, onprem
Region string
Endpoint string
AuthMethod string // service-account, iam, cert
Status ClusterStatus
Capabilities []string // gpu, arm64, windows, etc.
LastHealth time.Time
}
func (r *ClusterRegistry) DiscoverClusters(ctx context.Context) error {
// Discover clusters based on configuration
discovered, err := r.discovery.Discover(ctx)
if err != nil {
return fmt.Errorf("failed to discover clusters: %w", err)
}
// Update registry
for _, cluster := range discovered {
r.clusters[cluster.Name] = cluster
}
return nil
}
func (r *ClusterRegistry) GetHealthyClusters() []*ClusterInfo {
var healthy []*ClusterInfo
for _, cluster := range r.clusters {
if cluster.Status == ClusterHealthy {
healthy = append(healthy, cluster)
}
}
return healthy
}
Dynamic Cluster Discovery
// internal/infrastructure/k8s/discovery.go
type ClusterDiscovery interface {
Discover(ctx context.Context) ([]*ClusterInfo, error)
}
// AWS EKS Discovery
type AWSEKSDiscovery struct {
regions []string
}
func (d *AWSEKSDiscovery) Discover(ctx context.Context) ([]*ClusterInfo, error) {
var clusters []*ClusterInfo
for _, region := range d.regions {
sess := session.Must(session.NewSession(&aws.Config{
Region: aws.String(region),
}))
eksClient := eks.New(sess)
// List all EKS clusters in region
result, err := eksClient.ListClusters(&eks.ListClustersInput{})
if err != nil {
continue // Skip region if error
}
for _, clusterName := range result.Clusters {
cluster := &ClusterInfo{
Name: *clusterName,
Provider: "aws",
Region: region,
AuthMethod: "iam",
Status: ClusterUnknown,
}
clusters = append(clusters, cluster)
}
}
return clusters, nil
}
Connection Management And Health Monitoring
Connection Pooling
// internal/infrastructure/k8s/connection_pool.go
type ConnectionPool struct {
clients map[string]*PooledClient
maxIdle time.Duration
maxClients int
mu sync.RWMutex
}
type PooledClient struct {
client *K8sClient
lastUsed time.Time
clusterName string
}
func (p *ConnectionPool) Get(clusterName string) (*K8sClient, error) {
p.mu.Lock()
defer p.mu.Unlock()
if client, exists := p.clients[clusterName]; exists {
// Check if client is still valid
if time.Since(client.lastUsed) < p.maxIdle {
client.lastUsed = time.Now()
return client.client, nil
}
// Remove stale client
delete(p.clients, clusterName)
}
// Create new client
client, err := p.createClient(clusterName)
if err != nil {
return nil, err
}
p.clients[clusterName] = &PooledClient{
client: client,
lastUsed: time.Now(),
clusterName: clusterName,
}
return client, nil
}
func (p *ConnectionPool) HealthCheck(ctx context.Context) {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
p.performHealthChecks(ctx)
}
}
}
Health Monitoring
// internal/infrastructure/k8s/health.go
type HealthChecker struct {
timeout time.Duration
}
func (h *HealthChecker) CheckClusterHealth(ctx context.Context, client *K8sClient) ClusterStatus {
ctx, cancel := context.WithTimeout(ctx, h.timeout)
defer cancel()
// Check API server connectivity
_, err := client.clientset.ServerVersion()
if err != nil {
return ClusterUnhealthy
}
// Check Tekton installation
_, err = client.tektonClient.TektonV1beta1().Pipelines("").List(ctx, metav1.ListOptions{Limit: 1})
if err != nil {
return ClusterDegraded
}
// Check node capacity
nodes, err := client.clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
if err != nil {
return ClusterDegraded
}
// Check if there are ready nodes
readyNodes := 0
for _, node := range nodes.Items {
for _, condition := range node.Status.Conditions {
if condition.Type == corev1.NodeReady && condition.Status == corev1.ConditionTrue {
readyNodes++
break
}
}
}
if readyNodes == 0 {
return ClusterUnhealthy
}
return ClusterHealthy
}
Security Considerations
Authentication Best Practices
-
Service Accounts for Production
- Use dedicated service accounts with minimal permissions
- Rotate tokens regularly
- Store tokens securely (vault, secrets manager)
-
IAM Authentication
- Use cloud provider IAM roles
- Implement least privilege access
- Enable audit logging
-
Certificate Management
- Use short-lived certificates
- Implement certificate rotation
- Store certificates in secure locations
Network Security
# Network policies for cluster access
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: image-factory-access
namespace: image-factory
spec:
podSelector:
matchLabels:
app: image-factory
policyTypes:
- Egress
egress:
- to:
- ipBlock:
cidr: 10.0.0.0/8 # Cluster internal
ports:
- protocol: TCP
port: 443
- to:
- ipBlock:
cidr: 172.16.0.0/12 # Cluster internal
ports:
- protocol: TCP
port: 6443 # API server
RBAC Configuration
# Cluster role for Image Factory
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: image-factory-cluster-role
rules:
- apiGroups: [""]
resources: ["pods", "pods/log", "pods/exec", "configmaps", "secrets"]
verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
- apiGroups: ["apps"]
resources: ["deployments", "replicasets"]
verbs: ["get", "list", "watch"]
- apiGroups: ["tekton.dev"]
resources: ["pipelineruns", "taskruns", "pipelines", "tasks", "conditions"]
verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
- apiGroups: ["tekton.dev"]
resources: ["clustertasks"]
verbs: ["get", "list", "watch"]
Configuration Management
Cluster Configuration Schema
// internal/config/kubernetes.go
type KubernetesConfig struct {
Clusters []ClusterConfig `yaml:"clusters"`
DefaultCluster string `yaml:"default_cluster"`
ConnectionPool ConnectionPoolConfig `yaml:"connection_pool"`
HealthCheck HealthCheckConfig `yaml:"health_check"`
}
type ClusterConfig struct {
Name string `yaml:"name"`
Provider string `yaml:"provider"` // aws, gcp, azure, local
Region string `yaml:"region"`
Endpoint string `yaml:"endpoint"`
Auth AuthConfig `yaml:"auth"`
Capabilities []string `yaml:"capabilities"`
Labels map[string]string `yaml:"labels"`
}
type AuthConfig struct {
Method string `yaml:"method"` // service-account, iam, cert, kubeconfig
Kubeconfig string `yaml:"kubeconfig"` // for local development
TokenPath string `yaml:"token_path"` // for service account tokens
RoleARN string `yaml:"role_arn"` // for AWS IAM
ServiceAccount string `yaml:"service_account"` // for GCP
ClientID string `yaml:"client_id"` // for Azure
CertPath string `yaml:"cert_path"` // for certificates
KeyPath string `yaml:"key_path"` // for certificates
}
Example Configuration
# config/production.yaml
kubernetes:
default_cluster: "prod-us-east-1"
connection_pool:
max_idle: "5m"
max_clients: 10
health_check:
interval: "30s"
timeout: "10s"
clusters:
- name: "prod-us-east-1"
provider: "aws"
region: "us-east-1"
auth:
method: "iam"
role_arn: "arn:aws:iam::123456789012:role/image-factory-role"
capabilities: ["gpu", "arm64"]
labels:
environment: "production"
region: "us-east-1"
- name: "staging-eu-west-1"
provider: "aws"
region: "eu-west-1"
auth:
method: "service-account"
token_path: "/secrets/staging-token"
capabilities: ["amd64"]
labels:
environment: "staging"
region: "eu-west-1"
- name: "local-dev"
provider: "local"
auth:
method: "kubeconfig"
kubeconfig: "~/.kube/config"
capabilities: ["amd64", "arm64"]
labels:
environment: "development"
Testing Cluster Connections
Unit Tests
func TestClusterConnection(t *testing.T) {
tests := []struct {
name string
cluster ClusterConfig
expectError bool
}{
{
name: "valid AWS cluster",
cluster: ClusterConfig{
Name: "test-aws",
Provider: "aws",
Region: "us-east-1",
Auth: AuthConfig{
Method: "iam",
RoleARN: "arn:aws:iam::123456789012:role/test-role",
},
},
expectError: false,
},
{
name: "invalid cluster config",
cluster: ClusterConfig{
Name: "invalid",
Auth: AuthConfig{
Method: "unknown",
},
},
expectError: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
connector := NewClusterConnector(tt.cluster)
client, err := connector.Connect(context.Background())
if tt.expectError {
assert.Error(t, err)
assert.Nil(t, client)
} else {
assert.NoError(t, err)
assert.NotNil(t, client)
}
})
}
}
Integration Tests
func TestMultiClusterConnectivity(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test")
}
// Setup test clusters
clusters := []ClusterConfig{
{
Name: "local-kind",
Provider: "local",
Auth: AuthConfig{
Method: "kubeconfig",
Kubeconfig: "~/.kube/config",
},
},
// Add more test clusters as available
}
manager := NewClusterManager(clusters)
for _, cluster := range clusters {
t.Run(fmt.Sprintf("connect-%s", cluster.Name), func(t *testing.T) {
client, err := manager.ConnectToCluster(cluster.Name)
assert.NoError(t, err)
assert.NotNil(t, client)
// Test basic connectivity
version, err := client.clientset.ServerVersion()
assert.NoError(t, err)
assert.NotEmpty(t, version.GitVersion)
})
}
}
Monitoring And Observability
Connection Metrics
// internal/infrastructure/k8s/metrics.go
type ClusterMetrics struct {
ConnectionAttempts prometheus.Counter
ConnectionSuccesses prometheus.Counter
ConnectionFailures prometheus.Counter
ConnectionLatency prometheus.Histogram
ActiveConnections prometheus.Gauge
ClusterHealthStatus prometheus.Gauge
}
func (m *ClusterMetrics) RecordConnection(clusterName string, success bool, duration time.Duration) {
m.ConnectionAttempts.WithLabelValues(clusterName).Inc()
m.ConnectionLatency.WithLabelValues(clusterName).Observe(duration.Seconds())
if success {
m.ConnectionSuccesses.WithLabelValues(clusterName).Inc()
} else {
m.ConnectionFailures.WithLabelValues(clusterName).Inc()
}
}
Logging
// Structured logging for cluster connections
type ClusterLogger struct {
logger *zap.Logger
}
func (l *ClusterLogger) LogConnectionAttempt(clusterName, method string) {
l.logger.Info("attempting cluster connection",
zap.String("cluster", clusterName),
zap.String("method", method),
zap.Time("timestamp", time.Now()),
)
}
func (l *ClusterLogger) LogConnectionResult(clusterName string, success bool, duration time.Duration, err error) {
fields := []zap.Field{
zap.String("cluster", clusterName),
zap.Bool("success", success),
zap.Duration("duration", duration),
}
if err != nil {
fields = append(fields, zap.Error(err))
}
if success {
l.logger.Info("cluster connection successful", fields...)
} else {
l.logger.Error("cluster connection failed", fields...)
}
}
Quick Start For Local Development
-
Install kubectl and configure kubeconfig:
# For Docker Desktop kubectl config use-context docker-desktop # For minikube minikube start kubectl config use-context minikube # For kind kind create cluster kubectl config use-context kind-kind -
Install Tekton:
kubectl apply -f https://storage.googleapis.com/tekton-releases/pipeline/latest/release.yaml -
Configure Image Factory:
# config/local.yaml kubernetes: clusters: - name: "local-dev" provider: "local" auth: method: "kubeconfig" kubeconfig: "~/.kube/config" -
Test Connection:
client, err := NewK8sClient("~/.kube/config") if err != nil { log.Fatal(err) } version, err := client.clientset.ServerVersion() fmt.Printf("Connected to Kubernetes %s\n", version.GitVersion)
Summary
This guide documents connectivity patterns for both local development and production multi-cluster deployments. It is most useful as implementation reference when designing secure, scalable Kubernetes access for hybrid infrastructure execution.