Go 语言是近十年来优秀现代开发语言的代表,它继承了 C 语言的简洁和许多现代语言的表达方式,在广泛的应用场景中深受众多开发者喜爱。如何将 Go 与 C、C++ 联合开发,拓展整个开发生态,避免重复造轮子?掌握 cgo 可以让你得心应手地在 C 和 Go 之间传递信息,打通任督二脉。
Go 在流媒体传输领域也有很强大的生态和优秀的轮子。相比传统的 FFmpeg 这种大而全的库,可以有选择地用一些小巧强悍的 Go 语言库来替代 FFmpeg 的部分功能,比如 RTSP 拉流。笔者曾用 FFmpeg 在 Android 下写了一个推拉流的播放器,但由于 FFmpeg 自成体系,在灵活定制方面有一些局限性,于是尝试用 Go 的 RTSP 库来代替 FFmpeg 的 RTSP 拉流。
首先,我们需要利用 cgo 的交叉编译特性,封装一个可以被 C/C++ 调用的动态库。其中用 cgo 实现了设置函数和回调函数,并传递了类的指针,从而在 C++ 一侧保留了 class 的特性,达到了面向对象、多路连接的设计目标。以下是用 Go 写的动态库源码。由于 Go 的包管理做得特别棒,你可以用很少的代码实现一个多路拉流的应用,感觉比 C++ 爽多了。
package main

import (
	"fmt"
	"sync"
	"time"
	"unsafe"

	"github.com/deepch/vdk/av"
	"github.com/deepch/vdk/codec/h264parser"
	"github.com/deepch/vdk/format/rtsp"
)

/*
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#cgo CFLAGS: -I.
void OnSendPacket(void * callclass,unsigned char *data, int len,int mediatype);
typedef void (*CallbackFunc)(unsigned char *,int,int);
*/
import "C"

// Exported callback function type.
// Commented-out C-side declarations kept from the original source; nothing in
// the code shown here uses them.
// type
// void SendcallbackFunc(unsigned char* data, int length);
// extern CallbackFunc _callback;
//
// static void setCallbackWrapper(CallbackFunc callback) {
// _callback = callback;
// }

// CallbackFunc is the Go-side callback signature accepted by SetCallback.
// Its parameter list (*byte, int, int) has the same shape as
// MediaImplementation.CallBackData (data pointer, length, media type).
type CallbackFunc func(*byte, int, int)

// Interface definition.
// MediaInterface is satisfied by any type that can be handed a CallbackFunc
// through SetCallback.
type MediaInterface interface {
	// SetCallback hands callback to the implementation.
	SetCallback(callback CallbackFunc)
}

