Commit a2df4ad

Wip p2p enhancements
1 parent 9280060 commit a2df4ad

File tree

3 files changed: +77 -14 lines changed

core/cli/worker/worker_p2p.go

+1 -1

@@ -20,7 +20,7 @@ import (
 
 type P2P struct {
 	WorkerFlags `embed:""`
-	Token string `env:"LOCALAI_TOKEN,TOKEN" help:"JSON list of galleries"`
+	Token string `env:"LOCALAI_TOKEN,LOCALAI_P2P_TOKEN,TOKEN" help:"P2P token to use"`
 	NoRunner bool `env:"LOCALAI_NO_RUNNER,NO_RUNNER" help:"Do not start the llama-cpp-rpc-server"`
 	RunnerAddress string `env:"LOCALAI_RUNNER_ADDRESS,RUNNER_ADDRESS" help:"Address of the llama-cpp-rpc-server"`
 	RunnerPort string `env:"LOCALAI_RUNNER_PORT,RUNNER_PORT" help:"Port of the llama-cpp-rpc-server"`
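
The token flag now resolves from several environment variables. A minimal sketch of the precedence, assuming the CLI library walks the names in the `env` tag in order and takes the first one that is set (the `firstEnv` helper is purely illustrative, not part of the codebase):

    package main

    import (
        "fmt"
        "os"
    )

    // firstEnv mimics a multi-name env tag such as
    // `env:"LOCALAI_TOKEN,LOCALAI_P2P_TOKEN,TOKEN"`: the first variable
    // set to a non-empty value wins.
    func firstEnv(names ...string) string {
        for _, name := range names {
            if v := os.Getenv(name); v != "" {
                return v
            }
        }
        return ""
    }

    func main() {
        os.Setenv("LOCALAI_P2P_TOKEN", "b64token")
        fmt.Println(firstEnv("LOCALAI_TOKEN", "LOCALAI_P2P_TOKEN", "TOKEN")) // b64token
    }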

core/p2p/p2p.go

+43 -11
@@ -11,18 +11,18 @@ import (
 	"net"
 	"os"
 	"strings"
+	"sync"
 	"time"
 
+	"github.com/ipfs/go-log"
 	"github.com/libp2p/go-libp2p/core/peer"
 	"github.com/mudler/LocalAI/pkg/utils"
+	"github.com/mudler/edgevpn/pkg/config"
 	"github.com/mudler/edgevpn/pkg/node"
 	"github.com/mudler/edgevpn/pkg/protocol"
+	"github.com/mudler/edgevpn/pkg/services"
 	"github.com/mudler/edgevpn/pkg/types"
 	"github.com/phayes/freeport"
-
-	"github.com/ipfs/go-log"
-	"github.com/mudler/edgevpn/pkg/config"
-	"github.com/mudler/edgevpn/pkg/services"
 	zlog "github.com/rs/zerolog/log"
 
 	"github.com/mudler/edgevpn/pkg/logger"
@@ -34,6 +34,11 @@ func GenerateToken() string {
 	return newData.Base64()
 }
 
+func nodeID() string {
+	hostname, _ := os.Hostname()
+	return hostname
+}
+
 func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, service string) error {
 
 	zlog.Info().Msgf("Allocating service '%s' on: %s", service, listenAddr)
@@ -135,6 +140,15 @@ func copyStream(closer chan struct{}, dst io.Writer, src io.Reader) {
 	io.Copy(dst, src)
 }
 
+var availableNodes = []NodeData{}
+var mu sync.Mutex
+
+func GetAvailableNodes() []NodeData {
+	mu.Lock()
+	defer mu.Unlock()
+	return availableNodes
+}
+
 // This is the main of the server (which keeps the env variable updated)
 // This starts a goroutine that keeps LLAMACPP_GRPC_SERVERS updated with the discovered services
 func LLamaCPPRPCServerDiscoverer(ctx context.Context, token string) error {
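
`GetAvailableNodes` gives other packages a mutex-guarded view of the workers registered so far. A hypothetical consumer (not part of this commit), assuming the package import path `github.com/mudler/LocalAI/core/p2p`:

    package main

    import (
        "fmt"

        "github.com/mudler/LocalAI/core/p2p"
    )

    func main() {
        // Safe to call while the discoverer goroutine appends entries:
        // the accessor takes the same package-level mutex.
        for _, n := range p2p.GetAvailableNodes() {
            fmt.Printf("worker %s (ID %s) via tunnel %s\n", n.Name, n.ID, n.TunnelAddress)
        }
    }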
@@ -151,19 +165,22 @@ func LLamaCPPRPCServerDiscoverer(ctx context.Context, token string) error {
 				zlog.Error().Msg("Discoverer stopped")
 				return
 			case tunnel := <-tunnels:
-
-				totalTunnels = append(totalTunnels, tunnel)
+				totalTunnels = append(totalTunnels, tunnel.TunnelAddress)
 				os.Setenv("LLAMACPP_GRPC_SERVERS", strings.Join(totalTunnels, ","))
 				zlog.Debug().Msgf("setting LLAMACPP_GRPC_SERVERS to %s", strings.Join(totalTunnels, ","))
+				mu.Lock()
+				defer mu.Unlock()
+				availableNodes = append(availableNodes, tunnel)
+				zlog.Info().Msgf("Node %s available", tunnel.ID)
 			}
 		}
 	}()
 
 	return nil
 }
 
-func discoveryTunnels(ctx context.Context, token string) (chan string, error) {
-	tunnels := make(chan string)
+func discoveryTunnels(ctx context.Context, token string) (chan NodeData, error) {
+	tunnels := make(chan NodeData)
 
 	nodeOpts, err := newNodeOpts(token)
 	if err != nil {
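
Each tunnel now arrives as a `NodeData`, and only its `TunnelAddress` feeds the comma-separated `LLAMACPP_GRPC_SERVERS` list that the llama.cpp RPC backend reads. A minimal sketch of that accumulation (addresses invented for illustration):

    package main

    import (
        "fmt"
        "os"
        "strings"
    )

    func main() {
        totalTunnels := []string{}
        for _, addr := range []string{"127.0.0.1:40001", "127.0.0.1:40002"} {
            // Mirrors what LLamaCPPRPCServerDiscoverer does per discovered worker.
            totalTunnels = append(totalTunnels, addr)
            os.Setenv("LLAMACPP_GRPC_SERVERS", strings.Join(totalTunnels, ","))
        }
        fmt.Println(os.Getenv("LLAMACPP_GRPC_SERVERS")) // 127.0.0.1:40001,127.0.0.1:40002
    }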
@@ -196,18 +213,24 @@ func discoveryTunnels(ctx context.Context, token string) (chan string, error) {
 			zlog.Debug().Msg("Searching for workers")
 
 			data := ledger.LastBlock().Storage["services_localai"]
-			for k := range data {
+			for k, v := range data {
 				zlog.Info().Msgf("Found worker %s", k)
 				if _, found := emitted[k]; !found {
 					emitted[k] = true
+					nd := &NodeData{}
+					if err := v.Unmarshal(nd); err != nil {
+						zlog.Error().Msg("cannot unmarshal node data")
+						continue
+					}
 					//discoveredPeers <- k
 					port, err := freeport.GetFreePort()
 					if err != nil {
 						fmt.Print(err)
 					}
 					tunnelAddress := fmt.Sprintf("127.0.0.1:%d", port)
 					go allocateLocalService(ctx, n, tunnelAddress, k)
-					tunnels <- tunnelAddress
+					nd.TunnelAddress = tunnelAddress
+					tunnels <- *nd
 				}
 			}
 		}
@@ -217,6 +240,12 @@ func discoveryTunnels(ctx context.Context, token string) (chan string, error) {
 	return tunnels, err
 }
 
+type NodeData struct {
+	Name          string
+	ID            string
+	TunnelAddress string
+}
+
 // This is the P2P worker main
 func BindLLamaCPPWorker(ctx context.Context, host, port, token string) error {
 	llger := logger.New(log.LevelFatal)
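
Workers publish a `NodeData` value under the `services_localai` ledger key, and the discoverer unmarshals it back before filling in the locally allocated tunnel endpoint. Assuming the edgevpn ledger serializes values as JSON (an assumption about the library, not something this diff confirms), the round-trip behaves roughly like:

    package main

    import (
        "encoding/json"
        "fmt"
    )

    type NodeData struct {
        Name          string
        ID            string
        TunnelAddress string
    }

    func main() {
        // Worker side: announce name plus host-derived ID; TunnelAddress stays empty.
        raw, _ := json.Marshal(&NodeData{Name: "myhost-worker", ID: "myhost"})

        // Discoverer side: decode, then attach the local tunnel endpoint.
        nd := &NodeData{}
        if err := json.Unmarshal(raw, nd); err != nil {
            fmt.Println("cannot unmarshal node data")
            return
        }
        nd.TunnelAddress = "127.0.0.1:34567"
        fmt.Printf("%+v\n", *nd)
    }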
@@ -255,7 +284,10 @@ func BindLLamaCPPWorker(ctx context.Context, host, port, token string) error {
 			// If mismatch, update the blockchain
 			if !found {
 				updatedMap := map[string]interface{}{}
-				updatedMap[name] = "p2p"
+				updatedMap[name] = &NodeData{
+					Name: name,
+					ID:   nodeID(),
+				}
 				ledger.Add("services_localai", updatedMap)
 			}
 		},

docs/static/install.sh

+33 -2
@@ -76,6 +76,8 @@ DOCKER_INSTALL=${DOCKER_INSTALL:-$docker_found}
 USE_AIO=${USE_AIO:-false}
 API_KEY=${API_KEY:-}
 CORE_IMAGES=${CORE_IMAGES:-false}
+P2P_TOKEN=${P2P_TOKEN:-}
+WORKER=${WORKER:-false}
 # nprocs -1
 if available nproc; then
     procs=$(nproc)
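
Both new variables default to off (`P2P_TOKEN` empty, `WORKER=false`), so a plain install keeps today's behavior; the branches below only activate when the caller exports them.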
@@ -132,15 +134,22 @@ configure_systemd() {
 
     info "Adding current user to local-ai group..."
     $SUDO usermod -a -G local-ai $(whoami)
-
+    STARTCOMMAND="run"
+    if [ "$WORKER" = true ]; then
+        if [ -n "$P2P_TOKEN" ]; then
+            STARTCOMMAND="worker p2p-llama-cpp-rpc"
+        else
+            STARTCOMMAND="worker llama-cpp-rpc"
+        fi
+    fi
     info "Creating local-ai systemd service..."
    cat <<EOF | $SUDO tee /etc/systemd/system/local-ai.service >/dev/null
 [Unit]
 Description=LocalAI Service
 After=network-online.target
 
 [Service]
-ExecStart=$BINDIR/local-ai run
+ExecStart=$BINDIR/local-ai $STARTCOMMAND
 User=local-ai
 Group=local-ai
 Restart=always
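
With these hooks, an invocation along the lines of `WORKER=true P2P_TOKEN=<token> sh install.sh` (variable names taken from the script; the token would come from an existing LocalAI p2p node) yields a unit whose `ExecStart` is `local-ai worker p2p-llama-cpp-rpc`, while `WORKER=true` without a token falls back to `worker llama-cpp-rpc`, and a non-worker install still gets `local-ai run`.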
@@ -159,6 +168,11 @@ EOF
     $SUDO echo "THREADS=$THREADS" | $SUDO tee -a /etc/localai.env >/dev/null
     $SUDO echo "MODELS_PATH=$MODELS_PATH" | $SUDO tee -a /etc/localai.env >/dev/null
 
+    if [ -n "$P2P_TOKEN" ]; then
+        $SUDO echo "LOCALAI_P2P_TOKEN=$P2P_TOKEN" | $SUDO tee -a /etc/localai.env >/dev/null
+        $SUDO echo "LOCALAI_P2P=true" | $SUDO tee -a /etc/localai.env >/dev/null
+    fi
+
     SYSTEMCTL_RUNNING="$(systemctl is-system-running || true)"
     case $SYSTEMCTL_RUNNING in
         running|degraded)
@@ -407,6 +421,19 @@ install_docker() {
         # exit 0
     fi
 
+    STARTCOMMAND="run"
+    if [ "$WORKER" = true ]; then
+        if [ -n "$P2P_TOKEN" ]; then
+            STARTCOMMAND="worker p2p-llama-cpp-rpc"
+        else
+            STARTCOMMAND="worker llama-cpp-rpc"
+        fi
+    fi
+    envs=""
+    if [ -n "$P2P_TOKEN" ]; then
+        envs="-e LOCALAI_P2P_TOKEN=$P2P_TOKEN -e LOCALAI_P2P=true"
+    fi
+
     IMAGE_TAG=
     if [ "$HAS_CUDA" ]; then
         IMAGE_TAG=${VERSION}-cublas-cuda12-ffmpeg
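
The Docker path repeats the `STARTCOMMAND` selection and additionally builds `$envs`, which threads `LOCALAI_P2P_TOKEN` and `LOCALAI_P2P=true` into each `docker run` variant below; note that, consistent with the WIP status, the run commands shown here only consume `$envs`, not `$STARTCOMMAND`.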
@@ -430,6 +457,7 @@
         --restart=always \
         -e API_KEY=$API_KEY \
         -e THREADS=$THREADS \
+        $envs \
         -d -p $PORT:8080 --name local-ai localai/localai:$IMAGE_TAG
     elif [ "$HAS_AMD" ]; then
         IMAGE_TAG=${VERSION}-hipblas-ffmpeg
@@ -448,6 +476,7 @@
         --restart=always \
         -e API_KEY=$API_KEY \
         -e THREADS=$THREADS \
+        $envs \
         -d -p $PORT:8080 --name local-ai localai/localai:$IMAGE_TAG
     elif [ "$HAS_INTEL" ]; then
         IMAGE_TAG=${VERSION}-sycl-f32-ffmpeg
@@ -465,6 +494,7 @@
         --restart=always \
         -e API_KEY=$API_KEY \
         -e THREADS=$THREADS \
+        $envs \
         -d -p $PORT:8080 --name local-ai localai/localai:$IMAGE_TAG
     else
         IMAGE_TAG=${VERSION}-ffmpeg
@@ -481,6 +511,7 @@
         -e MODELS_PATH=/models \
         -e API_KEY=$API_KEY \
         -e THREADS=$THREADS \
+        $envs \
         -d -p $PORT:8080 --name local-ai localai/localai:$IMAGE_TAG
     fi
