Using agent-based approach to make ChatGPT capable of video analysis - part 2 - adding speech recognition and more

Hello.

In this follow-up, we will continue development of the solution to let ChatGPT describe videos.

In my approach, the video is split into fragments and then these fragments are turned into "comics". Then, every fragment gets described and all the descriptions are sent to the master AI agent, which interacts with the user. 

Since the last post, I have updated the code to make it more modular and to move the AI processing out of the initializer:

  
from moviepy.video.io.VideoFileClip import VideoFileClip
from PIL import Image
import os
import easyapiopenai
from WojSafeAPI import * #My own library for loading API keys safely, you cannot use it.

class Video_ChatGPT:
    """Let ChatGPT answer questions about a video.

    The video is sampled into frames, the frames are stacked vertically into
    multi-frame "comic" pages, every page is described by a "watcher" agent
    and the collected descriptions are passed, together with the user's
    prompt, to a "master" agent.

    In this version frame extraction and scene description both run inside
    the constructor.
    """

    def __init__(self,openai_api_key:str, video_path: str, time_between_frames_seconds: float,
                 frames_per_scene: int, process_audio=True, frame_height = 480,
                 watcher_model='gpt-4o-mini',master_model='gpt-4o',
                 watcher_token_limit = 500, master_token_limit=15*1000, add_descriptions_to_system=False,
                 add_to_watchers_system=""):
        """Extract the frames, build the comics and describe every scene.

        Args:
            openai_api_key: Key handed to both easyapiopenai agents.
            video_path: Path of the video file to analyse.
            time_between_frames_seconds: Sampling interval between frames.
            frames_per_scene: Number of frames stacked on one comic page.
            process_audio: Stored on the instance; not used by this version.
            frame_height: Height in pixels every frame is resized to.
            watcher_model: Model name for the agent describing the comics.
            master_model: Model name for the agent answering the user.
            watcher_token_limit: Passed through to the watcher agent.
            master_token_limit: Passed through to the master agent.
            add_descriptions_to_system: When true, the description is not
                appended to the prompt in get_master_response.
            add_to_watchers_system: Extra text appended to the watcher's
                system message.
        """
        self.video_path = video_path
        self.time_between_frames_seconds = time_between_frames_seconds
        self.api_key = openai_api_key
        self.frames_per_scene = frames_per_scene
        self.process_audio = process_audio
        self.frame_height = frame_height
        self.add_descriptions_to_system = add_descriptions_to_system

        frames = self.get_video_images(self.video_path, self.time_between_frames_seconds, self.frame_height)
        self.video_comics = self.create_comics(frames, self.frames_per_scene)
        del frames  # Only the comics are needed from here on.

        self.ai_analyzer = self.AI_Analyser(
            api_key=self.api_key,
            frames_per_scene=self.frames_per_scene,
            watcher_model=watcher_model,
            master_model=master_model,
            watcher_token_limit=watcher_token_limit,
            master_token_limit=master_token_limit,
            add_to_watchers_system=add_to_watchers_system,
            add_descriptions_to_system=self.add_descriptions_to_system
        )

        self.video_description = self.ai_analyzer.describe_video(self.video_comics)

    def GetResponseFromMaster(self, prompt):
        """Return the master agent's answer to ``prompt``."""
        return self.ai_analyzer.get_master_response(prompt, self.video_description)

    def get_video_images(self, video_path, seconds_between_frames=1.0, desired_height=480):
        """Sample the video at a fixed interval and return the resized frames.

        Args:
            video_path: Path of the video file to read.
            seconds_between_frames: Sampling interval; must be positive.
            desired_height: Height in pixels of every returned frame; the
                width is scaled by the same factor.

        Returns:
            A list of RGB PIL images.  If reading fails part-way, the frames
            collected so far are returned and the error is printed.

        Raises:
            ValueError: If ``seconds_between_frames`` is not positive (the
                sampling loop would never terminate).
        """
        if seconds_between_frames <= 0:
            raise ValueError("seconds_between_frames must be greater than zero")

        print("Extracting frames from the video "+video_path)
        images = []
        try:
            with VideoFileClip(video_path) as clip:
                steps = int(int(clip.duration)/seconds_between_frames)
                current_time = 0
                current_step = 0
                while current_time < clip.duration:
                    frame = Image.fromarray(clip.get_frame(current_time)).convert('RGB')
                    width, height = frame.size
                    scale = desired_height/height
                    frame = frame.resize((int(width*scale), desired_height), resample=Image.LANCZOS)
                    images.append(frame)
                    print("Extracted frame "+str(current_step)+" out of "+str(steps))
                    current_step += 1
                    current_time += seconds_between_frames
            print("Frames extracted.")
        except Exception as e:
            # Best effort: keep whatever was extracted before the failure.
            print(f"An error occurred when extracting video frames: {e}")
        return images

    def create_comics(self, images, amount_per_page: int):
        """Stack the frames vertically into pages of ``amount_per_page`` frames.

        Every page is a white canvas as wide as the first frame and
        ``amount_per_page`` frames tall.  The last page is emitted even when
        it is only partially filled, so no trailing frames are lost.

        Args:
            images: Frames to lay out; all are assumed to share the size of
                the first one.
            amount_per_page: Frames per page; must be at least 1.

        Returns:
            The list of page images (empty when ``images`` is empty).

        Raises:
            ValueError: If ``amount_per_page`` is smaller than 1.
        """
        if amount_per_page < 1:
            raise ValueError("amount_per_page must be at least 1")

        comics = []
        if not images:
            return comics

        width, height = images[0].size
        total_pages = (len(images) + amount_per_page - 1) // amount_per_page
        for page_start in range(0, len(images), amount_per_page):
            canvas = Image.new('RGB', (width, height*amount_per_page), color='white')
            page_frames = images[page_start:page_start + amount_per_page]
            for slot, frame in enumerate(page_frames):
                canvas.paste(frame, (0, slot*height))
            comics.append(canvas)
            print("Created image page "+str(len(comics))+" out of "+str(total_pages))
        return comics

    class AI_Analyser:
        """Holds the watcher agent (describes comics) and the master agent."""

        def __init__(self, api_key, frames_per_scene, watcher_model, master_model,
                 watcher_token_limit, master_token_limit, add_to_watchers_system, add_descriptions_to_system):
            """Create both agents with their system messages."""
            self.api_key = api_key
            self.frames_per_scene = frames_per_scene
            self.add_descriptions_to_system = add_descriptions_to_system

            watcher_system = "You will receive a fragment of a video in form of a " + str(frames_per_scene) + " comic. Write a description of the fragment as a whole. Never mention it being a comic." + add_to_watchers_system
            self.frame_watcher_ai = easyapiopenai.ImgChatGPTAgent(self.api_key, watcher_model, watcher_token_limit, watcher_system)

            master_system = "You are a helpful assistant capable of video analysis. Do not speak about the video unless specifically asked about it."
            self.master_ai = easyapiopenai.ChatGPTAgent(self.api_key, master_model, master_token_limit, master_system)

        def describe_video(self, video_comics):
            """Ask the watcher agent to describe every comic page.

            Pages whose request fails are skipped (the error is printed and
            the watcher's history is cleared); scene numbers count only the
            pages that were described successfully.

            Returns:
                One string with a "Scene number N" section per described page.
            """
            description = ""
            n = 1
            print("Describing each scene...")
            for comic in video_comics:
                try:
                    text = self.frame_watcher_ai.GetResponseWithImg(comic, "Describe this fragment of the video based on the comic.").replace("\n", " ")
                    description += "Scene number " + str(n) + ":\n" + text + "\n\n"
                    print("Scene " + str(n) + " " + text)
                    print(" ")
                    n += 1
                except Exception as e:
                    print(f"An error occurred describing the fragment: {e}")
                    # The agent keeps its previous responses for context, but too many may cause an error.
                    self.frame_watcher_ai.ClearHistory()
            print("Done describing.")
            return description

        def get_master_response(self, prompt, video_description):
            """Send the prompt (plus the description, unless disabled) to the master agent.

            NOTE(review): when ``add_descriptions_to_system`` is true the
            description is left out of the prompt and nothing in this class
            adds it to the system message either -- confirm this is intended.
            """
            final_prompt = prompt
            if not self.add_descriptions_to_system:
                final_prompt += "\n\nDescription of the video:\n\n" + video_description
            return self.master_ai.GetResponse(final_prompt)

