Video Import and Export

Overview

Background

Because a clip's algorithm computation and synchronization are completed in a single pass, a reviewer cannot afterwards re-seek to the positions of interest (someone passing by, a sound occurring). We therefore need to be able to export a video, and to import an exported video back into the pipeline so that its results can be reproduced.

Requirements

  1. Export a file
    1. Provide a user-facing entry point [the share menu]
    2. The export is either a complete MP4 file or the raw data format (a marker such as 0000001 denotes the start of a frame, and data is saved according to that marker; see the sketch after this list)
  2. Read the file back in as pipeline input
  3. Convert the file contents into individual frames
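
The raw data format is only loosely specified by the requirement above, so the following is a minimal sketch of what a frame-delimited writer could look like. The FrameWriter name, the marker value, and the length-prefixed record layout are all hypothetical, not the project's actual format.

import java.io.DataOutputStream
import java.io.FileOutputStream

// Hypothetical frame-delimited raw format: each record is
// [start-of-frame marker][4-byte payload length][payload bytes].
class FrameWriter(path: String) : AutoCloseable {

    companion object {
        // Assumed marker, per the "0000001 is the start of a frame" note above.
        const val FRAME_MARKER: Int = 0x00000001
    }

    private val out = DataOutputStream(FileOutputStream(path))

    fun writeFrame(payload: ByteArray) {
        out.writeInt(FRAME_MARKER) // frame boundary
        out.writeInt(payload.size) // length prefix so a reader can re-slice frames
        out.write(payload)         // raw frame bytes
    }

    override fun close() = out.close()
}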

Process

pipelineFactory

object DemoPipelineFactory {

    fun createVideoPipeline(
        sink: EventSink,
        ai: IntelligenceRepository,
        logger: Logger
    ): LabelPipeline<Bitmap> {
        val facePipelineBuilder = LabelPipeline.Builder<List<MarkedFace>>(logger)
        val facePipeline = facePipelineBuilder
            .sinkTo(sink)
            .addOp(HMSFaceVerifier(ai, logger))
            .build()
        val facePipeSplitter = SplitExtractor(facePipeline, GoogleMLKitFaceDetector(logger))

        val videoPipeBuilder = LabelPipeline.Builder<Bitmap>(logger)
        return videoPipeBuilder
            .sinkTo(sink)
            .addOp(facePipeSplitter)
            // .addOp(HMSSceneDetector()) // TODO: wire up the new label callback
            .addOp(TFLiteObjectDetector(ModelName.SSD_MOBILE_NET_V1, ai, logger))
            .build()
    }

    fun createAudioPcmPipeline(
        eventSink: EventSink,
        intelligenceRepository: IntelligenceRepository,
        logger: Logger
    ): LabelPipeline<ShortArray> {
        val audioPipeBuilder = LabelPipeline.Builder<ShortArray>(logger)
        return audioPipeBuilder
            .sinkTo(eventSink)
            .addOp(YAMNetAudioClassifier(intelligenceRepository, logger))
            .build()
    }
}
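
For orientation, a minimal sketch of how these two factory methods might be wired up; the sink, ai, and logger instances are assumed to come from the host app's dependency graph and are illustrative here.

val videoPipeline: LabelPipeline<Bitmap> =
    DemoPipelineFactory.createVideoPipeline(sink, ai, logger)
val audioPipeline: LabelPipeline<ShortArray> =
    DemoPipelineFactory.createAudioPcmPipeline(sink, ai, logger)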

LabelPipeline

com/danale/edge/intelligence/pipe/LabelPipeline.kt

open class LabelPipeline<T : Any>(
    private val extractors: List<LabelExtractor<T>>,
    private val logger: Logger,
    private val providedSink: EventSink?
) : Closeable {

    companion object {
        const val TAG = "LabelPipeline"
    }

    init {
        extractors.forEach {
            if (providedSink != null) {
                logger.i(TAG, "init, sink ${it.hashCode()} to ${providedSink.hashCode()}")
                it.bindEventSink(providedSink)
            }
        }
    }

    fun process(item: T, metaData: LabeledObject.MetaData): Single<LabeledObject<T>> {
        var single = Single.just(LabeledObject(ConcurrentHashMap(), item, metaData = metaData))
        for (op in extractors) {
            single = single.flatMap { op.extractLabel(it).toSingleDefault(it) }
        }
        return single
    }

    class Builder<T : Any>(val logger: Logger) {

        private val list = LinkedList<LabelExtractor<T>>()
        private var sink: EventSink? = null

        fun sinkTo(sink: EventSink): Builder<T> {
            this.sink = sink
            return this
        }

        fun addOp(extractor: LabelExtractor<T>): Builder<T> {
            list.addLast(extractor)
            return this
        }

        fun build(): LabelPipeline<T> {
            return LabelPipeline(list, logger, sink)
        }
    }

    override fun close() {
        for (op in extractors) {
            op.flushSession(providedSink)
            op.close()
        }
    }
}
interface LabelExtractor<T : Any> : Closeable, EventSource {

    /**
     * Run the extractor's computation.
     */
    fun extractLabel(item: LabeledObject<T>): Completable

    /**
     * Signals that the computation session has ended; the extractor should
     * report any events it has not yet reported.
     */
    fun flushSession(sink: EventSink?) {}

    fun labelKey(sourceType: String, category: String, vendor: String, vendorKey: String): String {
        return "$sourceType:$category:$vendor:$vendorKey"
    }
}
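
To make the contract concrete, here is a minimal sketch of an extractor implementation, assuming EventSource requires only the bindEventSink method that LabelPipeline's init block calls. The LoggingExtractor name and its label-key arguments are made up for illustration; a real extractor would run a model in extractLabel.

// Hypothetical extractor that only logs a label key instead of running a model.
class LoggingExtractor<T : Any>(private val logger: Logger) : LabelExtractor<T> {

    override fun extractLabel(item: LabeledObject<T>): Completable =
        Completable.fromAction {
            val key = labelKey("video", "debug", "demo", "noop") // illustrative key parts
            logger.d("LoggingExtractor", "labeled $key for item ${item.hashCode()}")
        }

    override fun bindEventSink(sink: EventSink) {
        // This sketch emits no events, so the sink is ignored.
    }

    override fun close() {}
}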

Video Export

Export the clip's ByteBuffer as an MP4 file.

com/danale/edge/ui/flow/VideoFlowViewModel.kt

fun attachPlayer(
    surface: Surface,
    clipDri: String,
    clipDataSource: ByteBufferVideoSource? = null,
    positionResponder: ClipPositionResponder? = null
) {
    logger.d(
        TAG,
        "attachPlayer, clipDri=${Fuzzy.interval(clipDri)}, data=${clipDataSource?.hashCode()}"
    )

    positionResponder?.let {
        // Hold the playback-position responder via a weak reference
        weakPositionResponder = WeakReference(it)
    }

    loadPlayerExecutor.submit {
        detachPlayerSync()

        if (player == null) {
            logger.i(TAG, "attachPlayer, instantiate")
            player = IjkPlayerProvider.provideIjkPlayerDefaultConfig()
        }
        player?.apply {
            try {
                setSurface(surface)
                setOnPreparedListener {
                    // Start playback
                }
                setOnErrorListener { mediaPlayer, errArg1, errArg2 ->
                    // Handle the error
                }
                setOnCompletionListener {
                    // Playback finished
                }
            } catch (e: Exception) {
                logger.e(TAG, "attachPlayer, null mp", e)
            }
        }

        if (clipDataSource != null) {
            logger.i(TAG, "attachPlayer, use exist data")
            try {
                // Play the already-loaded clip
            } catch (e: Exception) {}
        } else {
            logger.i(TAG, "attachPlayer, load clip ${Fuzzy.interval(clipDri)}")
            val latch = CountDownLatch(1)
            dataSubscription = sdk.getBlobWithDri(clipDri)
                .subscribeOn(Schedulers.io())
                .subscribeBy(
                    onSuccess = {
                        logger.i(TAG, "attachPlayer, blob ok")
                        try {
                            val data = ByteBufferVideoSource()
                            data.load(it)
                            preparePlayer(data)

                            /*** save the ByteBuffer ***/
                            saveBytebuffer(it, clipDri.split("clip_")[1])

                        } catch (e: Exception) {}
                        latch.countDown()
                    },
                    onError = {}
                )
            latch.await()
        }
    }
}

Here data.load(it) supplies the video source, where it is a ByteBuffer; saving that buffer in MP4 form is what implements the export, and different clips are distinguished by their clipDri.

// Resolve the destination path (DCIM/Camera on external storage)
private fun getPath(): String {
    val result = (Environment.getExternalStorageDirectory().toString()
            + File.separator + Environment.DIRECTORY_DCIM
            + File.separator + "Camera" + File.separator)
    val file = File(result)
    if (!file.exists()) {
        file.mkdirs() // create intermediate directories too
    }
    return result // already ends with File.separator
}

// Save the ByteBuffer to an .mp4 file
private fun saveBytebuffer(byteBuffer: ByteBuffer, fileName: String) {
    logger.d(TAG, "attachPlayer, save bytebuffer")
    val file = File(getPath() + fileName + ".mp4")
    logger.d(TAG, "attachPlayer, file=$file")
    if (!file.exists()) {
        file.createNewFile()
    }
    val os: OutputStream = FileOutputStream(file, true)
    os.write(byteBuffer.array())
    os.flush()
    os.close()
}
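
One caveat: ByteBuffer.array() throws UnsupportedOperationException for direct buffers and ignores the buffer's position and limit. A more defensive variant, sketched below under the assumption that position()..limit() bracket the MP4 payload, writes through a FileChannel instead (the saveByteBufferSafely name is illustrative):

import java.io.File
import java.io.FileOutputStream
import java.nio.ByteBuffer

// Works for both heap-backed and direct ByteBuffers.
private fun saveByteBufferSafely(byteBuffer: ByteBuffer, file: File) {
    FileOutputStream(file).channel.use { channel ->
        channel.write(byteBuffer.duplicate()) // duplicate() leaves the caller's position untouched
    }
}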

clipDri is a String, but it cannot be used directly as a file name because it contains illegal characters, e.g. [D*]dri:data_worm:kv:blob:grid/827f8b9cfbb780df63f56fe6e7bef03c:clip_f00b3dd5c79411edaa64d54fbc172c8c

So the file name is built from the part after clip_, as in the snippet below.
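
A one-line sketch of that extraction; substringAfter is the standard-library equivalent of the split("clip_")[1] call used above:

val clipDri = "[D*]dri:data_worm:kv:blob:grid/827f8b9cfbb780df63f56fe6e7bef03c:clip_f00b3dd5c79411edaa64d54fbc172c8c"
val fileName = clipDri.substringAfter("clip_") // "f00b3dd5c79411edaa64d54fbc172c8c"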

Video Import

Image Import

Use ffmpeg to split the MP4 file into JPEG images:

ffmpeg -i test.mp4 -r 10 -f image2 %05d.jpg

  • -i: the input file
  • -r: the output frame rate, here 10 frames per second
  • -f: force the output format, here the image2 image-sequence muxer
  • The pattern after the options is the output file name
  • %05d: frames are numbered in order with five zero-padded digits

Put the images under the app's own external-files directory; because of the phone's storage restrictions, files placed in other directories cannot be read: /sdcard/Android/data/com.alcidae.edgeapp/files

Obtain that directory's path via context.getExternalFilesDir(null) and collect each image's file name from it.

private fun getPics(): MutableList<String> {
    logger.d(TAG, "start getPics, ${context.getExternalFilesDir(null)}")
    val fileNames: MutableList<String> = mutableListOf()
    val fileTree = File("${context.getExternalFilesDir(null)}/test").walk()
    fileTree.maxDepth(1) // traverse only one level; do not descend into subdirectories
        .filter { it.isFile } // keep files, skip directories
        .filter { it.extension in listOf("jpg") } // keep only .jpg files
        .forEach { fileNames.add(it.absolutePath) } // collect each matching path
    return fileNames
}

Loop over the images, decode each one into a Bitmap, and feed it into the pipeline:

@SuppressLint("CheckResult")
private fun pipelineFromPic(files: MutableList<String>) {
    for (i in 0 until files.size) {
        val file = File(files[i])

        // readBytes reads the file contents as a byte array
        val bytes = file.readBytes()
        // decodeByteArray decodes an image from the byte array
        val currBitmap = BitmapFactory.decodeByteArray(bytes, 0, bytes.size)

        val metaData =
            LabeledObject.MetaData(dataTimestamp = i.toLong()) // metadata carrying the source data
        videoPipeline.process(currBitmap, metaData)
            .flatMap {
                val elapsedMillis = it.getElapsedMillis()
                logger.d(TAG, "for, inferenceTime = $elapsedMillis")
                it.obj.recycle() // free the native object backing the bitmap and clear the pixel reference
                return@flatMap Single.just(it) // re-emit the labeled object as a Single
            }
            .subscribeBy(
                onSuccess = {
                    logger.d(TAG, "for, onSuccess, i=$i, labeledObject = $it")
                },
                onError = {
                    logger.d(TAG, "for, onError, i=$i, throwable = $it")
                    close()
                },
            )
    }

    videoPipeline.close() // assumes processing has completed by the time the loop ends
}
val files = getPics()

pipelineFromPic(files)

Audio Import

Use the ffmpeg tool to convert the MP4 file into a PCM file (here via an intermediate MP3):

ffmpeg -i test.mp4 test.mp3
ffmpeg -i test.mp3 -ar 48000 -ac 2 -f s16le test.pcm

Then read the PCM file, slice it into fixed-size windows, convert each window into a ShortArray, and feed the windows into the audio pipeline:
@SuppressLint("CheckResult")
fun videoPipeline() {
    val file = File("${context.getExternalFilesDir(null)}/test/test.pcm")
    val bytes = file.readBytes()
    val metaData = LabeledObject.MetaData(dataTimestamp = 1.toLong())

    logger.d(TAG, "bytes.size= ${bytes.size}")
    val step = 640 // bytes consumed per iteration
    val num = bytes.size / step // number of complete windows (integer division floors)

    for (i in 0 until num) {
        val window = bytes.slice(i * step until (i + 1) * step) // take the i-th PCM window
        val shorts = ShortArray(window.size / 2) {
            // little-endian s16: low byte unsigned, high byte shifted into place
            (window[it * 2].toUByte().toInt() + (window[(it * 2) + 1].toInt() shl 8)).toShort()
        }

        Maybe.fromSingle(audioPipeline.process(shorts, metaData)
            .flatMap {
                val elapsedMillis = it.getElapsedMillis()
                logger.d(TAG, "audioPipe, inferenceTime = $elapsedMillis")
                return@flatMap Single.just(it)
            }).subscribeBy(
            onSuccess = {
                logger.d(TAG, "for, onSuccess, labeledObject = $it")
            },
            onError = {
                logger.d(TAG, "for, onError, throwable = $it")
                close()
            },
        )
    }
    audioPipeline.close()
}
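
The manual byte pairing above can also be delegated to java.nio, which handles the little-endian conversion in one bulk call; a small sketch (the windowToShorts name is illustrative):

import java.nio.ByteBuffer
import java.nio.ByteOrder

// Convert one little-endian s16 PCM window into a ShortArray.
fun windowToShorts(window: ByteArray): ShortArray {
    val shorts = ShortArray(window.size / 2)
    ByteBuffer.wrap(window)
        .order(ByteOrder.LITTLE_ENDIAN)
        .asShortBuffer()
        .get(shorts) // bulk-copy every sample
    return shorts
}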