Skip to content

Commit cca881e

Browse files
authored
feat(p2p): Federation and AI swarms (#2723)
* Wip p2p enhancements * get online state * Pass-by token to show in the dashboard Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * Style * Minor fixups * parametrize SearchID * Refactoring * Allow to expose/bind more services Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * Add federation * Display federated mode in the WebUI Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * Small fixups Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * make federated nodes visible from the WebUI * Fix version display * improve web page * live page update * visual enhancements * enhancements * visual enhancements --------- Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
1 parent dd95ae1 commit cca881e

File tree

23 files changed

+814
-81
lines changed

23 files changed

+814
-81
lines changed

.github/release.yml

+3
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ changelog:
1313
labels:
1414
- bug
1515
- regression
16+
- title: "🖧 P2P area"
17+
labels:
18+
- area/p2p
1619
- title: Exciting New Features 🎉
1720
labels:
1821
- Semver-Minor

Makefile

+3-3
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,8 @@ RANDOM := $(shell bash -c 'echo $$RANDOM')
5353
VERSION?=$(shell git describe --always --tags || echo "dev" )
5454
# go tool nm ./local-ai | grep Commit
5555
LD_FLAGS?=
56-
override LD_FLAGS += -X "github.com/go-skynet/LocalAI/internal.Version=$(VERSION)"
57-
override LD_FLAGS += -X "github.com/go-skynet/LocalAI/internal.Commit=$(shell git rev-parse HEAD)"
56+
override LD_FLAGS += -X "github.com/mudler/LocalAI/internal.Version=$(VERSION)"
57+
override LD_FLAGS += -X "github.com/mudler/LocalAI/internal.Commit=$(shell git rev-parse HEAD)"
5858

5959
OPTIONAL_TARGETS?=
6060

@@ -147,7 +147,7 @@ endif
147147

148148
# glibc-static or glibc-devel-static required
149149
ifeq ($(STATIC),true)
150-
LD_FLAGS=-linkmode external -extldflags -static
150+
LD_FLAGS+=-linkmode external -extldflags -static
151151
endif
152152

153153
ifeq ($(findstring stablediffusion,$(GO_TAGS)),stablediffusion)

core/cli/cli.go

+1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ var CLI struct {
99
cliContext.Context `embed:""`
1010

1111
Run RunCMD `cmd:"" help:"Run LocalAI, this the default command if no other command is specified. Run 'local-ai run --help' for more information" default:"withargs"`
12+
Federated FederatedCLI `cmd:"" help:"Run LocalAI in federated mode"`
1213
Models ModelsCMD `cmd:"" help:"Manage LocalAI models and definitions"`
1314
TTS TTSCMD `cmd:"" help:"Convert text to speech"`
1415
Transcript TranscriptCMD `cmd:"" help:"Convert audio to text"`

core/cli/federated.go

+130
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
package cli
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"io"
8+
"net"
9+
"time"
10+
11+
"math/rand/v2"
12+
13+
cliContext "github.com/mudler/LocalAI/core/cli/context"
14+
"github.com/mudler/LocalAI/core/p2p"
15+
"github.com/mudler/edgevpn/pkg/node"
16+
"github.com/mudler/edgevpn/pkg/protocol"
17+
"github.com/mudler/edgevpn/pkg/types"
18+
"github.com/rs/zerolog/log"
19+
)
20+
21+
// FederatedCLI runs LocalAI in federated mode: it joins a P2P network and
// proxies incoming API requests to federated nodes discovered on it.
type FederatedCLI struct {
	// Address is the local bind address the federated proxy listens on.
	Address string `env:"LOCALAI_ADDRESS,ADDRESS" default:":8080" help:"Bind address for the API server" group:"api"`
	// Peer2PeerToken is the shared token used to join the P2P network.
	Peer2PeerToken string `env:"LOCALAI_P2P_TOKEN,P2P_TOKEN,TOKEN" name:"p2ptoken" help:"Token for P2P mode (optional)" group:"p2p"`
}
25+
26+
func (f *FederatedCLI) Run(ctx *cliContext.Context) error {
27+
28+
n, err := p2p.NewNode(f.Peer2PeerToken)
29+
if err != nil {
30+
return fmt.Errorf("creating a new node: %w", err)
31+
}
32+
err = n.Start(context.Background())
33+
if err != nil {
34+
return fmt.Errorf("creating a new node: %w", err)
35+
}
36+
37+
if err := p2p.ServiceDiscoverer(context.Background(), n, f.Peer2PeerToken, p2p.FederatedID, nil); err != nil {
38+
return err
39+
}
40+
41+
return Proxy(context.Background(), n, f.Address, p2p.FederatedID)
42+
}
43+
44+
func Proxy(ctx context.Context, node *node.Node, listenAddr, service string) error {
45+
46+
log.Info().Msgf("Allocating service '%s' on: %s", service, listenAddr)
47+
// Open local port for listening
48+
l, err := net.Listen("tcp", listenAddr)
49+
if err != nil {
50+
log.Error().Err(err).Msg("Error listening")
51+
return err
52+
}
53+
// ll.Info("Binding local port on", srcaddr)
54+
55+
ledger, _ := node.Ledger()
56+
57+
// Announce ourselves so nodes accepts our connection
58+
ledger.Announce(
59+
ctx,
60+
10*time.Second,
61+
func() {
62+
// Retrieve current ID for ip in the blockchain
63+
//_, found := ledger.GetKey(protocol.UsersLedgerKey, node.Host().ID().String())
64+
// If mismatch, update the blockchain
65+
//if !found {
66+
updatedMap := map[string]interface{}{}
67+
updatedMap[node.Host().ID().String()] = &types.User{
68+
PeerID: node.Host().ID().String(),
69+
Timestamp: time.Now().String(),
70+
}
71+
ledger.Add(protocol.UsersLedgerKey, updatedMap)
72+
// }
73+
},
74+
)
75+
76+
defer l.Close()
77+
for {
78+
select {
79+
case <-ctx.Done():
80+
return errors.New("context canceled")
81+
default:
82+
log.Debug().Msg("New for connection")
83+
// Listen for an incoming connection.
84+
conn, err := l.Accept()
85+
if err != nil {
86+
fmt.Println("Error accepting: ", err.Error())
87+
continue
88+
}
89+
90+
// Handle connections in a new goroutine, forwarding to the p2p service
91+
go func() {
92+
var tunnelAddresses []string
93+
for _, v := range p2p.GetAvailableNodes(p2p.FederatedID) {
94+
if v.IsOnline() {
95+
tunnelAddresses = append(tunnelAddresses, v.TunnelAddress)
96+
} else {
97+
log.Info().Msgf("Node %s is offline", v.ID)
98+
}
99+
}
100+
101+
// open a TCP stream to one of the tunnels
102+
// chosen randomly
103+
// TODO: optimize this and track usage
104+
tunnelAddr := tunnelAddresses[rand.IntN(len(tunnelAddresses))]
105+
106+
tunnelConn, err := net.Dial("tcp", tunnelAddr)
107+
if err != nil {
108+
log.Error().Err(err).Msg("Error connecting to tunnel")
109+
return
110+
}
111+
112+
log.Info().Msgf("Redirecting %s to %s", conn.LocalAddr().String(), tunnelConn.RemoteAddr().String())
113+
closer := make(chan struct{}, 2)
114+
go copyStream(closer, tunnelConn, conn)
115+
go copyStream(closer, conn, tunnelConn)
116+
<-closer
117+
118+
tunnelConn.Close()
119+
conn.Close()
120+
// ll.Infof("(service %s) Done handling %s", serviceID, l.Addr().String())
121+
}()
122+
}
123+
}
124+
125+
}
126+
127+
func copyStream(closer chan struct{}, dst io.Writer, src io.Reader) {
128+
defer func() { closer <- struct{}{} }() // connection is closed, send signal to stop proxy
129+
io.Copy(dst, src)
130+
}

core/cli/run.go

+43-7
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package cli
33
import (
44
"context"
55
"fmt"
6+
"net"
7+
"os"
68
"strings"
79
"time"
810

@@ -50,7 +52,7 @@ type RunCMD struct {
5052
DisableWebUI bool `env:"LOCALAI_DISABLE_WEBUI,DISABLE_WEBUI" default:"false" help:"Disable webui" group:"api"`
5153
OpaqueErrors bool `env:"LOCALAI_OPAQUE_ERRORS" default:"false" help:"If true, all error responses are replaced with blank 500 errors. This is intended only for hardening against information leaks and is normally not recommended." group:"api"`
5254
Peer2Peer bool `env:"LOCALAI_P2P,P2P" name:"p2p" default:"false" help:"Enable P2P mode" group:"p2p"`
53-
Peer2PeerToken string `env:"LOCALAI_P2P_TOKEN,P2P_TOKEN" name:"p2ptoken" help:"Token for P2P mode (optional)" group:"p2p"`
55+
Peer2PeerToken string `env:"LOCALAI_P2P_TOKEN,P2P_TOKEN,TOKEN" name:"p2ptoken" help:"Token for P2P mode (optional)" group:"p2p"`
5456
ParallelRequests bool `env:"LOCALAI_PARALLEL_REQUESTS,PARALLEL_REQUESTS" help:"Enable backends to handle multiple requests in parallel if they support it (e.g.: llama.cpp or vllm)" group:"backends"`
5557
SingleActiveBackend bool `env:"LOCALAI_SINGLE_ACTIVE_BACKEND,SINGLE_ACTIVE_BACKEND" help:"Allow only one backend to be run at a time" group:"backends"`
5658
PreloadBackendOnly bool `env:"LOCALAI_PRELOAD_BACKEND_ONLY,PRELOAD_BACKEND_ONLY" default:"false" help:"Do not launch the API services, only the preloaded models / backends are started (useful for multi-node setups)" group:"backends"`
@@ -59,6 +61,7 @@ type RunCMD struct {
5961
WatchdogIdleTimeout string `env:"LOCALAI_WATCHDOG_IDLE_TIMEOUT,WATCHDOG_IDLE_TIMEOUT" default:"15m" help:"Threshold beyond which an idle backend should be stopped" group:"backends"`
6062
EnableWatchdogBusy bool `env:"LOCALAI_WATCHDOG_BUSY,WATCHDOG_BUSY" default:"false" help:"Enable watchdog for stopping backends that are busy longer than the watchdog-busy-timeout" group:"backends"`
6163
WatchdogBusyTimeout string `env:"LOCALAI_WATCHDOG_BUSY_TIMEOUT,WATCHDOG_BUSY_TIMEOUT" default:"5m" help:"Threshold beyond which a busy backend should be stopped" group:"backends"`
64+
Federated bool `env:"LOCALAI_FEDERATED,FEDERATED" help:"Enable federated instance" group:"federated"`
6265
}
6366

6467
func (r *RunCMD) Run(ctx *cliContext.Context) error {
@@ -91,9 +94,10 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {
9194
config.WithOpaqueErrors(r.OpaqueErrors),
9295
}
9396

97+
token := ""
9498
if r.Peer2Peer || r.Peer2PeerToken != "" {
9599
log.Info().Msg("P2P mode enabled")
96-
token := r.Peer2PeerToken
100+
token = r.Peer2PeerToken
97101
if token == "" {
98102
// IF no token is provided, and p2p is enabled,
99103
// we generate one and wait for the user to pick up the token (this is for interactive)
@@ -104,14 +108,46 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {
104108

105109
log.Info().Msg("To use the token, you can run the following command in another node or terminal:")
106110
fmt.Printf("export TOKEN=\"%s\"\nlocal-ai worker p2p-llama-cpp-rpc\n", token)
111+
}
112+
opts = append(opts, config.WithP2PToken(token))
107113

108-
// Ask for user confirmation
109-
log.Info().Msg("Press a button to proceed")
110-
var input string
111-
fmt.Scanln(&input)
114+
node, err := p2p.NewNode(token)
115+
if err != nil {
116+
return err
112117
}
118+
113119
log.Info().Msg("Starting P2P server discovery...")
114-
if err := p2p.LLamaCPPRPCServerDiscoverer(context.Background(), token); err != nil {
120+
if err := p2p.ServiceDiscoverer(context.Background(), node, token, "", func() {
121+
var tunnelAddresses []string
122+
for _, v := range p2p.GetAvailableNodes("") {
123+
if v.IsOnline() {
124+
tunnelAddresses = append(tunnelAddresses, v.TunnelAddress)
125+
} else {
126+
log.Info().Msgf("Node %s is offline", v.ID)
127+
}
128+
}
129+
tunnelEnvVar := strings.Join(tunnelAddresses, ",")
130+
131+
os.Setenv("LLAMACPP_GRPC_SERVERS", tunnelEnvVar)
132+
log.Debug().Msgf("setting LLAMACPP_GRPC_SERVERS to %s", tunnelEnvVar)
133+
}); err != nil {
134+
return err
135+
}
136+
}
137+
138+
if r.Federated {
139+
_, port, err := net.SplitHostPort(r.Address)
140+
if err != nil {
141+
return err
142+
}
143+
if err := p2p.ExposeService(context.Background(), "localhost", port, token, p2p.FederatedID); err != nil {
144+
return err
145+
}
146+
node, err := p2p.NewNode(token)
147+
if err != nil {
148+
return err
149+
}
150+
if err := p2p.ServiceDiscoverer(context.Background(), node, token, p2p.FederatedID, nil); err != nil {
115151
return err
116152
}
117153
}

core/cli/worker/worker_p2p.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import (
2020

2121
type P2P struct {
2222
WorkerFlags `embed:""`
23-
Token string `env:"LOCALAI_TOKEN,TOKEN" help:"JSON list of galleries"`
23+
Token string `env:"LOCALAI_TOKEN,LOCALAI_P2P_TOKEN,TOKEN" help:"P2P token to use"`
2424
NoRunner bool `env:"LOCALAI_NO_RUNNER,NO_RUNNER" help:"Do not start the llama-cpp-rpc-server"`
2525
RunnerAddress string `env:"LOCALAI_RUNNER_ADDRESS,RUNNER_ADDRESS" help:"Address of the llama-cpp-rpc-server"`
2626
RunnerPort string `env:"LOCALAI_RUNNER_PORT,RUNNER_PORT" help:"Port of the llama-cpp-rpc-server"`
@@ -59,7 +59,7 @@ func (r *P2P) Run(ctx *cliContext.Context) error {
5959
p = r.RunnerPort
6060
}
6161

62-
err = p2p.BindLLamaCPPWorker(context.Background(), address, p, r.Token)
62+
err = p2p.ExposeService(context.Background(), address, p, r.Token, "")
6363
if err != nil {
6464
return err
6565
}
@@ -99,7 +99,7 @@ func (r *P2P) Run(ctx *cliContext.Context) error {
9999
}
100100
}()
101101

102-
err = p2p.BindLLamaCPPWorker(context.Background(), address, fmt.Sprint(port), r.Token)
102+
err = p2p.ExposeService(context.Background(), address, fmt.Sprint(port), r.Token, "")
103103
if err != nil {
104104
return err
105105
}

core/config/application_config.go

+7
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ type ApplicationConfig struct {
3232
CORSAllowOrigins string
3333
ApiKeys []string
3434
OpaqueErrors bool
35+
P2PToken string
3536

3637
ModelLibraryURL string
3738

@@ -95,6 +96,12 @@ func WithCsrf(b bool) AppOption {
9596
}
9697
}
9798

99+
func WithP2PToken(s string) AppOption {
100+
return func(o *ApplicationConfig) {
101+
o.P2PToken = s
102+
}
103+
}
104+
98105
func WithModelLibraryURL(url string) AppOption {
99106
return func(o *ApplicationConfig) {
100107
o.ModelLibraryURL = url

0 commit comments

Comments
 (0)