if __name__ == "__main__":
    # Build the agent once, then keep answering prompts typed by the user.
    agent = Video_ChatGPT(
        openai_api_key=YourAPIKey('openai'),
        video_path="film.webm",
        time_between_frames_seconds=5,
        frames_per_scene=5,
        process_audio=False,
        frame_height=300,
    )
    while True:
        question = input("Type: ")
        answer = agent.GetResponseFromMaster(question)
        print(answer)
  

This code works exactly like the code at the end of the previous post; I just restructured it.

First, let's change the way it works and process the video after the question has been asked. This way, the AI will be better at answering specific questions, since it will already know what to look for.

I moved creating the description and "comics" out of the constructor entirely and made sure that the "comics" are generated only once (after all, they can be reused). I also modified the instructions given to the AI responsible for looking at the comics so that it returns NO-INFO-HERE if a fragment does not contain anything relevant. That AI agent now also knows the user's prompt.

Since the changes were too big, I will just paste the whole current code:

  
from moviepy.video.io.VideoFileClip import VideoFileClip
from PIL import Image
import os
import easyapiopenai
from WojSafeAPI import * #My own library for loading API keys safely, you cannot use it.

class Video_ChatGPT:
    """Let ChatGPT answer questions about a video.

    The video is sampled into frames, the frames are stacked vertically into
    multi-frame "comic" pages, every page is described by a "watcher" agent
    with the user's prompt in mind, and the collected descriptions are passed
    to a "master" agent that produces the final answer.

    The constructor only stores configuration; the comics are built on the
    first prompt and reused afterwards, while the descriptions are regenerated
    for every prompt.
    """

    def __init__(self,openai_api_key:str, video_path: str, time_between_frames_seconds: float,
                 frames_per_scene: int, process_audio=True, frame_height = 480,
                 watcher_model='gpt-4o-mini',master_model='gpt-4o',
                 watcher_token_limit = 500, master_token_limit=15*1000, add_descriptions_to_system=False,
                 add_to_watchers_system="", remember_context = True):
        """Store the configuration; no video or AI work happens here.

        Args:
            openai_api_key: Key handed to both easyapiopenai agents.
            video_path: Path of the video file to analyse.
            time_between_frames_seconds: Sampling interval between frames.
            frames_per_scene: Number of frames stacked on one comic page.
            process_audio: Stored on the instance; not used by this version.
            frame_height: Height in pixels every frame is resized to.
            watcher_model: Model name for the agent describing the comics.
            master_model: Model name for the agent answering the user.
            watcher_token_limit: Passed through to the watcher agent.
            master_token_limit: Passed through to the master agent.
            add_descriptions_to_system: When true, the description is not
                appended to the prompt in get_master_response.
            add_to_watchers_system: Extra text appended to the watcher's
                system message.
            remember_context: When false, the watcher's history is cleared
                before every comic.
        """
        self.video_path = video_path
        self.time_between_frames_seconds = time_between_frames_seconds
        self.api_key = openai_api_key
        self.frames_per_scene = frames_per_scene
        self.process_audio = process_audio
        self.frame_height = frame_height
        self.add_descriptions_to_system = add_descriptions_to_system

        self.video_description = "The video was not analyzed yet."
        self.video_comics = []

        self.watcher_model = watcher_model
        self.master_model = master_model
        self.watcher_token_limit = watcher_token_limit
        self.master_token_limit = master_token_limit
        self.add_to_watchers_system = add_to_watchers_system

        self.remember_context = remember_context

    def CreateVideoInfo(self, prompt):
        """Build the comics if needed and describe the video for ``prompt``.

        A fresh AI_Analyser (and therefore fresh agents) is created on every
        call; the result is stored in ``self.video_description``.
        """
        if not self.video_comics:  # The comics need to be created only once.
            frames = self.get_video_images(self.video_path, self.time_between_frames_seconds, self.frame_height)
            self.video_comics = self.create_comics(frames, self.frames_per_scene)
            del frames  # Only the comics are needed from here on.

        self.ai_analyzer = self.AI_Analyser(
            api_key=self.api_key,
            frames_per_scene=self.frames_per_scene,
            watcher_model=self.watcher_model,
            master_model=self.master_model,
            watcher_token_limit=self.watcher_token_limit,
            master_token_limit=self.master_token_limit,
            add_to_watchers_system=self.add_to_watchers_system,
            add_descriptions_to_system=self.add_descriptions_to_system,
            remember_context = self.remember_context
        )

        self.video_description = self.ai_analyzer.describe_video(self.video_comics, prompt)

    def GetResponseFromMaster(self, prompt):
        """Analyse the video with ``prompt`` in mind and return the master's answer."""
        self.CreateVideoInfo(prompt)
        return self.ai_analyzer.get_master_response(prompt, self.video_description)

    def get_video_images(self, video_path, seconds_between_frames=1.0, desired_height=480):
        """Sample the video at a fixed interval and return the resized frames.

        Args:
            video_path: Path of the video file to read.
            seconds_between_frames: Sampling interval; must be positive.
            desired_height: Height in pixels of every returned frame; the
                width is scaled by the same factor.

        Returns:
            A list of RGB PIL images.  If reading fails part-way, the frames
            collected so far are returned and the error is printed.

        Raises:
            ValueError: If ``seconds_between_frames`` is not positive (the
                sampling loop would never terminate).
        """
        if seconds_between_frames <= 0:
            raise ValueError("seconds_between_frames must be greater than zero")

        print("Extracting frames from the video "+video_path)
        images = []
        try:
            with VideoFileClip(video_path) as clip:
                steps = int(int(clip.duration)/seconds_between_frames)
                current_time = 0
                current_step = 0
                while current_time < clip.duration:
                    frame = Image.fromarray(clip.get_frame(current_time)).convert('RGB')
                    width, height = frame.size
                    scale = desired_height/height
                    frame = frame.resize((int(width*scale), desired_height), resample=Image.LANCZOS)
                    images.append(frame)
                    print("Extracted frame "+str(current_step)+" out of "+str(steps))
                    current_step += 1
                    current_time += seconds_between_frames
            print("Frames extracted.")
        except Exception as e:
            # Best effort: keep whatever was extracted before the failure.
            print(f"An error occurred when extracting video frames: {e}")
        return images

    def create_comics(self, images, amount_per_page: int):
        """Stack the frames vertically into pages of ``amount_per_page`` frames.

        Every page is a white canvas as wide as the first frame and
        ``amount_per_page`` frames tall.  The last page is emitted even when
        it is only partially filled, so no trailing frames are lost.

        Args:
            images: Frames to lay out; all are assumed to share the size of
                the first one.
            amount_per_page: Frames per page; must be at least 1.

        Returns:
            The list of page images (empty when ``images`` is empty).

        Raises:
            ValueError: If ``amount_per_page`` is smaller than 1.
        """
        if amount_per_page < 1:
            raise ValueError("amount_per_page must be at least 1")

        comics = []
        if not images:
            return comics

        width, height = images[0].size
        total_pages = (len(images) + amount_per_page - 1) // amount_per_page
        for page_start in range(0, len(images), amount_per_page):
            canvas = Image.new('RGB', (width, height*amount_per_page), color='white')
            page_frames = images[page_start:page_start + amount_per_page]
            for slot, frame in enumerate(page_frames):
                canvas.paste(frame, (0, slot*height))
            comics.append(canvas)
            print("Created image page "+str(len(comics))+" out of "+str(total_pages))
        return comics

    class AI_Analyser:
        """Holds the watcher agent (describes comics) and the master agent."""

        def __init__(self, api_key, frames_per_scene, watcher_model, master_model,
                 watcher_token_limit, master_token_limit, add_to_watchers_system, add_descriptions_to_system, remember_context=True):
            """Create both agents with their system messages."""
            self.api_key = api_key
            self.frames_per_scene = frames_per_scene
            self.add_descriptions_to_system = add_descriptions_to_system

            self.remember_context = remember_context

            watcher_system = "You will receive a fragment of a video in form of a " + str(frames_per_scene) + "panel comic. Write a detailed description of the fragment as a whole. Never mention it being a comic." + add_to_watchers_system
            self.frame_watcher_ai = easyapiopenai.ImgChatGPTAgent(self.api_key, watcher_model, watcher_token_limit, watcher_system)

            master_system = "You are a helpful assistant capable of video analysis. Do not speak about the video unless specifically asked about it."
            self.master_ai = easyapiopenai.ChatGPTAgent(self.api_key, master_model, master_token_limit, master_system)

        def describe_video(self, video_comics, original_prompt):
            """Ask the watcher agent to describe every comic page for the prompt.

            The watcher is told to answer NO-INFO-HERE for pages unrelated to
            ``original_prompt``.  Pages whose request fails are skipped (the
            error is printed and the watcher's history is cleared); scene
            numbers count only the pages that were described successfully.

            Returns:
                One string with a "Scene number N" section per described page.
            """
            description = ""
            n = 1
            print("Describing each scene...")
            for comic in video_comics:
                try:
                    if not self.remember_context:
                        self.frame_watcher_ai.ClearHistory()
                    prompt_text = (
                        "Describe this fragment of the video based on the comic.\nFocus on answering the original prompt:\n" +
                        original_prompt +
                        "\n\nThe comic might not contain anything related to the prompt, if that is the case then write NO-INFO-HERE. " +
                        "Describe the comic if it is related to the question in any way at all."
                    )
                    text = self.frame_watcher_ai.GetResponseWithImg(comic, prompt_text).replace("\n", " ")
                    description += "Scene number " + str(n) + ":\n" + text + "\n\n"
                    print("Scene " + str(n) + ": " + text)
                    print(" ")
                    n += 1
                except Exception as e:
                    print(f"An error occurred describing the fragment: {e}")
                    # The agent keeps its previous responses for context, but too many may cause an error.
                    self.frame_watcher_ai.ClearHistory()
            print("Done describing.")
            return description

        def get_master_response(self, prompt, video_description):
            """Send the prompt (plus the description, unless disabled) to the master agent.

            NOTE(review): when ``add_descriptions_to_system`` is true the
            description is left out of the prompt and nothing in this class
            adds it to the system message either -- confirm this is intended.
            """
            final_prompt = prompt
            if not self.add_descriptions_to_system:
                final_prompt += "\n\nDescription of the video:\n\n" + video_description
            return self.master_ai.GetResponse(final_prompt)

