// index : ~doyle/hkbi.git

package main

import (
	"context"
	"encoding/gob"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"github.com/BurntSushi/toml"
	"github.com/brutella/hap"
	"github.com/brutella/hap/accessory"
	"github.com/brutella/hap/characteristic"
	"github.com/brutella/hap/log"
	"github.com/brutella/hap/rtp"
	"github.com/brutella/hap/service"
	"github.com/brutella/hap/tlv8"
	"github.com/w4/hkbi/blueiris"
	"io"
	"math/rand"
	"net"
	"net/http"
	"net/url"
	"os"
	"os/exec"
	"os/signal"
	"path/filepath"
	"strings"
	"sync"
	"syscall"
)

// Config is the application configuration decoded from the TOML file whose
// path is given on the command line.
type Config struct {
	// ListenAddress is assigned to the HAP server's Addr before it starts.
	ListenAddress string `toml:"listen-address"`
	// DataDir is the directory that holds the knownCameras file and the HAP
	// server's filesystem store; it is created on startup if missing.
	DataDir       string `toml:"data-dir"`
	// Blueiris is handed to blueiris.NewBlueiris; its Username and Password
	// are also used to build the RTSP source URL given to ffmpeg.
	Blueiris      blueiris.BlueirisConfig
}

// GlobalState holds values shared by every camera's stream handlers.
type GlobalState struct {
	// ssrcVideo is reported to HomeKit as the video SSRC and passed to ffmpeg
	// via -ssrc; it is chosen randomly once at startup.
	ssrcVideo int32
	// ssrcAudio is reported to HomeKit as the audio SSRC; it is chosen
	// randomly once at startup.
	ssrcAudio int32
	// blueiris is a BlueIris client handle.
	// NOTE(review): nothing visible in this file assigns or reads this field.
	blueiris  *blueiris.Blueiris
}

// main turns on info and debug logging, checks that exactly one non-empty
// argument (the config path) was supplied, and then hands the parsed
// configuration to run.
func main() {
	log.Info.Enable()
	log.Debug.Enable()

	args := os.Args
	if !(len(args) == 2 && args[1] != "") {
		log.Info.Fatalf("usage: %s /path/to/config.toml", args[0])
	}

	run(readConfig(args[1]))
}

// readConfig decodes the TOML file at path into a Config. A decode failure is
// logged and panics, since the bridge can't start without its configuration.
func readConfig(path string) Config {
	var parsed Config

	if _, err := toml.DecodeFile(path, &parsed); err != nil {
		log.Info.Panic(err)
	}

	return parsed
}