// Struct implementing the interface.
type MediaImplementation struct {id intcallback CallbackFunc// C._callbackcallclass unsafe.Pointer // 修改为unsafe.Pointer类型
}var (instances = make(map[int]*MediaImplementation)instancesLock sync.MutexcallbackLock sync.Mutex
)func (m *MediaImplementation) SendData(data []byte, length int, mediatype int) {// if C._callback != nil {fmt.Println("callback data len", length, mediatype)// arr := (*C.uchar)(unsafe.Pointer(&data[0]))// C.OnSendPacket(arr, C.int(length), C.int(mediatype))
}func (m *MediaImplementation) CallBackData(data *byte, length int, mediatype int) {goSlice := (*[1 << 30]byte)(unsafe.Pointer(data))[:length:length]m.SendData(goSlice, length, mediatype)
}func (m *MediaImplementation) RtspClientStart(uri string) {// m.rtsp = unsafe.Pointer(&Rtspclient{// callback: m.CallBackData,// })// ((*Rtspclient)(m.rtsp)).rtspclient(uri)m.rtspConsumer(uri)
}func (m *MediaImplementation) SetCallback(callback CallbackFunc) {m.callback = callback
}//export CallRtspClientStart
func CallRtspClientStart(impl unsafe.Pointer, data *C.char) {if impl == nil {fmt.Println("Invalid implementation")return}m := (*MediaImplementation)(impl)m.rtspConsumer(C.GoString(data))// m.RtspClientStart(C.GoString(data))
}//export NewMediaImplementation
func NewMediaImplementation() uintptr {instancesLock.Lock()defer instancesLock.Unlock()id := len(instances) + 1instance := &MediaImplementation{id: id,}instances[id] = instancereturn uintptr(unsafe.Pointer(instance))
}//export GetMediaInstanceByID
func GetMediaInstanceByID(id C.int) uintptr { // 修改返回类型为uintptrinstancesLock.Lock()defer instancesLock.Unlock()if instance, ok := instances[int(id)]; ok {return uintptr(unsafe.Pointer(instance))}return uintptr(0) // 返回表示未找到实例
}//export CallSendData
func CallSendData(impl unsafe.Pointer, data *C.uchar, length C.int, mediatype C.int) {if impl == nil {fmt.Println("Invalid implementation")return}goSlice := (*[1 << 30]byte)(unsafe.Pointer(data))[:length:length]m := (*MediaImplementation)(impl)go m.SendData(goSlice, int(length), int(mediatype))
}//export SetCallbackWrapper
func SetCallbackWrapper(impl unsafe.Pointer, callback unsafe.Pointer) {if impl == nil {fmt.Println("Invalid implementation")return}m := (*MediaImplementation)(impl)m.callclass = callbackfmt.Println("SetCallbackWrapper callback", m.callback, callback)// C._callback = callbackFunc// m.SetCallback(callbackFunc)
}func main() {}func (m *MediaImplementation) rtspConsumer(uri string) {annexbNALUStartCode := func() []byte { return []byte{0x00, 0x00, 0x00, 0x01} }//fmt.Println("rtspConsumer starting...")session, err := rtsp.DialTimeout(uri, 10*time.Second)// defer session.Close()if err != nil {fmt.Errorf("rtsp Dial Error: %v", err)return//panic(err)}//session.RtpKeepAliveTimeout = 10 * time.Secondsession.RtpTimeout = 20 * time.Secondsession.RtpKeepAliveTimeout = 5 * time.Secondsession.RtspTimeout = 20 * time.Secondcodecs, err := session.Streams()if err != nil {fmt.Errorf("stream error: %v", err)// continue//panic(err)}for i, t := range codecs {fmt.Printf("Stream", i, "is of type", t.Type().String())}if codecs[0].Type() != av.H264 {//fmt.Println("RTSP feed must begin with a H264 codec")// continue//panic("RTSP feed must begin with a H264 codec")}if len(codecs) != 1 {//fmt.Println("Ignoring all but the first stream.")}// var previousTime time.Durationfor {// select {// case <-rtspsrcch:// default:// if KVMrtsp.BInUse != true {// break// }pkt, err := session.ReadPacket()if err != nil {break}// if pkt.Idx != 0 {// //audio or other stream, skip it// continue// }if codecs[pkt.Idx].Type().IsVideo() {pkt.Data = pkt.Data[4:]// For every key-frame pre-pend the SPS and PPSif pkt.IsKeyFrame {pkt.Data = append(annexbNALUStartCode(), pkt.Data...)pkt.Data = append(codecs[0].(h264parser.CodecData).PPS(), pkt.Data...)pkt.Data = append(annexbNALUStartCode(), pkt.Data...)pkt.Data = append(codecs[0].(h264parser.CodecData).SPS(), pkt.Data...)pkt.Data = append(annexbNALUStartCode(), pkt.Data...)} else {pkt.Data = append(annexbNALUStartCode(), pkt.Data...)}// bufferDuration := pkt.Time - previousTime// previousTime = pkt.Time// m.CallBackData(pkt.Data,len(pkt.Data))fmt.Println("Is Video IsKeyFrame", pkt.IsKeyFrame, codecs[pkt.Idx].Type(), pkt.Time)mediatype := 0switch codecs[pkt.Idx].Type() {case av.H264:mediatype = 1case av.H265:mediatype = 2}length := len(pkt.Data)arr := 
(*C.uchar)(unsafe.Pointer(&pkt.Data[0]))C.OnSendPacket(m.callclass, arr, C.int(length), C.int(mediatype))// if err = KVMrtsp.Track.WriteSample(media.Sample{Data: pkt.Data, Duration: bufferDuration}); err != nil && err != io.ErrClosedPipe {// logger.Errorf("WriteSample error %v", err)// break// //panic(err)// }} else if codecs[pkt.Idx].Type().IsAudio() {// codecs, err = session.Streams()// if err != nil {// fmt.Println("Error getting streams")// break// }codec := codecs[pkt.Idx].(av.AudioCodecData)duration, err := codec.PacketDuration(pkt.Data)if err != nil {fmt.Println("Failed to get duration for audio:", err)break}fmt.Println("IS Audio Packet duration:", duration)mediatype := 0switch codecs[pkt.Idx].Type() {case av.AAC:mediatype = 3case av.PCM_MULAW:mediatype = 4case av.PCM_ALAW:mediatype = 5case av.SPEEX:mediatype = 6case av.NELLYMOSER:mediatype = 7case av.PCM:mediatype = 8case av.OPUS:mediatype = 9}length := len(pkt.Data)arr := (*C.uchar)(unsafe.Pointer(&pkt.Data[0]))C.OnSendPacket(m.callclass, arr, C.int(length), C.int(mediatype))// m.CallBackData(pkt.Data,len(pkt.Data))// err = audioTrack.WriteSample(media.Sample{Data: pkt.Data, Duration: duration})// if err != nil {// //fmt.Println("Failed to write audio sample", err)// break// }}}if err = session.Close(); err != nil {fmt.Errorf("session Close error %v", err)return}// time.Sleep(5 * time.Second)// }
}