if __name__ == "__main__":
    # Build the agent once, then keep answering prompts typed by the user.
    agent = Video_ChatGPT(
        openai_api_key=YourAPIKey('openai'),
        video_path="film.webm",
        time_between_frames_seconds=5,
        frames_per_scene=5,
        remember_context=False,
        process_audio=False,
        frame_height=300,
    )
    while True:
        question = input("Type: ")
        answer = agent.GetResponseFromMaster(question)
        print(answer)
  

Now let's run it and use the movie "Steamboat Willie" as video input.

First, I asked question: What role does the parrot play in the movie?

As expected, the AI described only the scenes where the parrot appeared:

The final answer was: "The parrot plays a significant comedic role in the movie by acting as a playful and mischievous foil to the main character. In the various scenes, it introduces humor through its antics and interactions, contributing to the chaos and highlighting the lighthearted nature of the narrative. The parrot's actions lead to comedic struggles for the main character, providing comic relief and enhancing the playful atmosphere of the film. The character dynamics involving the parrot drive the storyline forward by introducing humorous and unexpected situations."

Since the current version allows that, I asked the AI in next message to tell me how the movie starts:

This time, the AI described every fragment, honestly as expected too:

The final answer was: "The movie starts with a title screen for "Steamboat Willie," a classic Disney cartoon. The opening scene prominently displays the title with whimsical graphics, indicating it's a Mickey Mouse sound cartoon. The screen credits Walt Disney and Ub Iwerks, highlighting their contributions. Following the title, there's a visual of a steamboat navigating through water, hinting at the character of Mickey Mouse aboard. The black-and-white art style reflects the era's animation, showing a lively steamboat with flags waving and the steam engine chugging along. This introduction sets the stage for the adventures that will unfold, establishing the iconic character and the joyous, musical tone of the animation."