// run connects to BlueIris, exposes every camera it reports as a HomeKit
// camera accessory with an attached motion sensor, and serves the HAP server
// on config.ListenAddress until SIGINT or SIGTERM is received.
//
// Accessory ids are persisted in <data-dir>/knownCameras (keyed by camera
// name) so that a camera keeps the same id across restarts. Unrecoverable
// setup errors terminate the process.
func run(config Config) {
	bi, err := blueiris.NewBlueiris(config.Blueiris)
	if err != nil {
		log.Info.Fatalf("failed to login to bi: %s\n", err)
	}

	globalState := &GlobalState{
		ssrcVideo: rand.Int31(),
		ssrcAudio: rand.Int31(),
	}

	// fetch cameras from BlueIris
	biCameras, err := bi.ListCameras()
	if err != nil {
		log.Info.Fatalf("failed to load cameras from bi: %s\n", err)
	}

	err = os.MkdirAll(config.DataDir, os.FileMode(0755))
	if err != nil {
		log.Info.Fatalf("failed to create data directory: %s\n", err)
	}

	// read our known camera list from disk for stable ids
	knownCamerasPath := filepath.Join(config.DataDir, "knownCameras")
	knownCameras := loadKnownCameras(knownCamerasPath)

	hasDiscoveredNewCameras := false

	// create HomeKit cameras and motion sensors from the fetched BlueIris cameras
	cameras := make([]*accessory.Camera, 0, len(biCameras))
	motionSensors := make(map[string]*service.MotionSensor)
	for _, camera := range biCameras {
		// create the HomeKit camera accessory
		cam := accessory.NewCamera(accessory.Info{
			Name:         camera.Name,
			Manufacturer: "HKBI",
		})

		// 0 is the map's zero value and therefore means "no id assigned yet"
		if id := knownCameras[camera.Name]; id != 0 {
			log.Debug.Printf("reusing previously assigned id %d for camera %s", id, camera.Name)
			cam.Id = uint64(id)
		} else {
			newID := nextCameraID(knownCameras)

			log.Info.Printf("newly discovered camera %s assigned id %d", camera.Name, newID)
			knownCameras[camera.Name] = newID
			cam.Id = uint64(newID)

			hasDiscoveredNewCameras = true
		}

		// setup stream request handling on management channels 1 & 2
		startListeningForStreams(camera.Id, cam.StreamManagement1, globalState, &config, bi.BaseUrl)
		startListeningForStreams(camera.Id, cam.StreamManagement2, globalState, &config, bi.BaseUrl)

		// create the HomeKit motion sensor accessory
		motionSensor := service.NewMotionSensor()
		cam.AddS(motionSensor.S)

		// add the cameras to our output array/map for adding to the server and dispatching
		// events to
		cameras = append(cameras, cam)
		motionSensors[camera.Id] = motionSensor
	}

	// write newly discovered cameras to disk
	if hasDiscoveredNewCameras {
		saveKnownCameras(knownCamerasPath, knownCameras)
	}

	// fetch all the created accessories for exposing to HomeKit
	var accessories = make([]*accessory.A, 0, len(cameras))
	for _, camera := range cameras {
		accessories = append(accessories, camera.A)
	}

	// the server needs at least one accessory; bail out with a readable message
	// instead of an index-out-of-range panic below
	if len(accessories) == 0 {
		log.Info.Fatalf("bi returned no cameras, nothing to expose to HomeKit\n")
	}

	// setup hap's state storage
	fs := hap.NewFsStore(config.DataDir)

	// start building the homekit accessory protocol (hap) server
	server, err := hap.NewServer(fs, accessories[0], accessories[1:]...)
	if err != nil {
		log.Info.Panic(err)
	}

	// set our HAP config
	// NOTE(review): the pairing PIN is hard-coded; consider making it configurable.
	server.Pin = "11111112"
	server.Addr = config.ListenAddress

	// endpoint to set a camera's motion sensor state: `state=off` clears it, any
	// other value (including none) marks motion as detected
	server.ServeMux().HandleFunc("/trigger", func(res http.ResponseWriter, req *http.Request) {
		query := req.URL.Query()
		state := query.Get("state")
		cam := query.Get("cam")

		sensor := motionSensors[cam]
		if sensor == nil {
			log.Info.Printf("Received trigger request for unknown camera: %s", cam)
			res.WriteHeader(http.StatusBadRequest)
			return
		}

		sensor.MotionDetected.SetValue(state != "off")
		res.WriteHeader(http.StatusOK)
	})

	// endpoint to handle snapshot requests from HomeKit
	server.ServeMux().HandleFunc("/resource", func(res http.ResponseWriter, req *http.Request) {
		var request struct {
			Type string `json:"resource-type"`
			Aid  int    `json:"aid"`
		}

		// ensure this is a valid resource request
		if !server.IsAuthorized(req) {
			_ = hap.JsonError(res, hap.JsonStatusInsufficientPrivileges)
			return
		} else if req.Method != http.MethodPost {
			res.WriteHeader(http.StatusBadRequest)
			return
		}

		// read request body
		body, err := io.ReadAll(req.Body)
		if err != nil {
			log.Info.Println(err)
			res.WriteHeader(http.StatusInternalServerError)
			return
		}

		// parse request body
		err = json.Unmarshal(body, &request)
		if err != nil {
			log.Info.Println(err)
			res.WriteHeader(http.StatusBadRequest)
			return
		}

		// map the accessory id back to the camera name it was assigned to
		var cameraName string
		for name, i := range knownCameras {
			if i == request.Aid {
				cameraName = name
				break
			}
		}

		if cameraName == "" {
			log.Info.Printf("a snapshot was requested for camera %d but not camera with that id exists", request.Aid)
			res.WriteHeader(http.StatusBadRequest)
			return
		}

		switch request.Type {
		case "image":
			// build request to fetch a snapshot of the camera from blueiris
			snapshotReq, err := bi.FetchSnapshot(cameraName)
			if err != nil {
				log.Info.Println(err)
				res.WriteHeader(http.StatusInternalServerError)
				return
			}

			// send request to blueiris
			imageResponse, err := http.DefaultClient.Do(snapshotReq)
			if err != nil {
				log.Info.Println(err)
				res.WriteHeader(http.StatusInternalServerError)
				return
			}
			defer func() {
				_ = imageResponse.Body.Close()
			}()

			// don't forward an error page to HomeKit labelled as a JPEG
			if imageResponse.StatusCode != http.StatusOK {
				log.Info.Printf("bi returned status %d for snapshot of camera %s\n", imageResponse.StatusCode, cameraName)
				res.WriteHeader(http.StatusInternalServerError)
				return
			}

			// set response headers
			res.Header().Set("Content-Type", "image/jpeg")

			// stream response from blueiris to HomeKit
			wr := hap.NewChunkedWriter(res, 2048)
			_, err = io.Copy(wr, imageResponse.Body)
			if err != nil {
				log.Info.Printf("Failed to copy bytes for snapshot to HomeKit: %s\n", err)
				return
			}
		default:
			log.Info.Printf("unsupported resource request \"%s\"\n", request.Type)
			res.WriteHeader(http.StatusInternalServerError)
			return
		}
	})

	// set up a listener for sigint and sigterm signals to stop the server; the
	// channel is buffered because the signal package never blocks when sending,
	// so a signal arriving before the goroutine is receiving would be dropped
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt, syscall.SIGTERM)

	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		<-c
		signal.Stop(c)
		cancel()
	}()

	// spawn the server
	err = server.ListenAndServe(ctx)
	if err != nil {
		log.Info.Panic(err)
	}
}

