Skip to content

Commit b407cb1

Browse files
authored
server: support reading back channels (#597) (#777)
1 parent cc0c966 commit b407cb1

14 files changed

+894
-88
lines changed

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ Features:
4040
* Write streams with the UDP, UDP-multicast or TCP transport protocol
4141
* Write TLS-encrypted streams (TCP only)
4242
* Compute and provide SSRC, RTP-Info to clients
43+
* Read ONVIF back channels
4344
* Utilities
4445
* Parse RTSP elements
4546
* Encode/decode RTP packets into/from codec-specific frames
@@ -97,7 +98,9 @@ Features:
9798
* [server-auth](examples/server-auth/main.go)
9899
* [server-record-format-h264-to-disk](examples/server-record-format-h264-to-disk/main.go)
99100
* [server-play-format-h264-from-disk](examples/server-play-format-h264-from-disk/main.go)
101+
* [server-play-backchannel](examples/server-play-backchannel/main.go)
100102
* [proxy](examples/proxy/main.go)
103+
* [proxy-backchannel](examples/proxy-backchannel/main.go)
101104

102105
## API Documentation
103106

examples/proxy-backchannel/client.go

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
package main
2+
3+
import (
4+
"log"
5+
"time"
6+
7+
"github.com/bluenviron/gortsplib/v4"
8+
"github.com/bluenviron/gortsplib/v4/pkg/base"
9+
"github.com/bluenviron/gortsplib/v4/pkg/description"
10+
"github.com/bluenviron/gortsplib/v4/pkg/format"
11+
"github.com/pion/rtp"
12+
)
13+
14+
const (
15+
existingStream = "rtsp://127.0.0.1:8554/mystream"
16+
reconnectPause = 2 * time.Second
17+
)
18+
19+
func findG711BackChannel(desc *description.Session) (*description.Media, *format.G711) {
20+
for _, media := range desc.Medias {
21+
if media.IsBackChannel {
22+
for _, forma := range media.Formats {
23+
if g711, ok := forma.(*format.G711); ok {
24+
return media, g711
25+
}
26+
}
27+
}
28+
}
29+
return nil, nil
30+
}
31+
32+
type client struct {
33+
server *server
34+
}
35+
36+
func (c *client) initialize() {
37+
// start a separated routine
38+
go c.run()
39+
}
40+
41+
func (c *client) run() {
42+
for {
43+
err := c.read()
44+
log.Printf("ERR: %s\n", err)
45+
46+
time.Sleep(reconnectPause)
47+
}
48+
}
49+
50+
func (c *client) read() error {
51+
rc := gortsplib.Client{
52+
RequestBackChannels: true,
53+
}
54+
55+
// parse URL
56+
u, err := base.ParseURL(existingStream)
57+
if err != nil {
58+
return err
59+
}
60+
61+
// connect to the server
62+
err = rc.Start(u.Scheme, u.Host)
63+
if err != nil {
64+
return err
65+
}
66+
defer rc.Close()
67+
68+
// find available medias
69+
desc, _, err := rc.Describe(u)
70+
if err != nil {
71+
return err
72+
}
73+
74+
// find the back channel
75+
backChannelMedia, _ := findG711BackChannel(desc)
76+
if backChannelMedia == nil {
77+
panic("back channel not found")
78+
}
79+
80+
writeToClient := func(pkt *rtp.Packet) {
81+
rc.WritePacketRTP(backChannelMedia, pkt)
82+
}
83+
84+
// setup all medias
85+
err = rc.SetupAll(desc.BaseURL, desc.Medias)
86+
if err != nil {
87+
return err
88+
}
89+
90+
// notify the server that we are ready
91+
stream := c.server.setStreamReady(desc, writeToClient)
92+
defer c.server.setStreamUnready()
93+
94+
log.Printf("stream is ready and can be read from the server at rtsp://localhost:8554/stream\n")
95+
96+
// called when a RTP packet arrives
97+
rc.OnPacketRTPAny(func(medi *description.Media, forma format.Format, pkt *rtp.Packet) {
98+
log.Printf("received RTP packet from the client, routing to readers")
99+
100+
// route incoming packets to the server stream
101+
stream.WritePacketRTP(medi, pkt)
102+
})
103+
104+
// start playing
105+
_, err = rc.Play(nil)
106+
if err != nil {
107+
return err
108+
}
109+
110+
// wait until a fatal error
111+
return rc.Wait()
112+
}

examples/proxy-backchannel/main.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package main
2+
3+
import "log"
4+
5+
// This example shows how to
6+
// 1. create a server that serves a single stream.
7+
// 2. create a client, that reads an existing stream from another server or camera, containing a back channel.
8+
// 3. route the stream from the client to the server, and from the server to all connected readers.
9+
// 4. route the back channel from connected readers to the server, and from the server to the client.
10+
11+
func main() {
12+
// allocate the server.
13+
s := &server{}
14+
s.initialize()
15+
16+
// allocate the client.
17+
// allow client to use the server.
18+
c := &client{server: s}
19+
c.initialize()
20+
21+
// start server and wait until a fatal error
22+
log.Printf("server is ready on %s", s.server.RTSPAddress)
23+
panic(s.server.StartAndWait())
24+
}

examples/proxy-backchannel/server.go

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
package main
2+
3+
import (
4+
"log"
5+
"sync"
6+
7+
"github.com/bluenviron/gortsplib/v4"
8+
"github.com/bluenviron/gortsplib/v4/pkg/base"
9+
"github.com/bluenviron/gortsplib/v4/pkg/description"
10+
"github.com/bluenviron/gortsplib/v4/pkg/format"
11+
"github.com/pion/rtp"
12+
)
13+
14+
type server struct {
15+
server *gortsplib.Server
16+
mutex sync.RWMutex
17+
stream *gortsplib.ServerStream
18+
writeToClient func(*rtp.Packet)
19+
}
20+
21+
func (s *server) initialize() {
22+
// configure the server
23+
s.server = &gortsplib.Server{
24+
Handler: s,
25+
RTSPAddress: ":8556",
26+
UDPRTPAddress: ":8002",
27+
UDPRTCPAddress: ":8003",
28+
MulticastIPRange: "224.1.0.0/16",
29+
MulticastRTPPort: 8002,
30+
MulticastRTCPPort: 8003,
31+
}
32+
}
33+
34+
// called when a connection is opened.
35+
func (s *server) OnConnOpen(ctx *gortsplib.ServerHandlerOnConnOpenCtx) {
36+
log.Printf("conn opened")
37+
}
38+
39+
// called when a connection is closed.
40+
func (s *server) OnConnClose(ctx *gortsplib.ServerHandlerOnConnCloseCtx) {
41+
log.Printf("conn closed (%v)", ctx.Error)
42+
}
43+
44+
// called when a session is opened.
45+
func (s *server) OnSessionOpen(ctx *gortsplib.ServerHandlerOnSessionOpenCtx) {
46+
log.Printf("session opened")
47+
}
48+
49+
// called when a session is closed.
50+
func (s *server) OnSessionClose(ctx *gortsplib.ServerHandlerOnSessionCloseCtx) {
51+
log.Printf("session closed")
52+
}
53+
54+
// called when receiving a DESCRIBE request.
55+
func (s *server) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) {
56+
log.Printf("DESCRIBE request")
57+
58+
s.mutex.RLock()
59+
defer s.mutex.RUnlock()
60+
61+
// stream is not available yet
62+
if s.stream == nil {
63+
return &base.Response{
64+
StatusCode: base.StatusNotFound,
65+
}, nil, nil
66+
}
67+
68+
return &base.Response{
69+
StatusCode: base.StatusOK,
70+
}, s.stream, nil
71+
}
72+
73+
// called when receiving a SETUP request.
74+
func (s *server) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) {
75+
log.Printf("SETUP request")
76+
77+
s.mutex.RLock()
78+
defer s.mutex.RUnlock()
79+
80+
// stream is not available yet
81+
if s.stream == nil {
82+
return &base.Response{
83+
StatusCode: base.StatusNotFound,
84+
}, nil, nil
85+
}
86+
87+
return &base.Response{
88+
StatusCode: base.StatusOK,
89+
}, s.stream, nil
90+
}
91+
92+
// called when receiving a PLAY request.
93+
func (s *server) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) {
94+
log.Printf("PLAY request")
95+
96+
ctx.Session.OnPacketRTPAny(func(m *description.Media, f format.Format, pkt *rtp.Packet) {
97+
log.Printf("received RTP packet from readers, routing to the client")
98+
99+
s.writeToClient(pkt)
100+
})
101+
102+
return &base.Response{
103+
StatusCode: base.StatusOK,
104+
}, nil
105+
}
106+
107+
func (s *server) setStreamReady(
108+
desc *description.Session,
109+
writeToClient func(*rtp.Packet),
110+
) *gortsplib.ServerStream {
111+
s.mutex.Lock()
112+
defer s.mutex.Unlock()
113+
114+
s.stream = &gortsplib.ServerStream{
115+
Server: s.server,
116+
Desc: desc,
117+
}
118+
err := s.stream.Initialize()
119+
if err != nil {
120+
panic(err)
121+
}
122+
123+
s.writeToClient = writeToClient
124+
125+
return s.stream
126+
}
127+
128+
func (s *server) setStreamUnready() {
129+
s.mutex.Lock()
130+
defer s.mutex.Unlock()
131+
132+
s.stream.Close()
133+
s.stream = nil
134+
}
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package main
2+
3+
import (
4+
"crypto/rand"
5+
"fmt"
6+
"log"
7+
"time"
8+
9+
"github.com/bluenviron/gortsplib/v4"
10+
"github.com/bluenviron/gortsplib/v4/pkg/format"
11+
"github.com/bluenviron/mediacommon/v2/pkg/codecs/g711"
12+
"github.com/bluenviron/mediacommon/v2/pkg/formats/mpegts"
13+
)
14+
15+
func multiplyAndDivide(v, m, d int64) int64 {
16+
secs := v / d
17+
dec := v % d
18+
return (secs*m + dec*m/d)
19+
}
20+
21+
func randUint32() (uint32, error) {
22+
var b [4]byte
23+
_, err := rand.Read(b[:])
24+
if err != nil {
25+
return 0, err
26+
}
27+
return uint32(b[0])<<24 | uint32(b[1])<<16 | uint32(b[2])<<8 | uint32(b[3]), nil
28+
}
29+
30+
func findTrack(r *mpegts.Reader) (*mpegts.Track, error) {
31+
for _, track := range r.Tracks() {
32+
if _, ok := track.Codec.(*mpegts.CodecH264); ok {
33+
return track, nil
34+
}
35+
}
36+
return nil, fmt.Errorf("H264 track not found")
37+
}
38+
39+
type audioStreamer struct {
40+
stream *gortsplib.ServerStream
41+
}
42+
43+
func (r *audioStreamer) initialize() {
44+
go r.run()
45+
}
46+
47+
func (r *audioStreamer) close() {
48+
}
49+
50+
func (r *audioStreamer) run() {
51+
// setup G711 -> RTP encoder
52+
rtpEnc, err := r.stream.Desc.Medias[0].Formats[0].(*format.G711).CreateEncoder()
53+
if err != nil {
54+
panic(err)
55+
}
56+
57+
start := time.Now()
58+
prevPTS := int64(0)
59+
60+
randomStart, err := randUint32()
61+
if err != nil {
62+
panic(err)
63+
}
64+
65+
// setup a ticker to sleep between writings
66+
ticker := time.NewTicker(100 * time.Millisecond)
67+
defer ticker.Stop()
68+
69+
for range ticker.C {
70+
// get current timestamp
71+
pts := multiplyAndDivide(int64(time.Since(start)), int64(r.stream.Desc.Medias[0].Formats[0].ClockRate()), int64(time.Second))
72+
73+
// generate dummy LPCM audio samples
74+
samples := createDummyAudio(pts, prevPTS)
75+
76+
// encode samples with G711
77+
samples, err = g711.Mulaw(samples).Marshal()
78+
if err != nil {
79+
panic(err)
80+
}
81+
82+
// generate RTP packets from G711 samples
83+
pkts, err := rtpEnc.Encode(samples)
84+
if err != nil {
85+
panic(err)
86+
}
87+
88+
log.Printf("writing RTP packets with PTS=%d, sample size=%d, pkt count=%d", prevPTS, len(samples), len(pkts))
89+
90+
// write RTP packets to the server
91+
for _, pkt := range pkts {
92+
pkt.Timestamp += uint32(int64(randomStart) + prevPTS)
93+
94+
err = r.stream.WritePacketRTP(r.stream.Desc.Medias[0], pkt)
95+
if err != nil {
96+
panic(err)
97+
}
98+
}
99+
100+
prevPTS = pts
101+
102+
}
103+
}

0 commit comments

Comments
 (0)