Now, let's add speech support to it.

Since I haven't had time yet to look at how the audio-capable ChatGPT 4o model works, I will use a legacy speech recognition tool, which will generate a transcript of each fragment in a similar manner to how the descriptions are generated.

I added this method (and a helper one) to get transcript of every fragment:

New imports:
import moviepy.editor as mp
import speech_recognition as sr

  
    def split_into_sublists(self, big_list, x):
        """Split ``big_list`` into ``x`` contiguous, nearly equal sublists.

        When the length is not divisible by ``x`` the first ``len % x``
        sublists receive one extra element.

        Args:
            big_list: The list to split.
            x: Number of sublists to produce.

        Returns:
            A list of ``x`` sublists, or an empty list when ``x`` is not
            positive (instead of failing with a division by zero).
        """
        if x <= 0:
            return []
        sublist_size, remainder = divmod(len(big_list), x)
        sublists = []
        start = 0
        for i in range(x):
            end = start + sublist_size + (1 if i < remainder else 0)
            sublists.append(big_list[start:end])
            start = end
        return sublists

    def ExtractAudioTranscripts(self, language="en"):
        """Transcribe the video's audio and return one transcript per comic.

        The audio is cut into fragments of
        ``time_between_frames_seconds * frames_per_scene`` seconds, each
        fragment is written to a temporary WAV file and transcribed with
        ``recognize_google``.  A fragment that cannot be extracted or
        transcribed contributes the text "ERROR" and processing continues
        with the next fragment.  The fragment transcripts are then regrouped
        into ``self.comic_amount`` entries.

        Args:
            language: Language code passed to the speech recogniser.

        Returns:
            A list with one transcript string per comic (empty when there are
            no comics).

        Raises:
            ValueError: If the fragment length is not positive (the fragment
                loop would never advance).
        """
        fragment_length = self.time_between_frames_seconds*self.frames_per_scene
        if fragment_length <= 0:
            raise ValueError("time_between_frames_seconds*frames_per_scene must be greater than zero")

        transcripts = []
        try:
            with VideoFileClip(self.video_path) as clip:
                duration = clip.duration
                fragment_start = 0
                recognizer = sr.Recognizer()

                while fragment_start < duration:
                    fragment_end = min(fragment_start + fragment_length, duration)
                    temp_audio_path = f"audio_fragment_{int(fragment_start)}.wav"
                    try:
                        # Extraction sits inside the per-fragment handler so that
                        # one bad fragment neither aborts the rest nor leaks the file.
                        audio_clip = clip.subclip(fragment_start, fragment_end).audio
                        audio_clip.write_audiofile(temp_audio_path, codec="pcm_s16le")
                        with sr.AudioFile(temp_audio_path) as source:
                            audio_data = recognizer.record(source)
                            transcript = recognizer.recognize_google(audio_data, language=language)
                            transcripts.append(transcript)
                    except Exception as e:
                        print(f"Error transcribing audio fragment {fragment_start}-{fragment_end}: {e}")
                        transcripts.append("ERROR")
                    finally:
                        if os.path.exists(temp_audio_path):
                            os.remove(temp_audio_path)

                    fragment_start = fragment_end

        except Exception as e:
            print(f"An error occurred while processing the audio: {e}")
            transcripts.append("ERROR")

        #comic_amount is len(comics), amount of generated "comics" read at the end of their generator method.
        comic_amount = getattr(self, "comic_amount", 0)
        if comic_amount < 1:
            return []  # Nothing to attach transcripts to.

        final_transcripts = []
        for group in self.split_into_sublists(transcripts, comic_amount):
            final_transcripts.append(" ".join(str(part) for part in group).strip())
        return final_transcripts
  

