11<?php
22declare (strict_types=1 );
33
4- namespace SerendipitySwow \Socket \Streams ;
4+
5+ namespace Serendipity \Job \Kernel ;
56
67use SerendipitySwow \Socket \Exceptions \OpenStreamException ;
78use SerendipitySwow \Socket \Exceptions \StreamStateException ;
8- use SerendipitySwow \Socket \Exceptions \WriteStreamException ;
99use SerendipitySwow \Socket \Interfaces \StreamInterface ;
10- use function error_get_last ;
11- use function fclose ;
12- use function fwrite ;
13- use function is_resource ;
14- use function stream_set_timeout ;
10+ use Swow \Buffer ;
11+ use Swow \Socket as SwowSocket ;
12+ use Throwable ;
1513
1614final class Socket implements StreamInterface
1715{
1816 /**
1917 * Default connection timeout in seconds.
2018 */
2119 public const DEFAULT_CONNECTION_TIMEOUT = 5 ;
20+ /**
21+ * Default write timeout in seconds.
22+ */
23+ public const DEFAULT_WRITE_TIMEOUT = 5 ;
24+
25+ /**
26+ * Default read timeout in seconds.
27+ */
28+ public const DEFAULT_READ_TIMEOUT = 5 ;
2229
2330 /**
2431 * @var string Hostname/IP
@@ -34,24 +41,39 @@ final class Socket implements StreamInterface
3441 * @var int Connection timeout
3542 */
3643 private int $ connectionTimeout ;
44+ /**
45+ * @var int Write timeout
46+ */
47+ private int $ writeTimeout ;
48+ /**
49+ * @var int Read timeout
50+ */
51+ private int $ readTimeout ;
3752
3853 /**
39- * @var resource
54+ * @var SwowSocket|null
4055 */
41- private $ socket ;
56+ private ? SwowSocket $ socket ;
4257
4358 /**
44- * Create a TCP socket.
45- *
46- * @param string $host The hostname.
47- * @param int $port The port number.
48- * @param null|int $connectionTimeout
59+ * @var Buffer|null
4960 */
50- public function __construct (string $ host , int $ port , int $ connectionTimeout = null )
61+ private ?Buffer $ buffer ;
62+
63+ public function __construct (
64+ string $ host ,
65+ int $ port ,
66+ int $ connectionTimeout = null ,
67+ int $ writeTimeout = null ,
68+ int $ readTimeout = null
69+ )
5170 {
5271 $ this ->host = $ host ;
5372 $ this ->port = $ port ;
5473 $ this ->connectionTimeout = $ connectionTimeout ?: self ::DEFAULT_CONNECTION_TIMEOUT ;
74+ $ this ->writeTimeout = $ writeTimeout ?: self ::DEFAULT_WRITE_TIMEOUT ;
75+ $ this ->readTimeout = $ readTimeout ?: self ::DEFAULT_READ_TIMEOUT ;
76+ $ this ->buffer = new Buffer ();
5577 }
5678
5779 /**
@@ -63,90 +85,73 @@ public function __destruct()
6385 }
6486
6587 /**
66- * @inheritDoc
88+ * @return bool
6789 */
6890 public function isOpen (): bool
6991 {
70- return is_resource ( $ this ->socket );
92+ return $ this ->socket -> isEstablished ( );
7193 }
7294
7395 /**
74- * @inheritDoc
96+ * @throws Throwable
7597 */
7698 public function open (): void
7799 {
78100 if ($ this ->isOpen ()) {
79101 throw new StreamStateException ('Stream already opened. ' );
80102 }
81- $ socket = @stream_socket_client (sprintf ('tcp://%s:%s ' , $ this ->host , $ this ->port ), $ errno , $ errstr , 1 );
82- if (!is_resource ($ socket )) {
83- throw new OpenStreamException ($ errstr , $ errno );
103+ try {
104+ $ socket = new SwowSocket (SwowSocket::TYPE_TCP );
105+ if (!$ socket ) {
106+ throw new OpenStreamException ('Stream UnKnown# ' );
107+ }
108+ $ this ->socket = $ socket ;
109+ $ this ->socket ->connect ($ this ->host , $ this ->port , $ this ->connectionTimeout );
110+ $ this ->writeTimeout && $ this ->socket ->setWriteTimeout ($ this ->writeTimeout );
111+ $ this ->readTimeout && $ this ->socket ->setReadTimeout ($ this ->readTimeout );
112+ } catch (Throwable $ throwable ) {
113+ throw $ throwable ;
84114 }
85- $ this ->socket = $ socket ;
86- $ this ->setTimeout ($ this ->connectionTimeout , 5 );
115+
87116 }
88117
89- /**
90- * @inheritDoc
91- */
92118 public function close (): void
93119 {
94120 if ($ this ->isOpen ()) {
95- fclose ( $ this ->socket );
121+ $ this ->socket -> close ( );
96122 $ this ->socket = null ;
123+ $ this ->buffer = null ;
97124 }
98125 }
99126
100127 /**
101- * @inheritDoc
128+ * @param string $string
102129 */
103- public function write (string $ string ): int
130+ public function write (string $ string ): void
104131 {
105132 if (!$ this ->isOpen ()) {
106133 throw new StreamStateException ('Stream not opened. ' );
107134 }
108- $ bytes = fwrite ($ this ->socket , $ string , strlen ($ string ));
109- if ($ bytes === false ) {
110- throw new WriteStreamException (error_get_last ());
111- }
112- return $ bytes ;
135+
136+ $ this ->socket ->send ($ this ->buffer ->clear ()
137+ ->write ($ string , strlen ($ string )));
113138 }
114139
115140 /**
116- * @inheritDoc
141+ * @param int $length
142+ *
143+ * @return string|null
117144 */
118145 public function readChar (int $ length = 65535 ): ?string
119146 {
120147 if (!$ this ->isOpen ()) {
121148 throw new StreamStateException ('Stream not opened. ' );
122149 }
123- $ char = fread ($ this ->socket , $ length );
124- if ($ char === false || $ char === '' ) {
150+ $ this ->socket ->read ($ this ->buffer ->clear (), $ length );
151+ $ char = $ this ->buffer ->toString ();
152+ if ($ char === '' ) {
125153 return null ;
126154 }
127155 return $ char ;
128156 }
129-
130- /**
131- * @inheritDoc
132- */
133- public function setTimeout (int $ seconds , int $ microseconds ): bool
134- {
135- if (!$ this ->isOpen ()) {
136- throw new StreamStateException ('Stream not opened. ' );
137- }
138- return stream_set_timeout ($ this ->socket , $ seconds , $ microseconds );
139- }
140-
141- /**
142- * @inheritDoc
143- */
144- public function timedOut (): bool
145- {
146- if (!$ this ->isOpen ()) {
147- throw new StreamStateException ('Stream not opened. ' );
148- }
149- $ metadata = stream_get_meta_data ($ this ->socket );
150- return (bool )$ metadata ['timed_out ' ];
151- }
152157}
0 commit comments