// loadKnownCameras reads the gob-encoded camera name -> accessory id map from
// path. A missing file yields an empty map; any other failure is fatal.
func loadKnownCameras(path string) map[string]int {
	known := make(map[string]int)

	file, err := os.Open(path)
	if err != nil {
		// if not exists, we'll just ignore the error and default to the empty list
		if !os.IsNotExist(err) {
			log.Info.Fatalf("failed to read knownCameras: %s\n", err)
		}
		return known
	}
	// read-only handle, nothing is lost if closing fails
	defer func() { _ = file.Close() }()

	if err := gob.NewDecoder(file).Decode(&known); err != nil {
		log.Info.Fatalf("failed to decode knownCameras: %s\n", err)
	}

	return known
}

// saveKnownCameras gob-encodes the camera name -> accessory id map to path,
// treating any failure (including a failed close) as fatal.
func saveKnownCameras(path string, known map[string]int) {
	file, err := os.Create(path)
	if err != nil {
		log.Info.Fatalf("failed to create knownCameras: %s\n", err)
	}

	if err := gob.NewEncoder(file).Encode(known); err != nil {
		_ = file.Close()
		log.Info.Fatalf("failed to encode knownCameras: %s\n", err)
	}

	// a failed close can mean the data never reached the disk
	if err := file.Close(); err != nil {
		log.Info.Fatalf("failed to write knownCameras: %s\n", err)
	}
}

// nextCameraID returns the smallest id that is greater than every id in known
// and at least 1. It never returns 0, because 0 is what a lookup of an unknown
// camera yields and so is used as the "not assigned" marker.
func nextCameraID(known map[string]int) int {
	next := 1
	for _, id := range known {
		if id >= next {
			next = id + 1
		}
	}
	return next
}

