# # IoT Birdfeeder using PiCowbell OV5640 autofocus camera. # Sends PIR-triggered image to adafruitIO (PIR sensor not integrated yet) # Current test setup uses shutter button on PiCowbell OV5640 # # Jeff Mangum 2024-06-23 # import gc print('Free after gc load at very beginning: ',gc.mem_free()) import os print('Free after os load: ',gc.mem_free()) import time print('Free after time load: ',gc.mem_free()) import alarm print('Free after alarm load: ',gc.mem_free()) import ssl print('Free after ssl load: ',gc.mem_free()) import binascii print('Free after binascii load: ',gc.mem_free()) import busio print('Free after busio load: ',gc.mem_free()) import board print('Free after board load: ',gc.mem_free()) import digitalio print('Free after digitalio load: ',gc.mem_free()) import adafruit_ov5640 print('Free after ov5640 load: ',gc.mem_free()) import sdcardio print('Free after sdcardio load: ',gc.mem_free()) import storage print('Free after storage load: ',gc.mem_free()) import socketpool print('Free after socketpool load: ',gc.mem_free()) import wifi print('Free after wifi load: ',gc.mem_free()) import adafruit_requests print('Free after adafruit_requests load: ',gc.mem_free()) from adafruit_io.adafruit_io import IO_HTTP, AdafruitIO_RequestError, validate_feed_key print('Free after adafruit_io load: ',gc.mem_free()) import keypad print('Free after keypad load: ',gc.mem_free()) print("PiCowbell Birdfeeder Camera") class Base64Stream: """A file wrapper that retuns base64 encoded data when read.""" def __init__(self, file_handle): self._file_handle = file_handle self._read_buffer = b"" def seek(self, target, whence=0): """Change the stream position to the given offset. Behaviour depends on the whence parameter.""" if target == whence == 0: # move to start self._read_buffer = b"" elif target == 0 and whence == 2: # move to end pass else: raise ValueError("Seeking to the middle of a file is not supported.") return self._file_handle.seek(target, whence) def tell(self): """Return the current stream position as an opaque number""" pos = self._file_handle.tell() pos = pos * 4 / 3 if pos % 4: pos += 4 - pos % 4 return int(pos) def read(self, size=-1): """Read and return at most size characters from the stream as a single str. If size is negative or None, reads until EOF.""" encoded_data = b"" if size == 0: return encoded_data if size in (-1, None): read_size = -1 else: if size < 4: raise ValueError( "To correctly encode, you must request at lease 4 bytes." ) read_size = int((size - len(self._read_buffer)) * 3 / 4) data = self._read_buffer data += self._file_handle.read(read_size) len_data = len(data) if len_data == 0: # no data return b"" return encoded_data if len_data == len(self._read_buffer): # no new data, clear out the _read_buffer and use self._read_buffer = b"" elif len_data <= 3: # if we have less then 4 bytes, clear out the _read_buffer and use self._read_buffer = b"" else: # calculate the extra bytes (we don't want to encode multiples that aren't of 3) extra_data = -1 * (len_data % 3) if extra_data != 0: self._read_buffer = data[extra_data:] data = data[:extra_data] else: self._read_buffer = b"" encoded_data = binascii.b2a_base64(data).strip() return encoded_data def readinto(self, buffer): """Read and fill the supplied buffer with at most size characters from the stream.""" size = len(buffer) data = self.read(size) read_size = len(data) buffer[0:read_size] = data return read_size class IO_HTTP_FILE(IO_HTTP): def send_file(self, webhook, file_handle): path = f'{webhook["hook_url"]}/raw' wrapped_file_handle=Base64Stream(file_handle) with self._http.post(path, data=wrapped_file_handle) as response: self._handle_error(response) def create_new_feed(self, feed_key, feed_desc=None, feed_license=None, history=True): validate_feed_key(feed_key) path = self._compose_path("feeds") payload = {"name": feed_key, "description": feed_desc, "license": feed_license, "history": history} return self._post(path, payload) def get_webhook(self, feed_key): validate_feed_key(feed_key) path = self._compose_path("feeds/{0}/webhooks".format(feed_key)) return self._get(path) def create_new_webhook(self, feed_key): validate_feed_key(feed_key) path = self._compose_path("feeds/{0}/webhooks".format(feed_key)) payload = {"webhook": {"data_rate_limit": ""}} return self._post(path, payload) print(f"Custom code: {gc.mem_free()}") print('Free before sdcard setup: ',gc.mem_free()) print("Initializing SD card") sd_spi = busio.SPI(clock=board.GP18, MOSI=board.GP19, MISO=board.GP16) sd_cs = board.GP17 sdcard = sdcardio.SDCard(sd_spi, sd_cs) vfs = storage.VfsFat(sdcard) storage.mount(vfs, "/sd") print('Free before i2c and cam bus setting: ',gc.mem_free()) print("construct bus") i2c = busio.I2C(board.GP5, board.GP4) print("construct camera") reset = digitalio.DigitalInOut(board.GP14) cam = adafruit_ov5640.OV5640( i2c, data_pins=( board.GP6, board.GP7, board.GP8, board.GP9, board.GP10, board.GP11, board.GP12, board.GP13, ), clock=board.GP3, vsync=board.GP0, href=board.GP2, mclk=None, shutdown=None, reset=reset, #size=adafruit_ov5640.OV5640_SIZE_240X240, # 240x240. Works. #size=adafruit_ov5640.OV5640_SIZE_QQVGA, # 160x320. Works. #size=adafruit_ov5640.OV5640_SIZE_QVGA, # 320x240. Works. Requires 7680 bytes when quality = 10 #size=adafruit_ov5640.OV5640_SIZE_HVGA, # 480x320. Works. Requires 15360 bytes when quality = 10 size=adafruit_ov5640.OV5640_SIZE_VGA, # 640x480. Works. Requires 30720 bytes when quality = 10 #size=adafruit_ov5640.OV5640_SIZE_QSXGA, # 2560x1920. Fails. Requires 409600 bytes memory ) print("Chip ID: ", cam.chip_id) print('Free before keys setting: ',gc.mem_free()) keys = keypad.Keys((board.GP22,), value_when_pressed=False, pull=True) print('Free before exists definition: ',gc.mem_free()) def exists(filename): try: os.stat(filename) return True except OSError as _: return False print('Free before image counter variable set: ',gc.mem_free()) _image_counter = 0 print('Free before defining open_next_image: ',gc.mem_free()) def open_next_image(): global _image_counter # pylint: disable=global-statement while True: filename = f"/sd/img{_image_counter:04d}.jpg" _image_counter += 1 if exists(filename): continue print("# writing to", filename) #return open(filename, "wb") return filename def log_mem(msg: str): gc.collect() print(f"{msg}: {gc.mem_free()}") print('Free before cam variable settings: ',gc.mem_free()) cam.colorspace = adafruit_ov5640.OV5640_COLOR_JPEG cam.flip_y = False cam.flip_x = False cam.test_pattern = False cam.quality = 11 # NOTE: Cannot use quality setting less than 11 or run out of memory # set focus to estimated bird location cam.autofocus_vcm_step = 145 #capture_buffer_size = cam.width*cam.height//6 # NOTE: Must be set to value large enough to hold jpeg image # Use instead of cam.capture_buffer_size # width*height/5 is what espressif's esp32-camera package uses (don't know why) print('Free before wifi and io variable settings: ',gc.mem_free()) ### WiFi ### # Add settings.toml to your filesystem CIRCUITPY_WIFI_SSID and CIRCUITPY_WIFI_PASSWORD keys # with your WiFi credentials. DO NOT share that file or commit it into Git or other # source control. # Set your Adafruit IO Username, Key and Port in settings.toml # (visit io.adafruit.com if you need to create an account, # or if you need your Adafruit IO key.) aio_username = os.getenv("ADAFRUIT_AIO_USERNAME") aio_key = os.getenv("ADAFRUIT_AIO_KEY") #print(f"Connecting to {os.getenv('CIRCUITPY_WIFI_SSID')}") wifi.radio.connect(os.getenv("CIRCUITPY_WIFI_SSID"), os.getenv("CIRCUITPY_WIFI_PASSWORD")) #print(f"Connected to {os.getenv('CIRCUITPY_WIFI_SSID')}!") pool = socketpool.SocketPool(wifi.radio) requests = adafruit_requests.Session(pool, ssl.create_default_context()) del adafruit_requests print('Free after del adafruit_requests: ',gc.mem_free()) ''' # Initialize an Adafruit IO HTTP API object io = IO_HTTP(aio_username, aio_key, requests) gc.collect() print('Free before aio feed function definition: ',gc.mem_free()) # The following try takes about 30k of memory... try: # Get the 'birdfeeder' feed from Adafruit IO feed_camera = io.get_feed("birdfeeder") except AdafruitIO_RequestError: # If no 'birdfeeder' feed exists, create one feed_camera = io.create_new_feed("birdfeeder") ''' print('Free before send_jpeg_to_io definition: ',gc.mem_free()) # initialize PIR sensor #pir = digitalio.DigitalInOut(board.GP22) #pir.direction = digitalio.Direction.INPUT ''' def send_jpeg_to_io(b,jpeg): gc.collect() print('Free at top of send_jpeg_to_io: ',gc.mem_free()) # before we send the image to IO, it needs to be encoded into base64 encoded_data = binascii.b2a_base64(jpeg).strip() gc.collect() print('Free after encoding jpeg: ',gc.mem_free()) del b del jpeg gc.collect() print('Free after del jpeg: ',gc.mem_free()) # then, send the encoded_data to Adafruit IO camera feed from adafruit_io.adafruit_io import IO_HTTP, AdafruitIO_RequestError gc.collect() print('Free after adafruit_io load: ',gc.mem_free()) # Initialize an Adafruit IO HTTP API object io = IO_HTTP(aio_username, aio_key, requests) gc.collect() print('Free after io assignment: ',gc.mem_free()) # The following try takes about 30k of memory... try: # Get the 'birdfeeder' feed from Adafruit IO feed_camera = io.get_feed("birdfeeder") except AdafruitIO_RequestError: # If no 'birdfeeder' feed exists, create one feed_camera = io.create_new_feed("birdfeeder") #if get_feed(): gc.collect() print('Free after aio feed function definition: ',gc.mem_free()) print("Sending image to IO...") io.send_data(feed_camera["key"], encoded_data) print("Sent image to IO!") del IO_HTTP del AdafruitIO_RequestError del feed_camera del io gc.collect() print('Free at end of send_jpeg_to_io: ',gc.mem_free()) ''' log_mem('Free at start of picture taking') # Initialize an Adafruit IO HTTP API object #io = IO_HTTP(aio_username, aio_key, requests) io = IO_HTTP_FILE(aio_username, aio_key, requests) log_mem('Free after io assignment') while True: shutter = keys.events.get() # event will be None if nothing has happened. if shutter: if shutter.pressed: log_mem('Free before b assignment') b = bytearray(cam.capture_buffer_size) time.sleep(0.01) log_mem('Free before allocating jpeg') jpeg = cam.capture(b) print(f"Captured {len(jpeg)} bytes of jpeg data") print(f" (had allocated {cam.capture_buffer_size} bytes") print(f"Resolution {cam.width}x{cam.height}") try: #with open_next_image() as f: filename = open_next_image() print("Filename: ",filename) f = open(filename, "wb") f.write(jpeg) f.close() #if os.stat(filename)[6] == 0: print("WARNING...file size 0...") print("JPG File Size: ",os.stat(filename)[6]," bytes") print("# Wrote image") log_mem('Free after deleting f') except OSError as e: print(e) log_mem('Free after writing jpeg to sdcard') # if the camera successfully captured a jpeg, send it to IO if jpeg is not None: #send_jpeg_to_io(b,jpeg) log_mem('Free at top of send_jpeg_to_io: ') # before we send the image to IO, it needs to be encoded into base64 #encoded_data = binascii.b2a_base64(jpeg).strip() #gc.collect() #print('Free after encoding jpeg: ',gc.mem_free()) del b del jpeg log_mem('Free after del b and jpeg') # then, send the encoded_data to Adafruit IO camera feed #from adafruit_io.adafruit_io import IO_HTTP, AdafruitIO_RequestError #print('Free after adafruit_io load: ',gc.mem_free()) # Initialize an Adafruit IO HTTP API object #io = IO_HTTP(aio_username, aio_key, requests) #io = IO_HTTP_FILE(aio_username, aio_key, requests) #print('Free after io assignment: ',gc.mem_free()) ''' # The following try takes about 30k of memory... try: # Get the 'birdfeeder' feed from Adafruit IO feed_camera = io.get_feed("birdfeeder") except AdafruitIO_RequestError: # If no 'birdfeeder' feed exists, create one feed_camera = io.create_new_feed("birdfeeder", history=False) ''' #if get_feed(): #log_mem('Free after aio feed function definition') print("Sending image to IO...") #io.send_data(feed_camera["key"], encoded_data webhook_camera = io.get_webhook("birdfeeder") if webhook_camera: webhook_camera = webhook_camera[0] else: webhook_camera = io.create_new_webhook("birdfeeder") log_mem('feed validation') print("Filename before send: ",filename) with open(filename, "rb") as file_handle: r = io.send_file(webhook_camera, file_handle) log_mem('sent image') #print("Sent image to IO!") log_mem('Free at end of send_jpeg_to_io') else: print("ERROR: JPEG capture failed!") time_alarm = alarm.time.TimeAlarm(monotonic_time=time.monotonic() + 10) alarm.exit_and_deep_sleep_until_alarms(time_alarm) ''' print("Waiting for movement...") old_pir_value = pir.value while True: gc.collect() print('Free at top of while loop: ',gc.mem_free()) pir_value = pir.value # if we detect movement, take a photo if pir_value: if not old_pir_value: print("Movement detected, taking picture!") # take a picture and save it into a jpeg bytes object #b = bytearray(cam.capture_buffer_size) b = bytearray(capture_buffer_size) jpeg = cam.capture(b) # if the camera successfully captured a jpeg, send it to IO if jpeg is not None: send_jpeg_to_io() else: print("ERROR: JPEG capture failed!") else: if old_pir_value: print("Movement ended") # update old_pir_value old_pir_value = pir_value '''