This method returns a list containing the transcript of every "comic". Now let's add it to the description of every "comic":


  
        def describe_video(self, video_comics, original_prompt):
            """Ask the watcher agent to describe every comic page for the prompt.

            When ``self.use_audio`` is set, the transcript at the same position
            as the comic is appended to the watcher's prompt.  A comic without
            a matching transcript is still described from the image alone
            instead of being dropped.  Pages whose request fails are skipped
            (the error is printed and the watcher's history is cleared); scene
            numbers count only the pages that were described successfully.

            Args:
                video_comics: The comic pages, in video order.
                original_prompt: The user's question, forwarded to the watcher.

            Returns:
                One string with a "Scene number N" section per described page.
            """
            description = ""
            n = 1
            print("Describing each scene...")
            for index, comic in enumerate(video_comics):
                try:
                    if not self.remember_context:
                        self.frame_watcher_ai.ClearHistory()

                    prompt_text = "Describe this fragment of the video based on the comic.\nFocus on answering the original prompt:\n" + original_prompt + "\n\nThe comic might not contain anything related to the prompt, if that is the case then write NO-INFO-HERE. "+"Describe the comic if it is related to the question in any way at all."

                    # Only attach a transcript that actually exists for this page.
                    if self.use_audio and index < len(self.transcripts):
                        prompt_text = prompt_text + "\n\nTranscript of this fragment:\n\n"+self.transcripts[index]

                    print("Prompt text: "+prompt_text+"\n")

                    text = self.frame_watcher_ai.GetResponseWithImg(comic,prompt_text).replace("\n", " ")
                    description += "Scene number " + str(n) + ":\n" + text + "\n\n"
                    print("Scene " + str(n) + ": " + text)
                    print(" ")
                    n += 1
                except Exception as e:
                    print(f"An error occurred describing the fragment: {e}")
                    # The agent keeps its previous responses for context, but too many may cause an error.
                    self.frame_watcher_ai.ClearHistory()
            print("Done describing.")
            return description
  

This way we add the transcript to every "comic" for analysis. During tests I also noticed that the AI started to consider every scene as part of separate movie, so I added "All scenes are parts of the same video." to system of the master AI.