// startListeningForStreams wires one RTP stream management service of a camera
// accessory to ffmpeg.
//
// cameraName is appended to blueirisBase to form the RTSP source URL, mgmt is
// the HomeKit service whose characteristics are populated and observed,
// globalState supplies the SSRC values, and config supplies the BlueIris
// credentials embedded in the source URL.
//
// Two callbacks are registered: one answering SetupEndpoints writes and
// recording the session, and one reacting to SelectedRTPStreamConfiguration
// commands (start, end, suspend, resume) by spawning or signalling ffmpeg.
func startListeningForStreams(cameraName string, mgmt *service.CameraRTPStreamManagement, globalState *GlobalState, config *Config, blueirisBase *url.URL) {
	// set up some basic parameters for HomeKit to know that the camera is available
	setTlv8Payload(mgmt.StreamingStatus.Bytes, rtp.StreamingStatus{Status: rtp.StreamingStatusAvailable})
	setTlv8Payload(mgmt.SupportedRTPConfiguration.Bytes, rtp.NewConfiguration(rtp.CryptoSuite_AES_CM_128_HMAC_SHA1_80))
	setTlv8Payload(mgmt.SupportedVideoStreamConfiguration.Bytes, rtp.DefaultVideoStreamConfiguration())
	setTlv8Payload(mgmt.SupportedAudioStreamConfiguration.Bytes, rtp.DefaultAudioStreamConfiguration())

	// shared state for all the spawned streams, with a mapping to the session id for us to
	// figure out which stream is being referred to
	activeStreams := &ActiveStreams{
		mutex:   &sync.Mutex{},
		streams: map[string]*Stream{},
	}

	// handle the initial request sent to us from HomeKit to set up a new stream
	mgmt.SetupEndpoints.OnValueUpdate(func(value, _ []byte, r *http.Request) {
		// HomeKit ends up sending us two requests, but the second one doesn't have a http request attached,
		// so we can just ignore it
		if r == nil {
			return
		}

		var req rtp.SetupEndpoints

		// unmarshal request from HomeKit
		err := tlv8.Unmarshal(value, &req)
		if err != nil {
			log.Info.Printf("Could not unmarshal tlv8 data: %s\n", err)
			return
		}

		// the address the controller reached us on is the one we advertise back
		accessoryIP, ok := requestLocalIP(r)
		if !ok {
			log.Info.Printf("Could not determine local address for stream setup\n")
			return
		}

		// encode the session id, so it's human-readable for logging
		var uuid = hex.EncodeToString(req.SessionId)

		// build the response to send back to HomeKit
		resp := rtp.SetupEndpointsResponse{
			SessionId: req.SessionId,
			Status:    rtp.SessionStatusSuccess,
			AccessoryAddr: rtp.Addr{
				IPVersion:    req.ControllerAddr.IPVersion,
				IPAddr:       accessoryIP,
				VideoRtpPort: req.ControllerAddr.VideoRtpPort,
				AudioRtpPort: req.ControllerAddr.AudioRtpPort,
			},
			Video:     req.Video,
			Audio:     req.Audio,
			SsrcVideo: globalState.ssrcVideo,
			SsrcAudio: globalState.ssrcAudio,
		}

		// create and track the new stream
		activeStreams.mutex.Lock()
		activeStreams.streams[uuid] = &Stream{
			mutex: &sync.Mutex{},
			cmd:   nil,
			req:   req,
			resp:  resp,
		}
		activeStreams.mutex.Unlock()

		// send the response to HomeKit
		setTlv8Payload(mgmt.SetupEndpoints.Bytes, resp)
	})

	// handle streaming requests from HomeKit
	mgmt.SelectedRTPStreamConfiguration.OnValueRemoteUpdate(func(buf []byte) {
		var cfg rtp.StreamConfiguration

		// unmarshal request from HomeKit; a malformed write from a controller must
		// not take the whole bridge down, so log and ignore it
		err := tlv8.Unmarshal(buf, &cfg)
		if err != nil {
			log.Info.Printf("Could not unmarshal tlv8 data: %s\n", err)
			return
		}

		// encode the session id, so it's human-readable for logging
		uuid := hex.EncodeToString(cfg.Command.Identifier)

		// match the command that HomeKit wants to perform for the stream uuid
		switch cfg.Command.Type {
		case rtp.SessionControlCommandTypeStart:
			stream := findStream(activeStreams, uuid)
			if stream == nil {
				return
			}

			log.Info.Printf("%s: starting stream\n", uuid)

			// lock the stream, so we're not racing with another request to spawn an ffmpeg instance
			// and update the state
			stream.mutex.Lock()
			defer stream.mutex.Unlock()

			// close any previous ffmpeg instances that were open for the given stream uuid
			if stream.cmd != nil && stream.cmd.Process != nil {
				log.Info.Printf("%s: requested to start stream, but stream was already running. shutting down previous\n", uuid)
				stopFfmpeg(uuid, stream)
			}

			// build the endpoint that HomeKit wants us to stream to
			endpoint := fmt.Sprintf(
				"srtp://%s:%d?rtcpport=%d&pkt_size=%d",
				stream.req.ControllerAddr.IPAddr,
				stream.req.ControllerAddr.VideoRtpPort,
				stream.req.ControllerAddr.VideoRtpPort,
				1378,
			)

			// build the blueiris rtsp source
			source := blueirisBase.JoinPath(cameraName)
			source.Scheme = "rtsp"
			source.User = url.UserPassword(config.Blueiris.Username, config.Blueiris.Password)

			// build ffmpeg command for pulling RTSP stream from BlueIris and forwarding to the HomeKit
			// controller's SRTP port using pass-through for low CPU, the BlueIris RTSP web server needs
			// to be set to 2,000kb/s bitrate though otherwise iOS will silently fail
			cmd := exec.Command(
				"ffmpeg",
				// input
				"-an",
				"-rtsp_transport", "tcp",
				"-use_wallclock_as_timestamps", "1",
				"-i", source.String(),
				// no audio
				"-an",
				// no subs
				"-sn",
				// no data
				"-dn",
				// add extra keyframes, so we don't need to worry about the blueiris settings
				"-bsf:v", "dump_extra",
				// copy data directly from the blueiris stream
				"-vcodec", "copy",
				// requested payload type from client
				"-payload_type", fmt.Sprintf("%d", cfg.Video.RTP.PayloadType),
				// sync source
				"-ssrc", fmt.Sprintf("%d", globalState.ssrcVideo),
				// format rtp
				"-f", "rtp",
				// forward over srtp to the controller
				"-srtp_out_suite", "AES_CM_128_HMAC_SHA1_80",
				"-srtp_out_params", stream.req.Video.SrtpKey(),
				endpoint,
			)

			// forward ffmpeg to console
			cmd.Stdout = os.Stdout
			cmd.Stderr = os.Stderr
			// NOTE(review): this prints the full command line, including the BlueIris
			// credentials embedded in the source URL and the SRTP key
			log.Debug.Println(cmd)

			// spawn ffmpeg command
			err := cmd.Start()
			if err != nil {
				log.Info.Printf("Failed to spawn ffmpeg: %s\n", err)
				return
			}

			// update our state to contain the spawned command, so we can control it later
			stream.cmd = cmd

			// sanity check to ensure our status is still available so new clients can still request
			// streams
			setTlv8Payload(mgmt.StreamingStatus.Bytes, rtp.StreamingStatus{Status: rtp.StreamingStatusAvailable})
		case rtp.SessionControlCommandTypeEnd:
			stream := findStream(activeStreams, uuid)
			if stream == nil {
				return
			}

			log.Info.Printf("%s: ending stream\n", uuid)

			// lock the stream, so we're not racing with another request on the process and update
			// the state
			stream.mutex.Lock()
			defer stream.mutex.Unlock()

			// the session is over, so stop tracking it; otherwise every session ever
			// set up would stay in the map for the lifetime of the process
			activeStreams.mutex.Lock()
			delete(activeStreams.streams, uuid)
			activeStreams.mutex.Unlock()

			// ensure the stream is still open
			if stream.cmd == nil || stream.cmd.Process == nil {
				log.Info.Printf("%s: attempted to end already ended stream\n", uuid)
				return
			}

			// stop ffmpeg and wait for it to finish
			stopFfmpeg(uuid, stream)

			// sanity check to ensure our status is still available so new clients can still request
			// streams
			setTlv8Payload(mgmt.StreamingStatus.Bytes, rtp.StreamingStatus{Status: rtp.StreamingStatusAvailable})
		case rtp.SessionControlCommandTypeSuspend:
			stream := findStream(activeStreams, uuid)
			if stream == nil {
				return
			}

			log.Info.Printf("%s: suspending stream\n", uuid)

			// lock the stream, so we're not racing with another request on the process
			stream.mutex.Lock()
			defer stream.mutex.Unlock()

			// ensure HomeKit isn't attempting to suspend a closed stream
			if stream.cmd == nil || stream.cmd.Process == nil {
				log.Info.Printf("%s: attempted to suspend inactive stream\n", uuid)
				return
			}

			// send a sigstop signal to ffmpeg
			err := stream.cmd.Process.Signal(syscall.SIGSTOP)
			if err != nil {
				log.Info.Printf("%s: failed to suspend ffmpeg: %s\n", uuid, err)
			}
		case rtp.SessionControlCommandTypeResume:
			stream := findStream(activeStreams, uuid)
			if stream == nil {
				return
			}

			log.Info.Printf("%s: resuming stream\n", uuid)

			// lock the stream, so we're not racing with another request on the process
			stream.mutex.Lock()
			defer stream.mutex.Unlock()

			// ensure HomeKit isn't attempting to resume a closed stream
			if stream.cmd == nil || stream.cmd.Process == nil {
				log.Info.Printf("%s: attempted to resume inactive stream\n", uuid)
				return
			}

			// send a sigcont signal to ffmpeg
			err := stream.cmd.Process.Signal(syscall.SIGCONT)
			if err != nil {
				log.Info.Printf("%s: failed to resume ffmpeg: %s\n", uuid, err)
			}
		case rtp.SessionControlCommandTypeReconfigure:
			log.Info.Printf("%s: ignoring reconfigure message\n", uuid)
		default:
			log.Debug.Printf("%s: Unknown command type %d\n", uuid, cfg.Command.Type)
		}
	})
}

