-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathvideo_node.py
More file actions
149 lines (129 loc) · 4.7 KB
/
video_node.py
File metadata and controls
149 lines (129 loc) · 4.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import datetime
import time
from typing import Optional, Union
import chimerapy.engine as cpe
import cv2
import imutils
import numpy as np
from chimerapy.orchestrator import sink_node, source_node
@source_node(name="ChimeraPyTutorial_Video")
class Video(cpe.Node):
    """A generic video capture node.

    This can be used to capture a local webcam or a video from the local
    file system (or an ``http`` stream URL).

    Parameters
    ----------
    video_src : Union[str, int], optional (default: 0)
        The video source. This can be a local file path, a URL, or a
        webcam index.
    name : str, optional (default: 'VideoNode')
        The name of the node.
    width : int, optional (default: 640)
        The target width of the emitted frames. ``None`` disables
        width-based resizing.
    height : int, optional (default: 480)
        The target height of the emitted frames. ``None`` disables
        height-based resizing.
    frame_rate : int, optional (default: 30)
        The frame rate of the video, in frames per second. A value that is
        zero, negative, or ``None`` disables rate limiting.
    frame_key : str, optional (default: 'frame')
        The key to use for the frame in the data chunk.
    include_meta : bool, optional (default: False)
        Whether to include a ``"metadata"`` entry in the data chunk.
    loop : bool, optional (default: False)
        Whether to restart the video when a frame can no longer be read.
    **kwargs
        Additional keyword arguments to pass to the Node constructor. A
        ``debug`` entry, if present, also enables a preview window.

    Notes
    -----
    The frame_rate is not guaranteed to be exact. It is only a best effort.

    Resizing is delegated to ``imutils.resize``.
    NOTE(review): that helper is believed to preserve the aspect ratio, so
    the emitted frame may not be exactly ``width`` x ``height`` -- confirm
    against the imutils documentation.

    TODO: Research and implement a proper frame rate limiter.
    """

    def __init__(
        self,
        video_src: Union[str, int] = 0,
        name: str = "VideoNode",
        width: Optional[int] = 640,
        height: Optional[int] = 480,
        frame_rate: int = 30,
        frame_key: str = "frame",
        include_meta: bool = False,
        loop: bool = False,
        **kwargs,
    ) -> None:
        self.video_src = video_src
        self.width = width
        self.height = height
        self.frame_rate = frame_rate
        self.include_meta = include_meta
        self.frame_key = frame_key
        # The capture handle is created in ``setup`` rather than here.
        self.cp: Optional[cv2.VideoCapture] = None
        self.frame_count = 0
        # Sleep slightly less than the computed delay on each step.
        self.sleep_factor = 0.95
        self.debug = kwargs.get("debug", False)
        self.loop = loop
        # Wall-clock time of the first emitted frame; set by ``step``.
        self.initial: Optional[datetime.datetime] = None
        super().__init__(name=name, **kwargs)

    def setup(self) -> None:
        """Open the video source and reset the frame counter."""
        self.cp = cv2.VideoCapture(self.video_src)
        self.frame_count = 0

    def step(self) -> cpe.DataChunk:
        """Read one frame and return it wrapped in a data chunk.

        If the source cannot deliver a frame (even after restarting when
        ``loop`` is enabled), a black placeholder frame labelled
        "Read Error" is emitted instead, and the metadata entry
        ``belongs_to_video_src`` is ``False``.

        Returns
        -------
        cpe.DataChunk
            A chunk holding the frame under ``frame_key`` and, when
            ``include_meta`` is set, a ``"metadata"`` dictionary.
        """
        data_chunk = cpe.DataChunk()
        ret, frame = self.cp.read()

        if not ret and self.loop:
            self.logger.info("Restarting video")
            self._restart_capture()
            ret, frame = self.cp.read()

        if not ret or frame is None:
            # Covers both the non-looping case and a failed restart, so a
            # ``None`` frame never reaches the resize / data chunk below.
            self.logger.error("Could not read frame from video source")
            ret = False
            frame = self._make_error_frame()

        if self.width or self.height:
            frame = imutils.resize(frame, width=self.width, height=self.height)

        if self.debug:
            cv2.imshow(
                f"{self.name}_{self.id[0:6]}", frame
            )  # Will the window name be unique?
            cv2.waitKey(1)

        data_chunk.add(self.frame_key, frame, "image")

        if self.include_meta:
            data_chunk.add(
                "metadata",
                {
                    "width": self.width,
                    "height": self.height,
                    "source_id": self.id,
                    "source_name": self.name,
                    "frame_rate": self.frame_rate,
                    "frame_count": self.frame_count,
                    "belongs_to_video_src": bool(ret),
                },
            )

        self._throttle()
        self.frame_count += 1

        return data_chunk

    def teardown(self) -> None:
        """Release the capture handle and close any debug windows."""
        # ``cp`` is still None if ``setup`` never ran.
        if self.cp is not None:
            self.cp.release()

        if self.debug:
            cv2.destroyAllWindows()

    def _restart_capture(self) -> None:
        """Rewind the source to its first frame so that it can be replayed."""
        is_http = isinstance(self.video_src, str) and self.video_src.startswith(
            "http"
        )
        if is_http:
            # http sources are reopened instead of being seeked.
            self.cp.release()
            self.cp = cv2.VideoCapture(self.video_src)
        else:
            self.cp.set(cv2.CAP_PROP_POS_FRAMES, 0)

    def _make_error_frame(self) -> np.ndarray:
        """Build a black placeholder frame labelled "Read Error"."""
        h = self.height or 480
        w = self.width or 640
        frame = np.zeros((h, w, 3), dtype=np.uint8)
        cv2.putText(
            frame,
            "Read Error",
            # ``org`` is an (x, y) point: x runs along the width and y
            # along the height.
            (w // 2, h // 2),
            cv2.FONT_HERSHEY_SIMPLEX,
            1,
            (0, 0, 255),
            2,
        )
        return frame

    def _throttle(self) -> None:
        """Sleep as needed so that frames are emitted at about ``frame_rate``."""
        if self.frame_count == 0 or self.initial is None:
            # The first frame defines the reference point for pacing.
            self.initial = datetime.datetime.now()
            return

        if not self.frame_rate or self.frame_rate <= 0:
            # No usable rate: skip pacing (and avoid dividing by zero).
            return

        current_datetime = datetime.datetime.now()
        delta = (current_datetime - self.initial).total_seconds()
        expected = self.frame_count / self.frame_rate
        sleep_time = expected - delta
        time.sleep(max(self.sleep_factor * sleep_time, 0))