Let's test it now. I decided to use my own video about a redstone signal repeater, so that I won't have to deal with anyone's copyright and so that ChatGPT is unlikely to know it already (it has 22 views as of now).

I asked: "What is this video about? What kind of machine it describes?"

The transcripts were not perfect, but that is understandable; they never are.

The answer was correct! "The video is about a redstone machine in the game Minecraft, specifically focusing on a device known as a "repeater." It demonstrates how this machine can be used to repeat a signal multiple times and is limited by operational constraints such as "Max. 5 slots" and "up to 320 cycles." The video serves as an educational tutorial on redstone mechanics, showcasing the setup and functionality of a complex redstone circuit used for automating processes, managing resources, and performing repetitive tasks within the game. The scenes highlight the creative and technical aspect of using redstone to build and program automated systems, explaining the components and interactions involved in operating and resetting the machine."

As we can conclude, my solution works pretty well. 

This is the whole final code (also available on my GitHub):

  
from moviepy.video.io.VideoFileClip import VideoFileClip
from PIL import Image
import os
import easyapiopenai
from WojSafeAPI import * #My own library for loading API keys safely, you cannot use it.

import moviepy.editor as mp
import speech_recognition as sr

class Video_ChatGPT:
    """Let ChatGPT answer questions about a video, optionally using its audio.

    The video is sampled into frames, the frames are stacked vertically into
    multi-frame "comic" pages, and the audio is transcribed in fragments that
    are regrouped into one transcript per page.  Every page (plus its
    transcript) is described by a "watcher" agent with the user's prompt in
    mind, and the collected descriptions are passed to a "master" agent that
    produces the final answer.

    The constructor only stores configuration; comics and transcripts are
    built on the first prompt and reused afterwards, while the descriptions
    are regenerated for every prompt.
    """

    def __init__(self,openai_api_key:str, video_path: str, time_between_frames_seconds: float,
                 frames_per_scene: int, process_audio=True, audio_language='en', frame_height = 480,
                 watcher_model='gpt-4o-mini',master_model='gpt-4o',
                 watcher_token_limit = 500, master_token_limit=15*1000, add_descriptions_to_system=False,
                 add_to_watchers_system="", remember_context = True):
        """Store the configuration; no video, audio or AI work happens here.

        Args:
            openai_api_key: Key handed to both easyapiopenai agents.
            video_path: Path of the video file to analyse.
            time_between_frames_seconds: Sampling interval between frames.
            frames_per_scene: Number of frames stacked on one comic page.
            process_audio: Whether to transcribe the audio and show the
                transcripts to the watcher agent.
            audio_language: Language code passed to the speech recogniser.
            frame_height: Height in pixels every frame is resized to.
            watcher_model: Model name for the agent describing the comics.
            master_model: Model name for the agent answering the user.
            watcher_token_limit: Passed through to the watcher agent.
            master_token_limit: Passed through to the master agent.
            add_descriptions_to_system: When true, the description is not
                appended to the prompt in get_master_response.
            add_to_watchers_system: Extra text appended to the watcher's
                system message.
            remember_context: When false, the watcher's history is cleared
                before every comic.
        """
        self.video_path = video_path
        self.time_between_frames_seconds = time_between_frames_seconds
        self.api_key = openai_api_key
        self.frames_per_scene = frames_per_scene
        self.process_audio = process_audio
        self.frame_height = frame_height
        self.add_descriptions_to_system = add_descriptions_to_system

        self.video_description = "The video was not analyzed yet."
        self.video_comics = []

        self.watcher_model = watcher_model
        self.master_model = master_model
        self.watcher_token_limit = watcher_token_limit
        self.master_token_limit = master_token_limit
        self.add_to_watchers_system = add_to_watchers_system

        self.remember_context = remember_context

        self.transcripts = []
        self.audio_language = audio_language

    def CreateVideoInfo(self, prompt):
        """Build comics and transcripts if needed and describe the video for ``prompt``.

        A fresh AI_Analyser (and therefore fresh agents) is created on every
        call; the result is stored in ``self.video_description``.
        """
        if not self.video_comics:  # The comics need to be created only once.
            frames = self.get_video_images(self.video_path, self.time_between_frames_seconds, self.frame_height)
            self.video_comics = self.create_comics(frames, self.frames_per_scene)
            del frames  # Only the comics are needed from here on.

        if self.process_audio and not self.transcripts:
            self.transcripts = self.ExtractAudioTranscripts(self.audio_language)
            print(self.transcripts)
            print(" ")

        self.ai_analyzer = self.AI_Analyser(
            api_key=self.api_key,
            frames_per_scene=self.frames_per_scene,
            watcher_model=self.watcher_model,
            master_model=self.master_model,
            watcher_token_limit=self.watcher_token_limit,
            master_token_limit=self.master_token_limit,
            add_to_watchers_system=self.add_to_watchers_system,
            add_descriptions_to_system=self.add_descriptions_to_system,
            remember_context = self.remember_context,
            transcripts=self.transcripts,
            use_audio=self.process_audio
        )

        self.video_description = self.ai_analyzer.describe_video(self.video_comics, prompt)

    def GetResponseFromMaster(self, prompt):
        """Analyse the video with ``prompt`` in mind and return the master's answer."""
        self.CreateVideoInfo(prompt)
        return self.ai_analyzer.get_master_response(prompt, self.video_description)

    def get_video_images(self, video_path, seconds_between_frames=1.0, desired_height=480):
        """Sample the video at a fixed interval and return the resized frames.

        Args:
            video_path: Path of the video file to read.
            seconds_between_frames: Sampling interval; must be positive.
            desired_height: Height in pixels of every returned frame; the
                width is scaled by the same factor.

        Returns:
            A list of RGB PIL images.  If reading fails part-way, the frames
            collected so far are returned and the error is printed.

        Raises:
            ValueError: If ``seconds_between_frames`` is not positive (the
                sampling loop would never terminate).
        """
        if seconds_between_frames <= 0:
            raise ValueError("seconds_between_frames must be greater than zero")

        print("Extracting frames from the video "+video_path)
        images = []
        try:
            with VideoFileClip(video_path) as clip:
                steps = int(int(clip.duration)/seconds_between_frames)
                current_time = 0
                current_step = 0
                while current_time < clip.duration:
                    frame = Image.fromarray(clip.get_frame(current_time)).convert('RGB')
                    width, height = frame.size
                    scale = desired_height/height
                    frame = frame.resize((int(width*scale), desired_height), resample=Image.LANCZOS)
                    images.append(frame)
                    print("Extracted frame "+str(current_step)+" out of "+str(steps))
                    current_step += 1
                    current_time += seconds_between_frames
            print("Frames extracted.")
        except Exception as e:
            # Best effort: keep whatever was extracted before the failure.
            print(f"An error occurred when extracting video frames: {e}")
        return images

    def create_comics(self, images, amount_per_page: int):
        """Stack the frames vertically into pages of ``amount_per_page`` frames.

        Every page is a white canvas as wide as the first frame and
        ``amount_per_page`` frames tall.  The last page is emitted even when
        it is only partially filled, so no trailing frames are lost.  The
        number of pages is also stored in ``self.comic_amount`` for the
        transcript grouping.

        Args:
            images: Frames to lay out; all are assumed to share the size of
                the first one.
            amount_per_page: Frames per page; must be at least 1.

        Returns:
            The list of page images (empty when ``images`` is empty).

        Raises:
            ValueError: If ``amount_per_page`` is smaller than 1.
        """
        if amount_per_page < 1:
            raise ValueError("amount_per_page must be at least 1")

        comics = []
        if images:
            width, height = images[0].size
            total_pages = (len(images) + amount_per_page - 1) // amount_per_page
            for page_start in range(0, len(images), amount_per_page):
                canvas = Image.new('RGB', (width, height*amount_per_page), color='white')
                page_frames = images[page_start:page_start + amount_per_page]
                for slot, frame in enumerate(page_frames):
                    canvas.paste(frame, (0, slot*height))
                comics.append(canvas)
                print("Created image page "+str(len(comics))+" out of "+str(total_pages))
        self.comic_amount = len(comics)
        return comics

    def split_into_sublists(self, big_list, x):
        """Split ``big_list`` into ``x`` contiguous, nearly equal sublists.

        When the length is not divisible by ``x`` the first ``len % x``
        sublists receive one extra element.  A non-positive ``x`` yields an
        empty list instead of a division by zero.
        """
        if x <= 0:
            return []
        sublist_size, remainder = divmod(len(big_list), x)
        sublists = []
        start = 0
        for i in range(x):
            end = start + sublist_size + (1 if i < remainder else 0)
            sublists.append(big_list[start:end])
            start = end
        return sublists

    def ExtractAudioTranscripts(self, language="en"):
        """Transcribe the video's audio and return one transcript per comic.

        The audio is cut into fragments of
        ``time_between_frames_seconds * frames_per_scene`` seconds, each
        fragment is written to a temporary WAV file and transcribed with
        ``recognize_google``.  A fragment that cannot be extracted or
        transcribed contributes the text "ERROR" and processing continues
        with the next fragment.  The fragment transcripts are then regrouped
        into ``self.comic_amount`` entries.

        Args:
            language: Language code passed to the speech recogniser.

        Returns:
            A list with one transcript string per comic (empty when there are
            no comics).

        Raises:
            ValueError: If the fragment length is not positive (the fragment
                loop would never advance).
        """
        fragment_length = self.time_between_frames_seconds*self.frames_per_scene
        if fragment_length <= 0:
            raise ValueError("time_between_frames_seconds*frames_per_scene must be greater than zero")

        transcripts = []
        try:
            with VideoFileClip(self.video_path) as clip:
                duration = clip.duration
                fragment_start = 0
                recognizer = sr.Recognizer()

                while fragment_start < duration:
                    fragment_end = min(fragment_start + fragment_length, duration)
                    temp_audio_path = f"audio_fragment_{int(fragment_start)}.wav"
                    try:
                        # Extraction sits inside the per-fragment handler so that
                        # one bad fragment neither aborts the rest nor leaks the file.
                        audio_clip = clip.subclip(fragment_start, fragment_end).audio
                        audio_clip.write_audiofile(temp_audio_path, codec="pcm_s16le")
                        with sr.AudioFile(temp_audio_path) as source:
                            audio_data = recognizer.record(source)
                            transcript = recognizer.recognize_google(audio_data, language=language)
                            transcripts.append(transcript)
                    except Exception as e:
                        print(f"Error transcribing audio fragment {fragment_start}-{fragment_end}: {e}")
                        transcripts.append("ERROR")
                    finally:
                        if os.path.exists(temp_audio_path):
                            os.remove(temp_audio_path)

                    fragment_start = fragment_end

        except Exception as e:
            print(f"An error occurred while processing the audio: {e}")
            transcripts.append("ERROR")

        comic_amount = getattr(self, "comic_amount", 0)
        final_transcripts = []
        for group in self.split_into_sublists(transcripts, comic_amount):
            final_transcripts.append(" ".join(str(part) for part in group).strip())
        return final_transcripts

    class AI_Analyser:
        """Holds the watcher agent (describes comics) and the master agent."""

        def __init__(self, api_key, frames_per_scene, watcher_model, master_model,
                 watcher_token_limit, master_token_limit, add_to_watchers_system, add_descriptions_to_system, remember_context=True,
                     transcripts = None, use_audio=False):
            """Create both agents and remember the per-comic transcripts.

            ``transcripts`` defaults to a fresh empty list per instance
            rather than one list shared between all instances.
            """
            self.api_key = api_key
            self.frames_per_scene = frames_per_scene
            self.add_descriptions_to_system = add_descriptions_to_system

            self.remember_context = remember_context

            watcher_system = "You will receive a fragment of a video in form of a " + str(frames_per_scene) + "panel comic. Write a detailed description of the fragment as a whole. Never mention it being a comic." + add_to_watchers_system
            self.frame_watcher_ai = easyapiopenai.ImgChatGPTAgent(self.api_key, watcher_model, watcher_token_limit, watcher_system)

            master_system = "You are a helpful assistant capable of video analysis. Do not speak about the video unless specifically asked about it. All scenes are parts of the same video. "
            self.master_ai = easyapiopenai.ChatGPTAgent(self.api_key, master_model, master_token_limit, master_system)

            self.transcripts = [] if transcripts is None else transcripts
            self.use_audio = use_audio

        def describe_video(self, video_comics, original_prompt):
            """Ask the watcher agent to describe every comic page for the prompt.

            When ``self.use_audio`` is set, the transcript at the same position
            as the comic is appended to the watcher's prompt.  A comic without
            a matching transcript is still described from the image alone
            instead of being dropped.  Pages whose request fails are skipped
            (the error is printed and the watcher's history is cleared); scene
            numbers count only the pages that were described successfully.

            Returns:
                One string with a "Scene number N" section per described page.
            """
            description = ""
            n = 1
            print("Describing each scene...")
            for index, comic in enumerate(video_comics):
                try:
                    if not self.remember_context:
                        self.frame_watcher_ai.ClearHistory()

                    prompt_text = "Describe this fragment of the video based on the comic.\nFocus on answering the original prompt:\n" + original_prompt + "\n\nThe comic might not contain anything related to the prompt, if that is the case then write NO-INFO-HERE. "+"Describe the comic if it is related to the question in any way at all."

                    # Only attach a transcript that actually exists for this page.
                    if self.use_audio and index < len(self.transcripts):
                        prompt_text = prompt_text + "\n\nTranscript of this fragment:\n\n"+self.transcripts[index]

                    print("Prompt text: "+prompt_text+"\n")

                    text = self.frame_watcher_ai.GetResponseWithImg(comic,prompt_text).replace("\n", " ")
                    description += "Scene number " + str(n) + ":\n" + text + "\n\n"
                    print("Scene " + str(n) + ": " + text)
                    print(" ")
                    n += 1
                except Exception as e:
                    print(f"An error occurred describing the fragment: {e}")
                    # The agent keeps its previous responses for context, but too many may cause an error.
                    self.frame_watcher_ai.ClearHistory()
            print("Done describing.")
            return description

        def get_master_response(self, prompt, video_description):
            """Send the prompt (plus the description, unless disabled) to the master agent.

            NOTE(review): when ``add_descriptions_to_system`` is true the
            description is left out of the prompt and nothing in this class
            adds it to the system message either -- confirm this is intended.
            """
            final_prompt = prompt
            if not self.add_descriptions_to_system:
                final_prompt += "\n\nDescription of the video:\n\n" + video_description
            return self.master_ai.GetResponse(final_prompt)

if __name__ == "__main__":
    # Build the agent once, then keep answering prompts typed by the user.
    agent = Video_ChatGPT(
        openai_api_key=YourAPIKey('openai'),
        video_path="video.mp4",
        time_between_frames_seconds=5,
        frames_per_scene=5,
        remember_context=False,
        process_audio=True,
        frame_height=300,
        audio_language='en',
    )
    while True:
        question = input("Type: ")
        answer = agent.GetResponseFromMaster(question)
        print(answer)
  

In next part, if I publish it, I will replace the legacy speech recognition with the more advanced AI model, which hopefully not only will be better in understanding speech but might be able to recognize other sounds too (I don't know yet what gpt-4o-audio-preview can, but knowing OpenAI it could be capable of that). 

Thank you for reading this!

Comments

Popular posts from this blog

Project ISKRA - introduction

ISKRA experiment 02_10_2024-1 - "Defeat the Minecraft creeper" [reupload]

Using agent-based approach to make ChatGPT capable of video analysis - part 1 - proof of concept