// requestLocalIP returns the host part of the local address the request was
// received on, without port, brackets or IPv6 zone. It reports false when the
// address is missing from the request context or can't be parsed.
func requestLocalIP(r *http.Request) (string, bool) {
	addr, ok := r.Context().Value(http.LocalAddrContextKey).(net.Addr)
	if !ok || addr == nil {
		return "", false
	}

	// SplitHostPort copes with bracketed IPv6 literals, which splitting on the
	// first ':' does not
	host, _, err := net.SplitHostPort(addr.String())
	if err != nil {
		return "", false
	}

	// a zone such as "%eth0" only has meaning on this machine, so don't send it
	// to the controller
	host, _, _ = strings.Cut(host, "%")

	return host, host != ""
}

// findStream looks up the stream tracked for the given session id while holding
// the map's mutex, returning nil if the session is unknown.
func findStream(streams *ActiveStreams, uuid string) *Stream {
	streams.mutex.Lock()
	defer streams.mutex.Unlock()

	return streams.streams[uuid]
}

// stopFfmpeg interrupts the stream's ffmpeg process, waits for it to exit and
// clears stream.cmd. The caller must hold stream.mutex and must have checked
// that stream.cmd and stream.cmd.Process are non-nil.
func stopFfmpeg(uuid string, stream *Stream) {
	process := stream.cmd.Process

	// a process suspended with SIGSTOP doesn't act on SIGINT until it is
	// continued, which would leave Wait blocked forever; continuing a process
	// that is already running is a no-op, so the result is deliberately ignored
	_ = process.Signal(syscall.SIGCONT)

	if err := process.Signal(syscall.SIGINT); err != nil {
		log.Info.Printf("%s: failed to interrupt ffmpeg: %s\n", uuid, err)
	}

	status, err := process.Wait()
	if err != nil {
		log.Info.Printf("%s: failed to wait for ffmpeg: %s\n", uuid, err)
	} else {
		log.Info.Printf("%s: ffmpeg exited with %s\n", uuid, status.String())
	}

	// remove command from our state so HomeKit can't attempt to close it twice
	stream.cmd = nil
}

// setTlv8Payload marshals v as TLV8 and stores the bytes as the value of c.
// If marshalling fails the error is logged and c is left untouched.
func setTlv8Payload(c *characteristic.Bytes, v interface{}) {
	payload, err := tlv8.Marshal(v)
	if err != nil {
		log.Info.Println(err)
		return
	}

	c.SetValue(payload)
}

// Stream is the state kept for a single HomeKit streaming session.
type Stream struct {
	// mutex is held by the command handlers while they inspect or change cmd.
	mutex *sync.Mutex
	// cmd is the ffmpeg process serving this session, or nil when none is running.
	cmd   *exec.Cmd
	// req is the SetupEndpoints request that created the session; the
	// controller address and SRTP key used for ffmpeg are read from it.
	req   rtp.SetupEndpoints
	// resp is the SetupEndpoints response that was sent back to HomeKit.
	resp  rtp.SetupEndpointsResponse
}

// ActiveStreams tracks the streaming sessions set up on one stream management
// service.
type ActiveStreams struct {
	// mutex is held while a new session is inserted into streams.
	mutex   *sync.Mutex
	// streams maps the hex-encoded session id to that session's state.
	streams map[string]*Stream
}