package main
import (
"context"
"encoding/gob"
"encoding/hex"
"encoding/json"
"fmt"
"github.com/BurntSushi/toml"
"github.com/brutella/hap"
"github.com/brutella/hap/accessory"
"github.com/brutella/hap/characteristic"
"github.com/brutella/hap/log"
"github.com/brutella/hap/rtp"
"github.com/brutella/hap/service"
"github.com/brutella/hap/tlv8"
"github.com/w4/hkbi/blueiris"
"io"
"math/rand"
"net"
"net/http"
"net/url"
"os"
"os/exec"
"os/signal"
"path/filepath"
"strings"
"sync"
"syscall"
)
type Config struct {
ListenAddress string `toml:"listen-address"`
DataDir string `toml:"data-dir"`
Blueiris blueiris.BlueirisConfig
}
type GlobalState struct {
ssrcVideo int32
ssrcAudio int32
blueiris *blueiris.Blueiris
}
func main() {
log.Info.Enable()
log.Debug.Enable()
if len(os.Args) != 2 || os.Args[1] == "" {
log.Info.Fatalf("usage: %s /path/to/config.toml", os.Args[0])
}
var config = readConfig(os.Args[1])
run(config)
}
func readConfig(path string) Config {
var cfg Config
_, err := toml.DecodeFile(path, &cfg)
if err != nil {
log.Info.Panic(err)
}
return cfg
}
func run(config Config) {
bi, err := blueiris.NewBlueiris(config.Blueiris)
if err != nil {
log.Info.Fatalf("failed to login to bi: %s\n", err)
}
globalState := &GlobalState{
ssrcVideo: rand.Int31(),
ssrcAudio: rand.Int31(),
}
biCameras, err := bi.ListCameras()
if err != nil {
log.Info.Fatalf("failed to load cameras from bi: %s\n", err)
}
err = os.MkdirAll(config.DataDir, os.FileMode(0755))
if err != nil {
log.Info.Fatalf("failed to create data directory: %s\n", err)
}
knownCamerasPath := filepath.Join(config.DataDir, "knownCameras")
var knownCameras = make(map[string]int)
file, err := os.Open(knownCamerasPath)
if err != nil {
if !os.IsNotExist(err) {
log.Info.Fatalf("failed to read knownCameras: %s\n", err)
}
} else {
err = gob.NewDecoder(file).Decode(&knownCameras)
if err != nil {
log.Info.Fatalf("failed to decode knownCameras: %s\n", err)
}
}
_ = file.Close()
hasDiscoveredNewCameras := false
cameras := make([]*accessory.Camera, 0, len(biCameras))
motionSensors := make(map[string]*service.MotionSensor)
for _, camera := range biCameras {
cam := accessory.NewCamera(accessory.Info{
Name: camera.Name,
Manufacturer: "HKBI",
})
if id := knownCameras[camera.Name]; id != 0 {
log.Debug.Printf("reusing previously assigned id %d for camera %s", id, camera.Name)
cam.Id = uint64(id)
} else {
newId := 0
for _, i := range knownCameras {
if i >= newId {
newId = i + 1
}
}
log.Info.Printf("newly discovered camera %s assigned id %d", camera.Name, newId)
knownCameras[camera.Name] = newId
cam.Id = uint64(newId)
hasDiscoveredNewCameras = true
}
startListeningForStreams(camera.Id, cam.StreamManagement1, globalState, &config, bi.BaseUrl)
startListeningForStreams(camera.Id, cam.StreamManagement2, globalState, &config, bi.BaseUrl)
motionSensor := service.NewMotionSensor()
cam.AddS(motionSensor.S)
cameras = append(cameras, cam)
motionSensors[camera.Id] = motionSensor
}
if hasDiscoveredNewCameras {
file, err = os.Create(knownCamerasPath)
if err != nil {
log.Info.Fatalf("failed to create knownCameras: %s\n", err)
}
err = gob.NewEncoder(file).Encode(knownCameras)
if err != nil {
log.Info.Fatalf("failed to encode knownCameras: %s\n", err)
}
_ = file.Close()
}
var accessories = make([]*accessory.A, 0, len(cameras))
for _, camera := range cameras {
accessories = append(accessories, camera.A)
}
fs := hap.NewFsStore(config.DataDir)
server, err := hap.NewServer(fs, accessories[0], accessories[1:]...)
if err != nil {
log.Info.Panic(err)
}
server.Pin = "11111112"
server.Addr = config.ListenAddress
server.ServeMux().HandleFunc("/trigger", func(res http.ResponseWriter, req *http.Request) {
query := req.URL.Query()
state := query.Get("state")
cam := query.Get("cam")
if sensor := motionSensors[cam]; sensor != nil {
var motionDetected bool
if state == "off" {
motionDetected = false
} else {
motionDetected = true
}
sensor.MotionDetected.SetValue(motionDetected)
res.WriteHeader(http.StatusOK)
} else {
log.Info.Printf("Received trigger request for unknown camera: %s", cam)
res.WriteHeader(http.StatusBadRequest)
}
})
server.ServeMux().HandleFunc("/resource", func(res http.ResponseWriter, req *http.Request) {
var request struct {
Type string `json:"resource-type"`
Aid int `json:"aid"`
}
if !server.IsAuthorized(req) {
_ = hap.JsonError(res, hap.JsonStatusInsufficientPrivileges)
return
} else if req.Method != http.MethodPost {
res.WriteHeader(http.StatusBadRequest)
return
}
body, err := io.ReadAll(req.Body)
if err != nil {
log.Info.Println(err)
res.WriteHeader(http.StatusInternalServerError)
return
}
err = json.Unmarshal(body, &request)
if err != nil {
log.Info.Println(err)
res.WriteHeader(http.StatusBadRequest)
return
}
var cameraName string
for name, i := range knownCameras {
if i == request.Aid {
cameraName = name
break
}
}
if cameraName == "" {
log.Info.Printf("a snapshot was requested for camera %d but not camera with that id exists", request.Aid)
res.WriteHeader(http.StatusBadRequest)
return
}
switch request.Type {
case "image":
req, err := bi.FetchSnapshot(cameraName)
if err != nil {
log.Info.Println(err)
res.WriteHeader(http.StatusInternalServerError)
return
}
imageResponse, err := http.DefaultClient.Do(req)
if err != nil {
log.Info.Println(err)
res.WriteHeader(http.StatusInternalServerError)
return
}
defer func(Body io.ReadCloser) {
_ = Body.Close()
}(imageResponse.Body)
res.Header().Set("Content-Type", "image/jpeg")
wr := hap.NewChunkedWriter(res, 2048)
_, err = io.Copy(wr, imageResponse.Body)
if err != nil {
log.Info.Printf("Failed to copy bytes for snapshot to HomeKit: %s\n", err)
return
}
default:
log.Info.Printf("unsupported resource request \"%s\"\n", request.Type)
res.WriteHeader(http.StatusInternalServerError)
return
}
})
c := make(chan os.Signal)
signal.Notify(c, os.Interrupt)
signal.Notify(c, syscall.SIGTERM)
ctx, cancel := context.WithCancel(context.Background())
go func() {
<-c
signal.Stop(c)
cancel()
}()
err = server.ListenAndServe(ctx)
if err != nil {
log.Info.Panic(err)
}
}
func startListeningForStreams(cameraName string, mgmt *service.CameraRTPStreamManagement, globalState *GlobalState, config *Config, blueirisBase *url.URL) {
setTlv8Payload(mgmt.StreamingStatus.Bytes, rtp.StreamingStatus{Status: rtp.StreamingStatusAvailable})
setTlv8Payload(mgmt.SupportedRTPConfiguration.Bytes, rtp.NewConfiguration(rtp.CryptoSuite_AES_CM_128_HMAC_SHA1_80))
setTlv8Payload(mgmt.SupportedVideoStreamConfiguration.Bytes, rtp.DefaultVideoStreamConfiguration())
setTlv8Payload(mgmt.SupportedAudioStreamConfiguration.Bytes, rtp.DefaultAudioStreamConfiguration())
var activeStreams = ActiveStreams{
mutex: &sync.Mutex{},
streams: map[string]*Stream{},
}
mgmt.SetupEndpoints.OnValueUpdate(func(new, old []byte, r *http.Request) {
if r == nil {
return
}
var req rtp.SetupEndpoints
err := tlv8.Unmarshal(new, &req)
if err != nil {
log.Info.Printf("Could not unmarshal tlv8 data: %s\n", err)
return
}
var uuid = hex.EncodeToString(req.SessionId)
resp := rtp.SetupEndpointsResponse{
SessionId: req.SessionId,
Status: rtp.SessionStatusSuccess,
AccessoryAddr: rtp.Addr{
IPVersion: req.ControllerAddr.IPVersion,
IPAddr: strings.Split(r.Context().Value(http.LocalAddrContextKey).(net.Addr).String(), ":")[0],
VideoRtpPort: req.ControllerAddr.VideoRtpPort,
AudioRtpPort: req.ControllerAddr.AudioRtpPort,
},
Video: req.Video,
Audio: req.Audio,
SsrcVideo: globalState.ssrcVideo,
SsrcAudio: globalState.ssrcAudio,
}
activeStreams.mutex.Lock()
activeStreams.streams[uuid] = &Stream{
mutex: &sync.Mutex{},
cmd: nil,
req: req,
resp: resp,
}
activeStreams.mutex.Unlock()
setTlv8Payload(mgmt.SetupEndpoints.Bytes, resp)
})
mgmt.SelectedRTPStreamConfiguration.OnValueRemoteUpdate(func(buf []byte) {
var cfg rtp.StreamConfiguration
err := tlv8.Unmarshal(buf, &cfg)
if err != nil {
log.Info.Fatalf("Could not unmarshal tlv8 data: %s\n", err)
}
uuid := hex.EncodeToString(cfg.Command.Identifier)
switch cfg.Command.Type {
case rtp.SessionControlCommandTypeStart:
stream := activeStreams.streams[uuid]
if stream == nil {
return
}
log.Info.Printf("%s: starting stream\n", uuid)
stream.mutex.Lock()
defer stream.mutex.Unlock()
if stream.cmd != nil && stream.cmd.Process != nil {
log.Info.Printf("%s: requested to start stream, but stream was already running. shutting down previous\n", uuid)
_ = stream.cmd.Process.Signal(syscall.SIGINT)
status, _ := stream.cmd.Process.Wait()
log.Info.Printf("%s: ffmpeg exited with %s\n", uuid, status.String())
stream.cmd = nil
}
endpoint := fmt.Sprintf(
"srtp://%s:%d?rtcpport=%d&pkt_size=%d",
stream.req.ControllerAddr.IPAddr,
stream.req.ControllerAddr.VideoRtpPort,
stream.req.ControllerAddr.VideoRtpPort,
1378,
)
source := blueirisBase.JoinPath(cameraName)
source.Scheme = "rtsp"
source.User = url.UserPassword(config.Blueiris.Username, config.Blueiris.Password)
cmd := exec.Command(
"ffmpeg",
"-an",
"-rtsp_transport", "tcp",
"-use_wallclock_as_timestamps", "1",
"-i", source.String(),
"-an",
"-sn",
"-dn",
"-bsf:v", "dump_extra",
"-vcodec", "copy",
"-payload_type", fmt.Sprintf("%d", cfg.Video.RTP.PayloadType),
"-ssrc", fmt.Sprintf("%d", globalState.ssrcVideo),
"-f", "rtp",
"-srtp_out_suite", "AES_CM_128_HMAC_SHA1_80",
"-srtp_out_params", stream.req.Video.SrtpKey(),
endpoint,
)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
log.Debug.Println(cmd)
err := cmd.Start()
if err != nil {
log.Info.Printf("Failed to spawn ffmpeg: %s\n", err)
return
}
stream.cmd = cmd
setTlv8Payload(mgmt.StreamingStatus.Bytes, rtp.StreamingStatus{Status: rtp.StreamingStatusAvailable})
case rtp.SessionControlCommandTypeEnd:
stream := activeStreams.streams[uuid]
if stream == nil {
return
}
log.Info.Printf("%s: ending stream\n", uuid)
stream.mutex.Lock()
defer stream.mutex.Unlock()
if stream.cmd == nil || stream.cmd.Process == nil {
log.Info.Printf("%s: attempted to end already ended stream\n", uuid)
return
}
_ = stream.cmd.Process.Signal(syscall.SIGINT)
status, _ := stream.cmd.Process.Wait()
log.Info.Printf("%s: ffmpeg exited with %s\n", uuid, status.String())
stream.cmd = nil
setTlv8Payload(mgmt.StreamingStatus.Bytes, rtp.StreamingStatus{Status: rtp.StreamingStatusAvailable})
case rtp.SessionControlCommandTypeSuspend:
stream := activeStreams.streams[uuid]
if stream == nil {
return
}
log.Info.Printf("%s: suspending stream\n", uuid)
stream.mutex.Lock()
defer stream.mutex.Unlock()
if stream.cmd == nil || stream.cmd.Process == nil {
log.Info.Printf("%s: attempted to suspend inactive stream\n", uuid)
return
}
err := stream.cmd.Process.Signal(syscall.SIGSTOP)
if err != nil {
log.Info.Printf("%s: failed to suspend ffmpeg: %s\n", uuid, err)
}
case rtp.SessionControlCommandTypeResume:
stream := activeStreams.streams[uuid]
if stream == nil {
return
}
log.Info.Printf("%s: resuming stream\n", uuid)
stream.mutex.Lock()
defer stream.mutex.Unlock()
if stream.cmd == nil || stream.cmd.Process == nil {
log.Info.Printf("%s: attempted to resume inactive stream\n", uuid)
return
}
err := stream.cmd.Process.Signal(syscall.SIGCONT)
if err != nil {
log.Info.Printf("%s: failed to resume ffmpeg: %s\n", uuid, err)
}
case rtp.SessionControlCommandTypeReconfigure:
log.Info.Printf("%s: ignoring reconfigure message\n", uuid)
default:
log.Debug.Printf("%s: Unknown command type %d\n", uuid, cfg.Command.Type)
}
})
}
func setTlv8Payload(c *characteristic.Bytes, v interface{}) {
if val, err := tlv8.Marshal(v); err == nil {
c.SetValue(val)
} else {
log.Info.Println(err)
}
}
type Stream struct {
mutex *sync.Mutex
cmd *exec.Cmd
req rtp.SetupEndpoints
resp rtp.SetupEndpointsResponse
}
type ActiveStreams struct {
mutex *sync.Mutex
streams map[string]*Stream
}