Skip to content

Commit

Permalink
Develop (#23)
Browse files Browse the repository at this point in the history
* Removes extra installation steps

* Fixes bug in which clients were unable to reconnect to response service

Signed-off-by: w4ffl35 <25737761+w4ffl35@users.noreply.github.com>

* Fixes installation instructions

* version bump

* Fixes pylint errors

Signed-off-by: w4ffl35 <25737761+w4ffl35@users.noreply.github.com>

* modify import order to satisfy pylint

* Added import error checks for pika and settings

* Adds import checks to satisfy pylint

* Adds badges to readme

Signed-off-by: w4ffl35 <25737761+w4ffl35@users.noreply.github.com>
  • Loading branch information
w4ffl35 authored Sep 12, 2022
1 parent 8b8ded7 commit 4648a5f
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 29 deletions.
16 changes: 13 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,21 @@
# stablediffusiond

[![Pylint](https://github.com/w4ffl35/stablediffusiond/actions/workflows/pylint.yml/badge.svg)](https://github.com/w4ffl35/stablediffusiond/actions/workflows/pylint.yml)
![GitHub issues](https://img.shields.io/github/issues/w4ffl35/stablediffusiond)
![GitHub](https://img.shields.io/github/license/w4ffl35/stablediffusiond)
![GitHub release (latest by date)](https://img.shields.io/github/v/release/w4ffl35/stablediffusiond)
![GitHub Release Date](https://img.shields.io/github/release-date/w4ffl35/stablediffusiond)

![GitHub repo size](https://img.shields.io/github/repo-size/w4ffl35/stablediffusiond)
![GitHub code size in bytes](https://img.shields.io/github/languages/code-size/w4ffl35/stablediffusiond)
![Lines of code](https://img.shields.io/tokei/lines/github/w4ffl35/stablediffusiond)

---

A daemon which watches for messages on RabbitMQ and runs [Stable Diffusion](https://github.com/CompVis/stable-diffusion)

- **No hot loading** - Model stored in RAM (10GB~) for faster processing
- **Daemon** - existing solutions use a webserver, here we use a daemon which is more lightweight
- **No hot loading** - Model stored in RAM (10GB~) for faster processing
- **Less bloat** - code and dependencies have been kept to a minimum
- **Flexibility** - request daemon, response daemon and queue system can be run independently, allowing for more efficient use of resources
- **Easy to use** - just run the daemon and send messages to the queue using `send.py`
Expand Down Expand Up @@ -82,8 +94,6 @@ Starts a Stable Diffusion response queue runner

### bin commands

- start stable diffusion queue runner (daemon uses this) `stablediffusion_client`
- start the sable diffusion response runner (response daemon uses this) `stablediffusion_response_query`
- send a message to running stable diffusion queue `stablediffusion_client '{"prompt": "cat", "seed": 42}'`

---
Expand Down
35 changes: 25 additions & 10 deletions connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,17 @@
Collection of functions to connect to the queue and publish messages.
"""

import pika
try:
import pika
except ImportError:
print("Unable to import pika. Please install requirements.")
pika = None
try:
from settings import SERVER
except ImportError:
print("Unable to import settings file. Please create a settings.py file.")
SERVER = {}
from logger import info, error
from settings import SERVER


def params(queue_system):
Expand All @@ -13,9 +21,13 @@ def params(queue_system):
:param queue_system:
:return: queue, host, queue_system: the queue, host and queue system name from the settings file
"""
queue_settings = SERVER[queue_system]
host = queue_settings["host"]
queue = queue_settings["queue_name"]
try:
queue_settings = SERVER[queue_system]
host = queue_settings["host"]
queue = queue_settings["queue_name"]
except KeyError:
error(f"Unable to find settings for {queue_system} in settings file")
return None, None, None
return queue, host, queue_settings["name"]


Expand All @@ -31,11 +43,14 @@ def connect_queue(queue_system):
# connect to queue
info(f"Starting connection to {queue_system}")
info(f"Connecting to {queue_system} host {host}...")
connection = pika.BlockingConnection(pika.ConnectionParameters(host))
channel = connection.channel()

info(f"Declaring {queue_system} queue {queue}...")
channel.queue_declare(queue=queue)
if pika:
connection = pika.BlockingConnection(pika.ConnectionParameters(host))
channel = connection.channel()
info(f"Declaring {queue_system} queue {queue}...")
channel.queue_declare(queue=queue)
else:
error("Unable to connect to queue. Pika not installed.")
return None, None

return connection, channel

Expand Down
35 changes: 28 additions & 7 deletions receive.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,22 @@
import sys
import os
import json
from classes.txt2img import Txt2Img
from classes.img2img import Img2Img
import logger as log
from connect import connect_queue, start_consumer, publish_queue, disconnect_queue
from settings import SCRIPTS
try:
from classes.txt2img import Txt2Img
from classes.img2img import Img2Img
except ImportError:
print("Unable to import classes. Please install requirements.")
Txt2Img = None
Img2Img = None

try:
from settings import SCRIPTS
except ImportError:
print("Unable to import settings file. Please create a settings.py file.")
SCRIPTS = {}

from connect import connect_queue, start_consumer, publish_queue, disconnect_queue
import logger as log

class Receiver:
"""
Expand All @@ -28,6 +38,9 @@ def txt2img_loader(self):
Loads the txt2img model
:return: Txt2Img instance
"""
if not Txt2Img:
raise ImportError("Unable to import classes. Please install requirements.")

if not self._txt2img_loader:
self._txt2img_loader = Txt2Img(
options=SCRIPTS["txt2img"],
Expand All @@ -42,6 +55,9 @@ def img2img_loader(self):
Loads the img2img model
:return: Img2Img instance
"""
if not Img2Img:
raise ImportError("Unable to import classes. Please install requirements.")

if not self._txt2img_loader:
self._img2img_loader = Img2Img(
options=SCRIPTS["img2img"],
Expand Down Expand Up @@ -89,8 +105,13 @@ def decode_binary_string(self, message):
:return: string
"""
try:
message = ''.join(chr(int(message[i * 8:i * 8 + 8], 2)) for i in range(len(message) // 8))
except Exception as exception:
# This is temporary. We will be switching to capnproto soon.
message = ''.join(
chr(
int(message[i * 8:i * 8 + 8], 2)
) for i in range(len(message) // 8)
)
except ValueError as exception:
log.warning(f"Unable to decode binary string. Returning original string. {exception}")
return message

Expand Down
34 changes: 29 additions & 5 deletions response_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,28 @@ def __init__(self):
self.host = "localhost" # the host this service is running on
self.port = 50007 # the port to listen on
self.max_client_connections = 1 # the maximum number of clients to accept
threading.Thread(target=self.open_socket).start()
threading.Thread(target=self.connect_to_queue, args=("response_queue",)).start()
self.run()
#threading.Thread(target=self.connect_to_queue, args=("response_queue",)).start()
self.connect_to_queue("response_queue")
# self.open_socket()
# self.connect_to_queue("response_queue")

def run(self):
"""
Starts a new thread with a client that has a connection to stablediffusion_responsed
:return: None
"""
self.thread = threading.Thread(target=self.connect_server)
self.thread.start()

def connect_server(self):
"""
Connect to stablediffusion_responsed socket and listen for responses.
:return: None
"""
self.open_socket()
self.listen_for_connections()

def open_socket(self):
"""
Open a socket to listen for incoming connections.
Expand All @@ -44,9 +61,16 @@ def open_socket(self):
log.error(str(err))
return
log.info(f"Socket opened {self.soc}")
self.soc.listen(self.max_client_connections)
self.soc_connection, self.soc_addr = self.soc.accept()
log.info(f"Connection established with {self.soc_addr}")

def listen_for_connections(self):
"""
Listen for incoming connections.
Returns:
"""
while True:
self.soc.listen(self.max_client_connections)
self.soc_connection, self.soc_addr = self.soc.accept()
log.info(f"Connection established with {self.soc_addr}")

def connect_to_queue(self, queue_name):
"""
Expand Down
13 changes: 10 additions & 3 deletions run_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,17 @@
"""
import os
import logger
from settings import SCRIPTS, GENERAL

SCRIPTS_ROOT = GENERAL["sd_scripts"]
PYTHON_PATH = GENERAL["sd_python_path"]
try:
from settings import SCRIPTS, GENERAL

SCRIPTS_ROOT = GENERAL["sd_scripts"]
PYTHON_PATH = GENERAL["sd_python_path"]
except ImportError:
print("Unable to import settings file. Please create a settings.py file.")
SCRIPTS = {}
SCRIPTS_ROOT = ""
PYTHON_PATH = ""


# run python script from console
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

setup(
name="stablediffusiond",
version="0.4.0",
version="0.4.1",
description="A daemon that watches a queue, runs stable diffusion and enqueues the results.",
url="https://github.com/w4ffl35/stablediffusiond.git",
author="w4ffl35 (Joe Curlee)",
Expand Down

0 comments on commit 4648a5f

Please sign in to comment.