-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathplugin.go
260 lines (204 loc) · 6.7 KB
/
plugin.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
package main
import (
	"context"
	"fmt"
	"net"
	"os"
	"path"
	"time"

	log "github.com/Sirupsen/logrus"
	"github.com/mpreu/k8s-device-plugin-v4l2loopback/v4l2l"
	"google.golang.org/grpc"
	api "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
)
const (
// pluginSocket is the path of the plugin's unix socket file: the value of
// api.DevicePluginPath followed by "v4l2l.sock". The gRPC server listens on
// it and its base name is sent to the Kubelet during registration.
pluginSocket = api.DevicePluginPath + "v4l2l.sock"
// resourceName is the default resource name stored in a new plugin and
// sent to the Kubelet in the registration request.
resourceName = "mpreu.de/v4l2l"
)
// V4l2lDevicePlugin is the type which implements the Kubernetes
// device plugin interface for v4l2loopback devices.
type V4l2lDevicePlugin struct {
// resourceName is the resource name sent in the registration request.
resourceName string
// socketName holds the plugin socket path; the constructor sets it to pluginSocket.
socketName string
// deviceMap maps a device ID (the device's Name) to its v4l2l.Device.
deviceMap map[string]v4l2l.Device
// devices is the device list sent to the Kubelet by ListAndWatch.
devices []*api.Device
// server is the gRPC server created by StartServer; nil while no server is running.
server *grpc.Server
}
// NewV4l2lDevicePlugin builds a V4l2lDevicePlugin from the devices reported by
// v4l2l.GetDeviceList. Every device is indexed in deviceMap under its Name and
// added to the device list as api.Healthy. The gRPC server is not created here.
func NewV4l2lDevicePlugin() *V4l2lDevicePlugin {
	p := &V4l2lDevicePlugin{
		resourceName: resourceName,
		socketName:   pluginSocket,
		deviceMap:    make(map[string]v4l2l.Device),
	}

	for _, dev := range v4l2l.GetDeviceList() {
		// The device name doubles as the ID announced to the Kubelet.
		p.deviceMap[dev.Name] = dev
		p.devices = append(p.devices, &api.Device{ID: dev.Name, Health: api.Healthy})
	}

	return p
}
// GetDevicePluginOptions returns the options of the device plugin.
// PreStartRequired is reported as false. Implementation of the
// 'DevicePluginServer' interface.
func (plugin *V4l2lDevicePlugin) GetDevicePluginOptions(context.Context, *api.Empty) (*api.DevicePluginOptions, error) {
	options := new(api.DevicePluginOptions)
	options.PreStartRequired = false
	return options, nil
}
// Register registers the device plugin with the given resource name with the Kubelet.
//
// kubeletEndpoint is the unix socket path of the Kubelet to dial. resourceName
// is the resource name to announce; when it is empty the plugin's own
// resourceName is used. The request also carries api.Version and the base name
// of pluginSocket as the plugin endpoint.
//
// An error is returned when the Kubelet endpoint cannot be reached or the
// registration call fails.
func (plugin *V4l2lDevicePlugin) Register(kubeletEndpoint string, resourceName string) error {
	log.Debugln("Entering register function")

	// Honor the caller-supplied resource name; fall back to the plugin's
	// configured name when none is given.
	if resourceName == "" {
		resourceName = plugin.resourceName
	}

	conn, err := checkServerConnection(kubeletEndpoint)
	if err != nil {
		log.Errorf("Cannot establish connection to Kubelet endpoint: %v", err)
		return err
	}
	defer conn.Close()

	client := api.NewRegistrationClient(conn)
	request := &api.RegisterRequest{
		Version:      api.Version,
		Endpoint:     path.Base(pluginSocket),
		ResourceName: resourceName,
	}
	log.Debugf("RegisterRequest: %v", request)

	if _, err = client.Register(context.Background(), request); err != nil {
		log.Errorf("Sending plugin register request failed: %v", err)
		return err
	}
	return nil
}
// ListAndWatch sends the plugin's device list to the Kubelet once and then
// keeps the stream open until the stream's context is done.
// Implementation of the 'DevicePluginServer' interface.
//
// It returns the send error when the initial response cannot be delivered,
// and nil once the stream's context has ended.
func (plugin *V4l2lDevicePlugin) ListAndWatch(e *api.Empty, s api.DevicePlugin_ListAndWatchServer) error {
	log.Debugf("ListAndWatch devices: %v", plugin.devices)

	response := api.ListAndWatchResponse{
		Devices: plugin.devices,
	}
	if err := s.Send(&response); err != nil {
		log.Errorf("Error when sending ListAndWatch response: %v", err)
		return err
	}

	// No code in this file updates the device list after construction, so
	// there is nothing further to send. Wait for the stream to end instead of
	// blocking forever, so the handler goroutine is released when the Kubelet
	// disconnects or the server is stopped.
	<-s.Context().Done()
	return nil
}
// Allocate is responsible for making the requested devices available during
// the container creation process. Implementation of the 'DevicePluginServer'
// interface.
//
// For every container request it returns one ContainerAllocateResponse, in
// the same order, holding the device specs built by createDeviceSpecs. An
// error is returned, and no response, when a request names a device ID that
// is not present in the plugin's deviceMap.
func (plugin *V4l2lDevicePlugin) Allocate(ctx context.Context, request *api.AllocateRequest) (*api.AllocateResponse, error) {
	ctnRequests := request.GetContainerRequests()
	log.Debugf("Allocate request: %v", ctnRequests)

	responses := make([]*api.ContainerAllocateResponse, len(ctnRequests))
	for i, ctnRequest := range ctnRequests {
		// Reject unknown IDs up front: a plain map lookup would hand back a
		// zero-value device and produce a device spec with empty paths.
		for _, deviceID := range ctnRequest.GetDevicesIDs() {
			if _, known := plugin.deviceMap[deviceID]; !known {
				err := fmt.Errorf("allocate: unknown device ID %q", deviceID)
				log.Errorf("Cannot allocate device: %v", err)
				return nil, err
			}
		}

		responses[i] = &api.ContainerAllocateResponse{
			Devices: createDeviceSpecs(plugin, ctnRequest),
		}
	}

	response := api.AllocateResponse{
		ContainerResponses: responses,
	}
	log.Debugf("Allocate response: %v", response)
	return &response, nil
}
// PreStartContainer performs no work and always returns an empty response.
// GetDevicePluginOptions reports PreStartRequired as false.
// Implementation of the 'DevicePluginServer' interface.
func (plugin *V4l2lDevicePlugin) PreStartContainer(context.Context, *api.PreStartContainerRequest) (*api.PreStartContainerResponse, error) {
	return new(api.PreStartContainerResponse), nil
}
// StartServer starts the gRPC server of the device plugin on pluginSocket and
// verifies that a connection to it can be established.
//
// It returns an error when a leftover socket file cannot be removed, when
// listening on the socket fails, or when the connection check fails. In the
// last case the server is stopped again. plugin.server is only set while a
// server is actually running.
func (plugin *V4l2lDevicePlugin) StartServer() error {
	// A socket file left behind by a previous run that did not shut down
	// cleanly would make Listen fail, so remove it first.
	if err := cleanupSocket(); err != nil {
		return err
	}

	listener, err := net.Listen("unix", pluginSocket)
	if err != nil {
		return err
	}

	server := grpc.NewServer()
	api.RegisterDevicePluginServer(server, plugin)
	plugin.server = server

	// Serve on the local variable rather than plugin.server, which StopServer
	// resets to nil, and surface a Serve failure instead of discarding it.
	go func() {
		if serveErr := server.Serve(listener); serveErr != nil {
			log.Errorf("Device plugin gRPC server stopped serving: %v", serveErr)
		}
	}()

	// Be sure the connection is established
	conn, err := checkServerConnection(pluginSocket)
	if err != nil {
		// Do not leave a half-started server behind.
		server.Stop()
		plugin.server = nil
		return err
	}
	conn.Close()
	return nil
}
// StopServer stops the gRPC server of the device plugin, clears the stored
// server and removes the plugin socket file. It does nothing and returns nil
// when no server is set; otherwise it returns the result of cleanupSocket.
func (plugin *V4l2lDevicePlugin) StopServer() error {
	if srv := plugin.server; srv != nil {
		srv.Stop()
		plugin.server = nil
		return cleanupSocket()
	}
	return nil
}
// Serve starts the gRPC server and registers the device plugin to the Kubelet
// at api.KubeletSocket.
//
// It returns the error of whichever step failed. When registration fails the
// server is stopped again; a failure of that cleanup is logged, and the
// registration error is the one returned.
func (plugin *V4l2lDevicePlugin) Serve() error {
	if err := plugin.StartServer(); err != nil {
		log.Errorf("Could not start device plugin gRPC server: %v", err)
		return err
	}

	log.Debugln("Start registering plugin to Kubelet")
	if err := plugin.Register(api.KubeletSocket, plugin.resourceName); err != nil {
		log.Errorf("Could not register device plugin to Kubelet: %s", err)
		// Report a failed cleanup instead of silently dropping its error.
		if stopErr := plugin.StopServer(); stopErr != nil {
			log.Errorf("Could not stop device plugin gRPC server: %v", stopErr)
		}
		return err
	}

	log.Debugln("Registered device plugin to Kubelet")
	return nil
}
// cleanupSocket deletes the socket file of the device plugin (pluginSocket).
// A socket file that does not exist is not treated as an error; any other
// removal error is returned.
func cleanupSocket() error {
	err := os.Remove(pluginSocket)
	if err == nil || os.IsNotExist(err) {
		return nil
	}
	return err
}
// createDeviceSpec converts a V4l2l device into a kubernetes device spec for
// the device plugin api. The device's Path is used as both the host path and
// the container path, and the permissions are "rw".
func createDeviceSpec(d *v4l2l.Device) *api.DeviceSpec {
	spec := new(api.DeviceSpec)
	spec.HostPath = d.Path
	spec.ContainerPath = d.Path
	spec.Permissions = "rw"
	return spec
}
// createDeviceSpecs returns a list of kubernetes device specs
// for the device plugin api. Based on a allocate request of a
// kubelet the corresponding V4l2l devices are selected.
func createDeviceSpecs(plugin *V4l2lDevicePlugin, request *api.ContainerAllocateRequest) []*api.DeviceSpec {
deviceIDs := request.GetDevicesIDs()
var specs []*api.DeviceSpec
for _, deviceID := range deviceIDs {
log.Debugf("Process 'Allocate' for deviceID: %s", deviceID)
currentDevice := plugin.deviceMap[deviceID]
ds := createDeviceSpec(¤tDevice)
specs = append(specs, ds)
}
return specs
}
// checkServerConnection dials the gRPC endpoint behind the given unix socket
// path and blocks until the connection is established or a five second
// timeout expires. The connection is insecure (no transport credentials).
//
// On success the open connection is returned and the caller must close it.
// If no connection can be established the error is returned and the
// connection is nil.
func checkServerConnection(endpoint string) (*grpc.ClientConn, error) {
	const dialTimeout = 5 * time.Second

	// Dial the target as a unix socket.
	unixDialer := func(addr string, limit time.Duration) (net.Conn, error) {
		return net.DialTimeout("unix", addr, limit)
	}

	options := []grpc.DialOption{
		grpc.WithInsecure(),
		grpc.WithTimeout(dialTimeout),
		grpc.WithBlock(),
		grpc.WithDialer(unixDialer),
	}

	conn, err := grpc.Dial(endpoint, options...)
	if err != nil {
		return nil, err
	}
	return conn, nil
}